package com.espertech.esper.core.thread;

import com.espertech.esper.client.ConfigurationEngineDefaults;
import com.espertech.esper.core.service.EPRuntimeImpl;
import com.espertech.esper.core.service.EPServicesContext;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/espertech/esper/core/thread/ThreadingServiceImpl.class */
/**
 * {@link ThreadingService} implementation that optionally hands timer, inbound, route and
 * outbound work to dedicated thread pools.
 *
 * <p>Which pools are active is decided once, in the constructor, from the supplied threading
 * configuration and {@code ThreadingOption.isThreadingEnabled()}. The queues and pools
 * themselves are only created by {@link #initThreading(EPServicesContext, EPRuntimeImpl)}; until
 * then the corresponding getters return {@code null}.
 */
public class ThreadingServiceImpl implements ThreadingService {
    private static final Log log = LogFactory.getLog(ThreadingServiceImpl.class);
    private final ConfigurationEngineDefaults.Threading config;
    private final boolean isTimerThreading;
    private final boolean isInboundThreading;
    private final boolean isRouteThreading;
    private final boolean isOutboundThreading;
    private BlockingQueue<Runnable> timerQueue;
    private BlockingQueue<Runnable> inboundQueue;
    private BlockingQueue<Runnable> routeQueue;
    private BlockingQueue<Runnable> outboundQueue;
    private ThreadPoolExecutor timerThreadPool;
    private ThreadPoolExecutor inboundThreadPool;
    private ThreadPoolExecutor routeThreadPool;
    private ThreadPoolExecutor outboundThreadPool;

    /**
     * Captures the threading configuration and resolves which kinds of threading are active.
     *
     * <p>When {@code ThreadingOption.isThreadingEnabled()} is {@code false}, every kind of
     * threading is disabled regardless of the configuration.
     *
     * @param threading the threading configuration to read pool settings from
     */
    public ThreadingServiceImpl(ConfigurationEngineDefaults.Threading threading) {
        this.config = threading;
        if (ThreadingOption.isThreadingEnabled()) {
            this.isTimerThreading = threading.isThreadPoolTimerExec();
            this.isInboundThreading = threading.isThreadPoolInbound();
            this.isRouteThreading = threading.isThreadPoolRouteExec();
            this.isOutboundThreading = threading.isThreadPoolOutbound();
            return;
        }
        this.isTimerThreading = false;
        this.isInboundThreading = false;
        this.isRouteThreading = false;
        this.isOutboundThreading = false;
    }

    /** @return {@code true} if route-execution threading was enabled at construction time */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public boolean isRouteThreading() {
        return this.isRouteThreading;
    }

    /** @return {@code true} if inbound threading was enabled at construction time */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public boolean isInboundThreading() {
        return this.isInboundThreading;
    }

    /** @return {@code true} if timer-execution threading was enabled at construction time */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public boolean isTimerThreading() {
        return this.isTimerThreading;
    }

    /** @return {@code true} if outbound threading was enabled at construction time */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public boolean isOutboundThreading() {
        return this.isOutboundThreading;
    }

    /**
     * Creates the work queue and thread pool for each kind of threading that is enabled.
     *
     * @param ePServicesContext supplies the engine URI used in thread and thread-group names
     * @param ePRuntimeImpl     not used by this implementation
     */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public void initThreading(EPServicesContext ePServicesContext, EPRuntimeImpl ePRuntimeImpl) {
        if (this.isInboundThreading) {
            this.inboundQueue = makeQueue(this.config.getThreadPoolInboundCapacity());
            this.inboundThreadPool = getThreadPool(ePServicesContext.getEngineURI(), "Inbound", this.inboundQueue, this.config.getThreadPoolInboundNumThreads());
        }
        if (this.isTimerThreading) {
            this.timerQueue = makeQueue(this.config.getThreadPoolTimerExecCapacity());
            this.timerThreadPool = getThreadPool(ePServicesContext.getEngineURI(), "TimerExec", this.timerQueue, this.config.getThreadPoolTimerExecNumThreads());
        }
        if (this.isRouteThreading) {
            this.routeQueue = makeQueue(this.config.getThreadPoolRouteExecCapacity());
            this.routeThreadPool = getThreadPool(ePServicesContext.getEngineURI(), "RouteExec", this.routeQueue, this.config.getThreadPoolRouteExecNumThreads());
        }
        if (this.isOutboundThreading) {
            this.outboundQueue = makeQueue(this.config.getThreadPoolOutboundCapacity());
            this.outboundThreadPool = getThreadPool(ePServicesContext.getEngineURI(), "Outbound", this.outboundQueue, this.config.getThreadPoolOutboundNumThreads());
        }
    }

