-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnotion_pipeline.py
42 lines (34 loc) · 1.05 KB
/
notion_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from typing import Callable
import dlt
from dlt_source_notion import source, DatabaseResource, DatabaseProperty
# Development switch read by load_notion_data(). When True it is passed as the
# pipeline's dev_mode flag, sets the source limit to 1, and makes the run use
# refresh="drop_sources"; when False the limit is -1 and refresh is None.
DEV_MODE = True
def load_notion_data(
    database_id: str = "12345678912345678912345678912345",
) -> None:
    """Load one Notion database into DuckDB through a dlt pipeline.

    Creates the ``notion_pipeline`` pipeline, wraps the Notion database in a
    ``DatabaseResource`` with a custom column-name projection, runs the
    pipeline, and prints the object returned by ``pipeline.run``.

    ``DEV_MODE`` controls three things here: the pipeline's ``dev_mode``
    flag, the source ``limit`` (1 when on, -1 when off), and whether
    ``refresh="drop_sources"`` is passed to the run.

    Args:
        database_id: ID of the Notion database to load. The default is the
            value that used to be hard-coded in this function.
            NOTE(review): it looks like a placeholder -- confirm before
            running against a real workspace.
    """
    # Imported locally so this function does not depend on the module's
    # import block providing ``Optional``.
    from typing import Optional

    # Normalized property names for which the projection returns None.
    excluded_columns = ("my_column_name",)

    pipeline = dlt.pipeline(
        pipeline_name="notion_pipeline",
        destination="duckdb",
        dev_mode=DEV_MODE,
    )

    def column_name_projection(
        prop: DatabaseProperty, normalize: Callable[[str], str]
    ) -> Optional[str]:
        """Return the normalized name of ``prop``, or None if it is excluded.

        NOTE(review): returning None presumably tells dlt_source_notion to
        drop the column -- confirm against the library documentation.
        """
        result_name = normalize(prop.name)
        if result_name in excluded_columns:
            return None
        return result_name

    my_db = DatabaseResource(
        database_id=database_id,
        column_name_projection=column_name_projection,
    )
    data = source(
        # NOTE(review): -1 presumably means "no limit" -- confirm.
        limit=1 if DEV_MODE else -1,
        database_resources=[my_db],
    )
    info = pipeline.run(
        data,
        refresh="drop_sources" if DEV_MODE else None,
        # we need this in case new resources, etc. are added
        schema_contract={"columns": "evolve"},
    )
    print(info)
if __name__ == "__main__":
    # Run the pipeline only when this file is executed as a script, so that
    # importing the module has no side effects.
    load_notion_data()