Skip to content

Commit 1937136

Browse files
authored
Merge pull request #2 from grafolean/fix/ipv6
Fix/ipv6
2 parents afb61f9 + d13697e commit 1937136

File tree

4 files changed

+55
-42
lines changed

4 files changed

+55
-42
lines changed

dbutils.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,10 @@ def migration_step_3():
158158
def migration_step_4():
159159
with get_db_cursor() as c:
160160
# INTEGER (signed 32 bits) is not enough for in_bytes:
161-
c.execute(f"ALTER TABLE {DB_PREFIX}flows ALTER COLUMN in_bytes SET DATA TYPE BIGINT;")
161+
c.execute(f"ALTER TABLE {DB_PREFIX}flows ALTER COLUMN in_bytes SET DATA TYPE BIGINT;")
162+
163+
def migration_step_5():
164+
""" We want to save IPv6 addresses too """
165+
with get_db_cursor() as c:
166+
c.execute(f"ALTER TABLE {DB_PREFIX}flows RENAME COLUMN ipv4_dst_addr TO ipvX_dst_addr;")
167+
c.execute(f"ALTER TABLE {DB_PREFIX}flows RENAME COLUMN ipv4_src_addr TO ipvX_src_addr;")

netflowbot.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def wrapper(*args, **kwargs):
5050

5151

5252
def path_part_encode(s):
53-
return s.replace(".", '%2e')
53+
return s.replace(".", '%2e').replace(":", '%3a')
5454

5555

5656
def _get_last_used_ts(job_id):
@@ -195,8 +195,8 @@ def perform_account_aggr_job(*args, **job_params):
195195
# l4_src_port | integer | source port
196196
# input_snmp | smallint | input interface index
197197
# output_snmp | smallint | output interface index
198-
# ipv4_dst_addr | inet | source IP
199-
# ipv4_src_addr | inet | destination IP
198+
# ipvX_dst_addr | inet | source IP
199+
# ipvX_src_addr | inet | destination IP
200200

