QThreadPoolExecutor 基础使用示例¶
概述¶
QThreadPoolExecutor 是一个线程池执行器,兼容 Python 标准库的 concurrent.futures API,提供了:
- 线程池管理,避免频繁创建销毁线程
- 批量任务处理
- Future 对象支持
- 与标准库相似的接口
基本用法示例¶
1. 简单的批量任务处理¶
Python
import sys
import time
from PySide6.QtWidgets import (
QApplication,
QMainWindow,
QPushButton,
QVBoxLayout,
QWidget,
QTextEdit,
QProgressBar,
)
from qthreadwithreturn import QThreadPoolExecutor
class BatchProcessingExample(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("QThreadPoolExecutor 批量处理示例")
self.setGeometry(100, 100, 600, 400)
self.setup_ui()
self._executor = None
self.futures = [] # 初始化 Future 列表引用
def setup_ui(self):
"""设置用户界面"""
central_widget = QWidget()
layout = QVBoxLayout()
# 控制按钮
self.start_button = QPushButton("开始批量处理")
self.start_button.clicked.connect(self.start_batch_processing)
# 进度条
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 100)
self.progress_bar.setValue(0)
# 结果显示区域
self.result_display = QTextEdit()
self.result_display.setReadOnly(True)
self.result_display.append("准备就绪,点击按钮开始批量处理...")
layout.addWidget(self.start_button)
layout.addWidget(self.progress_bar)
layout.addWidget(self.result_display)
central_widget.setLayout(layout)
self.setCentralWidget(central_widget)
def start_batch_processing(self):
"""开始批量处理任务"""
self.start_button.setEnabled(False)
self.progress_bar.setValue(0)
self.result_display.clear()
self.result_display.append("开始批量处理...\n")
# 创建线程池执行器,最多4个工作线程
self._executor = QThreadPoolExecutor(max_workers=4)
# 模拟一批要处理的数据
self.data_to_process = [
{"id": 1, "content": "数据块1", "size": 5},
{"id": 2, "content": "数据块2", "size": 3},
{"id": 3, "content": "数据块3", "size": 8},
{"id": 4, "content": "数据块4", "size": 2},
{"id": 5, "content": "数据块5", "size": 6},
{"id": 6, "content": "数据块6", "size": 4},
{"id": 7, "content": "数据块7", "size": 7},
{"id": 8, "content": "数据块8", "size": 1},
]
self.total_tasks = len(self.data_to_process)
self.completed_tasks = 0
# 添加池级别完成回调 - 所有任务完成时自动调用
self._executor.add_done_callback(self.all_tasks_completed)
# 提交所有任务到线程池,为每个任务添加完成回调
self.futures = []
for data in self.data_to_process:
future = self._executor.submit(self.process_data_item, data)
# 为每个任务添加完成回调来更新进度
future.add_done_callback(self.on_task_completed)
self.futures.append(future)
def process_data_item(self, data_item):
"""处理单个数据项"""
item_id = data_item["id"]
content = data_item["content"]
size = data_item["size"]
print(f"开始处理数据项 {item_id}: {content}")
# 模拟处理时间(根据数据大小)
time.sleep(size * 0.5) # 每个大小单位耗时0.5秒
# 模拟处理结果
processed_content = f"[已处理] {content} (大小: {size})"
processing_time = size * 0.5
print(f"数据项 {item_id} 处理完成")
return {
"id": item_id,
"original_content": content,
"processed_content": processed_content,
"processing_time": processing_time,
"status": "success",
}
def on_task_completed(self, result):
"""单个任务完成的回调"""
self.completed_tasks += 1
# 更新进度条
progress = int((self.completed_tasks / self.total_tasks) * 100)
self.progress_bar.setValue(progress)
# 显示任务结果
self.result_display.append(
f"✓ 任务 {result['id']}: {result['processed_content']} "
f"(耗时: {result['processing_time']:.1f}秒)"
)
self.result_display.append(
f"进度: {self.completed_tasks}/{self.total_tasks} 完成\n"
)
def all_tasks_completed(self):
"""所有任务完成处理 - 由线程池自动调用"""
self.result_display.append("\n🎉 所有任务处理完成!\n")
# 收集所有结果和统计信息
all_results = []
failed_tasks = []
for i, future in enumerate(self.futures):
try:
result = future.result()
all_results.append(result)
except Exception as e:
failed_tasks.append((i + 1, str(e)))
self.result_display.append(f"✗ 任务 {i + 1} 失败: {str(e)}")
# 显示统计信息
self.result_display.append("\n📊 处理统计:")
self.result_display.append(f" - 总任务数: {self.total_tasks}")
self.result_display.append(f" - 成功任务: {len(all_results)}")
self.result_display.append(f" - 失败任务: {len(failed_tasks)}")
if all_results:
total_time = sum(r["processing_time"] for r in all_results)
self.result_display.append(f" - 总处理时间: {total_time:.1f}秒")
# 清理资源(不需要手动调用 shutdown)
self._executor = None
self.futures = []
# 重新启用按钮
self.start_button.setEnabled(True)
def closeEvent(self, event):
"""窗口关闭事件处理"""
if self._executor:
# 窗口关闭时不需要 wait=True,避免阻塞
self._executor.shutdown(wait=False)
self._executor = None
event.accept()
def main():
"""主函数"""
app = QApplication(sys.argv)
# 创建并显示主窗口
window = BatchProcessingExample()
window.show()
# 运行应用程序
sys.exit(app.exec())
if __name__ == "__main__":
main()
2. 使用 as_completed 处理动态任务¶
Python
import time
from PySide6.QtWidgets import (
QApplication,
QMainWindow,
QPushButton,
QVBoxLayout,
QWidget,
QTextEdit,
QLabel,
)
from PySide6.QtCore import QTimer
from qthreadwithreturn import QThreadPoolExecutor
class DynamicTaskExample(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("动态任务处理示例")
self.setGeometry(100, 100, 550, 450)
self.setup_ui()
self._executor = None
self.batch_counter = 0
self.current_batch_size = 0
self.current_batch_completed = 0
def setup_ui(self):
"""设置用户界面"""
central_widget = QWidget()
layout = QVBoxLayout()
# 控制按钮
self.start_button = QPushButton("开始动态任务处理")
self.start_button.clicked.connect(self.start_dynamic_tasks)
# 状态标签
self.status_label = QLabel("状态:准备就绪")
# 结果显示区域
self.result_display = QTextEdit()
self.result_display.setReadOnly(True)
self.result_display.append("点击按钮开始动态任务处理示例...")
layout.addWidget(self.start_button)
layout.addWidget(self.status_label)
layout.addWidget(self.result_display)
central_widget.setLayout(layout)
self.setCentralWidget(central_widget)
def start_dynamic_tasks(self):
"""开始动态任务处理"""
self.start_button.setEnabled(False)
self.result_display.clear()
self.result_display.append("开始动态任务处理...\n")
# 重置计数器
self.batch_counter = 0
self.current_batch_size = 0
self.current_batch_completed = 0
# 创建线程池执行器
self._executor = QThreadPoolExecutor(max_workers=3)
# 添加池级别完成回调 - 在所有任务完成后自动触发
self._executor.add_done_callback(self.on_all_tasks_completed)
# 生成所有任务并一次性提交
all_tasks = self.generate_all_tasks()
self.result_display.append(f"📋 总共生成 {len(all_tasks)} 个任务\n")
# 一次性提交所有任务到线程池(线程池会自动管理队列)
for i, task in enumerate(all_tasks, 1):
future = self._executor.submit(self.execute_dynamic_task, task)
# 为每个任务添加完成回调
future.add_done_callback(
lambda result, task_num=i: self.on_task_completed(result, task_num)
)
# 添加失败回调
future.add_failure_callback(
lambda exc, task_num=i: self.on_task_failed(exc, task_num)
)
def generate_all_tasks(self):
"""生成所有任务(一次性生成所有任务而不是分批)"""
all_tasks = []
# 批次1
all_tasks.extend(
[
{
"type": "download",
"url": "http://example.com/file1.txt",
"size": 5,
"batch": 1,
},
{
"type": "process",
"data": "图像处理任务",
"complexity": 3,
"batch": 1,
},
{"type": "upload", "destination": "服务器A", "size": 2, "batch": 1},
]
)
# 批次2
all_tasks.extend(
[
{
"type": "download",
"url": "http://example.com/file2.txt",
"size": 4,
"batch": 2,
},
{
"type": "process",
"data": "数据分析任务",
"complexity": 6,
"batch": 2,
},
{"type": "upload", "destination": "服务器B", "size": 3, "batch": 2},
]
)
# 批次3
all_tasks.extend(
[
{
"type": "process",
"data": "报告生成任务",
"complexity": 2,
"batch": 3,
},
{
"type": "download",
"url": "http://example.com/file3.txt",
"size": 7,
"batch": 3,
},
]
)
return all_tasks
def execute_dynamic_task(self, task):
"""执行动态任务"""
task_type = task["type"]
task_id = id(task) # 使用对象ID作为任务标识
print(f"开始执行任务 {task_id}:{task_type}")
if task_type == "download":
result = self.simulate_download(task)
elif task_type == "process":
result = self.simulate_process(task)
elif task_type == "upload":
result = self.simulate_upload(task)
else:
raise ValueError(f"未知的任务类型: {task_type}")
print(f"任务 {task_id} 执行完成")
return result
def simulate_download(self, task):
"""模拟下载任务"""
url = task["url"]
size = task["size"]
print(f"模拟下载:{url} (大小: {size}MB)")
time.sleep(size * 0.8) # 下载时间
return {
"type": "download",
"url": url,
"size": size,
"status": "success",
"download_time": size * 0.8,
"content": f"从 {url} 下载的内容",
}
def simulate_process(self, task):
"""模拟处理任务"""
data = task["data"]
complexity = task["complexity"]
print(f"模拟处理:{data} (复杂度: {complexity})")
time.sleep(complexity * 0.6) # 处理时间
return {
"type": "process",
"data": data,
"complexity": complexity,
"status": "success",
"processing_time": complexity * 0.6,
"result": f"处理结果:{data} 已完成",
}
def simulate_upload(self, task):
"""模拟上传任务"""
destination = task["destination"]
size = task["size"]
print(f"模拟上传到 {destination} (大小: {size}MB)")
time.sleep(size * 1.0) # 上传时间
return {
"type": "upload",
"destination": destination,
"size": size,
"status": "success",
"upload_time": size * 1.0,
"response": f"上传到 {destination} 成功",
}
def on_task_completed(self, result, task_num):
"""单个任务完成的回调"""
# 显示任务结果
self.display_task_result(result, task_num)
self.status_label.setText(f"状态:正在处理任务... (已完成 {task_num} 个)")
def on_task_failed(self, exception, task_num):
"""任务失败的回调"""
self.display_task_error(str(exception), task_num)
self.status_label.setText(f"状态:任务 {task_num} 失败")
def display_task_result(self, result, task_num):
"""显示任务结果"""
task_type = result["type"]
batch = result.get("batch", "?")
if task_type == "download":
self.result_display.append(
f"✓ [任务{task_num}] [批次{batch}] 下载完成: {result['url']} "
f"(耗时: {result['download_time']:.1f}秒)"
)
elif task_type == "process":
self.result_display.append(
f"✓ [任务{task_num}] [批次{batch}] 处理完成: {result['data']} "
f"(耗时: {result['processing_time']:.1f}秒)"
)
elif task_type == "upload":
self.result_display.append(
f"✓ [任务{task_num}] [批次{batch}] 上传完成: {result['destination']} "
f"(耗时: {result['upload_time']:.1f}秒)"
)
def display_task_error(self, error, task_num):
"""显示任务错误"""
self.result_display.append(f"✗ [任务{task_num}] 失败: {error}")
def on_all_tasks_completed(self):
"""所有任务完成处理 - 由线程池自动调用"""
self.result_display.append("\n🎉 所有动态任务处理完成!")
self.status_label.setText("状态:所有任务已完成")
# 清理资源(不需要手动调用 shutdown)
self._executor = None
# 重新启用按钮
self.start_button.setEnabled(True)
def closeEvent(self, event):
"""窗口关闭事件处理"""
if self._executor:
# 窗口关闭时不需要 wait=True,避免阻塞
self._executor.shutdown(wait=False)
self._executor = None
event.accept()
def main():
"""主函数"""
app = QApplication([])
# 创建并显示主窗口
window = DynamicTaskExample()
window.show()
# 运行应用程序
return app.exec()
if __name__ == "__main__":
main()
总结¶
QThreadPoolExecutor 提供了一个强大而灵活的线程池管理方案:
主要特性¶
- 线程池管理:避免频繁创建销毁线程的开销
- 兼容性:与 Python 标准库
concurrent.futuresAPI 兼容 - 批量处理:支持同时处理多个任务
- 资源管理:支持手动和自动(上下文管理器)资源管理
- Future 对象:提供异步任务的结果获取机制
使用场景¶
- 批量数据处理
- 并发网络请求
- 文件批量操作
- 并行计算任务
- 任何需要并发处理的场景
最佳实践¶
- 使用上下文管理器确保资源正确释放
- 合理设置
max_workers避免资源竞争 - 始终处理任务执行中的异常
- 使用适当的超时设置防止无限等待
- 监控任务执行状态,及时发现问题
与 QThreadWithReturn 的对比¶
- QThreadWithReturn:适合单个或少量异步任务,需要精细控制
- QThreadPoolExecutor:适合批量任务处理,强调并发效率
- 选择建议:单个任务用 QThreadWithReturn,批量任务用 QThreadPoolExecutor