Skip to content

Commit 17cad94

Browse files
authored
Merge pull request #1 from grafolean/feature/history
Feature/history
2 parents c85eb60 + 6f9f222 commit 17cad94

14 files changed

+951
-161
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
BACKEND_URL=https://grafolean.com/api
22
BOT_TOKEN=
3+
DB_DIR=
34
JOBS_REFRESH_INTERVAL=60
45
NETFLOW_PORT=2055

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
.vscode
22
.env
3+
.cache
4+
__pycache__

.gitmodules

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[submodule "pynetflow"]
22
path = pynetflow
3-
url = https://github.com/bitkeks/python-netflow-v9-softflowd.git
3+
url = https://github.com/grafolean/python-netflow-v9-softflowd.git

Dockerfile

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ FROM python:3.6-slim-stretch as build-backend
99
COPY ./ /netflowbot/
1010
WORKDIR /netflowbot
1111
RUN \
12+
rm -rf .git/ tests/ .vscode/ .pytest_cache/ __pycache__/ && \
1213
find ./ ! -name '*.py' -type f -exec rm '{}' ';' && \
13-
rm -rf tests/ .vscode/ .pytest_cache/ __pycache__/ && \
1414
python3.6 -m compileall -b ./ && \
1515
find ./ -name '*.py' -exec rm '{}' ';'
1616

@@ -38,8 +38,10 @@ WORKDIR /netflowbot
3838
HEALTHCHECK --interval=10s --retries=1 CMD /bin/bash -c "[ ! -f /tmp/fail_health_check ] || ( rm /tmp/fail_health_check && exit 1 )"
3939

4040
# CAREFUL:
41-
# There are two entrypoints, both of which should be running:
42-
# - netflowcollector: gathering packets and writing statistics to Redis
41+
# There are three entrypoints, all of which should be running: (use docker-compose.yml to start 3 services)
42+
# - netflowcollector: gathering packets and writing them to named pipe
43+
# - netflowwriter: reading packets from named pipe and writing them to DB
4344
# - netflowbot: Grafolean bot for NetFlow - sending data to Grafolean according to configured sensors
44-
#CMD ["python", "-m", "netflowcollector"]
45+
# CMD ["python", "-m", "netflowcollector"]
46+
# CMD ["python", "-m", "netflowwriter"]
4547
CMD ["python", "-m", "netflowbot"]

Pipfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ requests = "*"
1111
python-dotenv = "*"
1212
ansicolors = "*"
1313
grafoleancollector = "*"
14-
redis = "*"
14+
psycopg2-binary = "*"
1515

1616
[requires]
1717
python_version = "3.6"

Pipfile.lock

Lines changed: 67 additions & 36 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dbutils.py

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
from colors import color
2+
from contextlib import contextmanager
3+
import logging
4+
import os
5+
import sys
6+
import copy
7+
import json
8+
9+
import psycopg2
10+
from psycopg2.pool import ThreadedConnectionPool
11+
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, register_adapter
12+
from psycopg2.extras import Json
13+
14+
# Enable verbose logging when the DEBUG env var is truthy ('true'/'yes'/'1').
IS_DEBUG = os.environ.get('DEBUG', 'false') in ['true', 'yes', '1']
logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG if IS_DEBUG else logging.INFO)
# Short (colored) level names keep log lines compact.
logging.addLevelName(logging.DEBUG, color("DBG", 7))
logging.addLevelName(logging.INFO, "INF")
logging.addLevelName(logging.WARNING, color('WRN', fg='red'))
logging.addLevelName(logging.ERROR, color('ERR', bg='red'))
log = logging.getLogger("{}.{}".format(__name__, "dbutils"))


# Global connection pool; lazily initialized by db_connect() on first use.
db_pool = None
# All tables managed by this module are namespaced with this prefix.
DB_PREFIX = 'netflow_'
# Adapt Python dicts to PostgreSQL JSON automatically when passed as query params.
register_adapter(dict, Json)
27+
28+
29+
# https://medium.com/@thegavrikstory/manage-raw-database-connection-pool-in-flask-b11e50cbad3
@contextmanager
def get_db_connection():
    """Yield a pooled DB connection (autocommit enabled), returning it to the pool on exit.

    Lazily initializes the global connection pool on first use.

    Raises:
        Exception: if the connection pool could not be initialized
            (db_connect() logs the failure and leaves db_pool as None).
    """
    global db_pool
    if db_pool is None:
        db_connect()
    if db_pool is None:
        # db_connect() swallows connection errors; fail explicitly here instead
        # of raising AttributeError on None below.
        raise Exception("DB connection pool is not available")

    conn = None
    try:
        conn = db_pool.getconn()
        conn.autocommit = True
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        yield conn
    finally:
        # Bug fix: only return the connection if getconn() succeeded; the
        # original referenced `conn` unconditionally and raised NameError
        # when acquisition failed.
        if conn is not None:
            db_pool.putconn(conn)
43+
44+
45+
@contextmanager
def get_db_cursor(commit=False):
    """Yield a cursor on a pooled connection; commit if requested, always close the cursor."""
    with get_db_connection() as conn:
        cur = conn.cursor()
        try:
            yield cur
            if commit:
                conn.commit()
        finally:
            cur.close()
