IoT Predictive Maintenance System in Python | Real-Time Anomaly Detection with MQTT

 Demo :


Click the video 👇👇👇






























Features:

  • Embed YouTube Video 🎥

  • Add screenshots of GUI + code snippet

  • SEO keywords → Python IoT Project, Predictive Maintenance App, MQTT HiveMQ Python, Tkinter IoT GUI


Code :


import tkinter as tk

from tkinter import ttk, messagebox

import paho.mqtt.client as mqtt

import json

import random

import threading

import time


# MQTT Configuration
# Hostname of the MQTT broker handed to client.connect().
MQTT_BROKER = "broker.hivemq.com"
# TCP port handed to client.connect().
MQTT_PORT = 1883
# Topic that on_connect() subscribes to and simulate_sensor_data() publishes
# on; presumably the broker echoes simulated readings back to on_message().
MQTT_TOPIC = "fuzzutech/iot/anomaly"


# Global Variables
# Classification of the most recent reading: on_message() sets this to True
# when the sensor value is greater than 80 and to False otherwise.
anomaly_detected = False


# MQTT Callbacks

def on_connect(client, userdata, flags, rc):
    """Subscribe to the sensor topic once the broker accepts the connection.

    Registered as the paho-mqtt ``on_connect`` callback.

    Args:
        client: The paho-mqtt client instance that triggered the callback.
        userdata: User data attached to the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; 0 means the connection was accepted.
    """
    if rc != 0:
        # A non-zero code means the broker refused the connection, so
        # reporting "Connected" or trying to subscribe would be misleading.
        print(f"Failed to connect to MQTT Broker, result code {rc}")
        return

    print(f"Connected to MQTT Broker with result code {rc}")
    # Subscribing from the connect callback means the subscription is
    # requested again each time this callback fires.
    client.subscribe(MQTT_TOPIC)


# Readings strictly greater than this value are reported as anomalies.
ANOMALY_THRESHOLD = 80


def on_message(client, userdata, msg):
    """Classify an incoming sensor reading and show the result in the GUI.

    Registered as the paho-mqtt ``on_message`` callback. The message payload
    is expected to be UTF-8 JSON of the form ``{"sensor_value": <number>}``.
    Messages that do not match that shape are logged and ignored instead of
    raising inside the MQTT callback.

    Args:
        client: The paho-mqtt client instance that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: The received message; only ``msg.payload`` (bytes) is read.

    Side effects:
        Updates the module-level ``anomaly_detected`` flag and, when the GUI
        object ``app`` exists, its status label.
    """
    global anomaly_detected

    # Any publisher on the topic can send arbitrary bytes, so decoding,
    # parsing and key lookup are all allowed to fail without crashing.
    try:
        payload = json.loads(msg.payload.decode())
        sensor_value = payload['sensor_value']
    except (ValueError, KeyError, TypeError) as exc:
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError;
        # TypeError covers JSON documents that are not objects.
        print(f"Ignoring malformed sensor message: {exc!r}")
        return

    # bool is a subclass of int, so JSON true/false must be rejected explicitly.
    if isinstance(sensor_value, bool) or not isinstance(sensor_value, (int, float)):
        print(f"Ignoring non-numeric sensor value: {sensor_value!r}")
        return

    print(f"Received Sensor Value: {sensor_value}")

    # Simple anomaly detection logic
    anomaly_detected = sensor_value > ANOMALY_THRESHOLD
    if anomaly_detected:
        status = f"Anomaly Detected! Sensor Value: {sensor_value}"
    else:
        status = f"System Normal. Sensor Value: {sensor_value}"

    # The MQTT thread is started before the module-level `app` is created,
    # so an early message must not fail with NameError.
    gui = globals().get("app")
    if gui is None:
        return
    gui.update_status(status)


# MQTT Client Setup
# paho-mqtt 2.x added a callback API version argument to Client(), and 2.0
# rejects a bare Client() call. Requesting VERSION1 keeps the
# (client, userdata, flags, rc) callback signature used by on_connect, while
# paho-mqtt 1.x (which has no CallbackAPIVersion) keeps the original call.
_callback_api = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api is not None:
    client = mqtt.Client(_callback_api.VERSION1)
else:
    client = mqtt.Client()

client.on_connect = on_connect
client.on_message = on_message

# Blocking connect with a 60-second keepalive.
client.connect(MQTT_BROKER, MQTT_PORT, 60)


# Run the MQTT network loop on a background thread so it does not block
# the rest of the script.
def mqtt_loop():
    """Thread target: hand control to the client's blocking network loop."""
    client.loop_forever()


# daemon=True: the thread does not keep the process alive on its own.
mqtt_thread = threading.Thread(target=mqtt_loop, daemon=True)
mqtt_thread.start()


# GUI Application

class PredictiveMaintenanceApp:
    """Tkinter window showing the latest sensor status with a simulate button.

    The window contains a title, a status label and a button that publishes
    one random sensor reading to ``MQTT_TOPIC`` through the module-level
    MQTT ``client``.
    """

    def __init__(self, root):
        """Configure the window and build its widgets.

        Args:
            root: The Tk root window that hosts the widgets.
        """
        self.root = root
        self.root.title("IoT Predictive Maintenance System")
        self.root.geometry("400x400")
        self.root.configure(bg="#1e1e1e")

        title = tk.Label(
            root,
            text="IoT Predictive Maintenance",
            font=("Arial", 20, "bold"),
            fg="white",
            bg="#1e1e1e",
        )
        title.pack(pady=20)

        # Shows the most recent status text; changed via update_status().
        self.status_label = tk.Label(
            root,
            text="Waiting for data...",
            font=("Arial", 14),
            fg="lightgreen",
            bg="#1e1e1e",
        )
        self.status_label.pack(pady=20)

        self.detect_button = ttk.Button(
            root,
            text="Simulate Sensor Data",
            command=self.simulate_sensor_data,
        )
        self.detect_button.pack(pady=20)

    def update_status(self, status_text: str) -> None:
        """Replace the text shown in the status label.

        Args:
            status_text: The message to display.
        """
        self.status_label.config(text=status_text)

    def simulate_sensor_data(self) -> None:
        """Publish one random reading (50-100 inclusive) as JSON to the topic.

        The payload has the form ``{"sensor_value": <int>}``. If the client
        reports that the publish was not accepted (for example because it is
        not connected), the failure is printed and shown in the status label
        instead of being reported as published.
        """
        sensor_value = random.randint(50, 100)
        payload = json.dumps({"sensor_value": sensor_value})

        # publish() reports problems through the returned object's rc field
        # rather than by raising, so the result has to be checked explicitly.
        info = client.publish(MQTT_TOPIC, payload)
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            print(
                f"Failed to publish Sensor Value: {sensor_value} "
                f"(MQTT error code {info.rc})"
            )
            self.update_status(f"Publish failed (MQTT error code {info.rc})")
            return

        print(f"Published Sensor Value: {sensor_value}")


# Launch GUI
# Create the Tk root window that hosts the application widgets.
root = tk.Tk()
# `app` must remain a module-level name: on_message() refers to it in order
# to update the status label.
app = PredictiveMaintenanceApp(root)
# Enter the Tk event loop; this call blocks while the GUI is running.
root.mainloop()


Comments

Popular posts from this blog

🚀 Simple Login & Registration System in Python Tkinter 📱

📡 Fuzzu Packet Sniffer – Python GUI for Real-Time IP Monitoring | Tkinter + Scapy

🔥 Advanced MP3 Music Player in Python | CustomTkinter + Pygame | Free Source Code