跳转至

QThreadWithReturn 基础使用示例

概述

QThreadWithReturn 是一个功能强大的线程类,它结合了 Qt 的线程机制和 Python 的便捷性,支持: - 异步执行任务并获取返回值 - 设置超时时间 - 添加完成和失败回调函数 - 获取执行状态和结果

基本用法示例

1. 简单的任务执行

Python
import sys
import time
from PySide6.QtWidgets import (
    QApplication,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
    QLabel,
)
from PySide6.QtCore import QTimer
from qthreadwithreturn import QThreadWithReturn


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("QThreadWithReturn 示例")
        self.setGeometry(100, 100, 400, 300)

        # 初始化线程引用
        self._thread = None
        self.current_task = None  # 初始化当前任务引用

        # 创建界面
        central_widget = QWidget()
        layout = QVBoxLayout()

        self.button = QPushButton("开始计算")
        self.status_label = QLabel("准备就绪")
        self.result_label = QLabel("结果:等待计算...")

        layout.addWidget(self.button)
        layout.addWidget(self.status_label)
        layout.addWidget(self.result_label)

        central_widget.setLayout(layout)
        self.setCentralWidget(central_widget)

        # 连接按钮点击信号
        self.button.clicked.connect(self.start_calculation)

    def start_calculation(self):
        """开始计算任务"""
        self.button.setEnabled(False)
        self.status_label.setText("计算中...")
        self.result_label.setText("结果:等待计算...")

        # 创建并启动线程
        # 模拟一个耗时的计算任务
        self._thread = QThreadWithReturn(self.complex_calculation, 10, 20)

        # 添加回调函数
        self._thread.add_done_callback(self.on_calculation_success)
        self._thread.add_failure_callback(self.on_calculation_failed)

        # 启动线程,设置超时时间为5秒
        self._thread.start(timeout_ms=5000)

    def complex_calculation(self, a, b):
        """模拟复杂的计算任务"""
        print(f"开始计算 {a} + {b} 的复杂运算...")

        # 模拟耗时操作
        time.sleep(2)  # 假设计算需要2秒

        # 执行实际计算
        result = a + b

        print(f"计算完成,结果:{result}")
        return result

    def on_calculation_success(self, result):
        """计算成功的回调函数"""
        print(f"回调:计算成功,结果为 {result}")

        # 更新界面(注意:这在主线程中执行)
        self.status_label.setText("计算完成!")
        self.result_label.setText(f"结果:{result}")
        self.button.setEnabled(True)

        # 清理线程引用
        self._thread = None

    def on_calculation_failed(self, exception):
        """计算失败的回调函数"""
        print(f"回调:计算失败,错误:{exception}")

        # 更新界面显示错误信息
        self.status_label.setText("计算失败!")
        self.result_label.setText(f"错误:{str(exception)}")
        self.button.setEnabled(True)

        # 清理线程引用
        self._thread = None


def main():
    """主函数"""
    app = QApplication(sys.argv)

    # 创建并显示主窗口
    window = MainWindow()
    window.show()

    # 运行应用程序
    sys.exit(app.exec())


if __name__ == "__main__":
    main()

2. 多个任务顺序执行

Python
import time
from PySide6.QtWidgets import QApplication, QPushButton, QTextEdit, QVBoxLayout, QWidget
from PySide6.QtCore import QTimer
from qthreadwithreturn import QThreadWithReturn


