package io.siddhi.core.stream.output.sink;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.distributed.DistributedTransport;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.SiddhiConstants;
import io.siddhi.core.util.StringUtil;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.core.util.statistics.LatencyTracker;
import io.siddhi.core.util.statistics.ThroughputTracker;
import io.siddhi.core.util.transport.BackoffRetryCounter;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.StreamDefinition;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-5.0.0-m12.jar:io/siddhi/core/stream/output/sink/Sink.class
 */
/* loaded from: input_file:io/siddhi/core/stream/output/sink/Sink.class */
/**
 * Base class for output sinks.
 *
 * <p>A sink receives already-mapped payloads through {@link #publish(Object)} and hands them to the
 * transport-specific {@link #publish(Object, DynamicOptions, State)}. It tracks the connection state,
 * reconnects with a backoff when the transport reports {@link ConnectionUnavailableException}, and
 * applies the configured {@link OnErrorAction} to payloads that arrive while a reconnect is pending.
 *
 * @param <S> type of the state object handed to the transport on every publish
 */
public abstract class Sink<S extends State> implements SinkListener {
    private static final Logger LOG = Logger.getLogger(Sink.class);
    private StreamDefinition streamDefinition;
    private String type;
    private SinkMapper mapper;
    private SinkHandler handler;
    private SiddhiAppContext siddhiAppContext;
    // Defaults to LOG (the same default init(...) applies) so that onError(...) cannot hit a
    // null switch operand when the sink was set up through initOnlyTransport(...), which never
    // reads the on-error option.
    private OnErrorAction onErrorAction = OnErrorAction.LOG;
    private ThreadLocal<DynamicOptions> trpDynamicOptions;
    private ScheduledExecutorService scheduledExecutorService;
    private ThroughputTracker throughputTracker;
    private LatencyTracker mapperLatencyTracker;
    private StateHolder<S> stateHolder;
    private ServiceDeploymentInfo serviceDeploymentInfo;
    /** True from the moment a connection attempt starts until one succeeds or {@link #shutdown()} runs. */
    protected AtomicBoolean isTryingToConnect = new AtomicBoolean(false);
    private DistributedTransport.ConnectionCallback connectionCallback = null;
    /** Backoff between scheduled reconnection attempts; reset after a successful connect. */
    private final BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();
    // NOTE(review): this counter is only ever incremented in this class, never reset, so the
    // WAIT delay does not shrink again after the connection recovers. Confirm this is intended.
    private final BackoffRetryCounter backoffPublishRetryCounter = new BackoffRetryCounter();
    private final AtomicBoolean isConnected = new AtomicBoolean(false);

    /**
     * What {@link #onError(Object, Exception)} does with a payload that cannot be published.
     */
    public enum OnErrorAction {
        /** Log the payload as dropped. */
        LOG,
        /** Sleep for the current publish backoff interval, then try to publish again. */
        WAIT,
        /** Throw a {@link SiddhiAppRuntimeException} that wraps the original error. */
        STREAM
    }

