Skip to content

Commit d321253

Browse files
committed
Moved deferred pool, connection and executor to respectful files
1 parent 7deaffa commit d321253

File tree

14 files changed

+315
-291
lines changed

14 files changed

+315
-291
lines changed

src/packages/dumbo/src/benchmarks/index.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ import 'dotenv/config';
22

33
import Benchmark from 'benchmark';
44
import pg from 'pg';
5-
import { postgreSQLConnectionString, rawSql, single } from '..';
6-
import { defaultPostgreSQLConenctionString, dumbo } from '../pg';
5+
import { rawSql, single } from '..';
6+
import {
7+
defaultPostgreSQLConenctionString,
8+
dumbo,
9+
PostgreSQLConnectionString,
10+
} from '../pg';
711

8-
const connectionString = postgreSQLConnectionString(
12+
const connectionString = PostgreSQLConnectionString(
913
process.env.BENCHMARK_POSTGRESQL_CONNECTION_STRING ??
1014
defaultPostgreSQLConenctionString,
1115
);

src/packages/dumbo/src/core/connections/connection.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { ConnectorType } from '../connectors';
22
import {
3+
createLazyExecutor,
34
sqlExecutor,
45
type DbSQLExecutor,
56
type WithSQLExecutor,
@@ -85,3 +86,51 @@ export const createConnection = <
8586

8687
return typedConnection;
8788
};
89+
90+
export const createDeferredConnection = <Connector extends ConnectorType>(
91+
connector: Connector,
92+
importConnection: () => Promise<Connection<Connector>>,
93+
): Connection<Connector> => {
94+
const getConnection = importConnection();
95+
96+
const execute = createLazyExecutor(async () => {
97+
const conn = await getConnection;
98+
return conn.execute;
99+
});
100+
101+
const connection: Connection<Connector> = {
102+
connector,
103+
execute,
104+
105+
open: async (): Promise<unknown> => {
106+
const conn = await getConnection;
107+
return conn.open();
108+
},
109+
110+
close: async (): Promise<void> => {
111+
if (getConnection) {
112+
const conn = await getConnection;
113+
await conn.close();
114+
}
115+
},
116+
117+
transaction: () => {
118+
const transaction = getConnection.then((c) => c.transaction());
119+
120+
return {
121+
connector,
122+
connection,
123+
execute: createLazyExecutor(async () => (await transaction).execute),
124+
begin: async () => (await transaction).begin(),
125+
commit: async () => (await transaction).commit(),
126+
rollback: async () => (await transaction).rollback(),
127+
};
128+
},
129+
withTransaction: async (handle) => {
130+
const connection = await getConnection;
131+
return connection.withTransaction(handle);
132+
},
133+
};
134+
135+
return connection;
136+
};

src/packages/dumbo/src/core/connections/pool.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { createDeferredConnection } from '../connections';
2+
import type { ConnectorType } from '../connectors';
13
import {
24
executeInNewConnection,
35
sqlExecutorInNewConnection,
@@ -72,3 +74,37 @@ export const createConnectionPool = <
7274

7375
return result as ConnectionPoolType;
7476
};
77+
78+
export const createDeferredConnectionPool = <Connector extends ConnectorType>(
79+
connector: Connector,
80+
importPool: () => Promise<ConnectionPool<Connection<Connector>>>,
81+
): ConnectionPool<Connection<Connector>> => {
82+
let poolPromise: Promise<ConnectionPool<Connection<Connector>>> | null = null;
83+
84+
const getPool = async (): Promise<ConnectionPool<Connection<Connector>>> => {
85+
if (!poolPromise) {
86+
try {
87+
poolPromise = importPool();
88+
} catch (error) {
89+
throw new Error(
90+
`Failed to import connection pool: ${error instanceof Error ? error.message : String(error)}`,
91+
);
92+
}
93+
}
94+
return poolPromise;
95+
};
96+
97+
return createConnectionPool({
98+
connector,
99+
close: async () => {
100+
if (!poolPromise) return;
101+
const pool = await poolPromise;
102+
await pool.close();
103+
poolPromise = null;
104+
},
105+
getConnection: () =>
106+
createDeferredConnection(connector, async () =>
107+
(await getPool()).connection(),
108+
),
109+
});
110+
};

src/packages/dumbo/src/core/execute/execute.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,56 @@ export const executeInNewConnection = async <
154154
await connection.close();
155155
}
156156
};
157+
158+
export const createLazyExecutor = (
159+
importExecutor: () => Promise<SQLExecutor>,
160+
): SQLExecutor => {
161+
let executor: SQLExecutor | null = null;
162+
163+
const getExecutor = async (): Promise<SQLExecutor> => {
164+
if (!executor) {
165+
try {
166+
executor = await importExecutor();
167+
} catch (error) {
168+
throw new Error(
169+
`Failed to import SQL executor: ${error instanceof Error ? error.message : String(error)}`,
170+
);
171+
}
172+
}
173+
return executor;
174+
};
175+
176+
return {
177+
query: async <Result extends QueryResultRow = QueryResultRow>(
178+
sql: SQL,
179+
options?: SQLQueryOptions,
180+
): Promise<QueryResult<Result>> => {
181+
const exec = await getExecutor();
182+
return exec.query<Result>(sql, options);
183+
},
184+
185+
batchQuery: async <Result extends QueryResultRow = QueryResultRow>(
186+
sqls: SQL[],
187+
options?: SQLQueryOptions,
188+
): Promise<QueryResult<Result>[]> => {
189+
const exec = await getExecutor();
190+
return exec.batchQuery<Result>(sqls, options);
191+
},
192+
193+
command: async <Result extends QueryResultRow = QueryResultRow>(
194+
sql: SQL,
195+
options?: SQLCommandOptions,
196+
): Promise<QueryResult<Result>> => {
197+
const exec = await getExecutor();
198+
return exec.command<Result>(sql, options);
199+
},
200+
201+
batchCommand: async <Result extends QueryResultRow = QueryResultRow>(
202+
sqls: SQL[],
203+
options?: SQLCommandOptions,
204+
): Promise<QueryResult<Result>[]> => {
205+
const exec = await getExecutor();
206+
return exec.batchCommand<Result>(sqls, options);
207+
},
208+
};
209+
};

0 commit comments

Comments
 (0)