55+
56+
57+
def db_connect():
    """(Re)initialize the global connection pool from DB_* environment variables.

    On failure the pool is left as None and the error is logged; callers
    (get_db_connection) are expected to handle a missing pool.
    """
    global db_pool
    host, dbname, user, password, connect_timeout = (
        os.environ.get('DB_HOST', 'localhost'),
        os.environ.get('DB_DATABASE', 'grafolean'),
        os.environ.get('DB_USERNAME', 'admin'),
        os.environ.get('DB_PASSWORD', 'admin'),
        int(os.environ.get('DB_CONNECT_TIMEOUT', '10'))
    )
    try:
        log.info("Connecting to database, host: [{}], db: [{}], user: [{}]".format(host, dbname, user))
        db_pool = ThreadedConnectionPool(1, 20,
                                         database=dbname,
                                         user=user,
                                         password=password,
                                         host=host,
                                         port=5432,
                                         connect_timeout=connect_timeout)
    except Exception as ex:
        # Bug fix: a bare `except:` also swallowed KeyboardInterrupt/SystemExit
        # and discarded the error detail; catch real errors only and log the cause.
        db_pool = None
        log.error("DB connection failed: {}".format(ex))
78+
79+
80+
###########################
81+
# DB schema migration #
82+
###########################
83+
84+
def get_existing_schema_version():
    """Return the schema version stored in the runtime_data table, or 0 if it doesn't exist yet."""
    version = 0
    with get_db_cursor() as c:
        try:
            c.execute(f'SELECT schema_version FROM {DB_PREFIX}runtime_data;')
            row = c.fetchone()
            version = row[0]
        except psycopg2.ProgrammingError:
            # Table is missing on a fresh database - treat as version 0.
            pass
    return version
94+
95+
96+
def _get_migration_method(next_migration_version):
97+
method_name = 'migration_step_{}'.format(next_migration_version)
98+
return method_name if hasattr(sys.modules[__name__], method_name) else None
99+
100+
101+
def is_migration_needed():
    """Return True when a migration step exists beyond the currently deployed schema version."""
    current = get_existing_schema_version()
    return _get_migration_method(current + 1) is not None
104+
105+
106+
def migrate_if_needed():
    """Apply all pending migration_step_N functions in version order.

    After each successful step the schema_version in runtime_data is bumped,
    so a failure mid-way leaves the DB at the last completed version.

    Returns:
        bool: True if at least one migration step was applied, False otherwise.
    """
    existing_schema_version = get_existing_schema_version()
    try_migrating_to = existing_schema_version + 1
    while True:
        method_name = _get_migration_method(try_migrating_to)
        if method_name is None:
            break
        # Bug fix: log the version we are migrating *from* on this iteration;
        # the original always logged the initial version, so later iterations
        # reported e.g. "from 0 to 2" instead of "from 1 to 2".
        log.info("Migrating DB schema from {} to {}".format(try_migrating_to - 1, try_migrating_to))
        method_to_call = getattr(sys.modules[__name__], method_name)
        method_to_call()
        # automatically upgrade schema version if there is no exception:
        with get_db_cursor() as c:
            c.execute(f'UPDATE {DB_PREFIX}runtime_data SET schema_version = %s;', (try_migrating_to,))
        try_migrating_to += 1
    # migration wasn't needed if we never advanced past the starting version:
    return try_migrating_to != existing_schema_version + 1
124+
125+
126+
def migration_step_1():
    """Create the runtime_data bookkeeping table and seed schema_version = 1."""
    statements = (
        f'CREATE TABLE {DB_PREFIX}runtime_data (schema_version SMALLSERIAL NOT NULL);',
        f'INSERT INTO {DB_PREFIX}runtime_data (schema_version) VALUES (1);',
    )
    with get_db_cursor() as c:
        for sql in statements:
            c.execute(sql)
130+
131+
def migration_step_2():
    """Create the NetFlow data tables (records + flows), their index, bot job bookkeeping,
    and enable slow-query logging on the database."""
    with get_db_cursor() as c:
        # UNLOGGED: Disabling WAL avoids high I/O load. Since NetFlow data is of temporary nature, this still
        # allows us to perform queries, but if the database crashes it is acceptable to lose all of the records.
        c.execute(f'CREATE UNLOGGED TABLE {DB_PREFIX}records (seq BIGSERIAL NOT NULL PRIMARY KEY, ts NUMERIC(16,6) NOT NULL, client_ip TEXT);')

        c.execute(f"""
            CREATE UNLOGGED TABLE {DB_PREFIX}flows (
                record INTEGER NOT NULL REFERENCES {DB_PREFIX}records(seq) ON DELETE CASCADE,
                IN_BYTES INTEGER,
                PROTOCOL SMALLINT,
                DIRECTION SMALLINT,
                L4_DST_PORT INTEGER,
                L4_SRC_PORT INTEGER,
                INPUT_SNMP SMALLINT,
                OUTPUT_SNMP SMALLINT,
                IPV4_DST_ADDR TEXT,
                IPV4_SRC_ADDR TEXT
            );
        """)
        # Consistency fix: use DB_PREFIX like every other statement here
        # (expands to the same 'netflow_flows_record' / 'netflow_flows' names).
        c.execute(f'CREATE INDEX {DB_PREFIX}flows_record on {DB_PREFIX}flows (record);')

        c.execute(f'CREATE TABLE {DB_PREFIX}bot_jobs (job_id TEXT NOT NULL PRIMARY KEY, last_used_seq BIGSERIAL);')

        # slow db queries should be logged to help avoid performance problems:
        database_name = os.environ.get('DB_DATABASE', 'grafolean')
        # Bug fix: ALTER DATABASE takes an identifier, not a string literal; the
        # original passed the name as a %s query parameter, which psycopg2 renders
        # as 'name' and PostgreSQL rejects as a syntax error. Quote it as an
        # identifier instead.
        db_ident = psycopg2.extensions.quote_ident(database_name, c)
        c.execute(f'ALTER DATABASE {db_ident} SET log_min_duration_statement = 100;')

0 commit comments

Comments
 (0)