Skip to main content
Skip to main content
多线程/进程模块API文档
文档类型
API 文档
相关模块
worker
相关版本
v0.x
发布日期
2026-02-15

Worker 模块 API 文档

按「描述、函数签名、参数、输出、示例」列出 Worker 模块中应用/基础设施代码会直接使用的入口;内部 helper 和抽象基类不列入。架构与设计见 architecture.md / decisions.md,快速上手见 overview.md


ProcessWorker(构造函数)

基于 multiprocessing 的多进程任务执行器,适合 CPU 密集或混合型任务。支持 Batch / 队列两种执行模式,并内置进度统计和日志输出。

ProcessWorker(max_workers: Optional[int] = None, execution_mode: ExecutionMode = ExecutionMode.QUEUE, batch_size: Optional[int] = None, job_executor: Optional[Callable] = None, enable_monitoring: bool = True, timeout: float = 300.0, is_verbose: bool = False, debug: bool = False, start_method: str = "spawn")

参数(摘选)

参数类型说明
max_workersint | None最大并行进程数;None 时使用 CPU 核数
execution_modeExecutionMode执行模式:BATCHQUEUE
batch_sizeint | NoneBatch 模式时每批大小;默认使用 CPU 核数
job_executorCallable任务执行函数,签名约定为 func(payload) -> Any
timeoutfloat单个任务超时时间(秒)
is_verbosebool是否打印详细日志

无(构造实例)

from core.infra.worker import ProcessWorker, ProcessExecutionMode

def run_job(payload):
    return payload["x"] + payload["y"]

worker = ProcessWorker(
    max_workers=None,
    execution_mode=ProcessExecutionMode.QUEUE,
    job_executor=run_job,
    is_verbose=True,
)

ProcessWorker.run_jobs

执行一批任务,支持单次或分批调用,并返回统计信息。

ProcessWorker.run_jobs(jobs: Optional[List[Dict[str, Any]]] = None, total_jobs: Optional[int] = None) -> Dict[str, Any]

参数类型说明
jobsList[Dict[str, Any]] | None任务列表,每个任务形如 {"id": str, "payload": Any}{"id": str, "data": Any}
total_jobsint | None总任务数(分批执行时用于正确计算进度)

Dict[str, Any] —— 含 total_jobscompleted_jobsfailed_jobstotal_duration 等统计字段。

jobs = [{"id": f"job_{i}", "payload": {"x": i, "y": 1}} for i in range(100)]
stats = worker.run_jobs(jobs)
results = worker.get_results()

ProcessWorker.get_results / get_successful_results / get_failed_results

获取任务执行结果列表,或只获取成功/失败的结果。

 

  • ProcessWorker.get_results() -> List[JobResult]
  • ProcessWorker.get_successful_results() -> List[JobResult]
  • ProcessWorker.get_failed_results() -> List[JobResult]

List[JobResult] —— 每个结果包含 job_idstatusresulterrorduration 等。


ProcessWorker.print_stats / reset

打印运行统计信息,或重置执行器状态(清空任务、结果和统计信息)。

 

  • ProcessWorker.print_stats() -> None
  • ProcessWorker.reset() -> None

ProcessWorker.resolve_max_workers(推荐配合 ConfigManager 使用)

根据模块任务类型(CPU/IO/Mixed)和预留核心数自动计算合适的 worker 数量,内部会通过 ConfigManager.get_module_config() 读取 Worker 配置。

ProcessWorker.resolve_max_workers(max_workers: Union[str, int], module_name: str) -> int

