/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.stream.output.sink.distributed;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.stream.output.sink.SinkHandler;
import io.siddhi.core.stream.output.sink.SinkMapper;
import io.siddhi.core.stream.output.sink.distributed.DistributionStrategy;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.StreamDefinition;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;

public abstract class DistributedTransport
extends Sink {
    private static final Logger log = Logger.getLogger(DistributedTransport.class);
    protected DistributionStrategy strategy;
    protected StreamDefinition streamDefinition;
    protected SiddhiAppContext siddhiAppContext;
    private String type;
    private OptionHolder sinkOptionHolder;
    private String[] supportedDynamicOptions;

    protected StateFactory<State> init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = outputStreamDefinition;
        this.sinkOptionHolder = optionHolder;
        this.siddhiAppContext = siddhiAppContext;
        return null;
    }

    public void init(StreamDefinition streamDefinition, String type, OptionHolder transportOptionHolder, ConfigReader sinkConfigReader, SinkMapper sinkMapper, String mapType, OptionHolder mapOptionHolder, SinkHandler sinkHandler, List<Element> payloadElementList, ConfigReader mapperConfigReader, SiddhiAppContext siddhiAppContext, List<OptionHolder> destinationOptionHolders, Annotation sinkAnnotation, DistributionStrategy strategy, String[] supportedDynamicOptions, Map<String, String> deploymentProperties, List<Map<String, String>> destinationDeploymentProperties) {
        this.type = type;
        this.strategy = strategy;
        this.supportedDynamicOptions = supportedDynamicOptions;
        this.init(streamDefinition, type, transportOptionHolder, sinkConfigReader, sinkMapper, mapType, mapOptionHolder, sinkHandler, payloadElementList, mapperConfigReader, new HashMap<String, String>(), siddhiAppContext);
        this.initTransport(this.sinkOptionHolder, destinationOptionHolders, deploymentProperties, destinationDeploymentProperties, sinkAnnotation, sinkConfigReader, strategy, type, siddhiAppContext);
    }

    public void publish(Object payload, DynamicOptions transportOptions, State state) throws ConnectionUnavailableException {
        int errorCount = 0;
        StringBuilder errorMessages = null;
        List<Integer> destinationsToPublish = this.strategy.getDestinationsToPublish(payload, transportOptions);
        int destinationsToPublishSize = destinationsToPublish.size();
        if (destinationsToPublishSize == 0) {
            throw new ConnectionUnavailableException("Error on '" + this.siddhiAppContext.getName() + "' at Sink '" + this.type + "' stream '" + this.streamDefinition.getId() + "' as no connections are available to publish data.");
        }
        for (Integer destinationId : destinationsToPublish) {
            try {
                this.publish(payload, transportOptions, destinationId, state);
            }
            catch (ConnectionUnavailableException e) {
                ++errorCount;
                if (errorMessages == null) {
                    errorMessages = new StringBuilder();
                }
                errorMessages.append("[Destination ").append(destinationId).append("]:").append(e.getMessage());
                log.warn((Object)(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + " Failed to publish destination ID " + destinationId));
            }
        }
        if (errorCount == destinationsToPublish.size()) {
            throw new ConnectionUnavailableException("Error on '" + this.siddhiAppContext.getName() + "'. " + errorCount + "/" + destinationsToPublish.size() + " connections failed while trying to publish with following error messages:" + errorMessages.toString());
        }
    }

    @Override
    public boolean isConnected() {
        return this.strategy.getActiveDestinationCount() > 0;
    }

    @Override
    public String[] getSupportedDynamicOptions() {
        return this.supportedDynamicOptions;
    }

    public abstract void publish(Object var1, DynamicOptions var2, Integer var3, State var4) throws ConnectionUnavailableException;

    public abstract void initTransport(OptionHolder var1, List<OptionHolder> var2, Map<String, String> var3, List<Map<String, String>> var4, Annotation var5, ConfigReader var6, DistributionStrategy var7, String var8, SiddhiAppContext var9);

    @Override
    public void connectWithRetry() {
        if (!this.isConnected()) {
            this.isTryingToConnect.set(true);
            try {
                this.connect();
            }
            catch (ConnectionUnavailableException connectionUnavailableException) {
                // empty catch block
            }
            for (int retryAttempt = 0; this.strategy.getActiveDestinationCount() == 0 && retryAttempt < 4; ++retryAttempt) {
                try {
                    Thread.sleep(5000L);
                    continue;
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
        }
    }

    public abstract class ConnectionCallback {
        public abstract void connectionEstablished();

        public abstract void connectionFailed();
    }
}

