ThreadPoolExecutor

Since Python 3.2 the standard library has included the concurrent.futures module, which provides a thread pool, ThreadPoolExecutor, and a process pool, ProcessPoolExecutor.

A simple example

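from concurrent.futures import ThreadPoolExecutor
import time

# 1. Define a task function
def task(name):
    time.sleep(5)
    print(f"Executing task {name}")
    return 9999

# 2. Create a ThreadPoolExecutor object
with ThreadPoolExecutor(max_workers=4) as executor:  # without with: executor = ThreadPoolExecutor(max_workers=4)
    # 3. Submit tasks to the pool; submit() returns a Future immediately
    executor.submit(task, "Task 1")
    executor.submit(task, "Task 2")
    s = executor.submit(task, "Task 3")
    # 4. Get the return value with result()
    print(s.result())  # s.result() blocks until Task 3 has finished
    print(1)           # only runs after result() has returned

# executor.shutdown()  # not needed when the pool is used in a with statement

# Output (after about 5 seconds; the three threads print at almost the same
# moment, so the order varies and the lines can run together):
# Executing task Task 2Executing task Task 3Executing task Task 1
#
#
# 9999
# 1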
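The example above relies on submit() not waiting for the task: it hands back a Future straight away, and only result() blocks. A minimal sketch that checks this with timings, assuming a hypothetical slow_square() task (not part of the original) that sleeps 0.5 s; with 4 workers the 4 tasks run in parallel, so the total is about 0.5 s rather than 2 s.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(x):
    # hypothetical task: simulate I/O-bound work, then return x squared
    time.sleep(0.5)
    return x * x

with ThreadPoolExecutor(max_workers=4) as executor:
    start = time.perf_counter()
    # submit() returns a Future immediately; the work runs in pool threads
    futures = [executor.submit(slow_square, n) for n in range(4)]
    submit_time = time.perf_counter() - start

    # result() blocks until each task has finished
    results = [f.result() for f in futures]
    total_time = time.perf_counter() - start

print(results)  # [0, 1, 4, 9]
print(f"submit took {submit_time:.2f}s, all results after {total_time:.2f}s")
```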

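Using the with statement

With a with statement you do not need to call executor.shutdown() yourself: leaving the block calls executor.shutdown(wait=True), which waits for all submitted tasks to finish.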

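from concurrent.futures import ThreadPoolExecutor
import shutil

# Copy four files concurrently; the with block waits for every copy to finish
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')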
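Roughly speaking, leaving the with block does what the sketch below does by hand: it calls shutdown(wait=True) in a finally clause, so the main thread waits for every submitted task even if an exception is raised. This is a simplified illustration, not the library's actual code; copy_stub() is a hypothetical stand-in for shutil.copy so the sketch runs without real files.

```python
from concurrent.futures import ThreadPoolExecutor
import time

finished = []

def copy_stub(src, dst):
    # hypothetical stand-in for shutil.copy: pretend to copy, record the pair
    time.sleep(0.1)
    finished.append((src, dst))

executor = ThreadPoolExecutor(max_workers=4)
try:
    for i in range(1, 5):
        executor.submit(copy_stub, f"src{i}.txt", f"dest{i}.txt")
finally:
    # what the with statement does on exit: block until all tasks are done
    executor.shutdown(wait=True)

print(sorted(finished))
```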

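Checking whether a task has finished

Future.done() returns True if the task has finished (or was cancelled) and False otherwise; it never blocks.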

from concurrent.futures import ThreadPoolExecutor
import time

def task(name, second):
    time.sleep(second)
    print(f"Executing task {name}")
    return 9999

# Create a ThreadPoolExecutor object
with ThreadPoolExecutor(max_workers=7) as executor:  # without with: executor = ThreadPoolExecutor(max_workers=7)
    # Submit tasks that take 1, 7 and 4 seconds
    s1 = executor.submit(task, "Task 1", 1)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)

    time.sleep(2)
    print(s1.done())  # Task 1 (1 s) has already finished
    print(s2.done())  # Task 2 (7 s) is still running

# Output
# Executing task Task 1
# True
# False
# Executing task Task 3
# Executing task Task 2
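Because done() never blocks, it can be used to poll tasks while the main thread does other work. A minimal sketch, assuming a 0.05 s polling interval and shortened, illustrative task durations:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def task(name, second):
    time.sleep(second)
    return name

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(task, "Task 1", 0.3),
        executor.submit(task, "Task 2", 0.9),
        executor.submit(task, "Task 3", 0.6),
    ]
    finish_order = []
    pending = list(futures)
    while pending:
        for f in pending[:]:      # iterate over a copy so items can be removed
            if f.done():          # non-blocking: True once finished or cancelled
                finish_order.append(f.result())
                pending.remove(f)
        time.sleep(0.05)          # the main thread could do other work here

print(finish_order)  # ['Task 1', 'Task 3', 'Task 2']
```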

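Getting a task's return value

Use result() to get the value returned by a task.

result() accepts an optional timeout argument, the maximum number of seconds to wait. Calling result() blocks the main thread:

  • Not set: wait until the task finishes, then return its value.
  • Set: wait at most timeout seconds
    1. If the task finishes within timeout seconds, its return value is returned.
    2. If timeout seconds pass and the task is still running, the main thread gets a TimeoutError (concurrent.futures.TimeoutError, an alias of the built-in TimeoutError since Python 3.11), but the task keeps running until it returns.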
import concurrent.futures  # for concurrent.futures.TimeoutError
# task(), time and ThreadPoolExecutor are the same as in the previous example

with ThreadPoolExecutor(max_workers=7) as executor:
    s1 = executor.submit(task, "Task 1", 1)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)

    t1 = int(time.time())
    try:
        # Task 3 needs 4 s, so waiting at most 2 s raises TimeoutError
        print(s3.result(timeout=2))
    except concurrent.futures.TimeoutError:
        print("Timed out")
    t2 = int(time.time())
    print(t2 - t1)

# Output (Task 3 keeps running after the timeout)
# Executing task Task 1
# Timed out
# 2
# Executing task Task 3
# Executing task Task 2
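The last point of the list above, that the task keeps running after a timeout, can be checked directly: catch the timeout, then call result() again without a timeout and the value is still delivered. A minimal sketch with shortened, illustrative sleep times:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
import time

def task(name, second):
    time.sleep(second)
    return 9999

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(task, "Task 3", 1.0)
    timed_out = False
    still_running = None
    try:
        future.result(timeout=0.2)  # give up waiting after 0.2 s
    except FuturesTimeoutError:
        timed_out = True
        # the timeout only ends the wait; the task itself is still running
        still_running = not future.done()
    value = future.result()  # no timeout: block until the task returns

print(timed_out, still_running, value)  # True True 9999
```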

Waiting for some or all tasks to finish

Waiting with wait()

wait(fs, timeout=None, return_when=ALL_COMPLETED)

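return_when: the condition under which wait() returns. FIRST_COMPLETED returns as soon as any future finishes or is cancelled; FIRST_EXCEPTION returns when any future raises an exception (or, if none does, when all have finished); the default, ALL_COMPLETED, returns only when all futures have finished. wait() returns a named 2-tuple of sets, (done, not_done).

wait() blocks until the futures in fs meet that condition. It also accepts a timeout argument, the maximum number of seconds to wait.

  • Not set: wait until the condition is met (by default, until all tasks finish); this blocks the main thread.

  • Set: wait at most timeout seconds

    1. If the tasks finish before timeout seconds have passed, the main thread continues right away.

    2. If timeout seconds pass and the tasks are still running, wait() returns anyway and the main thread continues; the unfinished tasks keep running.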

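from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time
# task() is the same as in the previous examples (sleeps, prints, returns 9999)

with ThreadPoolExecutor(max_workers=7) as executor:
    s1 = executor.submit(task, "Task 1", 1)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)

    t1 = int(time.time())
    # Return as soon as either s1 or s3 finishes: s1 is done after about 1 s
    wait([s1, s3], return_when=FIRST_COMPLETED)
    t2 = int(time.time())
    print(f"wait waited {t2 - t1}s")
    print(s3.result())  # blocks until Task 3 finishes (about 4 s after submit)

# Executing task Task 1
# wait waited 1s
# Executing task Task 3
# 9999
# Executing task Task 2

# Variant: wait at most 2 seconds
with ThreadPoolExecutor(max_workers=7) as executor:
    s1 = executor.submit(task, "Task 1", 1)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)

    t1 = int(time.time())
    # Returns after 2 s even though Task 3 is still running
    wait([s1, s3], timeout=2)
    t2 = int(time.time())
    print(f"wait waited {t2 - t1}s")
    print(s1.result())
    print(s3.result())  # blocks until Task 3 finishes
    print("end")

# Executing task Task 1
# wait waited 2s
# 9999
# Executing task Task 3
# 9999
# end
# Executing task Task 2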
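The examples above ignore wait()'s return value. It is a named 2-tuple of sets, (done, not_done), which tells you exactly which futures finished in time. A minimal sketch with illustrative task names and durations:

```python
from concurrent.futures import ThreadPoolExecutor, wait
import time

def task(name, second):
    time.sleep(second)
    return name

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(task, "fast", 0.2), executor.submit(task, "slow", 1.0)]

    # Wait at most 0.5 s: "fast" is done by then, "slow" is not
    done, not_done = wait(futures, timeout=0.5)
    print([f.result() for f in done])  # ['fast']
    print(len(not_done))               # 1

    # Wait (default return_when=ALL_COMPLETED, no timeout) for the rest
    done2, not_done2 = wait(not_done)
    print([f.result() for f in done2], len(not_done2))  # ['slow'] 0
```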
      
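Waiting with as_completed()

The wait() function above lets the main thread wait for tasks, but checking every task over and over from the main thread is clumsy. It is better to have each task's result handed to the main thread as soon as that task finishes.

concurrent.futures.as_completed() does exactly this (it is a module-level function, not a method of ThreadPoolExecutor): it gives back each future as its task finishes, and you call result() on it to get the return value.

as_completed() returns an iterator (implemented as a generator). While no task has finished it blocks, unless a timeout is set; in that case, if the remaining futures have not all finished within timeout seconds of the original call, the iteration raises TimeoutError.

Each time a task finishes, the iterator yields its future, the body of the for loop runs, and then it blocks again, until all tasks are done.

As a result, tasks that finish first are returned to the main thread first.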

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(name, second):
    time.sleep(second)
    print(f"Executing task {name}")
    return name

with ThreadPoolExecutor(max_workers=7) as executor:
    # Submit tasks that take 2, 7 and 4 seconds
    s1 = executor.submit(task, "Task 1", 2)
    s2 = executor.submit(task, "Task 2", 7)
    s3 = executor.submit(task, "Task 3", 4)

    # Only s3 and s1 are passed in; each is yielded as soon as it finishes
    for future in as_completed([s3, s1]):
        print(future.result())

# Output: tasks that finish first are yielded first
# (s1 comes out before s3 even though s3 is listed first;
#  Task 2 is printed while the with block waits for it on exit)
# Executing task Task 1
# Task 1
# Executing task Task 3
# Task 3
# Executing task Task 2
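Since as_completed() yields futures in completion order rather than submission order, a common pattern (the official concurrent.futures documentation uses it too) is to keep a dict from each future back to its input. A minimal sketch; the names and durations are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(name, second):
    time.sleep(second)
    return f"{name} slept {second}s"

jobs = {"Task 1": 0.2, "Task 2": 0.7, "Task 3": 0.4}

completed = []
with ThreadPoolExecutor(max_workers=3) as executor:
    # map each Future back to the name it was submitted with
    future_to_name = {executor.submit(task, name, sec): name for name, sec in jobs.items()}
    for future in as_completed(future_to_name):  # iterating a dict yields its keys
        name = future_to_name[future]
        completed.append(name)
        print(name, "->", future.result())

print(completed)  # ['Task 1', 'Task 3', 'Task 2']
```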

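Waiting with map()

executor.map(fn, *iterables, timeout=None)

fn: the function the threads run. *iterables: one or more iterables; fn is called with one item from each, as with the built-in map(), and the iterables are consumed immediately, so all tasks are submitted up front. timeout: works like the timeout of wait(), but because map() returns the tasks' results, if a result is not available within timeout seconds of the original call to map(), iterating over the results raises TimeoutError.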

from concurrent.futures import ThreadPoolExecutor
import time

def task(args):
    name, second = args  # unpack the (name, seconds) tuple
    time.sleep(second)
    return name

with ThreadPoolExecutor(max_workers=7) as executor:
    jobs = [("task 1", 5), ("task 2", 2), ("task 3", 7)]
    # map() submits all three tasks at once and yields results in input order
    for i, result in enumerate(executor.map(task, jobs), start=1):
        print(f"task{i}:{result}")

# Output
# task1:task 1
# task2:task 2
# task3:task 3

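Unlike as_completed(), map() yields results in the same order as the input list: even though the 2-second task finishes first, its result is only printed after the result of the task submitted before it (5 seconds) is available.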
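Because map() accepts *iterables, the tuple unpacking inside task() is optional: names and durations can be passed as two separate iterables, and fn receives one item from each, just as with the built-in map(). A minimal sketch with shortened, illustrative sleep times:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def task(name, second):
    time.sleep(second)
    return name

names = ["task 1", "task 2", "task 3"]
seconds = [0.5, 0.2, 0.7]

with ThreadPoolExecutor(max_workers=3) as executor:
    # task(names[i], seconds[i]) is called for each i; results keep input order
    results = list(executor.map(task, names, seconds))

print(results)  # ['task 1', 'task 2', 'task 3']
```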