

新闻资讯
技术教程python 标准库 `multiprocessing.pool` 按进程数分配资源,而非按核心数;若需“每进程独占 n 核”,需手动控制进程数并配合 cpu 绑定(如 `taskset` 或 `psutil`),而非依赖内置 api。
Python 的 multiprocessing.Pool 本身不提供“每进程分配 N 个 CPU 核心”的原生接口——它的第一个参数 processes 表示启动多少个独立进程,而非为每个进程预留多少逻辑核心。操作系统调度器负责将这些进程映射到可用 CPU 上,但默认不保证隔离或独占。所谓“N 核每进程”,本质是资源编排需求,需结合进程数量控制与 CPU 亲和性(CPU affinity)手动实现。
计算合理进程数:若系统共有 TOTAL_CORES 个逻辑核心,每个任务理想占用 N_CORES_PER_PROCESS 核,且需为父进程保留 reserve_cores(如 2 核),则应设置:
import multiprocessing as mp TOTAL_CORES = mp.cpu_count() # e.g., 16 N_CORES_PER_PROCESS = 2 reserve_cores = 2 processes = max(1, (TOTAL_CORES - reserve_cores) // N_CORES_PER_PROCESS) # e.g., (16-2)//2 =7
在子进程中绑定 CPU 核心(关键!):使用 psutil 或 os.sched_setaffinity(Linux/macOS)限制每个 worker 进程仅运行在指定核心集合上。示例(Linux):
import psutil
import os
def bind_to_cores(core_ids):
"""将当前进程绑定到 core_ids 指定的核心列表"""
try:
p = psutil.Process()
p.cpu_affinity(core_ids)
except (psutil.AccessDenied, AttributeError, NotImplementedError):
pass # 忽略无权限或平台不支持的情况
def worker_init(core_range):
# 每个 worker 初始化时绑定一组连续核心,例如 [0,1], [2,3], ...
start = core_range[0]
end = core_range[1]
bind_to_cores(list(range(start, end)))
# 构建核心分组:[[0,1], [2,3], ..., [12,13]]
core_groups = [
[i, i + N_CORES_PER_PROCESS]
for i in range(0, processes * N_CORES_PER_PROCESS, N_CORES_PER_PROCESS)
]
with mp.Pool(
processes=processes,
initializer=worker_init,
initargs=(core_groups[0],) # 注意:实际需为每个进程动态分配不同组 → 需用更健壮方式(见下文)
) as pool:
results = pool.map(func, range(10000))⚠️ 注意:initializer 对所有 worker 执行同一份参数,无法自动区分进程 ID。更可靠的方式是使用 concurrent.futures.ProcessPoolExecutor 自定义 mp.Process 子类,或借助 loky(joblib 后端)等高级库支持 per-worker 初始化。
joblib.Parallel 内置对 CPU 绑定的支持(通过 backend='loky' + prefer='processes'),并允许显式控制核心分配策略:
from joblib import Parallel, delayed
import os
# 设置环境变量(Linux/macOS),让子进程自动绑定
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'
# 使用 loky backend 并显式指定 n_jobs
results = Parallel(
n_jobs=7, # = (16-2)//2
backend='loky',
prefer='processes'
)(delayed(func)(x) for x in range(10000))