返回目录

多线程教学.py PY

text/plain
12.37 KB
2026-01-14 20:12:28
下载文件

文件预览

语言: python | 编码: UTF-8
总行数: 381
import tkinter as tk
import threading
import queue
import time
import serial
import serial.tools.list_ports
from datetime import datetime, timedelta

# ========== 全局变量 ==========
# 线程控制标志
running_threads = True
# 任务队列
task_queue = queue.Queue()
# 定时任务列表
timer_tasks = []
# 灯光状态
light_status = "关"
# 串口对象
serial_port = None
# 串口连接状态
serial_connected = False

# ========== 串口工作线程函数 ==========
def serial_worker():
    """工作线程:处理真实的串口通信"""
    global serial_port, serial_connected, light_status
    
    print("[串口线程] 启动")
    
    while running_threads:
        try:
            # 从队列获取任务,最多等待0.5秒
            data = task_queue.get(timeout=0.5)
            
            if data == "exit":
                print("[串口线程] 收到退出信号")
                break
            
            # 只有串口已连接时才发送数据
            if serial_connected and serial_port and serial_port.is_open:
                print(f"[串口线程] 发送数据: {data}")
                
                # 发送数据到串口
                serial_port.write(data.encode('utf-8'))
                
                # 更新灯光状态
                if data == "1":
                    light_status = "开"
                    update_status(f"已发送开灯指令: {data}")
                elif data == "2":
                    light_status = "关"
                    update_status(f"已发送关灯指令: {data}")
                else:
                    update_status(f"已发送指令: {data}")
                    
                print(f"[串口线程] 发送完成,当前状态: {light_status}")
            else:
                print(f"[串口线程] 串口未连接,无法发送: {data}")
                update_status("串口未连接")
            
            task_queue.task_done()
            
        except queue.Empty:
            # 队列为空,继续循环
            continue
        except Exception as e:
            print(f"[串口线程] 错误: {e}")
            update_status(f"串口错误: {str(e)}")
    
    print("[串口线程] 结束")

# ========== 定时器线程函数 ==========
def timer_worker():
    """定时器线程:处理定时任务"""
    print("[定时器线程] 启动")
    
    while running_threads:
        try:
            current_time = datetime.now()
            
            # 检查是否有定时任务需要执行
            for task in timer_tasks[:]:  # 复制列表进行遍历
                scheduled_time, action, task_name = task
                
                if current_time >= scheduled_time:
                    print(f"[定时器] 执行任务: {task_name}")
                    
                    if action == "on":
                        task_queue.put("1")
                        update_status(f"定时开灯 ({task_name})")
                    elif action == "off":
                        task_queue.put("2")
                        update_status(f"定时关灯 ({task_name})")
                    
                    # 移除已执行的任务
                    timer_tasks.remove(task)
            
            time.sleep(0.1)  # 每0.1秒检查一次
            
        except Exception as e:
            print(f"[定时器] 错误: {e}")
    
    print("[定时器线程] 结束")

# ========== UI更新函数 ==========
def update_status(message):
    """在主线程中更新状态显示"""
    root.after(0, lambda: status_label.config(text=message))

def update_light_status():
    """更新灯光状态显示"""
    if light_status == "开":
        light_canvas.config(bg="yellow")
    else:
        light_canvas.config(bg="gray")
    
    # 每秒更新一次
    root.after(1000, update_light_status)

def add_timer_task(delay_seconds, action, task_name):
    """添加定时任务"""
    scheduled_time = datetime.now() + timedelta(seconds=delay_seconds)
    timer_tasks.append((scheduled_time, action, task_name))
    
    time_str = scheduled_time.strftime("%H:%M:%S")
    update_status(f"定时任务已添加: {task_name} ({time_str})")

def update_task_list():
    """更新定时任务列表显示"""
    task_listbox.delete(0, tk.END)
    
    for scheduled_time, action, task_name in timer_tasks:
        time_str = scheduled_time.strftime("%H:%M:%S")
        action_text = "开灯" if action == "on" else "关灯"
        task_listbox.insert(tk.END, f"{time_str} - {action_text} ({task_name})")
    
    # 每秒更新一次
    root.after(1000, update_task_list)

