背景
前回、ラズパイでMQTTのサブスクライブとパブリッシュができるようになった。今回は、温度と湿度をラズパイで計測し、その内容を自動でパブリッシュするPythonプログラムを実装する。前回の記事はこちら。
ソースコード(メインプログラム)
メインプログラムは以下。
import tkinter as tk
from tkinter import scrolledtext, StringVar, OptionMenu
import paho.mqtt.client as mqtt
import subprocess
import time
import socket
import threading
import os
import signal
import datetime
# Latest sensor reading shared between the reader thread and the GUI;
# both values are None until a reading has been stored.
humidity = temperature = None
# Handles for the sensor socket and the dht.py child process (unset at startup).
client_socket = dht_process = None

# MQTT settings
BROKER, PORT = 'localhost', 1883
# Build the main window and its widgets first so later code can write to them
root = tk.Tk()
root.title("MQTT GUI")
root.geometry("600x400")

frame = tk.Frame(root)
frame.pack(padx=10, pady=10)

# Row 0: dropdown for selecting the topic to publish to
topic_var = StringVar(root, value="ondo/topic")  # default selection
topic_label = tk.Label(frame, text="Topic:")
topic_label.grid(row=0, column=0)
topic_dropdown = OptionMenu(frame, topic_var, "ondo/topic", "shitsudo/topic")
topic_dropdown.grid(row=0, column=1)

# Row 1: free-text message entry and the Send button
message_label = tk.Label(frame, text="Message:")
message_label.grid(row=1, column=0)
message_entry = tk.Entry(frame, width=40)
message_entry.grid(row=1, column=1)
# The lambda defers the lookup of send_message until the button is clicked
send_button = tk.Button(frame, text="Send", command=lambda: send_message())
send_button.grid(row=1, column=2)

# Scrollable log area, created disabled so the user cannot type into it
text_area = scrolledtext.ScrolledText(root, width=70, height=20, state=tk.DISABLED)
text_area.pack(padx=50, pady=50)
# Function to display received messages
def on_message(client, userdata, message):
    """Append a received MQTT message to the log area and scroll to it.

    Args:
        client: MQTT client that delivered the message (unused).
        userdata: User data passed by the MQTT client (unused).
        message: Received message; its ``topic`` and ``payload`` are shown.
    """
    # errors="replace": a payload that is not valid UTF-8 is displayed with
    # replacement characters instead of raising UnicodeDecodeError here.
    payload_text = message.payload.decode("utf-8", errors="replace")
    # NOTE(review): if this callback runs on the MQTT network thread, confirm
    # that the Tk build in use tolerates widget calls from another thread.
    text_area.config(state=tk.NORMAL)  # the log is read-only; unlock to insert
    text_area.insert(tk.END, f"Received: {message.topic}: {payload_text}\n")
    text_area.config(state=tk.DISABLED)
    text_area.see(tk.END)
# Initialize MQTT client (speaks MQTT v3.1.1)
# paho-mqtt 2.x takes a callback API version as the first argument; older
# releases do not know that argument, so pass it only when it exists.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv311)
else:
    client = mqtt.Client(protocol=mqtt.MQTTv311)
client.on_message = on_message
# Function to start Mosquitto
def start_mosquitto():
    """Start the Mosquitto service via systemctl and pause briefly afterwards.

    A non-zero exit status from the command is reported on stdout; it is not
    raised, so the caller's own connection error handling still applies.
    """
    # run() waits for the command to finish, so the child is reaped and its
    # exit status can be checked (a bare Popen was never waited on).
    result = subprocess.run(['sudo', 'systemctl', 'start', 'mosquitto'])
    if result.returncode != 0:
        print(f"Failed to start mosquitto (exit code {result.returncode})")
    # Wait a moment
    time.sleep(2)
# Bring the broker up before trying to connect to it
start_mosquitto()

# Connect to the broker, subscribe to both topics, then start the network loop
try:
    client.connect(BROKER, PORT, keepalive=60)
    # Topics to subscribe to
    topics = ["ondo/topic", "shitsudo/topic"]
    for topic in topics:
        client.subscribe(topic)
        # The log area is read-only, so unlock it just for the insert
        text_area.configure(state=tk.NORMAL)
        text_area.insert(tk.END, f"Subscribed to: {topic}\n")
        text_area.configure(state=tk.DISABLED)
    client.loop_start()
except Exception as exc:
    # Any failure is only reported; the GUI still starts
    print(f"MQTT connection error: {exc}")
# Function to send messages
def send_message():
    """Publish the entry box text to the selected topic, then clear the box."""
    selected_topic, payload = topic_var.get(), message_entry.get()
    print(f"Sending: Topic='{selected_topic}', Message='{payload}'")  # Debug output
    client.publish(selected_topic, payload)
    # Empty the entry so the next message starts from a blank field
    message_entry.delete(0, tk.END)
