本サイトはアフィリエイト広告を利用しています

Raspberry Pi 4 Model B(ラズパイ)で計測した温度・湿度をMQTT送信(Python)

背景

前回、ラズパイでMQTTのサブスクライブとパブリッシュができるようになった。今回は、温度と湿度をラズパイで計測し、その内容を自動でパブリッシュするPythonプログラムを実装する。前回の記事はこちら。

ソースコード(メインプログラム)

メインプログラムは以下。

import tkinter as tk
from tkinter import scrolledtext, StringVar, OptionMenu
import paho.mqtt.client as mqtt
import subprocess
import time
import socket
import threading
import os
import signal
import datetime

# Latest sensor readings; written by read_sensor_data() and read by auto_pub().
# They stay None until the first reading has been parsed.
humidity = None
temperature = None
# Socket to the sensor process; created in read_sensor_data(), closed in on_closing().
client_socket = None
# Handle of the dht.py child process; set by start_dht().
dht_process = None

# MQTT settings
BROKER = 'localhost'
PORT = 1883

# Initialize the text area and other UI components first
root = tk.Tk()
root.title("MQTT GUI")

root.geometry("600x400")

# Container for the topic selector, message entry and send button
frame = tk.Frame(root)
frame.pack(padx=10, pady=10)

# Holds the topic currently chosen in the dropdown
topic_var = StringVar(root)
topic_var.set("ondo/topic")  # Set default value

# Dropdown for selecting the topic
topic_label = tk.Label(frame, text="Topic:")
topic_label.grid(row=0, column=0)
topic_dropdown = OptionMenu(frame, topic_var, *["ondo/topic", "shitsudo/topic"])
topic_dropdown.grid(row=0, column=1)

# Free-text message to publish manually
message_label = tk.Label(frame, text="Message:")
message_label.grid(row=1, column=0)
message_entry = tk.Entry(frame, width=40)
message_entry.grid(row=1, column=1)

# send_message is defined further down in the file; wrapping the call in a
# lambda defers the name lookup until the button is actually clicked.
send_button = tk.Button(frame, text="Send", command=lambda: send_message())
send_button.grid(row=1, column=2)

# Initialize text area (created read-only; code that writes to it switches the
# state to NORMAL for the insert and back to DISABLED afterwards)
text_area = scrolledtext.ScrolledText(root, width=70, height=20, state=tk.DISABLED)
text_area.pack(padx=50, pady=50)

# Function to display received messages
def on_message(client, userdata, message):
    """Append a received MQTT message to the log text area.

    Assigned to the MQTT client's ``on_message`` attribute, so the client
    library invokes it for every message received on a subscribed topic.

    Args:
        client: The MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        message: The received message; its ``topic`` and ``payload`` are shown.
    """
    # Payloads are arbitrary bytes. Replace undecodable sequences instead of
    # letting UnicodeDecodeError escape from the callback, which would drop
    # the message from the log.
    payload_text = message.payload.decode('utf-8', errors='replace')

    # The widget is kept read-only, so unlock it only for the insert.
    text_area.config(state=tk.NORMAL)
    text_area.insert(tk.END, f"Received: {message.topic}: {payload_text}\n")
    text_area.config(state=tk.DISABLED)
    # Keep the newest line visible.
    text_area.see(tk.END)

# Initialize MQTT client and route every incoming message to on_message
client = mqtt.Client(protocol=mqtt.MQTTv311)  # Speak MQTT protocol version 3.1.1
client.on_message = on_message

# Function to start Mosquitto
def start_mosquitto():
    """Ask systemd to start the Mosquitto broker, then pause briefly.

    Runs ``sudo systemctl start mosquitto`` and waits for that command to
    finish. A non-zero exit status is reported on stdout but is not fatal;
    the later connection attempt reports its own error if the broker is
    unavailable.
    """
    # run() waits for systemctl to return and exposes its exit status; the
    # previous bare Popen left the child un-waited and ignored failures.
    result = subprocess.run(['sudo', 'systemctl', 'start', 'mosquitto'])
    if result.returncode != 0:
        print(f"Failed to start mosquitto (exit code {result.returncode})")
    # Wait a moment before the caller tries to connect
    time.sleep(2)

