|
| 1 | +""" |
| 2 | +이 모듈은 ClickHouse 데이터베이스에 연결하고 SQL 쿼리를 실행하여 결과를 pandas DataFrame으로 반환하는 기능을 제공합니다. |
| 3 | +
|
| 4 | +구성 요소: |
| 5 | +- 환경 변수에서 접속 정보를 불러와 ClickHouse 서버에 연결합니다. |
| 6 | +- SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. |
| 7 | +- 연결 실패 및 쿼리 오류에 대해 로깅을 통해 디버깅을 지원합니다. |
| 8 | +""" |
| 9 | + |
| 10 | +import logging |
1 | 11 | import os
|
2 |
| -from typing import Union |
| 12 | +from typing import Optional |
| 13 | + |
3 | 14 | import pandas as pd
|
4 | 15 | from clickhouse_driver import Client
|
5 | 16 | from dotenv import load_dotenv
|
6 | 17 |
|
7 |
| -# 환경변수 |
8 | 18 | load_dotenv()
|
9 | 19 |
|
| 20 | +logging.basicConfig( |
| 21 | + level=logging.INFO, |
| 22 | + format="%(asctime)s [%(levelname)s] %(message)s", |
| 23 | + datefmt="%Y-%m-%d %H:%M:%S", |
| 24 | +) |
| 25 | +logger = logging.getLogger(__name__) |
| 26 | + |
10 | 27 |
|
11 | 28 | class ConnectDB:
|
| 29 | + """ |
| 30 | + ClickHouse 데이터베이스에 연결하고 SQL 쿼리를 실행하는 클래스입니다. |
| 31 | +
|
| 32 | + 환경 변수에서 접속 정보를 로드하여 ClickHouse 서버에 연결하며, |
| 33 | + SQL 쿼리 실행 결과를 pandas DataFrame으로 반환합니다. |
| 34 | + """ |
| 35 | + |
12 | 36 | def __init__(self):
|
13 |
| - self.client = None |
| 37 | + """ |
| 38 | + ConnectDB 클래스의 인스턴스를 초기화합니다. |
| 39 | +
|
| 40 | + 환경 변수에서 ClickHouse 접속 정보를 읽고, 즉시 서버에 연결을 시도합니다. |
| 41 | + """ |
| 42 | + |
| 43 | + self.client: Optional[Client] = None |
14 | 44 | self.host = os.getenv("CLICKHOUSE_HOST")
|
15 | 45 | self.dbname = os.getenv("CLICKHOUSE_DATABASE")
|
16 | 46 | self.user = os.getenv("CLICKHOUSE_USER")
|
17 | 47 | self.password = os.getenv("CLICKHOUSE_PASSWORD")
|
18 | 48 | self.port = os.getenv("CLICKHOUSE_PORT")
|
19 | 49 |
|
20 |
| - def connect_to_clickhouse(self): |
21 |
| - |
22 |
| - # ClickHouse 서버 정보 |
23 |
| - self.client = Client( |
24 |
| - host=self.host, |
25 |
| - port=self.port, |
26 |
| - user=self.user, |
27 |
| - password=self.password, |
28 |
| - database=self.dbname, # 예: '127.0.0.1' # 기본 TCP 포트 |
29 |
| - ) |
30 |
| - |
31 |
| - def run_sql(self, sql: str) -> Union[pd.DataFrame, None]: |
32 |
| - if self.client: |
33 |
| - try: |
34 |
| - result = self.client.execute(sql, with_column_types=True) |
35 |
| - # 결과와 컬럼 정보 분리 |
36 |
| - rows, columns = result |
37 |
| - column_names = [col[0] for col in columns] |
38 |
| - |
39 |
| - # Create a pandas dataframe from the results |
40 |
| - df = pd.DataFrame(rows, columns=column_names) |
41 |
| - return df |
42 |
| - |
43 |
| - except Exception as e: |
44 |
| - raise e |
| 50 | + self.connect_to_clickhouse() |
| 51 | + |
| 52 | + def connect_to_clickhouse(self) -> None: |
| 53 | + """ |
| 54 | + ClickHouse 서버에 연결을 시도합니다. |
| 55 | +
|
| 56 | + 연결에 성공하면 client 객체가 설정되며, 실패 시 예외를 발생시킵니다. |
| 57 | + 연결 상태는 로깅을 통해 출력됩니다. |
| 58 | + """ |
| 59 | + |
| 60 | + try: |
| 61 | + self.client = Client( |
| 62 | + host=self.host, |
| 63 | + port=self.port, |
| 64 | + user=self.user, |
| 65 | + password=self.password, |
| 66 | + database=self.dbname, |
| 67 | + ) |
| 68 | + logger.info("Successfully connected to ClickHouse.") |
| 69 | + except Exception as e: |
| 70 | + logger.error("Failed to connect to ClickHouse: %s", e) |
| 71 | + raise |
| 72 | + |
| 73 | + def run_sql(self, sql: str) -> pd.DataFrame: |
| 74 | + """ |
| 75 | + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. |
| 76 | + 내부적으로 ClickHouse 클라이언트가 없으면 자동으로 재연결을 시도합니다. |
| 77 | +
|
| 78 | + Parameters: |
| 79 | + sql (str): 실행할 SQL 쿼리 문자열 |
| 80 | +
|
| 81 | + Returns: |
| 82 | + pd.DataFrame: 쿼리 실행 결과를 담은 DataFrame 객체 |
| 83 | +
|
| 84 | + Raises: |
| 85 | + Exception: SQL 실행 중 오류 발생 시 예외를 발생시킵니다. |
| 86 | + """ |
| 87 | + |
| 88 | + if not self.client: |
| 89 | + logger.warning( |
| 90 | + "ClickHouse client is not initialized. Attempting to reconnect..." |
| 91 | + ) |
| 92 | + self.connect_to_clickhouse() |
| 93 | + |
| 94 | + try: |
| 95 | + result = self.client.execute(sql, with_column_types=True) |
| 96 | + rows, columns = result |
| 97 | + column_names = [col[0] for col in columns] |
| 98 | + df = pd.DataFrame(rows, columns=column_names) |
| 99 | + return df |
| 100 | + |
| 101 | + except Exception as e: |
| 102 | + logger.exception("An error occurred while executing SQL: %s", e) |
| 103 | + raise |
0 commit comments