本サイトはアフィリエイト広告を利用しています

ラズパイ4をECUに見立てて車両診断コマンド(UDSコマンド)を送ってみた

ラズパイとCAN通信

背景

前回はラズパイを使ってCANコマンドの定期送信、USB2CANからの定期受信をするツールを使って遊びました。

今回は定期送信という概念ではなく、車両診断コマンド、通称UDSコマンドをラズパイに送ってみます。

UDSとは、Unified Diagnostics Servicesの略で、国際的な規格で定められた通信プロトコルです。

USB2CANをPCに繋いでラズパイへCANコマンドを送るのは簡単ですが、ラズパイから何かしら応答を返信させるのが今回の挑戦の肝でございます。

前回の記事はこちらです。

UDSコマンド(SID=0x22)に対して簡単な応答をするプログラム

まずはお試しです。

CANID=0x740のCANコマンドを受信した場合、CANID=0x748のCANコマンドをPCへ送信するプログラムです。

送信データの内容は適当なのですが、取りあえず疎通はできました。

import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    def __init__(self, master):
        self.master = master
        self.master.title("CAN Transmitter")

        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        # CAN ID, CAN Data, and Period Inputs
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
        self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
        self.can_id_entries = [ttk.Entry(master) for i in range(3)]
        self.data_entries = [ttk.Entry(master) for i in range(3)]
        self.period_entries = [ttk.Entry(master) for i in range(3)]

        for i in range(3):
            # CAN ID
            self.can_id_labels[i].grid(row=1+i, column=0, padx=10, pady=10, sticky=tk.W)
            self.can_id_entries[i].grid(row=1+i, column=1, padx=10, pady=10)
            self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")

            # CAN Data
            self.data_labels[i].grid(row=1+i, column=2, padx=10, pady=10, sticky=tk.W)
            self.data_entries[i].grid(row=1+i, column=3, padx=10, pady=10)
            self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")

            # Period
            self.period_labels[i].grid(row=1+i, column=4, padx=10, pady=10, sticky=tk.W)
            self.period_entries[i].grid(row=1+i, column=5, padx=10, pady=10)
            self.period_entries[i].insert(tk.END, "1.0")

        # Control Buttons
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=3, columnspan=3, padx=10, pady=10)

        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=6, padx=10, pady=10)

        # Log Text
        self.log_text = tk.Text(master, height=10, width=80)
        self.log_text.grid(row=6, columnspan=6, padx=10, pady=10)
        
        self.is_sending = False
        self.bus = None
        self.send_timers = [None, None, None]

    def start_sending(self):
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
        data_strs = [entry.get().strip() for entry in self.data_entries]
        periods = [float(entry.get().strip()) for entry in self.period_entries]

        try:
            data_list = []
            for data_str in data_strs:
                data = [int(byte, 16) for byte in data_str.split(',')]
                if len(data) != 8:
                    raise ValueError("Data length must be 8 bytes.")
                data_list.append(data)
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
            self.log_message("info", f"CAN interface {interface} connected.")
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
        self.receive_thread.start()
        for i in range(3):
            self.send_messages(can_ids[i], data_list[i], periods[i], i)

    def send_messages(self, can_id, data, period, index):
        message = can.Message(arbitration_id=can_id, data=data)

        if self.is_sending:
            try:
                self.bus.send(message)
                self.log_message("send", message)
            except can.CanError as e:
                self.log_message("error", f"Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

            self.send_timers[index] = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
            self.send_timers[index].start()

    def receive_messages(self):
        while self.is_sending:
            try:
                message = self.bus.recv()
                if message:
                    self.log_message("recv", message)
                    # Check for UDS command with CANID 0x740
                    if message.arbitration_id == 0x740:
                        response_data = self.create_ud_response(message.data)
                        response_message = can.Message(arbitration_id=0x748, data=response_data)
                        self.bus.send(response_message)
                        self.log_message("send", response_message)
            except Exception as e:
                self.log_message("error", f"Failed to receive CAN message - {str(e)}")

    def create_ud_response(self, request_data):
        # ISO 14229 compliant response example
        request_data_list = list(request_data)
        print("Received request data:", ", ".join(f"0x{byte:02X}" for byte in request_data))
        if request_data_list[1] == 0x22:  # Example of a specific UDS request
            return [0x62] + request_data_list[1:]  # Example response with 0x72 as the positive response code
        return [0x7F, request_data_list[0], 0x31]  # Negative response: request out of range

    def stop_sending(self):
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

    def log_message(self, msg_type, message):
        if msg_type == "send":
            log_entry = f"Send : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
        elif msg_type == "recv":
            log_entry = f"Recv : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
        elif msg_type == "error":
            log_entry = f"Error : {message}\n"
        else:
            log_entry = f"{message}\n"
        self.log_text.insert(tk.END, log_entry)
        self.log_text.see(tk.END)

def main():
    root = tk.Tk()
    app = CANTransmitterApp(root)
    root.mainloop()

if __name__ == "__main__":
    main()

実行している様子です。

UDSコマンドを送信している様子

SID=0x22に対しDID(Data Identifier)を設定してみる

SID=0x22はRead data by Identifierと呼ばれているUDSコマンドの一つで、ECUから情報を読み出す際に利用されます。

SIDとはService IDの略で、IDによって実行できる機能が異なります。

Data Identiferとはデータの種別を識別するアドレスのようなもので、SID=0x22を利用する際に指定する必要があるパラメータです。

試しに、DID=0x1010を指定して要求した場合、そのDIDに紐づくデータを応答する仕組みを実装してみます。

どのようなデータを紐づけるかはメーカーで定義できるようになっておりますので、ここでは適当な値を応答させます。

まずはお試しで0xFFを固定応答させてみます。

ついでにフォーマットも間違っているので修正しました。

import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    def __init__(self, master):
        self.master = master
        self.master.title("CAN Transmitter")

        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        # CAN ID, CAN Data, and Period Inputs
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
        self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
        self.can_id_entries = [ttk.Entry(master) for i in range(3)]
        self.data_entries = [ttk.Entry(master) for i in range(3)]
        self.period_entries = [ttk.Entry(master) for i in range(3)]

        for i in range(3):
            # CAN ID
            self.can_id_labels[i].grid(row=1+i, column=0, padx=10, pady=10, sticky=tk.W)
            self.can_id_entries[i].grid(row=1+i, column=1, padx=10, pady=10)
            self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")

            # CAN Data
            self.data_labels[i].grid(row=1+i, column=2, padx=10, pady=10, sticky=tk.W)
            self.data_entries[i].grid(row=1+i, column=3, padx=10, pady=10)
            self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")

            # Period
            self.period_labels[i].grid(row=1+i, column=4, padx=10, pady=10, sticky=tk.W)
            self.period_entries[i].grid(row=1+i, column=5, padx=10, pady=10)
            self.period_entries[i].insert(tk.END, "1.0")

        # Control Buttons
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=3, columnspan=3, padx=10, pady=10)

        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=6, padx=10, pady=10)

        # Log Text
        self.log_text = tk.Text(master, height=10, width=80)
        self.log_text.grid(row=6, columnspan=6, padx=10, pady=10)
        
        self.is_sending = False
        self.bus = None
        self.send_timers = [None, None, None]

    def start_sending(self):
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
        data_strs = [entry.get().strip() for entry in self.data_entries]
        periods = [float(entry.get().strip()) for entry in self.period_entries]

        try:
            data_list = []
            for data_str in data_strs:
                data = [int(byte, 16) for byte in data_str.split(',')]
                if len(data) != 8:
                    raise ValueError("Data length must be 8 bytes.")
                data_list.append(data)
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
            self.log_message("info", f"CAN interface {interface} connected.")
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
        self.receive_thread.start()
        for i in range(3):
            self.send_messages(can_ids[i], data_list[i], periods[i], i)

    def send_messages(self, can_id, data, period, index):
        message = can.Message(arbitration_id=can_id, data=data)

        if self.is_sending:
            try:
                self.bus.send(message)
                self.log_message("send", message)
            except can.CanError as e:
                self.log_message("error", f"Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

            self.send_timers[index] = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
            self.send_timers[index].start()

    def receive_messages(self):
        while self.is_sending:
            try:
                message = self.bus.recv()
                if message:
                    self.log_message("recv", message)
                    # Check for UDS command with CANID 0x740
                    if message.arbitration_id == 0x740:
                        response_data = self.create_ud_response(message.data)
                        response_message = can.Message(arbitration_id=0x748, data=response_data)
                        self.bus.send(response_message)
                        self.log_message("send", response_message)
            except Exception as e:
                self.log_message("error", f"Failed to receive CAN message - {str(e)}")

    def create_ud_response(self, request_data):
        # ISO 14229 compliant response example
        request_data_list = list(request_data)
        print("Received request data:", ", ".join(f"0x{byte:02X}" for byte in request_data))
        if request_data_list[1] == 0x22:
            if request_data_list[2] == 0x10 and request_data_list[3] == 0x10:
                return [0x04] + [0x62] + request_data_list[2:4] + [0xFF] + [0x00] * 3 
        return [0x7F, request_data_list[0], 0x31]

    def stop_sending(self):
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

    def log_message(self, msg_type, message):
        if msg_type == "send":
            log_entry = f"Send : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
        elif msg_type == "recv":
            log_entry = f"Recv : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
        elif msg_type == "error":
            log_entry = f"Error : {message}\n"
        else:
            log_entry = f"{message}\n"
        self.log_text.insert(tk.END, log_entry)
        self.log_text.see(tk.END)

def main():
    root = tk.Tk()
    app = CANTransmitterApp(root)
    root.mainloop()

if __name__ == "__main__":
    main()

UDSのプロトコルに準拠しつつ0xFFを固定応答している様子です。

SID=0x22に対して応答

SID=0x2E(Write data by Identifier)でデータを書き換えてみる

固定値を読み出すだけでは面白くないので、読み出す値をSID=0x2Eで書き換えるプログラムに改造してみます。

SID=0x2EはSID=0x22の対をなすコマンドでして、DIDを指定することで、そのDID内のデータを書き換えることができるコマンドです。

DIDをキーにした辞書を用意して実装してみます。

import tkinter as tk
from tkinter import ttk
import can
import threading
import time

class CANTransmitterApp:
    def __init__(self, master):
        self.master = master
        self.master.title("CAN Transmitter")

        self.memory = {}

        # Interface Input
        self.interface_label = ttk.Label(master, text="Interface:")
        self.interface_label.grid(row=0, column=0, padx=10, pady=10, sticky=tk.W)
        self.interface_entry = ttk.Entry(master)
        self.interface_entry.grid(row=0, column=1, padx=10, pady=10)
        self.interface_entry.insert(tk.END, "can0")

        # CAN ID, CAN Data, and Period Inputs
        self.can_id_labels = [ttk.Label(master, text=f"CAN ID {i+1}:") for i in range(3)]
        self.data_labels = [ttk.Label(master, text=f"CAN Data {i+1} (comma-separated hex values):") for i in range(3)]
        self.period_labels = [ttk.Label(master, text=f"Period {i+1} (seconds):") for i in range(3)]
        self.can_id_entries = [ttk.Entry(master) for i in range(3)]
        self.data_entries = [ttk.Entry(master) for i in range(3)]
        self.period_entries = [ttk.Entry(master) for i in range(3)]

        for i in range(3):
            # CAN ID
            self.can_id_labels[i].grid(row=1+i, column=0, padx=10, pady=10, sticky=tk.W)
            self.can_id_entries[i].grid(row=1+i, column=1, padx=10, pady=10)
            self.can_id_entries[i].insert(tk.END, f"{123 + i:03X}")

            # CAN Data
            self.data_labels[i].grid(row=1+i, column=2, padx=10, pady=10, sticky=tk.W)
            self.data_entries[i].grid(row=1+i, column=3, padx=10, pady=10)
            self.data_entries[i].insert(tk.END, "11,22,33,44,55,66,77,88")

            # Period
            self.period_labels[i].grid(row=1+i, column=4, padx=10, pady=10, sticky=tk.W)
            self.period_entries[i].grid(row=1+i, column=5, padx=10, pady=10)
            self.period_entries[i].insert(tk.END, "1.0")

        # Control Buttons
        self.start_button = ttk.Button(master, text="Start Sending", command=self.start_sending)
        self.start_button.grid(row=4, column=0, columnspan=3, padx=10, pady=10)
        self.stop_button = ttk.Button(master, text="Stop Sending", command=self.stop_sending)
        self.stop_button.grid(row=4, column=3, columnspan=3, padx=10, pady=10)

        # Status Label
        self.status_label = ttk.Label(master, text="")
        self.status_label.grid(row=5, columnspan=6, padx=10, pady=10)

        # Log Text
        self.log_text = tk.Text(master, height=10, width=80)
        self.log_text.grid(row=6, columnspan=6, padx=10, pady=10)
        
        self.is_sending = False
        self.bus = None
        self.send_timers = [None, None, None]

    def start_sending(self):
        if self.is_sending:
            self.status_label.config(text="Already sending.")
            return

        interface = self.interface_entry.get().strip()
        can_ids = [int(entry.get().strip(), 16) for entry in self.can_id_entries]
        data_strs = [entry.get().strip() for entry in self.data_entries]
        periods = [float(entry.get().strip()) for entry in self.period_entries]

        try:
            data_list = []
            for data_str in data_strs:
                data = [int(byte, 16) for byte in data_str.split(',')]
                if len(data) != 8:
                    raise ValueError("Data length must be 8 bytes.")
                data_list.append(data)
        except ValueError:
            self.status_label.config(text="Invalid CAN data format.")
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype='socketcan', bitrate=500000)
            self.log_message("info", f"CAN interface {interface} connected.")
        except Exception as e:
            self.status_label.config(text=f"Error: {str(e)}")
            return

        self.is_sending = True
        self.receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
        self.receive_thread.start()
        for i in range(3):
            self.send_messages(can_ids[i], data_list[i], periods[i], i)

    def send_messages(self, can_id, data, period, index):
        message = can.Message(arbitration_id=can_id, data=data)

        if self.is_sending:
            try:
                self.bus.send(message)
                self.log_message("send", message)
            except can.CanError as e:
                self.log_message("error", f"Failed to send CAN message - {str(e)}")
                self.status_label.config(text=f"Error: Failed to send CAN message - {str(e)}")

            self.send_timers[index] = threading.Timer(period, self.send_messages, args=(can_id, data, period, index))
            self.send_timers[index].start()

    def receive_messages(self):
        while self.is_sending:
            try:
                message = self.bus.recv()
                if message:
                    self.log_message("recv", message)
                    # Check for UDS command with CANID 0x740
                    if message.arbitration_id == 0x740:
                        response_data = self.create_ud_response(message.data)
                        response_message = can.Message(arbitration_id=0x748, data=response_data)
                        self.bus.send(response_message)
                        self.log_message("send", response_message)
            except Exception as e:
                self.log_message("error", f"Failed to receive CAN message - {str(e)}")

    def create_ud_response(self, request_data):
        # ISO 14229 compliant response example
        request_data_list = list(request_data)
        print("Received request data:", ", ".join(f"0x{byte:02X}" for byte in request_data_list))

        # SID=0x22
        if request_data_list[1] == 0x22:
            # DID=0x10,0x10
            if request_data_list[2] == 0x10 and request_data_list[3] == 0x10:
                did = (request_data_list[2], request_data_list[3]) 
                if did in self.memory:
                    # value is valid
                    response_value = self.memory[did]
                else:
                    # value is not valid
                    response_value = 0xFF
                
                # responce data
                return [0x04] + [0x62] + [0x10, 0x10, response_value] + [0x00] * 3

        # SID=0x2E
        elif request_data_list[1] == 0x2E:
            # DID=0x10,0x10
            if request_data_list[2] == 0x10 and request_data_list[3] == 0x10:
                # write memory
                value_to_write = request_data_list[4] if len(request_data_list) > 4 else 0x00
                
                did = (0x10, 0x10) 
                self.memory[did] = value_to_write
                
                # responce data
                return [0x03] + [0x6E] + [0x10, 0x10] + [0x00] * 4

        
        return [0x7F, request_data_list[0], 0x31]  # Negative response


    def stop_sending(self):
        if not self.is_sending:
            self.status_label.config(text="Not currently sending.")
            return

        self.is_sending = False
        for timer in self.send_timers:
            if timer:
                timer.cancel()
        if self.bus:
            self.bus.shutdown()

        self.status_label.config(text="Stopped sending.")

    def log_message(self, msg_type, message):
        if msg_type == "send":
            log_entry = f"Send : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
        elif msg_type == "recv":
            log_entry = f"Recv : ID={hex(message.arbitration_id)} Data={','.join(f'0x{byte:02X}' for byte in message.data)}\n"
        elif msg_type == "error":
            log_entry = f"Error : {message}\n"
        else:
            log_entry = f"{message}\n"
        self.log_text.insert(tk.END, log_entry)
        self.log_text.see(tk.END)

def main():
    root = tk.Tk()
    app = CANTransmitterApp(root)
    root.mainloop()

if __name__ == "__main__":
    main()

まずはSID=0x22で読み出しします。結果は0xFF(初期値)です。

SID=0x22応答

そしてSID=0x2EでDID=0x1010の値を0xFF⇒0x99に書き換えます。

SID=0x2E応答

そして、再度SID=0x22でDID=0x1010の値を読みだしてみると・・・

SID=0x22応答

0x99に書き換わっています!

次はもっと複雑なSIDに挑戦してみます。

2 COMMENTS

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です