/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.http.metrics.SourceMetrics;
import io.siddhi.extension.io.http.source.HttpResponseProcessor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

/**
 * Connector listener that receives the responses for requests sent by an http-call sink and
 * hands each response to an {@link HttpResponseProcessor} running on a fixed-size thread pool.
 *
 * <p>The pool is created in the constructor and is shut down by {@link #disconnect()}.
 */
public class HttpCallResponseConnectorListener
implements HttpConnectorListener {
    private static final Logger log = LoggerFactory.getLogger(HttpCallResponseConnectorListener.class);
    private final SourceEventListener sourceEventListener;
    private final String sinkId;
    private final ExecutorService executorService;
    private final String siddhiAppName;
    private final String[] trpPropertyNames;
    private final boolean shouldAllowStreamingResponses;
    private final SourceMetrics metrics;

    /**
     * Creates the listener and its worker thread pool.
     *
     * @param numberOfThreads               size of the fixed thread pool used to process responses
     * @param sourceEventListener           listener passed on to every {@link HttpResponseProcessor}
     * @param shouldAllowStreamingResponses flag passed on to every {@link HttpResponseProcessor}
     * @param sinkId                        'sink.id' of the http-call sink; also used in error logs
     * @param trpPropertyNames              names of the message properties to read from each
     *                                      response; copied defensively, {@code null} is treated
     *                                      as "no properties"
     * @param siddhiAppName                 name of the Siddhi app; used in error logs
     * @param metrics                       metrics object passed on to every {@link HttpResponseProcessor}
     */
    public HttpCallResponseConnectorListener(int numberOfThreads, SourceEventListener sourceEventListener, boolean shouldAllowStreamingResponses, String sinkId, String[] trpPropertyNames, String siddhiAppName, SourceMetrics metrics) {
        this.sourceEventListener = sourceEventListener;
        this.sinkId = sinkId;
        this.executorService = Executors.newFixedThreadPool(numberOfThreads);
        this.siddhiAppName = siddhiAppName;
        // Defensive copy so later changes to the caller's array are not observed here.
        this.trpPropertyNames = trpPropertyNames == null ? new String[0] : trpPropertyNames.clone();
        this.shouldAllowStreamingResponses = shouldAllowStreamingResponses;
        this.metrics = metrics;
    }

    /**
     * Reads the configured properties from the response message and submits the message to the
     * thread pool for processing.
     *
     * <p>The resulting array has one slot per configured property name, in the same order; a slot
     * stays {@code null} when the message has no value for that name.
     *
     * @param carbonMessage the response message
     */
    @Override
    public void onMessage(HttpCarbonMessage carbonMessage) {
        String[] properties = new String[this.trpPropertyNames.length];
        for (int i = 0; i < this.trpPropertyNames.length; ++i) {
            Object property = carbonMessage.getProperty(this.trpPropertyNames[i]);
            if (property != null) {
                // Reuse the value already fetched: a second lookup could return null if the
                // property were removed in between, and it is redundant work in any case.
                properties[i] = property.toString();
            }
        }
        HttpResponseProcessor workerThread = new HttpResponseProcessor(carbonMessage, this.sourceEventListener, this.shouldAllowStreamingResponses, this.sinkId, properties, this.metrics);
        this.executorService.execute(workerThread);
    }

    /**
     * Logs the failure together with the sink id and Siddhi app name; nothing is rethrown.
     *
     * @param throwable the error reported by the connector
     */
    @Override
    public void onError(Throwable throwable) {
        log.error("Error occurred during processing response for the request sent by http-call sink with 'sink.id' = " + this.sinkId + " in Siddhi app " + this.siddhiAppName + ".", throwable);
    }

    /**
     * @return the Siddhi app name supplied at construction
     */
    String getSiddhiAppName() {
        return this.siddhiAppName;
    }

    /**
     * Shuts down the worker thread pool via {@link ExecutorService#shutdown()}.
     */
    void disconnect() {
        this.executorService.shutdown();
    }
}

