本サイトはアフィリエイト広告を利用しています

Raspberry PI 4 Model B(ラズパイ)でpythonプログラムを組んでみる~CAN定期送信~

背景

前回はシェルスクリプトを利用してラズパイからCAN定期送信を試みた。今回はpythonプログラムを利用してGUI経由で制御を試みる。前回の記事はこちら。

tkinterをインストール

pythonプログラムを用いてGUI制御する際、tkinterが必要らしい。以下コマンドで状況を確認。

python3 -m tkinter

以下のようなポップアップが表示される場合、インストール済みであり対処不要らしい。

pythonプログラムを実装

今回、GUIで制御したいと考え、以下のようなpythonプログラムを実装。

import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    """Tkinter form that sends one CAN frame periodically from a worker thread.

    The form collects the interface name, a hexadecimal CAN ID, eight
    comma-separated hexadecimal data bytes and a period in seconds.
    "Start Sending" opens a socketcan bus and starts the worker thread;
    "Stop Sending" clears the run flag and joins that thread.
    """

    def __init__(self, master):
        """Build the widgets inside *master* and start in the idle state."""
        self.master = master
        self.master.title("CAN Transmitter")

        self.interface_label, self.interface_entry = self._add_row(
            0, "Interface:", "can0")
        self.can_id_label, self.can_id_entry = self._add_row(
            1, "CAN ID:", "123")
        self.data_label, self.data_entry = self._add_row(
            2, "CAN Data (comma-separated hex values):", "11,22,33,44,55,66,77,88")
        self.period_label, self.period_entry = self._add_row(
            3, "Period (seconds):", "1.0")

        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, padx=10, pady=10)

        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=1, padx=10, pady=10)

        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=2, padx=10, pady=10)

        self.is_sending = False   # run flag polled by the worker loop
        self.bus = None           # python-can bus, opened by start_sending()
        self.send_thread = None   # worker thread running send_messages()

    def _add_row(self, row, caption, default):
        """Create one "label + entry" row and return ``(label, entry)``."""
        label = ttk.Label(self.master, text=caption)
        label.grid(row=row, column=0, padx=10, pady=10, sticky=tk.W)
        entry = ttk.Entry(self.master)
        entry.grid(row=row, column=1, padx=10, pady=10)
        entry.insert(tk.END, default)
        return label, entry

    @staticmethod
    def _parse_can_id(text):
        """Parse a hexadecimal CAN ID; raise ValueError if it exceeds 29 bits."""
        can_id = int(text.strip(), 16)
        if not 0 <= can_id <= 0x1FFFFFFF:
            raise ValueError("CAN ID out of range.")
        return can_id

    @staticmethod
    def _parse_data(text):
        """Parse exactly eight comma-separated hex bytes; raise ValueError otherwise."""
        data = [int(byte, 16) for byte in text.strip().split(',')]
        if len(data) != 8:
            raise ValueError("Data length must be 8 bytes.")
        if any(not 0 <= byte <= 0xFF for byte in data):
            raise ValueError("Data bytes must be in the range 00-FF.")
        return data

    @staticmethod
    def _parse_period(text):
        """Parse a positive, finite period in seconds; raise ValueError otherwise."""
        period = float(text.strip())
        # The chained comparison also rejects NaN, for which every comparison is False.
        if not 0 < period < float("inf"):
            raise ValueError("Period must be a positive number.")
        return period

    def start_sending(self):
        """Validate the form, open the bus and start the worker thread.

        Every input is validated before the bus is opened, so a typo is
        reported in the status label instead of raising out of the button
        callback or killing the worker thread later.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()

        try:
            can_id = self._parse_can_id(self.can_id_entry.get())
        except ValueError:
            self.status_label.config(text="Invalid CAN ID.")
            return

        try:
            data = self._parse_data(self.data_entry.get())
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            period = self._parse_period(self.period_entry.get())
        except ValueError:
            self.status_label.config(text="Invalid period.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        self.send_thread = threading.Thread(target=self.send_messages, args=(can_id, data, period))
        self.send_thread.start()
        self.status_label.config(text="Sending CAN messages...")

    def send_messages(self, can_id, data, period):
        """Worker-thread loop: send the frame every *period* seconds until stopped."""
        # NOTE(review): python-can's Message defaults to is_extended_id=True, so
        # this goes out as a 29-bit frame even for small IDs - confirm intended.
        message = can.Message(arbitration_id=can_id, data=data)

        while self.is_sending:
            try:
                self.bus.send(message)
                print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
            except can.CanError as e:
                print(f"Error: Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

            time.sleep(period)

        self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

    def stop_sending(self):
        """Clear the run flag, wait for the worker thread and close the bus."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        if self.send_thread:
            # NOTE: join() blocks the Tk callback until the worker returns from
            # time.sleep(period), and the worker updates status_label on its
            # way out. This is the freeze analysed right after this listing.
            self.send_thread.join()
        if self.bus:
            self.bus.shutdown()
        self.status_label.config(text="Stopped sending.")

def main():
    """Create the Tk root window, attach the CAN transmitter form and run the event loop."""
    window = tk.Tk()
    # Bound to a name so the application object is referenced while the loop runs.
    app = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()

実装後、実行!

$ sudo python cansend5.py

以下は送信側の様子。送信できている!

以下は受信側の様子。受信できている!

停止時にフリーズする問題

Stop Sending(送信停止)をした時、フリーズして停止できなくなる問題がある。これにより、送信周期を変更できない。

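問題の原因は、stop_sending メソッド内の send_thread.join() と、send_messages メソッド内の time.sleep(period) の組み合わせ。send_messages は別スレッドで動いているが、stop_sending はGUIのメインスレッドで実行されるため、join() を呼ぶと、送信スレッドが time.sleep(period) から戻ってループを抜けるまでメインスレッドがブロックされる。さらに送信スレッドは終了時に status_label を更新しようとするが、その処理はブロック中のメインスレッドを待つことになり、互いに待ち合ってGUIがフリーズする。解決策としては、time.sleep(period) と join() を使う代わりに、タイマーを使用して定期的にCANメッセージを送信する方法がある。対処法としては以下。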

  • send_messages メソッド内の while ループを、タイマーを使った方法に変更
  • threading.Timer を使って定期的に send_messages を呼び出す
  • stop_sending メソッドで send_timer.cancel() を呼び出してタイマーを停止する

フリーズ問題対処後のコード

フリーズ問題に対処後のコードは以下。

import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    """Tkinter form that sends one CAN frame periodically using threading.Timer.

    Each call to send_messages() sends the frame once and arms a one-shot
    timer for the next call, so the "Stop Sending" button never has to wait
    for a sleeping thread.
    """

    def __init__(self, master):
        """Build the widgets inside *master* and start in the idle state."""
        self.master = master
        self.master.title("CAN Transmitter")

        self.interface_label, self.interface_entry = self._add_row(
            0, "Interface:", "can0")
        self.can_id_label, self.can_id_entry = self._add_row(
            1, "CAN ID:", "123")
        self.data_label, self.data_entry = self._add_row(
            2, "CAN Data (comma-separated hex values):", "11,22,33,44,55,66,77,88")
        self.period_label, self.period_entry = self._add_row(
            3, "Period (seconds):", "1.0")

        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, padx=10, pady=10)

        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=1, padx=10, pady=10)

        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=2, padx=10, pady=10)

        self.is_sending = False   # run flag checked by every timer callback
        self.bus = None           # python-can bus, opened by start_sending()
        self.send_timer = None    # most recently armed threading.Timer

    def _add_row(self, row, caption, default):
        """Create one "label + entry" row and return ``(label, entry)``."""
        label = ttk.Label(self.master, text=caption)
        label.grid(row=row, column=0, padx=10, pady=10, sticky=tk.W)
        entry = ttk.Entry(self.master)
        entry.grid(row=row, column=1, padx=10, pady=10)
        entry.insert(tk.END, default)
        return label, entry

    @staticmethod
    def _parse_can_id(text):
        """Parse a hexadecimal CAN ID; raise ValueError if it exceeds 29 bits."""
        can_id = int(text.strip(), 16)
        if not 0 <= can_id <= 0x1FFFFFFF:
            raise ValueError("CAN ID out of range.")
        return can_id

    @staticmethod
    def _parse_data(text):
        """Parse exactly eight comma-separated hex bytes; raise ValueError otherwise."""
        data = [int(byte, 16) for byte in text.strip().split(',')]
        if len(data) != 8:
            raise ValueError("Data length must be 8 bytes.")
        if any(not 0 <= byte <= 0xFF for byte in data):
            raise ValueError("Data bytes must be in the range 00-FF.")
        return data

    @staticmethod
    def _parse_period(text):
        """Parse a positive, finite period in seconds; raise ValueError otherwise."""
        period = float(text.strip())
        # The chained comparison also rejects NaN, for which every comparison is False.
        if not 0 < period < float("inf"):
            raise ValueError("Period must be a positive number.")
        return period

    def start_sending(self):
        """Validate the form, open the bus and send the first frame.

        All inputs are validated before the bus is opened and before the run
        flag is set. Otherwise a bad byte value would raise from
        ``can.Message`` after ``is_sending`` became True and leave the form
        stuck reporting "Already sending.".
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()

        try:
            can_id = self._parse_can_id(self.can_id_entry.get())
        except ValueError:
            self.status_label.config(text="Invalid CAN ID.")
            return

        try:
            data = self._parse_data(self.data_entry.get())
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            period = self._parse_period(self.period_entry.get())
        except ValueError:
            self.status_label.config(text="Invalid period.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        # Set before the first send so a send error can overwrite it.
        self.status_label.config(text="Sending CAN messages...")
        self.send_messages(can_id, data, period)

    def send_messages(self, can_id, data, period):
        """Send the frame once and arm a timer for the next transmission.

        The first call comes from start_sending(); every later call runs on a
        threading.Timer thread.
        """
        if not self.is_sending:
            return

        # NOTE(review): python-can's Message defaults to is_extended_id=True, so
        # this goes out as a 29-bit frame even for small IDs - confirm intended.
        message = can.Message(arbitration_id=can_id, data=data)
        try:
            self.bus.send(message)
            print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
        except can.CanError as e:
            print(f"Error: Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

        if not self.is_sending:
            # stop_sending() ran while the frame was going out; do not re-arm.
            return

        timer = threading.Timer(period, self.send_messages, args=(can_id, data, period))
        # Daemon timers cannot keep the process alive after the window is closed.
        timer.daemon = True
        self.send_timer = timer
        timer.start()

    def stop_sending(self):
        """Clear the run flag, cancel the pending timer and close the bus."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        if self.send_timer:
            self.send_timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

def main():
    """Create the Tk root window, attach the CAN transmitter form and run the event loop."""
    window = tk.Tk()
    # Bound to a name so the application object is referenced while the loop runs.
    app = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()

この修正により、定期的にCANメッセージを送信する際に time.sleep を使わずに、タイマーを用いて非同期に送信処理を行う。これにより、GUIがフリーズする問題が解決された。pythonプログラムにより、ラズパイ上でGUIを利用して任意のCAN定期送信の周期を変更することができるようになった。

定期送信の数を増やす

CAN送信が1つだけだと面白くないので3つに増やしてみた。

変更点は以下。

  • 各CAN ID、CAN Data、Periodをリストで管理し、複数のCANフレームを処理する
  • 各フレームの送信処理を個別にタイマーで管理

変更後のコードは以下。

import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    """Tkinter form that sends several CAN frames, each on its own timer.

    Every frame has its own CAN ID, data bytes and period. send_messages()
    sends one frame and re-arms that frame's threading.Timer slot.
    """

    FRAME_COUNT = 3  # number of independently scheduled frames shown in the form

    def __init__(self, master):
        """Build the widgets inside *master* and start in the idle state."""
        self.master = master
        self.master.title("CAN Transmitter")
        count = self.FRAME_COUNT

        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        # CAN ID Inputs
        # NOTE(review): 123 is decimal here, so the defaults read 07B, 07C, 07D
        # (the single-frame form defaulted to hex "123") - confirm intended.
        self.can_id_labels, self.can_id_entries = self._add_group(
            "CAN ID {n}:", 1, [f"{123 + i:03X}" for i in range(count)])

        # CAN Data Inputs
        self.data_labels, self.data_entries = self._add_group(
            "CAN Data {n} (comma-separated hex values):", 1 + count,
            ["11,22,33,44,55,66,77,88"] * count)

        # Period Inputs
        self.period_labels, self.period_entries = self._add_group(
            "Period {n} (seconds):", 1 + 2 * count, ["1.0"] * count)

        # Control Buttons
        button_row = 1 + 3 * count
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=button_row, column=0, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=button_row, column=1, padx=10, pady=10)

        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=button_row + 1, columnspan=2, padx=10, pady=10)

        self.is_sending = False            # run flag checked by every timer callback
        self.bus = None                    # python-can bus, opened by start_sending()
        self.send_timers = [None] * count  # one threading.Timer slot per frame

    def _add_group(self, caption, first_row, defaults):
        """Create one stacked group of "label + entry" rows.

        *caption* is a format string taking ``n`` (the 1-based frame number).
        Returns ``(labels, entries)``.
        """
        labels = [ttk.Label(self.master, text=caption.format(n=i + 1))
                  for i in range(len(defaults))]
        entries = [ttk.Entry(self.master) for _ in defaults]
        for offset, (label, entry, default) in enumerate(zip(labels, entries, defaults)):
            label.grid(row=first_row + offset, column=0, padx=10, pady=10, sticky=tk.W)
            entry.grid(row=first_row + offset, column=1, padx=10, pady=10)
            entry.insert(tk.END, default)
        return labels, entries

    @staticmethod
    def _parse_can_id(text):
        """Parse a hexadecimal CAN ID; raise ValueError if it exceeds 29 bits."""
        can_id = int(text.strip(), 16)
        if not 0 <= can_id <= 0x1FFFFFFF:
            raise ValueError("CAN ID out of range.")
        return can_id

    @staticmethod
    def _parse_data(text):
        """Parse exactly eight comma-separated hex bytes; raise ValueError otherwise."""
        data = [int(byte, 16) for byte in text.strip().split(',')]
        if len(data) != 8:
            raise ValueError("Data length must be 8 bytes.")
        if any(not 0 <= byte <= 0xFF for byte in data):
            raise ValueError("Data bytes must be in the range 00-FF.")
        return data

    @staticmethod
    def _parse_period(text):
        """Parse a positive, finite period in seconds; raise ValueError otherwise."""
        period = float(text.strip())
        # The chained comparison also rejects NaN, for which every comparison is False.
        if not 0 < period < float("inf"):
            raise ValueError("Period must be a positive number.")
        return period

    def start_sending(self):
        """Validate every row, open the bus and send the first frame of each row.

        All inputs are validated before the bus is opened and before the run
        flag is set, so a bad value cannot leave the form stuck in the
        sending state with only some frames scheduled.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()

        try:
            can_ids = [self._parse_can_id(entry.get()) for entry in self.can_id_entries]
        except ValueError:
            self.status_label.config(text="Invalid CAN ID.")
            return

        try:
            data_list = [self._parse_data(entry.get()) for entry in self.data_entries]
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            periods = [self._parse_period(entry.get()) for entry in self.period_entries]
        except ValueError:
            self.status_label.config(text="Invalid period.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        # Set before the first sends so a send error can overwrite it.
        self.status_label.config(text="Sending CAN messages...")
        for index, (can_id, data, period) in enumerate(zip(can_ids, data_list, periods)):
            self.send_messages(can_id, data, period, index)

    def send_messages(self, can_id, data, period, index):
        """Send one frame and arm timer slot *index* for its next transmission.

        The first call per frame comes from start_sending(); every later call
        runs on a threading.Timer thread.
        """
        if not self.is_sending:
            return

        # NOTE(review): python-can's Message defaults to is_extended_id=True, so
        # this goes out as a 29-bit frame even for small IDs - confirm intended.
        message = can.Message(arbitration_id=can_id, data=data)
        try:
            self.bus.send(message)
            print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
        except can.CanError as e:
            print(f"Error: Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

        if not self.is_sending:
            # stop_sending() ran while the frame was going out; do not re-arm.
            return

        timer = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
        # Daemon timers cannot keep the process alive after the window is closed.
        timer.daemon = True
        self.send_timers[index] = timer
        timer.start()

    def stop_sending(self):
        """Clear the run flag, cancel every pending timer and close the bus."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

def main():
    """Create the Tk root window, attach the CAN transmitter form and run the event loop."""
    window = tk.Tk()
    # Bound to a name so the application object is referenced while the loop runs.
    app = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()

見た目を改良

ちょっとGUIの見た目が悪いので改編。

修正後のコードは以下。

import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    """Tkinter form that sends several CAN frames, one form row per frame.

    Each row holds a CAN ID, eight data bytes and a period. send_messages()
    sends one frame and re-arms that frame's threading.Timer slot.
    """

    FRAME_COUNT = 3  # number of independently scheduled frames shown in the form

    def __init__(self, master):
        """Build the widgets inside *master* and start in the idle state."""
        self.master = master
        self.master.title("CAN Transmitter")
        count = self.FRAME_COUNT
        numbers = range(1, count + 1)

        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        # CAN ID, CAN Data, and Period Inputs
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {n}:") for n in numbers]
        self.data_labels = [ttk.Label(master, text=f"CAN Data {n} (comma-separated hex values):") for n in numbers]
        self.period_labels = [ttk.Label(master, text=f"Period {n} (seconds):") for n in numbers]
        self.can_id_entries = [ttk.Entry(master) for _ in numbers]
        self.data_entries = [ttk.Entry(master) for _ in numbers]
        self.period_entries = [ttk.Entry(master) for _ in numbers]

        for i in range(count):
            row = 1 + i
            # (label, entry, default text) for the three column pairs of this row.
            # NOTE(review): 123 is decimal here, so the ID defaults read 07B, 07C,
            # 07D rather than 123, 124, 125 - confirm intended.
            cells = (
                (self.can_id_labels[i], self.can_id_entries[i], f"{123 + i:03X}"),
                (self.data_labels[i], self.data_entries[i], "11,22,33,44,55,66,77,88"),
                (self.period_labels[i], self.period_entries[i], "1.0"),
            )
            for pair, (label, entry, default) in enumerate(cells):
                label.grid(row=row, column=2 * pair, padx=10, pady=10, sticky=tk.W)
                entry.grid(row=row, column=2 * pair + 1, padx=10, pady=10)
                entry.insert(tk.END, default)

        # Control Buttons
        button_row = 1 + count
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=button_row, column=0, columnspan=3, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=button_row, column=3, columnspan=3, padx=10, pady=10)

        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=button_row + 1, columnspan=6, padx=10, pady=10)

        self.is_sending = False            # run flag checked by every timer callback
        self.bus = None                    # python-can bus, opened by start_sending()
        self.send_timers = [None] * count  # one threading.Timer slot per frame

    @staticmethod
    def _parse_can_id(text):
        """Parse a hexadecimal CAN ID; raise ValueError if it exceeds 29 bits."""
        can_id = int(text.strip(), 16)
        if not 0 <= can_id <= 0x1FFFFFFF:
            raise ValueError("CAN ID out of range.")
        return can_id

    @staticmethod
    def _parse_data(text):
        """Parse exactly eight comma-separated hex bytes; raise ValueError otherwise."""
        data = [int(byte, 16) for byte in text.strip().split(',')]
        if len(data) != 8:
            raise ValueError("Data length must be 8 bytes.")
        if any(not 0 <= byte <= 0xFF for byte in data):
            raise ValueError("Data bytes must be in the range 00-FF.")
        return data

    @staticmethod
    def _parse_period(text):
        """Parse a positive, finite period in seconds; raise ValueError otherwise."""
        period = float(text.strip())
        # The chained comparison also rejects NaN, for which every comparison is False.
        if not 0 < period < float("inf"):
            raise ValueError("Period must be a positive number.")
        return period

    def start_sending(self):
        """Validate every row, open the bus and send the first frame of each row.

        All inputs are validated before the bus is opened and before the run
        flag is set, so a bad value cannot leave the form stuck in the
        sending state with only some frames scheduled.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()

        try:
            can_ids = [self._parse_can_id(entry.get()) for entry in self.can_id_entries]
        except ValueError:
            self.status_label.config(text="Invalid CAN ID.")
            return

        try:
            data_list = [self._parse_data(entry.get()) for entry in self.data_entries]
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            periods = [self._parse_period(entry.get()) for entry in self.period_entries]
        except ValueError:
            self.status_label.config(text="Invalid period.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        # Set before the first sends so a send error can overwrite it.
        self.status_label.config(text="Sending CAN messages...")
        for index, (can_id, data, period) in enumerate(zip(can_ids, data_list, periods)):
            self.send_messages(can_id, data, period, index)

    def send_messages(self, can_id, data, period, index):
        """Send one frame and arm timer slot *index* for its next transmission.

        The first call per frame comes from start_sending(); every later call
        runs on a threading.Timer thread.
        """
        if not self.is_sending:
            return

        # NOTE(review): python-can's Message defaults to is_extended_id=True, so
        # this goes out as a 29-bit frame even for small IDs - confirm intended.
        message = can.Message(arbitration_id=can_id, data=data)
        try:
            self.bus.send(message)
            print(f"Sent CAN message: ID={hex(message.arbitration_id)}, Data={message.data}")
        except can.CanError as e:
            print(f"Error: Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

        if not self.is_sending:
            # stop_sending() ran while the frame was going out; do not re-arm.
            return

        timer = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
        # Daemon timers cannot keep the process alive after the window is closed.
        timer.daemon = True
        self.send_timers[index] = timer
        timer.start()

    def stop_sending(self):
        """Clear the run flag, cancel every pending timer and close the bus."""
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

def main():
    """Create the Tk root window, attach the CAN transmitter form and run the event loop."""
    window = tk.Tk()
    # Bound to a name so the application object is referenced while the loop runs.
    app = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()

CAN送受信ログをGUIで出力

CAN送受信ログをGUIで表示してみる。

変更点は以下3点。

  • tk.Textウィジェットを追加して、CANメッセージの送受信ログを表示
  • receive_messages メソッドを追加し、受信したメッセージをログとして表示
  • log_message メソッドを使ってログメッセージを指定されたフォーマットで表示

変更後のコードは以下(import 文、クラス定義、__init__ は省略。log_message が参照する self.log_text は、__init__ で追加した tk.Text ウィジェット)。


    def start_sending(self):
        """Validate every row, open the bus, then start receiving and sending.

        All inputs are validated before the bus is opened and before the run
        flag is set. A bad CAN ID, byte value or period is reported in the
        status label instead of raising out of the button callback or leaving
        the form stuck in the sending state.
        """
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()

        try:
            can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
            if any(not 0 <= can_id <= 0x1FFFFFFF for can_id in can_ids):
                raise ValueError("CAN ID must fit in 29 bits.")
        except ValueError:
            self.status_label.config(text="Invalid CAN ID.")
            return

        try:
            data_list = []
            for entry in self.data_entries:
                data = [int(byte, 16) for byte in entry.get().strip().split(',')]
                if len(data) != 8:
                    raise ValueError("Data length must be 8 bytes.")
                if any(not 0 <= byte <= 0xFF for byte in data):
                    raise ValueError("Data bytes must be in the range 00-FF.")
                data_list.append(data)
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            periods = [float(entry.get().strip()) for entry in self.period_entries]
            # The chained comparison also rejects NaN, for which every comparison is False.
            if any(not 0 < period < float("inf") for period in periods):
                raise ValueError("Period must be a positive number.")
        except ValueError:
            self.status_label.config(text="Invalid period.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return
        # Logged outside the try so a logging failure is not reported as a bus error.
        self.log_message("info", f"CAN interface {interface} connected.")

        self.is_sending = True
        self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
        self.receive_thread.start()
        for index, (can_id, data, period) in enumerate(zip(can_ids, data_list, periods)):
            self.send_messages(can_id, data, period, index)

    def send_messages(self, can_id, data, period, index):
        """Send one frame, log it, and arm timer slot *index* for the next send.

        The first call per frame comes from start_sending(); every later call
        runs on a threading.Timer thread. Nothing is built or sent once
        ``is_sending`` is False.
        """
        if not self.is_sending:
            return

        # NOTE(review): python-can's Message defaults to is_extended_id=True, so
        # this goes out as a 29-bit frame even for small IDs - confirm intended.
        message = can.Message(arbitration_id=can_id, data=data)
        try:
            self.bus.send(message)
            self.log_message("send", message)
        except can.CanError as e:
            self.log_message("error", f"Failed to send CAN message - {str(e)}")
            self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

        if not self.is_sending:
            # stop_sending() ran while the frame was going out; do not re-arm.
            return

        timer = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
        # Daemon timers cannot keep the process alive after the window is closed.
        timer.daemon = True
        self.send_timers[index] = timer
        timer.start()

    def receive_messages(self):
        """Log every frame received on the bus until sending is stopped.

        Runs on the receive thread started by start_sending(). The bus is
        bound once at entry and polled with a timeout, so the loop re-checks
        the run flag regularly and ends when the bus is replaced by a
        restart. A receive error is logged once and ends the loop instead of
        spinning; receiving resumes on the next start.
        """
        bus = self.bus
        while self.is_sending and self.bus is bus:
            try:
                # recv() returns None when the timeout expires without a frame.
                message = bus.recv(timeout=0.5)
                if message is not None:
                    self.log_message("recv", message)
            except Exception as e:
                self.log_message("error", f"Failed to receive CAN message - {str(e)}")
                break

    def stop_sending(self):
        """Stop transmission: clear the run flag, cancel timers, close the bus.

        When nothing is being sent, only the status label is updated.
        """
        if self.is_sending:
            self.is_sending = False
            # Empty (falsy) timer slots are skipped.
            for pending in filter(None, self.send_timers):
                pending.cancel()
            if self.bus:
                self.bus.shutdown()
            outcome = "Stopped sending."
        else:
            outcome = "Not currently sending."

        self.status_label.config(text=outcome)

    def log_message(self, msg_type, message):
        """Append one formatted line to the log widget and scroll to it.

        For "send" and "recv", *message* must provide ``arbitration_id`` and
        ``data``; the line shows the ID in hex and each byte as ``0xNN``.
        For "error" the text is prefixed with "Error : "; any other type is
        written as-is.
        """
        if msg_type in ("send", "recv"):
            direction = "Send" if msg_type == "send" else "Recv"
            payload = ','.join(f'0x{byte:02X}' for byte in message.data)
            line = f"{direction} : ID={hex(message.arbitration_id)} Data={payload}\n"
        elif msg_type == "error":
            line = f"Error : {message}\n"
        else:
            line = f"{message}\n"

        widget = self.log_text
        widget.insert(tk.END, line)
        widget.see(tk.END)

def main():
    """Create the Tk root window, attach the CAN transmitter form and run the event loop."""
    window = tk.Tk()
    # Bound to a name so the application object is referenced while the loop runs.
    app = CANTransmitterApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()

1 COMMENT

コメントを残す

メールアドレスが公開されることはありません。 ※ が付いている欄は必須項目です