# 多进程
# fork()
Unix/Linux 操作系统提供了一个 fork()
系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是 fork()
调用一次,返回两次,因为操作系统自动把当前父进程复制了一份子进程,然后,分别在父进程和子进程内返回。
Python 的 os 模块封装了常见的系统调用,其中就包括 fork
,可以在 Python 程序中使用 os.fork()
轻松创建子进程,创建进程失败返回一个负数,子进程返回 0,而父进程返回子进程的 ID,调用 fork()
方法后,子进程被创建并会复制父进程的所有内存空间,并从下一句开始父子进程各自独立运行,由于父子进程 fork()
返回值有区别,配合 if
结构可以让父子进程执行不同的内容。父进程 fork()
之前开辟的空间子进程同样拥有,父子进程对各自空间的操作不会相互影响。
相关方法 | 简介 |
---|---|
os.fork() | 创建进程 失败返回负数,子进程返回 0,父进程返回子进程的 ID |
os._exit(status) | 结束一个进程 status 是进程的终止状态 如 os._exit(0) |
sys.exit(status) | 退出进程 status 如果是整数则表示退出状态 status 是字符串则表示退出时打印内容 如 sys.exit("退出进程") |
pid,status = os.wait() | 在父进程中阻塞等待处理子进程退出 pid 退出的子进程的 PID status 子进程退出状态 |
os.getpid() | 获得当前进程的进程 ID |
os.getppid() | 获得父进程的进程 ID |
import os
# pid是返回值,如果创建进程失败返回一个负数,子进程返回0,而父进程返回子进程的ID。
pid = os.fork()
# 后面的内容将在父进程与子进程中独立执行
print("父子进程都要执行")
# 通过返回值来让父子进程执行不同的功能
if pid == 0:
print("这里只有子进程会执行")
else:
print("这里只有父进程在执行")
# 这里以后父子进程都会独立执行
print("父子进程都要执行")
2
3
4
5
6
7
8
9
10
11
12
孤儿进程
父进程先于子进程退出,此时子进程成为孤儿进程。父进程退出之后,子进程会被 PID 为 1 的 init 进程接管,这样子进程就不会受终端退出影响了,使用这个特性就可以创建在后台执行的程序,俗称守护进程(daemon)。
僵尸进程
子进程先于父进程退出,父进程又没有处理子进程的退出状态,此时子进程就会称为僵尸进程。僵尸进程虽然结束,但是会存留部分 PCB 在内存中,大量的僵尸进程会浪费系统的内存资源。
如何避免僵尸进程
1、使用 wait
函数处理子进程退出
import os
pid = os.fork()
if pid == 0:
os._exit(0) #子进程退出
else:
p, status = os.wait() # 阻塞等待子进程退出
2
3
4
5
6
2、创建二级子进程处理僵尸 父进程创建子进程,等待回收子进程,子进程创建二级子进程然后退出,二级子进程变为孤儿进程,和原来父进程一同执行事件
import os
pid = os.fork()
if pid == 0:
p = os.fork() # 二级子进程
if p == 0:
print("这里是二级子进程在执行")
else:
os._exit(0) # 一级子进程退出
else:
os.wait() # 等待一级子进程退出
2
3
4
5
6
7
8
9
10
3、通过信号处理子进程退出
子进程退出时会发送信号给父进程,如果父进程忽略子进程信号,则系统就会自动处理子进程退出。父进程在创建子进程前调用 signal.signal(signal.SIGCHLD,signal.SIG_IGN)
import os
import signal
# 子进程退出时父进程会忽略,此时子进程自动由系统处理
signal.signal(signal.SIGCHLD,signal.SIG_IGN)
pid = os.fork()
if pid == 0:
os._exit(0) #子进程退出
else:
while True:
pass
2
3
4
5
6
7
8
9
10
# multiprocessing
由于 Windows 没有 fork
调用,而如果我们需要在 Windows 上用 Python 编写多进程的程序,我们就需要使用到 multiprocessing
模块。
multiprocessing 常用方法 | 简介 |
---|---|
cpu_count() | 查看当前机器 CPU 核心数量 |
active_children() | 获得目前所有的运行的进程 |
# Process
在 multiprocessing 中,每一个进程都用一个 Process 类来表示。
# 创建进程
# group 分组,很少使用
# target 调用对象(方法名)
# name 别名
# args 调用对象的参数元组
# kwargs 调用对象的字典
multiprocessing.Process(group, target, name, args, kwargs)
2
3
4
5
6
7
Process 常用方法 | 简介 |
---|---|
start() | 开始运行进程 |
join() | 等待进程执行完毕后再执行后面的方法 |
from time import time, sleep
from multiprocessing import Process
def run(name):
print("子进程[%s]开始执行" % name)
sleep(3)
print("子进程[%s]执行结束" % name)
def main():
# 创建子进程
p = Process(target=run, args=("测试",))
p.start() # 开始执行
p.join() # 等待子进程结束后,再往下执行
print("父进程结束")
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
子进程[测试]开始执行
子进程[测试]执行结束
父进程结束
2
3
全局变量在多个进程中不能共享,在子进程中修改全局变量对父进程中的全局变量没有影响。因为父进程在创建子进程时对全局变量做了一个备份,父进程中的全局变量与子进程的全局变量完全是不同的两个变量。
from time import time, sleep
from multiprocessing import Process
num = 100
def run(name):
print("子进程[%s]开始执行" % name)
global num
num += 1
print("子进程num:%d" % num)
print("子进程[%s]执行结束" % name)
def main():
# 创建子进程
p = Process(target=run, args=("测试",))
p.start() # 开始执行
p.join() # 等待子进程结束后,再往下执行
# 在子进程中修改全局变量对父进程中的全局变量没有影响
print("父进程结束 num:%s" % num)
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
子进程[测试]开始执行
子进程num:101
子进程[测试]执行结束
父进程结束 num:100
2
3
4
# 进程池
如果要启动大量的子进程,可以用进程池的方式批量创建子进程
# 创建进程池
# num 参数标识可以同时执行的进程数量,默认大小是 CPU 核心数
Pool(num)
2
3
Pool 常用方法 | 简介 |
---|---|
apply_async() | 创建进程并放入进程池统一管理 |
close() | 在 close 之后不能再继续往进程池添加新的进程 |
join() | 进程池使用 join 方法前必须先 close |
from time import time, sleep
import os
from multiprocessing.pool import Pool
def run(name):
print("%s子进程开始,进程ID:%d" % (name, os.getpid()))
sleep(3)
print("%s子进程结束,进程ID:%d" % (name, os.getpid()))
def main():
# 创建进程池,表示可以同时执行的进程数量。默认大小是CPU的核心数
p = Pool(2)
for i in range(4):
# 创建进程,放入进程池统一管理
p.apply_async(run, args=(i,))
# 在调用join()之前必须要先close()
p.close()
# 进程池对象调用join,会等待进程池中所有的子进程结束完毕再去结束父进程
p.join()
print("父进程结束。")
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 自定义进程类
自定义进程类,继承 Process 类,重写 run 方法就可以了
from time import sleep
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name):
Process.__init__(self)
self.name = name
def run(self):
print("子进程启动")
sleep(3)
print("子进程结束")
def main():
p = MyProcess("name")
p.start() # 自动调用MyProcess的run()方法
p.join()
print("父进程结束")
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
子进程启动
子进程结束
父进程结束
2
3
# 进程间通讯
multiprocessing 提供了多种方法来进行进程间通讯,如 Queue、Pipe、Manager
注意
这里必须使用 multiprocessing 模块中的 Queue、Pipe、Manager
# Queue
多进程安全的队列,可以实现多个进程之间的数据传递,一个进程将数据写入队列,再由队列将数据交给另一进程,不能用于进程池,在进程池中可以使用 Manager 下的 Queue,用法相似
# 创建 Queue
# maxsize 队列中允许的最大项数,省略则无大小限制
Queue(maxsize)
2
3
Queue 常用方法 | 介绍 |
---|---|
get([block[,timeout]]) | 返回队列中的一个项目,如果队列为空方法将阻塞直到队列中有项目可用为止 block 控制阻塞行为,默认为 True,设置为 False 将引发异常 timeout 指定在阻塞模式中等待项目可用的时间长短,超时引发异常 |
get_nowait() | 同 get (False) 方法 |
put(item[,block[,timeout]]) | 将 item 放入队列,如果队列已满将阻塞至有空间可用为止 block 控制阻塞行为,默认为 True,设置为 False 将引发异常 timeout 指定在阻塞模式中等待可用空间的时间长短,超时引发异常 |
qsize() | 返回队列中目前项目的正确数量 |
empty() | 检查队列是否为空 |
full() | 检查队列是否已满 |
close() | 关闭队列,防止队列中加入更多数据 |
from time import time, sleep
from multiprocessing import Process,Queue
def runWrite(q):
q.put("测试数据")
sleep(3)
def runRead(q):
while True:
value = q.get()
print("读取到%s" % value)
def main():
# 父进程创建队列,并传递给子进程
q = Queue()
pw = Process(target=runWrite, args=(q,))
pr = Process(target=runRead, args=(q,))
pw.start()
pr.start()
pw.join()
# pr进程是一个死循环,无法等待其结束,只能强行结束(写进程结束了,所以读进程也可以结束了)
pr.terminate()
print("父进程结束")
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
读取到测试数据
父进程结束
2
# Pipe
Pipe 在内存中开辟管道空间,生成管道操作对象,两个进程使用同一个管道对象进行读写即可实现通信, Pipe 的性能高于 Queue,因为 Queue 中加了锁,Pipe 只能用于两个进程通信,在只需要两个进程通信的情况下优先考虑该方法
# 创建 Pipe
# duplex 管道工作模式,True 为双向,False 为单向,默认工作模式是双向
# p1,p2 双向模式 p1,p2 均可读写,单向模式 p1 只读 p2 只写
p1,p2 = Pipe(duplex)
2
3
4
Pipe 方法 | 介绍 |
---|---|
recv() | 从管道获取内容 |
send(data) | 向管道写入内容 |
from time import time, sleep
from multiprocessing import Process, Pipe
def runWrite(p):
p.send("测试数据")
sleep(3)
def runRead(p):
while True:
value = p.recv()
print("读取到%s" % value)
def main():
p1, p2 = Pipe()
pw = Process(target=runWrite, args=(p1,))
pr = Process(target=runRead, args=(p2,))
pw.start()
pr.start()
pw.join()
# pr进程是一个死循环,无法等待其结束,只能强行结束(写进程结束了,所以读进程也可以结束了)
pr.terminate()
print("父进程结束")
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
读取到测试数据 父进程结束
# Manager
Manager 控制了一个 server 进程,此进程包含的 python 对象 (list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array) 可以被其他的进程通过 proxies 来访问实现数据共享,一般需要通过加锁 Lock 来确保数据的安全性。当使用进程池需要实现通讯时需要使用该方法
from time import time, sleep
from multiprocessing import Process, Manager, Pool
def runWrite(q):
q.put("测试数据")
sleep(3)
def runRead(q):
value = q.get()
print("读取到%s" % value)
sleep(3)
def main():
queue = Manager().Queue() # 创建队列
pool = Pool(2) # 创建线程池
pool.apply_async(runWrite, args=(queue,))
pool.apply_async(runRead, args=(queue,))
pool.close()
pool.join()
print("父进程结束")
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
读取到测试数据 父进程结束
多个进程共享对象,以 dict 为例 为了确保数据安全使用 Lock 加锁,注意所有进程必须使用相同的 Lock 对象加锁
from time import time, sleep
from multiprocessing import Process, Manager, Pool
def run_test(dic, key, lock):
lock.acquire() # 加锁
dic[key] = "测试数据"
lock.release() # 解锁
sleep(3)
def main():
dic = Manager().dict() # 创建共享的 dict
lock = Manager().Lock() # 创建锁
pool = Pool(2)
for key in ['a', 'b', 'c', 'd']:
pool.apply_async(run_test, args=(dic, key, lock,))
pool.close()
pool.join()
print(dic)
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{'a': '测试数据', 'b': '测试数据', 'd': '测试数据', 'c': '测试数据'}
# 多线程
Python 的标准库提供了两个模块:_thread 和 threading,_thread 是低级模块,threading 是高级模块,对 _thread 进行了封装。绝大多数情况下,我们只需要使用 threading 这个高级模块。 Python 的线程虽然是真正的线程,但解释器执行代码时,有一个 GIL 锁,任何 Python 线程执行前,必须先获得 GIL 锁,每执行 100 条字节码,解释器就自动释放 GIL 锁,让别的线程有机会执行。这个 GIL 全局锁实际上把所有线程的执行代码都给上了锁,所以多线程在 Python 中只能交替执行,即使 100 个线程跑在 100 核 CPU 上,也只能用到 1 个核。所以当需要多核任务时一般使用多进程处理
threading 方法 | 简介 |
---|---|
current_thread() | 返回当前对应调用者的控制线程的 Thread 对象 |
main_thread() | 返回主 Thread 对象 |
# Thread
# 创建线程
# group 分组,很少使用
# target 调用对象(方法名)
# name 别名
# args 调用对象的参数元组
# kwargs 调用对象的字典
threading.Thread([group[,target[,name[,args[,kwargs]]]]])
2
3
4
5
6
7
Thread 常用方法 | 简介 |
---|---|
start() | 开始线程任务 |
join() | 阻塞调用该方法的线程,直到被调用 join 方法的线程终结 |
from time import time, sleep
import threading
def run_test():
print("当前线程,%s" % threading.current_thread().name)
sleep(3)
def main():
print("当前线程,%s" % threading.current_thread().name)
thread = threading.Thread(target=run_test)
thread.start()
thread.join()
print("当前线程,%s" % threading.current_thread().name)
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
当前线程,MainThread
当前线程,Thread-1
当前线程,MainThread
2
3
# 通过继承 Thread 的方式创建线程
from time import time, sleep
import threading
class MyThread(threading.Thread):
# 实现 run 方法
def run(self):
print("当前线程,%s" % threading.current_thread().name)
sleep(3)
def main():
print("当前线程,%s" % threading.current_thread().name)
thread = MyThread()
thread.start()
thread.join()
print("当前线程,%s" % threading.current_thread().name)
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
当前线程,MainThread
当前线程,Thread-1
当前线程,MainThread
2
3
# 线程池
线程池的基类是 concurrent.futures 模块中的 Executor
, Executor
提供了两个子类 ThreadPoolExecutor
和 ProcessPoolExecutor
,其中 ThreadPoolExecutor
用于创建线程池 ProcessPoolExecutor
用于创建进程池。
线程池具有的功能特性
自动调度线程
主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
当一个线程完成的时候,主线程能够立即知道。
让多线程和多进程的编码接口一致
由于多线程和多进程的编码接口一致,下面介绍的方法只需将 ThreadPoolExecutor
替换为 ProcessPoolExecutor
即可换为进程池
# 创建线程池
# max_workers 设置线程池中最多能同时运行的线程数目
ThreadPoolExecutor(max_workers)
2
3
Executor 常用方法 | 简介 |
---|---|
submit() | 提交任务到线程池 / 进程池 |
map(func, *iterables) | 为 iterables 的每个元素启动一个线程并发执行 func 函数,井收集每个线程的执行结果 |
shutdown() | 关闭线程池 / 进程池 |
程序将任务提交 (submit) 给线程池后, submit
方法会返回一个 Future
对象, Future
类主要用于获取线程任务的返回值。由于线程任务会在新线程中以异步方式执行,因此线程执行的函数相当于一个 “将来完成” 的任务,所以 Python 使用 Future
来代表
Future 常用方法 | 简介 |
---|---|
cancel() | 取消线程任务,已经开始运行的任务无法取消 |
cancelled() | 返回任务是否取消成功 |
running() | 返回任务是否正在运行 |
done() | 返回任务是否已经被取消或执行完成 |
result(timeout) | 获取任务最后返回的结果,如果未完成将阻塞当前线程 timeout 指定最多阻塞多少秒 |
exception() | 返回任务引发的异常,如果无异常返回 None |
add_done_callback(fn) | 为线程任务注册回调函数,任务结束会自动调用该函数 fn 回调函数,回调时会将当前 Future 对象作为参数传入 |
from time import time, sleep
from concurrent.futures import ThreadPoolExecutor, Future
def run(name):
sleep(3)
return "线程 %s 已经执行完毕" % name
def result(future):
print(future.result())
def main():
# 创建线程池
# 使用 with 可以在任务结束时自动关闭线程池
with ThreadPoolExecutor(3) as pool:
future1 = pool.submit(run, "A")
future1.add_done_callback(result)
future2 = pool.submit(run, "B")
future2.add_done_callback(result)
print("---------")
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
---------
线程 A 已经执行完毕
线程 B 已经执行完毕
2
3
使用 map
方法启动线程
from time import time, sleep
from concurrent.futures import ThreadPoolExecutor, Future
def run(name):
sleep(3)
return "线程 %s 已经执行完毕" % name
def main():
# 创建线程池
# 使用 with 可以在任务结束时自动关闭线程池
with ThreadPoolExecutor(3) as pool:
results = pool.map(run, ["A", "B"])
print("---------")
for r in results:
print(r)
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
---------
线程 A 已经执行完毕
线程 B 已经执行完毕
2
3
# 协程
协程 (微线程) 是比线程更轻量化的存在,一个线程也可以拥有多个协程,协程不是被操作系统内核所管理,而完全是由程序所控制
协程的好处
无需线程上下文切换的开销
无需原子操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发 + 高扩展性 + 低成本:一个 CPU 支持上万的协程都不是问题。所以很适合用于高并发处理。
协程的缺点
无法利用多核资源
进行阻塞操作会阻塞掉整个程序
# 通过生成器 (Generator) 实现
模拟一个下载器 download
与管理者 manager
, manager
将下载地址发送给 download
下载,同时 download
下载后将结果返回 manager
,这里 download
是一个生成器
def download():
result = ""
while True:
# 获取下载地址,返回下载结果
url = yield result
print("开始下载 %s 的内容" % url)
result = "这是 %s 下载的结果" % url
def manager(load):
# 通过 send(None) 启动下载器
load.send(None)
for url in ["url1", "url2"]:
# 发送给下载器下载,获取下载结果
result = load.send(url)
print(result)
load.close()
if __name__ == '__main__':
download = download()
manager(download)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
开始下载 url1 的内容
这是 url1 下载的结果
开始下载 url2 的内容
这是 url2 下载的结果
2
3
4
首先调用 send(None)
启动生成器,然后获取数据并通过 send()
切换到 download
执行,
download
通过 yield
拿到消息处理后又通过 yield
把结果传回, manager
拿到 download
处理的结果继续生产下一条消息,最后通过 close()
关闭 download
来结束。整个流程由一个线程执行, download
和 manager
协作完成任务所以称为 “协程”
# 通过 greenlet 实现
生成器可以实现协程但并不完善也不易于理解,我们可以通过 greenlet
来更好的实现,使用前需先下载 greenlet
模块
pip install greenlet
常用方法 | 简介 |
---|---|
greenlet(func) | 启动一个协程 |
switch() | 切换协程 |
from greenlet import greenlet
def download(url):
while True:
print("开始下载 %s 的内容" % url)
result = "这是 %s 下载的结果" % url
# 切换 manager 运行
url = manager.switch(result)
def manager():
for url in ["url1", "url2"]:
# 切换到下载器
result = download.switch(url)
print(result)
# 创建协程
download = greenlet(download)
manager = greenlet(manager)
# 切换到 manager 执行
manager.switch()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
开始下载 url1 的内容
这是 url1 下载的结果
开始下载 url2 的内容
这是 url2 下载的结果
2
3
4
# 通过 gevent 实现
gevent
封装了 greenlet
,并实现了遇到 IO 自动切换,使用前需先下载 gevent
模块
pip install gevent
gevent 常用方法 | 简介 |
---|---|
spawn(func) | 启动一个协程 |
join() | 等待协程结束 |
joinall() | 可以等待多个协程结束,等同于对每一个都使用 join |
monkey.patch_all() | 调用后可以将一些阻塞式的标准库如 socket、ssl、threading 和 select 等模块变为非阻塞,以便 gevent 自动识别切换协程,只对调用该方法后导入的模块有效 |
import gevent
from gevent import monkey
# 把当前程序的所有 IO 操作标记起来,否则模块无法知道 IO 操作
monkey.patch_all()
from time import sleep
def func1():
print("正在执行 func1 开始")
sleep(1)
print("正在执行 func1 结束")
def func2():
print("正在执行 func2 开始")
sleep(3)
print("正在执行 func2 结束")
def func3():
print("正在执行 func3 开始")
sleep(2)
print("正在执行 func3 结束")
g1 = gevent.spawn(func1)
g2 = gevent.spawn(func2)
g3 = gevent.spawn(func3)
# 等价于 g1.join() g2.join() g3.join()
gevent.joinall([g1, g2, g3])
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
正在执行 func1 开始
正在执行 func2 开始
正在执行 func3 开始
正在执行 func1 结束
正在执行 func3 结束
正在执行 func2 结束
2
3
4
5
6
这里使用 sleep
模拟耗时 IO 操作,可以看到在遇到 sleep
时自动切换到其他协程运行,效果与多线程类似。