IoT Edge Computing Dashboard in Python – Live MQTT Data Processing (FuzzuTech)
Demo :
Click the video below.
Features:
- Real-time IoT sensor simulation
- MQTT-based Edge Data publishing
- Comfort Index computation
- Interactive GUI with Python
Code :
import time
import random
import json
import threading
import customtkinter as ctk
import paho.mqtt.client as mqtt
# ---------------- MQTT Configuration ----------------
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_TOPIC = "fuzzutech/iot/edge"

# NOTE(review): paho-mqtt 2.x adds a callback-API-version argument to
# Client(), while 1.x has no such enum. No callbacks are registered in this
# script, so the chosen version only serves to satisfy the constructor.
# Confirm against the installed paho-mqtt release.
_callback_api = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api is not None:
    client = mqtt.Client(_callback_api.VERSION2)
else:
    client = mqtt.Client()

# Blocking connect at import time; the third argument is the keep-alive
# interval in seconds.
client.connect(MQTT_BROKER, MQTT_PORT, 60)

# Run paho's network loop in its own thread. Without a running loop nothing
# services the socket, so keep-alive pings are never sent and queued publishes
# are not reliably flushed.
client.loop_start()
# ---------------- Sensor Simulation ----------------
def read_sensor_data() -> dict:
    """Simulate one reading from a temperature and humidity sensor.

    Returns:
        A dict with two keys, both rounded to two decimals:
        ``"temperature"`` drawn uniformly from 20.0 to 30.0 and
        ``"humidity"`` drawn uniformly from 30.0 to 70.0.
    """
    # Temperature is drawn before humidity so a seeded ``random`` yields the
    # same sequence of readings every run.
    temperature = round(random.uniform(20.0, 30.0), 2)
    humidity = round(random.uniform(30.0, 70.0), 2)
    return {"temperature": temperature, "humidity": humidity}
def process_data(sensor_data: dict) -> dict:
    """Compute the comfort index for one sensor reading.

    The index is ``T - (0.55 - 0.0055 * H) * (T - 14.5)``, rounded to two
    decimals, where ``T`` is the temperature and ``H`` the humidity.
    NOTE(review): this looks like Thom's discomfort-index formula with T in
    degrees Celsius and H in percent; confirm the intended units.

    Args:
        sensor_data: Mapping with numeric ``"temperature"`` and
            ``"humidity"`` entries.

    Returns:
        A dict with the single key ``"comfort_index"``.

    Raises:
        KeyError: If either required key is missing from ``sensor_data``.
    """
    temp = sensor_data["temperature"]
    hum = sensor_data["humidity"]
    comfort_index = round(temp - (0.55 - 0.0055 * hum) * (temp - 14.5), 2)
    return {"comfort_index": comfort_index}
# ---------------- GUI App ----------------
class IoTApp(ctk.CTk):
    """Dashboard window that simulates a sensor and publishes it over MQTT.

    Pressing the start button begins a repeating cycle: read a simulated
    sensor value, derive the comfort index, publish both as JSON to
    ``MQTT_TOPIC`` through the module-level ``client``, and refresh the labels.

    The cycle is driven by Tk's ``after()`` timer rather than a worker thread
    with ``time.sleep``, so every widget update happens on the thread that
    runs ``mainloop()``.
    """

    # Delay between two consecutive cycles, in milliseconds.
    UPDATE_INTERVAL_MS = 5000

    def __init__(self):
        """Build the window and its widgets; processing starts on button press."""
        super().__init__()
        self.title("⚡ IoT Edge Computing Dashboard")
        self.geometry("400x400")
        ctk.set_appearance_mode("dark")
        ctk.set_default_color_theme("green")

        # Title
        self.title_label = ctk.CTkLabel(
            self, text="IoT Edge Data Processing", font=("Arial", 20, "bold")
        )
        self.title_label.pack(pady=15)

        # Labels
        # NOTE(review): the emoji in the UI strings were reconstructed from
        # mis-decoded characters; the comfort-index and "connecting" glyphs in
        # particular are best guesses. Confirm against the original post.
        self.temp_label = ctk.CTkLabel(
            self, text="🌡️ Temperature: -- °C", font=("Arial", 16)
        )
        self.temp_label.pack(pady=5)

        self.hum_label = ctk.CTkLabel(
            self, text="💧 Humidity: -- %", font=("Arial", 16)
        )
        self.hum_label.pack(pady=5)

        self.ci_label = ctk.CTkLabel(
            self, text="😊 Comfort Index: --", font=("Arial", 16)
        )
        self.ci_label.pack(pady=5)

        self.status_label = ctk.CTkLabel(
            self, text="🔌 Connecting to MQTT...", font=("Arial", 14)
        )
        self.status_label.pack(pady=10)

        # Start Button
        self.start_btn = ctk.CTkButton(
            self, text="▶ Start Edge Processing", command=self.start_processing
        )
        self.start_btn.pack(pady=15)

    def start_processing(self):
        """Button handler: disable the button and start the update cycle."""
        self.status_label.configure(text="✅ Edge Processing Started...")
        # Disabling the button prevents a second, overlapping update cycle.
        self.start_btn.configure(state="disabled")
        self.update_loop()

    def update_loop(self):
        """Run one read/process/publish/display cycle and schedule the next.

        The method re-arms itself with ``after()`` every
        ``UPDATE_INTERVAL_MS`` milliseconds. If a cycle raises, no further
        cycle is scheduled.
        """
        sensor_data = read_sensor_data()
        processed_data = process_data(sensor_data)
        payload = {
            "sensor_data": sensor_data,
            "processed_data": processed_data,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        }

        # Publish to MQTT; keep the result so a failed publish is visible.
        result = client.publish(MQTT_TOPIC, json.dumps(payload))

        # Update GUI Labels
        self.temp_label.configure(
            text=f"🌡️ Temperature: {sensor_data['temperature']} °C"
        )
        self.hum_label.configure(text=f"💧 Humidity: {sensor_data['humidity']} %")
        self.ci_label.configure(
            text=f"😊 Comfort Index: {processed_data['comfort_index']}"
        )

        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            status = f"📡 Published @ {payload['timestamp']}"
        else:
            status = f"⚠ Publish failed (rc={result.rc}) @ {payload['timestamp']}"
        self.status_label.configure(text=status)

        # Schedule the next cycle on the Tk event loop instead of sleeping.
        self.after(self.UPDATE_INTERVAL_MS, self.update_loop)
# ---------------- Run App ----------------
if __name__ == "__main__":
    app = IoTApp()
    try:
        app.mainloop()
    finally:
        # Close the MQTT session when the window is closed or the loop fails.
        # NOTE(review): loop_stop() is expected to be a no-op when no network
        # thread was started; confirm for the installed paho-mqtt version.
        client.loop_stop()
        client.disconnect()
Comments
Post a Comment