17
17
import java .util .Objects ;
18
18
import java .util .concurrent .CompletableFuture ;
19
19
import java .util .concurrent .CompletionStage ;
20
+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
20
21
21
22
import static org .hibernate .reactive .util .impl .CompletionStages .completedFuture ;
22
23
@@ -56,22 +57,55 @@ public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierG
56
57
//to reason about what the current state is and what the CombinerExecutor is
57
58
//supposed to work on.
58
59
private static class GeneratorState {
59
- private int loValue ;
60
- private long hiValue ;
60
+
61
+ private static final class LoHi {
62
+
63
+ private static final AtomicIntegerFieldUpdater <LoHi > LO_UPDATER = AtomicIntegerFieldUpdater .newUpdater (LoHi .class , "lo" );
64
+ private final long hi ;
65
+ private volatile long lo ;
66
+
67
+ LoHi (long hi ) {
68
+ this .hi = hi ;
69
+ this .lo = 1 ;
70
+ }
71
+
72
+ public long next (int blockSize ) {
73
+ if (lo >= blockSize ) {
74
+ return -1 ;
75
+ }
76
+ final long nextLo = LO_UPDATER .getAndIncrement (this );
77
+ if (nextLo < blockSize ) {
78
+ return hi + nextLo ;
79
+ }
80
+ return -1 ;
81
+ }
82
+ }
83
+
84
+ private volatile LoHi loHi ;
85
+
86
+ public long hi (long hi ) {
87
+ loHi = new LoHi (hi );
88
+ return hi ;
89
+ }
90
+
91
+ public long next (int blockSize ) {
92
+ final LoHi loHi = this .loHi ;
93
+ if (loHi == null ) {
94
+ return -1 ;
95
+ }
96
+ return loHi .next (blockSize );
97
+ }
61
98
}
62
99
63
100
//Critical section: needs to be accessed exclusively via the CombinerExecutor
64
101
//when there's contention; direct invocation is allowed in the fast path.
65
- private synchronized long next () {
66
- return state .loValue > 0 && state .loValue < getBlockSize ()
67
- ? state .hiValue + state .loValue ++
68
- : -1 ; //flag value indicating that we need to hit db
102
+ private long next () {
103
+ return state .next (getBlockSize ());
69
104
}
70
105
71
106
//Critical section: needs to be accessed exclusively via the CombinerExecutor
72
- private synchronized long next (long hi ) {
73
- state .hiValue = hi ;
74
- state .loValue = 1 ;
107
+ private long next (long hi ) {
108
+ state .hi (hi );
75
109
return hi ;
76
110
}
77
111
@@ -90,8 +124,7 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSuppl
90
124
//if it were to happen we should be better off with direct execution rather than using
91
125
//the co-operative executor:
92
126
if ( getBlockSize () <= 1 ) {
93
- return nextHiValue ( connectionSupplier )
94
- .thenApply ( i -> next ( i ) );
127
+ return nextHiValue ( connectionSupplier ).thenApply ( i -> next ( i ) );
95
128
}
96
129
97
130
final CompletableFuture <Long > resultForThisEventLoop = new CompletableFuture <>();
@@ -108,8 +141,7 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSuppl
108
141
} else {
109
142
context .runOnContext ( ( v ) -> resultForThisEventLoop .complete ( id ) );
110
143
}
111
- }
112
- else {
144
+ } else {
113
145
if ( t != null ) {
114
146
resultForThisEventLoop .completeExceptionally ( t );
115
147
} else {
@@ -137,32 +169,29 @@ public Task execute(GeneratorState state) {
137
169
// We don't need to update or initialize the hi
138
170
// value in the table, so just increment the lo
139
171
// value and return the next id in the block
140
- completedFuture ( local )
141
- .whenComplete ( this ::acceptAsReturnValue );
172
+ completedFuture ( local ).whenComplete ( this ::acceptAsReturnValue );
142
173
return null ;
143
174
} else {
144
- nextHiValue ( connectionSupplier )
145
- .whenComplete ( (newlyGeneratedHi , throwable ) -> {
146
- if ( throwable != null ) {
147
- result .completeExceptionally ( throwable );
148
- } else {
149
- //We ignore the state argument as we actually use the field directly
150
- //for convenience, but they are the same object.
151
- executor .submit ( stateIgnored -> {
152
- result .complete ( next ( newlyGeneratedHi ) );
153
- return null ;
154
- });
155
- }
156
- } );
175
+ nextHiValue ( connectionSupplier ).whenComplete ( (newlyGeneratedHi , throwable ) -> {
176
+ if ( throwable != null ) {
177
+ result .completeExceptionally ( throwable );
178
+ } else {
179
+ //We ignore the state argument as we actually use the field directly
180
+ //for convenience, but they are the same object.
181
+ executor .submit ( stateIgnored -> {
182
+ result .complete ( next ( newlyGeneratedHi ) );
183
+ return null ;
184
+ });
185
+ }
186
+ } );
157
187
return null ;
158
188
}
159
189
}
160
190
161
191
private void acceptAsReturnValue (final Long aLong , final Throwable throwable ) {
162
192
if ( throwable != null ) {
163
193
result .completeExceptionally ( throwable );
164
- }
165
- else {
194
+ } else {
166
195
result .complete ( aLong );
167
196
}
168
197
}
0 commit comments