背景
シェルスクリプトを利用してラズパイ4からPCへCANコマンドを定期的に送信することができました。
スクリプトだとやはり不便なため、GUIが欲しいところです。
ラズパイには標準でpythonが組める環境が整っていますので、CANコマンドを定期的に送信するツールを作ってみます。
前回の記事はこちらです。
tkinterをインストール
GUI化する際、tkinterなるものが必要なようです。
以下コマンドをターミナルで打ち込むと、インストール状況が確認できます。
python3 -m tkinter
以下のようなポップアップが表示される場合、インストール済みであり対処不要らしいです。
早速ツールを実装
GUIで以下を制御するプログラムです。
- CANID
- 送信データ
- 送信周期
- 送信の開始/停止
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
    """Tkinter form that sends one CAN frame periodically from a worker thread.

    The window collects the interface name, a hex CAN ID, eight hex data
    bytes and a period in seconds, and offers Start/Stop buttons plus a
    status line.
    """

    def __init__(self, master):
        """Create and lay out the widgets on *master* and reset the send state.

        Args:
            master: Tk window that hosts the widgets (its title is set here).
        """
        self.master = master
        self.master.title("CAN Transmitter")

        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        self.can_id_label = ttk.Label(master, text="CAN ID:")
        self.can_id_label.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)
        self.can_id_entry = ttk.Entry(master)
        self.can_id_entry.grid(row=1, column=1, padx=10, pady=10)
        self.can_id_entry.insert(tk.END, "123")

        self.data_label = ttk.Label(master, text="CAN Data (comma-separated hex values):")
        self.data_label.grid(row=2, column=0, padx=10, pady=10, sticky=tk.W)
        self.data_entry = ttk.Entry(master)
        self.data_entry.grid(row=2, column=1, padx=10, pady=10)
        self.data_entry.insert(tk.END, "11,22,33,44,55,66,77,88")

        self.period_label = ttk.Label(master, text="Period (seconds):")
        self.period_label.grid(row=3, column=0, padx=10, pady=10, sticky=tk.W)
        self.period_entry = ttk.Entry(master)
        self.period_entry.grid(row=3, column=1, padx=10, pady=10)
        self.period_entry.insert(tk.END, "1.0")

        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=1, padx=10, pady=10)

        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=2, padx=10, pady=10)

        self.is_sending = False   # loop flag polled by the worker thread
        self.bus = None           # python-can bus, opened on Start
        self.send_thread = None   # worker running send_messages()

    @staticmethod
    def _parse_frame(can_id_text, data_text, period_text):
        """Convert the three text fields of one frame into typed values.

        Args:
            can_id_text: CAN ID as a hex string.
            data_text: Eight comma-separated hex bytes.
            period_text: Send period in seconds.

        Returns:
            Tuple ``(can_id, data, period)`` with an int, a list of eight
            ints in 0..255 and a positive finite float.

        Raises:
            ValueError: A field is malformed or out of range; the message is
                suitable for display in the status line.
        """
        try:
            can_id = int(can_id_text.strip(), 16)
        except ValueError:
            raise ValueError("Invalid CAN ID.") from None
        # 0x1FFFFFFF is the largest 29-bit (extended) CAN identifier.
        if not 0 <= can_id <= 0x1FFFFFFF:
            raise ValueError("Invalid CAN ID.")

        try:
            data = [int(byte, 16) for byte in data_text.strip().split(",")]
        except ValueError:
            raise ValueError("Invalid CAN data format.") from None
        if len(data) != 8 or any(not 0 <= byte <= 0xFF for byte in data):
            raise ValueError("Invalid CAN data format.")

        try:
            period = float(period_text.strip())
        except ValueError:
            raise ValueError("Invalid period.") from None
        # The chained comparison is False for NaN, so NaN is rejected as well.
        if not 0 < period < float("inf"):
            raise ValueError("Invalid period.")

        return can_id, data, period

    def start_sending(self):
        """Validate the form, open the CAN bus and start the worker thread.

        Invalid input or a bus-open failure is reported in the status line
        and leaves the application idle.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        try:
            can_id, data, period = self._parse_frame(
                self.can_id_entry.get(),
                self.data_entry.get(),
                self.period_entry.get(),
            )
        except ValueError as err:
            self.status_label.config(text=str(err))
            return

        try:
            # NOTE(review): recent python-can releases document `interface=`
            # as the replacement for `bustype=` - confirm against the
            # installed version.
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        # Daemon thread: closing the window must not leave the process alive
        # sending frames forever.
        self.send_thread = threading.Thread(
            target=self.send_messages,
            args=(can_id, data, period),
            daemon=True,
        )
        self.send_thread.start()
        self.status_label.config(text="Sending CAN messages...")

    def send_messages(self, can_id, data, period):
        """Worker loop: send the frame every *period* seconds until stopped.

        Args:
            can_id: Arbitration ID of the frame.
            data: Payload bytes.
            period: Delay between transmissions in seconds.
        """
        # NOTE(review): can.Message presumably defaults to an extended
        # (29-bit) ID unless is_extended_id=False is passed - confirm that is
        # intended for IDs such as 0x123.
        message = can.Message(arbitration_id=can_id, data=data)
        while self.is_sending:
            try:
                self.bus.send(message)
                print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
            except can.CanError as e:
                print(f"Error: Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
            time.sleep(period)
        self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

    def stop_sending(self):
        """Clear the send flag, wait for the worker and close the bus."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        if self.send_thread:
            # NOTE(review): join() runs on the Tk main thread, so the window
            # is unresponsive until the worker leaves time.sleep(). The
            # worker's final status_label.config() is issued from the worker
            # thread and appears to need the (blocked) main thread - this is
            # the likely cause of a freeze when Stop is pressed.
            self.send_thread.join()
        if self.bus:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")
def main():
    """Create the Tk root window, attach the transmitter UI and run the event loop."""
    window = tk.Tk()
    ui = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()
ターミナルでプログラムを実行してみます。
$ sudo python cansend5.py
ラズパイ側の様子です。送信できていそうです。
以下は受信側(PC)の様子です。受信できていますね。
停止時にフリーズする問題
Stop Sending(送信停止)を押下すると、フリーズして停止できなくなりました。
これにより、送信周期を変更できないという問題が。。。
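問題の原因は、stop_sending メソッド内の send_thread.join() と、send_messages メソッド内の time.sleep(period) の組み合わせでした。
send_thread.join() はGUIのメインスレッドで実行されるため、送信スレッドが time.sleep(period) から復帰して while ループを抜けるまで、メインスレッド（GUI）は待たされます。さらに、送信スレッドは終了直前に status_label.config() でウィジェットを更新しようとしますが、この処理にはメインスレッド側のイベントループが必要なため、join() で待機中のメインスレッドと互いに待ち合う形になり、GUIがフリーズすると考えられます。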
解決策としては、time.sleep(period) の代わりに、タイマーを使用して定期的にCANメッセージを送信する方法を取りました。まとめますと
- send_messages メソッド内の while ループを、タイマーを使った方法に変更
- threading.Timer を使って定期的に send_messages を呼び出す
- stop_sending メソッドで send_timer.cancel() を呼び出してタイマーを停止する
フリーズ問題対処後のコード
フリーズ問題に対処した後のコードは以下です。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
    """Tkinter form that sends one CAN frame periodically using timers.

    Each transmission re-arms a ``threading.Timer`` for the next one, so the
    GUI thread is never blocked while waiting for the period to elapse.
    """

    def __init__(self, master):
        """Create and lay out the widgets on *master* and reset the send state.

        Args:
            master: Tk window that hosts the widgets (its title is set here).
        """
        self.master = master
        self.master.title("CAN Transmitter")
        # Stop the timer chain and close the bus when the window is closed.
        self.master.protocol("WM_DELETE_WINDOW", self._on_close)

        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        self.can_id_label = ttk.Label(master, text="CAN ID:")
        self.can_id_label.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)
        self.can_id_entry = ttk.Entry(master)
        self.can_id_entry.grid(row=1, column=1, padx=10, pady=10)
        self.can_id_entry.insert(tk.END, "123")

        self.data_label = ttk.Label(master, text="CAN Data (comma-separated hex values):")
        self.data_label.grid(row=2, column=0, padx=10, pady=10, sticky=tk.W)
        self.data_entry = ttk.Entry(master)
        self.data_entry.grid(row=2, column=1, padx=10, pady=10)
        self.data_entry.insert(tk.END, "11,22,33,44,55,66,77,88")

        self.period_label = ttk.Label(master, text="Period (seconds):")
        self.period_label.grid(row=3, column=0, padx=10, pady=10, sticky=tk.W)
        self.period_entry = ttk.Entry(master)
        self.period_entry.grid(row=3, column=1, padx=10, pady=10)
        self.period_entry.insert(tk.END, "1.0")

        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=1, padx=10, pady=10)

        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=2, padx=10, pady=10)

        self.is_sending = False   # checked by every timer callback
        self.bus = None           # python-can bus, opened on Start
        self.send_timer = None    # most recently armed timer

    @staticmethod
    def _parse_frame(can_id_text, data_text, period_text):
        """Convert the three text fields of one frame into typed values.

        Args:
            can_id_text: CAN ID as a hex string.
            data_text: Eight comma-separated hex bytes.
            period_text: Send period in seconds.

        Returns:
            Tuple ``(can_id, data, period)`` with an int, a list of eight
            ints in 0..255 and a positive finite float.

        Raises:
            ValueError: A field is malformed or out of range; the message is
                suitable for display in the status line.
        """
        try:
            can_id = int(can_id_text.strip(), 16)
        except ValueError:
            raise ValueError("Invalid CAN ID.") from None
        # 0x1FFFFFFF is the largest 29-bit (extended) CAN identifier.
        if not 0 <= can_id <= 0x1FFFFFFF:
            raise ValueError("Invalid CAN ID.")

        try:
            data = [int(byte, 16) for byte in data_text.strip().split(",")]
        except ValueError:
            raise ValueError("Invalid CAN data format.") from None
        if len(data) != 8 or any(not 0 <= byte <= 0xFF for byte in data):
            raise ValueError("Invalid CAN data format.")

        try:
            period = float(period_text.strip())
        except ValueError:
            raise ValueError("Invalid period.") from None
        # The chained comparison is False for NaN, so NaN is rejected as well.
        # A zero or negative period would make the timers fire back to back.
        if not 0 < period < float("inf"):
            raise ValueError("Invalid period.")

        return can_id, data, period

    def start_sending(self):
        """Validate the form, open the CAN bus and send the first frame.

        Invalid input or a bus-open failure is reported in the status line
        and leaves the application idle.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        try:
            can_id, data, period = self._parse_frame(
                self.can_id_entry.get(),
                self.data_entry.get(),
                self.period_entry.get(),
            )
        except ValueError as err:
            self.status_label.config(text=str(err))
            return

        try:
            # NOTE(review): recent python-can releases document `interface=`
            # as the replacement for `bustype=` - confirm against the
            # installed version.
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        # Set before the first send so that a send error reported by
        # send_messages() is not overwritten, and no stale text remains.
        self.status_label.config(text="Sending CAN messages...")
        self.send_messages(can_id, data, period)

    def send_messages(self, can_id, data, period):
        """Send one frame and arm a timer for the next transmission.

        Does nothing once ``is_sending`` has been cleared, which ends the
        timer chain.

        Args:
            can_id: Arbitration ID of the frame.
            data: Payload bytes.
            period: Delay until the next transmission in seconds.
        """
        if not self.is_sending:
            return

        # NOTE(review): can.Message presumably defaults to an extended
        # (29-bit) ID unless is_extended_id=False is passed - confirm that is
        # intended for IDs such as 0x123.
        message = can.Message(arbitration_id=can_id, data=data)
        try:
            self.bus.send(message)
            print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
        except can.CanError as e:
            print(f"Error: Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

        timer = threading.Timer(period, self.send_messages, args=(can_id, data, period))
        # Daemon timers cannot keep the interpreter alive after the GUI exits.
        timer.daemon = True
        self.send_timer = timer
        timer.start()

    def stop_sending(self):
        """Clear the send flag, cancel the pending timer and close the bus."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        if self.send_timer:
            self.send_timer.cancel()
        if self.bus:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

    def _on_close(self):
        """Stop any running transmission, then destroy the window."""
        if self.is_sending:
            self.stop_sending()
        self.master.destroy()
def main():
    """Create the Tk root window, attach the transmitter UI and run the event loop."""
    window = tk.Tk()
    ui = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()
この修正により、定期的にCANメッセージを送信する際に time.sleep を使わずに、タイマーを用いて非同期に送信処理を行うようになりました。
やっと、GUIを利用して任意のCAN定期送信の周期を変更することができるようになりました。
定期送信の数を増やす
CAN送信が1つだけだと面白くないので3つに増やしてみました。
変更点は以下です。
- 各CAN ID、CAN Data、Periodをリストで管理し、複数のCANフレームを処理する
- 各フレームの送信処理を個別にタイマーで管理
変更後のコードは以下です。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
    """Tkinter form that sends several CAN frames periodically using timers.

    Each frame has its own ID, payload and period, and its own chain of
    ``threading.Timer`` objects stored in ``send_timers``.
    """

    # Number of independently configurable frames shown in the form.
    FRAME_COUNT = 3

    def __init__(self, master):
        """Create and lay out the widgets on *master* and reset the send state.

        Args:
            master: Tk window that hosts the widgets (its title is set here).
        """
        count = self.FRAME_COUNT
        self.master = master
        self.master.title("CAN Transmitter")
        # Stop the timer chains and close the bus when the window is closed.
        self.master.protocol("WM_DELETE_WINDOW", self._on_close)

        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        # CAN ID Inputs (rows 1 .. count)
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(count)]
        self.can_id_entries = [ttk.Entry(master) for _ in range(count)]
        for i, (label, entry) in enumerate(zip(self.can_id_labels, self.can_id_entries)):
            label.grid(row=1 + i, column=0, padx=10, pady=10, sticky=tk.W)
            entry.grid(row=1 + i, column=1, padx=10, pady=10)
            # NOTE(review): 123 is decimal here, so the defaults render as
            # 07B/07C/07D rather than 123/124/125.
            entry.insert(tk.END, f"{123 + i:03X}")

        # CAN Data Inputs (next `count` rows)
        self.data_labels = [
            ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):")
            for i in range(count)
        ]
        self.data_entries = [ttk.Entry(master) for _ in range(count)]
        for i, (label, entry) in enumerate(zip(self.data_labels, self.data_entries)):
            label.grid(row=1 + count + i, column=0, padx=10, pady=10, sticky=tk.W)
            entry.grid(row=1 + count + i, column=1, padx=10, pady=10)
            entry.insert(tk.END, "11,22,33,44,55,66,77,88")

        # Period Inputs (next `count` rows)
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(count)]
        self.period_entries = [ttk.Entry(master) for _ in range(count)]
        for i, (label, entry) in enumerate(zip(self.period_labels, self.period_entries)):
            label.grid(row=1 + 2 * count + i, column=0, padx=10, pady=10, sticky=tk.W)
            entry.grid(row=1 + 2 * count + i, column=1, padx=10, pady=10)
            entry.insert(tk.END, "1.0")

        # Control Buttons
        button_row = 1 + 3 * count
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=button_row, column=0, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=button_row, column=1, padx=10, pady=10)

        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=button_row + 1, columnspan=2, padx=10, pady=10)

        self.is_sending = False            # checked by every timer callback
        self.bus = None                    # python-can bus, opened on Start
        self.send_timers = [None] * count  # latest timer of each frame

    @staticmethod
    def _parse_frame(can_id_text, data_text, period_text):
        """Convert the three text fields of one frame into typed values.

        Args:
            can_id_text: CAN ID as a hex string.
            data_text: Eight comma-separated hex bytes.
            period_text: Send period in seconds.

        Returns:
            Tuple ``(can_id, data, period)`` with an int, a list of eight
            ints in 0..255 and a positive finite float.

        Raises:
            ValueError: A field is malformed or out of range; the message is
                suitable for display in the status line.
        """
        try:
            can_id = int(can_id_text.strip(), 16)
        except ValueError:
            raise ValueError("Invalid CAN ID.") from None
        # 0x1FFFFFFF is the largest 29-bit (extended) CAN identifier.
        if not 0 <= can_id <= 0x1FFFFFFF:
            raise ValueError("Invalid CAN ID.")

        try:
            data = [int(byte, 16) for byte in data_text.strip().split(",")]
        except ValueError:
            raise ValueError("Invalid CAN data format.") from None
        if len(data) != 8 or any(not 0 <= byte <= 0xFF for byte in data):
            raise ValueError("Invalid CAN data format.")

        try:
            period = float(period_text.strip())
        except ValueError:
            raise ValueError("Invalid period.") from None
        # The chained comparison is False for NaN, so NaN is rejected as well.
        # A zero or negative period would make the timers fire back to back.
        if not 0 < period < float("inf"):
            raise ValueError("Invalid period.")

        return can_id, data, period

    def start_sending(self):
        """Validate every frame row, open the CAN bus and start all frames.

        Invalid input or a bus-open failure is reported in the status line
        and leaves the application idle.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        try:
            frames = [
                self._parse_frame(id_entry.get(), data_entry.get(), period_entry.get())
                for id_entry, data_entry, period_entry in zip(
                    self.can_id_entries, self.data_entries, self.period_entries
                )
            ]
        except ValueError as err:
            self.status_label.config(text=str(err))
            return

        try:
            # NOTE(review): recent python-can releases document `interface=`
            # as the replacement for `bustype=` - confirm against the
            # installed version.
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        # Set before the first send so that a send error reported by
        # send_messages() is not overwritten, and no stale text remains.
        self.status_label.config(text="Sending CAN messages...")
        for index, (can_id, data, period) in enumerate(frames):
            self.send_messages(can_id, data, period, index)

    def send_messages(self, can_id, data, period, index):
        """Send one frame and arm a timer for its next transmission.

        Does nothing once ``is_sending`` has been cleared, which ends the
        timer chain of this frame.

        Args:
            can_id: Arbitration ID of the frame.
            data: Payload bytes.
            period: Delay until the next transmission in seconds.
            index: Slot in ``send_timers`` that belongs to this frame.
        """
        if not self.is_sending:
            return

        # NOTE(review): can.Message presumably defaults to an extended
        # (29-bit) ID unless is_extended_id=False is passed - confirm that is
        # intended for IDs such as 0x07B.
        message = can.Message(arbitration_id=can_id, data=data)
        try:
            self.bus.send(message)
            print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
        except can.CanError as e:
            print(f"Error: Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

        timer = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
        # Daemon timers cannot keep the interpreter alive after the GUI exits.
        timer.daemon = True
        self.send_timers[index] = timer
        timer.start()

    def stop_sending(self):
        """Clear the send flag, cancel all pending timers and close the bus."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

    def _on_close(self):
        """Stop any running transmission, then destroy the window."""
        if self.is_sending:
            self.stop_sending()
        self.master.destroy()
def main():
    """Create the Tk root window, attach the transmitter UI and run the event loop."""
    window = tk.Tk()
    ui = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()
見た目を改良
ちょっとGUIの見た目が悪いので改編しました。
import tkinter as tk
from tkinter import ttk
import can
import threading
import time
class CANTransmitterApp:
    """Tkinter form that sends several CAN frames periodically using timers.

    Every frame occupies one grid row holding its ID, payload and period,
    and has its own chain of ``threading.Timer`` objects in ``send_timers``.
    """

    # Number of independently configurable frames shown in the form.
    FRAME_COUNT = 3

    def __init__(self, master):
        """Create and lay out the widgets on *master* and reset the send state.

        Args:
            master: Tk window that hosts the widgets (its title is set here).
        """
        count = self.FRAME_COUNT
        self.master = master
        self.master.title("CAN Transmitter")
        # Stop the timer chains and close the bus when the window is closed.
        self.master.protocol("WM_DELETE_WINDOW", self._on_close)

        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        # CAN ID, CAN Data, and Period Inputs
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(count)]
        self.data_labels = [
            ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):")
            for i in range(count)
        ]
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(count)]
        self.can_id_entries = [ttk.Entry(master) for _ in range(count)]
        self.data_entries = [ttk.Entry(master) for _ in range(count)]
        self.period_entries = [ttk.Entry(master) for _ in range(count)]

        for i in range(count):
            row = 1 + i
            # CAN ID
            self.can_id_labels[i].grid(row=row, column=0, padx=10, pady=10, sticky=tk.W)
            self.can_id_entries[i].grid(row=row, column=1, padx=10, pady=10)
            # NOTE(review): 123 is decimal here, so the defaults render as
            # 07B/07C/07D rather than 123/124/125.
            self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")
            # CAN Data
            self.data_labels[i].grid(row=row, column=2, padx=10, pady=10, sticky=tk.W)
            self.data_entries[i].grid(row=row, column=3, padx=10, pady=10)
            self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")
            # Period
            self.period_labels[i].grid(row=row, column=4, padx=10, pady=10, sticky=tk.W)
            self.period_entries[i].grid(row=row, column=5, padx=10, pady=10)
            self.period_entries[i].insert(tk.END, "1.0")

        # Control Buttons
        button_row = 1 + count
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=button_row, column=0, columnspan=3, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=button_row, column=3, columnspan=3, padx=10, pady=10)

        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=button_row + 1, columnspan=6, padx=10, pady=10)

        self.is_sending = False            # checked by every timer callback
        self.bus = None                    # python-can bus, opened on Start
        self.send_timers = [None] * count  # latest timer of each frame

    @staticmethod
    def _parse_frame(can_id_text, data_text, period_text):
        """Convert the three text fields of one frame into typed values.

        Args:
            can_id_text: CAN ID as a hex string.
            data_text: Eight comma-separated hex bytes.
            period_text: Send period in seconds.

        Returns:
            Tuple ``(can_id, data, period)`` with an int, a list of eight
            ints in 0..255 and a positive finite float.

        Raises:
            ValueError: A field is malformed or out of range; the message is
                suitable for display in the status line.
        """
        try:
            can_id = int(can_id_text.strip(), 16)
        except ValueError:
            raise ValueError("Invalid CAN ID.") from None
        # 0x1FFFFFFF is the largest 29-bit (extended) CAN identifier.
        if not 0 <= can_id <= 0x1FFFFFFF:
            raise ValueError("Invalid CAN ID.")

        try:
            data = [int(byte, 16) for byte in data_text.strip().split(",")]
        except ValueError:
            raise ValueError("Invalid CAN data format.") from None
        if len(data) != 8 or any(not 0 <= byte <= 0xFF for byte in data):
            raise ValueError("Invalid CAN data format.")

        try:
            period = float(period_text.strip())
        except ValueError:
            raise ValueError("Invalid period.") from None
        # The chained comparison is False for NaN, so NaN is rejected as well.
        # A zero or negative period would make the timers fire back to back.
        if not 0 < period < float("inf"):
            raise ValueError("Invalid period.")

        return can_id, data, period

    def start_sending(self):
        """Validate every frame row, open the CAN bus and start all frames.

        Invalid input or a bus-open failure is reported in the status line
        and leaves the application idle.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        try:
            frames = [
                self._parse_frame(id_entry.get(), data_entry.get(), period_entry.get())
                for id_entry, data_entry, period_entry in zip(
                    self.can_id_entries, self.data_entries, self.period_entries
                )
            ]
        except ValueError as err:
            self.status_label.config(text=str(err))
            return

        try:
            # NOTE(review): recent python-can releases document `interface=`
            # as the replacement for `bustype=` - confirm against the
            # installed version.
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        # Set before the first send so that a send error reported by
        # send_messages() is not overwritten, and no stale text remains.
        self.status_label.config(text="Sending CAN messages...")
        for index, (can_id, data, period) in enumerate(frames):
            self.send_messages(can_id, data, period, index)

    def send_messages(self, can_id, data, period, index):
        """Send one frame and arm a timer for its next transmission.

        Does nothing once ``is_sending`` has been cleared, which ends the
        timer chain of this frame.

        Args:
            can_id: Arbitration ID of the frame.
            data: Payload bytes.
            period: Delay until the next transmission in seconds.
            index: Slot in ``send_timers`` that belongs to this frame.
        """
        if not self.is_sending:
            return

        # NOTE(review): can.Message presumably defaults to an extended
        # (29-bit) ID unless is_extended_id=False is passed - confirm that is
        # intended for IDs such as 0x07B.
        message = can.Message(arbitration_id=can_id, data=data)
        try:
            self.bus.send(message)
            print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
        except can.CanError as e:
            print(f"Error: Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

        timer = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
        # Daemon timers cannot keep the interpreter alive after the GUI exits.
        timer.daemon = True
        self.send_timers[index] = timer
        timer.start()

    def stop_sending(self):
        """Clear the send flag, cancel all pending timers and close the bus."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

    def _on_close(self):
        """Stop any running transmission, then destroy the window."""
        if self.is_sending:
            self.stop_sending()
        self.master.destroy()
def main():
    """Create the Tk root window, attach the transmitter UI and run the event loop."""
    window = tk.Tk()
    ui = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()
CAN送受信ログをGUIで出力
せっかくGUIがあるので、CANコマンドの送受信記録をモニタできるようにしてみました。
変更点は以下3点です。
- tk.Textウィジェットを追加して、CANメッセージの送受信ログを表示
- receive_messages メソッドを追加し、受信したメッセージをログとして表示
- log_message メソッドを使ってログメッセージを指定されたフォーマットで表示
改編後のコードは以下です。
def start_sending(self):
    """Validate the form, open the CAN bus and start receiving and sending.

    Reads the interface name and the ID, data and period entries, reports
    invalid input or a bus-open failure in the status line, and otherwise
    starts the receive thread and the periodic transmission of three frames.
    """
    if self.is_sending:
        self.status_label.config(text="Already sending.")
        return

    interface = self.interface_entry.get().strip()

    # Parse IDs and periods under a guard as well, so a typo is reported in
    # the status line instead of raising out of the button callback.
    try:
        can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
    except ValueError:
        can_ids = None
    # 0x1FFFFFFF is the largest 29-bit (extended) CAN identifier.
    if can_ids is None or any(not 0 <= can_id <= 0x1FFFFFFF for can_id in can_ids):
        self.status_label.config(text="Invalid CAN ID.")
        return

    try:
        periods = [float(entry.get().strip()) for entry in self.period_entries]
    except ValueError:
        periods = None
    # The chained comparison is False for NaN; zero or negative periods
    # would make the timers fire back to back.
    if periods is None or any(not 0 < period < float("inf") for period in periods):
        self.status_label.config(text="Invalid period.")
        return

    try:
        data_list = []
        for entry in self.data_entries:
            data = [int(byte, 16) for byte in entry.get().strip().split(',')]
            if len(data) != 8:
                raise ValueError("Data length must be 8 bytes.")
            if any(not 0 <= byte <= 0xFF for byte in data):
                raise ValueError("Data bytes must be in the range 00-FF.")
            data_list.append(data)
    except ValueError:
        self.status_label.config(text="Invalid CAN data format.")
        return

    try:
        self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
    except Exception as e:
        self.status_label.config(text=f"Error: {str(e)}")
        return
    # Logged outside the try so a logging failure is not misreported as a
    # bus-open error.
    self.log_message("info", f"CAN interface {interface} connected.")

    self.is_sending = True
    # Replace any stale error text before the first frame goes out.
    self.status_label.config(text="Sending CAN messages...")
    self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
    self.receive_thread.start()
    for i in range(3):
        self.send_messages(can_ids[i], data_list[i], periods[i], i)
def send_messages(self, can_id, data, period, index):
    """Send one frame, log it, and arm a timer for its next transmission.

    Does nothing once ``is_sending`` has been cleared, which ends the timer
    chain of this frame.

    Args:
        can_id: Arbitration ID of the frame.
        data: Payload bytes.
        period: Delay until the next transmission in seconds.
        index: Slot in ``send_timers`` that belongs to this frame.
    """
    if not self.is_sending:
        return

    # NOTE(review): can.Message presumably defaults to an extended (29-bit)
    # ID unless is_extended_id=False is passed - confirm that is intended.
    message = can.Message(arbitration_id=can_id, data=data)
    try:
        self.bus.send(message)
    except can.CanError as e:
        self.log_message("error", f"Failed to send CAN message - {str(e)}")
        self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")
    else:
        self.log_message("send", message)

    timer = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
    # Daemon timers cannot keep the interpreter alive after the GUI exits.
    timer.daemon = True
    self.send_timers[index] = timer
    timer.start()
def receive_messages(self):
    """Receive loop: log every incoming frame until sending is stopped.

    ``recv`` is polled with a short timeout so the loop re-checks
    ``is_sending`` regularly instead of blocking until the next frame.
    """
    import time  # local import: only needed for the error back-off below

    poll_interval = 0.2  # seconds between checks of the stop flag
    while self.is_sending:
        try:
            message = self.bus.recv(timeout=poll_interval)
        except Exception as e:
            if not self.is_sending:
                # The bus was closed by stop_sending(); not a real error.
                break
            self.log_message("error", f"Failed to receive CAN message - {str(e)}")
            # Back off so a persistent bus error cannot flood the log.
            time.sleep(poll_interval)
            continue
        if message is not None:
            self.log_message("recv", message)
def stop_sending(self):
    """Stop the periodic transmission and release the CAN bus.

    Clears the send flag, cancels every timer stored in ``send_timers``,
    shuts the bus down and reports the result in the status line.
    """
    if self.is_sending:
        self.is_sending = False
        # filter(None, ...) skips the slots that never had a timer armed.
        for pending in filter(None, self.send_timers):
            pending.cancel()
        if self.bus:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")
    else:
        self.status_label.config(text="Not currently sending.")
def log_message(self, msg_type, message):
    """Append one formatted line to the log text widget and scroll to it.

    Args:
        msg_type: "send" or "recv" for CAN frames, "error" for error text;
            any other value logs *message* as-is.
        message: A frame object exposing ``arbitration_id`` and ``data`` for
            "send"/"recv", otherwise any value to be formatted as text.
    """
    if msg_type in ("send", "recv"):
        direction = "Send" if msg_type == "send" else "Recv"
        payload = ','.join(f'0x{byte:02X}' for byte in message.data)
        log_entry = f"{direction} : ID={hex(message.arbitration_id)} Data={payload}\n"
    elif msg_type == "error":
        log_entry = f"Error : {message}\n"
    else:
        log_entry = f"{message}\n"

    log_widget = self.log_text
    log_widget.insert(tk.END, log_entry)
    # Keep the newest entry visible.
    log_widget.see(tk.END)
def main():
    """Create the Tk root window, attach the transmitter UI and run the event loop."""
    window = tk.Tk()
    ui = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()
CANの定期送信に関して一通り遊び倒した気がしますので、次回はちょっとマニアックな車両診断プロトコルを実装してみます。
[…] […]
[…] […]