package com.github.jlangch.venice.impl.util;

import com.github.jlangch.venice.VncException;
import com.github.jlangch.venice.impl.Printer;
import com.github.jlangch.venice.impl.types.Coerce;
import com.github.jlangch.venice.impl.types.Constants;
import com.github.jlangch.venice.impl.types.VncFunction;
import com.github.jlangch.venice.impl.types.VncJavaObject;
import com.github.jlangch.venice.impl.types.VncKeyword;
import com.github.jlangch.venice.impl.types.VncVal;
import com.github.jlangch.venice.impl.types.collections.VncHashMap;
import com.github.jlangch.venice.impl.types.collections.VncList;
import com.github.jlangch.venice.impl.types.collections.VncMap;
import com.github.jlangch.venice.impl.util.concurrency.StripedExecutorService;
import com.github.jlangch.venice.impl.util.concurrency.StripedRunnable;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:com/github/jlangch/venice/impl/util/Agent.class */
/**
 * Holds a single {@link VncVal} that is modified asynchronously by actions
 * submitted through {@link #send} or {@link #send_off}.
 *
 * <p>Each action is wrapped in an {@link Action} whose stripe is the agent's
 * id and is handed to one of two shared {@link StripedExecutorService}
 * instances (presumably actions sharing a stripe are executed one after the
 * other; confirm against {@code StripedExecutorService}).
 *
 * <p>An agent runs in one of two error modes: {@code :continue} (a failing
 * action leaves the value untouched and the agent keeps accepting actions) or
 * {@code :fail} (the exception is stored with the value and subsequent actions
 * are skipped until {@link #restart} is called).
 */
public class Agent {
    private static final VncKeyword ERROR_HANDLER = new VncKeyword("error-handler");
    private static final VncKeyword ERROR_MODE = new VncKeyword("error-mode");
    private static final VncKeyword ERROR_MODE_CONTINUE = new VncKeyword("continue");
    private static final VncKeyword ERROR_MODE_FAIL = new VncKeyword("fail");

    /** Thread-local key under which the running agent is published to its action. */
    private static final VncKeyword AGENT_KEY = new VncKeyword("*agent*");

    private static final AtomicLong agentCounter = new AtomicLong(0);
    private static final AtomicLong sendThreadPoolCounter = new AtomicLong(0);
    private static final AtomicLong sendOffThreadPoolCounter = new AtomicLong(0);

    /** Executor behind {@link #send}: fixed pool sized to the number of processors plus 2. */
    private static final ExecutorService sendExecutor = new StripedExecutorService(
            Executors.newFixedThreadPool(
                    2 + Runtime.getRuntime().availableProcessors(),
                    ThreadPoolUtil.createThreadFactory(
                            "venice-agent-send-pool-%d", sendThreadPoolCounter, true)));

    /** Executor behind {@link #send_off}: cached (unbounded) pool. */
    private static final ExecutorService sendOffExecutor = new StripedExecutorService(
            Executors.newCachedThreadPool(
                    ThreadPoolUtil.createThreadFactory(
                            "venice-agent-send-off-pool-%d", sendOffThreadPoolCounter, true)));

    private final AtomicReference<VncFunction> errorHandler = new AtomicReference<>();
    private final AtomicReference<Value> value = new AtomicReference<>(new Value(Constants.Nil, null));
    private final Watchable watchable = new Watchable();
    private final long id = agentCounter.getAndIncrement();
    private final boolean continueOnError;

    /**
     * A single unit of work for an agent: applies a function to the agent's
     * current value (prepended to the supplied arguments) and stores the result.
     */
    public static class Action implements StripedRunnable {
        private final Agent agent;
        private final VncFunction fn;
        private final VncList fnArgs;

        /**
         * @param agent the agent whose value is to be updated
         * @param vncFunction the function applied to the current value and the extra arguments
         * @param vncList the extra arguments passed after the current value
         */
        public Action(Agent agent, VncFunction vncFunction, VncList vncList) {
            this.agent = agent;
            this.fn = vncFunction;
            this.fnArgs = vncList;
        }

        /**
         * @return the owning agent's id, used as the stripe of this action
         */
        @Override
        public Object getStripe() {
            return Long.valueOf(this.agent.getID());
        }

