ThreadPoolExecutor
Python 3.2 added the concurrent.futures module to the standard library. It provides a thread pool, ThreadPoolExecutor, and a process pool, ProcessPoolExecutor.
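A simple example
```python
from concurrent.futures import ThreadPoolExecutor
import time

# 1. Define a task function
def task(name):
    time.sleep(5)
    print(f"Executing task {name}")
    return 9999

# 2. Create a ThreadPoolExecutor
#    (without `with`: executor = ThreadPoolExecutor(max_workers=4), then call executor.shutdown() yourself)
with ThreadPoolExecutor(max_workers=4) as executor:
    # 3. Submit tasks to the pool; submit() returns a Future immediately
    executor.submit(task, "Task 1")
    executor.submit(task, "Task 2")
    s = executor.submit(task, "Task 3")
    # 4. Get the return value with result()
    print(s.result())  # result() blocks until Task 3 has finished
    print(1)
    # executor.shutdown()  # not needed with `with`: the pool is shut down when the block exits

# Output (the order of the three tasks varies between runs; because they finish at
# almost the same moment, their print() output can run together on one line):
# Executing task Task 2Executing task Task 3Executing task Task 1
# 9999
# 1
```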
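Using the with statement
With a with statement you do not need to call executor.shutdown() yourself. When the block exits, shutdown(wait=True) is called automatically, so the main thread waits there until all submitted tasks have finished.
```python
from concurrent.futures import ThreadPoolExecutor
import shutil

# Copy four files concurrently; leaving the block waits for all copies to finish
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
```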
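To make it concrete what the with block saves you, here is a minimal sketch of the manual equivalent: a try/finally that calls shutdown(wait=True). The function square is a hypothetical stand-in for shutil.copy, so the example runs without any files on disk.

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    # Hypothetical stand-in for a real task such as shutil.copy
    return x * x

# Roughly what `with ThreadPoolExecutor(max_workers=4) as executor:` does for you
executor = ThreadPoolExecutor(max_workers=4)
try:
    futures = [executor.submit(square, n) for n in range(4)]
finally:
    # Wait for every submitted task to finish, then release the worker threads
    executor.shutdown(wait=True)

results = [f.result() for f in futures]
print(results)  # [0, 1, 4, 9]
```

Because shutdown(wait=True) has already waited for every task, each result() call above returns immediately.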
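Checking whether a task has finished
```python
from concurrent.futures import ThreadPoolExecutor
import time

# 1. Define a task that sleeps for `second` seconds
def task(name, second):
    time.sleep(second)
    print(f"Executing task {name}")
    return 9999

# 2. Create a ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=7) as executor:
    # 3. Submit tasks to the pool
    s1 = executor.submit(task, "Task 1", 1)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)
    time.sleep(2)
    # done() returns immediately: True if the task has finished, False otherwise
    print(s1.done())  # Task 1 slept 1s, so it is done
    print(s2.done())  # Task 2 sleeps 7s, so it is still running

# Output:
# Executing task Task 1
# True
# False
# Executing task Task 3
# Executing task Task 2
```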
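The example above uses sleep() calls, so its output depends on timing. The following minimal sketch shows the same done() behaviour deterministically. A threading.Event (the names release and blocked_task are hypothetical) holds the task open until the main thread lets it finish.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()

def blocked_task():
    # Blocks until the main thread sets the event
    release.wait()
    return "finished"

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(blocked_task)
    before = future.done()   # False: the task is still waiting on the event
    release.set()            # allow the task to finish
    value = future.result()  # blocks until the task returns
    after = future.done()    # True once result() has returned

print(before, after, value)  # False True finished
```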
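Getting a task's return value
Call result() on the Future returned by submit() to get the value returned by the task.
result() accepts an optional timeout argument, which is the maximum number of seconds to wait. Calling result() blocks the calling (main) thread:
- If timeout is not set, result() waits until the task finishes and then returns its value.
- If timeout is set, result() waits at most timeout seconds:
  - If the task finishes within timeout seconds, its return value is returned.
  - If the task is still running after timeout seconds, concurrent.futures.TimeoutError is raised in the main thread. The task itself is not stopped and keeps running until it returns.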
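```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time

def task(name, second):
    time.sleep(second)
    print(f"Executing task {name}")
    return 9999

with ThreadPoolExecutor(max_workers=7) as executor:
    # Submit tasks to the pool
    s1 = executor.submit(task, "Task 1", 1)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)
    t1 = time.time()
    try:
        print(s3.result(timeout=2))  # Task 3 needs 4s, so this gives up after 2s
    except concurrent.futures.TimeoutError:
        print("Timed out")
    t2 = time.time()
    print(round(t2 - t1))

# Output:
# Executing task Task 1
# Timed out
# 2
# Executing task Task 3
# Executing task Task 2
```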
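A result() timeout only abandons the wait. It does not cancel the task. The minimal sketch below makes this deterministic with a threading.Event, and the names release and slow_task are hypothetical. After the TimeoutError the future is still not done, and a second result() call still receives the return value.

```python
import concurrent.futures
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()

def slow_task():
    # Stays "running" until the main thread sets the event
    release.wait()
    return 9999

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        future.result(timeout=0.1)  # give up waiting after 0.1 seconds
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True
    still_running = not future.done()  # the timeout did not stop the task
    release.set()
    value = future.result()  # waiting again still gets the return value

print(timed_out, still_running, value)  # True True 9999
```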
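Waiting for some or all tasks to finish
Waiting with wait()
wait(fs, timeout=None, return_when=ALL_COMPLETED)
return_when decides when wait() returns. FIRST_COMPLETED returns as soon as any future finishes or is cancelled. FIRST_EXCEPTION returns when any future raises an exception, or when all have finished. The default, ALL_COMPLETED, returns only after all the futures have finished. wait() returns a named 2-tuple of sets, (done, not_done).
wait() also accepts a timeout argument, which is the maximum number of seconds to wait:
- If timeout is not set, wait() blocks the main thread until the return_when condition is met (by default, until all the given tasks have finished).
- If timeout is set, wait() waits at most timeout seconds:
  - If the condition is met before the timeout, the main thread continues immediately.
  - If the timeout expires while tasks are still running, wait() returns anyway and the main thread continues. The unfinished tasks keep running.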
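```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def task(name, second):
    time.sleep(second)
    print(f"Executing task {name}")
    return 9999

# Example 1: return as soon as the first of s1, s3 finishes
with ThreadPoolExecutor(max_workers=7) as executor:
    s1 = executor.submit(task, "Task 1", 1)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)
    t1 = time.time()
    wait([s1, s3], return_when=FIRST_COMPLETED)  # Task 1 finishes after 1s
    t2 = time.time()
    print(f"wait() waited {round(t2 - t1)}s")
    print(s3.result())  # blocks until Task 3 finishes
# Output:
# Executing task Task 1
# wait() waited 1s
# Executing task Task 3
# 9999
# Executing task Task 2

# Example 2: wait for both s1 and s3 (ALL_COMPLETED), but for at most 2 seconds
with ThreadPoolExecutor(max_workers=7) as executor:
    s1 = executor.submit(task, "Task 1", 1)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)
    t1 = time.time()
    wait([s1, s3], timeout=2)
    t2 = time.time()
    print(f"wait() waited {round(t2 - t1)}s")
    print(s1.result())
    print(s3.result())  # Task 3 is still running, so this blocks until it finishes
    print("end")
# Output:
# Executing task Task 1
# wait() waited 2s
# 9999
# Executing task Task 3
# 9999
# end
# Executing task Task 2
```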
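The examples above discard wait()'s return value. The following minimal sketch uses the (done, not_done) sets to see which futures finished. A threading.Event keeps one task from finishing so the split is deterministic. The names release, quick and blocked are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

release = threading.Event()

def quick():
    return "quick"

def blocked():
    # Cannot finish until the main thread sets the event
    release.wait()
    return "blocked"

with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(quick)
    slow = executor.submit(blocked)
    # Returns once at least one future is done; unpack the (done, not_done) sets
    done, not_done = wait([fast, slow], return_when=FIRST_COMPLETED)
    print([f.result() for f in done])  # ['quick']
    print(len(not_done))               # 1
    release.set()  # let `blocked` finish so the pool can shut down
```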
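Waiting with as_completed()
wait() tells you whether tasks have finished, but the main thread should not have to keep checking. A better approach is to hand each result to the main thread as soon as its task finishes, instead of repeatedly checking every task.
The as_completed() function in concurrent.futures does exactly this. It is a module-level function, not a method of ThreadPoolExecutor. It gives back each future as its task finishes, and you call result() on it to get the return value.
as_completed() is a generator that blocks while no task has finished. If timeout is set and the futures are not all done within timeout seconds of the call, iteration raises TimeoutError.
Each time a task finishes, it yields that task's future, so the body of the for loop runs. It then blocks again, and this repeats until all the tasks are done.
Tasks that finish first are therefore returned to the main thread first.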
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(name, second):
    time.sleep(second)
    print(f"Executing task {name}")
    return name

with ThreadPoolExecutor(max_workers=7) as executor:
    # Submit tasks to the pool
    s1 = executor.submit(task, "Task 1", 2)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)
    # Futures are yielded in completion order, not in the order they are passed in
    for future in as_completed([s3, s1]):
        print(future.result())

# Output: tasks that finish first return their results first
# Executing task Task 1
# Task 1
# Executing task Task 3
# Task 3
# Executing task Task 2
```
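Because as_completed() yields futures in completion order, a common way to tell which input a result belongs to is to keep a dict from each future to its input. The minimal sketch below shows that pattern. The word_length function and the words list are hypothetical examples.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def word_length(word):
    return len(word)

words = ["thread", "pool", "executor"]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to the input that produced it
    future_to_word = {executor.submit(word_length, w): w for w in words}
    lengths = {}
    for future in as_completed(future_to_word):
        word = future_to_word[future]
        lengths[word] = future.result()

print(lengths)
```

The insertion order of lengths can vary from run to run because it follows completion order, but its contents are always the same.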
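Waiting with map()
map(fn, *iterables, timeout=None)
- fn: the function to run in the worker threads.
- iterables: one or more iterables. As with the built-in map(), fn is called once per item, taking one argument from each iterable.
- timeout: like the timeout of wait(). Because map() returns the results themselves, iterating over them raises TimeoutError if a result is not available within timeout seconds of the original call to map().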
```python
from concurrent.futures import ThreadPoolExecutor
import time

def task(args):
    name, second = args  # unpack the (name, seconds) tuple
    time.sleep(second)
    return name

with ThreadPoolExecutor(max_workers=7) as executor:
    jobs = [("task 1", 5), ("task 2", 2), ("task 3", 7)]
    # Results come back in the order of `jobs`, not in completion order
    for i, result in enumerate(executor.map(task, jobs), start=1):
        print("task{}:{}".format(i, result))

# Output:
# task1:task 1
# task2:task 2
# task3:task 3
```
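Unlike the as_completed() output, these results follow the order of the input list. The 2-second task finishes first, but the result of the task submitted before it is still printed first.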
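Because map() accepts several iterables, the tuple unpacking inside task is optional. The minimal sketch below passes names and durations as two parallel lists, so task can keep a two-parameter signature. The lists and the shortened sleep times are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def task(name, second):
    time.sleep(second)
    return name

names = ["task 1", "task 2", "task 3"]
seconds = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    # One item is taken from each iterable per call: task("task 1", 0.3), ...
    results = list(executor.map(task, names, seconds))

print(results)  # ['task 1', 'task 2', 'task 3']
```

Even though "task 2" finishes first, the results still come back in input order.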
...