本サイトはアフィリエイト広告を利用しています

ラズパイ4からCANコマンドを定期的に送信するツール(GUI)をpythonで作ってみた

ラズパイとCAN通信

背景

シェルスクリプトを利用してラズパイ4からPCへCANコマンドを定期的に送信することができました。

スクリプトだとやはり不便なため、GUIが欲しいところです。

ラズパイには標準でpythonが組める環境が整っていますので、CANコマンドを定期的に送信するツールを作ってみます。

前回の記事はこちらです。

tkinterをインストール

GUI化する際、tkinterなるものが必要なようです。

以下コマンドをターミナルで打ち込むと、インストール状況が確認できます。

python3 -m tkinter

以下のようなポップアップが表示される場合、インストール済みであり対処不要らしいです。

tkinter

早速ツールを実装

GUIで以下を制御するプログラムです。

  • CANID
  • 送信データ
  • 送信周期
  • 送信の開始/停止
import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    def __init__(self, master):
        self.master = master
        self.master.title("CAN Transmitter")

        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)

        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        self.can_id_label = ttk.Label(master, text="CAN ID:")
        self.can_id_label.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)

        self.can_id_entry = ttk.Entry(master)
        self.can_id_entry.grid(row=1, column=1, padx=10, pady=10)
        self.can_id_entry.insert(tk.END, "123")

        self.data_label = ttk.Label(master, text="CAN Data (comma-separated hex values):")
        self.data_label.grid(row=2, column=0, padx=10, pady=10, sticky=tk.W)

        self.data_entry = ttk.Entry(master)
        self.data_entry.grid(row=2, column=1, padx=10, pady=10)
        self.data_entry.insert(tk.END, "11,22,33,44,55,66,77,88")

        self.period_label = ttk.Label(master, text="Period (seconds):")
        self.period_label.grid(row=3, column=0, padx=10, pady=10, sticky=tk.W)

        self.period_entry = ttk.Entry(master)
        self.period_entry.grid(row=3, column=1, padx=10, pady=10)
        self.period_entry.insert(tk.END, "1.0")

        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, padx=10, pady=10)

        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=1, padx=10, pady=10)

        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=2, padx=10, pady=10)

        self.is_sending = False
        self.bus = None
        self.send_thread = None

    def start_sending(self):
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        can_id = int(self.can_id_entry.get().strip(), 16)
        data_str = self.data_entry.get().strip()
        period = float(self.period_entry.get().strip())

        try:
            data = [int(byte, 16) for byte in data_str.split(',')]
            if len(data) != 8:
                raise ValueError("Data length must be 8 bytes.")
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        self.send_thread = threading.Thread(target=self.send_messages, args=(can_id, data, period))
        self.send_thread.start()
        self.status_label.config(text=f"Sending CAN messages...")

    def send_messages(self, can_id, data, period):
        message = can.Message(arbitration_id=can_id, data=data)

        while self.is_sending:
            try:
                self.bus.send(message)
                print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
            except can.CanError as e:
                print(f"Error: Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

            time.sleep(period)

        self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

    def stop_sending(self):
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        if self.send_thread:
            self.send_thread.join()
        if self.bus:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

def main():
    root = tk.Tk()
    app = CANTransmitterApp(root)
    root.mainloop()

if __name__ == "__main__":
    main()

ターミナルでプログラムを実行してみます。

$ sudo python cansend5.py

ラズパイ側の様子です。送信できていそうです。

pythonのツールからCAN送信している様子

以下は受信側(PC)の様子です。受信できていますね。

USB2CANでCANバスをモニターしている様子

停止時にフリーズする問題

Stop Sending(送信停止)を押下すると、フリーズして停止できなくなりました。

これにより、送信周期を変更できないという問題が。。。

問題の原因は、send_messages メソッド内の time.sleep(period)でした 。

この呼び出しはGUIのメインスレッドで行われるため、stop_sending メソッドで send_thread.join() が呼ばれると、メインスレッドが send_messages の無限ループから抜け出せなくなり、GUIがフリーズします。

解決策としては、time.sleep(period) の代わりに、タイマーを使用して定期的にCANメッセージを送信する方法を取りました。まとめますと

  • send_messages メソッド内の while ループを、タイマーを使った方法に変更
  • threading.Timer を使って定期的に send_messages を呼び出す
  • stop_sending メソッドで send_thread.cancel() を呼び出してタイマーを停止する

フリーズ問題対処後のコード

フリーズ問題に対処した後のコードは以下です。

import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    def __init__(self, master):
        self.master = master
        self.master.title("CAN Transmitter")

        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)

        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        self.can_id_label = ttk.Label(master, text="CAN ID:")
        self.can_id_label.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)

        self.can_id_entry = ttk.Entry(master)
        self.can_id_entry.grid(row=1, column=1, padx=10, pady=10)
        self.can_id_entry.insert(tk.END, "123")

        self.data_label = ttk.Label(master, text="CAN Data (comma-separated hex values):")
        self.data_label.grid(row=2, column=0, padx=10, pady=10, sticky=tk.W)

        self.data_entry = ttk.Entry(master)
        self.data_entry.grid(row=2, column=1, padx=10, pady=10)
        self.data_entry.insert(tk.END, "11,22,33,44,55,66,77,88")

        self.period_label = ttk.Label(master, text="Period (seconds):")
        self.period_label.grid(row=3, column=0, padx=10, pady=10, sticky=tk.W)

        self.period_entry = ttk.Entry(master)
        self.period_entry.grid(row=3, column=1, padx=10, pady=10)
        self.period_entry.insert(tk.END, "1.0")

        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, padx=10, pady=10)

        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=1, padx=10, pady=10)

        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=2, padx=10, pady=10)

        self.is_sending = False
        self.bus = None
        self.send_timer = None

    def start_sending(self):
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        can_id = int(self.can_id_entry.get().strip(), 16)
        data_str = self.data_entry.get().strip()
        period = float(self.period_entry.get().strip())

        try:
            data = [int(byte, 16) for byte in data_str.split(',')]
            if len(data) != 8:
                raise ValueError("Data length must be 8 bytes.")
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        self.send_messages(can_id, data, period)

    def send_messages(self, can_id, data, period):
        message = can.Message(arbitration_id=can_id, data=data)

        if self.is_sending:
            try:
                self.bus.send(message)
                print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
            except can.CanError as e:
                print(f"Error: Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

            self.send_timer = threading.Timer(period, self.send_messages, args=(can_id, data, period))
            self.send_timer.start()

    def stop_sending(self):
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        if self.send_timer:
            self.send_timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

def main():
    root = tk.Tk()
    app = CANTransmitterApp(root)
    root.mainloop()

if __name__ == "__main__":
    main()

この修正により、定期的にCANメッセージを送信する際に time.sleep を使わずに、タイマーを用いて非同期に送信処理を行うようになりました。

やっと、GUIを利用して任意のCAN定期送信の周期を変更することができるようになりました。

定期送信の数を増やす

CAN送信が1つだけだと面白くないので3つに増やしてみました。

GUIでCAN定期送信している様子

変更点は以下です。

  • 各CAN ID、CAN Data、Periodをリストで管理し、複数のCANフレームを処理する
  • 各フレームの送信処理を個別にタイマーで管理

変更後のコードは以下です。

import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    def __init__(self, master):
        self.master = master
        self.master.title("CAN Transmitter")

        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        # CAN ID Inputs
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
        self.can_id_entries = [ttk.Entry(master) for i in range(3)]
        for i in range(3):
            self.can_id_labels[i].grid(row=i+1, column=0, padx=10, pady=10, sticky=tk.W)
            self.can_id_entries[i].grid(row=i+1, column=1, padx=10, pady=10)
            self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")

        # CAN Data Inputs
        self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
        self.data_entries = [ttk.Entry(master) for i in range(3)]
        for i in range(3):
            self.data_labels[i].grid(row=4+i, column=0, padx=10, pady=10, sticky=tk.W)
            self.data_entries[i].grid(row=4+i, column=1, padx=10, pady=10)
            self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")

        # Period Inputs
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
        self.period_entries = [ttk.Entry(master) for i in range(3)]
        for i in range(3):
            self.period_labels[i].grid(row=7+i, column=0, padx=10, pady=10, sticky=tk.W)
            self.period_entries[i].grid(row=7+i, column=1, padx=10, pady=10)
            self.period_entries[i].insert(tk.END, "1.0")

        # Control Buttons
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=10, column=0, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=10, column=1, padx=10, pady=10)

        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=11, columnspan=2, padx=10, pady=10)

        self.is_sending = False
        self.bus = None
        self.send_timers = [None, None, None]

    def start_sending(self):
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
        data_strs = [entry.get().strip() for entry in self.data_entries]
        periods = [float(entry.get().strip()) for entry in self.period_entries]

        try:
            data_list = []
            for data_str in data_strs:
                data = [int(byte, 16) for byte in data_str.split(',')]
                if len(data) != 8:
                    raise ValueError("Data length must be 8 bytes.")
                data_list.append(data)
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        for i in range(3):
            self.send_messages(can_ids[i], data_list[i], periods[i], i)

    def send_messages(self, can_id, data, period, index):
        message = can.Message(arbitration_id=can_id, data=data)

        if self.is_sending:
            try:
                self.bus.send(message)
                print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
            except can.CanError as e:
                print(f"Error: Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

            self.send_timers[index] = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
            self.send_timers[index].start()

    def stop_sending(self):
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

def main():
    root = tk.Tk()
    app = CANTransmitterApp(root)
    root.mainloop()

if __name__ == "__main__":
    main()

見た目を改良

ちょっとGUIの見た目が悪いので改編しました。

CAN定期送信するツール
import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    def __init__(self, master):
        self.master = master
        self.master.title("CAN Transmitter")

        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        # CAN ID, CAN Data, and Period Inputs
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
        self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
        self.can_id_entries = [ttk.Entry(master) for i in range(3)]
        self.data_entries = [ttk.Entry(master) for i in range(3)]
        self.period_entries = [ttk.Entry(master) for i in range(3)]

        for i in range(3):
            # CAN ID
            self.can_id_labels[i].grid(row=1+i, column=0, padx=10, pady=10, sticky=tk.W)
            self.can_id_entries[i].grid(row=1+i, column=1, padx=10, pady=10)
            self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")

            # CAN Data
            self.data_labels[i].grid(row=1+i, column=2, padx=10, pady=10, sticky=tk.W)
            self.data_entries[i].grid(row=1+i, column=3, padx=10, pady=10)
            self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")

            # Period
            self.period_labels[i].grid(row=1+i, column=4, padx=10, pady=10, sticky=tk.W)
            self.period_entries[i].grid(row=1+i, column=5, padx=10, pady=10)
            self.period_entries[i].insert(tk.END, "1.0")

        # Control Buttons
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=3, columnspan=3, padx=10, pady=10)

        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=6, padx=10, pady=10)

        self.is_sending = False
        self.bus = None
        self.send_timers = [None, None, None]

    def start_sending(self):
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
        data_strs = [entry.get().strip() for entry in self.data_entries]
        periods = [float(entry.get().strip()) for entry in self.period_entries]

        try:
            data_list = []
            for data_str in data_strs:
                data = [int(byte, 16) for byte in data_str.split(',')]
                if len(data) != 8:
                    raise ValueError("Data length must be 8 bytes.")
                data_list.append(data)
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        for i in range(3):
            self.send_messages(can_ids[i], data_list[i], periods[i], i)

    def send_messages(self, can_id, data, period, index):
        message = can.Message(arbitration_id=can_id, data=data)

        if self.is_sending:
            try:
                self.bus.send(message)
                print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
            except can.CanError as e:
                print(f"Error: Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

            self.send_timers[index] = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
            self.send_timers[index].start()

    def stop_sending(self):
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

def main():
    root = tk.Tk()
    app = CANTransmitterApp(root)
    root.mainloop()

if __name__ == "__main__":
    main()

CAN送受信ログをGUIで出力

せっかくGUIがあるので、CANコマンドの送受信記録をモニタできるようにしてみました。

CANコマンドの送受信記録

変更点は以下3点です。

  • tk.Textウィジェットを追加して、CANメッセージの送受信ログを表示
  • receive_messages メソッドを追加し、受信したメッセージをログとして表示
  • log_message メソッドを使ってログメッセージを指定されたフォーマットで表示

改編後のコードは以下です。


    def start_sending(self):
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
        data_strs = [entry.get().strip() for entry in self.data_entries]
        periods = [float(entry.get().strip()) for entry in self.period_entries]

        try:
            data_list = []
            for data_str in data_strs:
                data = [int(byte, 16) for byte in data_str.split(',')]
                if len(data) != 8:
                    raise ValueError("Data length must be 8 bytes.")
                data_list.append(data)
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
            self.log_message("info", f"CAN interface {interface} connected.")
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
        self.receive_thread.start()
        for i in range(3):
            self.send_messages(can_ids[i], data_list[i], periods[i], i)

    def send_messages(self, can_id, data, period, index):
        message = can.Message(arbitration_id=can_id, data=data)

        if self.is_sending:
            try:
                self.bus.send(message)
                self.log_message("send", message)
            except can.CanError as e:
                self.log_message("error", f"Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

            self.send_timers[index] = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
            self.send_timers[index].start()

    def receive_messages(self):
        while self.is_sending:
            try:
                message = self.bus.recv()
                if message:
                    self.log_message("recv", message)
            except Exception as e:
                self.log_message("error", f"Failed to receive CAN message - {str(e)}")

    def stop_sending(self):
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

    def log_message(self, msg_type, message):
        if msg_type == "send":
            log_entry = f"Send : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
        elif msg_type == "recv":
            log_entry = f"Recv : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
        elif msg_type == "error":
            log_entry = f"Error : {message}\n"
        else:
            log_entry = f"{message}\n"
        self.log_text.insert(tk.END, log_entry)
        self.log_text.see(tk.END)

def main():
    root = tk.Tk()
    app = CANTransmitterApp(root)
    root.mainloop()

if __name__ == "__main__":
    main()

CANの定期送信に関して一通り遊び倒した気がしますので、次回はちょっとマニアックな車両診断プロトコルを実装してみます。

2 COMMENTS

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です