跳转至

产线并行推理

指定多个推理设备

对于部分产线的 CLI 和 Python API,PaddleX 支持同时指定多个推理设备。如果指定了多个设备,产线初始化时将在每个设备上创建一个底层产线类对象的实例,并对接收到的输入进行并行推理。例如,对于通用版面解析 v3 产线:

paddlex --pipeline PP-StructureV3 \
        --input input_images/ \
        --use_doc_orientation_classify False \
        --use_doc_unwarping False \
        --use_textline_orientation False \
        --save_path ./output \
        --device 'gpu:0,1,2,3'
pipeline = create_pipeline(pipeline="PP-StructureV3", device="gpu:0,1,2,3")
output = pipeline.predict(
    input="input_images/",
    use_doc_orientation_classify=False,
    use_doc_unwarping=False,
    use_textline_orientation=False,
)

以上两个例子均使用 4 块 GPU(编号为 0、1、2、3)对 input_images 目录中的文件进行并行推理。

指定多个设备时,推理接口仍然与指定单设备时保持一致。请查看产线使用教程以了解某一产线是否支持指定多个推理设备。

多进程并行推理示例

除了使用 PaddleX 内置的多设备并行推理功能外,用户也可以结合实际场景,通过包装 PaddleX 产线推理 API 调用逻辑等手段实现并行推理加速,以期达到更优的加速比。如下是使用 Python 多进程实现多卡、多实例并行处理输入目录中的文件的示例代码:

import argparse
import sys
from multiprocessing import Manager, Process
from pathlib import Path
from queue import Empty

from paddlex import create_pipeline
from paddlex.utils.device import constr_device, parse_device


def worker(pipeline_name_or_config_path, device, task_queue, batch_size, output_dir):
    pipeline = create_pipeline(pipeline_name_or_config_path, device=device)

    should_end = False
    batch = []

    while not should_end:
        try:
            input_path = task_queue.get_nowait()
        except Empty:
            should_end = True
        else:
            batch.append(input_path)

        if batch and (len(batch) == batch_size or should_end):
            try:
                for result in pipeline.predict(batch):
                    input_path = Path(result["input_path"])
                    if result.get("page_index") is not None:
                        output_path = f"{input_path.stem}_{result['page_index']}.json"
                    else:
                        output_path = f"{input_path.stem}.json"
                    output_path = str(Path(output_dir, output_path))
                    result.save_to_json(output_path)
                    print(f"Processed {repr(str(input_path))}")
            except Exception as e:
                print(
                    f"Error processing {batch} on {repr(device)}: {e}", file=sys.stderr
                )
            batch.clear()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--pipeline", type=str, required=True, help="Pipeline name or config path."
    )
    parser.add_argument("--input_dir", type=str, required=True, help="Input directory.")
    parser.add_argument(
        "--device",
        type=str,
        required=True,
        help="Specifies the devices for performing parallel inference.",
    )
    parser.add_argument(
        "--output_dir", type=str, default="output", help="Output directory."
    )
    parser.add_argument(
        "--instances_per_device",
        type=int,
        default=1,
        help="Number of pipeline instances per device.",
    )
    parser.add_argument(
        "--batch_size",
        type=int,
        default=1,
        help="Inference batch size for each pipeline instance.",
    )
    parser.add_argument(
        "--input_glob_pattern",
        type=str,
        default="*",
        help="Pattern to find the input files.",
    )
    args = parser.parse_args()

    input_dir = Path(args.input_dir)
    if not input_dir.exists():
        print(f"The input directory does not exist: {input_dir}", file=sys.stderr)
        return 2
    if not input_dir.is_dir():
        print(f"{repr(str(input_dir))} is not a directory.", file=sys.stderr)
        return 2

    output_dir = Path(args.output_dir)
    if output_dir.exists() and not output_dir.is_dir():
        print(f"{repr(str(output_dir))} is not a directory.", file=sys.stderr)
        return 2
    output_dir.mkdir(parents=True, exist_ok=True)

    device_type, device_ids = parse_device(args.device)
    if device_ids is None or len(device_ids) == 1:
        print(
            "Please specify at least two devices for performing parallel inference.",
            file=sys.stderr,
        )
        sys.exit(2)

    if args.batch_size <= 0:
        print("Batch size must be greater than 0.", file=sys.stderr)
        sys.exit(2)

    manager = Manager()
    task_queue = manager.Queue()
    for img_path in input_dir.glob(args.input_glob_pattern):
        task_queue.put(str(img_path))

    processes = []
    for device_id in device_ids:
        for _ in range(args.instances_per_device):
            device = constr_device(device_type, [device_id])
            p = Process(
                target=worker,
                args=(
                    args.pipeline,
                    device,
                    task_queue,
                    args.batch_size,
                    str(output_dir),
                ),
            )
            p.start()
            processes.append(p)

    for p in processes:
        p.join()

    print("All done")

    return 0


if __name__ == "__main__":
    sys.exit(main())

假设将上述脚本存储为 mp_infer.py,以下是一些调用示例:

# 通用版面解析 v3 产线
# 处理 `input_images` 目录中所有文件
# 使用 GPU 0、1、2、3,每块 GPU 上 1 个产线实例,每个实例一次处理 1 个输入文件
python mp_infer.py \
    --pipeline PP-StructureV3 \
    --input_dir input_images/ \
    --device 'gpu:0,1,2,3' \
    --output_dir output1

# 通用版面解析 v3 产线
# 处理 `input_images` 目录中所有后缀为 `.jpg` 的文件
# 使用 GPU 0、2,每块 GPU 上 2 个产线实例,每个实例一次处理 4 个输入文件
python mp_infer.py \
    --pipeline PP-StructureV3 \
    --input_dir input_images/ \
    --device 'gpu:0,2' \
    --output_dir output2 \
    --instances_per_device 2 \
    --batch_size 4 \
    --input_glob_pattern '*.jpg'