class MultiTaskExample:
    def __init__(self):
        self.app = QApplication([])
        self.setup_ui()
        self.current_task = None
        self.results = []

    def setup_ui(self):
        """设置用户界面"""
        self.window = QWidget()
        self.window.setWindowTitle("多任务顺序执行示例")
        self.window.setGeometry(100, 100, 500, 400)

        layout = QVBoxLayout()

        self.start_button = QPushButton("开始执行任务")
        self.start_button.clicked.connect(self.start_tasks)

        self.result_display = QTextEdit()
        self.result_display.setReadOnly(True)
        self.result_display.append("准备就绪,点击按钮开始执行任务...")

        layout.addWidget(self.start_button)
        layout.addWidget(self.result_display)

        self.window.setLayout(layout)
        self.window.show()

    def start_tasks(self):
        """开始执行多个任务"""
        self.start_button.setEnabled(False)
        self.results = []
        self.result_display.clear()
        self.result_display.append("开始执行任务...")

        # 定义要执行的任务列表
        self.tasks = [
            ("任务1:数据加载", self.load_data, 3),
            ("任务2:数据处理", self.process_data, 2),
            ("任务3:数据分析", self.analyze_data, 4),
            ("任务4:生成报告", self.generate_report, 1),
        ]

        # 执行第一个任务
        self.execute_next_task()

    def execute_next_task(self):
        """执行下一个任务"""
        if not self.tasks:
            # 所有任务完成
            self.on_all_tasks_completed()
            return

        # 获取下一个任务
        task_name, task_func, task_arg = self.tasks.pop(0)

        self.result_display.append(f"\n开始执行:{task_name}")

        # 创建并执行任务线程
        self.current_task = QThreadWithReturn(task_func, task_arg)
        self.current_task.add_done_callback(
            lambda result: self.on_task_completed(task_name, result)
        )
        self.current_task.add_failure_callback(
            lambda exception: self.on_task_failed(task_name, exception)
        )

        self.current_task.start(timeout_ms=10000)  # 10秒超时

    def load_data(self, seconds):
        """模拟数据加载任务"""
        print(f"模拟数据加载,耗时 {seconds} 秒...")
        time.sleep(seconds)
        return [f"数据项_{i}" for i in range(10)]  # 模拟加载的数据

    def process_data(self, data_count):
        """模拟数据处理任务"""
        print(f"模拟处理 {data_count} 项数据,耗时 2 秒...")
        time.sleep(2)
        return f"已处理 {data_count} 项数据"

    def analyze_data(self, complexity):
        """模拟数据分析任务"""
        print(f"模拟数据分析,复杂度 {complexity},耗时 4 秒...")
        time.sleep(4)
        return {"分析结果": "成功", "准确率": "95%"}

    def generate_report(self, format_type):
        """模拟报告生成任务"""
        print(f"模拟生成报告格式 {format_type},耗时 1 秒...")
        time.sleep(1)
        return "报告生成完成:PDF格式,共5页"

    def on_task_completed(self, task_name, result):
        """任务完成回调"""
        print(f"任务完成:{task_name},结果:{result}")
        self.result_display.append(f"✓ {task_name} 完成")
        self.results.append((task_name, result))

        # 清理当前任务
        self.current_task = None

        # 执行下一个任务
        QTimer.singleShot(500, self.execute_next_task)  # 延迟500ms执行下一个任务

    def on_task_failed(self, task_name, exception):
        """任务失败回调"""
        print(f"任务失败:{task_name},错误:{exception}")
        self.result_display.append(f"✗ {task_name} 失败:{str(exception)}")

        # 清理当前任务
        self.current_task = None

        # 继续执行下一个任务(或者可以选择中止)
        QTimer.singleShot(500, self.execute_next_task)

    def on_all_tasks_completed(self):
        """所有任务完成"""
        self.result_display.append("\n🎉 所有任务执行完成!")
        self.result_display.append("\n详细结果:")
        for task_name, result in self.results:
            self.result_display.append(f"  - {task_name}{result}")

        self.start_button.setEnabled(True)
        print("所有任务执行完成!")

    def run(self):
        """运行应用程序"""
        return self.app.exec()


def main():
    """主函数"""
    example = MultiTaskExample()
    return example.run()


if __name__ == "__main__":
    main()

3. 带进度反馈的任务

Python
import time
from PySide6.QtWidgets import (
    QApplication,
    QMainWindow,
    QPushButton,
    QProgressBar,
    QLabel,
    QVBoxLayout,
    QWidget,
)
from PySide6.QtCore import Signal, QObject
from qthreadwithreturn import QThreadWithReturn


class ProgressSignals(QObject):
    """用于进度反馈的信号类"""

    progress_updated = Signal(int, str)  # 进度百分比,状态信息


