1
+ import json
2
+ import hashlib
3
+ import hmac
4
+ import base64
5
+ import requests
6
+ from datetime import datetime , timezone
7
+
8
+
9
+ default_log_type = 'SocketSecurityTool'
10
+
11
+
12
+ class Sentinel :
13
+ def __init__ (self , workspace_id : str , shared_key : str ):
14
+ """
15
+ Initializes the Microsoft Sentinel client with credentials and HTTP source URL.
16
+
17
+ :param workspace_id: The Microsoft Sentinel Customer ID
18
+ :param shared_key: The Microsoft Sentinel Shared Key
19
+ :param log_type: The Microsoft Sentinel Log Type
20
+ """
21
+ self .workspace_id = workspace_id
22
+ self .shared_key = shared_key
23
+ self .uri = f"https://{ self .workspace_id } .ods.opinsights.azure.com/api/logs?api-version=2016-04-01"
24
+
25
+ def _generate_signature (self , content_length : int , date : str ) -> str :
26
+ """
27
+ Generates the HMAC SHA256 signature required for authentication.
28
+
29
+ :param content_length: Length of the request body
30
+ :param date: Current date in RFC 1123 format
31
+ :return: Authorization signature string
32
+ """
33
+ method = 'POST'
34
+ content_type = 'application/json'
35
+ resource = '/api/logs'
36
+ x_headers = f"x-ms-date:{ date } "
37
+ string_to_hash = f"{ method } \n { content_length } \n { content_type } \n { x_headers } \n { resource } "
38
+ bytes_to_hash = bytes (string_to_hash , encoding = "utf-8" )
39
+
40
+ decoded_key = base64 .b64decode (self .shared_key )
41
+ hashed_string = hmac .new (decoded_key , bytes_to_hash , digestmod = hashlib .sha256 ).digest ()
42
+ signature = base64 .b64encode (hashed_string ).decode ()
43
+
44
+ return f"SharedKey { self .workspace_id } :{ signature } "
45
+
46
+ def send_events (self , events : list , log_type : str = default_log_type ) -> list :
47
+ """
48
+ Sends a single event to Microsoft Sentinel.
49
+
50
+ :param events: Dictionary representing the event data
51
+ :param log_type:
52
+ :return: Response from the Sentinel API
53
+ """
54
+ errors = []
55
+ events = Sentinel .normalize_events (events , log_type )
56
+ for event in events :
57
+ response = self .send_event (event , log_type )
58
+ if response ["status_code" ] != 200 :
59
+ errors .append (response )
60
+ return errors
61
+
62
+ def send_event (self , event_data : dict , log_type : str = default_log_type ) -> dict :
63
+ """
64
+ Sends a batch of events to a logging endpoint. This function serializes a
65
+ list of events into JSON format, computes the necessary authorization
66
+ headers, and sends them via an HTTP POST request to the configured logging
67
+ endpoint.
68
+
69
+ :param event_data: An event that is serialized to JSON
70
+ and sent to the logging endpoint.
71
+ :type event_data: dict
72
+ :param log_type: The type of log under which the events should be
73
+ categorized. Defaults to the class's `default_log_type`.
74
+ :type log_type: str, optional
75
+ :return: A dictionary with the HTTP response status code and response text
76
+ from the logging endpoint.
77
+ :rtype: dict
78
+ """
79
+ body = json .dumps (
80
+ {
81
+ "detail" : event_data ,
82
+ "SourceComputerId" : "socket-security-tools"
83
+ }
84
+ )
85
+ content_length = len (body )
86
+ rfc1123date = datetime .utcnow ().strftime ('%a, %d %b %Y %H:%M:%S GMT' )
87
+ authorization = self ._generate_signature (content_length , rfc1123date )
88
+
89
+ headers = {
90
+ "Content-Type" : "application/json" ,
91
+ "Authorization" : authorization ,
92
+ "Log-Type" : log_type ,
93
+ "x-ms-date" : rfc1123date
94
+ }
95
+
96
+ response = requests .post (self .uri , data = body , headers = headers )
97
+ return {
98
+ "status_code" : response .status_code ,
99
+ "response_text" : response .text
100
+ }
101
+
102
+ @staticmethod
103
+ def transform_gosec_event (event ):
104
+ """Transforms a Gosec security event into the correct Sentinel schema."""
105
+ return {
106
+ "TimeGenerated" : datetime .now (timezone .utc ).isoformat (),
107
+ "SourceComputerId" : event .get ("cwd" , "Unknown" ),
108
+ "OperationStatus" : event .get ("confidence" , "Unknown" ),
109
+ "Detail" : event .get ("details" , "Unknown" ),
110
+ "OperationCategory" : "Static Analysis" ,
111
+ "Solution" : event .get ("cwe" , {}).get ("url" , "No remediation guide available" ),
112
+ "Message" : event .get ("details" , "Unknown" ),
113
+ "FilePath" : event .get ("file" , "Unknown" ),
114
+ "URL" : event .get ("url" , "N/A" ),
115
+ "Timestamp" : event .get ("timestamp" , datetime .now (timezone .utc ).isoformat ()),
116
+ "Plugin" : "Gosec" ,
117
+ "Severity" : event .get ("severity" , "Unknown" ),
118
+ "RuleID" : event .get ("rule_id" , "Unknown" ),
119
+ "CWE_ID" : event .get ("cwe" , {}).get ("id" , "Unknown" ),
120
+ }
121
+
122
+ @staticmethod
123
+ def transform_bandit_event (event ):
124
+ """Transforms a Bandit security event into the correct Sentinel schema."""
125
+ return {
126
+ "TimeGenerated" : datetime .now (timezone .utc ).isoformat (),
127
+ "SourceComputerId" : event .get ("cwd" , "Unknown" ),
128
+ "OperationStatus" : event .get ("issue_severity" , "Unknown" ),
129
+ "Detail" : event .get ("issue_text" , "Unknown" ),
130
+ "OperationCategory" : event .get ("test_name" , "Static Analysis" ),
131
+ "Solution" : event .get ("more_info" , "No remediation guide available" ),
132
+ "Message" : event .get ("issue_text" , "Unknown issue" ),
133
+ "FilePath" : event .get ("filename" , "Unknown" ),
134
+ "URL" : event .get ("url" , "N/A" ),
135
+ "Timestamp" : event .get ("timestamp" , datetime .now (timezone .utc ).isoformat ()),
136
+ "Plugin" : "Bandit" ,
137
+ "Severity" : event .get ("issue_severity" , "Unknown" ),
138
+ "TestID" : event .get ("test_id" , "Unknown" ),
139
+ "CWE_ID" : event .get ("issue_cwe" , {}).get ("id" , "Unknown" ),
140
+ "CWE_Link" : event .get ("issue_cwe" , {}).get ("link" , "Unknown" )
141
+ }
142
+
143
+ @staticmethod
144
+ def transform_trufflehog_event (event ):
145
+ """Transforms a Trufflehog event into the correct Sentinel schema."""
146
+ return {
147
+ "TimeGenerated" : datetime .now (timezone .utc ).isoformat (),
148
+ "SourceComputerId" : event .get ("cwd" , "Unknown" ),
149
+ "OperationStatus" : "Success" if event .get ("Verified" , False ) else "Failure" ,
150
+ "Detail" : event .get ("DetectorName" , "Unknown Detection" ),
151
+ "OperationCategory" : event .get ("SourceName" , "Secret Scanning" ),
152
+ "Solution" : event .get ("ExtraData" , {}).get ("rotation_guide" , "No remediation guide available" ),
153
+ "Message" : event .get ("Raw" , "Potential secret detected" ),
154
+ "FilePath" : event .get ("SourceMetadata" , {}).get ("Data" , {}).get ("Filesystem" , {}).get ("file" , "Unknown" ),
155
+ "Timestamp" : event .get ("timestamp" , datetime .now (timezone .utc ).isoformat ()),
156
+ "Plugin" : "Trufflehog" ,
157
+ "Severity" : "HIGH" if not event .get ("Verified" , False ) else "LOW" ,
158
+ "SourceType" : event .get ("SourceType" , "Unknown" ),
159
+ "DetectorType" : event .get ("DetectorType" , "Unknown" )
160
+ }
161
+
162
+ @staticmethod
163
+ def normalize_events (events : list , plugin_name : str ):
164
+ """Detects event type and normalizes them for Sentinel ingestion."""
165
+ formatted_events = []
166
+
167
+ for event in events :
168
+ if isinstance (event , str ):
169
+ try :
170
+ event = json .loads (event ) # Convert from string if necessary
171
+ except json .JSONDecodeError :
172
+ print (f"Skipping invalid event: { event } " )
173
+ continue # Skip invalid JSON entries
174
+
175
+ if "plugin_name" in event :
176
+ if "bandit" in plugin_name .lower ():
177
+ formatted_events .append (Sentinel .transform_bandit_event (event ))
178
+ elif "trufflehog" in plugin_name .lower ():
179
+ formatted_events .append (Sentinel .transform_trufflehog_event (event ))
180
+ elif "gosec" in plugin_name .lower ():
181
+ formatted_events .append (Sentinel .transform_gosec_event (event ))
182
+ return formatted_events
0 commit comments