    /**
     * Builds the work queue for a pool.
     *
     * @param num requested capacity; {@code null}, non-positive or {@code Integer.MAX_VALUE}
     *            selects an unbounded {@link LinkedBlockingQueue}, any other value a bounded
     *            {@link ArrayBlockingQueue} of that capacity
     * @return a new, empty queue
     */
    private BlockingQueue<Runnable> makeQueue(Integer num) {
        if (num == null || num.intValue() <= 0 || num.intValue() == Integer.MAX_VALUE) {
            return new LinkedBlockingQueue<Runnable>();
        }
        return new ArrayBlockingQueue<Runnable>(num.intValue());
    }

    /**
     * Places a unit of work on the given queue, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the unit is not enqueued, the
     * interruption is logged, and the thread's interrupt status is restored so that the caller
     * can still observe it.
     *
     * @param queue the target queue; {@code null} if the matching pool was never initialized,
     *              in which case a {@link NullPointerException} results
     * @param unit  the work to enqueue
     */
    private void enqueue(BlockingQueue<Runnable> queue, Runnable unit) {
        try {
            queue.put(unit);
        } catch (InterruptedException e) {
            log.info("Submit interrupted:" + e);
            // Catching InterruptedException clears the flag; set it again rather than hide it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues a route unit for the route-execution pool, blocking while the queue is full.
     *
     * @param routeUnitRunnable the work to enqueue
     */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public void submitRoute(RouteUnitRunnable routeUnitRunnable) {
        enqueue(this.routeQueue, routeUnitRunnable);
    }

    /**
     * Queues an inbound unit for the inbound pool, blocking while the queue is full.
     *
     * @param inboundUnitRunnable the work to enqueue
     */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public void submitInbound(InboundUnitRunnable inboundUnitRunnable) {
        enqueue(this.inboundQueue, inboundUnitRunnable);
    }

    /**
     * Queues an outbound unit for the outbound pool, blocking while the queue is full.
     *
     * @param outboundUnitRunnable the work to enqueue
     */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public void submitOutbound(OutboundUnitRunnable outboundUnitRunnable) {
        enqueue(this.outboundQueue, outboundUnitRunnable);
    }

    /**
     * Queues a timer unit for the timer-execution pool, blocking while the queue is full.
     *
     * @param timerUnit the work to enqueue
     */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public void submitTimerWork(TimerUnit timerUnit) {
        enqueue(this.timerQueue, timerUnit);
    }

    /** @return the outbound work queue, or {@code null} if it was never created */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public BlockingQueue<Runnable> getOutboundQueue() {
        return this.outboundQueue;
    }

    /** @return the outbound pool, or {@code null} if not created or already destroyed */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public ThreadPoolExecutor getOutboundThreadPool() {
        return this.outboundThreadPool;
    }

    /** @return the route-execution work queue, or {@code null} if it was never created */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public BlockingQueue<Runnable> getRouteQueue() {
        return this.routeQueue;
    }

    /** @return the route-execution pool, or {@code null} if not created or already destroyed */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public ThreadPoolExecutor getRouteThreadPool() {
        return this.routeThreadPool;
    }

    /** @return the timer-execution work queue, or {@code null} if it was never created */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public BlockingQueue<Runnable> getTimerQueue() {
        return this.timerQueue;
    }

    /** @return the timer-execution pool, or {@code null} if not created or already destroyed */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public ThreadPoolExecutor getTimerThreadPool() {
        return this.timerThreadPool;
    }

    /** @return the inbound work queue, or {@code null} if it was never created */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public BlockingQueue<Runnable> getInboundQueue() {
        return this.inboundQueue;
    }

    /** @return the inbound pool, or {@code null} if not created or already destroyed */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public ThreadPoolExecutor getInboundThreadPool() {
        return this.inboundThreadPool;
    }

    /**
     * Shuts down every pool that was created and clears the pool references.
     *
     * <p>Pools are stopped in the order timer, route, outbound, inbound. The queue fields are
     * left in place; only the pool fields are reset to {@code null}.
     */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public synchronized void destroy() {
        if (this.timerThreadPool != null) {
            stopPool(this.timerThreadPool, this.timerQueue, "TimerExec");
        }
        if (this.routeThreadPool != null) {
            stopPool(this.routeThreadPool, this.routeQueue, "RouteExec");
        }
        if (this.outboundThreadPool != null) {
            stopPool(this.outboundThreadPool, this.outboundQueue, "Outbound");
        }
        if (this.inboundThreadPool != null) {
            stopPool(this.inboundThreadPool, this.inboundQueue, "Inbound");
        }
        this.timerThreadPool = null;
        this.routeThreadPool = null;
        this.outboundThreadPool = null;
        this.inboundThreadPool = null;
    }

    /**
     * Creates a fixed-size pool (core size equals maximum size) on the given queue and starts
     * all of its threads immediately.
     *
     * @param engineURI   engine URI used in naming; {@code null} is replaced by {@code "default"}
     * @param poolName    pool name used in log output and the thread-group name
     * @param workQueue   the queue the pool's threads take work from
     * @param numThreads  number of threads in the pool
     * @return the started pool
     */
    private ThreadPoolExecutor getThreadPool(String engineURI, String poolName, BlockingQueue<Runnable> workQueue, int numThreads) {
        if (log.isInfoEnabled()) {
            log.info("Starting pool " + poolName + " with " + numThreads + " threads");
        }
        if (engineURI == null) {
            engineURI = "default";
        }
        ThreadGroup threadGroup = new ThreadGroup("com.espertech.esper." + engineURI + "-" + poolName);
        // NOTE(review): the trailing 5 is presumably the thread priority (Thread.NORM_PRIORITY)
        // - confirm against EngineThreadFactory.
        EngineThreadFactory threadFactory = new EngineThreadFactory(engineURI, poolName, threadGroup, 5);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(numThreads, numThreads, 1L, TimeUnit.SECONDS, workQueue, threadFactory);
        // Threads are started up front because work is put on the queue directly rather than
        // passed through execute(), which would otherwise be what starts them.
        pool.prestartAllCoreThreads();
        return pool;
    }

    /**
     * Creates (but does not start) a thread for an event source, placed in its own new
     * thread group named {@code com.espertech.esper.<engineURI>-source-<sourceName>}.
     *
     * @param str      engine URI; {@code null} is replaced by {@code "default"}
     * @param str2     source name used in the thread-group name
     * @param runnable the code the thread will run
     * @return the new, unstarted thread
     */
    @Override // com.espertech.esper.core.thread.ThreadingService
    public Thread makeEventSourceThread(String str, String str2, Runnable runnable) {
        if (str == null) {
            str = "default";
        }
        return new Thread(new ThreadGroup("com.espertech.esper." + str + "-source-" + str2), runnable);
    }

    /**
     * Discards pending work, requests an orderly shutdown and waits up to 10 seconds for the
     * pool to terminate.
     *
     * <p>If the wait is interrupted, the interrupt status of the calling thread is restored;
     * any further waits on the same thread will therefore return immediately.
     *
     * @param pool      the pool to shut down
     * @param workQueue the pool's queue, cleared before shutdown
     * @param poolName  pool name used in log output
     */
    private void stopPool(ThreadPoolExecutor pool, BlockingQueue<Runnable> workQueue, String poolName) {
        if (log.isInfoEnabled()) {
            log.info("Shutting down pool " + poolName);
        }
        workQueue.clear();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10L, TimeUnit.SECONDS)) {
                log.warn("Pool " + poolName + " did not terminate within 10 seconds");
            }
        } catch (InterruptedException e) {
            log.error("Interruped awaiting termination", e);
            // Catching InterruptedException clears the flag; set it again rather than hide it.
            Thread.currentThread().interrupt();
        }
    }
}
