🚨 Python Intrusion Detection System (IDS) – Real-Time ML + Tkinter GUI Project | FuzzuTech

 Demo :


Click Video πŸ‘‡πŸ‘‡πŸ‘‡































Features:
✔ SEO keywords included (IDS, ML, Tkinter, Python GUI)
✔ Embedded YouTube Short
✔ Full code snippet (main.py)
✔ Download links (GitHub/GDrive optional)
✔ Call-to-action: Subscribe + Follow socials


Code :


"""

main.py

Single-file IDS demo:

- Generates synthetic network-like dataset (if missing)

- Trains IsolationForest model and saves model_iforest.pkl

- Runs a modern-looking Tkinter GUI to simulate live packets,

  score them with the model and highlight anomalies.


Usage:

1) Create and activate a venv (recommended)

2) pip install -r requirements.txt

3) python main.py

"""


import os

import threading

import time

import random

import sys

import queue

from pathlib import Path

import tkinter as tk

from tkinter import ttk, messagebox


# --- third-party libs ---

try:

    import numpy as np

    import pandas as pd

    from sklearn.ensemble import IsolationForest

    import joblib

except Exception as e:

    print("Missing packages. Install with:\n pip install numpy pandas scikit-learn joblib")

    raise e


# ----------------------------

# Config / Paths

# ----------------------------

BASE_DIR = Path(__file__).resolve().parent

MODEL_PATH = BASE_DIR / "model_iforest.pkl"

DATA_PATH = BASE_DIR / "sample_network.csv"


# GUI Colors (dark-modern)

ACCENT_BG = "#0f172a"   # slate-900

CARD_BG   = "#111827"   # gray-900

ACCENT    = "#22d3ee"   # cyan-400

TEXT      = "#e5e7eb"   # gray-200

DANGER    = "#ef4444"   # red-500

SUCCESS   = "#16a34a"   # green-600

WARN      = "#f59e0b"   # amber-500


# ----------------------------

# Feature helpers (inline)

# ----------------------------

PROTO_MAP = {"tcp": 0, "udp": 1, "icmp": 2}


NUM_FEATURES = [

    "duration",

    "src_bytes",

    "dst_bytes",

    "pkts",

    "conn_rate",

    "failed_logins"

]


ALL_FEATURES = ["protocol"] + NUM_FEATURES


def encode_protocol(proto: str) -> int:

    return PROTO_MAP.get(str(proto).lower(), 0)


def row_to_features(row: dict) -> np.ndarray:

    proto = encode_protocol(row.get("protocol", "tcp"))

    vals = [

        float(row.get("duration", 0)),

        float(row.get("src_bytes", 0)),

        float(row.get("dst_bytes", 0)),

        float(row.get("pkts", 0)),

        float(row.get("conn_rate", 0)),

        float(row.get("failed_logins", 0)),

    ]

    return np.array([proto] + vals, dtype=float).reshape(1, -1)


# ----------------------------

# Synthetic data generator

# ----------------------------

def random_packet(anomalous=False):

    proto = random.choice(list(PROTO_MAP.keys()))

    if not anomalous:

        return {

            "protocol": proto,

            "duration": max(0.0, random.gauss(0.8, 0.5)),

            "src_bytes": abs(int(random.gauss(4000, 1200))),

            "dst_bytes": abs(int(random.gauss(3800, 1100))),

            "pkts": max(1, int(random.gauss(8, 3))),

            "conn_rate": max(0.01, random.gauss(2.5, 0.7)),

            "failed_logins": max(0, int(abs(random.gauss(0.1, 0.4))))

        }

    else:

        # anomaly: unusual large values or repeated failed logins

        return {

            "protocol": proto,

            "duration": max(0.0, random.gauss(12, 6)),

            "src_bytes": abs(int(random.gauss(120000, 50000))),

            "dst_bytes": abs(int(random.gauss(150000, 60000))),

            "pkts": max(1, int(random.gauss(250, 80))),

            "conn_rate": max(0.01, random.gauss(25, 10)),

            "failed_logins": max(1, int(abs(random.gauss(8, 4))))

        }


def generate_synthetic_csv(path: Path, n_normal=3000, n_anom=300):

    rows = []

    for _ in range(n_normal):

        rows.append(random_packet(anomalous=False))

    for _ in range(n_anom):

        rows.append(random_packet(anomalous=True))

    random.shuffle(rows)

    df = pd.DataFrame(rows)

    path.parent.mkdir(parents=True, exist_ok=True)

    df.to_csv(path, index=False)

    print(f"Synthetic dataset saved to {path}")


# ----------------------------

# Train model

# ----------------------------