# Start Mosquitto
start_mosquitto()

# Connect to MQTT broker and subscribe to topics
try:
    # Third argument is the keepalive interval in seconds
    client.connect(BROKER, PORT, 60)
    
    # Topics to subscribe to
    topics = ["ondo/topic", "shitsudo/topic"]
    
    for topic in topics:
        client.subscribe(topic)
        # Log the subscription in the (normally read-only) text area
        text_area.config(state=tk.NORMAL)
        text_area.insert(tk.END, f"Subscribed to: {topic}\n")
        text_area.config(state=tk.DISABLED)
    
    # Process MQTT network traffic in a background thread
    client.loop_start()
except Exception as e:
    # The GUI still starts when the connection fails; the error is only
    # printed to the console.
    print(f"MQTT connection error: {e}")

# Function to send messages
def send_message():
    """Publish the entry box text to the selected topic and clear the box.

    Reads the topic from the dropdown variable and the message from the
    entry widget, echoes both to stdout, publishes through the module-level
    MQTT client, and finally empties the entry widget.
    """
    selected_topic, body = topic_var.get(), message_entry.get()
    print(f"Sending: Topic='{selected_topic}', Message='{body}'")  # Debug output
    client.publish(selected_topic, body)
    # Remove everything from the first character to the end of the entry
    message_entry.delete(0, tk.END)

# Function to start dht.py in the background
def start_dht():
    """Launch dht.py as a background child process unless one is running.

    The process handle is stored in the module-level ``dht_process``. A new
    child is started when no handle exists yet or when the previous child
    has already exited.
    """
    global dht_process
    import sys  # local import: only needed to locate the running interpreter

    # poll() returns None while the child is still alive.
    if dht_process is not None and dht_process.poll() is None:
        return

    # Start the child with the interpreter running this program rather than
    # whatever "python" resolves to on PATH (which may be another version or
    # missing). sys.executable can be empty, hence the fallback.
    interpreter = sys.executable or 'python'
    dht_process = subprocess.Popen([interpreter, 'dht.py'])
    print("Started dht.py process.")

# Start dht.py in the background at program start-up, before the sensor
# reader thread is created further down.
start_dht()

def read_sensor_data():
    """Receive "humidity,temperature" readings over TCP and cache the latest.

    Connects to ``localhost:12346`` and keeps updating the module-level
    ``humidity`` and ``temperature`` with each reading received. The socket
    is stored in the module-level ``client_socket`` so that it can be closed
    from elsewhere. Intended to run in a background thread: it returns only
    when the peer closes the connection or a socket error occurs.
    """
    global humidity, temperature, client_socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Wait before connecting so the sensor process has time to start listening.
    time.sleep(3)

    try:
        client_socket.connect(('localhost', 12346))
        while True:
            chunk = client_socket.recv(1024)
            if not chunk:
                # recv() returning b'' means the peer closed the connection.
                print("Sensor server closed the connection.")
                break
            try:
                new_humidity, new_temperature = map(
                    float, chunk.decode('utf-8').split(",")
                )
            except ValueError as e:
                # TCP is a byte stream, so a chunk may hold a partial reading
                # or several readings glued together. Skip such a chunk
                # instead of ending the reader thread for good.
                print(f"Ignoring malformed sensor data {chunk!r}: {e}")
                continue
            humidity, temperature = new_humidity, new_temperature
    except Exception as e:
        # Also reached when the socket is closed from another thread.
        print(f"Error: {e}")
    finally:
        if client_socket:
            client_socket.close()
            print("socket closed")

def auto_pub():
    """Publish the cached sensor readings and re-arm itself one second later.

    Nothing is published until both ``humidity`` and ``temperature`` have
    been set. The function always reschedules itself through ``root.after``
    so it keeps running once per second for the lifetime of the GUI.
    """
    have_reading = not (humidity is None or temperature is None)
    if have_reading:
        stamp = datetime.datetime.now().strftime('%H:%M:%S')
        # (topic, payload) pairs, published in this order
        outgoing = (
            ("shitsudo/topic", f"time:{stamp} shitsudo:{humidity}"),
            ("ondo/topic", f"time:{stamp}, ondo:{temperature}"),
        )
        for topic, payload in outgoing:
            client.publish(topic, payload)
    # Run again in 1000 ms on the Tk event loop
    root.after(1000, auto_pub)