class LongRunningTaskExample(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("带进度反馈的任务示例")
        self.setGeometry(100, 100, 450, 200)

        self.setup_ui()
        self._thread = None
        self.progress_signals = None  # 初始化进度信号对象引用

    def setup_ui(self):
        """设置用户界面"""
        central_widget = QWidget()
        layout = QVBoxLayout()

        # 控制按钮
        self.start_button = QPushButton("开始长时间任务")
        self.start_button.clicked.connect(self.start_long_task)

        # 进度条
        self.progress_bar = QProgressBar()
        self.progress_bar.setRange(0, 100)
        self.progress_bar.setValue(0)

        # 状态标签
        self.status_label = QLabel("准备就绪")

        # 结果标签
        self.result_label = QLabel("结果:等待任务完成...")

        layout.addWidget(self.start_button)
        layout.addWidget(self.progress_bar)
        layout.addWidget(self.status_label)
        layout.addWidget(self.result_label)

        central_widget.setLayout(layout)
        self.setCentralWidget(central_widget)

    def start_long_task(self):
        """开始长时间运行的任务"""
        self.start_button.setEnabled(False)
        self.progress_bar.setValue(0)
        self.status_label.setText("任务执行中...")
        self.result_label.setText("结果:等待任务完成...")

        # 创建信号对象用于进度反馈
        self.progress_signals = ProgressSignals()
        self.progress_signals.progress_updated.connect(self.update_progress)

        # 创建并启动线程
        self._thread = QThreadWithReturn(
            self.long_running_task_with_progress,
            100,  # 总步数
            self.progress_signals,  # 进度信号对象
        )

        # 添加回调函数
        self._thread.add_done_callback(self.on_task_success)
        self._thread.add_failure_callback(self.on_task_failed)

        # 启动线程,设置较长的超时时间
        self._thread.start(timeout_ms=30000)  # 30秒超时

    def long_running_task_with_progress(self, total_steps, signals):
        """模拟长时间运行的任务,带进度反馈"""
        print(f"开始执行长时间任务,总共 {total_steps} 步...")

        results = []

        for step in range(1, total_steps + 1):
            # 检查任务是否被取消
            if hasattr(self._thread, "_is_cancelled") and self._thread._is_cancelled:
                print("任务被取消")
                raise Exception("任务被用户取消")

            # 模拟工作
            time.sleep(0.05)  # 每步耗时50毫秒

            # 计算进度
            progress = int((step / total_steps) * 100)

            # 每10步更新一次进度(更频繁的更新)
            if step % 10 == 0:
                status = f"正在处理第 {step} 步,共 {total_steps} 步"
                print(status)

                # 发射信号更新进度条
                signals.progress_updated.emit(progress, status)

            # 收集中间结果
            if step % 10 == 0:
                results.append(f"步骤{step}的结果")

        print("长时间任务执行完成")
        return {
            "total_steps": total_steps,
            "processed_items": len(results),
            "results": results[-5:],  # 只返回最后5个结果
            "success_rate": 100,
        }

    def update_progress(self, value, status):
        """更新进度显示(在主线程中执行)"""
        self.progress_bar.setValue(value)
        self.status_label.setText(status)

        # 更新结果标签显示当前进度
        self.result_label.setText(f"结果:任务进度 {value}%")

    def on_task_success(self, result):
        """任务成功完成回调"""
        print(f"任务成功完成,结果:{result}")

        self.progress_bar.setValue(100)
        self.status_label.setText("任务完成!")

        # 显示结果摘要
        result_text = (
            f"结果:成功处理 {result['processed_items']} 个项目,"
            f"成功率 {result['success_rate']}%"
        )
        self.result_label.setText(result_text)

        self.start_button.setEnabled(True)
        self._thread = None

    def on_task_failed(self, exception):
        """任务失败回调"""
        print(f"任务失败,错误:{exception}")

        self.status_label.setText("任务失败!")
        self.result_label.setText(f"结果:任务失败 - {str(exception)}")

        self.start_button.setEnabled(True)
        self._thread = None


def main():
    """主函数"""
    app = QApplication([])

    # 创建并显示主窗口
    window = LongRunningTaskExample()
    window.show()

    # 运行应用程序
    return app.exec()


if __name__ == "__main__":
    main()

总结

QThreadWithReturn 提供了一个简单而强大的方式来处理异步任务:

主要特性

  • 返回值支持:可以获取任务的执行结果
  • 回调机制:支持成功和失败的回调函数
  • 超时控制:可以设置任务执行的超时时间
  • 状态查询:可以查询任务是否完成、取消等状态
  • Qt集成:自动处理Qt环境下的信号槽机制

使用场景

  • 耗时的计算任务
  • 文件读写操作
  • 网络请求
  • 数据库操作
  • 任何可能阻塞UI的任务

最佳实践

  1. 合理设置超时时间,避免无限等待
  2. 始终添加失败回调处理异常情况
  3. 在回调中更新UI,保持界面响应性
  4. 及时清理线程引用,避免内存泄漏
  5. 使用进度反馈提升用户体验