/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnDataBufferingOutboundObserver<T>
implements CloseableFnDataReceiver<WindowedValue<T>> {
    public static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";
    @VisibleForTesting
    static final int DEFAULT_BUFFER_LIMIT_BYTES = 1000000;
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataBufferingOutboundObserver.class);
    private long byteCounter;
    private long counter;
    private boolean closed;
    private final int bufferLimit;
    private final Coder<WindowedValue<T>> coder;
    private final LogicalEndpoint outputLocation;
    private final StreamObserver<BeamFnApi.Elements> outboundObserver;
    private final ByteString.Output bufferedElements;

    public static <T> BeamFnDataBufferingOutboundObserver<T> forLocation(LogicalEndpoint endpoint, Coder<WindowedValue<T>> coder, StreamObserver<BeamFnApi.Elements> outboundObserver) {
        return BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(1000000, endpoint, coder, outboundObserver);
    }

    public static <T> BeamFnDataBufferingOutboundObserver<T> forLocationWithBufferLimit(int bufferLimit, LogicalEndpoint endpoint, Coder<WindowedValue<T>> coder, StreamObserver<BeamFnApi.Elements> outboundObserver) {
        return new BeamFnDataBufferingOutboundObserver<T>(bufferLimit, endpoint, coder, outboundObserver);
    }

    private BeamFnDataBufferingOutboundObserver(int bufferLimit, LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder, StreamObserver<BeamFnApi.Elements> outboundObserver) {
        this.bufferLimit = bufferLimit;
        this.outputLocation = outputLocation;
        this.coder = coder;
        this.outboundObserver = outboundObserver;
        this.bufferedElements = ByteString.newOutput();
        this.closed = false;
    }

    @Override
    public void close() throws Exception {
        if (this.closed) {
            throw new IllegalStateException("Already closed.");
        }
        this.closed = true;
        BeamFnApi.Elements.Builder elements = this.convertBufferForTransmission();
        elements.addDataBuilder().setInstructionReference(this.outputLocation.getInstructionId()).setTarget(this.outputLocation.getTarget());
        LOG.debug("Closing stream for instruction {} and target {} having transmitted {} values {} bytes", this.outputLocation.getInstructionId(), this.outputLocation.getTarget(), this.counter, this.byteCounter);
        this.outboundObserver.onNext(elements.build());
    }

    @Override
    public void flush() throws IOException {
        if (this.bufferedElements.size() > 0) {
            this.outboundObserver.onNext(this.convertBufferForTransmission().build());
        }
    }

    @Override
    public void accept(WindowedValue<T> t) throws IOException {
        if (this.closed) {
            throw new IllegalStateException("Already closed.");
        }
        this.coder.encode(t, this.bufferedElements);
        ++this.counter;
        if (this.bufferedElements.size() >= this.bufferLimit) {
            this.flush();
        }
    }

    private BeamFnApi.Elements.Builder convertBufferForTransmission() {
        BeamFnApi.Elements.Builder elements = BeamFnApi.Elements.newBuilder();
        if (this.bufferedElements.size() == 0) {
            return elements;
        }
        elements.addDataBuilder().setInstructionReference(this.outputLocation.getInstructionId()).setTarget(this.outputLocation.getTarget()).setData(this.bufferedElements.toByteString());
        this.byteCounter += (long)this.bufferedElements.size();
        this.bufferedElements.reset();
        return elements;
    }
}

