Python IoT Smart Energy Meter GUI (CTkinter + MQTT) — Live Power, kWh & Daily Cost
Demo :
Click Video πππ
Slug:
python-iot-smart-energy-meter-ctkinter-mqtt-live-power-cost
Description :
Build a Python IoT Smart Energy Meter GUI (CTkinter + MQTT) with live charts, kWh & daily cost estimate. Demo Mode included. Code & setup inside.
Features :
-
Hero image: GUI dashboard screenshot (dark glass UI + chart)
-
Intro: what/why + 5-line value pitch
-
Features: KPI cards, live chart, tariff → cost/day, Demo Mode, MQTT topic/broker
-
Quick Start: install, run, default broker/topic, JSON payload example
-
Code Embed: main.py snippet + repo link
-
Use Cases: home audit, student projects, ESP32/RPi
-
CTA: Subscribe to FuzzuTech; comment “BILL ₹ SAVED” for assets
-
FAQ: MQTT not available? Use Demo Mode; change tariff?
Code :
# -*- coding: utf-8 -*-
"""
FuzzuTech — IoT Smart Energy Meter GUI
Modern + Stylish + Attractive (CustomTkinter) with MQTT subscribe + live chart.
- Dark theme, glassy cards, neon accents
- Real-time power line-chart (last N seconds)
- KPI cards: Voltage, Current, Power, Energy (kWh), PF, Frequency
- Cost estimate/day (configurable tariff)
- Demo Mode (random data) if MQTT not available
"""
import json, random, threading, time, sys
from collections import deque
from datetime import datetime
import customtkinter as ctk
from tkinter import messagebox
# Matplotlib (for live chart)
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
# MQTT (optional, safe import)
try:
import paho.mqtt.client as mqtt
MQTT_AVAILABLE = True
except Exception:
MQTT_AVAILABLE = False
APP_TITLE = "FuzzuTech • Smart Energy Meter"
DEFAULT_BROKER = "test.mosquitto.org"
DEFAULT_PORT = 1883
DEFAULT_TOPIC = "fuzzutech/iot/energy/livingroom"
UI_REFRESH_MS = 500 # UI refresh interval
PLOT_SECONDS = 120 # keep last N seconds in chart
MAX_POINTS = PLOT_SECONDS * (1000 // UI_REFRESH_MS)
TARIFF_PER_KWH = 9.0 # INR per kWh (editable in settings)
ctk.set_appearance_mode("dark")
ctk.set_default_color_theme("dark-blue")
class StatCard(ctk.CTkFrame):
def __init__(self, master, title, unit="", accent="#22d3ee"):
super().__init__(master, corner_radius=16)
self.configure(fg_color=("gray12"))
self.title = title
self.unit = unit
self.accent = accent
self.header = ctk.CTkLabel(self, text=title, font=("Inter", 14, "bold"))
self.header.pack(anchor="w", padx=14, pady=(12, 0))
self.value_var = ctk.StringVar(value="—")
self.value_lbl = ctk.CTkLabel(
self,
textvariable=self.value_var,
font=("Inter", 24, "bold"),
text_color=self.accent,
)
self.value_lbl.pack(anchor="w", padx=14, pady=(2, 12))
self.gauge = ctk.CTkProgressBar(self, height=10, corner_radius=8, mode="determinate")
self.gauge.set(0)
self.gauge.pack(fill="x", padx=14, pady=(0, 14))
def update_value(self, value, gauge_ratio=None):
try:
if value is None:
self.value_var.set("—")
self.gauge.set(0)
return
txt = f"{value:.2f} {self.unit}".strip()
self.value_var.set(txt)
if gauge_ratio is None:
gauge_ratio = 0.0
self.gauge.set(max(0.0, min(1.0, gauge_ratio)))
except Exception:
pass
class SettingsPanel(ctk.CTkFrame):
def __init__(self, master, on_apply):
super().__init__(master, corner_radius=16)
self.configure(fg_color=("gray12"))
self.on_apply = on_apply
ctk.CTkLabel(self, text="Settings", font=("Inter", 16, "bold")).grid(row=0, column=0, sticky="w", padx=12, pady=(12, 4))
# Broker/Topic
self.broker = ctk.CTkEntry(self, placeholder_text="MQTT Broker", width=220)
self.broker.insert(0, DEFAULT_BROKER)
self.port = ctk.CTkEntry(self, placeholder_text="Port", width=80)
self.port.insert(0, str(DEFAULT_PORT))
self.topic = ctk.CTkEntry(self, placeholder_text="Topic", width=280)
self.topic.insert(0, DEFAULT_TOPIC)
self.broker.grid(row=1, column=0, padx=12, pady=4, sticky="w")
self.port.grid(row=1, column=1, padx=6, pady=4, sticky="w")
self.topic.grid(row=1, column=2, padx=6, pady=4, sticky="w")
# Tariff
ctk.CTkLabel(self, text="Tariff (INR/kWh):", font=("Inter", 12)).grid(row=2, column=0, padx=12, pady=(6, 0), sticky="w")
self.tariff = ctk.CTkEntry(self, placeholder_text="INR/kWh", width=120)
self.tariff.insert(0, str(TARIFF_PER_KWH))
self.tariff.grid(row=2, column=1, padx=6, pady=(6, 0), sticky="w")
# Demo mode
self.demo_var = ctk.BooleanVar(value=False)
self.demo_chk = ctk.CTkCheckBox(self, text="Demo Mode (no MQTT needed)", variable=self.demo_var)
self.demo_chk.grid(row=2, column=2, padx=6, pady=(6, 0), sticky="w")
# Apply btn
self.apply_btn = ctk.CTkButton(self, text="Apply & (Re)Connect", command=self.apply)
self.apply_btn.grid(row=3, column=0, padx=12, pady=10, sticky="w")
self.grid_columnconfigure(2, weight=1)
def apply(self):
try:
broker = self.broker.get().strip()
port = int(self.port.get().strip())
topic = self.topic.get().strip()
tariff = float(self.tariff.get().strip())
demo = self.demo_var.get()
self.on_apply(broker, port, topic, tariff, demo)
except Exception as e:
messagebox.showerror("Settings", f"Invalid settings: {e}")
class EnergyApp(ctk.CTk):
def __init__(self):
super().__init__()
self.title(APP_TITLE)
try:
self.iconbitmap("assets/icon.ico")
except Exception:
pass
self.geometry("1100x720")
self.minsize(980, 640)
self.demo_mode = True
self.tariff_inr_per_kwh = TARIFF_PER_KWH
# Data buffers
self.ts_buf = deque(maxlen=MAX_POINTS)
self.power_buf = deque(maxlen=MAX_POINTS)
self.voltage = None
self.current = None
self.power = None
self.energy_kwh = 0.0
self.pf = None
self.freq = None
# MQTT
self.mqtt_client = None
self.mqtt_connected = False
self.mqtt_lock = threading.Lock()
self.mqtt_params = (DEFAULT_BROKER, DEFAULT_PORT, DEFAULT_TOPIC)
# Layout
self.build_ui()
self.after(UI_REFRESH_MS, self.refresh_ui)
# ---------- UI ----------
def build_ui(self):
# Top bar
top = ctk.CTkFrame(self, fg_color="transparent")
top.pack(fill="x", padx=16, pady=(14, 8))
title_lbl = ctk.CTkLabel(top, text=APP_TITLE, font=("Inter", 20, "bold"))
title_lbl.pack(side="left")
self.status_var = ctk.StringVar(value="Status: DISCONNECTED • Demo Mode ON")
status = ctk.CTkLabel(top, textvariable=self.status_var, font=("Inter", 12))
status.pack(side="right")
# Settings
self.settings = SettingsPanel(self, self.apply_settings)
self.settings.pack(fill="x", padx=16, pady=(0, 10))
# KPI cards row
cards = ctk.CTkFrame(self, fg_color="transparent")
cards.pack(fill="x", padx=16, pady=4)
self.card_voltage = StatCard(cards, "Voltage", "V", accent="#60a5fa")
self.card_current = StatCard(cards, "Current", "A", accent="#34d399")
self.card_power = StatCard(cards, "Active Power", "W", accent="#f472b6")
self.card_energy = StatCard(cards, "Energy", "kWh", accent="#fde047")
self.card_pf = StatCard(cards, "Power Factor", "", accent="#22d3ee")
self.card_freq = StatCard(cards, "Frequency", "Hz", accent="#a78bfa")
for i, w in enumerate([self.card_voltage, self.card_current, self.card_power, self.card_energy, self.card_pf, self.card_freq]):
w.grid(row=0, column=i, padx=8, pady=8, sticky="nsew")
cards.grid_columnconfigure(i, weight=1)
# Chart & estimates
mid = ctk.CTkFrame(self, fg_color="transparent")
mid.pack(fill="both", expand=True, padx=16, pady=8)
# Matplotlib figure
fig = Figure(figsize=(6, 3), dpi=100)
self.ax = fig.add_subplot(111)
self.ax.set_title("Real-time Power (W) — Last 2 min")
self.ax.set_xlabel("Time")
self.ax.set_ylabel("W")
self.ax.grid(True, alpha=0.25)
self.line, = self.ax.plot([], [])
self.canvas = FigureCanvasTkAgg(fig, master=mid)
self.canvas.get_tk_widget().pack(side="left", fill="both", expand=True, padx=(0, 8))
# Right-side estimates
right = ctk.CTkFrame(mid, corner_radius=16)
right.pack(side="left", fill="y")
ctk.CTkLabel(right, text="Estimates", font=("Inter", 16, "bold")).pack(anchor="w", padx=14, pady=(12, 6))
self.estimate_power_var = ctk.StringVar(value="—")
self.estimate_energy_day_var = ctk.StringVar(value="—")
self.estimate_cost_day_var = ctk.StringVar(value="—")
self.uptime_var = ctk.StringVar(value="—")
for k, var in [
("Avg Power (last 2 min):", self.estimate_power_var),
("Projected Energy/day:", self.estimate_energy_day_var),
("Projected Cost/day (INR):", self.estimate_cost_day_var),
("Uptime:", self.uptime_var),
]:
frm = ctk.CTkFrame(right, fg_color=("gray12"), corner_radius=12)
frm.pack(fill="x", padx=12, pady=6)
ctk.CTkLabel(frm, text=k, font=("Inter", 12, "bold")).pack(anchor="w", padx=12, pady=(8, 0))
ctk.CTkLabel(frm, textvariable=var, font=("Inter", 16)).pack(anchor="w", padx=12, pady=(0, 8))
self.start_time = time.time()
# ---------- Settings / MQTT ----------
def apply_settings(self, broker, port, topic, tariff, demo):
self.demo_mode = demo
self.tariff_inr_per_kwh = tariff
self.mqtt_params = (broker, port, topic)
if self.demo_mode:
self.disconnect_mqtt()
self.status_var.set("Status: DEMO MODE • MQTT disconnected")
else:
if not MQTT_AVAILABLE:
messagebox.showwarning("MQTT", "paho-mqtt not installed. Switching to Demo Mode.")
self.demo_mode = True
self.status_var.set("Status: DEMO MODE (MQTT lib missing)")
else:
self.connect_mqtt()
def connect_mqtt(self):
self.disconnect_mqtt()
broker, port, topic = self.mqtt_params
self.mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self.mqtt_client.on_connect = self.on_mqtt_connect
self.mqtt_client.on_message = self.on_mqtt_message
self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
def loop():
try:
self.mqtt_client.connect(broker, port, keepalive=30)
self.mqtt_client.subscribe(topic, qos=0)
self.mqtt_client.loop_forever(retry_first_connection=True)
except Exception as e:
with self.mqtt_lock:
self.mqtt_connected = False
self.status_var.set(f"Status: MQTT ERROR ({e}) • Demo Mode suggested")
t = threading.Thread(target=loop, daemon=True)
t.start()
def disconnect_mqtt(self):
try:
if self.mqtt_client:
self.mqtt_client.disconnect()
except Exception:
pass
self.mqtt_client = None
with self.mqtt_lock:
self.mqtt_connected = False
# ---------- MQTT Callbacks ----------
def on_mqtt_connect(self, client, userdata, flags, reason_code, properties=None):
with self.mqtt_lock:
self.mqtt_connected = (reason_code == 0)
if reason_code == 0:
self.status_var.set(f"Status: CONNECTED to {self.mqtt_params[0]} • Topic: {self.mqtt_params[2]}")
else:
self.status_var.set(f"Status: MQTT connect failed ({reason_code})")
def on_mqtt_disconnect(self, client, userdata, reason_code, properties=None):
with self.mqtt_lock:
self.mqtt_connected = False
self.status_var.set("Status: DISCONNECTED")
def on_mqtt_message(self, client, userdata, msg):
try:
data = json.loads(msg.payload.decode("utf-8"))
except Exception:
return
now = time.time()
power = float(data.get("power", 0.0))
self.power = power
self.voltage = float(data.get("voltage")) if data.get("voltage") is not None else None
self.current = float(data.get("current")) if data.get("current") is not None else None
self.energy_kwh = float(data.get("energy_kwh")) if data.get("energy_kwh") is not None else self.energy_kwh
self.pf = float(data.get("pf")) if data.get("pf") is not None else None
self.freq = float(data.get("freq")) if data.get("freq") is not None else None
self.ts_buf.append(now)
self.power_buf.append(power)
# ---------- Demo data ----------
def generate_demo(self):
now = time.time()
# Simulate household load: base + small random walk
base = 120 + 60 * (1 + time.time() % 10 / 10) # varying base ~120-240W
jitter = random.uniform(-25, 25)
p = max(30.0, base + jitter)
v = random.uniform(218, 234)
i = p / v
pf = random.uniform(0.85, 0.98)
freq = random.uniform(49.8, 50.2)
# integrate energy (Wh -> kWh)
if self.power is not None:
dt = UI_REFRESH_MS / 1000.0
self.energy_kwh += (self.power * dt) / 3600000.0
self.power = p
self.voltage = v
self.current = i
self.pf = pf
self.freq = freq
self.ts_buf.append(now)
self.power_buf.append(p)
# ---------- UI Refresh ----------
def refresh_ui(self):
try:
if self.demo_mode or not self.mqtt_connected:
# in demo, integrate energy against previous power
self.generate_demo()
# Update KPI cards
v = self.voltage
i = self.current
p = self.power
ek = self.energy_kwh
pf = self.pf
fq = self.freq
self.card_voltage.update_value(v, None if v is None else (min(v/260.0, 1.0)))
self.card_current.update_value(i, None if i is None else (min(i/10.0, 1.0)))
self.card_power.update_value(p, None if p is None else (min(p/3000.0, 1.0)))
self.card_energy.update_value(ek, None if ek is None else (min(ek/10.0, 1.0)))
self.card_pf.update_value(pf, None if pf is None else (pf))
self.card_freq.update_value(fq, None if fq is None else (min((fq-45)/10.0, 1.0)))
# Chart
if len(self.ts_buf) >= 2:
# Convert timestamps to seconds-from-now labels (keep readable)
xs = [datetime.fromtimestamp(t).strftime("%H:%M:%S") for t in self.ts_buf]
ys = list(self.power_buf)
self.line.set_data(range(len(xs)), ys)
self.ax.set_xlim(0, max(10, len(xs)))
self.ax.set_ylim(0, max(300, max(ys) * 1.2))
self.ax.set_xticks(range(0, len(xs), max(1, len(xs)//6)))
self.ax.set_xticklabels([xs[k] for k in range(0, len(xs), max(1, len(xs)//6))], rotation=0)
self.canvas.draw_idle()
# Estimates
if len(self.power_buf) > 0:
avg_power = sum(self.power_buf) / len(self.power_buf)
self.estimate_power_var.set(f"{avg_power:.1f} W")
# Projection: if avg_power sustained for 24h
energy_day_kwh = (avg_power * 24.0) / 1000.0
self.estimate_energy_day_var.set(f"{energy_day_kwh:.2f} kWh")
cost_day = energy_day_kwh * self.tariff_inr_per_kwh
self.estimate_cost_day_var.set(f"{cost_day:.2f}")
up = time.time() - self.start_time
self.uptime_var.set(f"{int(up//3600)}h {int((up%3600)//60)}m {int(up%60)}s")
except Exception:
pass
finally:
self.after(UI_REFRESH_MS, self.refresh_ui)
if __name__ == "__main__":
app = EnergyApp()
app.mainloop()
Comments
Post a Comment