import threading

from scapy.all import *  # wildcard first: the explicit imports below win any name clash
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.exceptions import NotFittedError
# Module state shared by the capture thread and the detection callback.
# Each training row is [IP total length, source port, destination port].
features: list[list[int]] = []
# contamination=0.05 is the expected proportion of outliers in the training
# capture (sklearn's definition); predict() labels such samples -1.
model = IsolationForest(contamination=0.05)
def process_packet(packet):
    """Record a feature row for one captured TCP or UDP packet.

    Appends ``[IP total length, source port, destination port]`` to the global
    ``features`` list. Packets without an IP layer, or IP packets carrying
    neither TCP nor UDP, are ignored. The list grows without bound for as long
    as the capture runs.
    """
    if IP not in packet:
        return
    if TCP in packet:
        transport = packet[TCP]
    elif UDP in packet:
        transport = packet[UDP]
    else:
        return
    features.append([packet[IP].len, transport.sport, transport.dport])
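def train_model():
    """Fit the global IsolationForest on every feature row captured so far.

    Raises:
        ValueError: if no packets were captured, so there is nothing to fit
            (sklearn would otherwise fail with a less specific shape error).
    """
    if not features:
        raise ValueError("no packet features captured; cannot train the anomaly model")
    # fit() mutates the estimator in place, so no ``global`` rebinding is needed.
    model.fit(np.array(features))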
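def traffic_monitor(*, training_timeout=60):
    """Capture a window of TCP/UDP traffic as a baseline, then fit the model.

    Args:
        training_timeout: seconds to sniff before training. Without a stop
            condition ``sniff`` never returns, so ``train_model`` was never
            reached and the model stayed unfitted; 60 s is a constructed
            default — tune it so the baseline is representative of the link.
    """
    sniff(filter="(tcp or udp)", prn=process_packet, timeout=training_timeout)
    train_model()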
def send_alert(alert_type, description):
    """Emit an alert line on stdout in the form ``Alert: <type> - <description>``."""
    message = f"Alert: {alert_type} - {description}"
    print(message)
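# Background capture-and-train thread. daemon=True so that interrupting the
# main sniff loop ends the process instead of hanging on a thread that may
# still be blocked inside sniff().
traffic_thread = threading.Thread(
    target=traffic_monitor, name="traffic-monitor", daemon=True
)
traffic_thread.start()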
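def _flow_features(packet):
    """Return ``(protocol_name, feature_row)`` for a TCP/UDP packet, else None.

    The row is ``[IP total length, source port, destination port]`` — the same
    layout ``process_packet`` builds the training set from, so the two must
    stay in step.
    """
    if IP not in packet:
        return None
    if TCP in packet:
        name, layer = "TCP", packet[TCP]
    elif UDP in packet:
        name, layer = "UDP", packet[UDP]
    else:
        return None
    return name, [packet[IP].len, layer.sport, layer.dport]


def detect_anomalies(packet):
    """Score one live packet with the IsolationForest and alert on outliers.

    Packets that are not TCP/UDP over IP are ignored. Until the training
    thread has fitted ``model`` there is no baseline to compare against, so
    scoring is skipped rather than crashing the capture callback.
    """
    extracted = _flow_features(packet)
    if extracted is None:
        return
    proto, row = extracted
    try:
        # NOTE(review): fit() in traffic_thread and predict() here are not
        # synchronized; confirm whether a lock/Event is needed around model.
        verdict = model.predict([row])[0]
    except NotFittedError:
        return
    if verdict == -1:
        send_alert(
            "Anomaly detected",
            f"Possible malicious {proto} connection from {packet[IP].src} to {packet[IP].dst}",
        )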
# Live detection runs on the main thread with the same BPF filter used for
# training, so both phases see the same population of packets.
bpf_filter = "(tcp or udp)"
sniff(filter=bpf_filter, prn=detect_anomalies)