201201
job_id = job_params["job_id"]
202202
interval_label = job_params["interval_label"]
@@ -348,7 +348,7 @@ def get_top_N_IPs_for_entity_interfaces(interval_label, last_used_ts, max_ts, ti
348348
for interface_index, in c.fetchall():
349349
c2.execute(f"""
350350
SELECT
351-
f.{'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr'},
351+
f.{'ipvX_src_addr' if direction == DIRECTION_INGRESS else 'ipvX_dst_addr'},
352352
sum(f.in_bytes) "traffic"
353353
FROM
354354
{DB_PREFIX}flows "f"
@@ -359,7 +359,7 @@ def get_top_N_IPs_for_entity_interfaces(interval_label, last_used_ts, max_ts, ti
359359
f.direction = %s AND
360360
f.{'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp'} = %s
361361
GROUP BY
362-
f.{'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr'}
362+
f.{'ipvX_src_addr' if direction == DIRECTION_INGRESS else 'ipvX_dst_addr'}
363363
ORDER BY
364364
traffic desc
365365
LIMIT {TOP_N_MAX};
@@ -382,7 +382,7 @@ def get_top_N_IPs_for_entity(interval_label, last_used_ts, max_ts, time_between,
382382
values = []
383383
c.execute(f"""
384384
SELECT
385-
f.{'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr'},
385+
f.{'ipvX_src_addr' if direction == DIRECTION_INGRESS else 'ipvX_dst_addr'},
386386
sum(f.in_bytes) "traffic"
387387
FROM
388388
{DB_PREFIX}flows "f"
@@ -392,7 +392,7 @@ def get_top_N_IPs_for_entity(interval_label, last_used_ts, max_ts, time_between,
392392
f.ts <= %s AND
393393
f.direction = %s
394394
GROUP BY
395-
f.{'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr'}
395+
f.{'ipvX_src_addr' if direction == DIRECTION_INGRESS else 'ipvX_dst_addr'}
396396
ORDER BY
397397
traffic desc
398398
LIMIT {TOP_N_MAX};
@@ -499,7 +499,7 @@ def get_top_N_connections_for_entity(interval_label, last_used_ts, max_ts, time_
499499
values = []
500500
c.execute(f"""
501501
SELECT
502-
f.ipv4_src_addr, f.ipv4_dst_addr,
502+
f.ipvX_src_addr, f.ipvX_dst_addr,
503503
sum(f.in_bytes) "traffic"
504504
FROM
505505
{DB_PREFIX}flows "f"
@@ -509,15 +509,15 @@ def get_top_N_connections_for_entity(interval_label, last_used_ts, max_ts, time_
509509
f.ts <= %s AND
510510
f.direction = %s
511511
GROUP BY
512-
f.ipv4_src_addr, f.ipv4_dst_addr
512+
f.ipvX_src_addr, f.ipvX_dst_addr
513513
ORDER BY
514514
traffic desc
515515
LIMIT {TOP_N_MAX};
516516
""", (entity_ip, last_used_ts, max_ts, direction,))
517517

518518
output_path_entity = NetFlowBot.construct_output_path_prefix(interval_label, direction, entity_id, interface=None)
519-
for ipv4_src_addr, ipv4_dst_addr, traffic_bytes in c.fetchall():
520-
output_path = f"{output_path_entity}.topconn.{path_part_encode(ipv4_src_addr)}.{path_part_encode(ipv4_dst_addr)}"
519+
for ipvX_src_addr, ipvX_dst_addr, traffic_bytes in c.fetchall():
520+
output_path = f"{output_path_entity}.topconn.{path_part_encode(ipvX_src_addr)}.{path_part_encode(ipvX_dst_addr)}"
521521
values.append({
522522
'p': output_path,
523523
'v': traffic_bytes / time_between, # Bps

netflowwriter.py

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,13 @@
4141
# 11-byte signature (constructed in this way to detect possible mangled bytes), flags, header extension
4242
# https://www.postgresql.org/docs/9.0/sql-copy.html#AEN59377
4343
PG_COPYFROM_INIT = struct.pack('!11sII', b'PGCOPY\n\377\r\n\0', 0, 0)
44-
# 4-byte INETv4 prefix: family, netmask, is_cidr, n bytes
44+
# 4-byte INETv4/v6 prefix: family, netmask, is_cidr, n bytes
4545
# https://doxygen.postgresql.org/network_8c_source.html#l00193
46-
IPV4_PREFIX = struct.pack('!BBBB', socket.AF_INET, 32, 0, 4)
46+
IPV4_ADDRESS_PREFIX = struct.pack('!BBBB', socket.AF_INET, 32, 0, 4)
47+
# Gotcha: IPv6 address family in Postgres is *not* socket.AF_INET6 (10),
48+
# instead it is defined as socket.AF_INET + 1 (2 + 1 == 3)
49+
# https://doxygen.postgresql.org/utils_2inet_8h_source.html#l00040
50+
IPV6_ADDRESS_PREFIX = struct.pack('!BBBB', socket.AF_INET + 1, 128, 0, 16)
4751

4852

4953
def _pgwriter_init():
@@ -52,22 +56,30 @@ def _pgwriter_init():
5256
return pg_writer
5357

5458

55-
def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR):
56-
buf = struct.pack('!HiIi4s4siQiHiHiIiIiHiHi4s4si4s4s',
59+
def _pgwriter_write(pgwriter, ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, address_family, IPVx_DST_ADDR, IPVx_SRC_ADDR):
60+
buf = struct.pack('!HiIi4s4siQiHiHiIiIiHiH',
5761
11, # number of columns
5862
4, int(ts), # integer - beware of Y2038 problem! :)
59-
8, IPV4_PREFIX, socket.inet_aton(client_ip), # 4 bytes prefix + 4 bytes IP
63+
8, IPV4_ADDRESS_PREFIX, socket.inet_aton(client_ip), # 4 bytes prefix + 4 bytes IP
6064
8, IN_BYTES, # bigint
6165
2, PROTOCOL,
6266
2, DIRECTION,
6367
4, L4_DST_PORT,
6468
4, L4_SRC_PORT,
6569
2, INPUT_SNMP,
6670
2, OUTPUT_SNMP,
67-
8, IPV4_PREFIX, IPV4_DST_ADDR,
68-
8, IPV4_PREFIX, IPV4_SRC_ADDR,
6971
)
70-
pgwriter.write(buf)
72+
if address_family != socket.AF_INET6:
73+
buf2 = struct.pack('!i4s4si4s4s',
74+
8, IPV4_ADDRESS_PREFIX, IPVx_DST_ADDR,
75+
8, IPV4_ADDRESS_PREFIX, IPVx_SRC_ADDR,
76+
)
77+
else:
78+
buf2 = struct.pack('!i4s16si4s16s',
79+
4 + 16, IPV6_ADDRESS_PREFIX, IPVx_DST_ADDR,
80+
4 + 16, IPV6_ADDRESS_PREFIX, IPVx_SRC_ADDR,
81+
)
82+
pgwriter.write(buf + buf2)
7183

7284

7385
def _pgwriter_finish(pgwriter):
@@ -138,9 +150,8 @@ def process_named_pipe(named_pipe_filename):
138150
except UnknownNetFlowVersion:
139151
log.warning("Unknown NetFlow version")
140152
continue
141-
except TemplateNotRecognized:
142-
log.warning("Failed to decode a v9 ExportPacket, template not "
143-
"recognized (if this happens at the start, it's ok)")
153+
except TemplateNotRecognized as ex:
154+
log.warning(f"Failed to decode a v9 ExportPacket, template not recognized (if this happens at the start, it's ok). Template id: {ex.template_id}")
144155
continue
145156

146157
except Exception as ex:
@@ -194,39 +205,36 @@ def write_buffer(buffer, partition_no):
194205

195206

196207
log.debug(f"Writing {len(buffer)} records to DB, partition {partition_no}")
197-
ipv6_ignored_records = 0 # we don't support IPv6 yet
198208
# save each of the flows within the record, but use execute_values() to perform bulk insert:
199209
def _get_data(buffer):
200210
for ts, client_ip, export in buffer:
201211
netflow_version, flows = export.header.version, export.flows
202212
if netflow_version == 9:
203213
for f in flows:
204214
try:
205-
if f.data.get("IP_PROTOCOL_VERSION", 4) == 6:
206-
ipv6_ignored_records += 1
207-
continue
215+
# if f.data.get("IP_PROTOCOL_VERSION", 4) == 6:
216+
if not f.data.get("IPV6_DST_ADDR", None) is None:
217+
address_family = socket.AF_INET6
218+
dst = socket.inet_pton(address_family, f.data["IPV6_DST_ADDR"])
219+
src = socket.inet_pton(address_family, f.data["IPV6_SRC_ADDR"])
220+
else:
221+
address_family = socket.AF_INET
222+
dst = socket.inet_aton(f.data["IPV4_DST_ADDR"])
223+
src = socket.inet_aton(f.data["IPV4_SRC_ADDR"])
208224

209225
yield (
210226
ts,
211227
client_ip,
212-
# "IN_BYTES":
213228
f.data["IN_BYTES"],
214-
# "PROTOCOL":
215229
f.data["PROTOCOL"],
216-
# "DIRECTION":
217230
f.data.get("DIRECTION", DIRECTION_INGRESS),
218-
# "L4_DST_PORT":
219231
f.data["L4_DST_PORT"],
220-
# "L4_SRC_PORT":
221232
f.data["L4_SRC_PORT"],
222-
# "INPUT_SNMP":
223233
f.data["INPUT_SNMP"],
224-
# "OUTPUT_SNMP":
225234
f.data["OUTPUT_SNMP"],
226-
# "IPV4_DST_ADDR":
227-
socket.inet_aton(f.data["IPV4_DST_ADDR"]),
228-
# "IPV4_SRC_ADDR":
229-
socket.inet_aton(f.data["IPV4_SRC_ADDR"]),
235+
address_family,
236+
dst,
237+
src,
230238
)
231239
except KeyError:
232240
log.exception(f"[{client_ip}] Error decoding v9 flow. Contents: {repr(f.data)}")
@@ -250,6 +258,8 @@ def _get_data(buffer):
250258
f.data["INPUT"],
251259
# "OUTPUT_SNMP":
252260
f.data["OUTPUT"],
261+
# address_family is always IPv4:
262+
socket.AF_INET,
253263
# netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
254264
# them back to bytes:
255265
# "IPV4_DST_ADDR":
@@ -267,9 +277,6 @@ def _get_data(buffer):
267277
_pgwriter_write(pgwriter, *data)
268278
_pgwriter_finish(pgwriter)
269279

270-
if ipv6_ignored_records > 0:
271-
log.error(f"We do not support IPv6 (yet), some IPv6 flow records were ignored: {ipv6_ignored_records}")
272-
273280

274281
if __name__ == "__main__":
275282
NAMED_PIPE_FILENAME = os.environ.get('NAMED_PIPE_FILENAME', None)

pynetflow

0 commit comments

Comments
 (0)