package apoc.periodic;

import apoc.Pools;
import apoc.cypher.Cypher;
import apoc.export.csv.CsvLoaderConfig;
import apoc.util.Util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.internal.helpers.collection.Iterables;
import org.neo4j.internal.helpers.collection.Iterators;
import org.neo4j.internal.helpers.collection.Pair;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Mode;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

/* loaded from: input_file:apoc/periodic/Periodic.class */
public class Periodic {
    /** Statement prefix that selects the slotted Cypher runtime; see {@code slottedRuntime}. */
    public static final String CYPHER_RUNTIME_SLOTTED = "cypher runtime=slotted ";

    /** Database handle; used here via {@code executeTransactionally} to run statements in their own transactions. */
    @Context
    public GraphDatabaseService db;

    /** Guard consulted (through {@code Util.transactionIsTerminated}) to stop looping once the caller is terminated. */
    @Context
    public TerminationGuard terminationGuard;

    /** Logger used for progress and error reporting of background work. */
    @Context
    public Log log;

    /** Provider of the executor services and of the registry of named background jobs. */
    @Context
    public Pools pools;

    /** Transaction of the calling procedure; the driving (iterate/loop) statements are executed on it. */
    @Context
    public Transaction tx;
    // The int flag 2 used by the patterns below is Pattern.CASE_INSENSITIVE.
    /** Finds an explicit {@code runtime=} option inside a statement. */
    public static final Pattern RUNTIME_PATTERN = Pattern.compile("\\bruntime\\s*=", 2);
    /** Finds the {@code cypher} keyword as a whole word. */
    public static final Pattern CYPHER_PREFIX_PATTERN = Pattern.compile("\\bcypher\\b", 2);
    /** Finds the word {@code limit} surrounded by whitespace; required by {@code commit}. */
    static final Pattern LIMIT_PATTERN = Pattern.compile("\\slimit\\s", 2);

    /* loaded from: input_file:apoc/periodic/Periodic$BatchAndTotalCollector.class */
    /**
     * Mutable accumulator for the statistics of one batched run: number of
     * batches, rows seen, retries, failures and the error messages observed.
     *
     * <p>Most counters are atomics or concurrent maps; {@code successes} and
     * {@code batchErrors} are plain (non-concurrent) state.
     */
    public static class BatchAndTotalCollector {
        /** Maximum index of the failed parameter rows to keep per batch; negative disables collection. */
        private final int failedParams;
        /** Termination state sampled once, when the collector is created. */
        private final boolean wasTerminated;
        private final long start = System.nanoTime();

        private final AtomicLong batches = new AtomicLong();
        private final AtomicLong count = new AtomicLong();
        private final AtomicLong failedOps = new AtomicLong();
        private final AtomicLong retried = new AtomicLong();
        private final AtomicInteger failedBatches = new AtomicInteger();
        private long successes = 0;

        private final Map<String, Long> operationErrors = new ConcurrentHashMap<>();
        private final Map<String, Long> batchErrors = new HashMap<>();
        private final Map<String, List<Map<String, Object>>> failedParamsMap = new ConcurrentHashMap<>();

        /**
         * @param terminationGuard guard whose current state is recorded as {@code wasTerminated}
         * @param i                value stored as the failed-parameter limit
         */
        public BatchAndTotalCollector(TerminationGuard terminationGuard, int i) {
            this.failedParams = i;
            this.wasTerminated = Util.transactionIsTerminated(terminationGuard);
        }

        /** Builds an immutable snapshot of the counters; elapsed time is reported in whole seconds. */
        public BatchAndTotalResult getResult() {
            return new BatchAndTotalResult(
                    batches.get(),
                    count.get(),
                    TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start),
                    successes,
                    failedOps.get(),
                    failedBatches.get(),
                    retried.get(),
                    operationErrors,
                    batchErrors,
                    wasTerminated,
                    failedParamsMap);
        }

        public long getBatches() {
            return batches.get();
        }

        public long getCount() {
            return count.get();
        }

        public void incrementFailedOps(long j) {
            failedOps.addAndGet(j);
        }

        public void incrementBatches() {
            batches.incrementAndGet();
        }

        /** Adds to the success counter; this field is a plain {@code long}, not an atomic. */
        public void incrementSuccesses(long j) {
            successes += j;
        }

        public void incrementCount(long j) {
            count.addAndGet(j);
        }

        public Map<String, Long> getBatchErrors() {
            return batchErrors;
        }

        public Map<String, Long> getOperationErrors() {
            return operationErrors;
        }

        public Map<String, List<Map<String, Object>>> getFailedParamsMap() {
            return failedParamsMap;
        }

