/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.stream.output.sink;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.StreamJunction;
import io.siddhi.core.stream.output.sink.SinkHandler;
import io.siddhi.core.stream.output.sink.SinkHandlerCallback;
import io.siddhi.core.stream.output.sink.SinkListener;
import io.siddhi.core.stream.output.sink.SinkMapper;
import io.siddhi.core.stream.output.sink.distributed.DistributedTransport;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.StringUtil;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.error.handler.model.ErroneousEvent;
import io.siddhi.core.util.error.handler.util.ErrorOccurrence;
import io.siddhi.core.util.error.handler.util.ErrorStoreHelper;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.snapshot.state.EmptyStateHolder;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.core.util.statistics.LatencyTracker;
import io.siddhi.core.util.statistics.ThroughputTracker;
import io.siddhi.core.util.statistics.metrics.Level;
import io.siddhi.core.util.transport.BackoffRetryCounter;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.StreamDefinition;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

public abstract class Sink<S extends State>
implements SinkListener {
    private static final Logger LOG = Logger.getLogger(Sink.class);
    protected AtomicBoolean isTryingToConnect = new AtomicBoolean(false);
    private StreamDefinition streamDefinition;
    private String type;
    private SinkMapper mapper;
    private SinkHandler handler;
    private DistributedTransport.ConnectionCallback connectionCallback = null;
    private StreamJunction streamJunction;
    private SiddhiAppContext siddhiAppContext;
    private OnErrorAction onErrorAction;
    private BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();
    private AtomicBoolean isConnected = new AtomicBoolean(false);
    private AtomicBoolean isShutdown = new AtomicBoolean(false);
    private ThreadLocal<DynamicOptions> trpDynamicOptions;
    private ScheduledExecutorService scheduledExecutorService;
    private ThroughputTracker throughputTracker;
    private LatencyTracker mapperLatencyTracker;
    private StateHolder<S> stateHolder;
    private ServiceDeploymentInfo serviceDeploymentInfo;

    public final void init(StreamDefinition streamDefinition, String type, OptionHolder transportOptionHolder, ConfigReader sinkConfigReader, SinkMapper sinkMapper, String mapType, OptionHolder mapOptionHolder, SinkHandler sinkHandler, List<Element> payloadElementList, ConfigReader mapperConfigReader, Map<String, String> deploymentProperties, StreamJunction streamJunction, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = streamDefinition;
        this.type = type;
        this.streamJunction = streamJunction;
        this.siddhiAppContext = siddhiAppContext;
        this.onErrorAction = OnErrorAction.valueOf(transportOptionHolder.getOrCreateOption("on.error", "LOG").getValue().toUpperCase());
        if (this.onErrorAction == OnErrorAction.STORE && siddhiAppContext.getSiddhiContext().getErrorStore() == null) {
            LOG.error((Object)("On error action is 'STORE' for sink connected to stream " + streamDefinition.getId() + " in Siddhi App " + siddhiAppContext.getName() + " but error store is not configured in Siddhi Manager"));
        }
        if (siddhiAppContext.getStatisticsManager() != null) {
            this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext, streamDefinition.getId(), "Sinks", type);
            this.mapperLatencyTracker = QueryParserHelper.createLatencyTracker(siddhiAppContext, streamDefinition.getId(), "SinkMappers", type + "." + mapType);
        }
        StateFactory<S> stateFactory = this.init(streamDefinition, transportOptionHolder, sinkConfigReader, siddhiAppContext);
        this.stateHolder = siddhiAppContext.generateStateHolder(streamDefinition.getId() + "-" + this.getClass().getName(), stateFactory);
        if (sinkMapper != null) {
            sinkMapper.init(streamDefinition, mapType, mapOptionHolder, payloadElementList, this, mapperConfigReader, this.mapperLatencyTracker, transportOptionHolder, siddhiAppContext);
            this.mapper = sinkMapper;
        }
        if (sinkHandler != null) {
            sinkHandler.initSinkHandler(siddhiAppContext.getName(), streamDefinition, new SinkHandlerCallback(sinkMapper), siddhiAppContext);
            this.handler = sinkHandler;
        }
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        this.serviceDeploymentInfo = this.exposeServiceDeploymentInfo();
        if (this.serviceDeploymentInfo != null) {
            this.serviceDeploymentInfo.addDeploymentProperties(deploymentProperties);
        } else if (!deploymentProperties.isEmpty()) {
            throw new SiddhiAppCreationException("Deployment properties '" + deploymentProperties + "' are defined for sink '" + type + "' which does not expose a service");
        }
    }

    public abstract Class[] getSupportedInputEventClasses();

    public final void initOnlyTransport(StreamDefinition streamDefinition, OptionHolder transportOptionHolder, ConfigReader sinkConfigReader, String type, DistributedTransport.ConnectionCallback connectionCallback, Map<String, String> deploymentProperties, SiddhiAppContext siddhiAppContext) {
        this.type = type;
        this.streamDefinition = streamDefinition;
        this.connectionCallback = connectionCallback;
        this.siddhiAppContext = siddhiAppContext;
        this.init(streamDefinition, transportOptionHolder, sinkConfigReader, siddhiAppContext);
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        this.serviceDeploymentInfo = this.exposeServiceDeploymentInfo();
        if (this.serviceDeploymentInfo != null) {
            this.serviceDeploymentInfo.addDeploymentProperties(deploymentProperties);
        } else if (!deploymentProperties.isEmpty()) {
            throw new SiddhiAppCreationException("Deployment properties '" + deploymentProperties + "' are defined for sink '" + type + "' which does not expose a service");
        }
    }

    protected abstract ServiceDeploymentInfo exposeServiceDeploymentInfo();

    public abstract String[] getSupportedDynamicOptions();

    protected abstract StateFactory<S> init(StreamDefinition var1, OptionHolder var2, ConfigReader var3, SiddhiAppContext var4);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public final void publish(Object payload) {
        if (this.mapperLatencyTracker != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
            this.mapperLatencyTracker.markOut();
        }
        DynamicOptions dynamicOptions = this.trpDynamicOptions.get();
        if (this.isConnected()) {
            S state = this.stateHolder.getState();
            try {
                this.publish(payload, dynamicOptions, state);
                if (this.throughputTracker == null || Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) > 0) return;
                this.throughputTracker.eventIn();
                return;
            }
            catch (ConnectionUnavailableException e) {
                this.setConnected(false);
                if (this.connectionCallback != null) {
                    this.connectionCallback.connectionFailed();
                }
                if (!this.isTryingToConnect.getAndSet(true)) {
                    try {
                        this.connectAndPublish(payload, dynamicOptions, state);
                        this.isTryingToConnect.set(false);
                        return;
                    }
                    catch (ConnectionUnavailableException e1) {
                        this.isTryingToConnect.set(false);
                        this.onError(payload, dynamicOptions, e);
                    }
                    return;
                }
                this.onError(payload, dynamicOptions, e);
                return;
            }
            finally {
                this.stateHolder.returnState(state);
            }
        } else {
            if (this.isShutdown.get()) return;
            this.onError(payload, dynamicOptions, new ConnectionUnavailableException("Connection unavailable at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "'. Connection retrying is in progress from a different thread"));
        }
    }

    private void connectAndPublish(Object payload, DynamicOptions dynamicOptions, S state) throws ConnectionUnavailableException {
        this.connect();
        this.setConnected(true);
        this.publish(payload, dynamicOptions, state);
        if (this.connectionCallback != null) {
            this.connectionCallback.connectionEstablished();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void retryPublish(Object payload) throws ConnectionUnavailableException {
        DynamicOptions dynamicOptions = this.trpDynamicOptions.get();
        S state = this.stateHolder.getState();
        try {
            this.publish(payload, dynamicOptions, state);
            if (this.throughputTracker != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.throughputTracker.eventIn();
            }
        }
        finally {
            this.stateHolder.returnState(state);
        }
    }

    public abstract void publish(Object var1, DynamicOptions var2, S var3) throws ConnectionUnavailableException;

    public abstract void connect() throws ConnectionUnavailableException;

    public abstract void disconnect();

    public abstract void destroy();

    public final String getType() {
        return this.type;
    }

    public final SinkMapper getMapper() {
        return this.mapper;
    }

    public final SinkHandler getHandler() {
        return this.handler;
    }

    public void connectWithRetry() {
        this.connectWithRetry(false);
    }

    private void connectWithRetry(boolean forceConnect) {
        if (!(this.isConnected.get() || this.isTryingToConnect.getAndSet(true) && !forceConnect)) {
            try {
                this.connect();
                this.setConnected(true);
                this.isTryingToConnect.set(false);
                if (this.connectionCallback != null) {
                    this.connectionCallback.connectionEstablished();
                }
                this.backoffRetryCounter.reset();
            }
            catch (ConnectionUnavailableException e) {
                LOG.error((Object)StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + ", error while connecting at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "', will retry in '" + this.backoffRetryCounter.getTimeInterval() + "'."), (Throwable)e);
                this.scheduledExecutorService.schedule(new Runnable(){

                    @Override
                    public void run() {
                        Sink.this.connectWithRetry(true);
                    }
                }, this.backoffRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
                this.backoffRetryCounter.increment();
            }
            catch (RuntimeException e) {
                LOG.error((Object)(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext)) + ", error while connecting at Sink '" + StringUtil.removeCRLFCharacters(this.type) + "' at '" + StringUtil.removeCRLFCharacters(this.streamDefinition.getId()) + "'."), (Throwable)e);
                throw e;
            }
        }
    }

    public void shutdown() {
        this.isShutdown.set(true);
        this.disconnect();
        this.destroy();
        this.setConnected(false);
        this.isTryingToConnect.set(false);
        if (this.connectionCallback != null) {
            this.connectionCallback.connectionFailed();
        }
    }

    void setTrpDynamicOptions(ThreadLocal<DynamicOptions> trpDynamicOptions) {
        this.trpDynamicOptions = trpDynamicOptions;
    }

    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    public boolean isConnected() {
        return this.isConnected.get();
    }

    public void setConnected(boolean connected) {
        this.isConnected.set(connected);
    }

    @Deprecated
    void onError(Object payload, Exception e) {
        DynamicOptions dynamicOptions = this.trpDynamicOptions.get();
        if (dynamicOptions == null && this.onErrorAction == OnErrorAction.WAIT) {
            LOG.error((Object)("Error on '" + this.siddhiAppContext.getName() + "'. Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as its does not support 'WAIT' as it uses deprecated onError(Object payload, Exception e) method!, events dropped '" + payload + "'"));
        } else {
            this.onError(payload, dynamicOptions, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    public void onError(Object payload, DynamicOptions dynamicOptions, Exception e) {
        errorAction = this.onErrorAction;
        if (e instanceof ConnectionUnavailableException) {
            this.setConnected(false);
            if (this.connectionCallback != null) {
                this.connectionCallback.connectionFailed();
            }
        } else if (errorAction == OnErrorAction.WAIT) {
            Sink.LOG.error((Object)("Error on '" + this.siddhiAppContext.getName() + "'. Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as on.error='wait' does not handle '" + e.getClass().getName() + "' error: '" + e.getMessage() + "', events dropped '" + payload + "'"), (Throwable)e);
            return;
        }
        try {
            switch (2.$SwitchMap$io$siddhi$core$stream$output$sink$Sink$OnErrorAction[errorAction.ordinal()]) {
                case 1: {
                    this.connectWithRetry();
                    this.streamJunction.handleError(dynamicOptions.getEvent(), e);
                    ** break;
                }
                case 2: {
                    Sink.LOG.error((Object)StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + ", error while connecting Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "', will retry every '5 sec'."), (Throwable)e);
                    count = 0;
                    while (!this.isConnected.get()) {
                        if (this.isShutdown.get()) {
                            this.isTryingToConnect.set(false);
                            return;
                        }
                        this.retryWait(5000L);
                        ++count;
                        if (this.isConnected.get() || this.isTryingToConnect.getAndSet(true)) ** GOTO lbl45
                        while (!this.isConnected.get()) {
                            if (this.isShutdown.get()) {
                                this.isTryingToConnect.set(false);
                                return;
                            }
                            state = this.stateHolder.getState();
                            try {
                                this.connectAndPublish(payload, dynamicOptions, state);
                                this.isTryingToConnect.set(false);
                                return;
                            }
                            catch (ConnectionUnavailableException var7_8) {
                            }
                            finally {
                                this.stateHolder.returnState(state);
                            }
                            if (count % 12 == 0) {
                                Sink.LOG.warn((Object)StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + ", still waiting to connect Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' retrying every '5 sec'."), (Throwable)e);
                            }
                            this.retryWait(5000L);
                            ++count;
                        }
lbl45:
                        // 2 sources

                        if (count % 12 != 0) continue;
                        Sink.LOG.warn((Object)StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + ", still waiting to connect Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "."), (Throwable)e);
                    }
                    state = this.stateHolder.getState();
                    try {
                        this.publish(payload, dynamicOptions, state);
                        break;
                    }
                    catch (ConnectionUnavailableException ignore) {
                        this.onError(payload, dynamicOptions, e);
                        break;
                    }
                    finally {
                        this.stateHolder.returnState(state);
                    }
                }
                case 3: {
                    erroneousEvent = new ErroneousEvent(dynamicOptions.getEvent(), e, e.getMessage());
                    erroneousEvent.setOriginalPayload(payload);
                    ErrorStoreHelper.storeErroneousEvent(this.siddhiAppContext.getSiddhiContext().getErrorStore(), ErrorOccurrence.STORE_ON_SINK_ERROR, this.siddhiAppContext.getName(), erroneousEvent, this.streamDefinition.getId());
                    ** break;
                }
                default: {
                    this.connectWithRetry();
                    Sink.LOG.error((Object)("Error on '" + this.siddhiAppContext.getName() + "'. Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as its still trying to reconnect!, events dropped '" + payload + "'"));
                    if (!Sink.LOG.isDebugEnabled()) break;
                    Sink.LOG.debug((Object)e);
lbl68:
                    // 3 sources

                }
            }
        }
        catch (Throwable t) {
            Sink.LOG.error((Object)("Error on '" + this.siddhiAppContext.getName() + "'. Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as there is an issue when handling the error: '" + t.getMessage() + "', events dropped '" + payload + "'"), (Throwable)e);
        }
    }

    public List<ServiceDeploymentInfo> getServiceDeploymentInfoList() {
        if (this.serviceDeploymentInfo != null) {
            ArrayList<ServiceDeploymentInfo> list = new ArrayList<ServiceDeploymentInfo>(1);
            list.add(this.serviceDeploymentInfo);
            return list;
        }
        return new ArrayList<ServiceDeploymentInfo>(0);
    }

    private void retryWait(long waitTime) {
        try {
            Thread.sleep(waitTime);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    public boolean isStateful() {
        return this.stateHolder != null && !(this.stateHolder instanceof EmptyStateHolder);
    }

    public static enum OnErrorAction {
        LOG,
        WAIT,
        STREAM,
        STORE;

    }
}

