Skip to content

Commit 3e9b2eb

Browse files
committed
Added connection strings for the database and initial resolution made using them
1 parent bf2b5a2 commit 3e9b2eb

File tree

11 files changed

+508
-21
lines changed

11 files changed

+508
-21
lines changed

src/packages/dumbo/src/benchmarks/index.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ import 'dotenv/config';
22

33
import Benchmark from 'benchmark';
44
import pg from 'pg';
5-
import { rawSql, single } from '..';
5+
import { postgreSQLConnectionString, rawSql, single } from '..';
66
import { defaultPostgreSQLConenctionString, dumbo } from '../pg';
77

8-
const connectionString =
8+
const connectionString = postgreSQLConnectionString(
99
process.env.BENCHMARK_POSTGRESQL_CONNECTION_STRING ??
10-
defaultPostgreSQLConenctionString;
10+
defaultPostgreSQLConenctionString,
11+
);
1112

1213
const pooled = process.env.BENCHMARK_CONNECTION_POOLED === 'true';
1314

src/packages/dumbo/src/core/connections/connection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export interface Connection<
1414
Connector extends ConnectorType = ConnectorType,
1515
DbClient = unknown,
1616
> extends WithSQLExecutor,
17-
DatabaseTransactionFactory<Connector> {
17+
DatabaseTransactionFactory<Connector, DbClient> {
1818
connector: Connector;
1919
open: () => Promise<DbClient>;
2020
close: () => Promise<void>;

src/packages/dumbo/src/core/index.ts

Lines changed: 268 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
1-
import type { Connection, ConnectionPool } from './connections';
2-
import type { ConnectorType } from './connectors';
1+
import {
2+
SQL,
3+
type QueryResult,
4+
type QueryResultRow,
5+
type SQLCommandOptions,
6+
type SQLExecutor,
7+
type SQLQueryOptions,
8+
} from '../core';
9+
import {
10+
createConnectionPool,
11+
type Connection,
12+
type ConnectionPool,
13+
} from './connections';
14+
import type { ConnectorType, ConnectorTypeParts } from './connectors';
315

416
export * from './connections';
517
export * from './connectors';
@@ -11,11 +23,261 @@ export * from './serializer';
1123
export * from './sql';
1224
export * from './tracing';
1325

14-
export type DumboOptions<Connector extends ConnectorType = ConnectorType> = {
15-
connector?: Connector;
16-
};
17-
1826
export type Dumbo<
1927
Connector extends ConnectorType = ConnectorType,
2028
ConnectionType extends Connection<Connector> = Connection<Connector>,
2129
> = ConnectionPool<ConnectionType>;
30+
31+
export type PostgreSQLConnectionString =
32+
| `postgresql://${string}`
33+
| `postgres://${string}`;
34+
35+
export const postgreSQLConnectionString = (
36+
connectionString: string,
37+
): PostgreSQLConnectionString => {
38+
if (
39+
!connectionString.startsWith('postgresql://') &&
40+
!connectionString.startsWith('postgres://')
41+
) {
42+
throw new Error(
43+
`Invalid PostgreSQL connection string: ${connectionString}. It should start with "postgresql://".`,
44+
);
45+
}
46+
return connectionString as PostgreSQLConnectionString;
47+
};
48+
49+
export type SQLiteConnectionString =
50+
| `file:${string}`
51+
| `:memory:`
52+
| `/${string}`
53+
| `./${string}`;
54+
55+
// Helper type to infer the connector type based on connection string
56+
export type InferConnector<T extends DatabaseConnectionString> =
57+
T extends PostgreSQLConnectionString
58+
? ConnectorType<'PostgreSQL', 'pg'>
59+
: T extends SQLiteConnectionString
60+
? ConnectorType<'SQLite', 'sqlite3'>
61+
: never;
62+
63+
// Helper type to infer the connection type based on connection string
64+
export type InferConnection<T extends DatabaseConnectionString> =
65+
T extends PostgreSQLConnectionString
66+
? Connection<ConnectorType<'PostgreSQL', 'pg'>>
67+
: T extends SQLiteConnectionString
68+
? Connection<ConnectorType<'SQLite', 'sqlite3'>>
69+
: never;
70+
71+
export type DatabaseConnectionString =
72+
| PostgreSQLConnectionString
73+
| SQLiteConnectionString;
74+
75+
export type DumboConnectionOptions<
76+
T extends DatabaseConnectionString = DatabaseConnectionString,
77+
> = {
78+
connectionString: T;
79+
};
80+
81+
export const createLazyExecutor = (
82+
importExecutor: () => Promise<SQLExecutor>,
83+
): SQLExecutor => {
84+
let executor: SQLExecutor | null = null;
85+
86+
const getExecutor = async (): Promise<SQLExecutor> => {
87+
if (!executor) {
88+
try {
89+
executor = await importExecutor();
90+
} catch (error) {
91+
throw new Error(
92+
`Failed to import SQL executor: ${error instanceof Error ? error.message : String(error)}`,
93+
);
94+
}
95+
}
96+
return executor;
97+
};
98+
99+
return {
100+
query: async <Result extends QueryResultRow = QueryResultRow>(
101+
sql: SQL,
102+
options?: SQLQueryOptions,
103+
): Promise<QueryResult<Result>> => {
104+
const exec = await getExecutor();
105+
return exec.query<Result>(sql, options);
106+
},
107+
108+
batchQuery: async <Result extends QueryResultRow = QueryResultRow>(
109+
sqls: SQL[],
110+
options?: SQLQueryOptions,
111+
): Promise<QueryResult<Result>[]> => {
112+
const exec = await getExecutor();
113+
return exec.batchQuery<Result>(sqls, options);
114+
},
115+
116+
command: async <Result extends QueryResultRow = QueryResultRow>(
117+
sql: SQL,
118+
options?: SQLCommandOptions,
119+
): Promise<QueryResult<Result>> => {
120+
const exec = await getExecutor();
121+
return exec.command<Result>(sql, options);
122+
},
123+
124+
batchCommand: async <Result extends QueryResultRow = QueryResultRow>(
125+
sqls: SQL[],
126+
options?: SQLCommandOptions,
127+
): Promise<QueryResult<Result>[]> => {
128+
const exec = await getExecutor();
129+
return exec.batchCommand<Result>(sqls, options);
130+
},
131+
};
132+
};
133+
134+
export const createDeferredConnection = <Connector extends ConnectorType>(
135+
connector: Connector,
136+
importConnection: () => Promise<Connection<Connector>>,
137+
): Connection<Connector> => {
138+
const getConnection = importConnection();
139+
140+
const execute = createLazyExecutor(async () => {
141+
const conn = await getConnection;
142+
return conn.execute;
143+
});
144+
145+
const connection: Connection<Connector> = {
146+
connector,
147+
execute,
148+
149+
open: async (): Promise<unknown> => {
150+
const conn = await getConnection;
151+
return conn.open();
152+
},
153+
154+
close: async (): Promise<void> => {
155+
if (getConnection) {
156+
const conn = await getConnection;
157+
await conn.close();
158+
}
159+
},
160+
161+
transaction: () => {
162+
const transaction = getConnection.then((c) => c.transaction());
163+
164+
return {
165+
connector,
166+
connection,
167+
execute: createLazyExecutor(async () => (await transaction).execute),
168+
begin: async () => (await transaction).begin(),
169+
commit: async () => (await transaction).commit(),
170+
rollback: async () => (await transaction).rollback(),
171+
};
172+
},
173+
withTransaction: async (handle) => {
174+
const connection = await getConnection;
175+
return connection.withTransaction(handle);
176+
},
177+
};
178+
179+
return connection;
180+
};
181+
182+
export const createDeferredConnectionPool = <Connector extends ConnectorType>(
183+
connector: Connector,
184+
importPool: () => Promise<ConnectionPool<Connection<Connector>>>,
185+
): ConnectionPool<Connection<Connector>> => {
186+
let poolPromise: Promise<ConnectionPool<Connection<Connector>>> | null = null;
187+
188+
const getPool = async (): Promise<ConnectionPool<Connection<Connector>>> => {
189+
if (!poolPromise) {
190+
try {
191+
poolPromise = importPool();
192+
} catch (error) {
193+
throw new Error(
194+
`Failed to import connection pool: ${error instanceof Error ? error.message : String(error)}`,
195+
);
196+
}
197+
}
198+
return poolPromise;
199+
};
200+
201+
return createConnectionPool({
202+
connector,
203+
getConnection: () =>
204+
createDeferredConnection(connector, async () =>
205+
(await getPool()).connection(),
206+
),
207+
});
208+
};
209+
210+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
211+
const importDrivers: Record<string, () => Promise<any>> = {
212+
'postgresql:pg': () => import('../storage/postgresql/pg'),
213+
'sqlite:sqlite3': () => import('../storage/sqlite/sqlite3'),
214+
};
215+
216+
export const parseConnectionString = (
217+
connectionString: DatabaseConnectionString,
218+
): ConnectorTypeParts => {
219+
if (
220+
connectionString.startsWith('postgresql://') ||
221+
connectionString.startsWith('postgres://')
222+
) {
223+
return {
224+
databaseType: 'postgresql',
225+
driverName: 'pg',
226+
};
227+
}
228+
229+
if (
230+
connectionString.startsWith('file:') ||
231+
connectionString === ':memory:' ||
232+
connectionString.startsWith('/') ||
233+
connectionString.startsWith('./')
234+
) {
235+
return {
236+
databaseType: 'sqlite',
237+
driverName: 'sqlite3',
238+
};
239+
}
240+
241+
throw new Error(
242+
`Unsupported database connection string: ${connectionString}`,
243+
);
244+
};
245+
246+
export function dumbo<
247+
ConnectionString extends DatabaseConnectionString,
248+
DatabaseOptions extends DumboConnectionOptions<ConnectionString>,
249+
>(options: DatabaseOptions): ConnectionPool<InferConnection<ConnectionString>> {
250+
const { connectionString } = options;
251+
252+
const { databaseType, driverName } = parseConnectionString(connectionString);
253+
254+
const connector: InferConnection<ConnectionString>['connector'] =
255+
`${databaseType}:${driverName}` as InferConnection<ConnectionString>['connector'];
256+
257+
const importDriver = importDrivers[connector];
258+
if (!importDriver) {
259+
throw new Error(`Unsupported connector: ${connector}`);
260+
}
261+
262+
const importAndCreatePool = async () => {
263+
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
264+
const module = await importDriver();
265+
266+
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
267+
const poolFactory: (options: {
268+
connectionString: string;
269+
}) => ConnectionPool<InferConnection<ConnectionString>> =
270+
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
271+
'dumbo' in module ? module.dumbo : undefined;
272+
273+
if (poolFactory === undefined)
274+
throw new Error(`No pool factory found for connector: ${connector}`);
275+
276+
return poolFactory({ connectionString });
277+
};
278+
279+
return createDeferredConnectionPool(
280+
connector,
281+
importAndCreatePool,
282+
) as ConnectionPool<InferConnection<ConnectionString>>;
283+
}

src/packages/dumbo/src/pg.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
1-
export * from './core';
21
export * from './storage/postgresql/core';
32
export * from './storage/postgresql/pg';

src/packages/dumbo/src/sqlite3.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
1-
export * from './core';
21
export * from './storage/sqlite/core';
32
export * from './storage/sqlite/sqlite3';

src/packages/dumbo/src/storage/postgresql/core/schema/migrations.int.spec.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,26 @@ import assert from 'assert';
66
import { after, before, beforeEach, describe, it } from 'node:test';
77
import { tableExists } from '..';
88
import { type Dumbo } from '../../../..';
9-
import { count, rawSql, sql } from '../../../../core';
10-
import { type SQLMigration, MIGRATIONS_LOCK_ID } from '../../../../core/schema';
9+
import {
10+
count,
11+
postgreSQLConnectionString,
12+
rawSql,
13+
sql,
14+
type PostgreSQLConnectionString,
15+
} from '../../../../core';
16+
import { MIGRATIONS_LOCK_ID, type SQLMigration } from '../../../../core/schema';
1117
import { dumbo } from '../../../../pg';
1218
import { acquireAdvisoryLock, releaseAdvisoryLock } from '../locks';
1319
import { runPostgreSQLMigrations } from './migrations';
1420

1521
void describe('Migration Integration Tests', () => {
1622
let pool: Dumbo;
1723
let postgres: StartedPostgreSqlContainer;
18-
let connectionString: string;
24+
let connectionString: PostgreSQLConnectionString;
1925

2026
before(async () => {
2127
postgres = await new PostgreSqlContainer().start();
22-
connectionString = postgres.getConnectionUri();
28+
connectionString = postgreSQLConnectionString(postgres.getConnectionUri());
2329
pool = dumbo({ connectionString });
2430
});
2531

0 commit comments

Comments
 (0)