Skip to content

Commit 2e7ab27

Browse files
committed
[NOID] Backport runMany updates
1 parent a98bc35 commit 2e7ab27

File tree

2 files changed

+130
-111
lines changed

2 files changed

+130
-111
lines changed

core/src/main/java/apoc/cypher/Cypher.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -239,7 +239,7 @@ private Object consumeResult(Result result, BlockingQueue<RowResult> queue, bool
239239
}
240240
}
241241

242-
private String removeShellControlCommands(String stmt) {
242+
public static String removeShellControlCommands(String stmt) {
243243
Matcher matcher = shellControl.matcher(stmt.trim());
244244
if (matcher.find()) {
245245
// an empty file get transformed into ":begin\n:commit" and that statement is not matched by the pattern
@@ -257,7 +257,7 @@ private boolean isPeriodicOperation(String stmt) {
257257
return stmt.matches("(?is).*using\\s+periodic.*");
258258
}
259259

260-
private Map<String, Object> toMap(QueryStatistics stats, long time, long rows) {
260+
protected static Map<String, Object> toMap(QueryStatistics stats, long time, long rows) {
261261
final Map<String, Object> map = map(
262262
"rows", rows,
263263
"time", time);

full/src/main/java/apoc/cypher/CypherExtended.java

Lines changed: 128 additions & 109 deletions
Original file line number | Diff line number | Diff line change
@@ -45,10 +45,12 @@
4545
import java.util.List;
4646
import java.util.Map;
4747
import java.util.Scanner;
48+
import java.util.Spliterator;
4849
import java.util.concurrent.ArrayBlockingQueue;
4950
import java.util.concurrent.BlockingQueue;
5051
import java.util.concurrent.ExecutionException;
5152
import java.util.concurrent.Future;
53+
import java.util.concurrent.atomic.AtomicBoolean;
5254
import java.util.function.Consumer;
5355
import java.util.regex.Matcher;
5456
import java.util.regex.Pattern;
@@ -59,6 +61,7 @@
5961
import org.neo4j.graphdb.QueryStatistics;
6062
import org.neo4j.graphdb.Result;
6163
import org.neo4j.graphdb.Transaction;
64+
import org.neo4j.graphdb.security.AuthorizationViolationException;
6265
import org.neo4j.internal.helpers.collection.Iterators;
6366
import org.neo4j.logging.Log;
6467
import org.neo4j.procedure.Context;
@@ -76,7 +79,6 @@
7679
@Extended
7780
public class CypherExtended {
7881

79-
public static final String COMPILED_PREFIX = "CYPHER runtime=" + Util.COMPILED;
8082
public static final int PARTITIONS = 100 * Runtime.getRuntime().availableProcessors();
8183
public static final int MAX_BATCH = 10000;
8284

@@ -134,42 +136,84 @@ private Stream<RowResult> runNonSchemaFiles(
134136
@SuppressWarnings("unchecked")
135137
final Map<String, Object> parameters =
136138
(Map<String, Object>) config.getOrDefault("parameters", Collections.emptyMap());
137-
final boolean schemaOperation = false;
138-
return runFiles(fileNames, config, parameters, schemaOperation, defaultStatistics);
139+
return runFiles(fileNames, config, parameters, defaultStatistics);
139140
}
140141

141142
// This runs the files sequentially
142143
private Stream<RowResult> runFiles(
143-
List<String> fileNames,
144-
Map<String, Object> config,
145-
Map<String, Object> parameters,
146-
boolean schemaOperation,
147-
boolean defaultStatistics) {
144+
List<String> fileNames, Map<String, Object> config, Map<String, Object> params, boolean defaultStatistics) {
148145
boolean reportError = Util.toBoolean(config.get("reportError"));
149146
boolean addStatistics = Util.toBoolean(config.getOrDefault("statistics", defaultStatistics));
150-
int timeout = Util.toInteger(config.getOrDefault("timeout", 10));
151-
int queueCapacity = Util.toInteger(config.getOrDefault("queueCapacity", 100));
152-
var result = fileNames.stream().flatMap(fileName -> {
147+
return fileNames.stream().flatMap(fileName -> {
153148
final Reader reader = readerForFile(fileName);
154-
final Scanner scanner = createScannerFor(reader);
155-
return runManyStatements(
156-
scanner,
157-
parameters,
158-
schemaOperation,
159-
addStatistics,
160-
timeout,
161-
queueCapacity,
162-
reportError,
163-
fileName)
164-
.onClose(() -> Util.close(
165-
scanner,
166-
(e) -> log.info(
167-
"Cannot close the scanner for file " + fileName
168-
+ " because the following exception",
169-
e)));
149+
AtomicBoolean hasFailed = new AtomicBoolean(false);
150+
return Iterators.stream(new Scanner(reader).useDelimiter(";\r?\n"))
151+
.map(Cypher::removeShellControlCommands)
152+
.filter(s -> !s.isBlank())
153+
.flatMap(s -> streamInNewTx(s, params, addStatistics, fileName, reportError, hasFailed));
170154
});
155+
}
156+
157+
private Stream<RowResult> streamInNewTx(
158+
String cypher,
159+
Map<String, Object> params,
160+
boolean stats,
161+
String fileName,
162+
boolean reportError,
163+
AtomicBoolean hasFailed) {
164+
if (hasFailed.get()) return null;
165+
else if (isPeriodicOperation(cypher))
166+
return streamInNewExplicitTx(cypher, params, stats, fileName, reportError);
167+
final var innerTx = db.beginTx();
168+
try {
169+
// Hello fellow wanderer,
170+
// At this point you may have questions like;
171+
// - "Why do we execute this statement in a new transaction?"
172+
// My guess is as good as yours. This is the way of the apoc. Safe travels.
173+
174+
final var results = new RunManyResultSpliterator(innerTx.execute(cypher, params), stats, fileName, tx);
175+
return StreamSupport.stream(results, false).onClose(results::close).onClose(innerTx::commit);
176+
} catch (AuthorizationViolationException accessModeException) {
177+
// We meet again, few people make it this far into this world!
178+
// I hope you're not still seeking answers, there are few to give.
179+
// It has been written, in some long forgotten commits,
180+
// that failures of this kind should be avoided. The ancestors
181+
// were brave and used a regex based cypher parser to avoid
182+
// trying to execute schema changing statements all together.
183+
// We don't have that courage, and try to forget about it
184+
// after the fact instead.
185+
// One can only hope that by keeping this tradition alive,
186+
// in some form, we make some poor souls happier.
187+
innerTx.close();
188+
return Stream.empty();
189+
} catch (Throwable t) {
190+
innerTx.close();
191+
hasFailed.set(true);
192+
if (reportError) {
193+
String error = t.getMessage();
194+
return Stream.of(new RowResult(-1, Map.of("error", error), fileName));
195+
} else {
196+
return null;
197+
}
198+
}
199+
}
171200

172-
return result;
201+
private Stream<RowResult> streamInNewExplicitTx(
202+
String cypher, Map<String, Object> params, boolean stats, String fileName, boolean reportError) {
203+
try {
204+
final var results = new RunManyResultSpliterator(
205+
db.executeTransactionally(cypher, params, result -> result), stats, fileName, tx);
206+
return StreamSupport.stream(results, false).onClose(results::close);
207+
} catch (AuthorizationViolationException accessModeException) {
208+
return Stream.empty();
209+
} catch (Throwable t) {
210+
if (reportError) {
211+
String error = t.getMessage();
212+
return Stream.of(new RowResult(-1, Map.of("error", error), fileName));
213+
} else {
214+
return null;
215+
}
216+
}
173217
}
174218

175219
@Procedure(mode = Mode.SCHEMA)
@@ -186,34 +230,8 @@ public Stream<RowResult> runSchemaFile(
186230
public Stream<RowResult> runSchemaFiles(
187231
@Name("file") List<String> fileNames,
188232
@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
189-
final boolean schemaOperation = true;
190233
final Map<String, Object> parameters = Collections.emptyMap();
191-
return runFiles(fileNames, config, parameters, schemaOperation, true);
192-
}
193-
194-
private Stream<RowResult> runManyStatements(
195-
Scanner scanner,
196-
Map<String, Object> params,
197-
boolean schemaOperation,
198-
boolean addStatistics,
199-
int timeout,
200-
int queueCapacity,
201-
boolean reportError,
202-
String fileName) {
203-
BlockingQueue<RowResult> queue = runInSeparateThreadAndSendTombstone(
204-
queueCapacity,
205-
internalQueue -> {
206-
if (schemaOperation) {
207-
runSchemaStatementsInTx(
208-
scanner, internalQueue, params, addStatistics, timeout, reportError, fileName);
209-
} else {
210-
runDataStatementsInTx(
211-
scanner, internalQueue, params, addStatistics, timeout, reportError, fileName);
212-
}
213-
},
214-
RowResult.TOMBSTONE);
215-
return StreamSupport.stream(
216-
new QueueBasedSpliterator<>(queue, RowResult.TOMBSTONE, terminationGuard, Integer.MAX_VALUE), false);
234+
return runFiles(fileNames, config, parameters, true);
217235
}
218236

219237
private <T> BlockingQueue<T> runInSeparateThreadAndSendTombstone(
@@ -246,7 +264,6 @@ private void runDataStatementsInTx(
246264
BlockingQueue<RowResult> queue,
247265
Map<String, Object> params,
248266
boolean addStatistics,
249-
long timeout,
250267
boolean reportError,
251268
String fileName) {
252269
while (scanner.hasNext()) {
@@ -294,43 +311,6 @@ private void collectError(BlockingQueue<RowResult> queue, boolean reportError, E
294311
QueueUtil.put(queue, result, 10);
295312
}
296313

297-
private Scanner createScannerFor(Reader reader) {
298-
Scanner scanner = new Scanner(reader);
299-
scanner.useDelimiter(";\r?\n");
300-
return scanner;
301-
}
302-
303-
private void runSchemaStatementsInTx(
304-
Scanner scanner,
305-
BlockingQueue<RowResult> queue,
306-
Map<String, Object> params,
307-
boolean addStatistics,
308-
long timeout,
309-
boolean reportError,
310-
String fileName) {
311-
while (scanner.hasNext()) {
312-
String stmt = removeShellControlCommands(scanner.next());
313-
if (stmt.trim().isEmpty()) continue;
314-
boolean schemaOperation;
315-
try {
316-
schemaOperation = isSchemaOperation(stmt);
317-
} catch (Exception e) {
318-
collectError(queue, reportError, e, fileName);
319-
return;
320-
}
321-
if (schemaOperation) {
322-
Util.inTx(db, pools, txInThread -> {
323-
try (Result result = txInThread.execute(stmt, params)) {
324-
return consumeResult(result, queue, addStatistics, tx, fileName);
325-
} catch (Exception e) {
326-
collectError(queue, reportError, e, fileName);
327-
return null;
328-
}
329-
});
330-
}
331-
}
332-
}
333-
334314
private static final Pattern shellControl =
335315
Pattern.compile("^:?\\b(begin|commit|rollback)\\b", Pattern.CASE_INSENSITIVE);
336316

@@ -419,10 +399,6 @@ public static String withParamMapping(String fragment, Collection<String> keys)
419399
return declaration + fragment;
420400
}
421401

422-
public static String compiled(String fragment) {
423-
return fragment.substring(0, 6).equalsIgnoreCase("cypher") ? fragment : COMPILED_PREFIX + fragment;
424-
}
425-
426402
@Procedure
427403
@Description(
428404
"apoc.cypher.parallel(fragment, `paramMap`, `keyList`) yield value - executes fragments in parallel through a list defined in `paramMap` with a key `keyList`")
@@ -445,18 +421,6 @@ public Stream<MapResult> parallel(
445421
parallelParams.replace(key, v);
446422
return tx.execute(statement, parallelParams).stream().map(MapResult::new);
447423
});
448-
449-
/*
450-
params.entrySet().stream()
451-
.filter( e -> asCollection(e.getValue()).size() > 100)
452-
.map( (e) -> (Map.Entry<String,Collection>)(Map.Entry)e )
453-
.max( (max,e) -> e.getValue().size() )
454-
.map( (e) -> e.getValue().parallelStream().map( (v) -> {
455-
Map map = new HashMap<>(params);
456-
map.put(e.getKey(),as)
457-
}));
458-
return db.execute(statement,params).stream().map(MapResult::new);
459-
*/
460424
}
461425

462426
@Procedure
@@ -590,3 +554,58 @@ private Future<List<Map<String, Object>>> submit(
590554
});
591555
}
592556
}
557+
558+
class RunManyResultSpliterator implements Spliterator<CypherExtended.RowResult>, AutoCloseable {
559+
private final Result result;
560+
private final long start;
561+
private boolean statistics;
562+
private String fileName;
563+
private int rowCount;
564+
565+
private Transaction transaction;
566+
567+
RunManyResultSpliterator(Result result, boolean statistics, String fileName, Transaction transaction) {
568+
this.result = result;
569+
this.start = System.currentTimeMillis();
570+
this.statistics = statistics;
571+
this.fileName = fileName;
572+
this.transaction = transaction;
573+
}
574+
575+
@Override
576+
public boolean tryAdvance(Consumer<? super CypherExtended.RowResult> action) {
577+
if (result.hasNext()) {
578+
Map<String, Object> res = EntityUtil.anyRebind(transaction, result.next());
579+
action.accept(new CypherExtended.RowResult(rowCount++, res, fileName));
580+
return true;
581+
} else if (statistics) {
582+
final var stats =
583+
CypherExtended.toMap(result.getQueryStatistics(), System.currentTimeMillis() - start, rowCount);
584+
statistics = false;
585+
action.accept(new CypherExtended.RowResult(-1, stats, fileName));
586+
return true;
587+
}
588+
close();
589+
return false;
590+
}
591+
592+
@Override
593+
public Spliterator<CypherExtended.RowResult> trySplit() {
594+
return null;
595+
}
596+
597+
@Override
598+
public long estimateSize() {
599+
return result.hasNext() ? Long.MAX_VALUE : 1;
600+
}
601+
602+
@Override
603+
public int characteristics() {
604+
return Spliterator.ORDERED;
605+
}
606+
607+
@Override
608+
public void close() {
609+
result.close();
610+
}
611+
}

0 commit comments

Comments
 (0)