Skip to content

Feature/80 add connectors and metadata #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 92 additions & 24 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
###############################################

# LLM_PROVIDER=openai
# OPEN_AI_LLM_KEY=
# OPEN_AI_LLM_MODEL=gpt-4o
# OPEN_AI_KEY=sk-proj-----
# OPEN_AI_LLM_MODEL=gpt-4.1

# LLM_PROVIDER=gemini
# GEMINI_API_KEY=
# GEMINI_LLM_MODEL=gemini-2.0-flash-lite

# LLM_PROVIDER=azure
# AZURE_OPENAI_LLM_ENDPOINT=
# AZURE_OPENAI_LLM_KEY=
# AZURE_OPENAI_LLM_MODEL=
# AZURE_OPENAI_LLM_API_VERSION=
LLM_PROVIDER=azure
AZURE_OPENAI_LLM_ENDPOINT=https://-------.openai.azure.com/
AZURE_OPENAI_LLM_KEY=-
AZURE_OPENAI_LLM_MODEL=gpt4o
AZURE_OPENAI_LLM_API_VERSION=2024-07-01-preview

# LLM_PROVIDER=ollama
# OLLAMA_LLM_BASE_URL=
Expand All @@ -36,31 +36,99 @@
########### Embedding API SELECTION ###########
###############################################
# Only used if you are using an LLM that does not natively support embedding (openai or Azure)
# EMBEDDING_ENGINE='openai'
# OPEN_AI_KEY=sk-xxxx
# EMBEDDING_MODEL_PREF='text-embedding-ada-002'
# EMBEDDING_PROVIDER='openai'
# OPEN_AI_EMBEDDING_MODEL='text-embedding-ada-002'

# EMBEDDING_ENGINE='azure'
# AZURE_OPENAI_ENDPOINT=
# AZURE_OPENAI_KEY=
# EMBEDDING_MODEL_PREF='my-embedder-model' # This is the "deployment" on Azure you want to use for embeddings. Not the base model. Valid base model is text-embedding-ada-002
# EMBEDDING_PROVIDER=azure
# AZURE_OPENAI_EMBEDDING_ENDPOINT=https://-------.openai.azure.com/openai/deployments
# AZURE_OPENAI_EMBEDDING_KEY=-
# AZURE_OPENAI_EMBEDDING_MODEL='textembeddingada002' # This is the "deployment" on Azure you want to use for embeddings. Not the base model. Valid base model is text-embedding-ada-002
# AZURE_OPENAI_EMBEDDING_API_VERSION=2023-09-15-preview

# EMBEDDING_ENGINE='ollama'
# EMBEDDING_PROVIDER='ollama'
# EMBEDDING_BASE_PATH='http://host.docker.internal:11434'
# EMBEDDING_MODEL_PREF='nomic-embed-text:latest'
# EMBEDDING_MODEL='nomic-embed-text:latest'
# EMBEDDING_MODEL_MAX_CHUNK_LENGTH=8192

# EMBEDDING_ENGINE='bedrock'
# AWS_BEDROCK_EMBEDDING_ACCESS_KEY_ID=
# AWS_BEDROCK_EMBEDDING_ACCESS_KEY=
# AWS_BEDROCK_EMBEDDING_REGION=us-west-2
# AWS_BEDROCK_EMBEDDING_MODEL_PREF=amazon.embedding-embedding-ada-002:0
EMBEDDING_PROVIDER='bedrock'
AWS_BEDROCK_EMBEDDING_ACCESS_KEY_ID=--
AWS_BEDROCK_EMBEDDING_SECRET_ACCESS_KEY=-/-+-+-
AWS_BEDROCK_EMBEDDING_REGION=us-west-2
AWS_BEDROCK_EMBEDDING_MODEL=amazon.titan-embed-text-v2:0

