跳转至

QThreadPoolExecutor

QThreadPoolExecutor 是一个 PySide6 线程池执行器,提供与 concurrent.futures.ThreadPoolExecutor 兼容的 API,用于管理和执行多个并发任务。

类概述

Python
class QThreadPoolExecutor:
    """PySide6 线程池执行器"""

主要特性

  • API 兼容:与 concurrent.futures.ThreadPoolExecutor 完全兼容
  • 自动管理:自动管理线程池大小和任务调度
  • 回调机制:支持池级别完成回调和任务级别失败回调
  • 线程命名:支持线程命名和初始化器
  • 任务控制:支持任务取消和强制停止
  • Qt 集成:与 Qt 事件循环无缝集成
  • 内存安全:自动管理线程生命周期,防止内存泄漏

构造函数

Python
def __init__(
    self,
    max_workers: Optional[int] = None,
    thread_name_prefix: str = "",
    initializer: Optional[Callable] = None,
    initargs: Tuple = (),
)

初始化线程池执行器。

参数

参数 类型 默认值 描述
max_workers Optional[int] None 最大工作线程数。如果为 None,默认为 min((CPU 核心数 * 2), 32)
thread_name_prefix str "" 线程名称前缀,用于调试和日志记录。最终格式为 "{前缀}-Worker-{序号}"
initializer Optional[Callable] None 每个工作线程启动时调用的初始化函数,在任务执行前调用
initargs Tuple () 传递给 initializer 的参数元组

异常

  • ValueError:当 max_workers <= 0

示例

Python
# 基本用法
pool = QThreadPoolExecutor(max_workers=4)

# 带线程命名
pool = QThreadPoolExecutor(
    max_workers=2,
    thread_name_prefix="Worker"
)

# 带初始化器
def init_worker(worker_id):
    print(f"Worker {worker_id} initialized")

pool = QThreadPoolExecutor(
    max_workers=3,
    thread_name_prefix="MyWorker",
    initializer=init_worker,
    initargs=("TestWorker",)
)

主要方法

submit()

Python
def submit(self, fn: Callable, /, *args, **kwargs) -> "QThreadWithReturn"

提交任务到线程池执行。

参数

参数 类型 描述
fn Callable 要执行的可调用对象
*args tuple 传递给 fn 的位置参数
**kwargs dict 传递给 fn 的关键字参数

返回值

  • QThreadWithReturn:代表异步执行结果的 Future 对象

异常

  • RuntimeError:当线程池已关闭时

示例

Python
pool = QThreadPoolExecutor(max_workers=2)

# 提交简单任务
future = pool.submit(lambda x: x ** 2, 5)
result = future.result()  # 25

# 提交带参数的任务
def greet(name, message="Hello"):
    return f"{message}, {name}!"

future = pool.submit(greet, "Alice", message="Hi")
result = future.result()  # "Hi, Alice!"

# 批量提交
futures = [
    pool.submit(str.upper, text)
    for text in ['hello', 'world', 'python']
]
results = [f.result() for f in futures]
# ['HELLO', 'WORLD', 'PYTHON']

shutdown()

Python
def shutdown(
    self,
    force_stop: bool = False,
    *,
    cancel_futures: bool = False,
    wait: bool = False,
) -> None

关闭线程池。

参数

参数 类型 默认值 描述
force_stop bool False 如果为 True,立即强制终止所有任务(最高优先级)
cancel_futures bool False 如果为 True,取消所有待处理的任务
wait bool False 如果为 True,阻塞直到任务完成(会发出警告)

