:star2:总结放开头

  • 创建进程池可以形象的理解为创建了一个能够并行的流水线,只消耗一次创建流水线的成本,处理接收到的的任务。相对的,如果不使用进程池,每个要求并行的任务都会新建一次进程,浪费时间。
  • 编程中本来没有进程池的概念的,除了python,其他的语言都是使用线程池(而进程是执行分隔开的任务)。python因为GIL的原因(仅限Cython),线程无法并行,所以把线程池的概念迁移到了进程,命名为进程池。

:star2:python进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程。

但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

  1. 初始化Pool时,可以指定一个最大进程数
  2. 当有新的请求提交到Pool中时
    • 如果池还没有满,那么就会 创建 一个新的进程用来执行该请求;
    • 如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。

:star2:进程池如何使用?


:star:apply()

函数原型:apply(func[, args=()[, kwds={}]])

该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不再出现)


:star:apply_async

函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

与apply用法一致,但它是非阻塞的且支持结果返回后进行回调


:star:map()

函数原型:map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回
注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程


:star:map_async()

函数原型:map_async(func, iterable[, chunksize[, callback]])
与map用法一致,但是它是非阻塞的


:star:close()

关闭进程池(pool),使其不再接受新的任务


:star:terminal()

结束工作进程,不再处理未处理的任务


:star:join()

主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用


:star:使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 导入相关multiprocessing包
import multiprocessing

# 创建拥有CPU核心数量的进程的进程池
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())

for i in range(100):
# # 阻塞等待当前任务的进程结束
# pool.apply(func=pow, args=(i,2))

# 不阻塞等待当前任务的进程结束
pool.apply_async(func=pow, args=(i, 2))

# # map函数到一个列表,阻塞等待返回值
# results = pool.map(func=print, iterable=[i for i in range(10000)])

# # 不阻塞等待返回值,未运行完就调用results会报错。
# results = pool.map_async(func=print, iterable=[i for i in range(10000)])

# close后不会有新的进程加入到pool
pool.close()

# join函数等待所有子进程结束 # 调用join之前,先调用close函数,否则会出错。
pool.join()

# # 结束工作进程,不再处理未完成的任务。
# pool.terminate()


:star2:进程池中的进程和一般的进程有何区别?


:star:进程池中的Queue

  • 如果要使用进程池创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()
  • 使用的方法是一样的,都是创建了一个缓存队列,再采用q.put()添加、q.get()阻塞等待获取。

:star2:参考文献