/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.nats.streaming.producer;

import io.nats.streaming.AckHandler;
import io.nats.streaming.StreamingConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.ballerinalang.jvm.scheduling.Scheduler;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.NonBlockingCallback;
import org.ballerinalang.nats.Utils;
import org.ballerinalang.nats.observability.NatsMetricsUtil;
import org.ballerinalang.nats.observability.NatsTracingUtil;
import org.ballerinalang.nats.streaming.producer.AckListener;

/**
 * Native entry point used to publish a message on a NATS streaming connection.
 */
public class Publish {
    /**
     * Publishes {@code data} on {@code subject} through the streaming connection stored
     * as native data on {@code publisher}, registering an {@link AckListener} as the
     * acknowledgement handler.
     *
     * @param publisher        object holding the {@link StreamingConnection} under the
     *                         {@code "nats_streaming_connection"} native-data key
     * @param subject          subject to publish to
     * @param data             payload; converted to bytes via
     *                         {@link Utils#convertDataIntoByteArray}
     * @param connectionObject object holding the {@link NatsMetricsUtil} under the
     *                         {@code "nats_metric_util"} native-data key
     * @return whatever {@code StreamingConnection.publish} returns on success
     *         (presumably the message GUID - confirm against the client API), or the
     *         error value created by {@link Utils#createNatsError} when publishing fails
     */
    public static Object externStreamingPublish(ObjectValue publisher, String subject, Object data, ObjectValue connectionObject) {
        StreamingConnection streamingConnection = (StreamingConnection)publisher.getNativeData("nats_streaming_connection");
        NatsMetricsUtil natsMetricsUtil = (NatsMetricsUtil)connectionObject.getNativeData("nats_metric_util");
        NatsTracingUtil.traceResourceInvocation(Scheduler.getStrand(), streamingConnection.getNatsConnection().getConnectedUrl(), subject);
        byte[] byteData = Utils.convertDataIntoByteArray(data);
        try {
            NonBlockingCallback nonBlockingCallback = new NonBlockingCallback(Scheduler.getStrand());
            AckListener ackListener = new AckListener(nonBlockingCallback, subject, natsMetricsUtil);
            // The publish metric is reported before the send is attempted, so it is
            // recorded even when the publish call below throws.
            natsMetricsUtil.reportPublish(subject, byteData.length);
            return streamingConnection.publish(subject, byteData, (AckHandler)ackListener);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status: catching InterruptedException clears the flag,
            // and callers further up the stack still need to see the interruption.
            Thread.currentThread().interrupt();
            natsMetricsUtil.reportProducerError(subject, "publish");
            return Utils.createNatsError("Failed to publish due to an internal error");
        }
        catch (IOException | TimeoutException e) {
            natsMetricsUtil.reportProducerError(subject, "publish");
            return Utils.createNatsError(e.getMessage());
        }
    }
}

