1
1
import { Inject , Injectable } from '@nestjs/common' ;
2
2
import { DATABASE_DRIVER_FACTORY_TOKEN , DatabaseDriverFactory } from '../driver/database-driver.factory' ;
3
+ import { DatabaseDriver } from '../driver/database.driver' ;
3
4
import { InboxOutboxModuleEventOptions , InboxOutboxModuleOptions , MODULE_OPTIONS_TOKEN } from '../inbox-outbox.module-definition' ;
4
5
import { IListener } from '../listener/contract/listener.interface' ;
5
6
import { ListenerDuplicateNameException } from '../listener/exception/listener-duplicate-name.exception' ;
6
7
import { INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN , InboxOutboxEventProcessorContract } from '../processor/inbox-outbox-event-processor.contract' ;
7
8
import { EVENT_CONFIGURATION_RESOLVER_TOKEN , EventConfigurationResolverContract } from '../resolver/event-configuration-resolver.contract' ;
8
9
import { InboxOutboxEvent } from './contract/inbox-outbox-event.interface' ;
10
+ import { DatabaseDriverPersister } from '../driver/database.driver-persister' ;
9
11
10
12
export enum TransactionalEventEmitterOperations {
11
13
persist = 'persist' ,
@@ -21,14 +23,15 @@ export class TransactionalEventEmitter {
21
23
@Inject ( DATABASE_DRIVER_FACTORY_TOKEN ) private databaseDriverFactory : DatabaseDriverFactory ,
22
24
@Inject ( INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN ) private inboxOutboxEventProcessor : InboxOutboxEventProcessorContract ,
23
25
@Inject ( EVENT_CONFIGURATION_RESOLVER_TOKEN ) private eventConfigurationResolver : EventConfigurationResolverContract ,
24
- ) { }
26
+ ) { }
25
27
26
28
async emit (
27
29
event : InboxOutboxEvent ,
28
30
entities : {
29
31
operation : TransactionalEventEmitterOperations ;
30
32
entity : any ;
31
33
} [ ] ,
34
+ customDatabaseDriverPersister ?: DatabaseDriverPersister ,
32
35
) : Promise < void > {
33
36
const eventOptions : InboxOutboxModuleEventOptions = this . options . events . find ( ( optionEvent ) => optionEvent . name === event . name ) ;
34
37
@@ -39,24 +42,21 @@ export class TransactionalEventEmitter {
39
42
const databaseDriver = this . databaseDriverFactory . create ( this . eventConfigurationResolver ) ;
40
43
const currentTimestamp = new Date ( ) . getTime ( ) ;
41
44
42
- const inboxOutboxTransportEvent = databaseDriver . createInboxOutboxTransportEvent (
43
- event . name ,
44
- event ,
45
- currentTimestamp + eventOptions . listeners . expiresAtTTL ,
46
- currentTimestamp + eventOptions . listeners . readyToRetryAfterTTL ,
47
- ) ;
45
+ const inboxOutboxTransportEvent = databaseDriver . createInboxOutboxTransportEvent ( event . name , event , currentTimestamp + eventOptions . listeners . expiresAtTTL , currentTimestamp + eventOptions . listeners . readyToRetryAfterTTL ) ;
46
+
47
+ const persister = customDatabaseDriverPersister ?? databaseDriver ;
48
48
49
49
entities . forEach ( ( entity ) => {
50
- if ( entity . operation === ' persist' ) {
51
- databaseDriver . persist ( entity . entity ) ;
50
+ if ( entity . operation === TransactionalEventEmitterOperations . persist ) {
51
+ persister . persist ( entity . entity ) ;
52
52
}
53
- if ( entity . operation === ' remove' ) {
54
- databaseDriver . remove ( entity . entity ) ;
53
+ if ( entity . operation === TransactionalEventEmitterOperations . remove ) {
54
+ persister . remove ( entity . entity ) ;
55
55
}
56
56
} ) ;
57
57
58
- databaseDriver . persist ( inboxOutboxTransportEvent ) ;
59
- await databaseDriver . flush ( ) ;
58
+ persister . persist ( inboxOutboxTransportEvent ) ;
59
+ await persister . flush ( ) ;
60
60
61
61
this . inboxOutboxEventProcessor . process ( eventOptions , inboxOutboxTransportEvent , this . getListeners ( event . name ) ) ;
62
62
}
0 commit comments