参数类型说明
max_workersstr | int"auto" 表示自动计算,或显式指定数字
module_namestr模块名称,用于 worker 配置(如 "OpportunityEnumerator"

int —— 实际使用的 worker 数量。


MultiThreadWorker(构造函数)

基于 ThreadPoolExecutor 的轻量级多线程任务执行器,适合 IO 密集型任务或轻量并发场景。

MultiThreadWorker(max_workers: int = 5, execution_mode: ExecutionMode = ExecutionMode.PARALLEL, job_executor: Optional[Callable] = None, enable_monitoring: bool = True, timeout: float = 30.0, is_verbose: bool = False, debug: bool = False)

无(构造实例)

from core.infra.worker import MultiThreadWorker, ThreadExecutionMode

def fetch_url(url: str) -> str:
    # 伪代码:实际可使用 requests 等库
    return f"content of {url}"

worker = MultiThreadWorker(
    max_workers=20,
    execution_mode=ThreadExecutionMode.PARALLEL,
    job_executor=fetch_url,
    is_verbose=True,
)

jobs = [{"id": f"url_{i}", "data": f"https://example.com/{i}"} for i in range(100)]
stats = worker.run_jobs(jobs)
results = worker.get_results()

传统 Worker 接口(ProcessWorker / MultiThreadWorker)

ProcessExecutor(多进程执行器)

新的模块化架构下的多进程执行器,是对 ProcessWorker 的轻量封装,便于与 Orchestrator 组合使用。

ProcessExecutor(max_workers: Optional[int] = None, execution_mode: ExecutionMode = ExecutionMode.QUEUE, job_executor: Optional[Callable] = None, is_verbose: bool = False)

ProcessExecutor 实例,实现 Executor 接口。

from core.infra.worker import ProcessExecutor

def run_job(payload):
    return payload["x"] * 2

executor = ProcessExecutor(
    max_workers=None,
    job_executor=run_job,
    is_verbose=True,
)

jobs = [{"id": str(i), "data": {"x": i}} for i in range(100)]
results = executor.run_jobs(jobs)

ProcessExecutor.run_jobs

执行一批任务并返回 JobResult 列表,内部通过 ProcessWorker.run_jobs 完成。

ProcessExecutor.run_jobs(jobs: List[Dict[str, Any]], total_jobs: Optional[int] = None) -> List[JobResult]


ListJobSource

简单的列表任务源,从预加载的 Python 列表中按批次提供任务。

ListJobSource(jobs: List[Any])

常用方法

  • ListJobSource.get_batch(size: int) -> List[Any] —— 获取一批任务
  • ListJobSource.has_more() -> bool —— 是否还有任务
  • ListJobSource.total_count() -> int —— 任务总数
  • ListJobSource.reset() -> None —— 重置游标
from core.infra.worker import ListJobSource

job_source = ListJobSource(jobs=list(range(100)))
batch = job_source.get_batch(10)

模块化执行器与任务源

MemoryMonitor(内存监控器)

监控当前进程 RSS 内存使用情况,估算每个任务的内存占用,并提供简要告警信息。

MemoryMonitor(memory_budget_mb: float, baseline_rss_mb: Optional[float] = None)

常用方法

  • MemoryMonitor.update(... ) -> None —— 更新监控状态(通常由调度器调用)
  • MemoryMonitor.get_stats() -> Dict[str, Any] —— 获取当前内存统计
  • MemoryMonitor.get_warnings() -> List[str] —— 获取告警列表
  • MemoryMonitor.export_snapshot() -> Dict[str, Any] —— 导出监控快照
from core.infra.worker import MemoryMonitor

monitor = MemoryMonitor(memory_budget_mb=4096.0)
stats = monitor.get_stats()
warnings = monitor.get_warnings()

MemoryAwareScheduler(内存感知调度器)

基于内存监控数据动态调整 batch 大小的调度器,可与 Orchestrator 一起使用,实现「既尽量用满资源,又避免 OOM」的执行策略。

MemoryAwareScheduler(jobs: List[Any], memory_budget_mb: float | str | None = "auto", warmup_batch_size: int | str = "auto", min_batch_size: int | str = "auto", max_batch_size: int | str = "auto", smooth_factor: float = 0.3, summary_weight: float = 0.2, monitor_interval: int = 5, log: Optional[logging.Logger] = None)

常用方法

  • iter_batches() -> Iterable[List[Any]] —— 迭代批次任务(与 Orchestrator 配合)
  • get_next_batch_size() -> int —— 获取下一个批次大小
  • update_after_batch(batch_size: int, batch_results: List[Any], finished_jobs: int) -> None —— 在批次执行后更新状态
  • get_progress() -> Dict[str, Any] —— 获取进度信息(总任务数、已完成数、百分比等)
  • get_memory_warning() -> Optional[str] —— 获取当前内存告警(如有)
from core.infra.worker import (
    Orchestrator,
    ProcessExecutor,
    ListJobSource,
    MemoryAwareScheduler,
)

jobs = [{"id": str(i), "data": i} for i in range(10000)]
job_source = ListJobSource(jobs)
scheduler = MemoryAwareScheduler(jobs)
executor = ProcessExecutor(job_executor=lambda x: x * 2)

orchestrator = Orchestrator(
    executor=executor,
    job_source=job_source,
    scheduler=scheduler,
)

result = orchestrator.run()

内存监控与调度

Orchestrator(构造函数)

组合执行器、任务源、调度器、监控器、聚合器、错误处理器等组件,对外提供统一的高级执行 API。

Orchestrator(executor: Executor, job_source: JobSource, scheduler: Optional[Scheduler] = None, monitor: Optional[Monitor] = None, aggregator: Optional[Aggregator] = None, error_handler: Optional[ErrorHandler] = None)

参数(摘选)

参数类型说明
executorExecutor实际执行任务的执行器(如 ProcessExecutor
job_sourceJobSource任务源(如 ListJobSource
schedulerScheduler | None可选调度器(如 MemoryAwareScheduler
monitorMonitor | None可选监控器(如 MemoryMonitor
aggregatorAggregator | None结果聚合器
error_handlerErrorHandler | None错误处理器

Orchestrator.run

执行所有任务,并返回包含原始结果列表和聚合摘要的字典。

Orchestrator.run() -> Dict[str, Any]

 

{
  "results": List[JobResult],  # 每个任务的执行结果
  "summary": Dict[str, Any],   # 聚合信息和监控统计
}

Orchestrator.shutdown

关闭执行器等组件,释放资源。通常在长时间运行的服务停止前调用;脚本模式下可省略。

Orchestrator.shutdown() -> None


Orchestrator(编排器)