/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.util.transport;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.stream.output.sink.distributed.DistributedTransport;
import io.siddhi.core.stream.output.sink.distributed.DistributionStrategy;
import io.siddhi.core.util.SiddhiClassLoader;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.extension.holder.SinkExecutorExtensionHolder;
import io.siddhi.core.util.parser.helper.DefinitionParserHelper;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.extension.Extension;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;

public class MultiClientDistributedSink
extends DistributedTransport {
    private static final Logger log = Logger.getLogger(MultiClientDistributedSink.class);
    private List<Sink> transports = new ArrayList<Sink>();

    @Override
    public void publish(Object payload, DynamicOptions transportOptions, Integer destinationId, State state) throws ConnectionUnavailableException {
        Sink transport = this.transports.get(destinationId);
        try {
            transport.publish(payload, transportOptions, state);
        }
        catch (ConnectionUnavailableException e) {
            transport.setConnected(false);
            this.strategy.destinationFailed(destinationId);
            log.warn((Object)("Failed to publish payload to destination ID " + destinationId));
            transport.connectWithRetry();
            throw e;
        }
    }

    @Override
    public void initTransport(OptionHolder sinkOptionHolder, List<OptionHolder> destinationOptionHolders, Map<String, String> deploymentProperties, List<Map<String, String>> destinationDeploymentProperties, Annotation sinkAnnotation, ConfigReader sinkConfigReader, DistributionStrategy strategy, String type, SiddhiAppContext siddhiAppContext) {
        String transportType = sinkOptionHolder.validateAndGetStaticValue("type");
        Extension sinkExtension = DefinitionParserHelper.constructExtension(this.streamDefinition, "Sink", transportType, sinkAnnotation, "sink");
        for (int i = 0; i < destinationOptionHolders.size(); ++i) {
            OptionHolder destinationOption = destinationOptionHolders.get(i);
            Sink sink = (Sink)SiddhiClassLoader.loadExtensionImplementation(sinkExtension, SinkExecutorExtensionHolder.getInstance(siddhiAppContext));
            destinationOption.merge(sinkOptionHolder);
            sink.initOnlyTransport(this.streamDefinition, destinationOption, sinkConfigReader, type, new MultiClientConnectionCallback(this.transports.size(), strategy), destinationDeploymentProperties.get(i), siddhiAppContext);
            if (!sink.getServiceDeploymentInfoList().isEmpty()) {
                sink.getServiceDeploymentInfoList().get(0).addDeploymentProperties(deploymentProperties);
            }
            this.transports.add(sink);
        }
    }

    @Override
    public Class[] getSupportedInputEventClasses() {
        return this.transports.get(0).getSupportedInputEventClasses();
    }

    @Override
    protected ServiceDeploymentInfo exposedServiceDeploymentInfo() {
        return null;
    }

    @Override
    public void connect() throws ConnectionUnavailableException {
        for (Sink transport : this.transports) {
            if (transport.isConnected()) continue;
            transport.connectWithRetry();
        }
    }

    @Override
    public void disconnect() {
        this.transports.forEach(Sink::disconnect);
    }

    @Override
    public void destroy() {
        this.transports.forEach(Sink::destroy);
    }

    @Override
    public List<ServiceDeploymentInfo> getServiceDeploymentInfoList() {
        ArrayList<ServiceDeploymentInfo> serviceDeploymentInfoList = new ArrayList<ServiceDeploymentInfo>();
        for (Sink sink : this.transports) {
            serviceDeploymentInfoList.addAll(sink.getServiceDeploymentInfoList());
        }
        return serviceDeploymentInfoList;
    }

    public class MultiClientConnectionCallback
    extends DistributedTransport.ConnectionCallback {
        private final int destinationId;
        private final DistributionStrategy strategy;

        private MultiClientConnectionCallback(int destinationId, DistributionStrategy strategy) {
            this.destinationId = destinationId;
            this.strategy = strategy;
        }

        @Override
        public void connectionEstablished() {
            this.strategy.destinationAvailable(this.destinationId);
        }

        @Override
        public void connectionFailed() {
            this.strategy.destinationFailed(this.destinationId);
        }
    }
}

