之所以写这个东西,还是因为之前操作系统学了个寂寞,考前背个板混个分数就过了。但是这玩意儿在实际应用的时候,才会体会到某些坑爹的地方~

众所周知,在 python 中,I/O 密集型任务 可以用多线程的方式来实现(threading 库);然而,对于 计算密集型 任务,由于 python 中全局锁 GIL 的存在,多线程并不能起到一个加速的作用。所以此时,一般使用多进程的方式实现(multiprocessing 库)。虽然对于多进程,网上资料一搜一大把,但是这并不妨碍实际操作过程中会踩到坑。

需求分析

现在我们有一个 worker 函数,接受不同的输入参数 args,之后进行一些计算。每个进程都可以起一个 worker 函数。

然而,在计算的时候,我们需要借助一些计算资源(如 GPU、网络服务器等)。那么我们可以用一个 multiprocessing.Array 来管理这些资源。

下面,以使用 GPU 为例:现在机器上有 4 台 GPU,而每台 GPU 上最多可以同时挂 5 个程序(这样不会炸显存)。那么我们可以维护这样一个数组:

1
shared_resource_buf = multiprocessing.Array('i', [5] * 4)

那么 worker 函数的基本结构可以抽象成这样:

1
2
3
4
5
6
7
8
9
10
11
12
def worker(args, shared_resource_buf):
resource_id = -1
'''
决定到底用哪个资源
'''
'''
拿到资源id
'''
work(args, resource_id)
'''
归还资源id
'''

初步实现

首先明确一下 multiprocessing.Array 的特性:

Returns a synchronized shared array

也就是说对它的访问、赋值操作都是原子的,不需要考虑这些操作的同步问题。那么我们可以写出一个初步的代码来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
shared_resource_buf = multiprocessing.Array('i', [5] * 4)

def worker(args, shared_resource_buf):
'''
决定到底用哪个资源
拿到资源id
'''
resource_id = -1
while True:
for i in range(len(shared_resource_buf)):
if shared_resource_buf[i] >= 1:
resource_id = i
shared_resource_buf[i] -= 1
break
else:
time.sleep(10)
continue
break
work(args, resource_id)
'''
归还资源id
'''
shared_resource_buf[gpu_id] += 1

然而,该实现会有一个问题:试想我们有进程 A 和进程 B 同时在运行 worker 函数,但是此时 shared_resource_buf[0] = 1,其他分量不可用,也就是说进程 A 和进程 B 中有一个进程需要先等另一个进程拿到资源、执行完 work、释放资源之后,才能去拿 resource_id=0 对应的资源,并计算。

但是在实际的执行过程中,会出现这样的执行顺序:

  1. i=0,进程 A 走到 if shared_resource_buf[i] >= 1:,此时 shared_resource_buf[0] = 1 满足条件,通过;
  2. i=0,进程 B 走到 if shared_resource_buf[i] >= 1:,此时 shared_resource_buf[0] = 1 满足条件,通过;
  3. 进程 A 拿到 resource_id=0 对应资源;
  4. 进程 B 拿到 resource_id=0 对应资源;

这样就会和我们预想的情景不一样。对此,我们弄出来的解决方法也很简单:加锁!

土味实现

python 的 multiprocessing 库也向我们提供了同步锁的实现:multiprocessing.Lock

参考下文:

https://cloud.tencent.com/developer/article/1485111

也就是说

  • lock.acquire 是获取锁,如果不能获取到那么进程就阻塞;
  • lock.release 是释放锁,是

我们可以用锁将上面那一段给锁起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
shared_resource_buf = multiprocessing.Array('i', [5] * 4)
lock = multiprocessing.Lock()

def worker(args, shared_resource_buf, lock):
'''
决定到底用哪个资源
拿到资源id
'''
resource_id = -1
try:
lock.acquire()
while True:
for i in range(len(shared_resource_buf)):
if shared_resource_buf[i] >= 1:
resource_id = i
shared_resource_buf[i] -= 1
break
else:
time.sleep(10)
continue
break
except Exception as err:
raise err
finally:
lock.release()
work(args, resource_id)

'''
归还资源id
'''
shared_resource_buf[gpu_id] += 1

这样实现是 OK 的,但是还是会出现一些问题:

首先是一个比较小的问题:就是只需执行途中如果 kill 掉的话,可能会触发多次 lock.release 从而报错:

ValueError: semaphore or lock released too many times

然后是比较关键的:一开始我不知道 shared_resource_buf 是原子变量,所以也把下面的归还操作锁起来了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
shared_resource_buf = multiprocessing.Array('i', [5] * 4)
lock = multiprocessing.Lock()

def worker(args, shared_resource_buf, lock):
'''
决定到底用哪个资源
拿到资源id
'''
resource_id = -1
try:
lock.acquire()
while True:
for i in range(len(shared_resource_buf)):
if shared_resource_buf[i] >= 1:
resource_id = i
shared_resource_buf[i] -= 1
break
else:
time.sleep(10)
continue
break
except Exception as err:
raise err
finally:
lock.release()
work(args, resource_id)

'''
归还资源id
'''
try:
lock.acquire()
shared_resource_buf[gpu_id] += 1
except Exception as err:
raise err
finally:
lock.release()

问题就来了:同样地,试想我们有进程 A 和进程 B 同时在运行 worker 函数,但是此时 shared_resource_buf[0] = 1,其他分量不可用,也就是说进程 A 和进程 B 中有一个进程需要先等另一个进程拿到资源、执行完 work、释放资源之后,才能去拿 resource_id=0 对应的资源,并计算。

但是在实际的执行过程中,会出现这样的执行顺序:

  1. i=0,进程 A 拿到锁,走到 if shared_resource_buf[i] >= 1:,此时 shared_resource_buf[0] = 1 满足条件,通过,进程 A 释放锁;
  2. 进程 A 执行 work,利用资源进行计算;
  3. i=0,进程 B 拿到锁,走到 if shared_resource_buf[i] >= 1:,此时 shared_resource_buf[0] = 0 不满足条件,sleep;
  4. 进程 A 尝试归还资源,但是因为锁拿不到,所以阻塞;
  5. 进程 B 始终拿不到锁,所以一直在睡大觉。

这样就死锁了。

稍微正常一些的实现

上面的土味实现问题出在:临界区过大了,在不该锁(while True 轮询)的时候锁上了,所以才会出现死锁的情况。

所以解决方法也就是:将 while True 轮询挪到最外面,单次只尝试一次轮询,发现暂时没资源的话就释放锁,并睡觉。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
shared_resource_buf = multiprocessing.Array('i', [5] * 4)
lock = multiprocessing.Lock()

def worker(args, shared_resource_buf, lock):
'''
决定到底用哪个资源
拿到资源id
'''
resource_id = -1
while True:
try:
lock.acquire()
for i in range(len(shared_resource_buf)):
if shared_resource_buf[i] >= 1:
resource_id = i
shared_resource_buf[i] -= 1
break
except Exception as err:
raise err
finally:
lock.release()
if (resource_id == -1):
time.sleep(10)
continue
else:
break

work(args, resource_id)

'''
归还资源id
'''
try:
lock.acquire()
shared_resource_buf[gpu_id] += 1
except Exception as err:
raise err
finally:
lock.release()

这样大概应该也许就没啥问题了……

所以说啊,进程同步真的是坠痛苦的,锁一没用好的话一下子就……