# ========== 串口连接函数 ==========
def connect_serial():
    """连接串口"""
    global serial_port, serial_connected
    
    try:
        # 获取选择的串口
        selected_port = port_var.get()
        
        if not selected_port:
            update_status("请选择串口")
            return
        
        # 如果已有串口连接,先关闭
        if serial_port and serial_port.is_open:
            serial_port.close()
        
        # 打开新串口
        serial_port = serial.Serial(
            port=selected_port,
            baudrate=9600,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=1
        )
        
        if serial_port.is_open:
            serial_connected = True
            update_status(f"串口已连接: {selected_port}")
            connect_btn.config(text="已连接", state="disabled", bg="green")
            port_menu.config(state="disabled")
            baud_menu.config(state="disabled")
            print(f"[主线程] 串口已连接: {selected_port}")
        else:
            update_status("串口打开失败")
            
    except Exception as e:
        update_status(f"连接失败: {str(e)}")
        print(f"[主线程] 串口连接失败: {e}")

def refresh_ports():
    """刷新串口列表"""
    ports = serial.tools.list_ports.comports()
    port_list = [port.device for port in ports]
    
    # 更新下拉菜单
    port_menu['menu'].delete(0, 'end')
    for port in port_list:
        port_menu['menu'].add_command(label=port, command=tk._setit(port_var, port))
    
    if port_list:
        port_var.set(port_list[0])
        update_status(f"发现 {len(port_list)} 个串口")
    else:
        update_status("未发现串口")

# ========== 按钮事件处理函数 ==========
def turn_on():
    """开灯按钮点击事件"""
    if serial_connected:
        task_queue.put("1")
        update_status("正在发送开灯指令...")
    else:
        update_status("请先连接串口")

def turn_off():
    """关灯按钮点击事件"""
    if serial_connected:
        task_queue.put("2")
        update_status("正在发送关灯指令...")
    else:
        update_status("请先连接串口")

def schedule_on():
    """定时开灯按钮点击事件"""
    try:
        seconds = int(delay_entry.get())
        if seconds <= 0:
            update_status("请输入正数秒数")
            return
            
        task_name = f"{seconds}秒后开灯"
        add_timer_task(seconds, "on", task_name)
        
    except ValueError:
        update_status("请输入有效的秒数")

def schedule_off():
    """定时关灯按钮点击事件"""
    try:
        seconds = int(delay_entry.get())
        if seconds <= 0:
            update_status("请输入正数秒数")
            return
            
        task_name = f"{seconds}秒后关灯"
        add_timer_task(seconds, "off", task_name)
        
    except ValueError:
        update_status("请输入有效的秒数")

def on_closing():
    """窗口关闭事件"""
    global running_threads
    
    running_threads = False
    
    # 发送退出信号给工作线程
    task_queue.put("exit")
    
    # 关闭串口
    if serial_port and serial_port.is_open:
        serial_port.close()
        print("[主线程] 串口已关闭")
    
    # 等待线程结束
    time.sleep(0.5)
    
    root.destroy()
    print("[主线程] 程序结束")

# ========== 创建界面 ==========
root = tk.Tk()
root.title("多线程定时灯光控制 - 真实串口版")
root.geometry("450x550")

# 标题
title_label = tk.Label(root, text="多线程定时灯光控制", font=("Arial", 16, "bold"))
title_label.pack(pady=10)

# 串口设置区域
serial_frame = tk.Frame(root, relief=tk.GROOVE, borderwidth=2)
serial_frame.pack(pady=10, padx=20, fill=tk.X)

tk.Label(serial_frame, text="串口设置", font=("Arial", 12, "bold")).pack(pady=5)

# 串口选择
port_frame = tk.Frame(serial_frame)
port_frame.pack(pady=5)

tk.Label(port_frame, text="串口:").pack(side=tk.LEFT, padx=5)

port_var = tk.StringVar()
port_menu = tk.OptionMenu(port_frame, port_var, "")
port_menu.pack(side=tk.LEFT, padx=5)

