从Monte Carlo谈pySpark的fork引发的Bug

这是一篇老文章,记录了发现PySpark一个bug的过程,现重新整理下:

截止2016-05-19已发布最新Spark版本,如果你在使用pySpark,并且也用 import random的方式生成随机数,就可能会遇到下面的问题:

刚学Spark,故先看一段Monte Carlo method 求Pi的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from random import random
from operator import add
def funcx(x):
# print x[0],x[1]
return 1 if x[0]**2 + x[1]**2 < 1 else 0
def genRnd(ind):
x=random() * 2 - 1
y=random() * 2 - 1
return (x,y)
def runsp(total):
ret=sc.parallelize(xrange(total),1).map(genRnd).map(funcx).reduce(lambda x, y: x + y)/float(total) * 4
print ret
runsp(3)

spark-shell方式运行上述代码,多次运行runsp(n), 会发现几点有趣现象:

1, 按理说, n越大,虽不是越能逼近pi,但是逼近pi的概率应该是越大的。然而发现似乎并不如此,起初以为是python生成伪随机算法导致,还好通过下面一个现象发现问题。但是伪随机算法在多大程度上干扰了Monte Carlo求值?这个后面会写一篇从数学上分析下。

2, 多次运行runsp(n),就会发现输出值是不变的而不是随机的,-_-# 事实上,放开上述 print注释,就会发现下面输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
>>> total=3
>>> sc.parallelize(xrange(total),1).map(genRnd).map(funcx).reduce(add)/float(total) * 4
0.896083541418 -0.635625854075
-0.0423532645466 -0.526910255885
0.498518696049 -0.872983895832
1.3333333333333333
>>> sc.parallelize(xrange(total),1).map(genRnd).map(funcx).reduce(add)/float(total) * 4
0.896083541418 -0.635625854075
-0.0423532645466 -0.526910255885
0.498518696049 -0.872983895832
1.3333333333333333
>>> sc.parallelize(xrange(total),1).map(genRnd).map(funcx).reduce(add)/float(total) * 4
0.896083541418 -0.635625854075
-0.0423532645466 -0.526910255885
0.498518696049 -0.872983895832
1.3333333333333333
>>> exit()

不废话了,这应该是pyspark的一个bug,而且对于使用 python random.random()的生成随机科学计算来说更严重。

主要是因为,当用xrange时候,new worker的,如下pyspark的 daemon.py里面代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
if listen_sock in ready_fds:
try:
sock, _ = listen_sock.accept()
except OSError as e:
if e.errno == EINTR:
continue
raise
# Launch a worker process
try:
pid = os.fork()
except OSError as e:
if e.errno in (EAGAIN, EINTR):
time.sleep(1)
pid = os.fork() # error here will shutdown daemon
else:
outfile = sock.makefile(mode='wb')
write_int(e.errno, outfile) # Signal that the fork failed
outfile.flush()
outfile.close()
sock.close()
continue
if pid == 0:
# in child process
listen_sock.close()
try:
# Acknowledge that the fork was successful
outfile = sock.makefile(mode="wb")
write_int(os.getpid(), outfile)
outfile.flush()
outfile.close()
while True:
code = worker(sock)
if not reuse or code:
# wait for closing
try:
while sock.recv(1024):
pass
except Exception:
pass
break
gc.collect()
except:
traceback.print_exc()
os._exit(1)
else:
os._exit(0)
else:
sock.close()

当生成RDD,map之后,reduce,就会进入上述代码,注意其中一句

pid = os.fork()

这句,会fork一个子进程,fork子进程会复制父进程空间,damon.py通过import pyspark.worker 间接import了shuffle.py的 import random,也就是说,每次fork的时候,复制了父的random,python的random是伪随机的,也就是说,子进程的random的下一个状态是确定的,所以会出现上述每次运行得到的随机序列一样的情况。

fix

最简单的fix办法,想必也会想到,就是在fork之后,worker代理调用random之前,进行random.seed(),
这确实是一种方法了,比如在上述代码 code = worker(sock)之前加一句random.seed()

引用
Linux系统调用 fork:
Fork - Linux Programmer’s Manual

系统调用跟我学(2)
最初印象深的fork主题文章是来源于developerworks中国上的一篇文章,可惜太久了找不到,快速阅读可以看下面几篇
Linux进程-基础Linux进程-fork
下面结合源码解析了fork的调用原理
Linux中fork系统调用分析
Linux下fork函数及pthread函数的总结
linux系统编程之进程3进程复制fork,孤儿进程,僵尸进程