        /**
         * Runs the action with the agent published under the {@code *agent*}
         * thread-local key.
         *
         * <p>The function is skipped when the agent holds an error and is not
         * in continue mode. On a {@link RuntimeException} the error is stored
         * with the previous value (fail mode only) and the error handler, if
         * any, is invoked with the agent and the exception.
         */
        @Override
        public void run() {
            try {
                ThreadLocalMap.push(AGENT_KEY, new VncJavaObject(this.agent));

                // A failed agent in :fail mode ignores actions until restarted.
                if (this.agent.getError() != null && !this.agent.continueOnError) {
                    return;
                }

                final VncVal oldVal = this.agent.value.get().val;
                try {
                    final VncVal newVal = (VncVal) this.fn.apply(this.fnArgs.addAtStart(oldVal));
                    this.agent.value.set(new Value(newVal, null));
                    this.agent.watchable.notifyWatches(new VncJavaObject(this.agent), oldVal, newVal);
                } catch (RuntimeException e) {
                    if (!this.agent.continueOnError) {
                        // Keep the previous value and remember the failure.
                        this.agent.value.set(new Value(oldVal, e));
                    }
                    final VncFunction handler = this.agent.errorHandler.get();
                    if (handler != null) {
                        handler.apply(VncList.of(new VncJavaObject(this.agent), new VncJavaObject(e)));
                    }
                }
            } finally {
                // Always remove the thread-local binding, whatever happened above.
                ThreadLocalMap.pop(AGENT_KEY);
            }
        }
    }

    /**
     * Immutable pair of an agent value and the exception (if any) raised by
     * the action that last failed.
     */
    public static class Value {
        private final VncVal val;
        private final RuntimeException ex;

        /**
         * @param vncVal the agent value
         * @param runtimeException the stored failure, or {@code null} if there is none
         */
        public Value(VncVal vncVal, RuntimeException runtimeException) {
            this.val = vncVal;
            this.ex = runtimeException;
        }

        /**
         * @return the held value
         * @throws RuntimeException the stored exception, if one is present
         */
        public VncVal deref() {
            if (this.ex != null) {
                throw this.ex;
            }
            return this.val;
        }

        /**
         * @return the stored exception, or {@code null} if there is none
         */
        public RuntimeException getException() {
            return this.ex;
        }
    }

    /**
     * Creates an agent.
     *
     * @param vncVal the initial value; {@code null} is replaced by {@code Constants.Nil}
     * @param vncList options converted to a map via {@code VncHashMap.ofAll}; the keys
     *        {@code :error-handler} (a function) and {@code :error-mode}
     *        ({@code :continue} or {@code :fail}) are read. A missing or
     *        unrecognized error mode results in continue mode.
     */
    public Agent(VncVal vncVal, VncList vncList) {
        this.value.set(new Value(vncVal == null ? Constants.Nil : vncVal, null));
        final VncHashMap options = VncHashMap.ofAll(vncList);
        this.errorHandler.set(getErrorHandler(options));
        final VncKeyword errorMode = getErrorMode(options);
        this.continueOnError = errorMode == null || errorMode.equals(ERROR_MODE_CONTINUE);
    }

    /**
     * @return the id assigned to this agent from the shared agent counter
     */
    public long getID() {
        return this.id;
    }

    /**
     * @return the current value
     * @throws RuntimeException the stored exception if the agent is in failed state
     */
    public VncVal deref() {
        return this.value.get().deref();
    }

    /**
     * @return the exception stored with the current value, or {@code null}
     */
    public RuntimeException getError() {
        return this.value.get().getException();
    }

    /**
     * Submits an action to the fixed-size send executor.
     *
     * @param vncFunction the function to apply to the agent's value
     * @param vncList additional arguments passed after the current value
     */
    public void send(VncFunction vncFunction, VncList vncList) {
        sendExecutor.execute(new Action(this, vncFunction, vncList));
    }

    /**
     * Submits an action to the cached send-off executor.
     *
     * @param vncFunction the function to apply to the agent's value
     * @param vncList additional arguments passed after the current value
     */
    public void send_off(VncFunction vncFunction, VncList vncList) {
        sendOffExecutor.execute(new Action(this, vncFunction, vncList));
    }

    /**
     * Replaces the agent's value and clears any stored error.
     *
     * @param vncVal the new value
     */
    public void restart(VncVal vncVal) {
        this.value.set(new Value(vncVal, null));
    }

    /**
     * Registers a watch function under the given key.
     *
     * @param vncKeyword the watch key
     * @param vncFunction the watch function
     */
    public void addWatch(VncKeyword vncKeyword, VncFunction vncFunction) {
        this.watchable.addWatch(vncKeyword, vncFunction);
    }

    /**
     * Removes the watch registered under the given key.
     *
     * @param vncKeyword the watch key
     */
    public void removeWatch(VncKeyword vncKeyword) {
        this.watchable.removeWatch(vncKeyword);
    }

    /**
     * Sets (or, with {@code null}, clears) the error handler.
     *
     * @param vncFunction the handler invoked with the agent and the exception
     */
    public void setErrorHandler(VncFunction vncFunction) {
        this.errorHandler.set(vncFunction);
    }

