package com.tc.async.impl;

import com.tc.async.api.ConfigurationContext;
import com.tc.async.api.EventHandler;
import com.tc.async.api.EventHandlerException;
import com.tc.async.api.MultiThreadedEventContext;
import com.tc.async.api.Sink;
import com.tc.async.api.Source;
import com.tc.async.api.Stage;
import com.tc.async.impl.StageQueue;
import com.tc.exception.TCNotRunningException;
import com.tc.exception.TCRuntimeException;
import com.tc.exception.TCServerRestartException;
import com.tc.exception.TCShutdownServerException;
import com.tc.logging.TCLoggerProvider;
import com.tc.management.TerracottaManagement;
import com.tc.properties.TCPropertiesImpl;
import com.tc.util.concurrent.QueueFactory;
import com.tc.util.concurrent.ThreadUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;

/* loaded from: input_file:com/tc/async/impl/StageImpl.class */
public class StageImpl<EC> implements Stage<EC> {
    private static final long pollTime = 3000;
    private final String name;
    private final EventHandler<EC> handler;
    private final StageQueue<EC> stageQueue;
    private final WorkerThread[] threads;
    private final ThreadGroup group;
    private final Logger logger;
    private final int sleepMs;
    private final boolean pausable;
    private volatile boolean paused;
    private volatile boolean shutdown = true;
    private final AtomicInteger inflight = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/tc/async/impl/StageImpl$WorkerThread.class */
    public class WorkerThread<EC> extends Thread {
        private final Source source;
        private final EventHandler<EC> handler;
        private volatile boolean idle;
        private final Object idleLock;
        private boolean waitingForIdle;
        private long idleTime;
        private long runTime;
        private long count;

        public WorkerThread(String str, Source source, EventHandler<EC> eventHandler) {
            super(StageImpl.this.group, str);
            this.idle = false;
            this.idleLock = new Object();
            this.waitingForIdle = false;
            this.idleTime = 0L;
            this.runTime = 0L;
            this.count = 0L;
            setDaemon(true);
            this.source = source;
            this.handler = eventHandler;
        }

        private void handleStageDebugPauses() {
            if (StageImpl.this.sleepMs > 0) {
                ThreadUtil.reallySleep(StageImpl.this.sleepMs);
            }
            while (true) {
                if (!StageImpl.this.paused && (!StageImpl.this.pausable || !"paused".equalsIgnoreCase(System.getProperty(StageImpl.this.name)))) {
                    return;
                }
                if (!StageImpl.this.paused) {
                    StageImpl.this.logger.info("Stage paused, sleeping for 1s");
                }
                ThreadUtil.reallySleep(1000L);
            }
        }

        public boolean isIdle() {
            return this.idle;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                if (StageImpl.this.shutdown && this.source.isEmpty()) {
                    return;
                }
                Event event = null;
                try {
                    try {
                        try {
                            setToIdle();
                            long nanoTime = System.nanoTime();
                            event = this.source.poll(StageImpl.pollTime);
                            if (event != null) {
                                long nanoTime2 = System.nanoTime();
                                this.idle = false;
                                handleStageDebugPauses();
                                this.idleTime += nanoTime2 - nanoTime;
                                event.call();
                                this.runTime += System.nanoTime() - nanoTime2;
                                this.count++;
                            } else {
                                this.idleTime += System.nanoTime() - nanoTime;
                            }
                        } catch (TCShutdownServerException e) {
                            throw e;
                        } catch (Exception e2) {
                            if (!StageImpl.isTCNotRunningException(e2)) {
                                StageImpl.this.logger.error("Uncaught exception in stage", e2);
                                throw new TCRuntimeException("Uncaught exception in stage", e2);
                            }
                            if (!StageImpl.this.shutdown) {
                                StageImpl.this.logger.info("Ignoring " + TCNotRunningException.class.getSimpleName() + " while handling context: " + event);
                            }
                        }
                    } catch (EventHandlerException e3) {
                        if (!StageImpl.this.shutdown) {
                            throw new TCRuntimeException(e3);
                        }
                    } catch (TCServerRestartException e4) {
                        throw e4;
                    } catch (InterruptedException e5) {
                        if (!StageImpl.this.shutdown) {
                            throw new TCRuntimeException(e5);
                        }
                    }
                } catch (Throwable th) {
                    throw th;
                }
            }
        }