# Function to start dht.py in the background
def start_dht():
    """Launch dht.py as a child process unless one is already running.

    The process handle is stored in the module-level ``dht_process``.
    """
    global dht_process
    import sys  # local import: only needed to locate the running interpreter

    # Check if dht_process is already running
    if dht_process is None or dht_process.poll() is not None:  # Process is not running
        # Use the interpreter running this program rather than whatever
        # 'python' happens to resolve to on PATH (it may be missing or a
        # different version); fall back to 'python' if it is unknown.
        dht_process = subprocess.Popen([sys.executable or 'python', 'dht.py'])
        print("Started dht.py process.")


# Start dht.py in the background
start_dht()
def read_sensor_data():
    """Receive "humidity,temperature" readings over TCP and store them.

    Connects to localhost:12346 after a fixed 3 second delay and updates the
    module-level ``humidity`` and ``temperature`` for every well-formed chunk.
    Returns when the peer closes the connection or a socket error occurs; the
    socket is always closed on the way out.
    """
    global humidity, temperature, client_socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    time.sleep(3)  # fixed delay before the single connection attempt
    try:
        client_socket.connect(('localhost', 12346))
        while True:
            data = client_socket.recv(1024).decode('utf-8')
            if not data:
                # recv() returns an empty result once the peer has closed the
                # connection; stop instead of trying to parse it.
                print("Error: connection closed by the sensor process")
                break
            try:
                new_humidity, new_temperature = map(float, data.split(","))
            except ValueError:
                # TCP is a byte stream, so a chunk may hold a partial reading
                # or several run together; skip it and keep the thread alive.
                print(f"Ignoring malformed sensor data: {data!r}")
                continue
            humidity, temperature = new_humidity, new_temperature
    except Exception as e:
        print(f"Error: {e}")
    finally:
        if client_socket:
            client_socket.close()
            print("socket closed")
def auto_pub():
    """Publish the latest humidity and temperature, then reschedule itself.

    Nothing is published until both readings are available. The function
    re-arms itself through ``root.after`` so it runs again 1000 ms later.
    """
    if humidity is not None and temperature is not None:
        current_time = datetime.datetime.now().strftime('%H:%M:%S')
        # Both payloads share one layout: "time:<HH:MM:SS>, <name>:<value>"
        # (the humidity payload previously lacked the comma).
        message_humidity = f"time:{current_time}, shitsudo:{humidity}"
        message_temperature = f"time:{current_time}, ondo:{temperature}"
        client.publish("shitsudo/topic", message_humidity)
        client.publish("ondo/topic", message_temperature)
    root.after(1000, auto_pub)
def on_closing():
    """Release the socket, stop dht.py and the MQTT loop, then close the window.

    The window is destroyed even if one of the cleanup steps raises.
    """
    try:
        if client_socket:
            client_socket.close()
            print("socket closed")
        if dht_process:
            # terminate() sends SIGTERM on POSIX and, unlike a raw os.kill(),
            # does nothing if the child has already been reaped.
            dht_process.terminate()
            try:
                # Reap the child so it does not linger as a zombie.
                dht_process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                dht_process.kill()
                dht_process.wait()
            print("dht killed")
        client.loop_stop()
    finally:
        # Always close the window, otherwise a cleanup error would leave the
        # close button unusable.
        root.destroy()
# Short pause before the sensor reader is started
time.sleep(2)

# daemon=True so the reader thread never keeps the process alive on exit
thread = threading.Thread(target=read_sensor_data, daemon=True)
thread.start()

# Route the window's close button through on_closing for cleanup
root.protocol("WM_DELETE_WINDOW", on_closing)

# Kick off the self-rescheduling publisher, then hand control to Tk
auto_pub()
root.mainloop()
ソースコード(サブプログラム)
バックグラウンドで実行しているプログラムは以下。
import RPi.GPIO as GPIO
import time
import socket
# GPIO pin (BCM numbering, see setmode below) that readDht11 drives and reads
dhtPin = 17
GPIO.setmode(GPIO.BCM)

# Sampling stops after more than this many consecutive identical samples
MAX_UNCHANGE_COUNT = 100

