/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.http.metrics.SourceMetrics;
import io.siddhi.extension.io.http.source.HttpWorkerThread;
import io.siddhi.extension.io.http.source.util.HttpSourceUtil;
import io.siddhi.extension.io.http.util.HTTPSourceRegistry;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;
import org.wso2.transport.http.netty.message.HttpMessageDataStreamer;

/**
 * Worker that handles a single synchronous HTTP request received by an HTTP source.
 *
 * <p>When run, it reads the whole request body as text. A non-empty body is registered
 * for a later response (via {@link HTTPSourceRegistry}) and handed to the
 * {@link SourceEventListener}; an empty body is answered immediately through
 * {@link HttpSourceUtil#handleCallback} with status code 405. In both cases the reader is
 * closed and the carbon message's entities are released afterwards.
 */
public class HttpSyncWorkerThread implements Runnable {
    // Was created with HttpWorkerThread.class, which attributed this class's log lines
    // to the wrong logger.
    private static final Logger logger = LoggerFactory.getLogger(HttpSyncWorkerThread.class);

    private final HttpCarbonMessage carbonMessage;
    private final SourceEventListener sourceEventListener;
    /** Identifier used only in log messages. */
    private final String sourceID;
    private final String[] trpProperties;
    /** Identifier used to look up the service source in {@link HTTPSourceRegistry}. */
    private final String sourceId;
    private final String messageId;
    /** May be {@code null}; every use is null-checked. */
    private final SourceMetrics metrics;

    /**
     * @param cMessage            the incoming request message whose body is read
     * @param sourceEventListener listener that receives the non-empty payload
     * @param sourceID            source identifier used in log messages
     * @param trpProperties       transport properties passed along with the payload
     * @param sourceId            source identifier used for the registry lookup
     * @param messageId           identifier under which the response callback is registered
     * @param metrics             metrics sink, or {@code null} when metrics are not collected
     */
    HttpSyncWorkerThread(HttpCarbonMessage cMessage, SourceEventListener sourceEventListener, String sourceID, String[] trpProperties, String sourceId, String messageId, SourceMetrics metrics) {
        this.carbonMessage = cMessage;
        this.sourceEventListener = sourceEventListener;
        this.sourceID = sourceID;
        this.trpProperties = trpProperties;
        this.messageId = messageId;
        this.sourceId = sourceId;
        this.metrics = metrics;
    }

    /**
     * Reads the request body, dispatches or rejects it, then closes the reader and
     * releases the message entities.
     */
    @Override
    public void run() {
        // NOTE(review): the body is decoded with the platform default charset, so the
        // result depends on the JVM configuration — confirm whether UTF-8 is intended.
        BufferedReader buf = new BufferedReader(new InputStreamReader(
                new HttpMessageDataStreamer(this.carbonMessage).getInputStream(), Charset.defaultCharset()));
        try {
            String payload = buf.lines().collect(Collectors.joining("\n"));
            if (payload.isEmpty()) {
                rejectEmptyPayload();
            } else {
                dispatchPayload(payload);
            }
        } finally {
            closeAndRelease(buf);
        }
    }

    /** Registers the response callback, records read metrics and forwards the payload. */
    private void dispatchPayload(String payload) {
        HTTPSourceRegistry.getServiceSource(this.sourceId).registerCallback(this.carbonMessage, this.messageId);
        if (this.metrics != null) {
            this.metrics.getTotalReadsMetric().inc();
            this.metrics.getTotalHttpReadsMetric().inc();
            this.metrics.getRequestSizeMetric().inc(HttpSourceUtil.getByteSize(payload));
            this.metrics.setLastEventTime(System.currentTimeMillis());
        }
        this.sourceEventListener.onEvent(payload, this.trpProperties);
        logger.debug("Submitted Event {} Stream", payload);
    }

    /** Counts the error and answers the request with status code 405. */
    private void rejectEmptyPayload() {
        if (this.metrics != null) {
            this.metrics.getTotalHttpErrorsMetric().inc();
        }
        HttpSourceUtil.handleCallback(this.carbonMessage, 405);
        logger.debug("Empty payload event, hence dropping the event chunk at source {}", this.sourceID);
    }

    /**
     * Closes the reader, then releases the message entities. The release sits in a
     * {@code finally} so that a failing {@code close()} can no longer skip it.
     */
    private void closeAndRelease(BufferedReader buf) {
        try {
            buf.close();
        } catch (IOException e) {
            if (this.metrics != null) {
                this.metrics.getTotalHttpErrorsMetric().inc();
            }
            logger.error("Error occurred when closing the byte buffer in source " + this.sourceID, e);
        } finally {
            this.carbonMessage.waitAndReleaseAllEntities();
        }
    }
}

