diff --git a/packages/core/src/driver/database.driver-persister.ts b/packages/core/src/driver/database.driver-persister.ts new file mode 100644 index 0000000..bbb4a9d --- /dev/null +++ b/packages/core/src/driver/database.driver-persister.ts @@ -0,0 +1,8 @@ +export interface DatabaseDriverPersister { + + persist(entity: T): void; + + remove(entity: T): void; + + flush(): Promise; +} \ No newline at end of file diff --git a/packages/core/src/driver/database.driver.ts b/packages/core/src/driver/database.driver.ts index cef7eb7..aab4157 100644 --- a/packages/core/src/driver/database.driver.ts +++ b/packages/core/src/driver/database.driver.ts @@ -1,14 +1,7 @@ import { InboxOutboxTransportEvent } from '../model/inbox-outbox-transport-event.interface'; +import { DatabaseDriverPersister } from './database.driver-persister'; - -export interface DatabaseDriver { +export interface DatabaseDriver extends DatabaseDriverPersister { createInboxOutboxTransportEvent(eventName: string, eventPayload: any, expireAt: number, readyToRetryAfter: number | null): InboxOutboxTransportEvent; - findAndExtendReadyToRetryEvents(limit: number): Promise; - - persist(entity: T): Promise; - - remove(entity: T): Promise; - - flush(): Promise; } diff --git a/packages/core/src/emitter/contract/inbox-outbox-event.interface.ts b/packages/core/src/emitter/contract/inbox-outbox-event.interface.ts index aa31a23..34d44e4 100644 --- a/packages/core/src/emitter/contract/inbox-outbox-event.interface.ts +++ b/packages/core/src/emitter/contract/inbox-outbox-event.interface.ts @@ -1,6 +1,6 @@ export abstract class InboxOutboxEvent { /** - * @description Should be unique static name of the event + * @description Should be unique name of the event */ - name: string; + public abstract readonly name: string; } diff --git a/packages/core/src/emitter/transactional-event-emitter.ts b/packages/core/src/emitter/transactional-event-emitter.ts index 6a805fd..3e4ec30 100644 --- a/packages/core/src/emitter/transactional-event-emitter.ts +++ b/packages/core/src/emitter/transactional-event-emitter.ts @@ -1,11 +1,13 @@ import { Inject, Injectable } from '@nestjs/common'; import { DATABASE_DRIVER_FACTORY_TOKEN, DatabaseDriverFactory } from '../driver/database-driver.factory'; +import { DatabaseDriver } from '../driver/database.driver'; import { InboxOutboxModuleEventOptions, InboxOutboxModuleOptions, MODULE_OPTIONS_TOKEN } from '../inbox-outbox.module-definition'; import { IListener } from '../listener/contract/listener.interface'; import { ListenerDuplicateNameException } from '../listener/exception/listener-duplicate-name.exception'; import { INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN, InboxOutboxEventProcessorContract } from '../processor/inbox-outbox-event-processor.contract'; import { EVENT_CONFIGURATION_RESOLVER_TOKEN, EventConfigurationResolverContract } from '../resolver/event-configuration-resolver.contract'; import { InboxOutboxEvent } from './contract/inbox-outbox-event.interface'; +import { DatabaseDriverPersister } from '../driver/database.driver-persister'; export enum TransactionalEventEmitterOperations { persist = 'persist', @@ -21,7 +23,7 @@ export class TransactionalEventEmitter { @Inject(DATABASE_DRIVER_FACTORY_TOKEN) private databaseDriverFactory: DatabaseDriverFactory, @Inject(INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN) private inboxOutboxEventProcessor: InboxOutboxEventProcessorContract, @Inject(EVENT_CONFIGURATION_RESOLVER_TOKEN) private eventConfigurationResolver: EventConfigurationResolverContract, - ) {} + ) {} async emit( event: InboxOutboxEvent, @@ -29,6 +31,7 @@ export class TransactionalEventEmitter { operation: TransactionalEventEmitterOperations; entity: any; }[], + customDatabaseDriverPersister?: DatabaseDriverPersister, ): Promise { const eventOptions: InboxOutboxModuleEventOptions = this.options.events.find((optionEvent) => optionEvent.name === event.name); @@ -39,24 +42,21 @@ export class TransactionalEventEmitter { const databaseDriver = this.databaseDriverFactory.create(this.eventConfigurationResolver); const currentTimestamp = new Date().getTime(); - const inboxOutboxTransportEvent = databaseDriver.createInboxOutboxTransportEvent( - event.name, - event, - currentTimestamp + eventOptions.listeners.expiresAtTTL, - currentTimestamp + eventOptions.listeners.readyToRetryAfterTTL, - ); + const inboxOutboxTransportEvent = databaseDriver.createInboxOutboxTransportEvent(event.name, event, currentTimestamp + eventOptions.listeners.expiresAtTTL, currentTimestamp + eventOptions.listeners.readyToRetryAfterTTL); + + const persister = customDatabaseDriverPersister ?? databaseDriver; entities.forEach((entity) => { - if (entity.operation === 'persist') { - databaseDriver.persist(entity.entity); + if (entity.operation === TransactionalEventEmitterOperations.persist) { + persister.persist(entity.entity); } - if (entity.operation === 'remove') { - databaseDriver.remove(entity.entity); + if (entity.operation === TransactionalEventEmitterOperations.remove) { + persister.remove(entity.entity); } }); - databaseDriver.persist(inboxOutboxTransportEvent); - await databaseDriver.flush(); + persister.persist(inboxOutboxTransportEvent); + await persister.flush(); this.inboxOutboxEventProcessor.process(eventOptions, inboxOutboxTransportEvent, this.getListeners(event.name)); }