def train_and_save_model(data_path: Path, model_path: Path):

    print("Training model...")

    if not data_path.exists():

        print("Data missing – generating synthetic data...")

        generate_synthetic_csv(data_path)


    df = pd.read_csv(data_path)

    # preprocess: ensure columns exist

    for col in ALL_FEATURES:

        if col not in df.columns:

            # fill missing numeric columns with zeros and protocol with 'tcp'

            if col == "protocol":

                df[col] = "tcp"

            else:

                df[col] = 0.0

    # map protocol and fillna

    df["protocol"] = df["protocol"].astype(str).str.lower().map(PROTO_MAP).fillna(0).astype(int)

    for col in NUM_FEATURES:

        df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0.0)


    X = df[["protocol"] + NUM_FEATURES].values


    model = IsolationForest(

        n_estimators=200,

        max_samples="auto",

        contamination=0.08,

        random_state=42,

        n_jobs=-1

    )

    model.fit(X)

    joblib.dump(model, model_path)

    print("Model trained and saved to", model_path)

    return model


# ----------------------------

# Tkinter GUI App

# ----------------------------

class IDSApp(tk.Tk):

    def __init__(self, model):

        super().__init__()

        self.title("IDS Monitor — Python ML + Tkinter")

        self.geometry("1000x640")

        self.minsize(900, 600)

        self.configure(bg=ACCENT_BG)

        self.model = model


        # stats

        self.running = False

        self.total_count = 0

        self.anomaly_count = 0


        # packet queue for thread-safe GUI updates

        self.q = queue.Queue(maxsize=200)


        self._build_ui()


        # start periodic GUI updater

        self.after(200, self._consume_queue)


    def _build_ui(self):

        # Header

        header = tk.Frame(self, bg=ACCENT_BG)

        header.pack(fill="x", padx=16, pady=(12,6))

        title = tk.Label(header, text="Intrusion Detection System (ML + GUI)",

                         bg=ACCENT_BG, fg=TEXT, font=("Segoe UI", 18, "bold"))

        title.pack(side="left")

        badge = tk.Label(header, text="Live", bg=ACCENT, fg="#000", font=("Segoe UI", 10, "bold"), padx=10, pady=4)

        badge.pack(side="right")


        # Control panel

        panel = tk.Frame(self, bg=CARD_BG)

        panel.pack(fill="x", padx=16, pady=(0,12))

        panel.columnconfigure(5, weight=1)


        self.start_btn = tk.Button(panel, text="Start", command=self.start_monitor,

                                   bg=SUCCESS, fg="#000", font=("Segoe UI", 11, "bold"))

        self.stop_btn  = tk.Button(panel, text="Stop", command=self.stop_monitor,

                                   bg=DANGER, fg="#000", font=("Segoe UI", 11, "bold"))

        self.clear_btn = tk.Button(panel, text="Clear", command=self.clear_table,

                                   bg=WARN, fg="#000", font=("Segoe UI", 11, "bold"))


        self.start_btn.grid(row=0, column=0, padx=8, pady=8, sticky="w")

        self.stop_btn.grid(row=0, column=1, padx=8, pady=8, sticky="w")

        self.clear_btn.grid(row=0, column=2, padx=8, pady=8, sticky="w")


        self.stat_total = tk.Label(panel, text="Total: 0", bg=CARD_BG, fg=TEXT, font=("Segoe UI", 11))

        self.stat_anom  = tk.Label(panel, text="Anomalies: 0", bg=CARD_BG, fg=TEXT, font=("Segoe UI", 11, "bold"))

        self.stat_rate  = tk.Label(panel, text="Anomaly Rate: 0.00%", bg=CARD_BG, fg=TEXT, font=("Segoe UI", 11))


        self.stat_total.grid(row=0, column=3, padx=8, sticky="e")

        self.stat_anom.grid(row=0, column=4, padx=8, sticky="e")

        self.stat_rate.grid(row=0, column=5, padx=8, sticky="e")


        # Treeview area

        container = tk.Frame(self, bg=ACCENT_BG)

        container.pack(fill="both", expand=True, padx=16, pady=(0,16))


        style = ttk.Style(self)

        style.theme_use("default")

        style.configure("Treeview", background=CARD_BG, fieldbackground=CARD_BG,

                        foreground=TEXT, rowheight=26, borderwidth=0)

        style.configure("Treeview.Heading", background="#0b1220", foreground=TEXT, font=("Segoe UI", 10, "bold"))

        style.map("Treeview", background=[("selected", "#1f2937")])


        cols = ("protocol", "duration", "src_bytes", "dst_bytes", "pkts", "conn_rate", "failed_logins", "score", "label")

        self.tree = ttk.Treeview(container, columns=cols, show="headings", selectmode="none")

        for c in cols:

            self.tree.heading(c, text=c.upper())

            self.tree.column(c, anchor="center", width=110)

        self.tree.column("protocol", width=90)

        self.tree.column("label", width=100)


        vsb = ttk.Scrollbar(container, orient="vertical", command=self.tree.yview)

        self.tree.configure(yscroll=vsb.set)

        self.tree.grid(row=0, column=0, sticky="nsew")

        vsb.grid(row=0, column=1, sticky="ns")


        # Statusbar

        sb = tk.Frame(self, bg=ACCENT_BG)

        sb.pack(fill="x", padx=16, pady=(0,12))

        self.status = tk.Label(sb, text="Ready", bg=ACCENT_BG, fg="#9ca3af", font=("Segoe UI", 10))

        self.status.pack(side="left")


    def start_monitor(self):

        if self.model is None:

            messagebox.showerror("Error", "Model missing. Train model first.")

            return

        if self.running:

            return

        self.running = True

        self.status.config(text="Monitoring…")

        self.thread = threading.Thread(target=self._producer_loop, daemon=True)

        self.thread.start()


    def stop_monitor(self):

        self.running = False

        self.status.config(text="Stopped")


    def clear_table(self):

        for i in self.tree.get_children():

            self.tree.delete(i)

        self.total_count = 0

        self.anomaly_count = 0

        self._update_stats()


    def _update_stats(self):

        self.stat_total.config(text=f"Total: {self.total_count}")

        self.stat_anom.config(text=f"Anomalies: {self.anomaly_count}")

        rate = 0 if self.total_count == 0 else (self.anomaly_count / self.total_count) * 100

        self.stat_rate.config(text=f"Anomaly Rate: {rate:.2f}%")


    def _producer_loop(self):

        # produce packets into queue; simulate some anomalies occasionally

        while self.running:

            # make anomalies rarer; but sometimes burst

            is_anom = (random.random() < 0.12)  # ~12% anomalies

            pkt = random_packet(anomalous=is_anom)

            features = row_to_features(pkt)

            try:

                score = float(self.model.decision_function(features)[0])

                pred = int(self.model.predict(features)[0])  # -1 anomaly, 1 normal

            except Exception as e:

                score = 0.0

                pred = 1

            label = "ANOMALY" if pred == -1 else "OK"

            self.q.put((pkt, score, label))

            time.sleep(0.6)


    def _consume_queue(self):

        # called in mainloop periodically

        try:

            while True:

                pkt, score, label = self.q.get_nowait()

                self._insert_row(pkt, score, label)

        except queue.Empty:

            pass

        finally:

            # schedule next

            self.after(200, self._consume_queue)


    def _insert_row(self, pkt, score, label):

        self.total_count += 1

        if label == "ANOMALY":

            self.anomaly_count += 1


        values = (

            pkt["protocol"],

            f"{pkt['duration']:.2f}",

            pkt["src_bytes"],

            pkt["dst_bytes"],

            pkt["pkts"],

            f"{pkt['conn_rate']:.2f}",

            pkt["failed_logins"],

            f"{score:.3f}",

            label

        )

        iid = self.tree.insert("", "end", values=values)

        # keep last N rows (avoid memory growth)

        max_rows = 200

        if len(self.tree.get_children()) > max_rows:

            first = self.tree.get_children()[0]

            self.tree.delete(first)

        # color tag

        self.tree.tag_configure("anom", foreground=DANGER)

        self.tree.tag_configure("ok", foreground=SUCCESS)

        self.tree.item(iid, tags=("anom" if label == "ANOMALY" else "ok"))

        self._update_stats()


# ----------------------------

# Main runner

# ----------------------------

def main():

    # load or train model

    model = None

    if MODEL_PATH.exists():

        try:

            model = joblib.load(MODEL_PATH)

            print("Loaded model from", MODEL_PATH)

        except Exception as e:

            print("Failed to load model (will retrain):", e)

            model = train_and_save_model(DATA_PATH, MODEL_PATH)

    else:

        # train and save

        model = train_and_save_model(DATA_PATH, MODEL_PATH)


    # start GUI

    app = IDSApp(model)

    app.mainloop()


if __name__ == "__main__":

    main()

Comments

Popular posts from this blog

πŸš€ Simple Login & Registration System in Python Tkinter πŸ“±

πŸ”₯ Advanced MP3 Music Player in Python | CustomTkinter + Pygame | Free Source Code

πŸš€ Create a Python Screen Recorder with Audio (Complete Code)