package org.bonitasoft.engine.connector.impl;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.bonitasoft.engine.commons.exceptions.SBonitaRuntimeException;
import org.bonitasoft.engine.connector.ConnectorExecutor;
import org.bonitasoft.engine.connector.SConnector;
import org.bonitasoft.engine.connector.exception.SConnectorException;
import org.bonitasoft.engine.log.technical.TechnicalLogSeverity;
import org.bonitasoft.engine.log.technical.TechnicalLogger;
import org.bonitasoft.engine.log.technical.TechnicalLoggerService;
import org.bonitasoft.engine.monitoring.ExecutorServiceMetricsProvider;
import org.bonitasoft.engine.session.SessionService;
import org.bonitasoft.engine.sessionaccessor.STenantIdNotSetException;
import org.bonitasoft.engine.sessionaccessor.SessionAccessor;
import org.bonitasoft.engine.sessionaccessor.SessionIdNotSetException;
import org.bonitasoft.engine.tracking.TimeTracker;
import org.bonitasoft.engine.tracking.TimeTrackerRecords;

/* loaded from: input_file:org/bonitasoft/engine/connector/impl/ConnectorExecutorImpl.class */
public class ConnectorExecutorImpl implements ConnectorExecutor {
    /** Meter name of the gauge reporting how many connectors wait in the executor queue. */
    public static final String NUMBER_OF_CONNECTORS_PENDING = "bonita.bpmengine.connector.pending";
    /** Meter name of the gauge reporting how many connectors are currently executing. */
    public static final String NUMBER_OF_CONNECTORS_RUNNING = "bonita.bpmengine.connector.running";
    /** Meter name of the counter of connectors executed successfully. */
    public static final String NUMBER_OF_CONNECTORS_EXECUTED = "bonita.bpmengine.connector.executed";

    /** Thread pool running the connectors; created by {@code start()} and reset to {@code null} by {@code stop()}. */
    private ExecutorService executorService;
    private final SessionAccessor sessionAccessor;
    private final SessionService sessionService;
    /** Capacity of the bounded queue holding connectors that wait for a free thread. */
    private final int queueCapacity;
    private final int corePoolSize;
    private final int maximumPoolSize;
    private final long keepAliveTimeSeconds;
    private final TechnicalLoggerService loggerService;
    private final TimeTracker timeTracker;
    // The three fields below are only ever assigned by the constructor, hence final.
    private final MeterRegistry meterRegistry;
    /** Tenant id used to tag the meters and to bind/unbind the executor metrics. */
    private final long tenantId;
    private final ExecutorServiceMetricsProvider executorServiceMetricsProvider;
    /** Number of connectors currently inside a wrapped call; backs the "running" gauge. */
    private final AtomicLong runningWorks = new AtomicLong();
    // Meters are registered by start() and removed from the registry by stop().
    private Counter executedWorkCounter;
    private Gauge numberOfConnectorsPending;
    private Gauge numberOfConnectorsRunning;

    /* loaded from: input_file:org/bonitasoft/engine/connector/impl/ConnectorExecutorImpl$ExecuteConnectorCallable.class */
    /**
     * Callable that runs one connector (validate, connect, execute) on an executor thread and
     * that can be interrupted from another thread through {@link #interrupt()}.
     */
    public final class ExecuteConnectorCallable implements InterruptibleCallable<Map<String, Object>> {
        private final Map<String, Object> inputParameters;
        private final SConnector sConnector;
        private final long tenantId;
        private final ClassLoader loader;
        private final TechnicalLogger technicalLogger;
        // The three fields below are written by the executing thread and read by whichever thread
        // calls interrupt()/isCompleted(); volatile guarantees those readers see up-to-date values.
        private volatile Thread thread;
        private volatile boolean interrupted;
        private volatile boolean completed;

        private ExecuteConnectorCallable(Map<String, Object> map, SConnector sConnector, long j, ClassLoader classLoader, TechnicalLogger technicalLogger) {
            this.inputParameters = map;
            this.sConnector = sConnector;
            this.tenantId = j;
            this.loader = classLoader;
            this.technicalLogger = technicalLogger;
        }