优先级说明

  1. force_stop=True:最高优先级,立即强制停止所有任务并触发回调,忽略其他参数
  2. cancel_futures=True:取消待处理任务,等待活跃任务完成(如果 wait=True
  3. wait=True:阻塞直到所有任务完成(会发出警告)

注意事项

  • shutdown 后不能再提交新任务
  • force_stop=True 会立即标记所有任务为完成并触发池级别回调
  • wait=True 会发出警告,因为可能导致 UI 无响应
  • 池级别完成回调会在所有任务完成后自动触发

示例

Python
pool = QThreadPoolExecutor(max_workers=2)

# 提交一些任务
futures = [pool.submit(time.sleep, 1) for _ in range(5)]

# 立即强制停止所有任务
pool.shutdown(force_stop=True)

# 取消待处理任务,等待活跃任务
pool.shutdown(cancel_futures=True, wait=True)

# UI应用中推荐使用异步关闭
pool.shutdown()  # 不阻塞主线程

add_done_callback()

Python
def add_done_callback(self, callback: Callable) -> None

添加池级别完成回调,当所有任务完成时执行。

参数

参数 类型 描述
callback Callable 回调函数,签名为 callback(),无参数

说明

回调函数会在主线程中执行(如果存在 Qt 应用),在以下情况触发: - 所有活跃任务已完成 - 所有待处理任务已处理

可以注册多个回调,它们将按注册顺序执行。

示例

Python
pool = QThreadPoolExecutor(max_workers=2)

# 添加完成回调
pool.add_done_callback(lambda: print("所有任务完成!"))
pool.add_done_callback(lambda: print("可以开始下一阶段了"))

# 提交任务
futures = [pool.submit(time.sleep, 1) for _ in range(3)]

# 任务完成后自动触发回调,无需调用 shutdown()

add_failure_callback()

Python
def add_failure_callback(self, callback: Callable) -> None

添加任务级别失败回调,当任何任务失败时执行。

参数

参数 类型 描述
callback Callable 回调函数,签名为 callback(exception)callback()

说明

回调函数会在主线程中执行(如果存在 Qt 应用),对每个失败的任务调用一次。可以注册多个回调。

支持无参数和单参数两种签名。

示例

Python
pool = QThreadPoolExecutor(max_workers=2)

# 添加失败回调
pool.add_failure_callback(lambda e: print(f"任务失败: {e}"))
pool.add_failure_callback(lambda: print("检测到失败"))

# 提交会失败的任务
future = pool.submit(lambda: 1/0)  # 触发两个回调

静态方法

as_completed()

Python
@staticmethod
def as_completed(
    fs: Iterable["QThreadWithReturn"],
    timeout_ms: int = -1
) -> Iterator["QThreadWithReturn"]:

返回一个迭代器,按完成顺序生成 Future 对象。

参数

参数 类型 默认值 描述
fs Iterable["QThreadWithReturn"] 必需 QThreadWithReturn 对象的可迭代集合
timeout_ms int -1 等待的最大毫秒数。<=0 表示无超时

返回值

  • Iterator["QThreadWithReturn"]:按完成顺序返回的 Future 对象迭代器

异常

  • TimeoutError:如果在超时时间内没有 Future 完成
  • TypeError:如果 timeout_ms 不是数字类型

示例

Python
pool = QThreadPoolExecutor(max_workers=4)

# 提交多个任务
futures = [
    pool.submit(task, i) 
    for i in range(5)
]

# 按完成顺序处理结果
for future in QThreadPoolExecutor.as_completed(futures):
    try:
        result = future.result()
        print(f"任务完成,结果: {result}")
    except Exception as e:
        print(f"任务失败: {e}")

# 带超时的处理
try:
    for future in QThreadPoolExecutor.as_completed(futures, timeout_ms=5000):
        result = future.result()
        print(f"结果: {result}")
except TimeoutError:
    print("等待超时")

上下文管理器

虽然支持上下文管理器,但在 GUI 应用中不建议使用,因为会导致 UI 阻塞。

Python
# 不推荐在 GUI 应用中使用
with QThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(task, i) for i in range(5)]
    results = [f.result() for f in futures]  # 这里会阻塞 UI

完整使用示例

基础示例

Python
import time
from PySide6.QtWidgets import QApplication, QWidget, QPushButton, QLabel, QVBoxLayout
from qthreadwithreturn import QThreadPoolExecutor


class PoolExample(QWidget):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("线程池示例")
        self.setGeometry(100, 100, 400, 300)

        layout = QVBoxLayout()

        self.start_btn = QPushButton("开始批量任务")
        self.start_btn.clicked.connect(self.start_batch_tasks)
        layout.addWidget(self.start_btn)

        self.status_label = QLabel("就绪")
        layout.addWidget(self.status_label)

        self.result_label = QLabel("")
        layout.addWidget(self.result_label)

        self.setLayout(layout)
        self.pool = None
        self.results = []

    def start_batch_tasks(self):
        """启动批量任务"""
        self.start_btn.setEnabled(False)
        self.status_label.setText("执行批量任务中...")
        self.result_label.setText("")
        self.results = []

        # 创建线程池
        self.pool = QThreadPoolExecutor(max_workers=3)

        # 添加池级别回调
        self.pool.add_done_callback(self.on_all_tasks_done)
        self.pool.add_failure_callback(self.on_task_failed)

        # 定义任务函数
        def compute_task(task_id):
            time.sleep(1 + task_id * 0.5)  # 模拟不同耗时
            return f"任务 {task_id} 完成"

        # 提交多个任务
        for i in range(5):
            future = self.pool.submit(compute_task, i)
            future.add_done_callback(lambda r, tid=i: self.on_task_done(r, tid))

        # 关闭线程池(不等待,可以不用直接写,任务完成自动关闭)
        self.pool.shutdown()

    def on_task_done(self, result, task_id):
        """单个任务完成回调"""
        self.results.append(result)
        self.status_label.setText(f"已完成 {len(self.results)}/5 个任务")

    def on_all_tasks_done(self):
        """所有任务完成回调"""
        self.status_label.setText("所有任务完成!")
        self.result_label.setText("\n".join(self.results))
        self.start_btn.setEnabled(True)

    def on_task_failed(self, error):
        """任务失败回调"""
        self.status_label.setText(f"任务失败: {error}")
        self.start_btn.setEnabled(True)