# EMBEDDING_ENGINE='gemini'
# EMBEDDING_PROVIDER='gemini'
# GEMINI_EMBEDDING_API_KEY=
# EMBEDDING_MODEL_PREF='text-embedding-004'
# EMBEDDING_MODEL='text-embedding-004'

# EMBEDDING_ENGINE='huggingface'
# EMBEDDING_PROVIDER='huggingface'
# HUGGING_FACE_EMBEDDING_REPO_ID=
# HUGGING_FACE_EMBEDDING_MODEL=
# HUGGING_FACE_EMBEDDING_API_TOKEN=

DATAHUB_SERVER = 'http://-.-.-.-:-'


###############################################
######## Database Connector SELECTION #########
###############################################

# clickhouse
# DB_TYPE=clickhouse
# CLICKHOUSE_HOST=_._._._
# CLICKHOUSE_PORT=9000
# CLICKHOUSE_USER=_
# CLICKHOUSE_PASSWORD=_
# CLICKHOUSE_DATABASE=_

# databricks
# DB_TYPE=databricks
# DATABRICKS_HOST=_
# DATABRICKS_HTTP_PATH=_
# DATABRICKS_ACCESS_TOKEN=_

# duckdb
# DB_TYPE=duckdb
# DUCKDB_PATH=./data/duckdb.db

# mariadb
# DB_TYPE=mariadb
# MARIADB_HOST=_
# MARIADB_PORT=3306
# MARIADB_USER=_
# MARIADB_PASSWORD=_
# MARIADB_DATABASE=_

# mysql
# DB_TYPE=mysql
# MYSQL_HOST=_
# MYSQL_PORT=3306
# MYSQL_USER=_
# MYSQL_PASSWORD=_
# MYSQL_DATABASE=_

# oracle
# DB_TYPE=oracle
# ORACLE_HOST=_
# ORACLE_PORT=1521
# ORACLE_USER=_
# ORACLE_PASSWORD=_
# ORACLE_DATABASE=_
# ORACLE_SERVICE_NAME=_

# postgres
# DB_TYPE=postgres
# POSTGRES_HOST=_
# POSTGRES_PORT=5432
# POSTGRES_USER=_
# POSTGRES_PASSWORD=_
# POSTGRES_DATABASE=_

# snowflake
# DB_TYPE=snowflake
# SNOWFLAKE_USER=_
# SNOWFLAKE_PASSWORD=_
# SNOWFLAKE_ACCOUNT=_

# sqlite
# DB_TYPE=sqlite
# SQLITE_PATH=./data/sqlite.db
127 changes: 127 additions & 0 deletions db_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from typing import Optional
import os
from .config import DBConfig
from .logger import logger

from dotenv import load_dotenv

from .base_connector import BaseConnector

from .clickhouse_connector import ClickHouseConnector
from .postgres_connector import PostgresConnector
from .mysql_connector import MySQLConnector
from .mariadb_connector import MariaDBConnector
from .oracle_connector import OracleConnector
from .duckdb_connector import DuckDBConnector
from .databricks_connector import DatabricksConnector
from .snowflake_connector import SnowflakeConnector

# Load a .env file from the current working directory at import time,
# letting its values override variables already present in the environment.
env_path = os.path.join(os.getcwd(), ".env")

# Use the package logger instead of print(): this runs on import of a
# library package, and stdout output is not configurable by the caller.
if os.path.exists(env_path):
    load_dotenv(env_path, override=True)
    logger.info(f"✅ 환경변수 파일(.env)이 {os.getcwd()}에 로드되었습니다!")
else:
    logger.warning(f"⚠️ 환경변수 파일(.env)이 {os.getcwd()}에 없습니다!")