        /**
         * Executes the connector on the calling thread.
         *
         * @return the result map produced by {@code SConnector.execute()}
         * @throws InterruptedException if {@link #interrupt()} was called before execution started
         * @throws Exception any failure raised by the connector or by the session cleanup
         */
        @Override
        public Map<String, Object> call() throws Exception {
            this.technicalLogger.debug("Start execution of connector {}", this.sConnector.getClass());
            if (this.interrupted) {
                throw new InterruptedException();
            }
            final long startTime = System.currentTimeMillis();
            ConnectorExecutorImpl.this.sessionAccessor.setTenantId(this.tenantId);
            // The context class loader is deliberately left in place after the call: the failure
            // path of the enclosing executor still disconnects the connector on this same thread.
            Thread.currentThread().setContextClassLoader(this.loader);
            this.sConnector.setInputParameters(this.inputParameters);
            try {
                this.thread = Thread.currentThread();
                this.sConnector.validate();
                this.sConnector.connect();
                return this.sConnector.execute();
            } finally {
                // Single cleanup path: runs exactly once whether the connector succeeded or failed.
                this.thread = null;
                this.completed = true;
                this.technicalLogger.debug("Finish execution of connector {}", this.sConnector.getClass());
                try {
                    final long sessionId = ConnectorExecutorImpl.this.sessionAccessor.getSessionId();
                    ConnectorExecutorImpl.this.sessionAccessor.deleteSessionId();
                    ConnectorExecutorImpl.this.sessionService.deleteSession(sessionId);
                } catch (SessionIdNotSetException ignored) {
                    // No session id is set on this thread, so there is nothing to delete.
                }
                ConnectorExecutorImpl.this.track(TimeTrackerRecords.EXECUTE_CONNECTOR_CALLABLE, startTime, this.sConnector, this.inputParameters);
            }
        }

        /**
         * Flags this callable as interrupted and, if the connector is currently running,
         * logs what its thread is doing and interrupts that thread.
         */
        @Override
        public void interrupt() {
            this.interrupted = true;
            // Work on a snapshot: call() resets the field to null as soon as the connector finishes,
            // so re-reading it after the null check could fail with a NullPointerException.
            final Thread runner = this.thread;
            if (runner == null) {
                return;
            }
            final StackTraceElement[] stackTrace = runner.getStackTrace();
            final String fullStack = Arrays.stream(stackTrace)
                    .map(StackTraceElement::toString)
                    .collect(Collectors.joining("\n"));
            // getStackTrace() may return a zero-length array (e.g. the thread has just terminated).
            final String currentFrame = stackTrace.length > 0 ? stackTrace[0].toString() : "<no stack trace available>";
            this.technicalLogger.warn("Interrupt thread of connector {}, thread is {}, {}, connectors was doing :\n {}, activate debug logs to have the full execution stacktrace.", this.sConnector.getClass(), runner.getName(), Long.valueOf(runner.getId()), currentFrame);
            this.technicalLogger.debug("Interrupt thread of connector {}, thread is {}, {}, stack is:\n {}", this.sConnector.getClass(), runner.getName(), Long.valueOf(runner.getId()), fullStack);
            runner.interrupt();
        }

        /**
         * @return {@code true} once the connector execution has ended, successfully or not
         */
        @Override
        public boolean isCompleted() {
            return this.completed;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/bonitasoft/engine/connector/impl/ConnectorExecutorImpl$QueueRejectedExecutionHandler.class */
    /**
     * Rejection handler that applies back-pressure: instead of dropping a task the pool refused,
     * it blocks the submitting thread until the task fits into the pool's queue.
     */
    public final class QueueRejectedExecutionHandler implements RejectedExecutionHandler {
        private final TechnicalLoggerService logger;

        public QueueRejectedExecutionHandler(TechnicalLoggerService technicalLoggerService) {
            this.logger = technicalLoggerService;
        }

        /**
         * Re-queues the rejected task, waiting for space in the queue if necessary.
         *
         * @param runnable the task the executor refused
         * @param threadPoolExecutor the executor that refused it
         * @throws RejectedExecutionException if the executor is shut down, or if the submitting
         *         thread is interrupted while waiting for space in the queue
         */
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            // A pool that is shut down also rejects every task. Queuing it then would park the task
            // in a queue that may never be drained (or block forever if the queue is full).
            if (threadPoolExecutor.isShutdown()) {
                throw new RejectedExecutionException("Unable to queue " + runnable + ": the connector executor is shut down.");
            }
            if (this.logger.isLoggable(getClass(), TechnicalLogSeverity.WARNING)) {
                this.logger.log(getClass(), TechnicalLogSeverity.WARNING, "The work was rejected. Requeue work : " + runnable.toString());
            }
            try {
                threadPoolExecutor.getQueue().put(runnable);
            } catch (InterruptedException e) {
                // Restore the interrupt status so the submitting thread can still observe it.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Queuing " + runnable + " got interrupted.", e);
            }
        }
    }

