QThreadPoolExecutor¶
QThreadPoolExecutor 是一个 PySide6 线程池执行器,提供与 concurrent.futures.ThreadPoolExecutor 兼容的 API,用于管理和执行多个并发任务。
类概述¶
主要特性¶
- API 兼容:与
concurrent.futures.ThreadPoolExecutor完全兼容 - 自动管理:自动管理线程池大小和任务调度
- 回调机制:支持池级别完成回调和任务级别失败回调
- 线程命名:支持线程命名和初始化器
- 任务控制:支持任务取消和强制停止
- Qt 集成:与 Qt 事件循环无缝集成
- 内存安全:自动管理线程生命周期,防止内存泄漏
构造函数¶
Python
def __init__(
self,
max_workers: Optional[int] = None,
thread_name_prefix: str = "",
initializer: Optional[Callable] = None,
initargs: Tuple = (),
)
初始化线程池执行器。
参数¶
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
max_workers |
Optional[int] |
None |
最大工作线程数。如果为 None,默认为 min((CPU 核心数 * 2), 32) |
thread_name_prefix |
str |
"" |
线程名称前缀,用于调试和日志记录。最终格式为 "{前缀}-Worker-{序号}" |
initializer |
Optional[Callable] |
None |
每个工作线程启动时调用的初始化函数,在任务执行前调用 |
initargs |
Tuple |
() |
传递给 initializer 的参数元组 |
异常¶
ValueError:当max_workers <= 0时
示例¶
Python
# 基本用法
pool = QThreadPoolExecutor(max_workers=4)
# 带线程命名
pool = QThreadPoolExecutor(
max_workers=2,
thread_name_prefix="Worker"
)
# 带初始化器
def init_worker(worker_id):
print(f"Worker {worker_id} initialized")
pool = QThreadPoolExecutor(
max_workers=3,
thread_name_prefix="MyWorker",
initializer=init_worker,
initargs=("TestWorker",)
)
主要方法¶
submit()¶
提交任务到线程池执行。
参数¶
| 参数 | 类型 | 描述 |
|---|---|---|
fn |
Callable |
要执行的可调用对象 |
*args |
tuple |
传递给 fn 的位置参数 |
**kwargs |
dict |
传递给 fn 的关键字参数 |
返回值¶
QThreadWithReturn:代表异步执行结果的 Future 对象
异常¶
RuntimeError:当线程池已关闭时
示例¶
Python
pool = QThreadPoolExecutor(max_workers=2)
# 提交简单任务
future = pool.submit(lambda x: x ** 2, 5)
result = future.result() # 25
# 提交带参数的任务
def greet(name, message="Hello"):
return f"{message}, {name}!"
future = pool.submit(greet, "Alice", message="Hi")
result = future.result() # "Hi, Alice!"
# 批量提交
futures = [
pool.submit(str.upper, text)
for text in ['hello', 'world', 'python']
]
results = [f.result() for f in futures]
# ['HELLO', 'WORLD', 'PYTHON']
shutdown()¶
Python
def shutdown(
self,
force_stop: bool = False,
*,
cancel_futures: bool = False,
wait: bool = False,
) -> None
关闭线程池。
参数¶
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
force_stop |
bool |
False |
如果为 True,立即强制终止所有任务(最高优先级) |
cancel_futures |
bool |
False |
如果为 True,取消所有待处理的任务 |
wait |
bool |
False |
如果为 True,阻塞直到任务完成(会发出警告) |
优先级说明¶
force_stop=True:最高优先级,立即强制停止所有任务并触发回调,忽略其他参数cancel_futures=True:取消待处理任务,等待活跃任务完成(如果wait=True)wait=True:阻塞直到所有任务完成(会发出警告)
注意事项¶
shutdown后不能再提交新任务force_stop=True会立即标记所有任务为完成并触发池级别回调wait=True会发出警告,因为可能导致 UI 无响应- 池级别完成回调会在所有任务完成后自动触发
示例¶
Python
pool = QThreadPoolExecutor(max_workers=2)
# 提交一些任务
futures = [pool.submit(time.sleep, 1) for _ in range(5)]
# 立即强制停止所有任务
pool.shutdown(force_stop=True)
# 取消待处理任务,等待活跃任务
pool.shutdown(cancel_futures=True, wait=True)
# UI应用中推荐使用异步关闭
pool.shutdown() # 不阻塞主线程
add_done_callback()¶
添加池级别完成回调,当所有任务完成时执行。
参数¶
| 参数 | 类型 | 描述 |
|---|---|---|
callback |
Callable |
回调函数,签名为 callback(),无参数 |
说明¶
回调函数会在主线程中执行(如果存在 Qt 应用),在以下情况触发: - 所有活跃任务已完成 - 所有待处理任务已处理
可以注册多个回调,它们将按注册顺序执行。
示例¶
Python
pool = QThreadPoolExecutor(max_workers=2)
# 添加完成回调
pool.add_done_callback(lambda: print("所有任务完成!"))
pool.add_done_callback(lambda: print("可以开始下一阶段了"))
# 提交任务
futures = [pool.submit(time.sleep, 1) for _ in range(3)]
# 任务完成后自动触发回调,无需调用 shutdown()
add_failure_callback()¶
添加任务级别失败回调,当任何任务失败时执行。
参数¶
| 参数 | 类型 | 描述 |
|---|---|---|
callback |
Callable |
回调函数,签名为 callback(exception) 或 callback() |
说明¶
回调函数会在主线程中执行(如果存在 Qt 应用),对每个失败的任务调用一次。可以注册多个回调。
支持无参数和单参数两种签名。
示例¶
Python
pool = QThreadPoolExecutor(max_workers=2)
# 添加失败回调
pool.add_failure_callback(lambda e: print(f"任务失败: {e}"))
pool.add_failure_callback(lambda: print("检测到失败"))
# 提交会失败的任务
future = pool.submit(lambda: 1/0) # 触发两个回调
静态方法¶
as_completed()¶
Python
@staticmethod
def as_completed(
fs: Iterable["QThreadWithReturn"],
timeout_ms: int = -1
) -> Iterator["QThreadWithReturn"]:
返回一个迭代器,按完成顺序生成 Future 对象。
参数¶
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
fs |
Iterable["QThreadWithReturn"] |
必需 | QThreadWithReturn 对象的可迭代集合 |
timeout_ms |
int |
-1 |
等待的最大毫秒数。<=0 表示无超时 |
返回值¶
Iterator["QThreadWithReturn"]:按完成顺序返回的 Future 对象迭代器
异常¶
TimeoutError:如果在超时时间内没有 Future 完成TypeError:如果timeout_ms不是数字类型
示例¶
Python
pool = QThreadPoolExecutor(max_workers=4)
# 提交多个任务
futures = [
pool.submit(task, i)
for i in range(5)
]
# 按完成顺序处理结果
for future in QThreadPoolExecutor.as_completed(futures):
try:
result = future.result()
print(f"任务完成,结果: {result}")
except Exception as e:
print(f"任务失败: {e}")
# 带超时的处理
try:
for future in QThreadPoolExecutor.as_completed(futures, timeout_ms=5000):
result = future.result()
print(f"结果: {result}")
except TimeoutError:
print("等待超时")
上下文管理器¶
虽然支持上下文管理器,但在 GUI 应用中不建议使用,因为会导致 UI 阻塞。
Python
# 不推荐在 GUI 应用中使用
with QThreadPoolExecutor(max_workers=2) as pool:
futures = [pool.submit(task, i) for i in range(5)]
results = [f.result() for f in futures] # 这里会阻塞 UI
完整使用示例¶
基础示例¶
Python
import time
from PySide6.QtWidgets import QApplication, QWidget, QPushButton, QLabel, QVBoxLayout
from qthreadwithreturn import QThreadPoolExecutor
class PoolExample(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("线程池示例")
self.setGeometry(100, 100, 400, 300)
layout = QVBoxLayout()
self.start_btn = QPushButton("开始批量任务")
self.start_btn.clicked.connect(self.start_batch_tasks)
layout.addWidget(self.start_btn)
self.status_label = QLabel("就绪")
layout.addWidget(self.status_label)
self.result_label = QLabel("")
layout.addWidget(self.result_label)
self.setLayout(layout)
self.pool = None
self.results = []
def start_batch_tasks(self):
"""启动批量任务"""
self.start_btn.setEnabled(False)
self.status_label.setText("执行批量任务中...")
self.result_label.setText("")
self.results = []
# 创建线程池
self.pool = QThreadPoolExecutor(max_workers=3)
# 添加池级别回调
self.pool.add_done_callback(self.on_all_tasks_done)
self.pool.add_failure_callback(self.on_task_failed)
# 定义任务函数
def compute_task(task_id):
time.sleep(1 + task_id * 0.5) # 模拟不同耗时
return f"任务 {task_id} 完成"
# 提交多个任务
for i in range(5):
future = self.pool.submit(compute_task, i)
future.add_done_callback(lambda r, tid=i: self.on_task_done(r, tid))
# 关闭线程池(不等待,可以不用直接写,任务完成自动关闭)
self.pool.shutdown()
def on_task_done(self, result, task_id):
"""单个任务完成回调"""
self.results.append(result)
self.status_label.setText(f"已完成 {len(self.results)}/5 个任务")
def on_all_tasks_done(self):
"""所有任务完成回调"""
self.status_label.setText("所有任务完成!")
self.result_label.setText("\n".join(self.results))
self.start_btn.setEnabled(True)
def on_task_failed(self, error):
"""任务失败回调"""
self.status_label.setText(f"任务失败: {error}")
self.start_btn.setEnabled(True)
if __name__ == "__main__":
app = QApplication([])
window = PoolExample()
window.show()
app.exec()
高级示例:使用 as_completed¶
Python
import time
import random
from PySide6.QtWidgets import QApplication, QWidget, QPushButton, QTextEdit, QVBoxLayout
from qthreadwithreturn import QThreadPoolExecutor
class AsCompletedExample(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("as_completed 示例")
self.setGeometry(100, 100, 500, 400)
layout = QVBoxLayout()
self.start_btn = QPushButton("开始任务")
self.start_btn.clicked.connect(self.start_tasks)
layout.addWidget(self.start_btn)
self.result_text = QTextEdit()
self.result_text.setReadOnly(True)
layout.addWidget(self.result_text)
self.setLayout(layout)
self.pool = None
def start_tasks(self):
"""启动任务并使用 as_completed 处理"""
self.start_btn.setEnabled(False)
self.result_text.clear()
self.result_text.append("开始执行任务...\n")
# 创建线程池
self.pool = QThreadPoolExecutor(max_workers=3)
# 定义随机耗时任务
def random_task(task_id):
delay = random.uniform(0.5, 3.0)
time.sleep(delay)
if random.random() < 0.2: # 20% 概率失败
raise ValueError(f"任务 {task_id} 随机失败")
return f"任务 {task_id} 耗时 {delay:.2f}s"
# 提交任务
futures = [self.pool.submit(random_task, i) for i in range(8)]
# 使用 as_completed 按完成顺序处理
self.process_as_completed(futures)
# 关闭线程池
self.pool.shutdown()
def process_as_completed(self, futures):
"""处理 as_completed 结果"""
try:
for future in QThreadPoolExecutor.as_completed(futures, timeout_ms=10000):
try:
result = future.result()
self.result_text.append(f"✓ {result}")
except Exception as e:
self.result_text.append(f"✗ 任务失败: {e}")
except TimeoutError:
self.result_text.append("⚠ 等待超时")
self.result_text.append("\n所有任务处理完成!")
self.start_btn.setEnabled(True)
if __name__ == "__main__":
app = QApplication([])
window = AsCompletedExample()
window.show()
app.exec()
最佳实践¶
1. 线程池大小设置¶
Python
import os
# CPU 密集型任务:CPU 核心数
cpu_pool = QThreadPoolExecutor(max_workers=os.cpu_count())
# I/O 密集型任务:CPU 核心数 * 2-5
io_pool = QThreadPoolExecutor(max_workers=os.cpu_count() * 3)
# 混合任务:根据实际情况调整
mixed_pool = QThreadPoolExecutor(max_workers=8)
2. 错误处理¶
Python
pool = QThreadPoolExecutor(max_workers=4)
# 添加全局错误处理
pool.add_failure_callback(lambda e: log_error(e))
# 为每个任务添加特定错误处理
future = pool.submit(risky_task)
future.add_failure_callback(lambda e: handle_task_error(e, future))
注意事项¶
- GUI 应用:避免使用
with语句,因为它会阻塞 UI - 线程池大小:根据任务类型(CPU 密集型 vs I/O 密集型)合理设置
- 错误处理:始终添加失败回调来处理异常
- 资源清理:在应用退出时调用
shutdown(force_stop=True) - 回调顺序:多个回调按注册顺序执行
- 内存管理:库会自动管理内存,但建议在适当时机调用
shutdown()