IoT Cybersecurity with Federated Learning + Reinforcement Learning in Python | FuzzuTech

Demo:

▶ Embedded YouTube video (watch in the original post).


Code:


# iot_federated_rl_gui.py
# Single-file: Python – IoT & Autonomous Protection with Federated Learning + RL (CustomTkinter GUI)
# Author: FuzzuTech
# Notes:
# - No external ML frameworks; lightweight FedAvg + logistic regression written from scratch in NumPy.
# - RL via tabular Q-learning with an epsilon-greedy policy.
# - Live simulator of IoT traffic + attack scenarios (Normal, PortScan, BruteForce, DDoS).
# - Actions: Block, Throttle, Isolate, Honeypot. Reward balances damage vs. mitigation cost.
# - GUI: CustomTkinter with tabs (Dashboard, Simulator, Federated Learning, Logs, Settings).
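
# Third-party dependencies (inferred from the imports below):
#   pip install numpy customtkinter matplotlib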


import time
import json
from collections import deque, defaultdict

import numpy as np

import tkinter as tk
import customtkinter as ctk
from tkinter import messagebox, filedialog

from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg


# --------------------------
# Utilities & Theme
# --------------------------
APP_TITLE = "FuzzuTech – IoT Autonomous Protection (FL + RL)"
ctk.set_appearance_mode("dark")
ctk.set_default_color_theme("dark-blue")

RNG = np.random.default_rng(42)

# --------------------------
# Simulated Traffic & Attacks
# --------------------------
ATTACK_TYPES = ["Normal", "PortScan", "BruteForce", "DDoS"]
ATTACK_BASE_RISK = {
    "Normal": 0.02,
    "PortScan": 0.35,
    "BruteForce": 0.55,
    "DDoS": 0.8,
}
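# Reference base-risk levels per attack type (kept for context; the detector
# below learns anomaly scores from data rather than reading this table).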


def synth_traffic_batch(n=256, attack_bias=None):
    """
    Generates synthetic IoT traffic features.
    Features: [bytes_k, pkts, port_entropy, failed_auth, syn_rate]
    Label: 0 normal, 1 attack
    attack_bias: weights over attack types to skew the dataset per client
    """
    if attack_bias is None:
        attack_bias = np.array([0.70, 0.10, 0.12, 0.08])
    attack_bias = attack_bias / attack_bias.sum()
    types = RNG.choice(ATTACK_TYPES, size=n, p=attack_bias)

    # Base distributions
    bytes_k = RNG.normal(20, 6, n)  # thousands of bytes
    pkts = RNG.normal(80, 25, n)
    port_entropy = RNG.normal(2.5, 0.5, n)
    # float dtype so the attack shifts below are not silently truncated to int
    failed_auth = np.clip(RNG.poisson(0.5, n), 0, None).astype(float)
    syn_rate = RNG.normal(30, 8, n)

    # Attack shifts
    for i, t in enumerate(types):
        if t == "PortScan":
            port_entropy[i] += abs(RNG.normal(1.2, 0.3))
            syn_rate[i] += abs(RNG.normal(20, 6))
        elif t == "BruteForce":
            failed_auth[i] += abs(RNG.normal(6, 2))
            pkts[i] += abs(RNG.normal(60, 15))
        elif t == "DDoS":
            bytes_k[i] += abs(RNG.normal(200, 60))
            pkts[i] += abs(RNG.normal(500, 120))
            syn_rate[i] += abs(RNG.normal(80, 20))

    X = np.stack([bytes_k, pkts, port_entropy, failed_auth, syn_rate], axis=1)
    # Per-batch standardization; note that a single-row batch (n=1) would
    # collapse to all zeros, so callers should request n > 1.
    X = (X - X.mean(axis=0)) / (X.std(axis=0) + 1e-6)

    y = np.array([0 if t == "Normal" else 1 for t in types], dtype=int)
    return X, y, types
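
# Quick shape check (illustrative):
#   X, y, types = synth_traffic_batch(8)   # X: (8, 5), y: (8,), len(types) == 8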


# --------------------------
# Federated Learning: Logistic Regression + FedAvg
# --------------------------
class LogisticRegressionTiny:
    def __init__(self, n_features):
        self.w = RNG.normal(0, 0.1, n_features)
        self.b = 0.0

    def predict_proba(self, X):
        z = X @ self.w + self.b
        return 1.0 / (1.0 + np.exp(-z))

    def predict(self, X, thr=0.5):
        p = self.predict_proba(X)
        return (p >= thr).astype(int)

    def loss_grad(self, X, y):
        p = self.predict_proba(X)
        # Binary cross-entropy
        eps = 1e-8
        loss = -np.mean(y * np.log(p + eps) + (1 - y) * np.log(1 - p + eps))
        # Gradients
        diff = (p - y)
        dw = X.T @ diff / X.shape[0]
        db = np.mean(diff)
        return loss, dw, db

    def fit_epoch(self, X, y, lr=0.05, l2=1e-4):
        loss, dw, db = self.loss_grad(X, y)
        self.w -= lr * (dw + l2 * self.w)
        self.b -= lr * db
        return loss

    def get_weights(self):
        return self.w.copy(), float(self.b)

    def set_weights(self, w, b):
        self.w = w.copy()
        self.b = float(b)
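
# Gradient sketch: with p = sigmoid(X @ w + b) and binary cross-entropy loss,
# dL/dz = p - y, hence dw = X.T @ (p - y) / m and db = mean(p - y), exactly
# as computed in loss_grad() above.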


class FederatedServer:
    def __init__(self, n_features, n_clients=5, client_sizes=None):
        self.n_features = n_features
        self.global_model = LogisticRegressionTiny(n_features)
        self.n_clients = n_clients
        if client_sizes is None:
            self.client_sizes = [512] * n_clients
        else:
            self.client_sizes = client_sizes
        # Non-IID biases: skew each client toward different attack mixes
        self.client_biases = []
        for i in range(n_clients):
            if i == 0:
                bias = np.array([0.85, 0.05, 0.06, 0.04])  # mostly normal
            elif i == 1:
                bias = np.array([0.55, 0.25, 0.15, 0.05])  # PortScan-prone
            elif i == 2:
                bias = np.array([0.60, 0.08, 0.25, 0.07])  # BruteForce-prone
            elif i == 3:
                bias = np.array([0.65, 0.10, 0.10, 0.15])  # some DDoS
            else:
                bias = np.array([0.70, 0.10, 0.12, 0.08])
            self.client_biases.append(bias)

    def round(self, epochs=2, lr=0.05):
        client_weights = []
        sizes = []
        for i in range(self.n_clients):
            Xi, yi, _ = synth_traffic_batch(self.client_sizes[i], self.client_biases[i])
            # Local copy of the global model
            local = LogisticRegressionTiny(self.n_features)
            local.set_weights(*self.global_model.get_weights())
            # Local training
            for _ in range(epochs):
                _ = local.fit_epoch(Xi, yi, lr=lr)
            client_weights.append(local.get_weights())
            sizes.append(len(yi))

        # FedAvg: size-weighted average of client weights
        total = sum(sizes)
        avg_w = np.zeros_like(self.global_model.w)
        avg_b = 0.0
        for (w, b), m in zip(client_weights, sizes):
            avg_w += (m / total) * w
            avg_b += (m / total) * b
        self.global_model.set_weights(avg_w, avg_b)

        # Evaluate the global model on fresh data
        Xval, yval, _ = synth_traffic_batch(512)
        preds = self.global_model.predict(Xval)
        acc = (preds == yval).mean()
        return acc

    def anomaly_score(self, x_row):
        # Probability of attack (0..1)
        p = self.global_model.predict_proba(x_row.reshape(1, -1))[0]
        return float(p)
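
# FedAvg recap: the server averages client updates weighted by local data share,
#   w_global = sum_k (n_k / N) * w_k,   b_global = sum_k (n_k / N) * b_k,
# which is the size-weighted average computed in round() above.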


# --------------------------
# RL Agent (Q-learning)
# --------------------------
ACTIONS = ["Block", "Throttle", "Isolate", "Honeypot"]
ACTION_COST = {
    "Block": 0.10,
    "Throttle": 0.06,
    "Isolate": 0.08,
    "Honeypot": 0.04,
}


def discretize(value, bins):
    # Maps a value in [0, 1) to an integer bin 0..bins-1
    v = max(0.0, min(0.9999, float(value)))
    return int(v * bins)
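
# Example: discretize(0.37, 6) == int(0.37 * 6) == 2 (bins run 0..5).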


class RLAgent:
    def __init__(self, n_bins=6, gamma=0.92, alpha=0.25, epsilon=0.15):
        # State: (anomaly_bin 0..n_bins-1, last_attack 0..3)
        self.n_bins = n_bins
        self.gamma = gamma
        self.alpha = alpha
        self.epsilon = epsilon
        self.Q = defaultdict(lambda: np.zeros(len(ACTIONS)))
        self.last_state = None
        self.last_action = None

    def select_action(self, state):
        if RNG.random() < self.epsilon:
            return int(RNG.integers(0, len(ACTIONS)))
        q = self.Q[state]
        return int(np.argmax(q))

    def update(self, s, a, r, s2, done=False):
        qsa = self.Q[s][a]
        max_next = 0.0 if done else np.max(self.Q[s2])
        target = r + self.gamma * max_next
        self.Q[s][a] = qsa + self.alpha * (target - qsa)
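
# Tabular Q-learning update, as implemented in RLAgent.update():
#   Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
# with epsilon-greedy exploration handled in select_action().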


# --------------------------
# Environment Dynamics
# --------------------------
def environment_damage(attack_type, base_prob):
    # Expected damage if not mitigated
    scale = {
        "Normal": 0.0,
        "PortScan": 0.3,
        "BruteForce": 0.6,
        "DDoS": 1.0,
    }[attack_type]
    # base_prob ~ anomaly probability
    return scale * (0.5 + 0.5 * base_prob)


def mitigation_effect(action, attack_type):
    # Higher is better mitigation (reduces damage)
    base = {
        "Block": 0.85,
        "Throttle": 0.55,
        "Isolate": 0.7,
        "Honeypot": 0.45,
    }[action]
    # Some actions are more/less effective per attack
    bonus = 0.0
    if attack_type == "PortScan" and action in ("Honeypot", "Block"):
        bonus += 0.1
    if attack_type == "BruteForce" and action in ("Block", "Isolate"):
        bonus += 0.1
    if attack_type == "DDoS" and action in ("Throttle", "Isolate"):
        bonus += 0.1
    return min(0.95, base + bonus)
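
# Example: mitigation_effect("Throttle", "DDoS") == min(0.95, 0.55 + 0.1) == 0.65,
# i.e. throttling absorbs 65% of the expected DDoS damage in this model.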


def step_env(fl_server, rl_agent, last_attack_type):
    # Generate a small batch (per-batch standardization would collapse a
    # single row to all zeros), then take the first row as the live event.
    X, y, types = synth_traffic_batch(n=32)
    x = X[0]
    attack_type = types[0]
    anomaly = fl_server.anomaly_score(x)  # 0..1
    a_bin = discretize(anomaly, rl_agent.n_bins)
    last_idx = ATTACK_TYPES.index(last_attack_type) if last_attack_type in ATTACK_TYPES else 0
    state = (a_bin, last_idx)

    # RL chooses an action
    a_idx = rl_agent.select_action(state)
    action = ACTIONS[a_idx]

    # Reward: negative residual damage, minus the action's cost, plus a small
    # bonus for effective mitigation of a real attack, minus a small penalty
    # for spending mitigation effort on Normal traffic.
    base_prob = anomaly
    dmg = environment_damage(attack_type, base_prob)
    effect = mitigation_effect(action, attack_type)
    mitigated_damage = dmg * (1 - effect)
    action_penalty = ACTION_COST[action]
    over_mitigate = 0.04 if attack_type == "Normal" else 0.0

    reward = (-mitigated_damage - action_penalty
              + (0.06 if (attack_type != "Normal" and effect > 0.6) else 0.0)
              - over_mitigate)

    # Next state (reuse the same anomaly bin for simplicity)
    next_state = (a_bin, ATTACK_TYPES.index(attack_type))
    rl_agent.update(state, a_idx, reward, next_state, done=False)

    outcome = {
        "attack_type": attack_type,
        "anomaly": anomaly,
        "action": action,
        "reward": reward,
        "mitigated_damage": mitigated_damage,
        "raw_damage": dmg,
    }
    return outcome
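
# Worked example for one step: a DDoS event scored anomaly = 0.8 gives
# dmg = 1.0 * (0.5 + 0.5 * 0.8) = 0.9; choosing Throttle (effect 0.65) leaves
# mitigated_damage = 0.9 * 0.35 = 0.315, so
# reward = -0.315 - 0.06 (cost) + 0.06 (mitigation bonus) = -0.315.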


# --------------------------
# GUI Application
# --------------------------
class App(ctk.CTk):
    def __init__(self):
        super().__init__()
        self.title(APP_TITLE)
        self.geometry("1150x720")
        self.minsize(1000, 640)

        # State
        self.running = False

        # Core models
        self.server = FederatedServer(n_features=5, n_clients=5)
        self.rl = RLAgent(n_bins=6, gamma=0.93, alpha=0.22, epsilon=0.12)

        # Metrics
        self.cum_reward = 0.0
        self.last_attack = "Normal"
        self.history_len = 150
        self.anomaly_hist = deque(maxlen=self.history_len)
        self.reward_hist = deque(maxlen=self.history_len)

        # Layout
        self.build_header()
        self.build_tabs()
        self.after(300, self.refresh_labels)

    def build_header(self):
        top = ctk.CTkFrame(self, height=60, corner_radius=0)
        top.pack(fill="x", padx=0, pady=0)

        title = ctk.CTkLabel(top, text=APP_TITLE, font=ctk.CTkFont(size=20, weight="bold"))
        title.pack(side="left", padx=16, pady=12)

        self.status_label = ctk.CTkLabel(top, text="Status: Idle", font=ctk.CTkFont(size=14))
        self.status_label.pack(side="right", padx=16)


    def build_tabs(self):
        self.tabs = ctk.CTkTabview(self)
        self.tabs.pack(expand=True, fill="both", padx=10, pady=10)

        self.tab_dash = self.tabs.add("Dashboard")
        self.tab_sim = self.tabs.add("Simulator")
        self.tab_fed = self.tabs.add("Federated Learning")
        self.tab_logs = self.tabs.add("Logs")
        self.tab_settings = self.tabs.add("Settings")

        self.build_dashboard()
        self.build_simulator()
        self.build_federated()
        self.build_logs()
        self.build_settings()

    # ----- Dashboard -----
    def build_dashboard(self):
        grid = self.tab_dash
        grid.grid_columnconfigure((0, 1, 2, 3), weight=1)
        grid.grid_rowconfigure((0, 1, 2), weight=1)

        self.card_anom = self.metric_card(grid, "Anomaly Score", "0.00", 0, 0)
        self.card_attk = self.metric_card(grid, "Attack Type", "—", 0, 1)
        self.card_actn = self.metric_card(grid, "RL Action", "—", 0, 2)
        self.card_rewd = self.metric_card(grid, "Cumulative Reward", "0.00", 0, 3)

        # Chart: Anomaly & Reward
        fig = Figure(figsize=(8, 3), dpi=100)
        self.ax1 = fig.add_subplot(111)
        self.ax1.set_title("Live Anomaly (0–1) & Reward")
        self.ax1.set_ylim(-1.2, 1.2)
        self.ax1.set_xlabel("Steps")
        self.ax1.grid(alpha=0.3)

        self.canvas = FigureCanvasTkAgg(fig, master=grid)
        self.canvas.get_tk_widget().grid(row=1, column=0, columnspan=4, sticky="nsew", padx=8, pady=8)

        # Controls
        btns = ctk.CTkFrame(grid)
        btns.grid(row=2, column=0, columnspan=4, sticky="ew", padx=4, pady=6)
        for i in range(6):
            btns.grid_columnconfigure(i, weight=1)

        self.btn_start = ctk.CTkButton(btns, text="▶ Start Simulation", command=self.start_sim)
        self.btn_pause = ctk.CTkButton(btns, text="⏸ Pause", command=self.pause_sim)
        self.btn_step = ctk.CTkButton(btns, text="⏭ Step Once", command=self.step_once)
        self.btn_reset = ctk.CTkButton(btns, text="⟲ Reset", command=self.reset_all)
        self.btn_train = ctk.CTkButton(btns, text="⚙ Train 1 Fed Round", command=self.train_one_round)
        self.btn_trainn = ctk.CTkButton(btns, text="⚙⚙ Train 5 Fed Rounds", command=lambda: self.train_n_rounds(5))

        self.btn_start.grid(row=0, column=0, padx=6, pady=6, sticky="ew")
        self.btn_pause.grid(row=0, column=1, padx=6, pady=6, sticky="ew")
        self.btn_step.grid(row=0, column=2, padx=6, pady=6, sticky="ew")
        self.btn_reset.grid(row=0, column=3, padx=6, pady=6, sticky="ew")
        self.btn_train.grid(row=0, column=4, padx=6, pady=6, sticky="ew")
        self.btn_trainn.grid(row=0, column=5, padx=6, pady=6, sticky="ew")

    def metric_card(self, parent, title, value, r, c):
        card = ctk.CTkFrame(parent, corner_radius=16)
        card.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
        t = ctk.CTkLabel(card, text=title, font=ctk.CTkFont(size=14))
        v = ctk.CTkLabel(card, text=value, font=ctk.CTkFont(size=24, weight="bold"))
        t.pack(pady=(10, 0))
        v.pack(pady=(2, 10))
        return v


    # ----- Simulator -----
    def build_simulator(self):
        frame = self.tab_sim
        frame.grid_columnconfigure((0, 1), weight=1)
        frame.grid_rowconfigure((0, 1), weight=1)

        left = ctk.CTkFrame(frame)
        right = ctk.CTkFrame(frame)
        left.grid(row=0, column=0, sticky="nsew", padx=8, pady=8)
        right.grid(row=0, column=1, sticky="nsew", padx=8, pady=8)

        # Left: live text
        self.sim_text = tk.Text(left, height=20, bg="#0f1116", fg="#d7dae0", insertbackground="#d7dae0")
        self.sim_text.pack(expand=True, fill="both", padx=8, pady=8)

        # Right: controls
        c1 = ctk.CTkLabel(right, text="Simulation Speed (ms/step)")
        self.speed = ctk.CTkSlider(right, from_=50, to=1000, number_of_steps=19)
        self.speed.set(250)
        c2 = ctk.CTkLabel(right, text="RL ε (exploration)")
        self.eps = ctk.CTkSlider(right, from_=0.0, to=0.5, number_of_steps=50, command=self.on_eps)
        self.eps.set(self.rl.epsilon)
        c3 = ctk.CTkLabel(right, text="RL α (learning rate)")
        self.alpha = ctk.CTkSlider(right, from_=0.05, to=0.5, number_of_steps=45, command=self.on_alpha)
        self.alpha.set(self.rl.alpha)
        c4 = ctk.CTkLabel(right, text="RL γ (discount)")
        self.gamma = ctk.CTkSlider(right, from_=0.7, to=0.99, number_of_steps=29, command=self.on_gamma)
        self.gamma.set(self.rl.gamma)

        c1.pack(pady=(20, 4)); self.speed.pack(pady=(0, 16), fill="x", padx=16)
        c2.pack(pady=(6, 4)); self.eps.pack(pady=(0, 16), fill="x", padx=16)
        c3.pack(pady=(6, 4)); self.alpha.pack(pady=(0, 16), fill="x", padx=16)
        c4.pack(pady=(6, 4)); self.gamma.pack(pady=(0, 16), fill="x", padx=16)

        self.lbl_hint = ctk.CTkLabel(right, text="Tip: Train a few FL rounds before running\nto improve anomaly scores.", justify="center")
        self.lbl_hint.pack(pady=8)

    def on_eps(self, val): self.rl.epsilon = float(val)
    def on_alpha(self, val): self.rl.alpha = float(val)
    def on_gamma(self, val): self.rl.gamma = float(val)


    # ----- Federated Learning -----
    def build_federated(self):
        frame = self.tab_fed
        frame.grid_columnconfigure((0, 1), weight=1)
        frame.grid_rowconfigure((0, 1, 2), weight=1)

        self.acc_label = ctk.CTkLabel(frame, text="Global Model Acc: —", font=ctk.CTkFont(size=14, weight="bold"))
        self.acc_label.grid(row=0, column=0, sticky="w", padx=10, pady=8)

        # Round-training buttons share a sub-frame so they don't stack in one grid cell
        rounds = ctk.CTkFrame(frame)
        rounds.grid(row=0, column=1, sticky="e", padx=6, pady=6)
        btn_round = ctk.CTkButton(rounds, text="Train 1 Round", command=self.train_one_round)
        btn_round5 = ctk.CTkButton(rounds, text="Train 5 Rounds", command=lambda: self.train_n_rounds(5))
        btn_round10 = ctk.CTkButton(rounds, text="Train 10 Rounds", command=lambda: self.train_n_rounds(10))
        btn_round.pack(side="left", padx=4, pady=4)
        btn_round5.pack(side="left", padx=4, pady=4)
        btn_round10.pack(side="left", padx=4, pady=4)

        # Model I/O
        io = ctk.CTkFrame(frame)
        io.grid(row=1, column=0, columnspan=2, sticky="ew", padx=8, pady=8)
        io.grid_columnconfigure((0, 1, 2, 3), weight=1)
        btn_save = ctk.CTkButton(io, text="💾 Save Global Model", command=self.save_model)
        btn_load = ctk.CTkButton(io, text="📂 Load Global Model", command=self.load_model)
        btn_save.grid(row=0, column=0, padx=6, pady=6, sticky="ew")
        btn_load.grid(row=0, column=1, padx=6, pady=6, sticky="ew")

        # Quick Evaluate
        btn_eval = ctk.CTkButton(io, text="📊 Evaluate (Fresh Data)", command=self.evaluate_global)
        btn_eval.grid(row=0, column=2, padx=6, pady=6, sticky="ew")

        self.eval_out = ctk.CTkTextbox(frame, height=220)
        self.eval_out.grid(row=2, column=0, columnspan=2, sticky="nsew", padx=8, pady=8)


    # ----- Logs -----
    def build_logs(self):
        frame = self.tab_logs
        frame.grid_columnconfigure(0, weight=1)
        frame.grid_rowconfigure(0, weight=1)
        self.log_box = tk.Text(frame, bg="#0f1116", fg="#d7dae0", insertbackground="#d7dae0")
        self.log_box.grid(row=0, column=0, sticky="nsew", padx=8, pady=8)

        bar = ctk.CTkFrame(frame)
        bar.grid(row=1, column=0, sticky="ew", padx=6, pady=6)
        btn_clear = ctk.CTkButton(bar, text="Clear Logs", command=lambda: self.log_box.delete("1.0", tk.END))
        btn_export = ctk.CTkButton(bar, text="Export Logs", command=self.export_logs)
        btn_clear.pack(side="left", padx=4)
        btn_export.pack(side="left", padx=4)

    # ----- Settings -----
    def build_settings(self):
        frame = self.tab_settings
        frame.grid_columnconfigure((0, 1), weight=1)

        about = ("Settings & About\n"
                 "- Appearance: Dark (CustomTkinter)\n"
                 "- Adjust RL hyperparameters in the Simulator tab.\n"
                 "- Train FL rounds before/while running to improve anomaly detection.\n"
                 "- Save/Load the global model in the Federated Learning tab.\n"
                 "- This is a simulation for education & demo purposes.\n")
        lbl = ctk.CTkLabel(frame, text=about, justify="left")
        lbl.grid(row=0, column=0, sticky="w", padx=12, pady=12)


    # --------------------------
    # Actions
    # --------------------------
    def start_sim(self):
        if self.running:
            return
        self.running = True
        self.status_label.configure(text="Status: Running")
        # Drive the loop from Tk's event loop via after(); Tkinter widgets are
        # not thread-safe, so a background thread must not update them directly.
        self.after(0, self.loop_run)

    def pause_sim(self):
        self.running = False
        self.status_label.configure(text="Status: Paused")

    def step_once(self):
        self.run_one_step()

    def reset_all(self):
        self.running = False
        self.cum_reward = 0.0
        self.last_attack = "Normal"
        self.anomaly_hist.clear()
        self.reward_hist.clear()
        self.ax1.cla()
        self.ax1.set_title("Live Anomaly (0–1) & Reward")
        self.ax1.set_ylim(-1.2, 1.2)
        self.ax1.set_xlabel("Steps")
        self.ax1.grid(alpha=0.3)
        self.canvas.draw()
        self.status_label.configure(text="Status: Reset")
        self.log("System reset.")


    def train_one_round(self):
        acc = self.server.round(epochs=2, lr=0.06)
        self.acc_label.configure(text=f"Global Model Acc: {acc*100:.2f}%")
        self.log(f"FL Round complete. Global accuracy: {acc*100:.2f}%")

    def train_n_rounds(self, n):
        for i in range(n):
            acc = self.server.round(epochs=2, lr=0.06)
            self.acc_label.configure(text=f"Global Model Acc: {acc*100:.2f}%")
            self.log(f"FL Round {i+1}/{n} done. Acc: {acc*100:.2f}%")
            self.update_idletasks()  # repaint between rounds; this loop blocks the Tk main thread


    def evaluate_global(self):
        X, y, _ = synth_traffic_batch(1024)
        preds = self.server.global_model.predict(X)
        acc = (preds == y).mean()
        p_attack = self.server.global_model.predict_proba(X)
        msg = (f"[Evaluation]\nSamples: {len(y)}\nAccuracy: {acc*100:.2f}%\n"
               f"Avg attack prob: {p_attack.mean():.3f}\n")
        self.eval_out.delete("1.0", tk.END)
        self.eval_out.insert(tk.END, msg)
        self.log(msg.strip())

    def save_model(self):
        path = filedialog.asksaveasfilename(defaultextension=".json", filetypes=[("JSON", "*.json")])
        if not path:
            return
        w, b = self.server.global_model.get_weights()
        with open(path, "w", encoding="utf-8") as f:
            json.dump({"w": w.tolist(), "b": b}, f)
        self.log(f"Model saved to {path}")
        messagebox.showinfo("Saved", "Global model saved.")

    def load_model(self):
        path = filedialog.askopenfilename(filetypes=[("JSON", "*.json")])
        if not path:
            return
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.server.global_model.set_weights(np.array(data["w"], dtype=float), float(data["b"]))
        self.log(f"Model loaded from {path}")
        messagebox.showinfo("Loaded", "Global model loaded.")

    def export_logs(self):
        path = filedialog.asksaveasfilename(defaultextension=".log", filetypes=[("Log", "*.log"), ("Text", "*.txt")])
        if not path:
            return
        text = self.log_box.get("1.0", tk.END)
        with open(path, "w", encoding="utf-8") as f:
            f.write(text)
        messagebox.showinfo("Exported", "Logs exported.")


    # --------------------------
    # Loop & Step
    # --------------------------
    def loop_run(self):
        # after()-scheduled loop: run one simulation step, then re-arm the timer.
        if not self.running:
            return
        self.run_one_step()
        delay = max(30, int(self.speed.get()))
        self.after(delay, self.loop_run)


    def run_one_step(self):
        outcome = step_env(self.server, self.rl, self.last_attack)
        self.last_attack = outcome["attack_type"]
        self.cum_reward += outcome["reward"]

        # Live feed on the Simulator tab
        self.sim_text.insert(tk.END, f"[{time.strftime('%H:%M:%S')}] "
                                     f"Type={outcome['attack_type']} | "
                                     f"Anom={outcome['anomaly']:.2f} | "
                                     f"Act={outcome['action']} | "
                                     f"R={outcome['reward']:.3f}\n")
        self.sim_text.see(tk.END)

        # Charts
        self.anomaly_hist.append(outcome["anomaly"])
        self.reward_hist.append(float(np.clip(outcome["reward"], -1.0, 1.0)))

        self.ax1.cla()
        self.ax1.set_title("Live Anomaly (0–1) & Reward")
        self.ax1.set_ylim(-1.2, 1.2)
        self.ax1.set_xlabel("Steps")
        self.ax1.grid(alpha=0.3)
        self.ax1.plot(list(self.anomaly_hist), label="Anomaly")
        self.ax1.plot(list(self.reward_hist), label="Reward")
        self.ax1.legend(loc="upper right")
        self.canvas.draw()

        # Metric cards
        self.card_anom.configure(text=f"{outcome['anomaly']:.2f}")
        self.card_attk.configure(text=outcome["attack_type"])
        self.card_actn.configure(text=outcome["action"])
        self.card_rewd.configure(text=f"{self.cum_reward:.2f}")

        # Per-step events are marked lite=True: log() drops them so the Logs
        # tab only keeps milestone messages (rounds, resets, model I/O).
        self.log(f"Event: {outcome['attack_type']}, Anom={outcome['anomaly']:.2f}, "
                 f"Act={outcome['action']}, Reward={outcome['reward']:.3f}", lite=True)

    def refresh_labels(self):
        # Placeholder for periodic UI refreshes; re-arms itself every 600 ms.
        self.after(600, self.refresh_labels)

    def log(self, msg, lite=False):
        if not lite:
            self.log_box.insert(tk.END, msg + "\n")
            self.log_box.see(tk.END)


# --------------------------
# Main
# --------------------------
if __name__ == "__main__":
    app = App()
    app.mainloop()
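
To sanity-check the learning loop without launching the GUI, here is a minimal headless sketch. It is an illustration, not part of the app: it assumes the code above is saved as iot_federated_rl_gui.py on the import path (importing it is safe because the window only starts under __main__), and the round/step counts are arbitrary.

# headless_demo.py – hypothetical companion script for iot_federated_rl_gui.py
from iot_federated_rl_gui import FederatedServer, RLAgent, step_env

server = FederatedServer(n_features=5, n_clients=5)
for r in range(5):                                  # a few FedAvg rounds
    acc = server.round(epochs=2, lr=0.06)
    print(f"FL round {r + 1}: global accuracy {acc * 100:.2f}%")

agent = RLAgent(n_bins=6, gamma=0.93, alpha=0.22, epsilon=0.12)
last_attack, total_reward = "Normal", 0.0
for _ in range(200):                                # 200 simulated traffic events
    out = step_env(server, agent, last_attack)
    last_attack = out["attack_type"]
    total_reward += out["reward"]
print(f"Cumulative reward over 200 steps: {total_reward:.3f}")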
