26
26
27
27
28
28
NETFLOW_AGGREGATION_INTERVALS = [
29
- ('1min' , 60 ),
30
- ('15min' , 15 * 60 ),
31
- ('1h' , 3600 ),
32
- ('4h' , 4 * 3600 ),
33
- ('24h' , 24 * 3600 ),
29
+ # label; interval; when to start first run (to make sure the runs are not aligned)
30
+ ('1min' , 60 , 0 ),
31
+ ('15min' , 15 * 60 , 15 ),
32
+ ('1h' , 3600 , 4 * 60 + 15 ),
33
+ ('4h' , 4 * 3600 , 29 * 60 + 15 ),
34
+ ('24h' , 24 * 3600 , 1 * 3600 + 29 * 60 + 15 ),
34
35
]
35
36
TOP_N_MAX = 10
36
37
@@ -71,7 +72,7 @@ def jobs(self):
71
72
accounts_infos [entity_info ["account_id" ]].append (entity_info )
72
73
73
74
for account_id , entities_infos in accounts_infos .items ():
74
- for interval_label , interval in NETFLOW_AGGREGATION_INTERVALS :
75
+ for interval_label , interval , first_run_ts in NETFLOW_AGGREGATION_INTERVALS :
75
76
job_id = f'aggr/{ interval_label } /{ account_id } '
76
77
job_params = {
77
78
"job_id" : job_id ,
@@ -81,7 +82,8 @@ def jobs(self):
81
82
"backend_url" : self .backend_url ,
82
83
"bot_token" : self .bot_token ,
83
84
}
84
- yield job_id , [interval ], NetFlowBot .perform_account_aggr_job , job_params
85
+ start_ts = int (time .time ()) + first_run_ts - interval # start_ts must be in the past
86
+ yield job_id , [interval ], NetFlowBot .perform_account_aggr_job , job_params , start_ts
85
87
86
88
87
89
@staticmethod
0 commit comments