Python IoT Smart Energy Meter GUI (CTkinter + MQTT) — Live Power, kWh & Daily Cost

 Demo :


Click Video πŸ‘‡πŸ‘‡πŸ‘‡




























Slug:
python-iot-smart-energy-meter-ctkinter-mqtt-live-power-cost


Description :
Build a Python IoT Smart Energy Meter GUI (CTkinter + MQTT) with live charts, kWh & daily cost estimate. Demo Mode included. Code & setup inside.


Features :

  • Hero image: GUI dashboard screenshot (dark glass UI + chart)

  • Intro: what/why + 5-line value pitch

  • Features: KPI cards, live chart, tariff → cost/day, Demo Mode, MQTT topic/broker

  • Quick Start: install, run, default broker/topic, JSON payload example

  • Code Embed: main.py snippet + repo link

  • Use Cases: home audit, student projects, ESP32/RPi

  • CTA: Subscribe to FuzzuTech; comment “BILL ₹ SAVED” for assets

  • FAQ: MQTT not available? Use Demo Mode; change tariff?


Code :


# -*- coding: utf-8 -*-

"""

FuzzuTech — IoT Smart Energy Meter GUI

Modern + Stylish + Attractive (CustomTkinter) with MQTT subscribe + live chart.

- Dark theme, glassy cards, neon accents

- Real-time power line-chart (last N seconds)

- KPI cards: Voltage, Current, Power, Energy (kWh), PF, Frequency

- Cost estimate/day (configurable tariff)

- Demo Mode (random data) if MQTT not available

"""


import json, random, threading, time, sys

from collections import deque

from datetime import datetime


import customtkinter as ctk

from tkinter import messagebox


# Matplotlib (for live chart)

from matplotlib.figure import Figure

from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg


# MQTT (optional, safe import)

try:

    import paho.mqtt.client as mqtt

    MQTT_AVAILABLE = True

except Exception:

    MQTT_AVAILABLE = False


APP_TITLE = "FuzzuTech • Smart Energy Meter"

DEFAULT_BROKER = "test.mosquitto.org"

DEFAULT_PORT = 1883

DEFAULT_TOPIC = "fuzzutech/iot/energy/livingroom"

UI_REFRESH_MS = 500            # UI refresh interval

PLOT_SECONDS = 120             # keep last N seconds in chart

