/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.http.sink;

import io.siddhi.extension.io.http.source.HttpWorkerThread;
import io.siddhi.extension.io.http.util.HTTPSinkRegistry;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;
import org.wso2.transport.http.netty.message.HttpMessageDataStreamer;

/**
 * Worker task that handles a single incoming HTTP message for a server-sent
 * events (SSE) stream.
 *
 * <p>When run, it drains the request body, marks the message as streaming and
 * registers the message as a callback on the SSE source that
 * {@link HTTPSinkRegistry} returns for the configured stream id.
 */
public class SSEWorkerThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(SSEWorkerThread.class);

    private final HttpCarbonMessage carbonMessage;
    private final String streamID;

    /**
     * Creates a worker for one incoming message.
     *
     * @param cMessage the HTTP message to read and to register as the SSE callback
     * @param streamID the stream id used to look up the SSE source in {@link HTTPSinkRegistry}
     */
    SSEWorkerThread(HttpCarbonMessage cMessage, String streamID) {
        this.carbonMessage = cMessage;
        this.streamID = streamID;
    }

    /**
     * Reads the full request body, switches the message to streaming mode and
     * registers it with the SSE source looked up by stream id.
     *
     * <p>The reader is closed and the message entities are released even when
     * reading or registration throws; any such runtime exception still
     * propagates to the caller after the cleanup has run.
     */
    @Override
    public void run() {
        // try-with-resources guarantees the reader is closed on every path; the
        // only checked IOException that can reach the catch clause is the one
        // raised by that implicit close().
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new HttpMessageDataStreamer(this.carbonMessage).getInputStream(),
                Charset.defaultCharset()))) {
            // The body is consumed completely before registration; the text
            // itself is only used for the debug log below.
            String payload = reader.lines().collect(Collectors.joining("\n"));
            this.carbonMessage.setStreaming(true);
            // NOTE(review): presumably the registry can return null for an unknown
            // stream id, which would surface here as a NullPointerException - confirm.
            HTTPSinkRegistry.findAndGetSSESource(this.streamID).registerCallback(this.carbonMessage);
            logger.debug("Submitted Event {} Stream", payload);
        } catch (IOException e) {
            logger.error("Error occurred when closing the byte buffer in source " + this.streamID, e);
        } finally {
            // Always release the message entities, including on failure paths.
            this.carbonMessage.waitAndReleaseAllEntities();
        }
    }
}