# States of the pulse decoder in readDht11, numbered 1..5
(
    STATE_INIT_PULL_DOWN,
    STATE_INIT_PULL_UP,
    STATE_DATA_FIRST_PULL_DOWN,
    STATE_DATA_PULL_UP,
    STATE_DATA_PULL_DOWN,
) = range(1, 6)
def readDht11():
    """Sample the sensor pin once and decode a 40-bit frame.

    Returns:
        A ``(the_bytes[0], the_bytes[2])`` tuple, which the caller treats as
        (humidity, temperature), or ``False`` when fewer or more than 40
        pulses were captured or the checksum byte does not match.
    """
    # Drive the line high, then low, then switch to input with pull-up
    GPIO.setup(dhtPin, GPIO.OUT)
    GPIO.output(dhtPin, GPIO.HIGH)
    time.sleep(0.05)
    GPIO.output(dhtPin, GPIO.LOW)
    time.sleep(0.02)
    GPIO.setup(dhtPin, GPIO.IN, GPIO.PUD_UP)

    # Record raw pin levels until the line has stayed unchanged for more than
    # MAX_UNCHANGE_COUNT consecutive samples.
    samples = []
    previous = -1
    idle_run = 0
    while idle_run <= MAX_UNCHANGE_COUNT:
        level = GPIO.input(dhtPin)
        samples.append(level)
        if level == previous:
            idle_run += 1
        else:
            idle_run = 0
            previous = level

    # Walk the samples through the state machine, collecting the length (in
    # samples) of every high pulse that follows the initial handshake.
    phase = STATE_INIT_PULL_DOWN
    lengths = []
    run = 0
    for level in samples:
        run += 1
        if phase == STATE_INIT_PULL_DOWN:
            if level == GPIO.LOW:
                phase = STATE_INIT_PULL_UP
        elif phase == STATE_INIT_PULL_UP:
            if level == GPIO.HIGH:
                phase = STATE_DATA_FIRST_PULL_DOWN
        elif phase == STATE_DATA_FIRST_PULL_DOWN:
            if level == GPIO.LOW:
                phase = STATE_DATA_PULL_UP
        elif phase == STATE_DATA_PULL_UP:
            if level == GPIO.HIGH:
                # A data pulse starts here; measure from this sample on
                run = 0
                phase = STATE_DATA_PULL_DOWN
        elif phase == STATE_DATA_PULL_DOWN:
            if level == GPIO.LOW:
                lengths.append(run)
                phase = STATE_DATA_PULL_UP

    if len(lengths) != 40:
        return False

    # Pulses longer than the midpoint between shortest and longest count as 1
    halfway = (max(lengths) + min(lengths)) / 2
    bits = [1 if length > halfway else 0 for length in lengths]

    # Pack the bits into five bytes, most significant bit first
    the_bytes = []
    for start in range(0, len(bits), 8):
        value = 0
        for bit in bits[start:start + 8]:
            value = (value << 1) | bit
        the_bytes.append(value)

    # The fifth byte must equal the low 8 bits of the sum of the first four
    checksum = sum(the_bytes[:4]) & 0xFF
    if the_bytes[4] != checksum:
        return False
    return the_bytes[0], the_bytes[2]
def create_server_socket():
    """Create a TCP socket listening on localhost:12346.

    Returns:
        The listening socket, or ``None`` if binding or listening failed (the
        failure is printed and the socket is closed).
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow the address to be reused so a quick restart can bind again
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        listener.bind(('localhost', 12346))  # localhost port12346
        listener.listen(1)
        print("Server is listening on port 12346...")
    except OSError as err:
        print(f"Failed to bind socket: {err}")
        listener.close()
        return None
    return listener
def main():
    """Serve sensor readings to one TCP client until interrupted or an error.

    Waits for the listening socket to become available, accepts a single
    connection, then sends "humidity,temperature" after every successful
    sensor read, pausing one second between reads. Sockets and GPIO are
    released on the way out.
    """
    import signal
    import sys

    # Turn SIGTERM into SystemExit so the ``finally`` block below still runs
    # when the process is terminated rather than interrupted from a terminal.
    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))

    # Retry in a loop instead of calling main() recursively, so repeated
    # bind failures cannot exhaust the recursion limit.
    server_socket = create_server_socket()
    while server_socket is None:
        print("Retrying to create server socket...")
        time.sleep(1)
        server_socket = create_server_socket()

    conn = None  # stays None if accept() never returns a connection
    try:
        conn, addr = server_socket.accept()
        print("Connected by", addr)
        while True:
            result = readDht11()
            if result:
                humidity, temperature = result
                conn.sendall(f"{humidity},{temperature}".encode('utf-8'))
            time.sleep(1)
    except KeyboardInterrupt:
        print("Program interrupted.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # conn is unset when accept() itself failed or was interrupted
        if conn is not None:
            conn.close()
        server_socket.close()
        GPIO.cleanup()  # GPIO cleanup here


if __name__ == '__main__':
    main()
実行している様子
動いた。。。
プログラム概要
メイン側ではMQTTサブスクとGUI表示を担っている。そして、温度と湿度をサブプログラムからソケット接続で取得するようにした。サブプログラムは温度と湿度を計測し、ソケット接続を待機する。
苦労した点
GUIを終了した際、適切にソケットを初期化できず、2回目以降動作しなかった。メインプログラムで色々エラー処理を入れたがダメで、結局サブプログラム側で対処を入れたところ上手く動作するようになった。サブプログラム側の対処は2点。
①server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)を設定し、アドレスの再利用を許可する。これにより、前回の接続がTIME_WAIT状態で残っていても、プログラムの再起動直後に同じポートへ再バインドできるようになる。
②バインドに失敗した場合、1秒待機してから再度main()を呼び出して、ソケットの再作成を試みる(成功するまで再帰呼び出しを繰り返すため、失敗が続くと再帰が際限なく深くなり、あまり良くない)。
[…] PI 4 Model B(ラズパイ)でCAN通信をしてみる Raspberry PI 4 Model B(ラズパイ)で計測した温度をMQTT送信(python) MQTT経由でRaspberry PI 4 Model B(ラズパイ)からモーター制御 […]