MAX_POINTS = PLOT_SECONDS * (1000 // UI_REFRESH_MS)

TARIFF_PER_KWH = 9.0           # INR per kWh (editable in settings)


ctk.set_appearance_mode("dark")

ctk.set_default_color_theme("dark-blue")



class StatCard(ctk.CTkFrame):

    def __init__(self, master, title, unit="", accent="#22d3ee"):

        super().__init__(master, corner_radius=16)

        self.configure(fg_color=("gray12"))

        self.title = title

        self.unit = unit

        self.accent = accent


        self.header = ctk.CTkLabel(self, text=title, font=("Inter", 14, "bold"))

        self.header.pack(anchor="w", padx=14, pady=(12, 0))


        self.value_var = ctk.StringVar(value="—")

        self.value_lbl = ctk.CTkLabel(

            self,

            textvariable=self.value_var,

            font=("Inter", 24, "bold"),

            text_color=self.accent,

        )

        self.value_lbl.pack(anchor="w", padx=14, pady=(2, 12))


        self.gauge = ctk.CTkProgressBar(self, height=10, corner_radius=8, mode="determinate")

        self.gauge.set(0)

        self.gauge.pack(fill="x", padx=14, pady=(0, 14))


    def update_value(self, value, gauge_ratio=None):

        try:

            if value is None:

                self.value_var.set("—")

                self.gauge.set(0)

                return

            txt = f"{value:.2f} {self.unit}".strip()

            self.value_var.set(txt)

            if gauge_ratio is None:

                gauge_ratio = 0.0

            self.gauge.set(max(0.0, min(1.0, gauge_ratio)))

        except Exception:

            pass



class SettingsPanel(ctk.CTkFrame):

    def __init__(self, master, on_apply):

        super().__init__(master, corner_radius=16)

        self.configure(fg_color=("gray12"))

        self.on_apply = on_apply


        ctk.CTkLabel(self, text="Settings", font=("Inter", 16, "bold")).grid(row=0, column=0, sticky="w", padx=12, pady=(12, 4))


        # Broker/Topic

        self.broker = ctk.CTkEntry(self, placeholder_text="MQTT Broker", width=220)

        self.broker.insert(0, DEFAULT_BROKER)

        self.port = ctk.CTkEntry(self, placeholder_text="Port", width=80)

        self.port.insert(0, str(DEFAULT_PORT))

        self.topic = ctk.CTkEntry(self, placeholder_text="Topic", width=280)

        self.topic.insert(0, DEFAULT_TOPIC)


        self.broker.grid(row=1, column=0, padx=12, pady=4, sticky="w")

        self.port.grid(row=1, column=1, padx=6, pady=4, sticky="w")

        self.topic.grid(row=1, column=2, padx=6, pady=4, sticky="w")


        # Tariff

        ctk.CTkLabel(self, text="Tariff (INR/kWh):", font=("Inter", 12)).grid(row=2, column=0, padx=12, pady=(6, 0), sticky="w")

        self.tariff = ctk.CTkEntry(self, placeholder_text="INR/kWh", width=120)

        self.tariff.insert(0, str(TARIFF_PER_KWH))

        self.tariff.grid(row=2, column=1, padx=6, pady=(6, 0), sticky="w")


        # Demo mode

        self.demo_var = ctk.BooleanVar(value=False)

        self.demo_chk = ctk.CTkCheckBox(self, text="Demo Mode (no MQTT needed)", variable=self.demo_var)

        self.demo_chk.grid(row=2, column=2, padx=6, pady=(6, 0), sticky="w")


        # Apply btn

        self.apply_btn = ctk.CTkButton(self, text="Apply & (Re)Connect", command=self.apply)

        self.apply_btn.grid(row=3, column=0, padx=12, pady=10, sticky="w")


        self.grid_columnconfigure(2, weight=1)


    def apply(self):

        try:

            broker = self.broker.get().strip()

            port = int(self.port.get().strip())

            topic = self.topic.get().strip()

            tariff = float(self.tariff.get().strip())

            demo = self.demo_var.get()

            self.on_apply(broker, port, topic, tariff, demo)

        except Exception as e:

            messagebox.showerror("Settings", f"Invalid settings: {e}")



class EnergyApp(ctk.CTk):

    def __init__(self):

        super().__init__()

        self.title(APP_TITLE)

        try:

            self.iconbitmap("assets/icon.ico")

        except Exception:

            pass

        self.geometry("1100x720")

        self.minsize(980, 640)


        self.demo_mode = True

        self.tariff_inr_per_kwh = TARIFF_PER_KWH


        # Data buffers

        self.ts_buf = deque(maxlen=MAX_POINTS)

        self.power_buf = deque(maxlen=MAX_POINTS)

        self.voltage = None

        self.current = None

        self.power = None

        self.energy_kwh = 0.0

        self.pf = None

        self.freq = None


        # MQTT

        self.mqtt_client = None

        self.mqtt_connected = False

        self.mqtt_lock = threading.Lock()

        self.mqtt_params = (DEFAULT_BROKER, DEFAULT_PORT, DEFAULT_TOPIC)


        # Layout

        self.build_ui()

        self.after(UI_REFRESH_MS, self.refresh_ui)


    # ---------- UI ----------

    def build_ui(self):

        # Top bar

        top = ctk.CTkFrame(self, fg_color="transparent")

        top.pack(fill="x", padx=16, pady=(14, 8))

        title_lbl = ctk.CTkLabel(top, text=APP_TITLE, font=("Inter", 20, "bold"))

        title_lbl.pack(side="left")


        self.status_var = ctk.StringVar(value="Status: DISCONNECTED • Demo Mode ON")

        status = ctk.CTkLabel(top, textvariable=self.status_var, font=("Inter", 12))

        status.pack(side="right")


        # Settings

        self.settings = SettingsPanel(self, self.apply_settings)

        self.settings.pack(fill="x", padx=16, pady=(0, 10))


        # KPI cards row

        cards = ctk.CTkFrame(self, fg_color="transparent")

        cards.pack(fill="x", padx=16, pady=4)


        self.card_voltage = StatCard(cards, "Voltage", "V", accent="#60a5fa")

        self.card_current = StatCard(cards, "Current", "A", accent="#34d399")

        self.card_power   = StatCard(cards, "Active Power", "W", accent="#f472b6")

        self.card_energy  = StatCard(cards, "Energy", "kWh", accent="#fde047")

        self.card_pf      = StatCard(cards, "Power Factor", "", accent="#22d3ee")

        self.card_freq    = StatCard(cards, "Frequency", "Hz", accent="#a78bfa")


        for i, w in enumerate([self.card_voltage, self.card_current, self.card_power, self.card_energy, self.card_pf, self.card_freq]):

            w.grid(row=0, column=i, padx=8, pady=8, sticky="nsew")

            cards.grid_columnconfigure(i, weight=1)


        # Chart & estimates

        mid = ctk.CTkFrame(self, fg_color="transparent")

        mid.pack(fill="both", expand=True, padx=16, pady=8)


        # Matplotlib figure

        fig = Figure(figsize=(6, 3), dpi=100)

        self.ax = fig.add_subplot(111)

        self.ax.set_title("Real-time Power (W) — Last 2 min")

        self.ax.set_xlabel("Time")

        self.ax.set_ylabel("W")

        self.ax.grid(True, alpha=0.25)

        self.line, = self.ax.plot([], [])


        self.canvas = FigureCanvasTkAgg(fig, master=mid)

        self.canvas.get_tk_widget().pack(side="left", fill="both", expand=True, padx=(0, 8))


        # Right-side estimates

        right = ctk.CTkFrame(mid, corner_radius=16)

        right.pack(side="left", fill="y")

        ctk.CTkLabel(right, text="Estimates", font=("Inter", 16, "bold")).pack(anchor="w", padx=14, pady=(12, 6))


        self.estimate_power_var = ctk.StringVar(value="—")

        self.estimate_energy_day_var = ctk.StringVar(value="—")

        self.estimate_cost_day_var = ctk.StringVar(value="—")

        self.uptime_var = ctk.StringVar(value="—")


        for k, var in [

            ("Avg Power (last 2 min):", self.estimate_power_var),

            ("Projected Energy/day:", self.estimate_energy_day_var),

            ("Projected Cost/day (INR):", self.estimate_cost_day_var),

            ("Uptime:", self.uptime_var),

        ]:

            frm = ctk.CTkFrame(right, fg_color=("gray12"), corner_radius=12)

            frm.pack(fill="x", padx=12, pady=6)

            ctk.CTkLabel(frm, text=k, font=("Inter", 12, "bold")).pack(anchor="w", padx=12, pady=(8, 0))

            ctk.CTkLabel(frm, textvariable=var, font=("Inter", 16)).pack(anchor="w", padx=12, pady=(0, 8))


        self.start_time = time.time()


    # ---------- Settings / MQTT ----------

    def apply_settings(self, broker, port, topic, tariff, demo):

        self.demo_mode = demo

        self.tariff_inr_per_kwh = tariff

        self.mqtt_params = (broker, port, topic)

        if self.demo_mode:

            self.disconnect_mqtt()

            self.status_var.set("Status: DEMO MODE • MQTT disconnected")

        else:

            if not MQTT_AVAILABLE:

                messagebox.showwarning("MQTT", "paho-mqtt not installed. Switching to Demo Mode.")

                self.demo_mode = True

                self.status_var.set("Status: DEMO MODE (MQTT lib missing)")

            else:

                self.connect_mqtt()


    def connect_mqtt(self):

        self.disconnect_mqtt()

        broker, port, topic = self.mqtt_params


        self.mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

        self.mqtt_client.on_connect = self.on_mqtt_connect

        self.mqtt_client.on_message = self.on_mqtt_message

        self.mqtt_client.on_disconnect = self.on_mqtt_disconnect


        def loop():

            try:

                self.mqtt_client.connect(broker, port, keepalive=30)

                self.mqtt_client.subscribe(topic, qos=0)

                self.mqtt_client.loop_forever(retry_first_connection=True)

            except Exception as e:

                with self.mqtt_lock:

                    self.mqtt_connected = False

                self.status_var.set(f"Status: MQTT ERROR ({e}) • Demo Mode suggested")


        t = threading.Thread(target=loop, daemon=True)

        t.start()


    def disconnect_mqtt(self):

        try:

            if self.mqtt_client:

                self.mqtt_client.disconnect()

        except Exception:

            pass

        self.mqtt_client = None

        with self.mqtt_lock:

            self.mqtt_connected = False


    # ---------- MQTT Callbacks ----------

    def on_mqtt_connect(self, client, userdata, flags, reason_code, properties=None):

        with self.mqtt_lock:

            self.mqtt_connected = (reason_code == 0)

        if reason_code == 0:

            self.status_var.set(f"Status: CONNECTED to {self.mqtt_params[0]} • Topic: {self.mqtt_params[2]}")

        else:

            self.status_var.set(f"Status: MQTT connect failed ({reason_code})")


    def on_mqtt_disconnect(self, client, userdata, reason_code, properties=None):

        with self.mqtt_lock:

            self.mqtt_connected = False

        self.status_var.set("Status: DISCONNECTED")


    def on_mqtt_message(self, client, userdata, msg):

        try:

            data = json.loads(msg.payload.decode("utf-8"))

        except Exception:

            return

        now = time.time()

        power = float(data.get("power", 0.0))

        self.power = power

        self.voltage = float(data.get("voltage")) if data.get("voltage") is not None else None

        self.current = float(data.get("current")) if data.get("current") is not None else None

        self.energy_kwh = float(data.get("energy_kwh")) if data.get("energy_kwh") is not None else self.energy_kwh

        self.pf = float(data.get("pf")) if data.get("pf") is not None else None

        self.freq = float(data.get("freq")) if data.get("freq") is not None else None


        self.ts_buf.append(now)

        self.power_buf.append(power)


    # ---------- Demo data ----------

    def generate_demo(self):

        now = time.time()

        # Simulate household load: base + small random walk

        base = 120 + 60 * (1 + time.time() % 10 / 10)  # varying base ~120-240W

        jitter = random.uniform(-25, 25)

        p = max(30.0, base + jitter)

        v = random.uniform(218, 234)

        i = p / v

        pf = random.uniform(0.85, 0.98)

        freq = random.uniform(49.8, 50.2)


        # integrate energy (Wh -> kWh)

        if self.power is not None:

            dt = UI_REFRESH_MS / 1000.0

            self.energy_kwh += (self.power * dt) / 3600000.0


        self.power = p

        self.voltage = v

        self.current = i

        self.pf = pf

        self.freq = freq


        self.ts_buf.append(now)

        self.power_buf.append(p)


    # ---------- UI Refresh ----------

    def refresh_ui(self):

        try:

            if self.demo_mode or not self.mqtt_connected:

                # in demo, integrate energy against previous power

                self.generate_demo()


            # Update KPI cards

            v = self.voltage

            i = self.current

            p = self.power

            ek = self.energy_kwh

            pf = self.pf

            fq = self.freq


            self.card_voltage.update_value(v, None if v is None else (min(v/260.0, 1.0)))

            self.card_current.update_value(i, None if i is None else (min(i/10.0, 1.0)))

            self.card_power.update_value(p, None if p is None else (min(p/3000.0, 1.0)))

            self.card_energy.update_value(ek, None if ek is None else (min(ek/10.0, 1.0)))

            self.card_pf.update_value(pf, None if pf is None else (pf))

            self.card_freq.update_value(fq, None if fq is None else (min((fq-45)/10.0, 1.0)))


            # Chart

            if len(self.ts_buf) >= 2:

                # Convert timestamps to seconds-from-now labels (keep readable)

                xs = [datetime.fromtimestamp(t).strftime("%H:%M:%S") for t in self.ts_buf]

                ys = list(self.power_buf)

                self.line.set_data(range(len(xs)), ys)

                self.ax.set_xlim(0, max(10, len(xs)))

                self.ax.set_ylim(0, max(300, max(ys) * 1.2))

                self.ax.set_xticks(range(0, len(xs), max(1, len(xs)//6)))

                self.ax.set_xticklabels([xs[k] for k in range(0, len(xs), max(1, len(xs)//6))], rotation=0)

                self.canvas.draw_idle()


            # Estimates

            if len(self.power_buf) > 0:

                avg_power = sum(self.power_buf) / len(self.power_buf)

                self.estimate_power_var.set(f"{avg_power:.1f} W")


                # Projection: if avg_power sustained for 24h

                energy_day_kwh = (avg_power * 24.0) / 1000.0

                self.estimate_energy_day_var.set(f"{energy_day_kwh:.2f} kWh")


                cost_day = energy_day_kwh * self.tariff_inr_per_kwh

                self.estimate_cost_day_var.set(f"{cost_day:.2f}")


            up = time.time() - self.start_time

            self.uptime_var.set(f"{int(up//3600)}h {int((up%3600)//60)}m {int(up%60)}s")


        except Exception:

            pass

        finally:

            self.after(UI_REFRESH_MS, self.refresh_ui)



if __name__ == "__main__":

    app = EnergyApp()

    app.mainloop()


Comments

Popular posts from this blog

πŸš€ Simple Login & Registration System in Python Tkinter πŸ“±

πŸ“‘ Fuzzu Packet Sniffer – Python GUI for Real-Time IP Monitoring | Tkinter + Scapy

πŸ”₯ Advanced MP3 Music Player in Python | CustomTkinter + Pygame | Free Source Code