3
3
import java .util .Deque ;
4
4
import java .util .concurrent .ConcurrentLinkedDeque ;
5
5
import java .util .concurrent .atomic .AtomicReference ;
6
+ import java .util .concurrent .atomic .LongAdder ;
6
7
import java .util .function .Consumer ;
7
8
import java .util .function .Function ;
8
-
9
+ import java .util .function .Supplier ;
10
+
11
+ /**
12
+ * This is a utility class, whose main functionality is pooling object
13
+ * with a huge memory footprint and that are costly to be recreated at
14
+ * every usage like the {@link BufferRecycler}. It is intended for
15
+ * internal use only.
16
+ *
17
+ * @since 2.16
18
+ */
9
19
public interface ObjectPool <T > extends AutoCloseable {
10
20
11
21
T acquire ();
@@ -21,7 +31,22 @@ default void withPooledObject(Consumer<T> objectConsumer) {
21
31
}
22
32
23
33
enum Strategy {
24
- CONCURRENT_DEQUEUE , LOCK_FREE
34
+ CONCURRENT_DEQUEUE (ConcurrentDequePool ::new , false ), LOCK_FREE (LockFreePool ::new , false ),
35
+ DEBUG_CONCURRENT_DEQUEUE (ConcurrentDequePool ::new , true ), DEBUG_LOCK_FREE (LockFreePool ::new , true );
36
+
37
+ private final Function <Supplier , ObjectPool > constructor ;
38
+
39
+ private final boolean debug ;
40
+
41
+ Strategy (Function <Supplier , ObjectPool > constructor , boolean debug ) {
42
+ this .constructor = constructor ;
43
+ this .debug = debug ;
44
+ }
45
+
46
+ <T > ObjectPool <T > newObjectPool (Supplier <T > factory ) {
47
+ ObjectPool <T > pool = constructor .apply (factory );
48
+ return debug ? new DebugPoolDecorator <>(pool ) : pool ;
49
+ }
25
50
}
26
51
27
52
class StrategyHolder {
@@ -32,33 +57,29 @@ public static void setStrategy(String name) {
32
57
}
33
58
}
34
59
35
- static <T > ObjectPool <T > newObjectPool (Function <ObjectPool <T >, T > factory ) {
36
- switch (StrategyHolder .strategy ) {
37
- case CONCURRENT_DEQUEUE : return new ConcurrentDequePool <>(factory );
38
- case LOCK_FREE : return new LockFreePool <>(factory );
39
- }
40
- throw new UnsupportedOperationException ();
60
+ static <T > ObjectPool <T > newObjectPool (Supplier <T > factory ) {
61
+ return StrategyHolder .strategy .newObjectPool (factory );
41
62
}
42
63
43
64
class ConcurrentDequePool <T > implements ObjectPool <T > {
44
- private final Function < ObjectPool < T >, T > factory ;
65
+ private final Supplier < T > factory ;
45
66
private final Consumer <T > destroyer ;
46
67
47
68
private final Deque <T > pool = new ConcurrentLinkedDeque <>();
48
69
49
- public ConcurrentDequePool (Function < ObjectPool < T >, T > factory ) {
70
+ public ConcurrentDequePool (Supplier < T > factory ) {
50
71
this (factory , null );
51
72
}
52
73
53
- public ConcurrentDequePool (Function < ObjectPool < T >, T > factory , Consumer <T > destroyer ) {
74
+ public ConcurrentDequePool (Supplier < T > factory , Consumer <T > destroyer ) {
54
75
this .factory = factory ;
55
76
this .destroyer = destroyer ;
56
77
}
57
78
58
79
@ Override
59
80
public T acquire () {
60
81
T t = pool .pollFirst ();
61
- return t != null ? t : factory .apply ( this );
82
+ return t != null ? t : factory .get ( );
62
83
}
63
84
64
85
@ Override
@@ -77,9 +98,9 @@ public void close() throws Exception {
77
98
class LockFreePool <T > implements ObjectPool <T > {
78
99
private final AtomicReference <Node <T >> head = new AtomicReference <>();
79
100
80
- private final Function < ObjectPool < T >, T > factory ;
101
+ private final Supplier < T > factory ;
81
102
82
- public LockFreePool (Function < ObjectPool < T >, T > factory ) {
103
+ public LockFreePool (Supplier < T > factory ) {
83
104
this .factory = factory ;
84
105
}
85
106
@@ -88,14 +109,14 @@ public T acquire() {
88
109
for (int i = 0 ; i < 3 ; i ++) {
89
110
Node <T > currentHead = head .get ();
90
111
if (currentHead == null ) {
91
- return factory .apply ( this );
112
+ return factory .get ( );
92
113
}
93
114
if (head .compareAndSet (currentHead , currentHead .next )) {
94
115
currentHead .next = null ;
95
116
return currentHead .value ;
96
117
}
97
118
}
98
- return factory .apply ( this );
119
+ return factory .get ( );
99
120
}
100
121
101
122
@ Override
@@ -123,4 +144,42 @@ static class Node<T> {
123
144
}
124
145
}
125
146
}
147
+
148
+ class DebugPoolDecorator <T > implements ObjectPool <T > {
149
+
150
+ private final ObjectPool <T > pool ;
151
+
152
+ private final LongAdder acquireCounter = new LongAdder ();
153
+ private final LongAdder releaseCounter = new LongAdder ();
154
+
155
+ public DebugPoolDecorator (ObjectPool <T > pool ) {
156
+ this .pool = pool ;
157
+ }
158
+
159
+ @ Override
160
+ public T acquire () {
161
+ acquireCounter .increment ();
162
+ return pool .acquire ();
163
+ }
164
+
165
+ @ Override
166
+ public void release (T t ) {
167
+ releaseCounter .increment ();
168
+ pool .release (t );
169
+ }
170
+
171
+ @ Override
172
+ public void close () throws Exception {
173
+ System .out .println ("Closing " + this );
174
+ pool .close ();
175
+ }
176
+
177
+ @ Override
178
+ public String toString () {
179
+ return "DebugPoolDecorator{" +
180
+ "acquires = " + acquireCounter .sum () +
181
+ ", releases = " + releaseCounter .sum () +
182
+ '}' ;
183
+ }
184
+ }
126
185
}
0 commit comments