package org.wso2.carbon.stream.processor.core.internal;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.stream.processor.core.distribution.DistributionService;
import org.wso2.carbon.stream.processor.core.ha.HACoordinationRecordTableHandler;
import org.wso2.carbon.stream.processor.core.ha.HACoordinationSinkHandler;
import org.wso2.carbon.stream.processor.core.ha.HACoordinationSourceHandler;
import org.wso2.carbon.stream.processor.core.ha.HAManager;
import org.wso2.carbon.stream.processor.core.ha.RetryRecordTableConnection;
import org.wso2.carbon.stream.processor.core.ha.exception.HAModeException;
import org.wso2.carbon.stream.processor.core.internal.exception.SiddhiAppAlreadyExistException;
import org.wso2.carbon.stream.processor.core.internal.exception.SiddhiAppConfigurationException;
import org.wso2.carbon.stream.processor.core.internal.exception.SiddhiAppDeploymentException;
import org.wso2.carbon.stream.processor.core.internal.util.SiddhiAppFilesystemInvoker;
import org.wso2.carbon.stream.processor.core.internal.util.SiddhiAppProcessorConstants;
import org.wso2.carbon.stream.processor.core.util.DeploymentMode;
import org.wso2.carbon.stream.processor.core.util.RuntimeMode;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.exception.CannotRestoreSiddhiAppStateException;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.transport.BackoffRetryCounter;
import org.wso2.siddhi.query.api.annotation.Element;
import org.wso2.siddhi.query.api.util.AnnotationHelper;
import org.wso2.siddhi.query.compiler.SiddhiCompiler;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/internal/StreamProcessorService.class */
public class StreamProcessorService {
    private static final Logger log = LoggerFactory.getLogger(StreamProcessorService.class);
    private Map<String, SiddhiAppData> siddhiAppMap = new ConcurrentHashMap();
    private BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();
    private DistributionService distributionService = StreamProcessorDataHolder.getDistributionService();

