/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.util.transport;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.stream.output.sink.distributed.DistributedTransport;
import io.siddhi.core.stream.output.sink.distributed.DistributionStrategy;
import io.siddhi.core.util.SiddhiClassLoader;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.extension.holder.SinkExecutorExtensionHolder;
import io.siddhi.core.util.parser.helper.DefinitionParserHelper;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import io.siddhi.query.api.extension.Extension;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class SingleClientDistributedSink
extends DistributedTransport {
    private static final Logger log = LogManager.getLogger(SingleClientDistributedSink.class);
    private Sink sink;
    private int destinationCount = 0;

    @Override
    public void publish(Object payload, DynamicOptions transportOptions, Integer destinationId, State s) throws ConnectionUnavailableException {
        try {
            transportOptions.setVariableOptionIndex(destinationId);
            this.sink.publish(payload, transportOptions, s);
        }
        catch (ConnectionUnavailableException e) {
            this.sink.setConnected(false);
            this.strategy.destinationFailed(destinationId);
            log.warn("Failed to publish payload to destination ID " + destinationId + ".", (Throwable)e);
            this.sink.connectWithRetry();
            throw e;
        }
    }

    @Override
    public void initTransport(OptionHolder sinkOptionHolder, List<OptionHolder> destinationOptionHolders, Map<String, String> deploymentProperties, List<Map<String, String>> destinationDeploymentProperties, Annotation sinkAnnotation, ConfigReader sinkConfigReader, DistributionStrategy strategy, String type, SiddhiAppContext siddhiAppContext) {
        String transportType = sinkOptionHolder.validateAndGetStaticValue("type");
        Extension sinkExtension = DefinitionParserHelper.constructExtension(this.streamDefinition, "Sink", transportType, sinkAnnotation, "sink");
        Set<String> allDynamicOptionKeys = this.findAllDynamicOptions(destinationOptionHolders);
        destinationOptionHolders.forEach(optionHolder -> {
            optionHolder.merge(sinkOptionHolder);
            allDynamicOptionKeys.forEach(optionKey -> {
                String optionValue = optionHolder.getOrCreateOption((String)optionKey, null).getValue();
                if (optionValue == null || optionValue.isEmpty()) {
                    throw new SiddhiAppValidationException("Destination properties can only contain non-empty static values.");
                }
                Option sinkOption = sinkOptionHolder.getOrAddStaticOption((String)optionKey, optionValue);
                sinkOption.addVariableValue(optionValue);
                ++this.destinationCount;
            });
        });
        this.sink = (Sink)SiddhiClassLoader.loadExtensionImplementation(sinkExtension, SinkExecutorExtensionHolder.getInstance(siddhiAppContext));
        this.sink.initOnlyTransport(this.streamDefinition, sinkOptionHolder, sinkConfigReader, type, new SingleClientConnectionCallback(this.destinationCount, strategy), destinationDeploymentProperties.get(0), siddhiAppContext);
        if (!this.sink.getServiceDeploymentInfoList().isEmpty()) {
            this.sink.getServiceDeploymentInfoList().get(0).addDeploymentProperties(deploymentProperties);
        }
    }

    @Override
    public Class[] getSupportedInputEventClasses() {
        return this.sink.getSupportedInputEventClasses();
    }

    @Override
    public void connect() throws ConnectionUnavailableException {
        this.sink.connectWithRetry();
    }

    @Override
    public void disconnect() {
        this.sink.disconnect();
    }

    @Override
    public void destroy() {
        this.sink.destroy();
    }

    @Override
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    @Override
    public List<ServiceDeploymentInfo> getServiceDeploymentInfoList() {
        return this.sink.getServiceDeploymentInfoList();
    }

    private Set<String> findAllDynamicOptions(List<OptionHolder> destinationOptionHolders) {
        HashSet<String> dynamicOptions = new HashSet<String>();
        destinationOptionHolders.forEach(destinationOptionHolder -> {
            destinationOptionHolder.getDynamicOptionsKeys().forEach(dynamicOptions::add);
            destinationOptionHolder.getStaticOptionsKeys().forEach(dynamicOptions::add);
        });
        return dynamicOptions;
    }

    public class SingleClientConnectionCallback
    extends DistributedTransport.ConnectionCallback {
        private final int destinations;
        private final DistributionStrategy strategy;

        private SingleClientConnectionCallback(int destinations, DistributionStrategy strategy) {
            this.destinations = destinations;
            this.strategy = strategy;
        }

        @Override
        public void connectionEstablished() {
            for (int i = 0; i < this.destinations; ++i) {
                this.strategy.destinationAvailable(i);
            }
        }

        @Override
        public void connectionFailed() {
            for (int i = 0; i < this.destinations; ++i) {
                this.strategy.destinationFailed(i);
            }
        }
    }
}

