跳转至

QThreadPoolExecutor 基础使用示例

概述

QThreadPoolExecutor 是一个线程池执行器,兼容 Python 标准库的 concurrent.futures API,提供了: - 线程池管理,避免频繁创建销毁线程 - 批量任务处理 - Future 对象支持 - 与标准库相似的接口

基本用法示例

1. 简单的批量任务处理

Python
import sys
import time
from PySide6.QtWidgets import (
    QApplication,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
    QTextEdit,
    QProgressBar,
)
from qthreadwithreturn import QThreadPoolExecutor


class BatchProcessingExample(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("QThreadPoolExecutor 批量处理示例")
        self.setGeometry(100, 100, 600, 400)

        self.setup_ui()
        self._executor = None
        self.futures = []  # 初始化 Future 列表引用

    def setup_ui(self):
        """设置用户界面"""
        central_widget = QWidget()
        layout = QVBoxLayout()

        # 控制按钮
        self.start_button = QPushButton("开始批量处理")
        self.start_button.clicked.connect(self.start_batch_processing)

        # 进度条
        self.progress_bar = QProgressBar()
        self.progress_bar.setRange(0, 100)
        self.progress_bar.setValue(0)

        # 结果显示区域
        self.result_display = QTextEdit()
        self.result_display.setReadOnly(True)
        self.result_display.append("准备就绪,点击按钮开始批量处理...")

        layout.addWidget(self.start_button)
        layout.addWidget(self.progress_bar)
        layout.addWidget(self.result_display)

        central_widget.setLayout(layout)
        self.setCentralWidget(central_widget)

    def start_batch_processing(self):
        """开始批量处理任务"""
        self.start_button.setEnabled(False)
        self.progress_bar.setValue(0)
        self.result_display.clear()
        self.result_display.append("开始批量处理...\n")

        # 创建线程池执行器,最多4个工作线程
        self._executor = QThreadPoolExecutor(max_workers=4)

        # 模拟一批要处理的数据
        self.data_to_process = [
            {"id": 1, "content": "数据块1", "size": 5},
            {"id": 2, "content": "数据块2", "size": 3},
            {"id": 3, "content": "数据块3", "size": 8},
            {"id": 4, "content": "数据块4", "size": 2},
            {"id": 5, "content": "数据块5", "size": 6},
            {"id": 6, "content": "数据块6", "size": 4},
            {"id": 7, "content": "数据块7", "size": 7},
            {"id": 8, "content": "数据块8", "size": 1},
        ]

        self.total_tasks = len(self.data_to_process)
        self.completed_tasks = 0

        # 添加池级别完成回调 - 所有任务完成时自动调用
        self._executor.add_done_callback(self.all_tasks_completed)

        # 提交所有任务到线程池,为每个任务添加完成回调
        self.futures = []
        for data in self.data_to_process:
            future = self._executor.submit(self.process_data_item, data)
            # 为每个任务添加完成回调来更新进度
            future.add_done_callback(self.on_task_completed)
            self.futures.append(future)

    def process_data_item(self, data_item):
        """处理单个数据项"""
        item_id = data_item["id"]
        content = data_item["content"]
        size = data_item["size"]

        print(f"开始处理数据项 {item_id}: {content}")

        # 模拟处理时间(根据数据大小)
        time.sleep(size * 0.5)  # 每个大小单位耗时0.5秒

        # 模拟处理结果
        processed_content = f"[已处理] {content} (大小: {size})"
        processing_time = size * 0.5

        print(f"数据项 {item_id} 处理完成")

        return {
            "id": item_id,
            "original_content": content,
            "processed_content": processed_content,
            "processing_time": processing_time,
            "status": "success",
        }

    def on_task_completed(self, result):
        """单个任务完成的回调"""
        self.completed_tasks += 1

        # 更新进度条
        progress = int((self.completed_tasks / self.total_tasks) * 100)
        self.progress_bar.setValue(progress)

        # 显示任务结果
        self.result_display.append(
            f"✓ 任务 {result['id']}: {result['processed_content']} "
            f"(耗时: {result['processing_time']:.1f}秒)"
        )
        self.result_display.append(
            f"进度: {self.completed_tasks}/{self.total_tasks} 完成\n"
        )

    def all_tasks_completed(self):
        """所有任务完成处理 - 由线程池自动调用"""
        self.result_display.append("\n🎉 所有任务处理完成!\n")

        # 收集所有结果和统计信息
        all_results = []
        failed_tasks = []

        for i, future in enumerate(self.futures):
            try:
                result = future.result()
                all_results.append(result)
            except Exception as e:
                failed_tasks.append((i + 1, str(e)))
                self.result_display.append(f"✗ 任务 {i + 1} 失败: {str(e)}")

        # 显示统计信息
        self.result_display.append("\n📊 处理统计:")
        self.result_display.append(f"  - 总任务数: {self.total_tasks}")
        self.result_display.append(f"  - 成功任务: {len(all_results)}")
        self.result_display.append(f"  - 失败任务: {len(failed_tasks)}")

        if all_results:
            total_time = sum(r["processing_time"] for r in all_results)
            self.result_display.append(f"  - 总处理时间: {total_time:.1f}秒")

        # 清理资源(不需要手动调用 shutdown)
        self._executor = None
        self.futures = []

        # 重新启用按钮
        self.start_button.setEnabled(True)

    def closeEvent(self, event):
        """窗口关闭事件处理"""
        if self._executor:
            # 窗口关闭时不需要 wait=True,避免阻塞
            self._executor.shutdown(wait=False)
            self._executor = None
        event.accept()


def main():
    """主函数"""
    app = QApplication(sys.argv)

    # 创建并显示主窗口
    window = BatchProcessingExample()
    window.show()

    # 运行应用程序
    sys.exit(app.exec())


if __name__ == "__main__":
    main()

2. 使用 as_completed 处理动态任务

Python
import time
from PySide6.QtWidgets import (
    QApplication,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
    QTextEdit,
    QLabel,
)
from PySide6.QtCore import QTimer
from qthreadwithreturn import QThreadPoolExecutor


class DynamicTaskExample(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("动态任务处理示例")
        self.setGeometry(100, 100, 550, 450)

        self.setup_ui()
        self._executor = None
        self.batch_counter = 0
        self.current_batch_size = 0
        self.current_batch_completed = 0

    def setup_ui(self):
        """设置用户界面"""
        central_widget = QWidget()
        layout = QVBoxLayout()

        # 控制按钮
        self.start_button = QPushButton("开始动态任务处理")
        self.start_button.clicked.connect(self.start_dynamic_tasks)

        # 状态标签
        self.status_label = QLabel("状态:准备就绪")

        # 结果显示区域
        self.result_display = QTextEdit()
        self.result_display.setReadOnly(True)
        self.result_display.append("点击按钮开始动态任务处理示例...")

        layout.addWidget(self.start_button)
        layout.addWidget(self.status_label)
        layout.addWidget(self.result_display)

        central_widget.setLayout(layout)
        self.setCentralWidget(central_widget)

    def start_dynamic_tasks(self):
        """开始动态任务处理"""
        self.start_button.setEnabled(False)
        self.result_display.clear()
        self.result_display.append("开始动态任务处理...\n")

        # 重置计数器
        self.batch_counter = 0
        self.current_batch_size = 0
        self.current_batch_completed = 0

        # 创建线程池执行器
        self._executor = QThreadPoolExecutor(max_workers=3)

        # 添加池级别完成回调 - 在所有任务完成后自动触发
        self._executor.add_done_callback(self.on_all_tasks_completed)

        # 生成所有任务并一次性提交
        all_tasks = self.generate_all_tasks()
        self.result_display.append(f"📋 总共生成 {len(all_tasks)} 个任务\n")

        # 一次性提交所有任务到线程池(线程池会自动管理队列)
        for i, task in enumerate(all_tasks, 1):
            future = self._executor.submit(self.execute_dynamic_task, task)
            # 为每个任务添加完成回调
            future.add_done_callback(
                lambda result, task_num=i: self.on_task_completed(result, task_num)
            )
            # 添加失败回调
            future.add_failure_callback(
                lambda exc, task_num=i: self.on_task_failed(exc, task_num)
            )

    def generate_all_tasks(self):
        """生成所有任务(一次性生成所有任务而不是分批)"""
        all_tasks = []

        # 批次1
        all_tasks.extend(
            [
                {
                    "type": "download",
                    "url": "http://example.com/file1.txt",
                    "size": 5,
                    "batch": 1,
                },
                {
                    "type": "process",
                    "data": "图像处理任务",
                    "complexity": 3,
                    "batch": 1,
                },
                {"type": "upload", "destination": "服务器A", "size": 2, "batch": 1},
            ]
        )

        # 批次2
        all_tasks.extend(
            [
                {
                    "type": "download",
                    "url": "http://example.com/file2.txt",
                    "size": 4,
                    "batch": 2,
                },
                {
                    "type": "process",
                    "data": "数据分析任务",
                    "complexity": 6,
                    "batch": 2,
                },
                {"type": "upload", "destination": "服务器B", "size": 3, "batch": 2},
            ]
        )

        # 批次3
        all_tasks.extend(
            [
                {
                    "type": "process",
                    "data": "报告生成任务",
                    "complexity": 2,
                    "batch": 3,
                },
                {
                    "type": "download",
                    "url": "http://example.com/file3.txt",
                    "size": 7,
                    "batch": 3,
                },
            ]
        )

        return all_tasks

    def execute_dynamic_task(self, task):
        """执行动态任务"""
        task_type = task["type"]
        task_id = id(task)  # 使用对象ID作为任务标识

        print(f"开始执行任务 {task_id}{task_type}")

        if task_type == "download":
            result = self.simulate_download(task)
        elif task_type == "process":
            result = self.simulate_process(task)
        elif task_type == "upload":
            result = self.simulate_upload(task)
        else:
            raise ValueError(f"未知的任务类型: {task_type}")

        print(f"任务 {task_id} 执行完成")
        return result

    def simulate_download(self, task):
        """模拟下载任务"""
        url = task["url"]
        size = task["size"]

        print(f"模拟下载:{url} (大小: {size}MB)")
        time.sleep(size * 0.8)  # 下载时间

        return {
            "type": "download",
            "url": url,
            "size": size,
            "status": "success",
            "download_time": size * 0.8,
            "content": f"从 {url} 下载的内容",
        }

    def simulate_process(self, task):
        """模拟处理任务"""
        data = task["data"]
        complexity = task["complexity"]

        print(f"模拟处理:{data} (复杂度: {complexity})")
        time.sleep(complexity * 0.6)  # 处理时间

        return {
            "type": "process",
            "data": data,
            "complexity": complexity,
            "status": "success",
            "processing_time": complexity * 0.6,
            "result": f"处理结果:{data} 已完成",
        }

    def simulate_upload(self, task):
        """模拟上传任务"""
        destination = task["destination"]
        size = task["size"]

        print(f"模拟上传到 {destination} (大小: {size}MB)")
        time.sleep(size * 1.0)  # 上传时间

        return {
            "type": "upload",
            "destination": destination,
            "size": size,
            "status": "success",
            "upload_time": size * 1.0,
            "response": f"上传到 {destination} 成功",
        }

    def on_task_completed(self, result, task_num):
        """单个任务完成的回调"""
        # 显示任务结果
        self.display_task_result(result, task_num)
        self.status_label.setText(f"状态:正在处理任务... (已完成 {task_num} 个)")

    def on_task_failed(self, exception, task_num):
        """任务失败的回调"""
        self.display_task_error(str(exception), task_num)
        self.status_label.setText(f"状态:任务 {task_num} 失败")

    def display_task_result(self, result, task_num):
        """显示任务结果"""
        task_type = result["type"]
        batch = result.get("batch", "?")

        if task_type == "download":
            self.result_display.append(
                f"✓ [任务{task_num}] [批次{batch}] 下载完成: {result['url']} "
                f"(耗时: {result['download_time']:.1f}秒)"
            )
        elif task_type == "process":
            self.result_display.append(
                f"✓ [任务{task_num}] [批次{batch}] 处理完成: {result['data']} "
                f"(耗时: {result['processing_time']:.1f}秒)"
            )
        elif task_type == "upload":
            self.result_display.append(
                f"✓ [任务{task_num}] [批次{batch}] 上传完成: {result['destination']} "
                f"(耗时: {result['upload_time']:.1f}秒)"
            )

    def display_task_error(self, error, task_num):
        """显示任务错误"""
        self.result_display.append(f"✗ [任务{task_num}] 失败: {error}")

    def on_all_tasks_completed(self):
        """所有任务完成处理 - 由线程池自动调用"""
        self.result_display.append("\n🎉 所有动态任务处理完成!")
        self.status_label.setText("状态:所有任务已完成")

        # 清理资源(不需要手动调用 shutdown)
        self._executor = None

        # 重新启用按钮
        self.start_button.setEnabled(True)

    def closeEvent(self, event):
        """窗口关闭事件处理"""
        if self._executor:
            # 窗口关闭时不需要 wait=True,避免阻塞
            self._executor.shutdown(wait=False)
            self._executor = None
        event.accept()


def main():
    """主函数"""
    app = QApplication([])

    # 创建并显示主窗口
    window = DynamicTaskExample()
    window.show()

    # 运行应用程序
    return app.exec()


if __name__ == "__main__":
    main()

总结

QThreadPoolExecutor 提供了一个强大而灵活的线程池管理方案:

主要特性

  • 线程池管理:避免频繁创建销毁线程的开销
  • 兼容性:与 Python 标准库 concurrent.futures API 兼容
  • 批量处理:支持同时处理多个任务
  • 资源管理:支持手动和自动(上下文管理器)资源管理
  • Future 对象:提供异步任务的结果获取机制

使用场景

  • 批量数据处理
  • 并发网络请求
  • 文件批量操作
  • 并行计算任务
  • 任何需要并发处理的场景

最佳实践

  1. 使用上下文管理器确保资源正确释放
  2. 合理设置 max_workers 避免资源竞争
  3. 始终处理任务执行中的异常
  4. 使用适当的超时设置防止无限等待
  5. 监控任务执行状态,及时发现问题

与 QThreadWithReturn 的对比

  • QThreadWithReturn:适合单个或少量异步任务,需要精细控制
  • QThreadPoolExecutor:适合批量任务处理,强调并发效率
  • 选择建议:单个任务用 QThreadWithReturn,批量任务用 QThreadPoolExecutor