import argparse
import time
from io import BytesIO
from urllib.parse import urljoin

import cv2
import numpy as np
from PIL import Image, ImageFile
from requests import get as requests_get, post as requests_post
# VERY IMPORTANT: allow partial JPEG decoding
ImageFile.LOAD_TRUNCATED_IMAGES = True
def _pil_to_cv(pil_image):
    """Convert a PIL image into the array layout ``cv2.imshow`` expects.

    Grayscale (``'L'``) images become a single-channel array; every other
    mode is converted to RGB first and then reordered to BGR.
    """
    if pil_image.mode == 'L':
        return np.array(pil_image)
    if pil_image.mode != 'RGB':
        pil_image = pil_image.convert('RGB')
    return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)


def _decode_partial_jpeg(data):
    """Decode the image bytes received so far; return a cv2 array or None.

    The bytes are wrapped in a fresh ``BytesIO`` so that decoding never
    moves the cursor of the buffer that accumulates the download.
    """
    try:
        pil_image = Image.open(BytesIO(data))
        pil_image.load()  # partial load
        return _pil_to_cv(pil_image)
    except Exception:
        # Incomplete data fails in several different ways depending on where
        # the stream was cut. The preview is best-effort and must never abort
        # the scan, so any decode failure just means "no frame this time".
        return None


def _show_preview_frame(window_name, frame):
    """Display *frame*; return False once the preview should stop updating."""
    try:
        cv2.imshow(window_name, frame)
        # waitKey also lets the GUI process its events (waits 1 ms).
        key = cv2.waitKey(1) & 0xFF
    except cv2.error as exc:
        print(f"Preview disabled: {exc}")
        return False
    if key == ord('q'):
        # Closing the preview must not stop the download. The caller stops
        # calling imshow, which would otherwise re-create the window.
        cv2.destroyWindow(window_name)
        print("Preview window closed. Scanning continues...")
        return False
    return True


def scan_with_preview(scanner_address, output_path="scanned.jpg"):
    """Run an eSCL scan, showing a live preview while the image streams in.

    Creates a scan job on the scanner, streams the resulting JPEG, and
    periodically decodes the bytes received so far to refresh an OpenCV
    preview window. The complete image is written to *output_path* and then
    shown in a final window until a key is pressed.

    Args:
        scanner_address: Host (and optional port) of the scanner; used to
            build ``http://<scanner_address>/eSCL/ScanJobs``.
        output_path: File the received image bytes are written to.

    Raises:
        RuntimeError: If the scan job is not created (status other than 201),
            the response has no ``Location`` header, or no image data is
            received.
        requests.HTTPError: If the document download returns an error status.
    """
    xml = '''<scan:ScanSettings xmlns:scan="http://schemas.hp.com/imaging/escl/2011/05/03"
xmlns:pwg="http://www.pwg.org/schemas/2010/12/sm">
<pwg:Version>2.1</pwg:Version>
<scan:Intent>Photo</scan:Intent>
<pwg:InputSource>Platen</pwg:InputSource>
<scan:DocumentFormatExt>image/jpeg</scan:DocumentFormatExt>
<scan:XResolution>300</scan:XResolution>
<scan:YResolution>300</scan:YResolution>
<scan:ColorMode>RGB24</scan:ColorMode>
</scan:ScanSettings>'''

    # Create scan job
    jobs_url = f"http://{scanner_address}/eSCL/ScanJobs"
    resp = requests_post(
        jobs_url,
        data=xml,
        headers={"Content-Type": "text/xml"},
        timeout=10,
    )
    if resp.status_code != 201:
        raise RuntimeError("Failed to create scan job")
    location = resp.headers.get("Location")
    if not location:
        raise RuntimeError("Scanner did not return a scan job location")
    # urljoin leaves an absolute Location untouched and resolves a relative
    # one against the jobs URL; rstrip avoids a doubled slash.
    doc_url = urljoin(jobs_url, location).rstrip("/") + "/NextDocument"

    print("Scanning... streaming preview")
    print("Press 'q' in the preview window to close it (scanning will continue)")

    # Create the OpenCV preview window with an initial size.
    window_name = "Scanner Preview"
    preview_open = True
    try:
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(window_name, 800, 600)
    except cv2.error as exc:
        # The job already exists on the scanner, so keep going without a
        # preview rather than abandoning the scan.
        print(f"Preview disabled: {exc}")
        preview_open = False

    buffer = BytesIO()
    display_interval = 0.1  # seconds between preview refresh attempts
    last_attempt = float("-inf")  # guarantees an attempt on the first chunk

    try:
        # Stream image data; the context manager releases the connection.
        with requests_get(doc_url, stream=True, timeout=60) as r:
            # Do not save an HTTP error body as if it were an image.
            r.raise_for_status()
            for chunk in r.iter_content(chunk_size=65536):
                if not chunk:
                    continue
                buffer.write(chunk)
                if not preview_open:
                    continue
                # Throttle refreshes; each attempt re-decodes everything
                # received so far, successful or not.
                now = time.monotonic()
                if now - last_attempt < display_interval:
                    continue
                last_attempt = now
                frame = _decode_partial_jpeg(buffer.getvalue())
                if frame is None:
                    # Normal during early scan: not enough data yet.
                    continue
                preview_open = _show_preview_frame(window_name, frame)
    finally:
        # Close all OpenCV windows, even if the download failed.
        cv2.destroyAllWindows()

    image_bytes = buffer.getvalue()
    if not image_bytes:
        raise RuntimeError("Scanner returned no image data")

    # Save final image
    with open(output_path, "wb") as f:
        f.write(image_bytes)
    print(f"Scan completed. Saved to {output_path}")

    # Optional: show the final result until a key is pressed.
    try:
        with Image.open(output_path) as final_image:
            final_cv_image = _pil_to_cv(final_image)
        cv2.namedWindow("Final Scan Result", cv2.WINDOW_NORMAL)
        cv2.imshow("Final Scan Result", final_cv_image)
        cv2.resizeWindow("Final Scan Result", 800, 600)
        print("Press any key in the window to close the final result...")
        cv2.waitKey(0)
        cv2.destroyAllWindows()
    except Exception as e:
        print(f"Could not display final image: {e}")
if __name__ == "__main__":
    # Command-line entry point: read the scanner address and the output
    # file name, then run the scan with live preview.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "-d",
        "--device",
        required=True,
        help="Scanner address (e.g. HPxxxx.local)",
    )
    cli.add_argument("-o", "--output", default="scanned.jpg")
    options = cli.parse_args()
    scan_with_preview(
        scanner_address=options.device,
        output_path=options.output,
    )