    /**
     * Creates a connector executor. No thread pool exists until {@code start()} is called.
     *
     * @param i capacity of the queue of pending connectors
     * @param i2 core size of the thread pool
     * @param technicalLoggerService logger service used by the executor and its helpers
     * @param i3 maximum size of the thread pool
     * @param j keep-alive time of idle pool threads, in seconds
     * @param sessionAccessor accessor used to read and set the tenant/session of the current thread
     * @param sessionService service used to delete the session left by a connector execution
     * @param timeTracker tracker receiving connector execution durations
     * @param meterRegistry registry in which the connector meters are registered
     * @param j2 id of the tenant, used to tag the meters
     * @param executorServiceMetricsProvider provider that binds the thread pool to the registry
     */
    public ConnectorExecutorImpl(int i, int i2, TechnicalLoggerService technicalLoggerService, int i3, long j, SessionAccessor sessionAccessor, SessionService sessionService, TimeTracker timeTracker, MeterRegistry meterRegistry, long j2, ExecutorServiceMetricsProvider executorServiceMetricsProvider) {
        // Thread pool sizing
        this.corePoolSize = i2;
        this.maximumPoolSize = i3;
        this.keepAliveTimeSeconds = j;
        this.queueCapacity = i;

        // Session handling
        this.sessionAccessor = sessionAccessor;
        this.sessionService = sessionService;

        // Logging, time tracking and metrics
        this.loggerService = technicalLoggerService;
        this.timeTracker = timeTracker;
        this.meterRegistry = meterRegistry;
        this.executorServiceMetricsProvider = executorServiceMetricsProvider;
        this.tenantId = j2;
    }

    /**
     * Schedules the given connector for asynchronous execution on the connector thread pool.
     *
     * @param sConnector the connector to run
     * @param map input parameters handed to the connector
     * @param classLoader class loader installed as context class loader on the executing thread
     * @return a future holding the connector's result
     * @throws SConnectorException if the executor is not started or no tenant id is set
     */
    @Override
    public CompletableFuture<Map<String, Object>> execute(SConnector sConnector, Map<String, Object> map, ClassLoader classLoader) throws SConnectorException {
        if (this.executorService == null) {
            throw new SConnectorException("Unable to execute a connector, if the node is not started. Start it first");
        }
        try {
            final long currentTenantId = this.sessionAccessor.getTenantId();
            final TechnicalLogger connectorLogger = this.loggerService.asLogger(ConnectorExecutorImpl.class);
            final ExecuteConnectorCallable connectorCall =
                    new ExecuteConnectorCallable(map, sConnector, currentTenantId, classLoader, connectorLogger);
            return execute(sConnector, connectorCall);
        } catch (STenantIdNotSetException e) {
            throw new SConnectorException("Tenant id not set.", e);
        }
    }

    /**
     * Submits the callable to the connector thread pool, wrapped so that the running/executed
     * statistics are maintained. If the call fails with any {@link Throwable}, the connector is
     * disconnected (errors while disconnecting are only logged) and the failure is rethrown
     * wrapped in an {@link SBonitaRuntimeException}, which completes the future exceptionally.
     *
     * @param sConnector the connector to disconnect if the call fails
     * @param interruptibleCallable the work to run
     * @return a future holding the result of the callable
     */
    protected CompletableFuture<Map<String, Object>> execute(SConnector sConnector, InterruptibleCallable<Map<String, Object>> interruptibleCallable) {
        final Callable<Map<String, Object>> instrumentedCall = wrapForStats(interruptibleCallable);
        return CompletableFuture.supplyAsync(() -> {
            try {
                return instrumentedCall.call();
            } catch (Throwable failure) {
                disconnectSilently(sConnector);
                throw new SBonitaRuntimeException(failure);
            }
        }, this.executorService);
    }

