python multiprocessing 多进程踩坑
之所以写这个东西,还是因为之前操作系统学了个寂寞,考前背个板混个分数就过了。但是这玩意儿在实际应用的时候,才会体会到某些坑爹的地方~
众所周知,在 python 中,I/O 密集型任务 可以用多线程的方式来实现(threading 库);然而,对于 计算密集型 任务,由于 python 中全局锁 GIL 的存在,多线程并不能起到一个加速的作用。所以此时,一般使用多进程的方式实现(multiprocessing 库)。虽然对于多进程,网上资料一搜一大把,但是这并不妨碍实际操作过程中会踩到坑。
需求分析
现在我们有一个 worker
函数,接受不同的输入参数 args
,之后进行一些计算。每个进程都可以起一个 worker
函数。
然而,在计算的时候,我们需要借助一些计算资源(如 GPU、网络服务器等)。那么我们可以用一个 multiprocessing.Array
来管理这些资源。
下面,以使用 GPU 为例:现在机器上有 4 台 GPU,而每台 GPU 上最多可以同时挂 5 个程序(这样不会炸显存)。那么我们可以维护这样一个数组:
1 | shared_resource_buf = multiprocessing.Array('i', [5] * 4) |
那么 worker
函数的基本结构可以抽象成这样:
1 | def worker(args, shared_resource_buf): |
初步实现
首先明确一下 multiprocessing.Array
的特性:
Returns a synchronized shared array
也就是说对它的访问、赋值操作都是原子的,不需要考虑这些操作的同步问题。那么我们可以写出一个初步的代码来:
1 | shared_resource_buf = multiprocessing.Array('i', [5] * 4) |
然而,该实现会有一个问题:试想我们有进程 A 和进程 B 同时在运行 worker
函数,但是此时 shared_resource_buf[0] = 1
,其他分量不可用,也就是说进程 A 和进程 B 中有一个进程需要先等另一个进程拿到资源、执行完 work
、释放资源之后,才能去拿 resource_id=0
对应的资源,并计算。
但是在实际的执行过程中,会出现这样的执行顺序:
i=0
,进程 A 走到if shared_resource_buf[i] >= 1:
,此时shared_resource_buf[0] = 1
满足条件,通过;i=0
,进程 B 走到if shared_resource_buf[i] >= 1:
,此时shared_resource_buf[0] = 1
满足条件,通过;- 进程 A 拿到
resource_id=0
对应资源; - 进程 B 拿到
resource_id=0
对应资源;
这样就会和我们预想的情景不一样。对此,我们弄出来的解决方法也很简单:加锁!
土味实现
python 的 multiprocessing 库也向我们提供了同步锁的实现:multiprocessing.Lock
参考下文:
https://cloud.tencent.com/developer/article/1485111
也就是说
lock.acquire
是获取锁,如果不能获取到那么进程就阻塞;lock.release
是释放锁,是
我们可以用锁将上面那一段给锁起来:
1 | shared_resource_buf = multiprocessing.Array('i', [5] * 4) |
这样实现是 OK 的,但是还是会出现一些问题:
首先是一个比较小的问题:就是只需执行途中如果 kill 掉的话,可能会触发多次 lock.release
从而报错:
ValueError: semaphore or lock released too many times
然后是比较关键的:一开始我不知道 shared_resource_buf
是原子变量,所以也把下面的归还操作锁起来了:
1 | shared_resource_buf = multiprocessing.Array('i', [5] * 4) |
问题就来了:同样地,试想我们有进程 A 和进程 B 同时在运行 worker
函数,但是此时 shared_resource_buf[0] = 1
,其他分量不可用,也就是说进程 A 和进程 B 中有一个进程需要先等另一个进程拿到资源、执行完 work
、释放资源之后,才能去拿 resource_id=0
对应的资源,并计算。
但是在实际的执行过程中,会出现这样的执行顺序:
i=0
,进程 A 拿到锁,走到if shared_resource_buf[i] >= 1:
,此时shared_resource_buf[0] = 1
满足条件,通过,进程 A 释放锁;- 进程 A 执行
work
,利用资源进行计算; i=0
,进程 B 拿到锁,走到if shared_resource_buf[i] >= 1:
,此时shared_resource_buf[0] = 0
不满足条件,sleep;- 进程 A 尝试归还资源,但是因为锁拿不到,所以阻塞;
- 进程 B 始终拿不到锁,所以一直在睡大觉。
这样就死锁了。
稍微正常一些的实现
上面的土味实现问题出在:临界区过大了,在不该锁(while True
轮询)的时候锁上了,所以才会出现死锁的情况。
所以解决方法也就是:将 while True
轮询挪到最外面,单次只尝试一次轮询,发现暂时没资源的话就释放锁,并睡觉。代码如下:
1 | shared_resource_buf = multiprocessing.Array('i', [5] * 4) |
这样大概应该也许就没啥问题了……
所以说啊,进程同步真的是坠痛苦的,锁一没用好的话一下子就……