跳转至

产线并行推理

指定多个推理设备

对于部分产线的 CLI 和 Python API,PaddleOCR 支持同时指定多个推理设备。如果指定了多个设备,产线初始化时将在每个设备上创建一个底层产线类对象的实例,并对接收到的输入进行并行推理。例如,对于文档图像预处理产线:

paddleocr doc_preprocessor \
  --input input_images/ \
  --device 'gpu:0,1,2,3' \
  --use_doc_orientation_classify True \
  --use_doc_unwarping True
  --save_path ./output \
from paddleocr import DocPreprocessor


pipeline = DocPreprocessor(device="gpu:0,1,2,3") 
output = pipeline.predict(    
    input="input_images/",
    use_doc_orientation_classify=True,
    use_doc_unwarping=True)

以上两个例子均使用 4 块 GPU(编号为 0、1、2、3)对 doc_test_rotated.jpg 图片进行并行推理。

指定多个设备时,推理接口仍然与指定单设备时保持一致。请查看产线使用教程以了解某一产线是否支持指定多个推理设备。

多进程并行推理示例

除了使用 PaddleOCR 内置的多设备并行推理功能外,用户也可以结合实际场景,通过封装 PaddleOCR 产线 API 调用来实现并行处理,从而获得更优的加速效果。如下是使用 Python 多进程实现多卡、多实例并行处理输入目录中的文件的示例代码:

import argparse
import sys
from multiprocessing import Manager, Process
from pathlib import Path
from queue import Empty

import paddleocr


def load_pipeline(class_name: str, device: str):
    if not hasattr(paddleocr, class_name):
        raise ValueError(f"Class {class_name} not found in paddleocr module.")
    cls = getattr(paddleocr, class_name)
    return cls(device=device)


def worker(pipeline_class_path, device, task_queue, batch_size, output_dir):
    pipeline = load_pipeline(pipeline_class_path, device)

    should_end = False
    batch = []

    while not should_end:
        try:
            input_path = task_queue.get_nowait()
        except Empty:
            should_end = True
        else:
            batch.append(input_path)

        if batch and (len(batch) == batch_size or should_end):
            try:
                for result in pipeline.predict(batch):
                    input_path = Path(result["input_path"])
                    if result.get("page_index") is not None:
                        output_path = f"{input_path.stem}_{result['page_index']}.json"
                    else:
                        output_path = f"{input_path.stem}.json"
                    output_path = str(Path(output_dir, output_path))
                    result.save_to_json(output_path)
                    print(f"Processed {repr(str(input_path))}")
            except Exception as e:
                print(
                    f"Error processing {batch} on {repr(device)}: {e}",
                    file=sys.stderr
                )
            batch.clear()


def main():
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--pipeline",
        type=str,
        required=True,
        help="PaddleOCR pipeline, e.g. 'DocPreprocessor'.",
    )
    parser.add_argument(
        "--input_dir", type=str, required=True, help="Input directory."
    )
    parser.add_argument(
        "--device",
        type=str,
        required=True,
        help="Specifies the devices for performing parallel inference.",
    )
    parser.add_argument(
        "--output_dir", type=str, default="output", help="Output directory."
    )
    parser.add_argument(
        "--instances_per_device",
        type=int,
        default=1,
        help="Number of pipeline instances per device.",
    )
    parser.add_argument(
        "--batch_size",
        type=int,
        default=1,
        help="Inference batch size for each pipeline instance.",
    )
    parser.add_argument(
        "--input_glob_pattern",
        type=str,
        default="*",
        help="Pattern to find the input files.",
    )
    args = parser.parse_args()

    input_dir = Path(args.input_dir)
    if not input_dir.exists():
        print(f"The input directory does not exist: {input_dir}", file=sys.stderr)
        return 2
    if not input_dir.is_dir():
        print(f"{repr(str(input_dir))} is not a directory.", file=sys.stderr)
        return 2

    output_dir = Path(args.output_dir)
    if output_dir.exists() and not output_dir.is_dir():
        print(f"{repr(str(output_dir))} is not a directory.", file=sys.stderr)
        return 2
    output_dir.mkdir(parents=True, exist_ok=True)

    from paddlex.utils.device import constr_device, parse_device

    device_type, device_ids = parse_device(args.device)
    if device_ids is None or len(device_ids) == 1:
        print(
            "Please specify at least two devices for performing parallel inference.",
            file=sys.stderr,
        )
        return 2

    if args.batch_size <= 0:
        print("Batch size must be greater than 0.", file=sys.stderr)
        return 2

    with Manager() as manager:
        task_queue = manager.Queue()
        for img_path in input_dir.glob(args.input_glob_pattern):
            task_queue.put(str(img_path))

        processes = []
        for device_id in device_ids:
            for _ in range(args.instances_per_device):
                device = constr_device(device_type, [device_id])
                p = Process(
                    target=worker,
                    args=(
                        args.pipeline,
                        device,
                        task_queue,
                        args.batch_size,
                        str(output_dir),
                    ),
                )
                p.start()
                processes.append(p)

        for p in processes:
            p.join()

    print("All done")
    return 0


if __name__ == "__main__":
    sys.exit(main())

假设将上述脚本存储为 infer_mp.py,以下是一些调用示例:

# 确定 `--pipeline` 参数需查看其产线 **脚本方式** 导入类名称
# 此处为通用版面解析 v3 产线,对应PPStructureV3
# 其余产线例如印章文本产线对应SealRecognition,文档理解产线对应DocUnderstanding
# 处理 `input_images` 目录中所有文件
# 使用 GPU 0、1、2、3,每块 GPU 上 1 个产线实例,每个实例一次处理 1 个输入文件
python infer_mp.py \
    --pipeline PPStructureV3 \
    --input_dir input_images/ \
    --device 'gpu:0,1,2,3' \
    --output_dir output

# 通用版面解析 v3 产线
# 处理 `input_images` 目录中所有后缀为 `.jpg` 的文件
# 使用 GPU 0、2,每块 GPU 上 2 个产线实例,每个实例一次处理 4 个输入文件
python infer_mp.py \
    --pipeline PPStructureV3 \
    --input_dir input_images/ \
    --device 'gpu:0,2' \
    --output_dir output \
    --instances_per_device 2 \
    --batch_size 4 \
    --input_glob_pattern '*.jpg'