@@ -7,6 +7,7 @@ import { ListenerDuplicateNameException } from '../listener/exception/listener-d
7
7
import { INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN , InboxOutboxEventProcessorContract } from '../processor/inbox-outbox-event-processor.contract' ;
8
8
import { EVENT_CONFIGURATION_RESOLVER_TOKEN , EventConfigurationResolverContract } from '../resolver/event-configuration-resolver.contract' ;
9
9
import { InboxOutboxEvent } from './contract/inbox-outbox-event.interface' ;
10
+ import { DatabaseDriverPersister } from '../driver/database.driver-persister' ;
10
11
11
12
export enum TransactionalEventEmitterOperations {
12
13
persist = 'persist' ,
@@ -30,50 +31,32 @@ export class TransactionalEventEmitter {
30
31
operation : TransactionalEventEmitterOperations ;
31
32
entity : any ;
32
33
} [ ] ,
33
- ) : Promise < void > ;
34
- async emit (
35
- event : InboxOutboxEvent ,
36
- entities : {
37
- operation : TransactionalEventEmitterOperations ;
38
- entity : any ;
39
- } [ ] ,
40
- databaseDriver : DatabaseDriver ,
41
- ) : Promise < void > ;
42
-
43
- async emit (
44
- event : InboxOutboxEvent ,
45
- entities : {
46
- operation : TransactionalEventEmitterOperations ;
47
- entity : any ;
48
- } [ ] ,
49
- databaseDriver ?: DatabaseDriver ,
34
+ customDatabaseDriverPersister ?: DatabaseDriverPersister ,
50
35
) : Promise < void > {
51
36
const eventOptions : InboxOutboxModuleEventOptions = this . options . events . find ( ( optionEvent ) => optionEvent . name === event . name ) ;
52
37
53
38
if ( ! eventOptions ) {
54
39
throw new Error ( `Event ${ event . name } is not configured. Did you forget to add it to the module options?` ) ;
55
40
}
56
41
57
- if ( ! databaseDriver ) {
58
- databaseDriver = this . databaseDriverFactory . create ( this . eventConfigurationResolver ) ;
59
- }
42
+ const databaseDriver = this . databaseDriverFactory . create ( this . eventConfigurationResolver ) ;
60
43
const currentTimestamp = new Date ( ) . getTime ( ) ;
61
44
62
- const inboxOutboxTransportEvent = this . databaseDriverFactory
63
- . create ( this . eventConfigurationResolver )
64
- . createInboxOutboxTransportEvent ( event . name , event , currentTimestamp + eventOptions . listeners . expiresAtTTL , currentTimestamp + eventOptions . listeners . readyToRetryAfterTTL ) ;
45
+ const inboxOutboxTransportEvent = databaseDriver . createInboxOutboxTransportEvent ( event . name , event , currentTimestamp + eventOptions . listeners . expiresAtTTL , currentTimestamp + eventOptions . listeners . readyToRetryAfterTTL ) ;
46
+
47
+ const persister = customDatabaseDriverPersister || databaseDriver ;
65
48
66
49
entities . forEach ( ( entity ) => {
67
- if ( entity . operation === ' persist' ) {
68
- databaseDriver . persist ( entity . entity ) ;
50
+ if ( entity . operation === TransactionalEventEmitterOperations . persist ) {
51
+ persister . persist ( entity . entity ) ;
69
52
}
70
- if ( entity . operation === ' remove' ) {
71
- databaseDriver . remove ( entity . entity ) ;
53
+ if ( entity . operation === TransactionalEventEmitterOperations . remove ) {
54
+ persister . remove ( entity . entity ) ;
72
55
}
73
56
} ) ;
74
57
75
- databaseDriver . persist ( inboxOutboxTransportEvent ) ;
76
- await databaseDriver . flush ( ) ;
58
+ persister . persist ( inboxOutboxTransportEvent ) ;
59
+ await persister . flush ( ) ;
77
60
78
61
this . inboxOutboxEventProcessor . process ( eventOptions , inboxOutboxTransportEvent , this . getListeners ( event . name ) ) ;
79
62
}
0 commit comments