@@ -8,6 +8,8 @@ import com.segment.analytics.kotlin.core.utilities.SegmentInstant
8
8
import com.segment.analytics.kotlin.core.utilities.getString
9
9
import com.segment.analytics.kotlin.core.utilities.putInContext
10
10
import com.segment.analytics.kotlin.core.utilities.updateJsonObject
11
+ import com.segment.analytics.kotlin.core.utilities.set
12
+ import com.segment.analytics.kotlin.core.utils.StubAfterPlugin
11
13
import com.segment.analytics.kotlin.core.utils.StubPlugin
12
14
import com.segment.analytics.kotlin.core.utils.TestRunPlugin
13
15
import com.segment.analytics.kotlin.core.utils.clearPersistentStorage
@@ -17,7 +19,6 @@ import io.mockk.*
17
19
import kotlinx.coroutines.runBlocking
18
20
import kotlinx.coroutines.test.TestScope
19
21
import kotlinx.coroutines.test.UnconfinedTestDispatcher
20
- import kotlinx.coroutines.test.runBlockingTest
21
22
import kotlinx.coroutines.test.runTest
22
23
import kotlinx.serialization.json.buildJsonObject
23
24
import kotlinx.serialization.json.jsonObject
@@ -34,6 +35,7 @@ import java.io.ByteArrayInputStream
34
35
import java.net.HttpURLConnection
35
36
import java.util.Date
36
37
import java.util.UUID
38
+ import java.util.concurrent.Semaphore
37
39
38
40
@TestInstance(TestInstance .Lifecycle .PER_CLASS )
39
41
class AnalyticsTests {
@@ -979,4 +981,174 @@ class AnalyticsTests {
979
981
context = baseContext
980
982
integrations = emptyJsonObject
981
983
}
984
+ }
985
+
986
+ class AsyncAnalyticsTests {
987
+ private lateinit var analytics: Analytics
988
+
989
+ private lateinit var afterPlugin: StubAfterPlugin
990
+
991
+ private lateinit var httpSemaphore: Semaphore
992
+
993
+ private lateinit var assertSemaphore: Semaphore
994
+
995
+ private lateinit var actual: CapturingSlot <BaseEvent >
996
+
997
+ init {
998
+ httpSemaphore = Semaphore (0 )
999
+ assertSemaphore = Semaphore (0 )
1000
+
1001
+ val settings = """
1002
+ {"integrations":{"Segment.io":{"apiKey":"1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ"}},"plan":{},"edgeFunction":{}}
1003
+ """ .trimIndent()
1004
+ mockkConstructor(HTTPClient ::class )
1005
+ val settingsStream = ByteArrayInputStream (
1006
+ settings.toByteArray()
1007
+ )
1008
+ val httpConnection: HttpURLConnection = mockk()
1009
+ val connection = object : Connection (httpConnection, settingsStream, null ) {}
1010
+ every { anyConstructed<HTTPClient >().settings(" cdn-settings.segment.com/v1" ) } answers {
1011
+ // suspend http calls until we tracked events
1012
+ // this will force events get into startup queue
1013
+ httpSemaphore.acquire()
1014
+ connection
1015
+ }
1016
+
1017
+ afterPlugin = spyk(StubAfterPlugin ())
1018
+ actual = slot<BaseEvent >()
1019
+ every { afterPlugin.execute(capture(actual)) } answers {
1020
+ val input = firstArg<BaseEvent ?>()
1021
+ // since this is an after plugin, when its execute function is called,
1022
+ // it is guaranteed that the enrichment closure has been called.
1023
+ // so we can release the semaphore on assertions.
1024
+ assertSemaphore.release()
1025
+ input
1026
+ }
1027
+
1028
+ }
1029
+
1030
+
1031
+ @BeforeEach
1032
+ fun setup () {
1033
+ clearPersistentStorage()
1034
+ analytics = Analytics (Configuration (writeKey = " 123" , application = " Test" ))
1035
+ }
1036
+
1037
+ @Test
1038
+ fun `startup queue should replay with track enrichment closure` () {
1039
+ val expectedEvent = " foo"
1040
+ val expectedAnonymousId = " bar"
1041
+
1042
+ analytics.add(afterPlugin)
1043
+ analytics.track(expectedEvent) {
1044
+ it?.anonymousId = expectedAnonymousId
1045
+ it
1046
+ }
1047
+
1048
+ // now we have tracked event, i.e. event added to startup queue
1049
+ // release the semaphore put on http client, so we startup queue will replay the events
1050
+ httpSemaphore.release()
1051
+ // now we need to wait for events being fully replayed before making assertions
1052
+ assertSemaphore.acquire()
1053
+
1054
+ assertTrue(actual.isCaptured)
1055
+ actual.captured.let {
1056
+ assertTrue(it is TrackEvent )
1057
+ val e = it as TrackEvent
1058
+ assertTrue(e.properties.isEmpty())
1059
+ assertEquals(expectedEvent, e.event)
1060
+ assertEquals(expectedAnonymousId, e.anonymousId)
1061
+ }
1062
+ }
1063
+
1064
+ @Test
1065
+ fun `startup queue should replay with identify enrichment closure` () {
1066
+ val expected = buildJsonObject {
1067
+ put(" foo" , " baz" )
1068
+ }
1069
+ val expectedUserId = " newUserId"
1070
+
1071
+ analytics.add(afterPlugin)
1072
+ analytics.identify(expectedUserId) {
1073
+ if (it is IdentifyEvent ) {
1074
+ it.traits = updateJsonObject(it.traits) {
1075
+ it[" foo" ] = " baz"
1076
+ }
1077
+ }
1078
+ it
1079
+ }
1080
+
1081
+ // now we have tracked event, i.e. event added to startup queue
1082
+ // release the semaphore put on http client, so we startup queue will replay the events
1083
+ httpSemaphore.release()
1084
+ // now we need to wait for events being fully replayed before making assertions
1085
+ assertSemaphore.acquire()
1086
+
1087
+ val actualUserId = analytics.userId()
1088
+
1089
+ assertTrue(actual.isCaptured)
1090
+ actual.captured.let {
1091
+ assertTrue(it is IdentifyEvent )
1092
+ val e = it as IdentifyEvent
1093
+ assertEquals(expected, e.traits)
1094
+ assertEquals(expectedUserId, actualUserId)
1095
+ }
1096
+ }
1097
+
1098
+ @Test
1099
+ fun `startup queue should replay with group enrichment closure` () {
1100
+ val expected = buildJsonObject {
1101
+ put(" foo" , " baz" )
1102
+ }
1103
+ val expectedGroupId = " foo"
1104
+
1105
+ analytics.add(afterPlugin)
1106
+ analytics.group(expectedGroupId) {
1107
+ if (it is GroupEvent ) {
1108
+ it.traits = updateJsonObject(it.traits) {
1109
+ it[" foo" ] = " baz"
1110
+ }
1111
+ }
1112
+ it
1113
+ }
1114
+
1115
+ // now we have tracked event, i.e. event added to startup queue
1116
+ // release the semaphore put on http client, so we startup queue will replay the events
1117
+ httpSemaphore.release()
1118
+ // now we need to wait for events being fully replayed before making assertions
1119
+ assertSemaphore.acquire()
1120
+
1121
+ assertTrue(actual.isCaptured)
1122
+ actual.captured.let {
1123
+ assertTrue(it is GroupEvent )
1124
+ val e = it as GroupEvent
1125
+ assertEquals(expected, e.traits)
1126
+ assertEquals(expectedGroupId, e.groupId)
1127
+ }
1128
+ }
1129
+
1130
+ @Test
1131
+ fun `startup queue should replay with alias enrichment closure` () {
1132
+ val expected = " bar"
1133
+
1134
+ analytics.add(afterPlugin)
1135
+ analytics.alias(expected) {
1136
+ it?.anonymousId = " test"
1137
+ it
1138
+ }
1139
+
1140
+ // now we have tracked event, i.e. event added to startup queue
1141
+ // release the semaphore put on http client, so we startup queue will replay the events
1142
+ httpSemaphore.release()
1143
+ // now we need to wait for events being fully replayed before making assertions
1144
+ assertSemaphore.acquire()
1145
+
1146
+ assertTrue(actual.isCaptured)
1147
+ actual.captured.let {
1148
+ assertTrue(it is AliasEvent )
1149
+ val e = it as AliasEvent
1150
+ assertEquals(expected, e.userId)
1151
+ assertEquals(" test" , e.anonymousId)
1152
+ }
1153
+ }
982
1154
}
0 commit comments