    /**
     * @return {@code :continue} or {@code :fail}, depending on the agent's mode
     */
    public VncKeyword getErrorMode() {
        return this.continueOnError ? ERROR_MODE_CONTINUE : ERROR_MODE_FAIL;
    }

    /**
     * @return the readable string form, same as {@code toString(true)}
     */
    @Override
    public String toString() {
        return toString(true);
    }

    /**
     * Renders the agent as {@code (agent [:error <exception class> ]:value <value>)}.
     *
     * @param z passed to {@code Printer._pr_str} as its second argument
     * @return the string representation
     */
    public String toString(boolean z) {
        final Value current = this.value.get();
        final StringBuilder sb = new StringBuilder("(agent ");
        if (current.ex != null) {
            // Trailing space keeps the class name separated from ":value".
            sb.append(":error ").append(current.ex.getClass().getName()).append(' ');
        }
        sb.append(":value ").append(Printer._pr_str(current.val, z)).append(")");
        return sb.toString();
    }

    /**
     * Waits until every agent has processed one marker action on each of the
     * two executors.
     *
     * <p>Note: a marker action is only counted when its function is applied,
     * so an agent that is failed in fail mode never counts down. In that case
     * this method times out (or blocks when no timeout is given).
     *
     * @param list the agents to wait for
     * @param j timeout in milliseconds; a value {@code <= 0} waits without timeout
     * @return {@code true} if all marker actions ran, {@code false} on timeout
     * @throws VncException if waiting is interrupted or submitting fails
     */
    public static boolean await(List<Agent> list, long j) {
        // Two markers per agent: one through send, one through send_off.
        final CountDownLatch latch = new CountDownLatch(list.size() * 2);
        final VncFunction marker = new VncFunction() {
            private static final long serialVersionUID = 1L;

            @Override
            public VncVal apply(VncList vncList) {
                latch.countDown();
                // Hand back the current value so the agent's state is unchanged.
                return vncList.first();
            }
        };
        try {
            list.forEach(agent -> agent.send(marker, new VncList()));
            list.forEach(agent -> agent.send_off(marker, new VncList()));
            if (j > 0) {
                return latch.await(j, TimeUnit.MILLISECONDS);
            }
            latch.await();
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new VncException("Failed awaiting for agents", e);
        } catch (Exception e) {
            throw new VncException("Failed awaiting for agents", e);
        }
    }

    /**
     * Initiates shutdown of both agent executors.
     */
    public static void shutdown() {
        sendExecutor.shutdown();
        sendOffExecutor.shutdown();
    }

    /**
     * @return {@code true} if both executors have been shut down
     */
    public static boolean isShutdown() {
        return sendExecutor.isShutdown() && sendOffExecutor.isShutdown();
    }

    /**
     * Waits for both executors to terminate. The timeout is applied to each
     * executor in turn, so the total wait can be up to twice {@code j}.
     *
     * @param j per-executor timeout in milliseconds
     * @throws VncException if waiting is interrupted or fails
     */
    public static void awaitTermination(long j) {
        try {
            sendExecutor.awaitTermination(j, TimeUnit.MILLISECONDS);
            sendOffExecutor.awaitTermination(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new VncException("Failed awaiting for executor termination", e);
        } catch (Exception e) {
            throw new VncException("Failed awaiting for executor termination", e);
        }
    }

    /**
     * @return {@code true} if both executors have terminated
     */
    public static boolean isTerminated() {
        return sendExecutor.isTerminated() && sendOffExecutor.isTerminated();
    }

    /**
     * Reads the {@code :error-handler} option.
     *
     * @return the handler function, or {@code null} when the map is
     *         {@code null} or the option is {@code Constants.Nil}
     */
    private static VncFunction getErrorHandler(VncMap vncMap) {
        if (vncMap == null) {
            return null;
        }
        final VncVal handler = vncMap.get(ERROR_HANDLER);
        return handler == Constants.Nil ? null : Coerce.toVncFunction(handler);
    }

    /**
     * Reads the {@code :error-mode} option.
     *
     * @return the canonical {@code :continue} or {@code :fail} keyword, or
     *         {@code null} when the option is absent or has another value
     */
    private static VncKeyword getErrorMode(VncMap vncMap) {
        final VncVal mode = vncMap == null ? Constants.Nil : vncMap.get(ERROR_MODE);
        if (mode == Constants.Nil) {
            return null;
        }
        final VncKeyword keyword = Coerce.toVncKeyword(mode);
        if (keyword.equals(ERROR_MODE_CONTINUE)) {
            return ERROR_MODE_CONTINUE;
        }
        if (keyword.equals(ERROR_MODE_FAIL)) {
            return ERROR_MODE_FAIL;
        }
        return null;
    }
}
