package io.siddhi.extension.io.http.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.http.source.util.HttpSourceUtil;
import io.siddhi.extension.io.http.util.HTTPSourceRegistry;
import io.siddhi.extension.io.http.util.HttpConstants;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;
import org.wso2.transport.http.netty.message.HttpMessageDataStreamer;

/* loaded from: input_file:io/siddhi/extension/io/http/source/HttpSyncWorkerThread.class */
/**
 * Runnable that reads the payload of a single inbound HTTP request and hands it to the
 * Siddhi {@link SourceEventListener}, after registering the request with the source
 * looked up in {@link HTTPSourceRegistry} under the given message id.
 *
 * <p>An empty payload is not forwarded; instead {@code HttpSourceUtil.handleCallback} is
 * invoked on the request with the value {@code 405}.
 */
public class HttpSyncWorkerThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(HttpSyncWorkerThread.class);

    private final HttpCarbonMessage carbonMessage;
    private final SourceEventListener sourceEventListener;
    /** Identifier used only in log messages. */
    private final String sourceID;
    private final String[] trpProperties;
    /** Key used to look up the service source in {@link HTTPSourceRegistry}. */
    private final String sourceId;
    private final String messageId;

    /**
     * Creates a worker for one inbound request.
     *
     * @param httpCarbonMessage   the inbound request whose payload is read
     * @param sourceEventListener listener that receives the payload
     * @param sourceID            identifier included in this worker's log messages
     * @param trpProperties       transport properties passed along with the payload
     * @param sourceId            key used to look up the service source in the registry
     * @param messageId           id under which the request is registered with the service source
     */
    public HttpSyncWorkerThread(HttpCarbonMessage httpCarbonMessage, SourceEventListener sourceEventListener,
                                String sourceID, String[] trpProperties, String sourceId, String messageId) {
        this.carbonMessage = httpCarbonMessage;
        this.sourceEventListener = sourceEventListener;
        this.sourceID = sourceID;
        this.trpProperties = trpProperties;
        this.sourceId = sourceId;
        this.messageId = messageId;
    }

    /**
     * Reads the whole request body as text (platform default charset), then either rejects
     * an empty payload or registers the request with its service source and publishes the
     * payload to the listener. The reader is closed and the message entities are released
     * afterwards, whether or not processing succeeded.
     */
    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                new HttpMessageDataStreamer(this.carbonMessage).getInputStream(), Charset.defaultCharset()));
        try {
            String payload = reader.lines().collect(Collectors.joining(HttpConstants.NEW_LINE));
            if (payload.isEmpty()) {
                HttpSourceUtil.handleCallback(this.carbonMessage, 405);
                logger.debug("Empty payload event, hence dropping the event chunk at source {}", this.sourceID);
            } else {
                // Register before publishing so the request is already known to the service
                // source when the event is handed to the listener.
                HTTPSourceRegistry.getServiceSource(this.sourceId)
                        .registerCallback(this.carbonMessage, this.messageId);
                this.sourceEventListener.onEvent(payload, this.trpProperties);
                logger.debug("Submitted Event {} Stream", payload);
            }
        } finally {
            try {
                reader.close();
            } catch (IOException e) {
                logger.error("Error occurred when closing the byte buffer in source {}", this.sourceID, e);
            } finally {
                // Must run even when close() fails, otherwise the message entities are never released.
                this.carbonMessage.waitAndReleaseAllEntities();
            }
        }
    }
}