    /**
     * Decorates a callable so that the running-connectors count is incremented for the duration
     * of the call and the executed counter is incremented when the call returns normally.
     *
     * @param callable the work to decorate
     * @return a callable that delegates to {@code callable} and maintains the statistics
     */
    private Callable<Map<String, Object>> wrapForStats(Callable<Map<String, Object>> callable) {
        return () -> {
            this.runningWorks.incrementAndGet();
            try {
                final Map<String, Object> result = callable.call();
                // Only reached when the delegate did not throw.
                this.executedWorkCounter.increment();
                return result;
            } finally {
                this.runningWorks.decrementAndGet();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports the time elapsed since {@code j} to the time tracker, if the given record type is
     * currently trackable.
     *
     * @param timeTrackerRecords the kind of record to report
     * @param j start time of the measured operation, in milliseconds since the epoch
     * @param sConnector the connector that was executed (included in the record description)
     * @param map the connector input parameters (included in the record description)
     */
    public void track(TimeTrackerRecords timeTrackerRecords, long j, SConnector sConnector, Map<String, Object> map) {
        if (!this.timeTracker.isTrackable(timeTrackerRecords)) {
            return;
        }
        final String description = "Connector: " + sConnector + " - inputParameters: " + map;
        final long elapsedMillis = System.currentTimeMillis() - j;
        this.timeTracker.track(timeTrackerRecords, description, elapsedMillis);
    }

    /**
     * Disconnects the connector without ever propagating an exception: a failure is logged at
     * WARNING level (when that level is enabled) and otherwise ignored.
     *
     * @param sConnector the connector to disconnect
     */
    void disconnectSilently(SConnector sConnector) {
        try {
            sConnector.disconnect();
        } catch (Exception e) {
            final boolean warningEnabled = this.loggerService.isLoggable(getClass(), TechnicalLogSeverity.WARNING);
            if (!warningEnabled) {
                return;
            }
            final String message = "An error occurred while disconnecting the connector: " + sConnector;
            this.loggerService.log(getClass(), TechnicalLogSeverity.WARNING, message, e);
        }
    }

    /**
     * Disconnects the connector, reporting any failure as an {@link SConnectorException}.
     *
     * @param sConnector the connector to disconnect
     * @throws SConnectorException the exception thrown by the connector when it already is an
     *         {@code SConnectorException}, otherwise a new one wrapping the original exception
     */
    @Override
    public void disconnect(SConnector sConnector) throws SConnectorException {
        try {
            sConnector.disconnect();
        } catch (Exception e) {
            if (e instanceof SConnectorException) {
                // Already the expected type: propagate untouched.
                throw (SConnectorException) e;
            }
            throw new SConnectorException(e);
        }
    }

    /**
     * Creates the connector thread pool and registers its meters. Does nothing when the executor
     * is already started.
     */
    @Override
    public void start() {
        if (this.executorService != null) {
            return;
        }

        // Bounded queue + re-queuing rejection handler
        final ArrayBlockingQueue<Runnable> pendingQueue = new ArrayBlockingQueue<>(this.queueCapacity);
        final QueueRejectedExecutionHandler rejectionHandler = new QueueRejectedExecutionHandler(this.loggerService);
        final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                this.corePoolSize,
                this.maximumPoolSize,
                this.keepAliveTimeSeconds,
                TimeUnit.SECONDS,
                pendingQueue,
                new ConnectorExecutorThreadFactory("ConnectorExecutor"),
                rejectionHandler);
        this.executorService = this.executorServiceMetricsProvider.bind(
                this.meterRegistry, threadPool, "bonita-connector-executor", this.tenantId);

        // Connector meters, all tagged with the tenant id
        final Tags tenantTags = Tags.of("tenant", String.valueOf(this.tenantId));
        this.numberOfConnectorsPending = Gauge.builder(NUMBER_OF_CONNECTORS_PENDING, pendingQueue, ArrayBlockingQueue::size)
                .tags(tenantTags)
                .baseUnit("connectors")
                .description("Connectors pending in the execution queue")
                .register(this.meterRegistry);
        this.numberOfConnectorsRunning = Gauge.builder(NUMBER_OF_CONNECTORS_RUNNING, this.runningWorks, AtomicLong::get)
                .tags(tenantTags)
                .baseUnit("connectors")
                .description("Connectors currently executing")
                .register(this.meterRegistry);
        this.executedWorkCounter = Counter.builder(NUMBER_OF_CONNECTORS_EXECUTED)
                .tags(tenantTags)
                .baseUnit("connectors")
                .description("Total connectors executed since last server start")
                .register(this.meterRegistry);
    }

    /**
     * Package-private accessor to the underlying executor.
     *
     * @return the executor created by {@code start()}, or {@code null} when the executor has not
     *         been started or has been stopped
     */
    ExecutorService getExecutorService() {
        return this.executorService;
    }

    /**
     * Removes the connector meters, shuts the thread pool down and waits up to 5 seconds for the
     * submitted connectors to finish. Does nothing when the executor is not started.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * thread's interrupt status is restored so that callers can react to it.
     */
    @Override
    public void stop() {
        if (this.executorService == null) {
            return;
        }
        this.meterRegistry.remove(this.executedWorkCounter);
        this.meterRegistry.remove(this.numberOfConnectorsRunning);
        this.meterRegistry.remove(this.numberOfConnectorsPending);
        this.executorServiceMetricsProvider.unbind(this.meterRegistry, "bonita-connector-executor", this.tenantId);
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                this.loggerService.log(getClass(), TechnicalLogSeverity.WARNING, "Timeout (5s) trying to stop the connector executor thread pool.");
            }
        } catch (InterruptedException e) {
            this.loggerService.log(getClass(), TechnicalLogSeverity.WARNING, "Error while stopping the connector executor thread pool.", e);
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        this.executorService = null;
    }

    /**
     * Pauses the connector executor. Pausing is implemented as a full {@link #stop()}.
     */
    @Override // org.bonitasoft.engine.commons.LifecycleService
    public void pause() {
        stop();
    }

    /**
     * Resumes the connector executor. Resuming is implemented as a {@link #start()}.
     */
    @Override // org.bonitasoft.engine.commons.LifecycleService
    public void resume() {
        start();
    }
}
