Overview
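During recent development I ran into a problem with how Python garbage collection interacts with data shared between processes, and it took a lot of effort to track down. Garbage collection can corrupt the memory behind shared variables. The symptom: updates to shared variable A unexpectedly show up in shared variable B, corrupting B's data.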
Reproducing the bug
import time
import multiprocessing as mp


class TestWorker(object):
    def __init__(self):
        # Shared double that the child process writes to
        self.id1 = mp.Value("d", 0)

    def do_work(self, index):
        while True:
            self.id1.value = index


class TestProcessor(object):
    def __init__(self, index):
        self.index = index
        self._worker = None
        self.start_work_process_wrong()
        # self.start_work_process_right()

    def start_work_process_wrong(self):
        # worker is only a local variable here
        worker = TestWorker()
        process = mp.Process(
            target=worker.do_work,
            args=(self.index,)
        )
        process.start()
        return process

    def start_work_process_right(self):
        # Binding the worker to the instance keeps it from being garbage-collected
        if self._worker is None:
            self._worker = TestWorker()
        process = mp.Process(
            target=self._worker.do_work,
            args=(self.index,)
        )
        process.start()
        return process


if __name__ == "__main__":
    p1 = TestProcessor(1111)
    flag = mp.Value("d", -1)
    p2 = TestProcessor(2222)
    while True:
        print(f"flag: {flag.value}")
        time.sleep(1)
Output
Nothing ever assigns to flag, so it should keep its initial value of -1.0, but it was actually changed to 1111.0:
flag: 1111.0
flag: 1111.0
flag: 1111.0
flag: 1111.0
flag: 1111.0
Analysis
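In TestProcessor.start_work_process_wrong, worker is a local variable. Once the method returns, the parent process no longer holds a reference to it, so the worker created for p1, together with its shared variable, becomes eligible for garbage collection and is reclaimed in the parent. The child process is still running, though, and keeps assigning to that shared variable (self.id1.value = index). In the parent, the memory it writes to no longer belongs to the collected variable. It appears to have been handed to the next shared variable the parent creates (flag), so the child's writes end up in flag. The result is a data-safety problem.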
Workaround
Keep the worker out of the garbage collector's reach by binding it to the object instance, as in start_work_process_right above.
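The effect of keeping or dropping the reference can be checked in the parent process alone, without starting any child. The sketch below is only an illustration under assumptions: address_of and flag_lands_on_worker_block are hypothetical helper names, and the comparison relies on how CPython's multiprocessing currently hands out shared blocks, which is an implementation detail rather than a documented guarantee.

```python
import ctypes
import gc
import multiprocessing as mp


def address_of(shared):
    """Address of the ctypes object behind an mp.Value, as seen by this process."""
    return ctypes.addressof(shared.get_obj())


def flag_lands_on_worker_block(keep_reference):
    """Create a worker-style value, optionally drop it, then create a flag.

    Returns True if the flag occupies the address the worker value had.
    """
    worker_value = mp.Value("d", 0)      # stands in for TestWorker.id1
    worker_address = address_of(worker_value)

    # Workaround case: a second name keeps the object alive until we return.
    kept = worker_value if keep_reference else None
    del worker_value                     # mimics the local `worker` going away
    gc.collect()                         # make sure any pending cleanup has run

    flag = mp.Value("d", -1)
    return address_of(flag) == worker_address


if __name__ == "__main__":
    print("reference dropped:", flag_lands_on_worker_block(keep_reference=False))
    print("reference kept:   ", flag_lands_on_worker_block(keep_reference=True))
```

If the flag shares an address with the dropped value, a child that still writes to the old value would be writing into the flag, which is what the reproduction shows. With the reference kept, the flag should get a block of its own.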
Exploring the underlying mechanism
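In my tests, whenever a shared object in the parent process has been garbage-collected, the values the child writes always end up in the first shared variable the parent creates afterwards. The underlying C implementation may be array-based; confirming that requires reading the source [TODO].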
- Test code
import time
import multiprocessing as mp


class TestWorker(object):
    def __init__(self):
        # Shared double that the child process writes to
        self.id1 = mp.Value("d", 0)

    def do_work(self, index):
        # Write an increasing value once per second so each worker is identifiable
        i = 0
        while True:
            self.id1.value = index + i
            i += 1
            time.sleep(1)


class TestProcessor(object):
    def __init__(self, index):
        self.index = index
        self._worker = None
        self.start_work_process_wrong()
        # self.start_work_process_right()

    def start_work_process_wrong(self):
        # worker is only a local variable here
        worker = TestWorker()
        process = mp.Process(
            target=worker.do_work,
            args=(self.index,)
        )
        process.start()
        return process

    def start_work_process_right(self):
        # Binding the worker to the instance keeps it from being garbage-collected
        if self._worker is None:
            self._worker = TestWorker()
        process = mp.Process(
            target=self._worker.do_work,
            args=(self.index,)
        )
        process.start()
        return process


if __name__ == "__main__":
    # Interleave flags and processors to see which flag each worker overwrites
    flag0 = mp.Value("d", -1)
    t1 = TestProcessor(10000)
    flag1 = mp.Value("d", -1)
    t2 = TestProcessor(20000)
    flag2 = mp.Value("d", -1)
    t3 = TestProcessor(30000)
    flag3 = mp.Value("d", -1)
    while True:
        print(f"flag0: {flag0.value} \t"
              f"flag1: {flag1.value} \t"
              f"flag2: {flag2.value} \t"
              f"flag3: {flag3.value} ")
        time.sleep(1)
Output
flag0: -1.0 flag1: 10000.0 flag2: 20000.0 flag3: -1.0
flag0: -1.0 flag1: 10001.0 flag2: 20001.0 flag3: 30000.0
flag0: -1.0 flag1: 10002.0 flag2: 20002.0 flag3: 30001.0
flag0: -1.0 flag1: 10003.0 flag2: 20003.0 flag3: 30002.0
flag0: -1.0 flag1: 10004.0 flag2: 20004.0 flag3: 30003.0
flag0: -1.0 flag1: 10005.0 flag2: 20005.0 flag3: 30004.0
flag0: -1.0 flag1: 10006.0 flag2: 20006.0 flag3: 30005.0
flag0: -1.0 flag1: 10007.0 flag2: 20007.0 flag3: 30006.0
flag0: -1.0 flag1: 10008.0 flag2: 20008.0 flag3: 30007.0
flag0: -1.0 flag1: 10009.0 flag2: 20009.0 flag3: 30008.0
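In the output, each flagN picks up the values of the worker created just before it, while flag0 is untouched. That pairing can be explored in the parent alone by replaying the same allocation order and comparing addresses. This is a rough sketch rather than an analysis of the real implementation: trace_allocations and address_of are hypothetical helpers, a bare mp.Value stands in for TestWorker.id1, and no child process is started, so it only shows which parent-side variables share a memory address.

```python
import ctypes
import gc
import multiprocessing as mp


def address_of(shared):
    """Address of the ctypes object behind an mp.Value, as seen by this process."""
    return ctypes.addressof(shared.get_obj())


def trace_allocations(worker_ids=(10000, 20000, 30000)):
    """Replay the allocation order of the test script in a single process.

    Returns a dict mapping each flag name to the id of the dropped worker
    whose address it received, or None if it shares no address with a worker.
    """
    flags = {}               # keeps every flag alive, as the test script does
    flag_addresses = {}
    worker_addresses = {}

    flags["flag0"] = mp.Value("d", -1)
    flag_addresses["flag0"] = address_of(flags["flag0"])

    for n, worker_id in enumerate(worker_ids, start=1):
        worker_value = mp.Value("d", 0)      # stands in for TestWorker.id1
        worker_addresses[worker_id] = address_of(worker_value)
        del worker_value                     # the local `worker` goes away
        gc.collect()

        name = f"flag{n}"
        flags[name] = mp.Value("d", -1)
        flag_addresses[name] = address_of(flags[name])

    owner_by_address = {addr: wid for wid, addr in worker_addresses.items()}
    return {name: owner_by_address.get(addr)
            for name, addr in flag_addresses.items()}


if __name__ == "__main__":
    for name, worker_id in trace_allocations().items():
        print(f"{name}: shares an address with worker {worker_id}")
```

If blocks are reused the way the output above suggests, flag1, flag2 and flag3 map to 10000, 20000 and 30000 respectively and flag0 maps to None. That would point to a pool that recycles freed slots rather than to writes drifting to a neighbouring variable, but reading the source is still needed to confirm it.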