def on_closing():
    """Release the socket, sensor process and MQTT loop, then close the GUI.

    Registered as the handler for the window's close button. The window is
    destroyed even if one of the cleanup steps raises.
    """
    try:
        if client_socket:
            client_socket.close()
            print("socket closed")
        if dht_process:
            # Only signal a child that is still alive, then wait for it so it
            # is reaped before this program exits.
            if dht_process.poll() is None:
                dht_process.terminate()  # sends SIGTERM on POSIX
                try:
                    # Bounded wait replaces the former blind 2-second sleep.
                    dht_process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    # The child ignored SIGTERM; force it down.
                    dht_process.kill()
                    dht_process.wait()
            print("dht killed")
        client.loop_stop()
    finally:
        # Always close the window, otherwise a failed cleanup step would
        # leave the user unable to quit.
        root.destroy()

# Read sensor data in a daemon thread so the blocking socket reads do not
# stall the GUI and the thread does not keep the process alive on exit.
thread = threading.Thread(target=read_sensor_data, daemon=True)
thread.start()

# Kick off the periodic publish; auto_pub() reschedules itself every second.
auto_pub()

# Run on_closing() when the window's close button is pressed.
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()

ソースコード(サブプログラム)

バックグラウンドで実行しているプログラムは以下。

import RPi.GPIO as GPIO
import time
import socket

# GPIO channel the sensor data line is connected to (BCM numbering, see below)
dhtPin = 17

# Refer to pins by their BCM channel numbers
GPIO.setmode(GPIO.BCM)

# Sampling in readDht11() stops once the pin level has stayed the same for
# more than this many consecutive reads.
MAX_UNCHANGE_COUNT = 100

# States of the pulse decoder in readDht11(); each one names the level
# change the decoder is waiting for next.
STATE_INIT_PULL_DOWN = 1
STATE_INIT_PULL_UP = 2
STATE_DATA_FIRST_PULL_DOWN = 3
STATE_DATA_PULL_UP = 4
STATE_DATA_PULL_DOWN = 5

def readDht11():
    """Take one reading from the sensor attached to ``dhtPin``.

    The pin is driven high and then low to request a reading, after which it
    is switched to input and sampled in a tight loop. The recorded levels are
    turned into the lengths of the high pulses; 40 pulses are expected, and
    each is classified as a 0 or 1 bit by comparing it with the midpoint
    between the shortest and longest pulse. The 40 bits form five bytes, the
    last of which must equal the low 8 bits of the sum of the first four.

    Returns:
        ``(byte0, byte2)`` of the decoded frame (used by the caller as
        humidity and temperature), or ``False`` when the pulse count is not
        40 or the checksum does not match.
    """
    # Request a reading: high, then low, then release the line to input
    # with the pull-up enabled.
    GPIO.setup(dhtPin, GPIO.OUT)
    GPIO.output(dhtPin, GPIO.HIGH)
    time.sleep(0.05)
    GPIO.output(dhtPin, GPIO.LOW)
    time.sleep(0.02)
    GPIO.setup(dhtPin, GPIO.IN, GPIO.PUD_UP)

    # Record raw levels until the line has been idle for too many reads.
    samples = []
    previous = -1
    idle_reads = 0
    while True:
        level = GPIO.input(dhtPin)
        samples.append(level)
        if level == previous:
            idle_reads += 1
            if idle_reads > MAX_UNCHANGE_COUNT:
                break
        else:
            idle_reads = 0
            previous = level

    # For every decoder state: the level that ends it and the state entered.
    transitions = {
        STATE_INIT_PULL_DOWN: (GPIO.LOW, STATE_INIT_PULL_UP),
        STATE_INIT_PULL_UP: (GPIO.HIGH, STATE_DATA_FIRST_PULL_DOWN),
        STATE_DATA_FIRST_PULL_DOWN: (GPIO.LOW, STATE_DATA_PULL_UP),
        STATE_DATA_PULL_UP: (GPIO.HIGH, STATE_DATA_PULL_DOWN),
        STATE_DATA_PULL_DOWN: (GPIO.LOW, STATE_DATA_PULL_UP),
    }

    pulse_lengths = []
    run = 0
    phase = STATE_INIT_PULL_DOWN
    for level in samples:
        run += 1
        awaited_level, next_phase = transitions[phase]
        if level != awaited_level:
            continue
        if phase == STATE_DATA_PULL_UP:
            # A high pulse starts here; measure it from zero.
            run = 0
        elif phase == STATE_DATA_PULL_DOWN:
            # The high pulse just ended; its length encodes one bit.
            pulse_lengths.append(run)
        phase = next_phase

    if len(pulse_lengths) != 40:
        return False

    # Pulses longer than the midpoint between the extremes count as 1 bits.
    threshold = (max(pulse_lengths) + min(pulse_lengths)) / 2

    # Pack the bits into bytes, most significant bit first.
    frame = []
    for start in range(0, 40, 8):
        value = 0
        for width in pulse_lengths[start:start + 8]:
            value = (value << 1) | (1 if width > threshold else 0)
        frame.append(value)

    expected_checksum = (frame[0] + frame[1] + frame[2] + frame[3]) & 0xFF
    if frame[4] != expected_checksum:
        return False

    return frame[0], frame[2]