        private void setToIdle() {
            if (this.idle || !this.source.isEmpty()) {
                return;
            }
            this.idle = true;
            synchronized (this.idleLock) {
                if (this.waitingForIdle) {
                    this.idleLock.notifyAll();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void waitForIdleUninterruptibly() {
            boolean z = false;
            boolean z2 = false;
            while (!z2) {
                try {
                    waitForIdle();
                    z2 = true;
                } catch (InterruptedException e) {
                    z = true;
                }
            }
            if (z) {
                Thread.currentThread().interrupt();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void waitForIdle() throws InterruptedException {
            while (!this.idle) {
                synchronized (this.idleLock) {
                    this.waitingForIdle = true;
                    this.idleLock.wait();
                    this.waitingForIdle = false;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Map<String, ?> getStats() {
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            linkedHashMap.put("idle", Long.valueOf(this.idleTime));
            linkedHashMap.put("run", Long.valueOf(this.runTime));
            linkedHashMap.put("processed", Long.valueOf(this.count));
            linkedHashMap.put("backlog", Integer.valueOf(this.source.size()));
            return linkedHashMap;
        }
    }

    public StageImpl(TCLoggerProvider tCLoggerProvider, String str, Class<EC> cls, EventHandler<EC> eventHandler, int i, ThreadGroup threadGroup, QueueFactory queueFactory, int i2, boolean z) {
        this.logger = tCLoggerProvider.getLogger(Stage.class.getName() + ": " + str);
        this.name = str;
        if (i > 1 && !MultiThreadedEventContext.class.isAssignableFrom(cls)) {
            throw new IllegalArgumentException("the requested queue count is greater than one but the event type is not multi-threaded for stage:" + this.name);
        }
        this.threads = new WorkerThread[i];
        this.handler = eventHandler;
        StageQueue.StageQueueFactory stageQueueFactory = StageQueue.FACTORY;
        this.stageQueue = StageQueue.StageQueueFactory.factory(i, queueFactory, cls, eventCreator(z), tCLoggerProvider, str, i2);
        this.group = threadGroup;
        this.sleepMs = TCPropertiesImpl.getProperties().getInt("seda." + str + ".sleepMs", 0);
        if (this.sleepMs > 0) {
            this.logger.warn("Sleep of " + this.sleepMs + "ms enabled for stage " + str);
        }
        this.pausable = TCPropertiesImpl.getProperties().getBoolean("seda." + str + ".pausable", false);
        if (this.pausable) {
            this.logger.warn("Stage pausing is enabled for stage " + str);
        }
    }

    private EventCreator<EC> eventCreator(boolean z) {
        return z ? new DirectEventCreator(baseCreator(), () -> {
            return Boolean.valueOf(this.inflight.get() == 0);
        }) : baseCreator();
    }

    private EventCreator<EC> baseCreator() {
        return obj -> {
            this.inflight.incrementAndGet();
            return () -> {
                try {
                    this.handler.handleEvent(obj);
                } finally {
                    this.inflight.decrementAndGet();
                }
            };
        };
    }

    @Override // com.tc.async.api.Stage
    public boolean isEmpty() {
        return this.inflight.get() == 0;
    }

    @Override // com.tc.async.api.Stage
    public int size() {
        return this.inflight.get();
    }

    public void trackExtraStatistics(boolean z) {
        this.stageQueue.enableAdditionalStatistics(z);
    }

    @Override // com.tc.async.api.Stage
    public void destroy() {
        synchronized (this) {
            if (this.shutdown) {
                return;
            }
            this.shutdown = true;
            this.stageQueue.close();
            stopThreads();
            this.handler.destroy();
        }
    }

    @Override // com.tc.async.api.Stage
    public void start(ConfigurationContext configurationContext) {
        synchronized (this) {
            if (this.shutdown) {
                this.shutdown = false;
                this.handler.initializeContext(configurationContext);
                startThreads();
            }
        }
    }

    @Override // com.tc.async.api.Stage
    public Sink<EC> getSink() {
        return this.stageQueue;
    }

    @Override // com.tc.async.api.Stage
    public int pause() {
        this.paused = true;
        return this.inflight.get();
    }

    @Override // com.tc.async.api.Stage
    public void unpause() {
        this.paused = false;
    }

    @Override // com.tc.async.api.Stage
    public void clear() {
        boolean interrupted = Thread.interrupted();
        this.stageQueue.clear();
        for (WorkerThread workerThread : this.threads) {
            if (workerThread != null) {
                try {
                    workerThread.waitForIdle();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    private synchronized void startThreads() {
        for (int i = 0; i < this.threads.length; i++) {
            String str = "WorkerThread(" + this.name + ", " + i;
            this.threads[i] = new WorkerThread(this.threads.length > 1 ? str + ", " + this.stageQueue.getSource(i).getSourceName() + ")" : str + ")", this.stageQueue.getSource(i), this.handler);
            this.threads[i].start();
        }
    }

    private synchronized void stopThreads() {
        for (WorkerThread workerThread : this.threads) {
            try {
                workerThread.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override // com.tc.async.api.Stage
    public String getName() {
        return this.name;
    }

    public String toString() {
        return "StageImpl(" + this.name + ")";
    }

    void waitForIdle() {
        Arrays.stream(this.threads).forEach(workerThread -> {
            workerThread.waitForIdleUninterruptibly();
        });
    }

    @Override // com.tc.async.api.Stage
    public Map<String, ?> getState() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        ArrayList arrayList = new ArrayList(this.threads.length);
        Arrays.stream(this.threads).forEach(workerThread -> {
            if (workerThread != null) {
                arrayList.add(workerThread.getStats());
            }
        });
        linkedHashMap.put(TerracottaManagement.MBeanKeys.NAME, this.name);
        linkedHashMap.put("threadCount", Integer.valueOf(this.threads.length));
        linkedHashMap.put("backlog", Integer.valueOf(this.inflight.get()));
        linkedHashMap.put("sink", this.stageQueue.getState());
        linkedHashMap.put("threads", arrayList);
        return linkedHashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isTCNotRunningException(Throwable th) {
        Throwable th2 = null;
        while (th != null) {
            th2 = th;
            th = th.getCause();
        }
        return th2 instanceof TCNotRunningException;
    }
}
