Python multiprocessing: garbage collection corrupting variables shared between processes

Overview

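During recent development I ran into a problem with how Python garbage collection interacts with data shared between processes, and it took a lot of effort to track down. Garbage collection can corrupt the memory behind shared variables. The symptom: updates to shared variable A unexpectedly show up in shared variable B, leaving B with wrong data.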

Reproducing the bug

import time
import multiprocessing as mp


class TestWorker(object):

    def __init__(self):
        self.id1 = mp.Value("d", 0)

    def do_work(self, index):
        while True:
            self.id1.value = index


class TestProcessor(object):
    def __init__(self, index):
        self.index = index
        self._worker = None
        self.start_work_process_wrong()
        # self.start_work_process_right()

    def start_work_process_wrong(self):
        worker = TestWorker()
        process = mp.Process(
            target=worker.do_work,
            args=(self.index,)
        )
        process.start()
        return process

    def start_work_process_right(self):
        # Bind the worker to the instance so that it is not garbage-collected
        if self._worker is None:
            self._worker = TestWorker()
        process = mp.Process(
            target=self._worker.do_work,
            args=(self.index,)
        )
        process.start()
        return process


if __name__ == "__main__":
    p1 = TestProcessor(1111)
    flag = mp.Value("d", -1)
    p2 = TestProcessor(2222)

    while True:
        print(f"flag: {flag.value}")
        time.sleep(1)


Output

Nothing in the script assigns to flag, so it should keep its initial value of -1.0. In practice it changes to 1111.0:

flag: 1111.0
flag: 1111.0
flag: 1111.0
flag: 1111.0
flag: 1111.0

Analysis

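In TestProcessor.start_work_process_wrong, worker is a local variable, and nothing in the parent process keeps a reference to it once the method returns. The parent therefore reclaims p1's worker together with the mp.Value it owns, and the block of shared memory behind that Value goes back to multiprocessing's internal allocator. The child process is unaware of this: it still has its own handle to the same block and keeps executing self.id1.value = index. When the parent then creates flag, the allocator most likely hands the freed block out again, so flag and the child's id1 end up backed by the same memory and the child's writes appear in flag. Note that flag already reads 1111.0 and is created before p2, so the reclamation must happen before the second call to start_work_process_wrong, not because of it. The child is not writing to a neighboring address by mistake; it is writing to a block that has since been reused. The result is a data-integrity problem.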
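A quick way to see whether a freed shared-memory block is handed out again is to compare the addresses of two Values created one after the other, with no child process involved. The following is a minimal sketch, assuming CPython (where dropping the last reference frees an object immediately). shared_address and block_is_reused are helper names introduced here for illustration; they are not part of multiprocessing.

```python
import ctypes
import multiprocessing as mp


def shared_address(value):
    """Return the address of the shared-memory block behind an mp.Value."""
    # get_obj() returns the underlying ctypes object (a c_double for "d").
    return ctypes.addressof(value.get_obj())


def block_is_reused():
    """Free one Value, allocate another, and report whether the block is reused."""
    first = mp.Value("d", 0)
    first_address = shared_address(first)
    del first  # last reference gone: the block is returned to the allocator

    second = mp.Value("d", -1)
    return shared_address(second) == first_address


if __name__ == "__main__":
    print("block reused:", block_is_reused())
```

If the script reports that the block was reused, a child that still writes through the old Value would be writing into the new one, which matches the symptom described above.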

Workaround

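Prevent the worker from being garbage-collected by binding it to the object instance, as start_work_process_right above does. The TestProcessor instance itself must in turn stay referenced in the parent for as long as the child process is running.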
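The same idea can be written so that ownership is explicit: the parent-side object holds the shared Value and passes it to the child as an argument. This is a sketch rather than the original code. Processor, do_work and run_demo are hypothetical names, and the "fork" start method is an assumption (POSIX only) made so that the example runs as a plain script.

```python
import multiprocessing as mp

# Assumption: the "fork" start method, available on POSIX systems only.
ctx = mp.get_context("fork")


def do_work(shared, index):
    """Child: write index into the shared Value once, then exit."""
    shared.value = index


class Processor:
    """Owns the shared Value and the child process for its whole lifetime."""

    def __init__(self, index):
        # Held by the instance, so the parent never frees the block early.
        self.shared = ctx.Value("d", 0)
        self.process = ctx.Process(target=do_work, args=(self.shared, index))
        self.process.start()


def run_demo():
    """Same creation order as the bug script: p1, flag, p2."""
    p1 = Processor(1111)
    flag = ctx.Value("d", -1)
    p2 = Processor(2222)
    for p in (p1, p2):
        p.process.join()
    return flag.value, p1.shared.value, p2.shared.value


if __name__ == "__main__":
    print(run_demo())
```

Because each Value stays referenced by its Processor, flag gets a block of its own and is left untouched by the children.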

Exploring the underlying mechanism

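In testing, whenever a shared object had been garbage-collected in the parent, the value written by the child always showed up in the next shared variable the parent created afterwards. This suggests that the shared-memory allocator (implemented in Python, in multiprocessing/heap.py) hands freed blocks out again in a predictable order. Confirming the details requires reading the source [TODO].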

  • Test code
import time
import multiprocessing as mp


class TestWorker(object):

    def __init__(self):
        self.id1 = mp.Value("d", 0)

    def do_work(self, index):
        i = 0
        while True:
            self.id1.value = index + i
            i += 1
            time.sleep(1)


class TestProcessor(object):
    def __init__(self, index):
        self.index = index
        self._worker = None
        self.start_work_process_wrong()
        # self.start_work_process_right()

    def start_work_process_wrong(self):
        worker = TestWorker()
        process = mp.Process(
            target=worker.do_work,
            args=(self.index,)
        )
        process.start()
        return process

    def start_work_process_right(self):
        # Bind the worker to the instance so that it is not garbage-collected
        if self._worker is None:
            self._worker = TestWorker()
        process = mp.Process(
            target=self._worker.do_work,
            args=(self.index,)
        )
        process.start()
        return process


if __name__ == "__main__":
    flag0 = mp.Value("d", -1)
    t1 = TestProcessor(10000)
    flag1 = mp.Value("d", -1)
    t2 = TestProcessor(20000)
    flag2 = mp.Value("d", -1)
    t3 = TestProcessor(30000)
    flag3 = mp.Value("d", -1)

    while True:
        print(f"flag0: {flag0.value} \t"
              f"flag1: {flag1.value} \t"
              f"flag2: {flag2.value} \t"
              f"flag3: {flag3.value} ")
        time.sleep(1)

Output

flag0: -1.0 	flag1: 10000.0 	flag2: 20000.0 	flag3: -1.0 
flag0: -1.0 	flag1: 10001.0 	flag2: 20001.0 	flag3: 30000.0 
flag0: -1.0 	flag1: 10002.0 	flag2: 20002.0 	flag3: 30001.0 
flag0: -1.0 	flag1: 10003.0 	flag2: 20003.0 	flag3: 30002.0 
flag0: -1.0 	flag1: 10004.0 	flag2: 20004.0 	flag3: 30003.0 
flag0: -1.0 	flag1: 10005.0 	flag2: 20005.0 	flag3: 30004.0 
flag0: -1.0 	flag1: 10006.0 	flag2: 20006.0 	flag3: 30005.0 
flag0: -1.0 	flag1: 10007.0 	flag2: 20007.0 	flag3: 30006.0 
flag0: -1.0 	flag1: 10008.0 	flag2: 20008.0 	flag3: 30007.0 
flag0: -1.0 	flag1: 10009.0 	flag2: 20009.0 	flag3: 30008.0
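
The pairing in this output (flag1 with 10000, flag2 with 20000, flag3 with 30000, flag0 untouched) is what a simple free-list allocator would produce. The toy model below is not the multiprocessing implementation. ToyHeap and simulate are hypothetical names, and "lowest free slot first" is an assumption chosen only to replay the allocation order of the test script.

```python
class ToyHeap:
    """Hypothetical stand-in for a shared-memory allocator: lowest free slot wins."""

    def __init__(self):
        self.free_slots = []  # slots that were released and can be reused
        self.next_slot = 0    # first slot that has never been handed out

    def malloc(self):
        if self.free_slots:
            self.free_slots.sort()
            return self.free_slots.pop(0)
        slot = self.next_slot
        self.next_slot += 1
        return slot

    def free(self, slot):
        self.free_slots.append(slot)


def simulate(worker_indexes):
    """Replay the test script's allocations and map each flag to a worker."""
    heap = ToyHeap()
    owner_of_slot = {}  # slot -> index of the child still writing to it
    shared_with = {}    # flag name -> worker index sharing its slot, or None

    shared_with["flag0"] = owner_of_slot.get(heap.malloc())
    for n, index in enumerate(worker_indexes, start=1):
        slot = heap.malloc()         # TestWorker() creates its Value
        owner_of_slot[slot] = index  # the child keeps writing to this slot
        heap.free(slot)              # the parent drops the worker
        shared_with[f"flag{n}"] = owner_of_slot.get(heap.malloc())
    return shared_with


if __name__ == "__main__":
    print(simulate([10000, 20000, 30000]))
```

In this model flag0 is allocated before any worker exists, so no child ever writes to it, while every later flag takes over the slot its preceding worker just released. In the real output, flag3 still reads -1.0 on the first line, presumably because the third child had not yet performed its first write.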