B4J Question How to Download an Image and Preview the Downloaded Parts

xulihang

Well-Known Member
Licensed User
Longtime User
I have an MFP printer that supports a scanning protocol called eSCL (also known as Mopria: https://mopria.org/what-is-mopria ). We can use HTTP to tell the printer to start scanning and then retrieve the scanned document.

The thing is that I would like to preview the image content and cancel the scanning process if the part I want is already scanned. I wonder if it is possible to do this with B4J. We can emulate the process by downloading a normal image on the Internet: https://en.wikipedia.org/wiki/File:Whole_world_-_land_and_oceans_12000.jpg
 
Last edited:

xulihang

Well-Known Member
Licensed User
Longtime User
I asked an AI to write a Python version first; next I need to translate it into B4X.

Python:
import argparse
import time
from io import BytesIO

import cv2
import numpy as np
from requests import get as requests_get, post as requests_post
from PIL import Image, ImageFile

# VERY IMPORTANT: allow partial JPEG decoding
ImageFile.LOAD_TRUNCATED_IMAGES = True


def _pil_to_cv(pil_image):
    """Convert a PIL image into an array that ``cv2.imshow`` can display.

    Grayscale (mode ``'L'``) images become a single-channel array. Every
    other mode is converted to RGB and then reordered to BGR, which is the
    channel order OpenCV uses.
    """
    if pil_image.mode == 'L':
        return np.array(pil_image)
    return cv2.cvtColor(np.array(pil_image.convert('RGB')), cv2.COLOR_RGB2BGR)


def scan_with_preview(scanner_address, output_path="scanned.jpg"):
    """Run an eSCL scan, showing a live preview while the JPEG streams in.

    A scan job is created with an HTTP POST, the resulting document is
    downloaded in chunks, and the partially received JPEG is periodically
    decoded and shown in an OpenCV window. When the download finishes the
    bytes are written to ``output_path`` and the final image is displayed.

    Args:
        scanner_address: Host (and optional port) of the scanner, used to
            build ``http://{scanner_address}/eSCL/ScanJobs``.
        output_path: File the received image bytes are written to.

    Raises:
        RuntimeError: If the scanner does not answer the job request with
            HTTP 201.
        KeyError: If the job response carries no ``Location`` header.
        requests.HTTPError: If the document download returns an error status.
    """
    xml = '''<scan:ScanSettings xmlns:scan="http://schemas.hp.com/imaging/escl/2011/05/03"
        xmlns:pwg="http://www.pwg.org/schemas/2010/12/sm">
        <pwg:Version>2.1</pwg:Version>
        <scan:Intent>Photo</scan:Intent>
        <pwg:InputSource>Platen</pwg:InputSource>
        <scan:DocumentFormatExt>image/jpeg</scan:DocumentFormatExt>
        <scan:XResolution>300</scan:XResolution>
        <scan:YResolution>300</scan:YResolution>
        <scan:ColorMode>RGB24</scan:ColorMode>
    </scan:ScanSettings>'''

    # Create scan job
    resp = requests_post(
        f"http://{scanner_address}/eSCL/ScanJobs",
        data=xml,
        headers={"Content-Type": "text/xml"},
        timeout=10
    )

    if resp.status_code != 201:
        raise RuntimeError("Failed to create scan job")

    doc_url = resp.headers["Location"] + "/NextDocument"

    print("Scanning... streaming preview")
    print("Press 'q' in the preview window to close it (scanning will continue)")

    # Create the OpenCV preview window with an initial size.
    window_name = "Scanner Preview"
    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(window_name, 800, 600)
    # Once the user closes the preview we must stop calling imshow, because
    # imshow would silently re-create a window with the same name.
    preview_open = True

    buffer = BytesIO()
    last_display_time = 0
    display_interval = 0.1  # refresh the preview at most every 0.1 seconds

    try:
        # Stream image data; the context manager releases the connection
        # even if the download is interrupted.
        with requests_get(doc_url, stream=True, timeout=60) as r:
            # Do not save an HTTP error page as if it were the scanned image.
            r.raise_for_status()

            for chunk in r.iter_content(chunk_size=65536):
                if not chunk:
                    continue

                # The preview decoder moves the stream position, so always
                # jump back to the end (whence=2) before appending; otherwise
                # a new chunk could overwrite bytes already received.
                buffer.seek(0, 2)
                buffer.write(chunk)

                if not preview_open:
                    continue

                # Throttle the display so it is not refreshed too often.
                current_time = time.time()
                if current_time - last_display_time < display_interval:
                    continue

                # Try to decode the partial JPEG. Failures are expected early
                # in the scan (not enough data yet), so the preview is
                # deliberately best-effort and errors are ignored.
                try:
                    buffer.seek(0)
                    pil_image = Image.open(buffer)
                    pil_image.load()  # partial load

                    cv2.imshow(window_name, _pil_to_cv(pil_image))

                    # Poll the keyboard (waits 1 ms); 'q' closes the preview
                    # while the scan itself keeps running.
                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        cv2.destroyWindow(window_name)
                        preview_open = False
                        print("Preview window closed. Scanning continues...")

                    last_display_time = current_time
                except Exception:
                    pass
    finally:
        # Close all OpenCV windows, even when the download failed.
        cv2.destroyAllWindows()

    # Save final image
    with open(output_path, "wb") as f:
        f.write(buffer.getvalue())

    print(f"Scan completed. Saved to {output_path}")

    # Optional: show the final result.
    try:
        final_cv_image = _pil_to_cv(Image.open(output_path))

        cv2.namedWindow("Final Scan Result", cv2.WINDOW_NORMAL)
        cv2.imshow("Final Scan Result", final_cv_image)
        cv2.resizeWindow("Final Scan Result", 800, 600)
        print("Press any key in the window to close the final result...")
        cv2.waitKey(0)
        cv2.destroyAllWindows()
    except Exception as e:
        print(f"Could not display final image: {e}")


if __name__ == "__main__":
    # Command-line entry point: read the scanner address and the output
    # file name, then run the scan with live preview.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "-d",
        "--device",
        required=True,
        help="Scanner address (e.g. HPxxxx.local)",
    )
    cli.add_argument("-o", "--output", default="scanned.jpg")
    options = cli.parse_args()

    scan_with_preview(options.device, options.output)
 
Upvote 0

xulihang

Well-Known Member
Licensed User
Longtime User
1769062482321.png


Here is the sample project I worked out. I need to use the raw OkHttpClient.
 

Attachments

  • ESCL.zip
    3.5 KB · Views: 69
Upvote 0
Top