if __name__ == "__main__":
    app = QApplication([])
    window = PoolExample()
    window.show()
    app.exec()

高级示例:使用 as_completed

Python
import time
import random
from PySide6.QtWidgets import QApplication, QWidget, QPushButton, QTextEdit, QVBoxLayout
from qthreadwithreturn import QThreadPoolExecutor


class AsCompletedExample(QWidget):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("as_completed 示例")
        self.setGeometry(100, 100, 500, 400)

        layout = QVBoxLayout()

        self.start_btn = QPushButton("开始任务")
        self.start_btn.clicked.connect(self.start_tasks)
        layout.addWidget(self.start_btn)

        self.result_text = QTextEdit()
        self.result_text.setReadOnly(True)
        layout.addWidget(self.result_text)

        self.setLayout(layout)
        self.pool = None

    def start_tasks(self):
        """启动任务并使用 as_completed 处理"""
        self.start_btn.setEnabled(False)
        self.result_text.clear()
        self.result_text.append("开始执行任务...\n")

        # 创建线程池
        self.pool = QThreadPoolExecutor(max_workers=3)

        # 定义随机耗时任务
        def random_task(task_id):
            delay = random.uniform(0.5, 3.0)
            time.sleep(delay)
            if random.random() < 0.2:  # 20% 概率失败
                raise ValueError(f"任务 {task_id} 随机失败")
            return f"任务 {task_id} 耗时 {delay:.2f}s"

        # 提交任务
        futures = [self.pool.submit(random_task, i) for i in range(8)]

        # 使用 as_completed 按完成顺序处理
        self.process_as_completed(futures)

        # 关闭线程池
        self.pool.shutdown()

    def process_as_completed(self, futures):
        """处理 as_completed 结果"""
        try:
            for future in QThreadPoolExecutor.as_completed(futures, timeout_ms=10000):
                try:
                    result = future.result()
                    self.result_text.append(f"✓ {result}")
                except Exception as e:
                    self.result_text.append(f"✗ 任务失败: {e}")
        except TimeoutError:
            self.result_text.append("⚠ 等待超时")

        self.result_text.append("\n所有任务处理完成!")
        self.start_btn.setEnabled(True)


if __name__ == "__main__":
    app = QApplication([])
    window = AsCompletedExample()
    window.show()
    app.exec()

最佳实践

1. 线程池大小设置

Python
import os

# CPU 密集型任务:CPU 核心数
cpu_pool = QThreadPoolExecutor(max_workers=os.cpu_count())

# I/O 密集型任务:CPU 核心数 * 2-5
io_pool = QThreadPoolExecutor(max_workers=os.cpu_count() * 3)

# 混合任务:根据实际情况调整
mixed_pool = QThreadPoolExecutor(max_workers=8)

2. 错误处理

Python
pool = QThreadPoolExecutor(max_workers=4)

# 添加全局错误处理
pool.add_failure_callback(lambda e: log_error(e))

# 为每个任务添加特定错误处理
future = pool.submit(risky_task)
future.add_failure_callback(lambda e: handle_task_error(e, future))

注意事项

  1. GUI 应用:避免使用 with 语句,因为它会阻塞 UI
  2. 线程池大小:根据任务类型(CPU 密集型 vs I/O 密集型)合理设置
  3. 错误处理:始终添加失败回调来处理异常
  4. 资源清理:在应用退出时调用 shutdown(force_stop=True)
  5. 回调顺序:多个回调按注册顺序执行
  6. 内存管理:库会自动管理内存,但建议在适当时机调用 shutdown()