    public void deploySiddhiApp(String str, String str2) throws SiddhiAppConfigurationException, SiddhiAppAlreadyExistException, ConnectionUnavailableException {
        SiddhiAppData siddhiAppData = new SiddhiAppData(str);
        if (this.siddhiAppMap.containsKey(str2)) {
            throw new SiddhiAppAlreadyExistException("There is a Siddhi App with name " + str2 + " is already exist");
        }
        if (this.distributionService.getRuntimeMode() == RuntimeMode.MANAGER && this.distributionService.getDeploymentMode() == DeploymentMode.DISTRIBUTED) {
            if (this.distributionService.isLeader()) {
                if (!this.distributionService.distribute(str).isDeployed()) {
                    throw new SiddhiAppConfigurationException("Error in deploying Siddhi App " + str2 + "in distributed mode");
                }
                siddhiAppData.setActive(true);
                this.siddhiAppMap.put(str2, siddhiAppData);
                return;
            }
            return;
        }
        SiddhiAppRuntime createSiddhiAppRuntime = StreamProcessorDataHolder.getSiddhiManager().createSiddhiAppRuntime(str);
        if (createSiddhiAppRuntime != null) {
            Set<String> keySet = createSiddhiAppRuntime.getStreamDefinitionMap().keySet();
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(keySet.size());
            for (String str3 : keySet) {
                concurrentHashMap.put(str3, createSiddhiAppRuntime.getInputHandler(str3));
            }
            HAManager hAManager = StreamProcessorDataHolder.getHAManager();
            if (hAManager != null) {
                if (hAManager.isActiveNode()) {
                    if (StreamProcessorDataHolder.isPersistenceEnabled()) {
                        log.info("Periodic Persistence of Active Node Enabled. Restoring From Last Saved Snapshot for " + str2);
                        String str4 = null;
                        try {
                            str4 = createSiddhiAppRuntime.restoreLastRevision();
                        } catch (CannotRestoreSiddhiAppStateException e) {
                            log.error("Error in restoring Siddhi app " + createSiddhiAppRuntime.getName(), e);
                        }
                        if (str4 != null) {
                            log.info("Siddhi App " + str2 + " restored to revision " + str4);
                        }
                    } else {
                        log.info("Periodic Persistence is Disabled. It is recommended to enable this feature when using 2 Node Minimum HA");
                    }
                    log.info("Setting SinksHandlers of " + str2 + " to Active");
                    Iterator it = createSiddhiAppRuntime.getSinks().iterator();
                    while (it.hasNext()) {
                        Iterator it2 = ((List) it.next()).iterator();
                        while (it2.hasNext()) {
                            ((HACoordinationSinkHandler) ((Sink) it2.next()).getHandler()).setAsActive();
                        }
                    }
                    log.info("Setting SourceHandlers of " + str2 + " to Active");
                    Iterator it3 = createSiddhiAppRuntime.getSources().iterator();
                    while (it3.hasNext()) {
                        Iterator it4 = ((List) it3.next()).iterator();
                        while (it4.hasNext()) {
                            ((HACoordinationSourceHandler) ((Source) it4.next()).getMapper().getHandler()).setAsActive();
                        }
                    }
                    log.info("Setting RecordTableHandlers of " + str2 + " to Active");
                    for (Table table : createSiddhiAppRuntime.getTables()) {
                        HACoordinationRecordTableHandler hACoordinationRecordTableHandler = (HACoordinationRecordTableHandler) table.getHandler();
                        if (hACoordinationRecordTableHandler != null) {
                            try {
                                hACoordinationRecordTableHandler.setAsActive();
                            } catch (ConnectionUnavailableException e2) {
                                this.backoffRetryCounter.reset();
                                log.error("HA Deployment: Error in connecting to table " + hACoordinationRecordTableHandler.getTableId() + " while changing from passive state to active, will retry in " + this.backoffRetryCounter.getTimeInterval(), e2);
                                ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
                                this.backoffRetryCounter.increment();
                                newSingleThreadScheduledExecutor.schedule(new RetryRecordTableConnection(this.backoffRetryCounter, table.getHandler(), newSingleThreadScheduledExecutor), this.backoffRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
                            }
                        }
                    }
                } else if (hAManager.isLiveStateSyncEnabled()) {
                    log.info("Live State Sync is Enabled for Passive Node. Restoring Active Node current state for " + str2);
                    if (!StreamProcessorDataHolder.getHAManager().persistActiveNode(str2)) {
                        int retryAppSyncPeriod = StreamProcessorDataHolder.getDeploymentConfig().getRetryAppSyncPeriod();
                        siddhiAppData.setActive(false);
                        siddhiAppData.setSiddhiAppRuntime(createSiddhiAppRuntime);
                        siddhiAppData.setInputHandlerMap(concurrentHashMap);
                        siddhiAppData.setDeploymentTime(System.currentTimeMillis());
                        this.siddhiAppMap.put(str2, siddhiAppData);
                        Timer retrySiddhiAppLiveStateSync = retrySiddhiAppLiveStateSync(retryAppSyncPeriod, str2, siddhiAppData, createSiddhiAppRuntime);
                        log.info("Snapshot for " + str2 + " not found. Make sure Active and Passive node have deployed the same Siddhi Applications");
                        log.info("Scheduled active node state sync for " + str2 + " in " + (retryAppSyncPeriod / 1000) + " seconds.");
                        hAManager.addRetrySiddhiAppSyncTimer(retrySiddhiAppLiveStateSync);
                        return;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Snapshot for " + str2 + " found from Active Node Live State Sync before deploying app");
                    }
                    try {
                        createSiddhiAppRuntime.restoreLastRevision();
                    } catch (CannotRestoreSiddhiAppStateException e3) {
                        log.error("Error in restoring Siddhi app " + createSiddhiAppRuntime.getName(), e3);
                    }
                } else {
                    if (!StreamProcessorDataHolder.isPersistenceEnabled()) {
                        throw new HAModeException("Passive Node Periodic Persistence is Disabled and Live State Sync Disabled. Please enable Periodic Persistence.");
                    }
                    log.info("Live State Sync is Disabled for Passive Node. Restoring Active nodes last persisted state for " + str2);
                    String str5 = null;
                    try {
                        str5 = createSiddhiAppRuntime.restoreLastRevision();
                    } catch (CannotRestoreSiddhiAppStateException e4) {
                        log.error("Error in restoring Siddhi app " + createSiddhiAppRuntime.getName(), e4);
                    }
                    if (str5 == null) {
                        int retryAppSyncPeriod2 = StreamProcessorDataHolder.getDeploymentConfig().getRetryAppSyncPeriod();
                        siddhiAppData.setActive(false);
                        siddhiAppData.setSiddhiAppRuntime(createSiddhiAppRuntime);
                        siddhiAppData.setInputHandlerMap(concurrentHashMap);
                        siddhiAppData.setDeploymentTime(System.currentTimeMillis());
                        this.siddhiAppMap.put(str2, siddhiAppData);
                        Timer retrySiddhiAppPersistenceStateSync = retrySiddhiAppPersistenceStateSync(retryAppSyncPeriod2, str2, siddhiAppData, createSiddhiAppRuntime);
                        log.info("Snapshot for " + str2 + " not found. Make sure Active and Passive node have deployed the same Siddhi Applications");
                        log.info("Scheduled active node persistence sync for " + str2 + " in " + (retryAppSyncPeriod2 / 1000) + " seconds.");
                        hAManager.addRetrySiddhiAppSyncTimer(retrySiddhiAppPersistenceStateSync);
                        return;
                    }
                    log.info("Siddhi App " + str2 + " restored to revision " + str5);
                }
            } else if (StreamProcessorDataHolder.isPersistenceEnabled()) {
                log.info("Periodic State persistence enabled. Restoring last persisted state of " + str2);
                String str6 = null;
                try {
                    str6 = createSiddhiAppRuntime.restoreLastRevision();
                } catch (CannotRestoreSiddhiAppStateException e5) {
                    log.error("Error in restoring Siddhi app " + createSiddhiAppRuntime.getName(), e5);
                }
                if (str6 != null) {
                    log.info("Siddhi App " + str2 + " restored to revision " + str6);
                }
            }
            createSiddhiAppRuntime.start();
            log.info("Siddhi App " + str2 + " deployed successfully");
            siddhiAppData.setActive(true);
            siddhiAppData.setSiddhiAppRuntime(createSiddhiAppRuntime);
            siddhiAppData.setInputHandlerMap(concurrentHashMap);
            siddhiAppData.setDeploymentTime(System.currentTimeMillis());
            this.siddhiAppMap.put(str2, siddhiAppData);
        }
    }

    public void undeploySiddhiApp(String str) {
        if (this.siddhiAppMap.containsKey(str)) {
            if (this.distributionService.getRuntimeMode() == RuntimeMode.MANAGER && this.distributionService.getDeploymentMode() == DeploymentMode.DISTRIBUTED) {
                this.distributionService.undeploy(str);
            } else {
                SiddhiAppData siddhiAppData = this.siddhiAppMap.get(str);
                if (siddhiAppData != null && siddhiAppData.isActive()) {
                    siddhiAppData.getSiddhiAppRuntime().shutdown();
                }
            }
            this.siddhiAppMap.remove(str);
            log.info("Siddhi App File " + str + " undeployed successfully.");
        }
    }

    public boolean delete(String str) throws SiddhiAppConfigurationException, SiddhiAppDeploymentException {
        if (!this.siddhiAppMap.containsKey(str)) {
            return false;
        }
        SiddhiAppFilesystemInvoker.delete(str);
        return true;
    }

    public String validateAndSave(String str, boolean z) throws SiddhiAppConfigurationException, SiddhiAppDeploymentException {
        String str2 = "";
        try {
            str2 = getSiddhiAppName(str);
            if ((!z && this.siddhiAppMap.containsKey(str2)) || StreamProcessorDataHolder.getSiddhiManager().createSiddhiAppRuntime(str) == null) {
                return null;
            }
            SiddhiAppFilesystemInvoker.save(str, str2);
            return str2;
        } catch (SiddhiAppDeploymentException e) {
            log.error("Exception occurred when saving Siddhi App : " + str2, e);
            throw e;
        } catch (Throwable th) {
            log.error("Exception occurred when validating Siddhi App " + str2, th);
            throw new SiddhiAppConfigurationException(th);
        }
    }

    public String getSiddhiAppName(String str) throws SiddhiAppConfigurationException {
        try {
            Element annotationElement = AnnotationHelper.getAnnotationElement(SiddhiAppProcessorConstants.ANNOTATION_NAME_NAME, (String) null, SiddhiCompiler.parse(str).getAnnotations());
            if (annotationElement == null || annotationElement.getValue().isEmpty()) {
                throw new SiddhiAppConfigurationException("Siddhi App name must be provided as @App:name('name').");
            }
            return annotationElement.getValue();
        } catch (Throwable th) {
            throw new SiddhiAppConfigurationException("Exception occurred when retrieving Siddhi App Name ", th);
        }
    }

    public boolean isExists(String str) throws SiddhiAppConfigurationException {
        return this.siddhiAppMap.containsKey(getSiddhiAppName(str));
    }

    public void addSiddhiAppFile(String str, SiddhiAppData siddhiAppData) {
        this.siddhiAppMap.put(str, siddhiAppData);
    }

    public Map<String, SiddhiAppData> getSiddhiAppMap() {
        return this.siddhiAppMap;
    }

    private Timer retrySiddhiAppLiveStateSync(final int i, final String str, final SiddhiAppData siddhiAppData, final SiddhiAppRuntime siddhiAppRuntime) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() { // from class: org.wso2.carbon.stream.processor.core.internal.StreamProcessorService.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                if (!StreamProcessorDataHolder.getHAManager().persistActiveNode(str)) {
                    StreamProcessorService.log.error("Snapshot for " + str + " not found after " + (i / 1000) + " seconds. Make sure Active and Passive node have deployed the same Siddhi Applications");
                    return;
                }
                try {
                    siddhiAppRuntime.restoreLastRevision();
                    siddhiAppRuntime.start();
                    siddhiAppData.setActive(true);
                    siddhiAppData.setSiddhiAppRuntime(siddhiAppRuntime);
                    siddhiAppData.setDeploymentTime(System.currentTimeMillis());
                    StreamProcessorService.this.siddhiAppMap.put(str, siddhiAppData);
                    StreamProcessorService.log.info("Siddhi App " + str + " deployed successfully after active node sync in " + (i / 1000) + " seconds");
                } catch (CannotRestoreSiddhiAppStateException e) {
                    StreamProcessorService.log.error("Error in restoring Siddhi app " + siddhiAppRuntime.getName(), e);
                }
            }
        }, i);
        return timer;
    }

    private Timer retrySiddhiAppPersistenceStateSync(final int i, final String str, final SiddhiAppData siddhiAppData, final SiddhiAppRuntime siddhiAppRuntime) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() { // from class: org.wso2.carbon.stream.processor.core.internal.StreamProcessorService.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                String str2 = null;
                try {
                    str2 = siddhiAppRuntime.restoreLastRevision();
                } catch (CannotRestoreSiddhiAppStateException e) {
                    StreamProcessorService.log.error("Error in restoring Siddhi app " + siddhiAppRuntime.getName(), e);
                }
                if (str2 == null) {
                    StreamProcessorService.log.error("Snapshot for " + str + " not found after " + (i / 1000) + " seconds. Make sure Active and Passive node have deployed the same Siddhi Applications");
                    return;
                }
                StreamProcessorService.log.info("Siddhi App " + str + " restored to revision " + str2);
                siddhiAppRuntime.start();
                siddhiAppData.setActive(true);
                siddhiAppData.setSiddhiAppRuntime(siddhiAppRuntime);
                siddhiAppData.setDeploymentTime(System.currentTimeMillis());
                StreamProcessorService.this.siddhiAppMap.put(str, siddhiAppData);
                StreamProcessorService.log.info("Siddhi App " + str + " deployed successfully after active node sync in " + (i / 1000) + " seconds");
            }
        }, i);
        return timer;
    }
}