    /**
     * Fully initialises the sink: transport, statistics trackers, state holder, mapper, handler and
     * service deployment information.
     *
     * @param streamDefinition definition of the stream this sink publishes
     * @param str              sink type name, used in metrics names and log messages
     * @param optionHolder     transport options; also supplies the on-error action (default "LOG")
     * @param configReader     configuration reader handed to the transport-specific init
     * @param sinkMapper       mapper to initialise and attach, or {@code null} for none
     * @param str2             mapper type name, used when initialising the mapper and in metrics names
     * @param optionHolder2    options handed to the mapper
     * @param sinkHandler      handler to initialise and attach, or {@code null} for none
     * @param list             payload annotation elements handed to the mapper
     * @param configReader2    configuration reader handed to the mapper
     * @param map              deployment properties; must be empty when the sink exposes no service
     * @param siddhiAppContext context of the owning Siddhi app
     * @throws SiddhiAppCreationException if deployment properties are given but
     *                                    {@link #exposeServiceDeploymentInfo()} returns {@code null}
     */
    public final void init(StreamDefinition streamDefinition, String str, OptionHolder optionHolder, ConfigReader configReader, SinkMapper sinkMapper, String str2, OptionHolder optionHolder2, SinkHandler sinkHandler, List<Element> list, ConfigReader configReader2, Map<String, String> map, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = streamDefinition;
        this.type = str;
        this.siddhiAppContext = siddhiAppContext;
        // Locale.ROOT keeps the enum lookup independent of the JVM default locale; with e.g. a
        // Turkish default locale a plain toUpperCase() turns "wait" into "WAİT" and valueOf fails.
        this.onErrorAction = OnErrorAction.valueOf(optionHolder.getOrCreateOption(SiddhiConstants.ANNOTATION_ELEMENT_ON_ERROR, "LOG").getValue().toUpperCase(Locale.ROOT));
        if (siddhiAppContext.getStatisticsManager() != null) {
            this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext, streamDefinition.getId(), SiddhiConstants.METRIC_INFIX_SINKS, str);
            this.mapperLatencyTracker = QueryParserHelper.createLatencyTracker(siddhiAppContext, streamDefinition.getId(), SiddhiConstants.METRIC_INFIX_SINK_MAPPERS, str + SiddhiConstants.METRIC_DELIMITER + str2);
        }
        // The transport-specific init supplies the factory from which the state holder is built.
        this.stateHolder = siddhiAppContext.generateStateHolder(streamDefinition.getId() + "-" + getClass().getName(), init(streamDefinition, optionHolder, configReader, siddhiAppContext));
        if (sinkMapper != null) {
            sinkMapper.init(streamDefinition, str2, optionHolder2, list, this, configReader2, this.mapperLatencyTracker, siddhiAppContext);
            this.mapper = sinkMapper;
        }
        if (sinkHandler != null) {
            sinkHandler.initSinkHandler(siddhiAppContext.getName(), streamDefinition, new SinkHandlerCallback(sinkMapper), siddhiAppContext);
            this.handler = sinkHandler;
        }
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        bindServiceDeploymentInfo(map, str);
    }

    /**
     * Returns the input event classes supported by this sink.
     * NOTE(review): presumably the payload types the transport accepts; not used inside this class.
     */
    public abstract Class[] getSupportedInputEventClasses();

    /**
     * Initialises only the transport part of the sink (no mapper, handler, statistics trackers or
     * state holder) and registers a callback that is notified about connection changes.
     *
     * @param streamDefinition   definition of the stream this sink publishes
     * @param optionHolder       transport options
     * @param configReader       configuration reader handed to the transport-specific init
     * @param str                sink type name, used in log messages
     * @param connectionCallback notified when the connection is established or lost
     * @param map                deployment properties; must be empty when the sink exposes no service
     * @param siddhiAppContext   context of the owning Siddhi app
     * @throws SiddhiAppCreationException if deployment properties are given but
     *                                    {@link #exposeServiceDeploymentInfo()} returns {@code null}
     */
    public final void initOnlyTransport(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, String str, DistributedTransport.ConnectionCallback connectionCallback, Map<String, String> map, SiddhiAppContext siddhiAppContext) {
        this.type = str;
        this.streamDefinition = streamDefinition;
        this.connectionCallback = connectionCallback;
        this.siddhiAppContext = siddhiAppContext;
        // The returned state factory is intentionally not used here: no state holder is created.
        init(streamDefinition, optionHolder, configReader, siddhiAppContext);
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        bindServiceDeploymentInfo(map, str);
    }

    /**
     * Stores the service deployment info exposed by the subclass and adds the given deployment
     * properties to it; rejects non-empty properties when no service is exposed.
     */
    private void bindServiceDeploymentInfo(Map<String, String> map, String str) {
        this.serviceDeploymentInfo = exposeServiceDeploymentInfo();
        if (this.serviceDeploymentInfo != null) {
            this.serviceDeploymentInfo.addDeploymentProperties(map);
        } else if (!map.isEmpty()) {
            throw new SiddhiAppCreationException("Deployment properties '" + map + "' are defined for sink '" + str + "' which does not expose a service");
        }
    }

    /**
     * Returns the deployment information of the service exposed by this sink, or {@code null} when
     * the sink exposes none (in which case no deployment properties may be configured).
     */
    protected abstract ServiceDeploymentInfo exposeServiceDeploymentInfo();

    /**
     * Returns the names of the options this sink supports as dynamic options.
     * NOTE(review): semantics inferred from the name; not used inside this class.
     */
    public abstract String[] getSupportedDynamicOptions();

    /**
     * Transport-specific initialisation, invoked from both public init variants.
     *
     * @return factory used by the full init to create this sink's state holder
     */
    protected abstract StateFactory<S> init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext);

    /**
     * Publishes a payload through the transport.
     *
     * <p>When the sink is not connected, the payload is either passed to
     * {@link #onError(Object, Exception)} (a reconnect is already in progress) or a connection
     * attempt is made on the calling thread and the publish is retried. When the transport reports
     * {@link ConnectionUnavailableException}, the sink is marked disconnected, the connection
     * callback (if any) is notified, and connect plus publish are retried.
     *
     * @param obj payload to publish
     */
    @Override // io.siddhi.core.stream.output.sink.SinkListener
    public final void publish(Object obj) {
        if (this.mapperLatencyTracker != null && this.siddhiAppContext.isStatsEnabled()) {
            this.mapperLatencyTracker.markOut();
        }
        if (!isConnected()) {
            if (this.isTryingToConnect.get()) {
                onError(obj, new SiddhiAppRuntimeException("Connection unavailable at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "'. Connection retrying is in progress from a different thread."));
                return;
            }
            connectWithRetry();
            publish(obj);
            return;
        }
        S state = this.stateHolder.getState();
        try {
            publish(obj, this.trpDynamicOptions.get(), state);
            if (this.throughputTracker != null && this.siddhiAppContext.isStatsEnabled()) {
                this.throughputTracker.eventIn();
            }
        } catch (ConnectionUnavailableException e) {
            setConnected(false);
            if (this.connectionCallback != null) {
                this.connectionCallback.connectionFailed();
            }
            LOG.error(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + " Connection unavailable at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "', will retry connection immediately.", e);
            connectWithRetry();
            publish(obj);
        } finally {
            // Single release point: the state is handed back exactly once on every path.
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Sends a payload over the transport.
     *
     * @param obj            payload to send
     * @param dynamicOptions dynamic options of the calling thread
     * @param s              current state of this sink
     * @throws ConnectionUnavailableException if the connection is lost; triggers a reconnect
     */
    public abstract void publish(Object obj, DynamicOptions dynamicOptions, S s) throws ConnectionUnavailableException;

    /**
     * Establishes the transport connection.
     *
     * @throws ConnectionUnavailableException if the endpoint cannot be reached; the caller
     *                                        schedules a retry
     */
    public abstract void connect() throws ConnectionUnavailableException;

    /** Closes the transport connection; invoked first by {@link #shutdown()}. */
    public abstract void disconnect();

    /** Releases the sink's resources; invoked by {@link #shutdown()} after {@link #disconnect()}. */
    public abstract void destroy();

    /** Returns the sink type name given at initialisation. */
    public final String getType() {
        return this.type;
    }

    /** Returns the attached mapper, or {@code null} if none was attached. */
    public final SinkMapper getMapper() {
        return this.mapper;
    }

    /** Returns the attached handler, or {@code null} if none was attached. */
    public final SinkHandler getHandler() {
        return this.handler;
    }

    /**
     * Connects the sink unless it is already connected.
     *
     * <p>On {@link ConnectionUnavailableException} the failure is logged and another attempt is
     * scheduled on the app's scheduled executor after the current backoff interval, which is then
     * increased. Any other {@link RuntimeException} is logged and rethrown.
     */
    public void connectWithRetry() {
        if (this.isConnected.get()) {
            return;
        }
        this.isTryingToConnect.set(true);
        try {
            connect();
            setConnected(true);
            this.isTryingToConnect.set(false);
            if (this.connectionCallback != null) {
                this.connectionCallback.connectionEstablished();
            }
            this.backoffRetryCounter.reset();
        } catch (ConnectionUnavailableException e) {
            LOG.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + " Error while connecting at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "', will retry in '" + this.backoffRetryCounter.getTimeInterval() + "'."), e);
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    Sink.this.connectWithRetry();
                }
            }, this.backoffRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
            this.backoffRetryCounter.increment();
        } catch (RuntimeException e2) {
            // NOTE(review): isTryingToConnect stays true on this path although no retry is
            // scheduled, so later publishes are routed to onError(...). Confirm this is intended.
            LOG.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e2, this.siddhiAppContext)) + " Error while connecting at Sink '" + StringUtil.removeCRLFCharacters(this.type) + "' at '" + StringUtil.removeCRLFCharacters(this.streamDefinition.getId()) + "'.", e2);
            throw e2;
        }
    }

    /**
     * Disconnects and destroys the sink, clears the connection flags and notifies the connection
     * callback (if any) that the connection is gone.
     */
    public void shutdown() {
        disconnect();
        destroy();
        setConnected(false);
        this.isTryingToConnect.set(false);
        if (this.connectionCallback != null) {
            this.connectionCallback.connectionFailed();
        }
    }

    /**
     * Sets the thread-local holder from which {@link #publish(Object)} reads the dynamic options.
     * The decompiler note on this method states it was package-private in the original source.
     */
    public void setTrpDynamicOptions(ThreadLocal<DynamicOptions> threadLocal) {
        this.trpDynamicOptions = threadLocal;
    }

    /** Returns the definition of the stream this sink publishes. */
    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    /** Returns whether the sink is currently marked as connected. */
    public boolean isConnected() {
        return this.isConnected.get();
    }

    /** Marks the sink as connected or disconnected. */
    public void setConnected(boolean z) {
        this.isConnected.set(z);
    }

    /**
     * Applies the configured {@link OnErrorAction} to a payload that could not be published.
     * The decompiler note on this method states it was package-private in the original source.
     *
     * @param obj payload that could not be published
     * @param exc error explaining why; only used as the cause for {@link OnErrorAction#STREAM}
     * @throws SiddhiAppRuntimeException when the action is {@link OnErrorAction#STREAM}
     */
    public void onError(Object obj, Exception exc) {
        switch (this.onErrorAction) {
            case STREAM:
                throw new SiddhiAppRuntimeException("Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as its still trying to reconnect!, event dropped '" + obj + "'", exc);
            case WAIT:
                // Block the publishing thread for the current backoff interval, then retry.
                retryWait(this.backoffPublishRetryCounter.getTimeIntervalMillis());
                this.backoffPublishRetryCounter.increment();
                publish(obj);
                return;
            case LOG:
            default:
                LOG.error("Error on '" + this.siddhiAppContext.getName() + "'. Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as its still trying to reconnect!, events dropped '" + obj + "'");
                return;
        }
    }

    /**
     * Returns a new mutable list holding this sink's service deployment info, or an empty list when
     * the sink exposes no service.
     */
    public List<ServiceDeploymentInfo> getServiceDeploymentInfoList() {
        if (this.serviceDeploymentInfo == null) {
            return new ArrayList<ServiceDeploymentInfo>(0);
        }
        List<ServiceDeploymentInfo> infos = new ArrayList<ServiceDeploymentInfo>(1);
        infos.add(this.serviceDeploymentInfo);
        return infos;
    }

    /** Sleeps for the given number of milliseconds; returns early if the thread is interrupted. */
    private void retryWait(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException ignored) {
            // Deliberately not re-asserting the interrupt flag: the WAIT path in onError(...)
            // republishes right after this call, and with the flag set every following
            // Thread.sleep would throw immediately, turning the backoff into a busy retry loop.
        }
    }
}