def get_db_connector(db_type: Optional[str] = None, config: Optional[DBConfig] = None):
    """
    Return the appropriate DB connector instance.

    - If db_type is not provided, it is read from the DB_TYPE environment variable.
    - If config is not provided, it is loaded from environment variables whose
      prefix is the upper-cased db_type (e.g. 'mysql' -> MYSQL_HOST, MYSQL_USER, ...).

    Parameters:
        db_type (Optional[str]): Database type (e.g., 'postgres', 'mysql')
        config (Optional[DBConfig]): Connection config

    Returns:
        BaseConnector: Initialized DB connector instance

    Raises:
        ValueError: If the DB type is missing/unsupported, or required
            connector-specific config fields are absent.
    """
    if db_type is None:
        db_type = os.getenv("DB_TYPE")
    if not db_type:
        raise ValueError("DB type must be provided or set in environment as DB_TYPE.")

    db_type = db_type.lower()
    # .env.example documents DB_TYPE=postgres with POSTGRES_* variables, while
    # callers may also pass 'postgresql'. Normalize so both spellings resolve
    # to the same connector AND the POSTGRES_* environment prefix (previously
    # 'postgresql' loaded config from a nonexistent POSTGRESQL_* prefix and
    # 'postgres' was rejected outright).
    if db_type == "postgresql":
        db_type = "postgres"

    if config is None:
        config = load_config_from_env(db_type.upper())

    connector_map = {
        "clickhouse": ClickHouseConnector,
        "postgres": PostgresConnector,
        "mysql": MySQLConnector,
        "mariadb": MariaDBConnector,
        "oracle": OracleConnector,
        "duckdb": DuckDBConnector,
        "databricks": DatabricksConnector,
        "snowflake": SnowflakeConnector,
    }

    if db_type not in connector_map:
        logger.error(f"Unsupported DB type: {db_type}")
        raise ValueError(f"Unsupported DB type: {db_type}")

    # Connector-specific settings that must be present; nested keys are
    # expressed as dot paths into the config dict (e.g. 'extra.account').
    required_fields = {
        "oracle": ["extra.service_name"],
        "databricks": ["extra.http_path", "extra.access_token"],
        "snowflake": ["extra.account"],
    }

    missing = []
    for path in required_fields.get(db_type, []):
        cur = config
        for key in path.split("."):
            cur = cur.get(key) if isinstance(cur, dict) else None
            if cur is None:
                missing.append(path)
                break

    if missing:
        logger.error(f"Missing required fields for {db_type}: {', '.join(missing)}")
        raise ValueError(f"Missing required fields for {db_type}: {', '.join(missing)}")

    return connector_map[db_type](config)



def load_config_from_env(prefix: str) -> DBConfig:
    """
    Build a DBConfig from environment variables sharing a common prefix.

    The standard connection fields (HOST, PORT, USER, PASSWORD, DATABASE) map
    directly onto DBConfig keys; every other variable with the same prefix is
    collected into the 'extra' dict under a lower-cased key.

    Example:
        prefix='SNOWFLAKE' reads SNOWFLAKE_HOST, SNOWFLAKE_USER,
        SNOWFLAKE_PASSWORD, SNOWFLAKE_PORT and SNOWFLAKE_DATABASE; other
        keys such as SNOWFLAKE_ACCOUNT or SNOWFLAKE_WAREHOUSE go to 'extra'.
    """
    standard_suffixes = {"HOST", "PORT", "USER", "PASSWORD", "DATABASE"}
    marker = f"{prefix}_"

    def read(name):
        # Small helper so each standard field is a single, uniform lookup.
        return os.getenv(f"{prefix}_{name}")

    raw_port = read("PORT")
    settings = {
        "host": read("HOST"),
        "port": int(raw_port) if raw_port else None,
        "user": read("USER"),
        "password": read("PASSWORD"),
        "database": read("DATABASE"),
    }

    # Everything else carrying the prefix is engine-specific -> 'extra'.
    extras = {
        key[len(marker):].lower(): value
        for key, value in os.environ.items()
        if key.startswith(marker) and key[len(marker):].upper() not in standard_suffixes
    }
    if extras:
        settings["extra"] = extras

    return DBConfig(**settings)
