跳转至

Python 多线程处理

ThreadPoolExecutor 是 Python concurrent.futures 模块中提供的一个用于并行任务处理的类,它能够创建一个线程池并为多个任务提供并行执行的能力。ThreadPoolExecutor 使得多线程编程变得更加简洁和高效。

1. 导入 ThreadPoolExecutor

要使用 ThreadPoolExecutor,需要导入 concurrent.futures 模块:

from concurrent.futures import ThreadPoolExecutor

2. ThreadPoolExecutor 的基本用法

ThreadPoolExecutor 中,你可以定义并行执行的任务数量,并使用 submit()map() 方法来提交任务。

示例 1:使用 submit() 提交任务

from concurrent.futures import ThreadPoolExecutor

# 定义一个简单的任务函数
def task(n):
    print(f"Task {n} is running")

# 使用 ThreadPoolExecutor 来并行执行任务
with ThreadPoolExecutor(max_workers=4) as executor:
    # 提交多个任务到线程池
    futures = [executor.submit(task, i) for i in range(5)]

    # 等待所有任务完成
    for future in futures:
        future.result()  # 这将阻塞直到任务完成

代码解析:

  1. ThreadPoolExecutor(max_workers=4):创建一个线程池,最多同时运行 4 个线程。你可以根据需要设置线程池的大小。
  2. submit():将任务提交给线程池,并返回一个 future 对象。通过 future.result() 可以获取任务的返回值或等待任务执行完成。
  3. future.result():会阻塞当前线程,直到 future 对象关联的任务完成。

示例 2:使用 map() 提交任务

map() 方法的作用类似于内建的 map(),但它会将函数并行应用于所有输入数据,并且返回一个迭代器。

from concurrent.futures import ThreadPoolExecutor

# 定义一个简单的任务函数
def task(n):
    print(f"Task {n} is running")
    return n * 2

# 使用 ThreadPoolExecutor 来并行执行任务
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(task, range(5))  # 将任务应用到每个输入元素

    # 打印结果
    for result in results:
        print(f"Result: {result}")

代码解析:

  1. map()map() 会将任务函数应用到每个输入元素。这里它会将 task 函数应用到 range(5) 中的每个数字。
  2. results:返回一个迭代器,按任务的顺序返回结果。

示例 3:捕获异常

你也可以通过 try/except 来捕获任务执行中的异常。

from concurrent.futures import ThreadPoolExecutor

# 定义一个会抛出异常的任务函数
def task(n):
    if n == 3:
        raise ValueError("An error occurred in task 3")
    return n * 2

# 使用 ThreadPoolExecutor 来并行执行任务
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(task, i) for i in range(5)]

    for future in futures:
        try:
            result = future.result()  # 获取任务的返回值,如果任务抛出异常,这里会抛出
            print(f"Result: {result}")
        except Exception as e:
            print(f"Task generated an exception: {e}")

代码解析:

  • future.result():如果任务抛出了异常,future.result() 会重新抛出异常,你可以捕获并处理。

4. 总结

  • ThreadPoolExecutor:用于创建一个线程池,可以并行执行多个任务。
  • submit():提交任务到线程池,返回一个 future 对象,你可以通过 future.result() 来获取结果。
  • map():将函数并行应用到多个输入数据,返回一个结果迭代器。
  • max_workers:定义线程池中最多允许同时运行的线程数。

ThreadPoolExecutor 非常适合 I/O 密集型任务,例如网络请求、文件读写等,但对于 CPU 密集型任务,Python 的全局解释器锁(GIL)可能会限制并行的效果,这种情况下使用 ProcessPoolExecutor 可能更合适。