语言: python
| 编码: UTF-8
总行数: 381
import tkinter as tk
import threading
import queue
import time
import serial
import serial.tools.list_ports
from datetime import datetime, timedelta
# ========== 全局变量 ==========
# 线程控制标志
running_threads = True
# 任务队列
task_queue = queue.Queue()
# 定时任务列表
timer_tasks = []
# 灯光状态
light_status = "关"
# 串口对象
serial_port = None
# 串口连接状态
serial_connected = False
# ========== 串口工作线程函数 ==========
def serial_worker():
"""工作线程:处理真实的串口通信"""
global serial_port, serial_connected, light_status
print("[串口线程] 启动")
while running_threads:
try:
# 从队列获取任务,最多等待0.5秒
data = task_queue.get(timeout=0.5)
if data == "exit":
print("[串口线程] 收到退出信号")
break
# 只有串口已连接时才发送数据
if serial_connected and serial_port and serial_port.is_open:
print(f"[串口线程] 发送数据: {data}")
# 发送数据到串口
serial_port.write(data.encode('utf-8'))
# 更新灯光状态
if data == "1":
light_status = "开"
update_status(f"已发送开灯指令: {data}")
elif data == "2":
light_status = "关"
update_status(f"已发送关灯指令: {data}")
else:
update_status(f"已发送指令: {data}")
print(f"[串口线程] 发送完成,当前状态: {light_status}")
else:
print(f"[串口线程] 串口未连接,无法发送: {data}")
update_status("串口未连接")
task_queue.task_done()
except queue.Empty:
# 队列为空,继续循环
continue
except Exception as e:
print(f"[串口线程] 错误: {e}")
update_status(f"串口错误: {str(e)}")
print("[串口线程] 结束")
# ========== 定时器线程函数 ==========
def timer_worker():
"""定时器线程:处理定时任务"""
print("[定时器线程] 启动")
while running_threads:
try:
current_time = datetime.now()
# 检查是否有定时任务需要执行
for task in timer_tasks[:]: # 复制列表进行遍历
scheduled_time, action, task_name = task
if current_time >= scheduled_time:
print(f"[定时器] 执行任务: {task_name}")
if action == "on":
task_queue.put("1")
update_status(f"定时开灯 ({task_name})")
elif action == "off":
task_queue.put("2")
update_status(f"定时关灯 ({task_name})")
# 移除已执行的任务
timer_tasks.remove(task)
time.sleep(0.1) # 每0.1秒检查一次
except Exception as e:
print(f"[定时器] 错误: {e}")
print("[定时器线程] 结束")
# ========== UI更新函数 ==========
def update_status(message):
"""在主线程中更新状态显示"""
root.after(0, lambda: status_label.config(text=message))
def update_light_status():
"""更新灯光状态显示"""
if light_status == "开":
light_canvas.config(bg="yellow")
else:
light_canvas.config(bg="gray")
# 每秒更新一次
root.after(1000, update_light_status)
def add_timer_task(delay_seconds, action, task_name):
"""添加定时任务"""
scheduled_time = datetime.now() + timedelta(seconds=delay_seconds)
timer_tasks.append((scheduled_time, action, task_name))
time_str = scheduled_time.strftime("%H:%M:%S")
update_status(f"定时任务已添加: {task_name} ({time_str})")
def update_task_list():
"""更新定时任务列表显示"""
task_listbox.delete(0, tk.END)
for scheduled_time, action, task_name in timer_tasks:
time_str = scheduled_time.strftime("%H:%M:%S")
action_text = "开灯" if action == "on" else "关灯"
task_listbox.insert(tk.END, f"{time_str} - {action_text} ({task_name})")
# 每秒更新一次
root.after(1000, update_task_list)
# ========== 串口连接函数 ==========
def connect_serial():
"""连接串口"""
global serial_port, serial_connected
try:
# 获取选择的串口
selected_port = port_var.get()
if not selected_port:
update_status("请选择串口")
return
# 如果已有串口连接,先关闭
if serial_port and serial_port.is_open:
serial_port.close()
# 打开新串口
serial_port = serial.Serial(
port=selected_port,
baudrate=9600,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1
)
if serial_port.is_open:
serial_connected = True
update_status(f"串口已连接: {selected_port}")
connect_btn.config(text="已连接", state="disabled", bg="green")
port_menu.config(state="disabled")
baud_menu.config(state="disabled")
print(f"[主线程] 串口已连接: {selected_port}")
else:
update_status("串口打开失败")
except Exception as e:
update_status(f"连接失败: {str(e)}")
print(f"[主线程] 串口连接失败: {e}")
def refresh_ports():
"""刷新串口列表"""
ports = serial.tools.list_ports.comports()
port_list = [port.device for port in ports]
# 更新下拉菜单
port_menu['menu'].delete(0, 'end')
for port in port_list:
port_menu['menu'].add_command(label=port, command=tk._setit(port_var, port))
if port_list:
port_var.set(port_list[0])
update_status(f"发现 {len(port_list)} 个串口")
else:
update_status("未发现串口")
# ========== 按钮事件处理函数 ==========
def turn_on():
"""开灯按钮点击事件"""
if serial_connected:
task_queue.put("1")
update_status("正在发送开灯指令...")
else:
update_status("请先连接串口")
def turn_off():
"""关灯按钮点击事件"""
if serial_connected:
task_queue.put("2")
update_status("正在发送关灯指令...")
else:
update_status("请先连接串口")
def schedule_on():
"""定时开灯按钮点击事件"""
try:
seconds = int(delay_entry.get())
if seconds <= 0:
update_status("请输入正数秒数")
return
task_name = f"{seconds}秒后开灯"
add_timer_task(seconds, "on", task_name)
except ValueError:
update_status("请输入有效的秒数")
def schedule_off():
"""定时关灯按钮点击事件"""
try:
seconds = int(delay_entry.get())
if seconds <= 0:
update_status("请输入正数秒数")
return
task_name = f"{seconds}秒后关灯"
add_timer_task(seconds, "off", task_name)
except ValueError:
update_status("请输入有效的秒数")
def on_closing():
"""窗口关闭事件"""
global running_threads
running_threads = False
# 发送退出信号给工作线程
task_queue.put("exit")
# 关闭串口
if serial_port and serial_port.is_open:
serial_port.close()
print("[主线程] 串口已关闭")
# 等待线程结束
time.sleep(0.5)
root.destroy()
print("[主线程] 程序结束")
# ========== 创建界面 ==========
root = tk.Tk()
root.title("多线程定时灯光控制 - 真实串口版")
root.geometry("450x550")
# 标题
title_label = tk.Label(root, text="多线程定时灯光控制", font=("Arial", 16, "bold"))
title_label.pack(pady=10)
# 串口设置区域
serial_frame = tk.Frame(root, relief=tk.GROOVE, borderwidth=2)
serial_frame.pack(pady=10, padx=20, fill=tk.X)
tk.Label(serial_frame, text="串口设置", font=("Arial", 12, "bold")).pack(pady=5)
# 串口选择
port_frame = tk.Frame(serial_frame)
port_frame.pack(pady=5)
tk.Label(port_frame, text="串口:").pack(side=tk.LEFT, padx=5)
port_var = tk.StringVar()
port_menu = tk.OptionMenu(port_frame, port_var, "")
port_menu.pack(side=tk.LEFT, padx=5)
refresh_btn = tk.Button(port_frame, text="刷新", command=refresh_ports)
refresh_btn.pack(side=tk.LEFT, padx=5)
# 波特率选择
baud_frame = tk.Frame(serial_frame)
baud_frame.pack(pady=5)
tk.Label(baud_frame, text="波特率:").pack(side=tk.LEFT, padx=5)
baud_var = tk.StringVar(value="9600")
baud_menu = tk.OptionMenu(baud_frame, baud_var, "9600", "115200", "19200", "38400", "57600")
baud_menu.pack(side=tk.LEFT, padx=5)
# 连接按钮
connect_btn = tk.Button(serial_frame, text="连接串口", command=connect_serial, bg="lightblue")
connect_btn.pack(pady=5)
# 灯光状态显示
light_frame = tk.Frame(root)
light_frame.pack(pady=10)
light_label = tk.Label(light_frame, text="灯光状态:", font=("Arial", 12))
light_label.pack(side=tk.LEFT, padx=5)
light_canvas = tk.Canvas(light_frame, width=30, height=30, bg="gray")
light_canvas.pack(side=tk.LEFT)
# 状态显示
status_label = tk.Label(root, text="请先连接串口", fg="blue", font=("Arial", 10))
status_label.pack(pady=5)
# 定时设置区域
timer_frame = tk.Frame(root)
timer_frame.pack(pady=15)
tk.Label(timer_frame, text="定时秒数:").grid(row=0, column=0, padx=5, pady=5)
delay_entry = tk.Entry(timer_frame, width=10)
delay_entry.insert(0, "5")
delay_entry.grid(row=0, column=1, padx=5, pady=5)
tk.Button(timer_frame, text="定时开灯", command=schedule_on, bg="orange").grid(row=0, column=2, padx=5, pady=5)
tk.Button(timer_frame, text="定时关灯", command=schedule_off, bg="lightblue").grid(row=0, column=3, padx=5, pady=5)
# 控制按钮区域
control_frame = tk.Frame(root)
control_frame.pack(pady=20)
tk.Button(control_frame, text="开灯", command=turn_on, bg="green", fg="white",
width=10, height=2).pack(side=tk.LEFT, padx=10)
tk.Button(control_frame, text="关灯", command=turn_off, bg="red", fg="white",
width=10, height=2).pack(side=tk.LEFT, padx=10)
# 线程状态显示
thread_frame = tk.Frame(root, relief=tk.RAISED, borderwidth=2)
thread_frame.pack(pady=15, padx=20, fill=tk.X)
tk.Label(thread_frame, text="线程状态:", font=("Arial", 10, "bold")).grid(row=0, column=0, columnspan=2)
tk.Label(thread_frame, text="主线程:").grid(row=1, column=0, sticky="e")
tk.Label(thread_frame, text="运行中", fg="green").grid(row=1, column=1, sticky="w", padx=5)
tk.Label(thread_frame, text="串口线程:").grid(row=2, column=0, sticky="e")
tk.Label(thread_frame, text="运行中", fg="green").grid(row=2, column=1, sticky="w", padx=5)
tk.Label(thread_frame, text="定时线程:").grid(row=3, column=0, sticky="e")
tk.Label(thread_frame, text="运行中", fg="green").grid(row=3, column=1, sticky="w", padx=5)
# 定时任务列表显示
task_frame = tk.Frame(root, relief=tk.SUNKEN, borderwidth=1)
task_frame.pack(pady=15, padx=20, fill=tk.X)
tk.Label(task_frame, text="等待执行的定时任务:", font=("Arial", 9)).pack(pady=5)
task_listbox = tk.Listbox(task_frame, height=3)
task_listbox.pack(fill=tk.X, padx=5, pady=5)
# ========== 启动线程和主循环 ==========
# 启动串口工作线程
threading.Thread(target=serial_worker, daemon=True).start()
# 启动定时器线程
threading.Thread(target=timer_worker, daemon=True).start()
# 启动UI更新
update_light_status()
update_task_list()
refresh_ports() # 初始刷新串口列表
# 绑定关闭事件
root.protocol("WM_DELETE_WINDOW", on_closing)
# 启动主循环
root.mainloop()