IoT Predictive Maintenance System in Python | Real-Time Anomaly Detection with MQTT
Demo :
Click Video πππ
Features:
-
Embed YouTube Video π₯
-
Add screenshots of GUI + code snippet
-
SEO keywords → Python IoT Project, Predictive Maintenance App, MQTT HiveMQ Python, Tkinter IoT GUI
Code :
import tkinter as tk
from tkinter import ttk, messagebox
import paho.mqtt.client as mqtt
import json
import random
import threading
import time
# MQTT Configuration
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_TOPIC = "fuzzutech/iot/anomaly"
# Global Variables
anomaly_detected = False
# MQTT Callbacks
def on_connect(client, userdata, flags, rc):
print(f"Connected to MQTT Broker with result code {rc}")
client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
global anomaly_detected
payload = json.loads(msg.payload.decode())
sensor_value = payload['sensor_value']
print(f"Received Sensor Value: {sensor_value}")
# Simple anomaly detection logic
if sensor_value > 80:
anomaly_detected = True
app.update_status(f"Anomaly Detected! Sensor Value: {sensor_value}")
else:
anomaly_detected = False
app.update_status(f"System Normal. Sensor Value: {sensor_value}")
# MQTT Client Setup
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER, MQTT_PORT, 60)
# Start MQTT in background thread
def mqtt_loop():
client.loop_forever()
mqtt_thread = threading.Thread(target=mqtt_loop)
mqtt_thread.daemon = True
mqtt_thread.start()
# GUI Application
class PredictiveMaintenanceApp:
def __init__(self, root):
self.root = root
self.root.title("IoT Predictive Maintenance System")
self.root.geometry("400x400")
self.root.configure(bg="#1e1e1e")
title = tk.Label(root, text="IoT Predictive Maintenance", font=("Arial", 20, "bold"), fg="white", bg="#1e1e1e")
title.pack(pady=20)
self.status_label = tk.Label(root, text="Waiting for data...", font=("Arial", 14), fg="lightgreen", bg="#1e1e1e")
self.status_label.pack(pady=20)
self.detect_button = ttk.Button(root, text="Simulate Sensor Data", command=self.simulate_sensor_data)
self.detect_button.pack(pady=20)
def update_status(self, status_text):
self.status_label.config(text=status_text)
def simulate_sensor_data(self):
# Simulate publishing random sensor data
sensor_value = random.randint(50, 100)
payload = json.dumps({"sensor_value": sensor_value})
client.publish(MQTT_TOPIC, payload)
print(f"Published Sensor Value: {sensor_value}")
# Launch GUI
root = tk.Tk()
app = PredictiveMaintenanceApp(root)
root.mainloop()
Comments
Post a Comment