快速开始¶
本指南将帮助您快速上手 QThreadWithReturn,了解基本概念和核心用法。
安装¶
使用 uv(推荐)¶
使用 pip¶
基本概念¶
QThreadWithReturn¶
QThreadWithReturn 是一个带返回值的线程类,提供类似 concurrent.futures.Future 的 API:
- 异步执行:在后台线程中执行函数
- 返回值:可以获取函数的返回值
- 回调机制:支持成功和失败回调
- 超时控制:可以设置执行超时
- 任务取消:支持优雅取消和强制终止
QThreadPoolExecutor¶
QThreadPoolExecutor 是线程池执行器,用于管理多个并发任务:
- 线程池管理:自动管理工作线程数量
- 任务调度:智能调度任务到可用线程
- 批量处理:支持批量提交和处理任务
- 资源管理:自动管理线程生命周期
第一个示例¶
让我们从一个简单的例子开始:
Python
import time
from PySide6.QtWidgets import QApplication, QWidget, QPushButton, QLabel, QVBoxLayout
from qthreadwithreturn import QThreadWithReturn
class SimpleExample(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("我的第一个多线程应用")
self.setGeometry(100, 100, 400, 200)
layout = QVBoxLayout()
self.button = QPushButton("开始任务", self)
self.button.clicked.connect(self.start_task)
layout.addWidget(self.button)
self.label = QLabel("等待任务...", self)
layout.addWidget(self.label)
self.setLayout(layout)
def start_task(self):
"""启动耗时任务"""
self.button.setEnabled(False)
self.label.setText("任务执行中...")
# 定义耗时任务
def long_running_task():
time.sleep(3) # 模拟3秒的耗时操作
return "任务完成!"
# 创建线程
thread = QThreadWithReturn(long_running_task)
# 添加成功回调
thread.add_done_callback(self.on_success)
# 添加失败回调
thread.add_failure_callback(self.on_failure)
# 启动线程
thread.start()
def on_success(self, result):
"""任务成功完成回调"""
self.label.setText(result)
self.button.setEnabled(True)
def on_failure(self, error):
"""任务失败回调"""
self.label.setText(f"任务失败: {error}")
self.button.setEnabled(True)
if __name__ == '__main__':
app = QApplication([])
window = SimpleExample()
window.show()
app.exec()
核心用法¶
2. 带参数的任务¶
Python
# 位置参数
thread = QThreadWithReturn(pow, 2, 10) # 2^10
# 关键字参数
def greet(name, message="Hello"):
return f"{message}, {name}!"
thread = QThreadWithReturn(greet, "Alice", message="Hi")
# 混合参数
def process_data(data, multiplier=1, offset=0):
return [x * multiplier + offset for x in data]
thread = QThreadWithReturn(
process_data,
[1, 2, 3, 4, 5],
multiplier=2,
offset=10
)
3. 超时控制¶
Python
# 5秒超时
thread = QThreadWithReturn(long_running_task)
thread.start(timeout_ms=5000)
try:
result = thread.result(timeout_ms=5000)
print(f"结果: {result}")
except TimeoutError:
print("任务超时")
4. 任务取消¶
Python
thread = QThreadWithReturn(long_running_task)
thread.start()
# 优雅取消
success = thread.cancel()
if success:
print("任务已取消")
# 强制终止(紧急情况)
success = thread.cancel(force_stop=True)
if success:
print("任务已强制终止")
线程池使用¶
线程池回调¶
Python
pool = QThreadPoolExecutor(max_workers=2)
# 添加池级别完成回调
pool.add_done_callback(lambda: print("所有任务完成!"))
# 添加任务失败回调
pool.add_failure_callback(lambda e: print(f"任务失败: {e}"))
# 提交任务
for i in range(5):
future = pool.submit(task_function, i)
future.add_done_callback(lambda r, i=i: print(f"任务 {i} 完成: {r}"))
pool.shutdown()
使用 as_completed¶
Python
pool = QThreadPoolExecutor(max_workers=3)
# 提交多个任务
futures = [
pool.submit(task, i)
for i in range(5)
]
# 按完成顺序处理结果
for future in QThreadPoolExecutor.as_completed(futures):
try:
result = future.result()
print(f"任务完成,结果: {result}")
except Exception as e:
print(f"任务失败: {e}")
pool.shutdown()
常见问题¶
Q: 什么时候使用 QThreadWithReturn vs QThreadPoolExecutor?¶
A: - QThreadWithReturn:适合单个、独立的任务 - QThreadPoolExecutor:适合多个并发任务,需要线程池管理
Q: 如何避免 UI 冻结?¶
A:
- 所有耗时操作都应该在后台线程中执行
- 使用回调函数更新 UI,而不是直接等待结果
- 避免在主线程中调用 result() 或 wait() 而不设置超时
Q: 如何处理任务中的异常?¶
A:
- 添加失败回调:thread.add_failure_callback(handler)
- 使用 try-except 捕获 result() 的异常
- 检查 exception() 方法获取异常信息
Q: 线程池应该设置多大?¶
A:
- CPU 密集型:os.cpu_count() 或 os.cpu_count() - 1
- I/O 密集型:os.cpu_count() * 2 到 os.cpu_count() * 5
- 混合型:根据实际测试调整
下一步¶
现在您已经了解了 QThreadWithReturn 的基本用法,可以继续学习:
- API 参考 - QThreadWithReturn API 文档
- API 参考 - QThreadPoolExecutor API 文档
- QThreadWithReturn 应用示例 - 完整的应用示例
- QThreadPoolExecutor 应用示例 - 线程池应用示例
开始构建您的多线程应用吧!