Python 多线程与多进程

最近实习接触到这方面的东西,整理了一下

Python多线程并不是真的多线程

Python代码的执行由Python虚拟机(解释器)来控制。Python在设计之初就考虑要在主循环中,同时只有一个线程在执行,就像单CPU的系统中运行多个进程那样,内存中可以存放多个程序,但任意时刻,只有一个程序在CPU中运行。同样地,虽然Python解释器可以运行多个线程,只有一个线程在解释器中运行。对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同时只有一个线程在运行

在多线程环境中,Python虚拟机按照以下方式执行。

  1. 设置GIL。
  2. 切换到一个线程去执行。
  3. 运行。
  4. 把线程设置为睡眠状态。
  5. 解锁GIL。
  6. 再次重复以上步骤。

对所有面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果某线程并未使用很多I/O操作,它会在自己的时间片内一直占用处理器和GIL。也就是说,I/O密集型的Python程序比计算密集型的Python程序更能充分利用多线程的好处

我们都知道,比方我有一个4核的CPU,那么这样一来,在单位时间内每个核只能跑一个线程,然后时间片轮转切换。但是Python不一样,它不管你有几个核,单位时间多个核只能跑一个线程,然后时间片轮转。看起来很不可思议?但是这就是GIL搞的鬼。任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

Python 多线程

Python中有三种模式实现多线程:继承Thread类、Thread对象和multiprocessing.dummy线程池

继承Thread类

继承Thread类,通过重写它的run方法实现多线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/usr/bin/python
# encoding=utf-8
# 直接从Thread继承,创建一个新的class,把线程执行的代码放到这个新的 class里
import threading
import time
class ThreadImpl(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self._num = num
def run(self):
global total, mutex
# 打印线程名
print threading.currentThread().getName()
for x in xrange(0, int(self._num)):
# 取得锁
mutex.acquire()
total = total + 1
# 释放锁
mutex.release()
if __name__ == '__main__':
#定义全局变量
global total, mutex
total = 0
# 创建锁
mutex = threading.Lock()
#定义线程池
threads = []
# 创建线程对象
for x in xrange(0, 40):
threads.append(ThreadImpl(100))
# 启动线程
for t in threads:
t.start()
# 等待子线程结束
for t in threads:
t.join()
# 打印执行结果
print total

需要注意的是:

  • 一定要有Thread.__init__(self)这句话
  • 执行的功能函数必须叫run

Thread对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#!/usr/bin/python
# encoding=utf-8
import threading
import time
def threadFunc(num):
global total, mutex
# 打印线程名
print threading.currentThread().getName()
for x in xrange(0, int(num)):
# 取得锁
mutex.acquire()
total = total + 1
# 释放锁
mutex.release()
def main(num):
#定义全局变量
global total, mutex
total = 0
# 创建锁
mutex = threading.Lock()
#定义线程池
threads = []
# 先创建线程对象
for x in xrange(0, num):
threads.append(threading.Thread(target=threadFunc, args=(100,)))
# 启动所有线程
for t in threads:
t.start()
# 主线程中等待所有子线程退出
for t in threads:
t.join()
# 打印执行结果
print total
if __name__ == '__main__':
# 创建40个线程
main(40)

需要注意的是:

  • args=是一个tuple,即使没有参数也应该用()
  • 如果希望子线程异步工作,要设置setDaemon为True;如果希望等待子线程工作结束后主进程再执行,在线程start后join

multiprocessing.dummy线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/python
# encoding=utf-8
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
class Test(object):
def __init__(self):
self.pool = ThreadPool(processes=8)
def display(self, para):
print(para)
def multi_work(self, trans):
try:
self.pool.map(self.display, trans)
except multiprocessing.TimeoutError:
print("pool time out")
def close(self):
self.pool.close()
self.pool.join()
if __name__ == '__main__':
test = Test()
t = range(0, 6)
test.multi_work(t)
test.close()

注意:

  • pool的join要在close之后执行
  • pool.map有可能超时,尽量捕捉这个错误

Python 多进程

Python 多进程的实现也有三种:继承自multiprocessing.Process类、multiprocessing.process对象和multiprocessing pool进程池

继承自multiprocessing.Process类

这里和多线程第一种实现方式一样

1
2
3
4
5
6
7
8
9
import multiprocessing
import time
class Producer(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
def run(self):
pass

multiprocessing.process对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

multiprocessing pool进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

Tips

  1. 多线程的线程间消息传递使用Queue.Queue;多进程使用multiprocessing.Queue;进程池必须使用multiprocessing.manager().Queue
  2. 多进程的消息传递可以采取Queue和Pipe两种高级数据结构,其中Queue是用Pipe实现的。Pipe只能支持两个进程的生产消费关系,如果存在多生产或者多消费的场景,只能用Queue。Pipe的效率高一些,但是高得有限,整体来看,多进程的消息传递的效率不高,尽量不要进行消息传递

参考