/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.stream.input.source;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.input.source.AttributeMapping;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.stream.input.source.SourceHandler;
import io.siddhi.core.stream.input.source.SourceMapper;
import io.siddhi.core.stream.input.source.SourceSyncCallback;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.StringUtil;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.Snapshotable;
import io.siddhi.core.util.transport.BackoffRetryCounter;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.query.api.definition.StreamDefinition;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

public abstract class Source
implements Snapshotable {
    private static final Logger LOG = Logger.getLogger(Source.class);
    private String type;
    private SourceMapper mapper;
    private StreamDefinition streamDefinition;
    private String elementId;
    private SiddhiAppContext siddhiAppContext;
    private AtomicBoolean isTryingToConnect = new AtomicBoolean(false);
    private BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();
    private AtomicBoolean isConnected = new AtomicBoolean(false);
    private ScheduledExecutorService scheduledExecutorService;
    private ConnectionCallback connectionCallback = new ConnectionCallback();

    public final void init(String sourceType, OptionHolder transportOptionHolder, SourceMapper sourceMapper, String[] transportPropertyNames, ConfigReader configReader, String mapType, OptionHolder mapOptionHolder, List<AttributeMapping> attributeMappings, List<AttributeMapping> transportMappings, ConfigReader mapperConfigReader, SourceHandler sourceHandler, StreamDefinition streamDefinition, SiddhiAppContext siddhiAppContext) {
        this.type = sourceType;
        sourceMapper.init(streamDefinition, mapType, mapOptionHolder, attributeMappings, sourceType, this instanceof SourceSyncCallback ? (SourceSyncCallback)((Object)this) : null, transportMappings, sourceHandler, mapperConfigReader, siddhiAppContext);
        this.mapper = sourceMapper;
        this.streamDefinition = streamDefinition;
        this.elementId = siddhiAppContext.getElementIdGenerator().createNewId();
        this.siddhiAppContext = siddhiAppContext;
        this.init(sourceMapper, transportOptionHolder, transportPropertyNames, configReader, siddhiAppContext);
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
    }

    public abstract void init(SourceEventListener var1, OptionHolder var2, String[] var3, ConfigReader var4, SiddhiAppContext var5);

    public abstract Class[] getOutputEventClasses();

    public abstract void connect(ConnectionCallback var1) throws ConnectionUnavailableException;

    public abstract void disconnect();

    public abstract void destroy();

    public abstract void pause();

    public abstract void resume();

    public void connectWithRetry() {
        if (!this.isConnected.get()) {
            this.isTryingToConnect.set(true);
            try {
                this.connect(this.connectionCallback);
                this.isConnected.set(true);
                this.isTryingToConnect.set(false);
                this.backoffRetryCounter.reset();
            }
            catch (ConnectionUnavailableException e) {
                this.disconnect();
                this.isConnected.set(false);
                this.retryWithBackoff(e);
            }
            catch (RuntimeException e) {
                LOG.error((Object)(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext)) + " Error while connecting at Source '" + StringUtil.removeCRLFCharacters(this.type) + "' at '" + StringUtil.removeCRLFCharacters(this.streamDefinition.getId()) + "'."), (Throwable)e);
                throw e;
            }
        }
    }

    private void retryWithBackoff(ConnectionUnavailableException e) {
        LOG.error((Object)(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + " Error while connecting at Source '" + StringUtil.removeCRLFCharacters(this.type) + "' at '" + StringUtil.removeCRLFCharacters(this.streamDefinition.getId()) + "'. Will retry in '" + StringUtil.removeCRLFCharacters(this.backoffRetryCounter.getTimeInterval()) + "'."), (Throwable)e);
        this.scheduledExecutorService.schedule(new Runnable(){

            @Override
            public void run() {
                Source.this.connectWithRetry();
            }
        }, this.backoffRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
        this.backoffRetryCounter.increment();
    }

    public final SourceMapper getMapper() {
        return this.mapper;
    }

    public void shutdown() {
        try {
            this.disconnect();
            this.destroy();
        }
        finally {
            this.isConnected.set(false);
            this.isTryingToConnect.set(false);
        }
    }

    @Override
    public final String getElementId() {
        return this.elementId;
    }

    @Override
    public void clean() {
    }

    public String getType() {
        return this.type;
    }

    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    public class ConnectionCallback {
        public void onError(ConnectionUnavailableException e) {
            Source.this.disconnect();
            Source.this.isConnected.set(false);
            Source.this.retryWithBackoff(e);
        }
    }
}