27 changes: 27 additions & 0 deletions db_utils/base_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from abc import ABC, abstractmethod
import pandas as pd

class BaseConnector(ABC):
    """Common interface that every database connector must implement."""

    @abstractmethod
    def connect(self):
        """Open the underlying database connection."""
        ...

    @abstractmethod
    def run_sql(self, sql: str) -> pd.DataFrame:
        """
        Execute *sql* and return the rows as a pandas DataFrame.

        Parameters:
            sql (str): SQL query string to be executed.

        Returns:
            pd.DataFrame: Result of the SQL query as a pandas DataFrame.
        """
        ...
70 changes: 70 additions & 0 deletions db_utils/clickhouse_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from .base_connector import BaseConnector
from clickhouse_driver import Client
import pandas as pd
from db_utils import DBConfig, logger

class ClickHouseConnector(BaseConnector):
    """
    Connect to ClickHouse and execute SQL queries via clickhouse_driver.

    NOTE(review): this module imports DBConfig/logger from the db_utils
    package root, which itself imports this module — the circular import only
    works because those names are bound first in __init__.py; importing from
    .config/.logger directly would be sturdier.
    """
    # Native TCP client; None until connect() has succeeded.
    client = None

    def __init__(self, config: DBConfig):
        """
        Initialize the connector and open a connection immediately.

        Parameters:
            config (DBConfig): Connection parameters (host, port, user,
                password, database).
        """
        self.host = config["host"]
        self.port = config["port"]
        self.user = config["user"]
        self.password = config["password"]
        self.database = config["database"]
        self.connect()

    def connect(self) -> None:
        """
        Establish a connection to the ClickHouse server.

        Raises:
            Exception: Re-raises whatever the driver raised after logging it.
        """
        try:
            self.client = Client(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            logger.info("Successfully connected to ClickHouse.")
        except Exception as e:
            logger.error(f"Failed to connect to ClickHouse: {e}")
            raise

    def run_sql(self, sql: str) -> pd.DataFrame:
        """
        Execute a SQL query and return the result as a pandas DataFrame.

        Parameters:
            sql (str): SQL query string to be executed.

        Returns:
            pd.DataFrame: Query rows, with column names taken from the
            server's response metadata.
        """
        if self.client is None:
            self.connect()

        try:
            # clickhouse_driver's Client exposes execute(), not the
            # query().result() API of clickhouse-connect — the original call
            # raised AttributeError on every query. with_column_types=True
            # returns (rows, [(name, type), ...]) so the DataFrame gets real
            # column labels instead of 0..n-1.
            rows, col_types = self.client.execute(sql, with_column_types=True)
            return pd.DataFrame(rows, columns=[name for name, _ in col_types])
        except Exception as e:
            logger.error(f"Failed to execute SQL query: {e}")
            raise

    def close(self) -> None:
        """
        Close the connection to the ClickHouse server (idempotent).
        """
        if self.client:
            self.client.disconnect()
            # A normal shutdown is informational, not an error.
            logger.info("Connection to ClickHouse closed.")
        self.client = None
10 changes: 10 additions & 0 deletions db_utils/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Optional, Dict, TypedDict

class _DBConfigRequired(TypedDict):
    # Core connection fields that every config loader populates. Values may
    # still be None for engines that don't use them (e.g. duckdb/sqlite paths).
    host: str
    port: Optional[int]
    user: Optional[str]
    password: Optional[str]
    database: Optional[str]


class DBConfig(_DBConfigRequired, total=False):
    """
    Connection settings passed to a database connector.

    'extra' holds engine-specific keys (e.g. snowflake 'account', databricks
    'http_path') and is declared optional: load_config_from_env omits it when
    no extra variables are set, which the previous fully-required TypedDict
    flagged as a type error on the package's own code.
    """

    extra: Dict[str, str]
Loading
Loading