def create_server_socket():
    """Create a TCP socket listening on ``localhost:12346``.

    ``SO_REUSEADDR`` is enabled before binding.

    Returns:
        The listening socket, or ``None`` if binding or listening raised
        ``OSError`` (the failed socket is closed first).
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    endpoint = ('localhost', 12346)  # localhost port12346
    try:
        listener.bind(endpoint)
        listener.listen(1)
        print("Server is listening on port 12346...")
        return listener
    except OSError as bind_error:
        print(f"Failed to bind socket: {bind_error}")
        listener.close()
        return None

def main():
    """Serve sensor readings to one TCP client until the connection ends.

    Creates the listening socket (retrying once per second until it
    succeeds), accepts a single client, then once per second takes a reading
    and, when it is valid, sends it as ``"<humidity>,<temperature>"``. The
    loop ends on Ctrl+C or on any error, after which the sockets are closed
    and the GPIO state is cleaned up.
    """
    # Retry in a loop rather than by calling main() recursively, so a port
    # that stays busy cannot exhaust the recursion limit.
    server_socket = create_server_socket()
    while server_socket is None:
        print("Retrying to create server socket...")
        time.sleep(1)
        server_socket = create_server_socket()

    # Stays None if accept() fails or is interrupted, so the cleanup below
    # does not trip over an unbound name.
    conn = None
    try:
        conn, addr = server_socket.accept()
        print("Connected by", addr)

        while True:
            result = readDht11()
            # readDht11() returns False for a bad reading; skip those.
            if result:
                humidity, temperature = result
                conn.sendall(f"{humidity},{temperature}".encode('utf-8'))
            time.sleep(1)

    except KeyboardInterrupt:
        print("Program interrupted.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        if conn is not None:
            conn.close()
        server_socket.close()
        GPIO.cleanup()  # GPIO cleanup here

# Start the sensor server only when this file is run as a script.
if __name__ == '__main__':
    main()

実行している様子

動いた。。。

プログラム概要

メイン側ではMQTTのサブスクライブ・パブリッシュとGUI表示を担っている。そして、温度と湿度をサブプログラムからソケット接続で取得するようにした。サブプログラムは温度と湿度を計測し、ソケット接続を待機する。

苦労した点

GUIを終了した際、適切にソケットを初期化できず、2回目以降動作しなかった。メインプログラムで色々エラー処理を入れたがダメで、結局サブプログラム側で対処を入れたところ上手く動作するようになった。サブプログラム側の対処は2点。

①server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)を設定することで、プログラムを再起動した際にアドレス(ポート)の再利用を許可する。これにより、すぐに再バインドを行うことが可能になる。

②バインドに失敗した場合、1秒待機してから再度main()を呼び出して、ソケットの再作成を試みる(成功するまでmain()を再帰的に呼び出し続けるので、あまり良くない)。

1 COMMENT

コメントを残す

メールアドレスが公開されることはありません。 ※ が付いている欄は必須項目です