背景
前回はシェルスクリプトを利用してラズパイからCAN定期送信を試みた。今回はpythonプログラムを利用してGUI経由で制御を試みる。前回の記事はこちら。
tkinterをインストール
pythonプログラムを用いてGUI制御する際、tkinterが必要らしい。以下コマンドで状況を確認。
python3 -m tkinter
以下のようなポップアップが表示される場合、インストール済みであり対処不要らしい。
pythonプログラムを実装
今回、GUIで制御したいと考え、以下のようなpythonプログラムを実装。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
def __init__(self, master):
self.master = master
self.master.title("CAN Transmitter")
self.interface_label = ttk.Label(master, text="Interface:")
self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
self.interface_entry = ttk.Entry(master)
self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
self.interface_entry.insert(tk.END, "can0")
self.can_id_label = ttk.Label(master, text="CAN ID:")
self.can_id_label.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)
self.can_id_entry = ttk.Entry(master)
self.can_id_entry.grid(row=1, column=1, padx=10, pady=10)
self.can_id_entry.insert(tk.END, "123")
self.data_label = ttk.Label(master, text="CAN Data (comma-separated hex values):")
self.data_label.grid(row=2, column=0, padx=10, pady=10, sticky=tk.W)
self.data_entry = ttk.Entry(master)
self.data_entry.grid(row=2, column=1, padx=10, pady=10)
self.data_entry.insert(tk.END, "11,22,33,44,55,66,77,88")
self.period_label = ttk.Label(master, text="Period (seconds):")
self.period_label.grid(row=3, column=0, padx=10, pady=10, sticky=tk.W)
self.period_entry = ttk.Entry(master)
self.period_entry.grid(row=3, column=1, padx=10, pady=10)
self.period_entry.insert(tk.END, "1.0")
self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
self.start_button.grid(row=4, column=0, padx=10, pady=10)
self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
self.stop_button.grid(row=4, column=1, padx=10, pady=10)
self.status_label = ttk.Label(master, text="")
self.status_label.grid(row=5, columnspan=2, padx=10, pady=10)
self.is_sending = False
self.bus = None
self.send_thread = None
def start_sending(self):
if self.is_sending:
self.status_label.config(text="Already sending.")
return
interface = self.interface_entry.get().strip()
can_id = int(self.can_id_entry.get().strip(), 16)
data_str = self.data_entry.get().strip()
period = float(self.period_entry.get().strip())
try:
data = [int(byte, 16) for byte in data_str.split(',')]
if len(data) != 8:
raise ValueError("Data length must be 8 bytes.")
except ValueError:
self.status_label.config(text="Invalid CAN data format.")
return
try:
self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
except Exception as e:
self.status_label.config(text=f"Error: {str(e)}")
return
self.is_sending = True
self.send_thread = threading.Thread(target=self.send_messages, args=(can_id, data, period))
self.send_thread.start()
self.status_label.config(text=f"Sending CAN messages...")
def send_messages(self, can_id, data, period):
message = can.Message(arbitration_id=can_id, data=data)
while self.is_sending:
try:
self.bus.send(message)
print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
except can.CanError as e:
print(f"Error: Failed to send CAN message - {str(e)}")
self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
time.sleep(period)
self.bus.shutdown()
self.status_label.config(text="Stopped sending.")
def stop_sending(self):
if not self.is_sending:
self.status_label.config(text="Not currently sending.")
return
self.is_sending = False
if self.send_thread:
self.send_thread.join()
if self.bus:
self.bus.shutdown()
self.status_label.config(text="Stopped sending.")
def main():
root = tk.Tk()
app = CANTransmitterApp(root)
root.mainloop()
if __name__ == "__main__":
main()
実装後、実行!
$ sudo python cansend5.py
以下は送信側の様子。送信できている!
以下は受信側の様子。受信できている!
停止時にフリーズする問題
Stop Sending(送信停止)をした時、フリーズして停止できなくなる問題がある。これにより、送信周期を変更できない。
問題の原因は、send_messages メソッド内の time.sleep(period) 。この呼び出しはGUIのメインスレッドで行われるため、stop_sending メソッドで send_thread.join() が呼ばれると、メインスレッドが send_messages の無限ループから抜け出せなくなり、GUIがフリーズする。解決策としては、time.sleep(period) の代わりに、タイマーを使用して定期的にCANメッセージを送信する方法がある。対処方としては以下。
- send_messages メソッド内の while ループを、タイマーを使った方法に変更
- threading.Timer を使って定期的に send_messages を呼び出す
- stop_sending メソッドで send_thread.cancel() を呼び出してタイマーを停止する
フリーズ問題対処後のコード
フリーズ問題に対処後のコードは以下。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
def __init__(self, master):
self.master = master
self.master.title("CAN Transmitter")
self.interface_label = ttk.Label(master, text="Interface:")
self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
self.interface_entry = ttk.Entry(master)
self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
self.interface_entry.insert(tk.END, "can0")
self.can_id_label = ttk.Label(master, text="CAN ID:")
self.can_id_label.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)
self.can_id_entry = ttk.Entry(master)
self.can_id_entry.grid(row=1, column=1, padx=10, pady=10)
self.can_id_entry.insert(tk.END, "123")
self.data_label = ttk.Label(master, text="CAN Data (comma-separated hex values):")
self.data_label.grid(row=2, column=0, padx=10, pady=10, sticky=tk.W)
self.data_entry = ttk.Entry(master)
self.data_entry.grid(row=2, column=1, padx=10, pady=10)
self.data_entry.insert(tk.END, "11,22,33,44,55,66,77,88")
self.period_label = ttk.Label(master, text="Period (seconds):")
self.period_label.grid(row=3, column=0, padx=10, pady=10, sticky=tk.W)
self.period_entry = ttk.Entry(master)
self.period_entry.grid(row=3, column=1, padx=10, pady=10)
self.period_entry.insert(tk.END, "1.0")
self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
self.start_button.grid(row=4, column=0, padx=10, pady=10)
self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
self.stop_button.grid(row=4, column=1, padx=10, pady=10)
self.status_label = ttk.Label(master, text="")
self.status_label.grid(row=5, columnspan=2, padx=10, pady=10)
self.is_sending = False
self.bus = None
self.send_timer = None
def start_sending(self):
if self.is_sending:
self.status_label.config(text="Already sending.")
return
interface = self.interface_entry.get().strip()
can_id = int(self.can_id_entry.get().strip(), 16)
data_str = self.data_entry.get().strip()
period = float(self.period_entry.get().strip())
try:
data = [int(byte, 16) for byte in data_str.split(',')]
if len(data) != 8:
raise ValueError("Data length must be 8 bytes.")
except ValueError:
self.status_label.config(text="Invalid CAN data format.")
return
try:
self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
except Exception as e:
self.status_label.config(text=f"Error: {str(e)}")
return
self.is_sending = True
self.send_messages(can_id, data, period)
def send_messages(self, can_id, data, period):
message = can.Message(arbitration_id=can_id, data=data)
if self.is_sending:
try:
self.bus.send(message)
print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
except can.CanError as e:
print(f"Error: Failed to send CAN message - {str(e)}")
self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
self.send_timer = threading.Timer(period, self.send_messages, args=(can_id, data, period))
self.send_timer.start()
def stop_sending(self):
if not self.is_sending:
self.status_label.config(text="Not currently sending.")
return
self.is_sending = False
if self.send_timer:
self.send_timer.cancel()
if self.bus:
self.bus.shutdown()
self.status_label.config(text="Stopped sending.")
def main():
root = tk.Tk()
app = CANTransmitterApp(root)
root.mainloop()
if __name__ == "__main__":
main()
この修正により、定期的にCANメッセージを送信する際に time.sleep を使わずに、タイマーを用いて非同期に送信処理を行う。これにより、GUIがフリーズする問題が解決された。pythonプログラムにより、ラズパイ上でGUIを利用して任意のCAN定期送信の周期を変更することができるようになった。
定期送信の数を増やす
CAN送信が1つだけだと面白くないので3つに増やしてみた。
変更点は以下。
- 各CAN ID、CAN Data、Periodをリストで管理し、複数のCANフレームを処理する
- 各フレームの送信処理を個別にタイマーで管理
変更後のコードは以下。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
def __init__(self, master):
self.master = master
self.master.title("CAN Transmitter")
# Interface Input
self.interface_label = ttk.Label(master, text="Interface:")
self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
self.interface_entry = ttk.Entry(master)
self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
self.interface_entry.insert(tk.END, "can0")
# CAN ID Inputs
self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
self.can_id_entries = [ttk.Entry(master) for i in range(3)]
for i in range(3):
self.can_id_labels[i].grid(row=i+1, column=0, padx=10, pady=10, sticky=tk.W)
self.can_id_entries[i].grid(row=i+1, column=1, padx=10, pady=10)
self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")
# CAN Data Inputs
self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
self.data_entries = [ttk.Entry(master) for i in range(3)]
for i in range(3):
self.data_labels[i].grid(row=4+i, column=0, padx=10, pady=10, sticky=tk.W)
self.data_entries[i].grid(row=4+i, column=1, padx=10, pady=10)
self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")
# Period Inputs
self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
self.period_entries = [ttk.Entry(master) for i in range(3)]
for i in range(3):
self.period_labels[i].grid(row=7+i, column=0, padx=10, pady=10, sticky=tk.W)
self.period_entries[i].grid(row=7+i, column=1, padx=10, pady=10)
self.period_entries[i].insert(tk.END, "1.0")
# Control Buttons
self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
self.start_button.grid(row=10, column=0, padx=10, pady=10)
self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
self.stop_button.grid(row=10, column=1, padx=10, pady=10)
# Status Label
self.status_label = ttk.Label(master, text="")
self.status_label.grid(row=11, columnspan=2, padx=10, pady=10)
self.is_sending = False
self.bus = None
self.send_timers = [None, None, None]
def start_sending(self):
if self.is_sending:
self.status_label.config(text="Already sending.")
return
interface = self.interface_entry.get().strip()
can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
data_strs = [entry.get().strip() for entry in self.data_entries]
periods = [float(entry.get().strip()) for entry in self.period_entries]
try:
data_list = []
for data_str in data_strs:
data = [int(byte, 16) for byte in data_str.split(',')]
if len(data) != 8:
raise ValueError("Data length must be 8 bytes.")
data_list.append(data)
except ValueError:
self.status_label.config(text="Invalid CAN data format.")
return
try:
self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
except Exception as e:
self.status_label.config(text=f"Error: {str(e)}")
return
self.is_sending = True
for i in range(3):
self.send_messages(can_ids[i], data_list[i], periods[i], i)
def send_messages(self, can_id, data, period, index):
message = can.Message(arbitration_id=can_id, data=data)
if self.is_sending:
try:
self.bus.send(message)
print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
except can.CanError as e:
print(f"Error: Failed to send CAN message - {str(e)}")
self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
self.send_timers[index] = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
self.send_timers[index].start()
def stop_sending(self):
if not self.is_sending:
self.status_label.config(text="Not currently sending.")
return
self.is_sending = False
for timer in self.send_timers:
if timer:
timer.cancel()
if self.bus:
self.bus.shutdown()
self.status_label.config(text="Stopped sending.")
def main():
root = tk.Tk()
app = CANTransmitterApp(root)
root.mainloop()
if __name__ == "__main__":
main()
見た目を改良
ちょっとGUIの見た目が悪いので改編。
修正後のコードは以下。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
def __init__(self, master):
self.master = master
self.master.title("CAN Transmitter")
# Interface Input
self.interface_label = ttk.Label(master, text="Interface:")
self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
self.interface_entry = ttk.Entry(master)
self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
self.interface_entry.insert(tk.END, "can0")
# CAN ID, CAN Data, and Period Inputs
self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
self.can_id_entries = [ttk.Entry(master) for i in range(3)]
self.data_entries = [ttk.Entry(master) for i in range(3)]
self.period_entries = [ttk.Entry(master) for i in range(3)]
for i in range(3):
# CAN ID
self.can_id_labels[i].grid(row=1+i, column=0, padx=10, pady=10, sticky=tk.W)
self.can_id_entries[i].grid(row=1+i, column=1, padx=10, pady=10)
self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")
# CAN Data
self.data_labels[i].grid(row=1+i, column=2, padx=10, pady=10, sticky=tk.W)
self.data_entries[i].grid(row=1+i, column=3, padx=10, pady=10)
self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")
# Period
self.period_labels[i].grid(row=1+i, column=4, padx=10, pady=10, sticky=tk.W)
self.period_entries[i].grid(row=1+i, column=5, padx=10, pady=10)
self.period_entries[i].insert(tk.END, "1.0")
# Control Buttons
self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
self.start_button.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
self.stop_button.grid(row=4, column=3, columnspan=3, padx=10, pady=10)
# Status Label
self.status_label = ttk.Label(master, text="")
self.status_label.grid(row=5, columnspan=6, padx=10, pady=10)
self.is_sending = False
self.bus = None
self.send_timers = [None, None, None]
def start_sending(self):
if self.is_sending:
self.status_label.config(text="Already sending.")
return
interface = self.interface_entry.get().strip()
can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
data_strs = [entry.get().strip() for entry in self.data_entries]
periods = [float(entry.get().strip()) for entry in self.period_entries]
try:
data_list = []
for data_str in data_strs:
data = [int(byte, 16) for byte in data_str.split(',')]
if len(data) != 8:
raise ValueError("Data length must be 8 bytes.")
data_list.append(data)
except ValueError:
self.status_label.config(text="Invalid CAN data format.")
return
try:
self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
except Exception as e:
self.status_label.config(text=f"Error: {str(e)}")
return
self.is_sending = True
for i in range(3):
self.send_messages(can_ids[i], data_list[i], periods[i], i)
def send_messages(self, can_id, data, period, index):
message = can.Message(arbitration_id=can_id, data=data)
if self.is_sending:
try:
self.bus.send(message)
print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
except can.CanError as e:
print(f"Error: Failed to send CAN message - {str(e)}")
self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
self.send_timers[index] = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
self.send_timers[index].start()
def stop_sending(self):
if not self.is_sending:
self.status_label.config(text="Not currently sending.")
return
self.is_sending = False
for timer in self.send_timers:
if timer:
timer.cancel()
if self.bus:
self.bus.shutdown()
self.status_label.config(text="Stopped sending.")
def main():
root = tk.Tk()
app = CANTransmitterApp(root)
root.mainloop()
if __name__ == "__main__":
main()
CAN送受信ログをGUIで出力
CAN送受信ログをGUIで表示してみる。
変更点は以下3点。
- tk.Textウィジェットを追加して、CANメッセージの送受信ログを表示
- receive_messages メソッドを追加し、受信したメッセージをログとして表示
- log_message メソッドを使ってログメッセージを指定されたフォーマットで表示
改編後のコードは以下。
def start_sending(self):
if self.is_sending:
self.status_label.config(text="Already sending.")
return
interface = self.interface_entry.get().strip()
can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
data_strs = [entry.get().strip() for entry in self.data_entries]
periods = [float(entry.get().strip()) for entry in self.period_entries]
try:
data_list = []
for data_str in data_strs:
data = [int(byte, 16) for byte in data_str.split(',')]
if len(data) != 8:
raise ValueError("Data length must be 8 bytes.")
data_list.append(data)
except ValueError:
self.status_label.config(text="Invalid CAN data format.")
return
try:
self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
self.log_message("info", f"CAN interface {interface} connected.")
except Exception as e:
self.status_label.config(text=f"Error: {str(e)}")
return
self.is_sending = True
self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
self.receive_thread.start()
for i in range(3):
self.send_messages(can_ids[i], data_list[i], periods[i], i)
def send_messages(self, can_id, data, period, index):
message = can.Message(arbitration_id=can_id, data=data)
if self.is_sending:
try:
self.bus.send(message)
self.log_message("send", message)
except can.CanError as e:
self.log_message("error", f"Failed to send CAN message - {str(e)}")
self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
self.send_timers[index] = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
self.send_timers[index].start()
def receive_messages(self):
while self.is_sending:
try:
message = self.bus.recv()
if message:
self.log_message("recv", message)
except Exception as e:
self.log_message("error", f"Failed to receive CAN message - {str(e)}")
def stop_sending(self):
if not self.is_sending:
self.status_label.config(text="Not currently sending.")
return
self.is_sending = False
for timer in self.send_timers:
if timer:
timer.cancel()
if self.bus:
self.bus.shutdown()
self.status_label.config(text="Stopped sending.")
def log_message(self, msg_type, message):
if msg_type == "send":
log_entry = f"Send : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
elif msg_type == "recv":
log_entry = f"Recv : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
elif msg_type == "error":
log_entry = f"Error : {message}\n"
else:
log_entry = f"{message}\n"
self.log_text.insert(tk.END, log_entry)
self.log_text.see(tk.END)
def main():
root = tk.Tk()
app = CANTransmitterApp(root)
root.mainloop()
if __name__ == "__main__":
main()
[…] […]