        /**
         * Stores a copy of the first {@code failedParams + 1} rows of {@code list}
         * (or all of them when the list is shorter), keyed by the current batch
         * counter. Does nothing when the limit is negative.
         */
        public void amendFailedParamsMap(List<Map<String, Object>> list) {
            if (failedParams < 0) {
                return;
            }
            String batchKey = Long.toString(batches.get());
            int keep = Math.min(failedParams + 1, list.size());
            failedParamsMap.put(batchKey, new ArrayList<>(list.subList(0, keep)));
        }

        public AtomicInteger getFailedBatches() {
            return failedBatches;
        }

        public void incrementRetried() {
            retried.incrementAndGet();
        }
    }

    /* loaded from: input_file:apoc/periodic/Periodic$BatchAndTotalResult.class */
    /**
     * Immutable summary of a batched run. The flat fields are complemented
     * by two maps ({@code batch} and {@code operations}) carrying the keys
     * {@code total}, {@code failed}, {@code committed} and {@code errors}.
     */
    public static class BatchAndTotalResult {
        public final long batches;
        public final long total;
        public final long timeTaken;
        public final long committedOperations;
        public final long failedOperations;
        public final long failedBatches;
        public final long retries;
        /** Error messages of failed operations (the same map as {@code operations.errors}). */
        public final Map<String, Long> errorMessages;
        public final Map<String, Object> batch;
        public final Map<String, Object> operations;
        public final boolean wasTerminated;
        public final Map<String, List<Map<String, Object>>> failedParams;

        /**
         * @param batches             number of batches
         * @param total               number of operations
         * @param timeTaken           elapsed time value to report
         * @param committedOperations operations counted as committed
         * @param failedOperations    operations counted as failed
         * @param failedBatches       batches counted as failed
         * @param retries             number of retries
         * @param operationErrors     error message to occurrence count, per operation
         * @param batchErrors         error message to occurrence count, per batch
         * @param wasTerminated       termination flag to report
         * @param failedParams        retained parameter rows of failed batches
         */
        public BatchAndTotalResult(long batches, long total, long timeTaken, long committedOperations, long failedOperations, long failedBatches, long retries, Map<String, Long> operationErrors, Map<String, Long> batchErrors, boolean wasTerminated, Map<String, List<Map<String, Object>>> failedParams) {
            this.batches = batches;
            this.total = total;
            this.timeTaken = timeTaken;
            this.committedOperations = committedOperations;
            this.failedOperations = failedOperations;
            this.failedBatches = failedBatches;
            this.retries = retries;
            this.errorMessages = operationErrors;
            this.wasTerminated = wasTerminated;
            this.failedParams = failedParams;
            // Committed batches are derived: every batch that did not fail.
            this.batch = Util.map(
                    "total", Long.valueOf(batches),
                    "failed", Long.valueOf(failedBatches),
                    "committed", Long.valueOf(batches - failedBatches),
                    "errors", batchErrors);
            this.operations = Util.map(
                    "total", Long.valueOf(total),
                    "failed", Long.valueOf(failedOperations),
                    "committed", Long.valueOf(committedOperations),
                    "errors", operationErrors);
        }

        /** Projects this result onto the reduced looping form, tagging it with {@code obj}. */
        public LoopingBatchAndTotalResult inLoop(Object obj) {
            return new LoopingBatchAndTotalResult(obj, batches, total);
        }
    }

    /* loaded from: input_file:apoc/periodic/Periodic$Countdown.class */
    /**
     * Background task that runs a numeric statement and, while the result is
     * positive, schedules itself to be re-submitted under the same job name
     * after {@code rate} seconds.
     */
    private class Countdown implements Runnable {
        private final String name;
        private final String statement;
        private final long rate;
        private final transient Log log;

        public Countdown(String name, String statement, long rate, Log log) {
            this.name = name;
            this.statement = statement;
            this.rate = rate;
            this.log = log;
        }

        @Override // java.lang.Runnable
        public void run() {
            long remaining = Periodic.this.executeNumericResultStatement(statement, Collections.emptyMap());
            if (remaining <= 0) {
                // Countdown finished: do not re-schedule.
                return;
            }
            Periodic.this.pools.getScheduledExecutorService().schedule(() -> {
                return Periodic.this.submit(name, this, log);
            }, rate, TimeUnit.SECONDS);
        }
    }

    /* loaded from: input_file:apoc/periodic/Periodic$JobInfo.class */
    /**
     * Description of a named background job. Identity ({@code equals} /
     * {@code hashCode}) is defined by the job name only, so an instance built
     * from just a name can be used to look up a registered job.
     */
    public static class JobInfo {
        public final String name;
        public long delay;
        public long rate;
        public boolean done;
        public boolean cancelled;

        public JobInfo(String str) {
            this.name = str;
        }

        public JobInfo(String str, long j, long j2) {
            this.name = str;
            this.delay = j;
            this.rate = j2;
        }

        /** Copies the done/cancelled state of {@code future} into this object and returns {@code this}. */
        public JobInfo update(Future future) {
            done = future.isDone();
            cancelled = future.isCancelled();
            return this;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof JobInfo)) {
                return false;
            }
            JobInfo other = (JobInfo) obj;
            return name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    /* loaded from: input_file:apoc/periodic/Periodic$LoopingBatchAndTotalResult.class */
    /** Reduced batch result that additionally carries the loop value it was produced for. */
    public static class LoopingBatchAndTotalResult {
        public Object loop;
        public long batches;
        public long total;

        /**
         * @param loop    value identifying the loop iteration
         * @param batches number of batches
         * @param total   number of processed rows
         */
        public LoopingBatchAndTotalResult(Object loop, long batches, long total) {
            this.loop = loop;
            this.batches = batches;
            this.total = total;
        }
    }

    /* loaded from: input_file:apoc/periodic/Periodic$RundownResult.class */
    /** Immutable row returned by {@code commit}, summarising the repeated executions. */
    public static class RundownResult {
        public final long updates;
        public final long executions;
        public final long runtime;
        public final long batches;
        public final long failedBatches;
        public final Map<String, Long> batchErrors;
        public final long failedCommits;
        public final Map<String, Long> commitErrors;
        public final boolean wasTerminated;

        /**
         * @param updates       sum of the values returned by the statement
         * @param executions    number of executions that returned a positive value
         * @param runtime       elapsed time value to report
         * @param batches       number of statement submissions
         * @param failedBatches submissions whose statement threw
         * @param batchErrors   error message to occurrence count for failed statements
         * @param failedCommits submissions whose future failed
         * @param commitErrors  error message to occurrence count for failed futures
         * @param wasTerminated termination flag to report
         */
        public RundownResult(long updates, long executions, long runtime, long batches, long failedBatches, Map<String, Long> batchErrors, long failedCommits, Map<String, Long> commitErrors, boolean wasTerminated) {
            this.updates = updates;
            this.executions = executions;
            this.runtime = runtime;
            this.batches = batches;
            this.failedBatches = failedBatches;
            this.batchErrors = batchErrors;
            this.failedCommits = failedCommits;
            this.commitErrors = commitErrors;
            this.wasTerminated = wasTerminated;
        }
    }

    /**
     * Streams every registered job, refreshing each entry's done/cancelled
     * flags from its future before it is emitted.
     */
    @Procedure
    @Description("apoc.periodic.list - list all jobs")
    public Stream<JobInfo> list() {
        return this.pools.getJobList()
                .entrySet()
                .stream()
                .map(job -> job.getKey().update(job.getValue()));
    }

    /**
     * Repeatedly executes {@code str}, each time in its own transaction on
     * the scheduled executor, until the statement's summed numeric result
     * is not positive or the calling transaction is terminated.
     *
     * <p>Each run receives the caller's parameters plus {@code _count}
     * (result of the previous run) and {@code _total} (running sum).
     *
     * @param str statement to run; must contain the word {@code limit}
     * @param map statement parameters, {@code null} is treated as empty
     * @return a single-row stream with the run statistics
     * @throws IllegalArgumentException when the statement contains no {@code limit}
     */
    @Procedure(mode = Mode.WRITE)
    @Description("apoc.periodic.commit(statement,params) - runs the given statement in separate transactions until it returns 0")
    public Stream<RundownResult> commit(@Name("statement") String str, @Name(value = "params", defaultValue = "{}") Map<String, Object> map) throws ExecutionException, InterruptedException {
        validateQuery(str);
        final Map<String, Object> params = (map != null) ? map : Collections.emptyMap();
        final long startedAt = System.nanoTime();
        if (!LIMIT_PATTERN.matcher(str).find()) {
            throw new IllegalArgumentException("the statement sent to apoc.periodic.commit must contain a `limit`");
        }

        final AtomicInteger batches = new AtomicInteger();
        final AtomicInteger failedCommits = new AtomicInteger();
        final Map<String, Long> commitErrors = new ConcurrentHashMap<>();
        final AtomicInteger failedBatches = new AtomicInteger();
        final Map<String, Long> batchErrors = new ConcurrentHashMap<>();

        long total = 0;
        long executions = 0;
        long updates = 0;
        do {
            final Map<String, Object> window = Util.map("_count", Long.valueOf(updates), "_total", Long.valueOf(total));
            Future<Long> pending = this.pools.getScheduledExecutorService().submit(() -> {
                batches.incrementAndGet();
                try {
                    return Long.valueOf(executeNumericResultStatement(str, Util.merge(window, params)));
                } catch (Exception e) {
                    // A failing statement counts as a failed batch and ends the loop (0 updates).
                    failedBatches.incrementAndGet();
                    recordError(batchErrors, e);
                    return 0L;
                }
            });
            updates = ((Long) Util.getFuture(pending, commitErrors, failedCommits, 0L)).longValue();
            total += updates;
            if (updates > 0) {
                executions++;
            }
        } while (updates > 0 && !Util.transactionIsTerminated(this.terminationGuard));

        return Stream.of(new RundownResult(
                total,
                executions,
                TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startedAt),
                batches.get(),
                failedBatches.get(),
                batchErrors,
                failedCommits.get(),
                commitErrors,
                Util.transactionIsTerminated(this.terminationGuard)));
    }

    /**
     * Increments the occurrence count of an error in {@code map}, keyed by
     * the message of the exception's root cause.
     *
     * <p>The key is never {@code null}: when no root cause is reported the
     * exception itself is used, and when the chosen throwable has no message
     * its class name is used instead. This matters because the maps passed in
     * by this class are {@code ConcurrentHashMap}s, which reject null keys.
     *
     * @param map error message to occurrence count; updated in place
     * @param exc the failure to record
     */
    private void recordError(Map<String, Long> map, Exception exc) {
        Throwable root = ExceptionUtils.getRootCause(exc);
        if (root == null) {
            root = exc;
        }
        String key = (root == null) ? null : root.getMessage();
        if (key == null) {
            key = (root == null) ? "unknown error" : root.getClass().getName();
        }
        map.merge(key, 1L, Long::sum);
    }

    /**
     * Runs {@code str} in its own transaction and returns the sum of the
     * values in its single result column. The values are cast to
     * {@code Long}.
     *
     * @param str statement producing exactly one column
     * @param map statement parameters
     * @return sum over all rows of that column
     */
    private long executeNumericResultStatement(@Name("statement") String str, @Name("params") Map<String, Object> map) {
        Long sum = (Long) this.db.executeTransactionally(str, map, rows -> {
            String onlyColumn = (String) Iterables.single(rows.columns());
            return Long.valueOf(rows.columnAs(onlyColumn)
                    .stream()
                    .mapToLong(value -> ((Long) value).longValue())
                    .sum());
        });
        return sum.longValue();
    }

    /**
     * Removes the job registered under {@code str} and cancels it without
     * interrupting a run in progress.
     *
     * @return the job's final state, or an empty stream when no such job exists
     */
    @Procedure
    @Description("apoc.periodic.cancel(name) - cancel job with the given name")
    public Stream<JobInfo> cancel(@Name("name") String str) {
        JobInfo info = new JobInfo(str);
        Future<?> registered = this.pools.getJobList().remove(info);
        if (registered != null) {
            registered.cancel(false);
            return Stream.of(info.update(registered));
        }
        return Stream.empty();
    }

    /**
     * Validates {@code str2} and submits it as a one-off background job
     * named {@code str}. A failure of the statement is logged as a warning
     * and rethrown wrapped in a {@code RuntimeException}.
     */
    @Procedure(mode = Mode.WRITE)
    @Description("apoc.periodic.submit('name',statement) - submit a one-off background statement")
    public Stream<JobInfo> submit(@Name("name") String str, @Name("statement") String str2) {
        validateQuery(str2);
        Runnable task = () -> {
            try {
                this.db.executeTransactionally(str2);
            } catch (Exception e) {
                this.log.warn("in background task via submit", e);
                throw new RuntimeException(e);
            }
        };
        JobInfo info = submit(str, task, this.log);
        return Stream.of(info);
    }

    /**
     * Validates {@code str2} and schedules it to run repeatedly, starting
     * immediately, with {@code j} seconds between the end of one run and the
     * start of the next. Statement parameters are read from the
     * {@code params} entry of {@code map} (empty when absent).
     */
    @Procedure(mode = Mode.WRITE)
    @Description("apoc.periodic.repeat('name',statement,repeat-rate-in-seconds, config) submit a repeatedly-called background statement. Fourth parameter 'config' is optional and can contain 'params' entry for nested statement.")
    public Stream<JobInfo> repeat(@Name("name") String str, @Name("statement") String str2, @Name("rate") long j, @Name(value = "config", defaultValue = "{}") Map<String, Object> map) {
        validateQuery(str2);
        @SuppressWarnings("unchecked")
        final Map<String, Object> params = (Map<String, Object>) map.getOrDefault("params", Collections.emptyMap());
        Runnable task = () -> this.db.executeTransactionally(str2, params);
        JobInfo info = schedule(str, task, 0L, j);
        return Stream.of(info);
    }

    /**
     * Checks a statement by running it prefixed with {@code EXPLAIN}; any
     * exception raised by that execution propagates to the caller.
     */
    private void validateQuery(String statement) {
        String explainStatement = "EXPLAIN " + statement;
        this.db.executeTransactionally(explainStatement);
    }

    /**
     * Validates {@code str2} and submits a {@code Countdown} job named
     * {@code str}; the returned job description carries the given rate.
     */
    @Procedure(mode = Mode.WRITE)
    @Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0")
    public Stream<JobInfo> countdown(@Name("name") String str, @Name("statement") String str2, @Name("rate") long j) {
        validateQuery(str2);
        Countdown task = new Countdown(str, str2, j, this.log);
        JobInfo info = submit(str, task, this.log);
        info.rate = j;
        return Stream.of(info);
    }

    /**
     * Registers {@code runnable} as the job named {@code str} and submits it
     * for immediate execution. A previously registered job with the same
     * name is removed and, if still pending, cancelled without interruption.
     *
     * @param str      job name
     * @param runnable work to run
     * @param log      logger handed to the task wrapper
     * @return the description of the newly registered job
     */
    public <T> JobInfo submit(String str, Runnable runnable, Log log) {
        JobInfo info = new JobInfo(str);
        Future<?> previous = this.pools.getJobList().remove(info);
        boolean stillPending = previous != null && !previous.isDone();
        if (stillPending) {
            previous.cancel(false);
        }
        Future<?> started = this.pools.getScheduledExecutorService().submit(wrapTask(str, runnable, log));
        this.pools.getJobList().put(info, started);
        return info;
    }

    /**
     * Registers {@code runnable} as the job named {@code str} and schedules
     * it with a fixed delay between runs. A previously registered job with
     * the same name is removed and, if still pending, cancelled without
     * interruption.
     *
     * @param str      job name
     * @param runnable work to run
     * @param j        initial delay in seconds
     * @param j2       delay between runs in seconds
     * @return the description of the newly registered job
     */
    public JobInfo schedule(String str, Runnable runnable, long j, long j2) {
        JobInfo info = new JobInfo(str, j, j2);
        Future<?> previous = this.pools.getJobList().remove(info);
        boolean stillPending = previous != null && !previous.isDone();
        if (stillPending) {
            previous.cancel(false);
        }
        Future<?> scheduled = this.pools.getScheduledExecutorService()
                .scheduleWithFixedDelay(wrapTask(str, runnable, this.log), j, j2, TimeUnit.SECONDS);
        this.pools.getJobList().put(info, scheduled);
        return info;
    }

    /**
     * Decorates a task with debug logging before and after the run and with
     * error logging on failure. The exception is rethrown after logging.
     *
     * @param taskName name used in the log messages
     * @param task     work to run
     * @param log      logger to write to
     */
    private static Runnable wrapTask(String taskName, Runnable task, Log log) {
        return () -> {
            log.debug("Executing task " + taskName);
            try {
                task.run();
                log.debug("Executed task " + taskName);
            } catch (Exception failure) {
                String message = "Error while executing task " + taskName + " because of the following exception (the task will be killed):";
                log.error(message, failure);
                throw failure;
            }
        };
    }

    /**
     * Runs a batched operation repeatedly for as long as the loop statement
     * yields a truthy {@code loop} column.
     *
     * <p>On every round the loop statement {@code str} is executed with the
     * parameter {@code previous} (the {@code loop} value of the prior round,
     * {@code null} on the first). When that value is truthy, {@code str2} is
     * executed and its rows are processed in batches by {@code str3}; the
     * batch result is tagged with the loop value and appended to the output.
     *
     * @param str  loop statement; its first row must expose a {@code loop} column
     * @param str2 statement producing the rows to iterate
     * @param str3 statement executed for each row
     * @param j    batch size (narrowed to {@code int})
     * @return one result per completed loop round
     */
    @Procedure(mode = Mode.WRITE)
    @Deprecated
    @Description("apoc.periodic.rock_n_roll_while('some cypher for knowing when to stop', 'some cypher for iteration', 'some cypher as action on each iteration', 10000) YIELD batches, total - run the action statement in batches over the iterator statement's results in a separate thread. Returns number of batches and total processed rows")
    public Stream<LoopingBatchAndTotalResult> rock_n_roll_while(@Name("cypherLoop") String str, @Name("cypherIterate") String str2, @Name("cypherAction") String str3, @Name("batchSize") long j) {
        validateQueries(Util.map("cypherLoop", str, "cypherIterate", str2));
        Stream<LoopingBatchAndTotalResult> allResults = Stream.empty();
        Map<String, Object> loopParams = new HashMap<>(1);
        Object previous = null;
        while (true) {
            loopParams.put("previous", previous);
            // A fresh final variable per round so the lambda below may capture it.
            final Object loopValue;
            try (Result loopResult = this.tx.execute(str, loopParams)) {
                loopValue = loopResult.next().get("loop");
            }
            if (!Util.toBoolean(loopValue)) {
                return allResults;
            }
            previous = loopValue;

            this.log.info("starting batched operation using iteration `%s` in separate thread", new Object[]{str2});
            try (Result iterateResult = this.tx.execute(str2)) {
                Stream<BatchAndTotalResult> roundResult = iterateAndExecuteBatchedInSeparateThread((int) j, false, false, 0L, iterateResult, (transaction, map) -> {
                    transaction.execute(str3, map);
                }, 50, -1);
                allResults = Stream.concat(allResults, roundResult.map(result -> result.inLoop(loopValue)));
            }
        }
    }

    /**
     * Validates every statement in {@code queries} and reports all failures
     * at once.
     *
     * @param queries field name to statement
     * @throws RuntimeException when at least one statement is invalid; the
     *         message holds one line per failing field
     */
    private void validateQueries(Map<String, String> queries) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> field : queries.entrySet()) {
            try {
                validateQuery(field.getValue());
            } catch (Exception e) {
                problems.add(String.format("Exception for field `%s`, message: %s", field.getKey(), e.getMessage()));
            }
        }
        String report = String.join("\n", problems);
        if (!report.isEmpty()) {
            throw new RuntimeException(report);
        }
    }

    /**
     * Executes {@code str} on the caller's transaction (forcing the slotted
     * runtime unless a runtime is already specified) and runs {@code str2}
     * for the returned rows in batches, each batch in its own transaction.
     *
     * <p>Recognised entries of {@code map}: {@code batchSize},
     * {@code concurrency} (default 50), {@code parallel} (default false),
     * {@code iterateList} (default true), {@code retries} (default 0),
     * {@code params} (default empty) and {@code failedParams} (default -1).
     *
     * @param str  statement returning the items to process
     * @param str2 statement executed per item or per batch
     * @param map  configuration map
     * @return a single-row stream with the batch statistics
     * @throws IllegalArgumentException when {@code batchSize} is smaller than 1
     */
    @Procedure(mode = Mode.WRITE)
    @Description("apoc.periodic.iterate('statement returning items', 'statement per item', {batchSize:1000,iterateList:true,parallel:false,params:{},concurrency:50,retries:0}) YIELD batches, total - run the second statement for each item returned by the first statement. Returns number of batches and total processed rows")
    public Stream<BatchAndTotalResult> iterate(@Name("cypherIterate") String str, @Name("cypherAction") String str2, @Name("config") Map<String, Object> map) {
        validateQuery(str);
        long requestedBatchSize = Util.toLong(map.getOrDefault(CsvLoaderConfig.BATCH_SIZE, Integer.valueOf(Cypher.MAX_BATCH))).longValue();
        // NOTE(review): assumes a non-positive size makes Util.take return no rows,
        // so the batching loop would never drain the iterator - confirm against Util.
        if (requestedBatchSize < 1) {
            throw new IllegalArgumentException("batchSize parameter must be > 0");
        }
        // Clamp instead of a plain (int) cast, which would silently wrap large values.
        int batchSize = (int) Math.min(requestedBatchSize, (long) Integer.MAX_VALUE);
        int concurrency = Util.toInteger(map.getOrDefault("concurrency", 50)).intValue();
        boolean parallel = Util.toBoolean(map.getOrDefault("parallel", false));
        boolean iterateList = Util.toBoolean(map.getOrDefault("iterateList", true));
        long retries = Util.toLong(map.getOrDefault("retries", 0)).longValue();
        @SuppressWarnings("unchecked")
        final Map<String, Object> params = (Map<String, Object>) map.getOrDefault("params", Collections.emptyMap());
        int failedParams = Util.toInteger(map.getOrDefault("failedParams", -1)).intValue();

        try (Result iterateResult = this.tx.execute(slottedRuntime(str), params)) {
            Pair<String, Boolean> prepared = prepareInnerStatement(str2, iterateList, iterateResult.columns(), "_batch");
            final String innerStatement = prepared.first();
            boolean batchMode = prepared.other().booleanValue();
            this.log.info("starting batching from `%s` operation using iteration `%s` in separate thread", new Object[]{str, str2});
            return iterateAndExecuteBatchedInSeparateThread(batchSize, parallel, batchMode, retries, iterateResult, (transaction, batchParams) -> {
                // Drain the inner result so the statement is fully executed.
                Iterators.count(transaction.execute(innerStatement, Util.merge(params, batchParams)));
            }, concurrency, failedParams);
        }
    }

    /**
     * Ensures a statement runs with the slotted runtime unless it already
     * names a runtime.
     *
     * <ul>
     *   <li>statement contains {@code runtime=}: returned unchanged;</li>
     *   <li>the word {@code cypher} occurs within the first 15 characters:
     *       its first occurrence is replaced by the slotted prefix;</li>
     *   <li>otherwise the slotted prefix is prepended.</li>
     * </ul>
     */
    static String slottedRuntime(String str) {
        if (RUNTIME_PATTERN.matcher(str).find()) {
            return str;
        }
        String head = str.substring(0, Math.min(15, str.length()));
        if (CYPHER_PREFIX_PATTERN.matcher(head).find()) {
            return CYPHER_PREFIX_PATTERN.matcher(str).replaceFirst(CYPHER_RUNTIME_SLOTTED);
        }
        return "cypher runtime=slotted " + str;
    }

    /**
     * Adapts the per-item statement so that the columns of the driving
     * statement are available to it, and reports whether it must be fed
     * whole batches.
     *
     * @param str  the per-item statement
     * @param z    whether batch (list) mode is requested
     * @param list column names of the driving statement
     * @param str2 name of the batch parameter
     * @return the statement to run, paired with {@code true} when it expects
     *         the batch parameter and {@code false} when it runs per row
     */
    public Pair<String, Boolean> prepareInnerStatement(String str, boolean z, List<String> list, String str2) {
        // The statement already aliases one of the columns from a parameter: leave it alone.
        String columnAlternatives = list.stream().map(Util::quote).collect(Collectors.joining("|"));
        Pattern aliasesColumn = regNoCaseMultiLine("[{$](" + columnAlternatives + ")\\}?\\s+AS\\s+");
        if (aliasesColumn.matcher(str).find()) {
            return Pair.of(str, false);
        }

        if (!z) {
            // Row mode: prefix a mapping from each parameter to a same-named variable.
            return Pair.of(Util.withMapping(list.stream(), column -> {
                return Util.param(column) + " AS " + Util.quote(column);
            }) + str, false);
        }

        // Batch mode: the statement may already unwind the batch parameter itself.
        Pattern unwindsBatch = regNoCaseMultiLine("UNWIND\\s+[{$]" + str2 + "\\}?\\s+AS\\s+");
        if (unwindsBatch.matcher(str).find()) {
            return Pair.of(str, true);
        }

        return Pair.of("UNWIND " + Util.param(str2) + " AS " + Util.quote(str2) + Util.withMapping(list.stream(), column -> {
            return Util.quote(str2) + "." + Util.quote(column) + " AS " + Util.quote(column);
        }) + " " + str, true);
    }

    /**
     * Compiles {@code str} as a case-insensitive, multi-line pattern in
     * which {@code .} also matches line terminators.
     */
    public Pattern regNoCaseMultiLine(String str) {
        int flags = Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL;
        return Pattern.compile(str, flags);
    }

    /**
     * Validates both statements, executes {@code str} on the caller's
     * transaction and runs {@code str2} for each returned row, in batches of
     * {@code j} rows, without retries and not in parallel.
     *
     * @param str  statement returning the rows to iterate
     * @param str2 statement executed for each row
     * @param j    batch size (narrowed to {@code int})
     * @return a single-row stream with the batch statistics
     */
    @Procedure(mode = Mode.WRITE)
    @Deprecated
    @Description("apoc.periodic.rock_n_roll('some cypher for iteration', 'some cypher as action on each iteration', 10000) YIELD batches, total - run the action statement in batches over the iterator statement's results in a separate thread. Returns number of batches and total processed rows")
    public Stream<BatchAndTotalResult> rock_n_roll(@Name("cypherIterate") String str, @Name("cypherAction") String str2, @Name("batchSize") long j) {
        validateQueries(Util.map("cypherIterate", str, "cypherAction", str2));
        this.log.info("starting batched operation using iteration `%s` in separate thread", new Object[]{str});
        try (Result iterateResult = this.tx.execute(str)) {
            BiConsumer<Transaction, Map<String, Object>> action = (transaction, row) -> {
                transaction.execute(str2, row);
            };
            return iterateAndExecuteBatchedInSeparateThread((int) j, false, false, 0L, iterateResult, action, 50, -1);
        }
    }

    /**
     * Cuts the rows of {@code iterator} into batches, submits every batch as
     * a task running in its own transaction, then waits for all tasks and
     * returns the collected statistics.
     *
     * @param batchSize    number of rows per batch
     * @param parallel     use the default executor instead of the single-thread one
     * @param iterateList  {@code true} to pass a whole batch to one consumer call,
     *                     {@code false} to call the consumer once per row
     * @param retries      retry count handed to the task runner
     * @param iterator     source of rows
     * @param consumer     action executed with the task's transaction and parameters
     * @param concurrency  initial capacity of the list of pending futures
     * @param failedParams limit handed to the collector for retained failed rows
     * @return a single-row stream with the statistics of the run
     */
    private Stream<BatchAndTotalResult> iterateAndExecuteBatchedInSeparateThread(int batchSize, boolean parallel, boolean iterateList, long retries, Iterator<Map<String, Object>> iterator, BiConsumer<Transaction, Map<String, Object>> consumer, int concurrency, int failedParams) {
        ExecutorService executor = parallel ? this.pools.getDefaultExecutorService() : this.pools.getSingleExecutorService();
        List<Future<Long>> futures = new ArrayList<>(concurrency);
        BatchAndTotalCollector collector = new BatchAndTotalCollector(this.terminationGuard, failedParams);

        while (!Util.transactionIsTerminated(this.terminationGuard)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("execute in batch no %d batch size ", new Object[]{Integer.valueOf(batchSize)});
            }
            final List<Map<String, Object>> batch = Util.take(iterator, batchSize);
            long currentBatchSize = batch.size();
            futures.add(Util.inTxFuture(this.log, executor, this.db,
                    iterateList
                            ? innerTx -> executeBatchAsList(innerTx, consumer, batch, collector)
                            : innerTx -> executeBatchPerRow(innerTx, consumer, batch, collector),
                    retries,
                    retryCount -> collector.incrementRetried(),
                    ignored -> collector.incrementBatches()));
            collector.incrementCount(currentBatchSize);
            if (!iterator.hasNext()) {
                break;
            }
        }

        // Decide once how to wait: cancel outstanding work if we were terminated meanwhile.
        boolean terminated = Util.transactionIsTerminated(this.terminationGuard);
        long successes = 0L;
        for (Future<Long> future : futures) {
            Long committed = terminated
                    ? (Long) Util.getFutureOrCancel(future, collector.getBatchErrors(), collector.getFailedBatches(), 0L)
                    : (Long) Util.getFuture(future, collector.getBatchErrors(), collector.getFailedBatches(), 0L);
            successes += committed.longValue();
        }
        collector.incrementSuccesses(successes);

        Util.logErrors("Error during iterate.commit:", collector.getBatchErrors(), this.log);
        Util.logErrors("Error during iterate.execute:", collector.getOperationErrors(), this.log);
        return Stream.of(collector.getResult());
    }

    /** Runs the consumer once for the whole batch, passed as {@code _batch}; returns the batch size, or 0 when terminated. */
    private Long executeBatchAsList(Transaction innerTx, BiConsumer<Transaction, Map<String, Object>> consumer, List<Map<String, Object>> batch, BatchAndTotalCollector collector) {
        if (Util.transactionIsTerminated(this.terminationGuard)) {
            return 0L;
        }
        Map<String, Object> params = Util.map("_count", Long.valueOf(collector.getCount()), "_batch", batch);
        return executeAndReportErrors(innerTx, consumer, params, batch, batch.size(), null, collector);
    }

    /** Runs the consumer once per row of the batch; returns the number of rows executed, or 0 when terminated up front. */
    private Long executeBatchPerRow(Transaction innerTx, BiConsumer<Transaction, Map<String, Object>> consumer, List<Map<String, Object>> batch, BatchAndTotalCollector collector) {
        if (Util.transactionIsTerminated(this.terminationGuard)) {
            return 0L;
        }
        AtomicLong localCount = new AtomicLong(collector.getCount());
        return batch.stream().mapToLong(row -> {
            // Only poll the termination guard when the running count is a multiple of 1000.
            if (localCount.get() % 1000 == 0 && Util.transactionIsTerminated(this.terminationGuard)) {
                return 0L;
            }
            Map<String, Object> params = Util.merge(row, Util.map("_count", Long.valueOf(localCount.get()), "_batch", batch));
            return executeAndReportErrors(innerTx, consumer, params, batch, 1, localCount, collector);
        }).sum();
    }

    /**
     * Invokes the consumer and, on failure, records the error in the
     * collector before rethrowing it.
     *
     * <p>On failure the whole batch size is added to the failed-operation
     * counter, the batch's rows are offered to the failed-parameter map and
     * the root-cause message is counted.
     *
     * @param tx          transaction passed to the consumer
     * @param consumer    action to run
     * @param params      parameters passed to the consumer
     * @param batch       the batch the call belongs to
     * @param returnValue value returned on success
     * @param localCount  counter incremented on success; may be {@code null}
     * @param collector   statistics sink
     * @return {@code returnValue} when the consumer completed normally
     */
    private long executeAndReportErrors(Transaction tx, BiConsumer<Transaction, Map<String, Object>> consumer, Map<String, Object> params, List<Map<String, Object>> batch, int returnValue, AtomicLong localCount, BatchAndTotalCollector collector) {
        try {
            consumer.accept(tx, params);
        } catch (Exception failure) {
            collector.incrementFailedOps(batch.size());
            collector.amendFailedParamsMap(batch);
            recordError(collector.getOperationErrors(), failure);
            throw failure;
        }
        if (localCount != null) {
            localCount.getAndIncrement();
        }
        return returnValue;
    }
}