refresh_btn = tk.Button(port_frame, text="刷新", command=refresh_ports)
refresh_btn.pack(side=tk.LEFT, padx=5)

# 波特率选择
baud_frame = tk.Frame(serial_frame)
baud_frame.pack(pady=5)

tk.Label(baud_frame, text="波特率:").pack(side=tk.LEFT, padx=5)

baud_var = tk.StringVar(value="9600")
baud_menu = tk.OptionMenu(baud_frame, baud_var, "9600", "115200", "19200", "38400", "57600")
baud_menu.pack(side=tk.LEFT, padx=5)

# 连接按钮
connect_btn = tk.Button(serial_frame, text="连接串口", command=connect_serial, bg="lightblue")
connect_btn.pack(pady=5)

# 灯光状态显示
light_frame = tk.Frame(root)
light_frame.pack(pady=10)

light_label = tk.Label(light_frame, text="灯光状态:", font=("Arial", 12))
light_label.pack(side=tk.LEFT, padx=5)

light_canvas = tk.Canvas(light_frame, width=30, height=30, bg="gray")
light_canvas.pack(side=tk.LEFT)

# 状态显示
status_label = tk.Label(root, text="请先连接串口", fg="blue", font=("Arial", 10))
status_label.pack(pady=5)

# 定时设置区域
timer_frame = tk.Frame(root)
timer_frame.pack(pady=15)

tk.Label(timer_frame, text="定时秒数:").grid(row=0, column=0, padx=5, pady=5)

delay_entry = tk.Entry(timer_frame, width=10)
delay_entry.insert(0, "5")
delay_entry.grid(row=0, column=1, padx=5, pady=5)

tk.Button(timer_frame, text="定时开灯", command=schedule_on, bg="orange").grid(row=0, column=2, padx=5, pady=5)
tk.Button(timer_frame, text="定时关灯", command=schedule_off, bg="lightblue").grid(row=0, column=3, padx=5, pady=5)

# 控制按钮区域
control_frame = tk.Frame(root)
control_frame.pack(pady=20)

tk.Button(control_frame, text="开灯", command=turn_on, bg="green", fg="white", 
          width=10, height=2).pack(side=tk.LEFT, padx=10)

tk.Button(control_frame, text="关灯", command=turn_off, bg="red", fg="white", 
          width=10, height=2).pack(side=tk.LEFT, padx=10)

# 线程状态显示
thread_frame = tk.Frame(root, relief=tk.RAISED, borderwidth=2)
thread_frame.pack(pady=15, padx=20, fill=tk.X)

tk.Label(thread_frame, text="线程状态:", font=("Arial", 10, "bold")).grid(row=0, column=0, columnspan=2)

tk.Label(thread_frame, text="主线程:").grid(row=1, column=0, sticky="e")
tk.Label(thread_frame, text="运行中", fg="green").grid(row=1, column=1, sticky="w", padx=5)

tk.Label(thread_frame, text="串口线程:").grid(row=2, column=0, sticky="e")
tk.Label(thread_frame, text="运行中", fg="green").grid(row=2, column=1, sticky="w", padx=5)

tk.Label(thread_frame, text="定时线程:").grid(row=3, column=0, sticky="e")
tk.Label(thread_frame, text="运行中", fg="green").grid(row=3, column=1, sticky="w", padx=5)

# 定时任务列表显示
task_frame = tk.Frame(root, relief=tk.SUNKEN, borderwidth=1)
task_frame.pack(pady=15, padx=20, fill=tk.X)

tk.Label(task_frame, text="等待执行的定时任务:", font=("Arial", 9)).pack(pady=5)

task_listbox = tk.Listbox(task_frame, height=3)
task_listbox.pack(fill=tk.X, padx=5, pady=5)

# ========== 启动线程和主循环 ==========
# 启动串口工作线程
threading.Thread(target=serial_worker, daemon=True).start()

# 启动定时器线程
threading.Thread(target=timer_worker, daemon=True).start()

# 启动UI更新
update_light_status()
update_task_list()
refresh_ports()  # 初始刷新串口列表

# 绑定关闭事件
root.protocol("WM_DELETE_WINDOW", on_closing)

# 启动主循环
root.mainloop()