@@ -200,54 +200,60 @@ def _get_data(buffer):
200
200
netflow_version , flows = export .header .version , export .flows
201
201
if netflow_version == 9 :
202
202
for f in flows :
203
- yield (
204
- ts ,
205
- client_ip ,
206
- # "IN_BYTES":
207
- f .data ["IN_BYTES" ],
208
- # "PROTOCOL":
209
- f .data ["PROTOCOL" ],
210
- # "DIRECTION":
211
- f .data .get ("DIRECTION" , DIRECTION_INGRESS ),
212
- # "L4_DST_PORT":
213
- f .data ["L4_DST_PORT" ],
214
- # "L4_SRC_PORT":
215
- f .data ["L4_SRC_PORT" ],
216
- # "INPUT_SNMP":
217
- f .data ["INPUT_SNMP" ],
218
- # "OUTPUT_SNMP":
219
- f .data ["OUTPUT_SNMP" ],
220
- # "IPV4_DST_ADDR":
221
- socket .inet_aton (f .data ["IPV4_DST_ADDR" ]),
222
- # "IPV4_SRC_ADDR":
223
- socket .inet_aton (f .data ["IPV4_SRC_ADDR" ]),
224
- )
203
+ try :
204
+ yield (
205
+ ts ,
206
+ client_ip ,
207
+ # "IN_BYTES":
208
+ f .data ["IN_BYTES" ],
209
+ # "PROTOCOL":
210
+ f .data ["PROTOCOL" ],
211
+ # "DIRECTION":
212
+ f .data .get ("DIRECTION" , DIRECTION_INGRESS ),
213
+ # "L4_DST_PORT":
214
+ f .data ["L4_DST_PORT" ],
215
+ # "L4_SRC_PORT":
216
+ f .data ["L4_SRC_PORT" ],
217
+ # "INPUT_SNMP":
218
+ f .data ["INPUT_SNMP" ],
219
+ # "OUTPUT_SNMP":
220
+ f .data ["OUTPUT_SNMP" ],
221
+ # "IPV4_DST_ADDR":
222
+ socket .inet_aton (f .data ["IPV4_DST_ADDR" ]),
223
+ # "IPV4_SRC_ADDR":
224
+ socket .inet_aton (f .data ["IPV4_SRC_ADDR" ]),
225
+ )
226
+ except KeyError :
227
+ log .exception (f"[{ client_ip } ] Error decoding v9 flow, some data was missing. Contents: { repr (f .data )} " )
225
228
elif netflow_version == 5 :
226
229
for f in flows :
227
- yield (
228
- ts ,
229
- client_ip ,
230
- # "IN_BYTES":
231
- f .data ["IN_OCTETS" ],
232
- # "PROTOCOL":
233
- f .data ["PROTO" ],
234
- # "DIRECTION":
235
- DIRECTION_INGRESS ,
236
- # "L4_DST_PORT":
237
- f .data ["DST_PORT" ],
238
- # "L4_SRC_PORT":
239
- f .data ["SRC_PORT" ],
240
- # "INPUT_SNMP":
241
- f .data ["INPUT" ],
242
- # "OUTPUT_SNMP":
243
- f .data ["OUTPUT" ],
244
- # netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
245
- # them back to bytes:
246
- # "IPV4_DST_ADDR":
247
- struct .pack ('!I' , f .data ["IPV4_DST_ADDR" ]),
248
- # "IPV4_SRC_ADDR":
249
- struct .pack ('!I' , f .data ["IPV4_SRC_ADDR" ]),
250
- )
230
+ try :
231
+ yield (
232
+ ts ,
233
+ client_ip ,
234
+ # "IN_BYTES":
235
+ f .data ["IN_OCTETS" ],
236
+ # "PROTOCOL":
237
+ f .data ["PROTO" ],
238
+ # "DIRECTION":
239
+ DIRECTION_INGRESS ,
240
+ # "L4_DST_PORT":
241
+ f .data ["DST_PORT" ],
242
+ # "L4_SRC_PORT":
243
+ f .data ["SRC_PORT" ],
244
+ # "INPUT_SNMP":
245
+ f .data ["INPUT" ],
246
+ # "OUTPUT_SNMP":
247
+ f .data ["OUTPUT" ],
248
+ # netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
249
+ # them back to bytes:
250
+ # "IPV4_DST_ADDR":
251
+ struct .pack ('!I' , f .data ["IPV4_DST_ADDR" ]),
252
+ # "IPV4_SRC_ADDR":
253
+ struct .pack ('!I' , f .data ["IPV4_SRC_ADDR" ]),
254
+ )
255
+ except KeyError :
256
+ log .exception (f"[{ client_ip } ] Error decoding v5 flow, some data was missing. Contents: { repr (f .data )} " )
251
257
else :
252
258
log .error (f"[{ client_ip } ] Only Netflow v5 and v9 currently supported, ignoring record (version: [{ export .header .version } ])" )
253
259
0 commit comments