为什么ProcessPoolExecutor和Pool在调用Super()时崩溃?
问题描述
1.为什么以下使用concurrent.futures模块的Python代码永远挂起?
import concurrent.futures
class A:
def f(self):
print("called")
class B(A):
def f(self):
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
executor.submit(super().f)
if __name__ == "__main__":
B().f()
该调用引发不可见的异常[Errno 24] Too many open files(若要查看该异常,请将行executor.submit(super().f)替换为print(executor.submit(super().f).exception()))。
但是,将ProcessPoolExecutor替换为ThreadPoolExecutor会如预期的那样打印";调用";。
2.以下使用multiprocessing.pool模块的Python代码为什么引发异常AssertionError: daemonic processes are not allowed to have children?
import multiprocessing.pool
class A:
def f(self):
print("called")
class B(A):
def f(self):
pool = multiprocessing.pool.Pool(2)
pool.apply(super().f)
if __name__ == "__main__":
B().f()
但是,将Pool替换为ThreadPool会如预期的那样打印";调用";。
环境:CPython3.7、MacOS 10.14。
解决方案
concurrent.futures.ProcessPoolExecutor和multiprocessing.pool.Pool使用multiprocessing.queues.Queue将功函数对象从调用方传递到工作进程,Queue使用pickle模块进行序列化/反序列化,但无法正确处理绑定了子类实例的方法对象:
f = super().f
print(f)
pf = pickle.loads(pickle.dumps(f))
print(pf)
输出:
<bound method A.f of <__main__.B object at 0x104b24da0>>
<bound method B.f of <__main__.B object at 0x104cfab38>>
A.f变为B.f,这将有效地在工作进程中创建对B.ftoB.f的无限递归调用。
pickle.dumps绑定方法对象的利用__reduce__方法,IMO,its implementation没有考虑这个场景,它没有照顾到真正的func对象,而只是试图从实例selfobj(B())中取回简单名称(f),这导致了B.f,很可能是一个错误。
好消息是,因为我们知道问题出在哪里,我们可以通过实现我们自己的归约函数来修复它,该函数尝试从原始函数(A.f)和实例obj(B())重新创建绑定的方法对象:
import types
import copyreg
import multiprocessing
def my_reduce(obj):
return (obj.__func__.__get__, (obj.__self__,))
copyreg.pickle(types.MethodType, my_reduce)
multiprocessing.reduction.register(types.MethodType, my_reduce)
我们可以这样做,因为绑定方法是一个描述符。
PS:我已提交a bug report。
相关文章