/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.data;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BeamFnDataClient} that wraps another client and routes every inbound element
 * through a bounded in-memory queue instead of handing it straight to the registered consumer.
 *
 * <p>Consumers registered through {@link #receive} are wrapped in a
 * {@link QueueingFnDataReceiver}, which only enqueues the element. The real consumers are
 * invoked by whichever thread calls {@link #drainAndBlock()}. Outbound traffic ({@link #send})
 * is delegated to the wrapped client unchanged.
 */
public class QueueingBeamFnDataClient
implements BeamFnDataClient {
    /** Maximum number of elements buffered between the receivers and {@link #drainAndBlock()}. */
    private static final int QUEUE_SIZE = 1000;
    /** How long a single queue {@code offer}/{@code poll} waits before re-checking completion. */
    private static final long QUEUE_WAIT_MILLIS = 200L;
    private static final Logger LOG = LoggerFactory.getLogger(QueueingBeamFnDataClient.class);
    private final BeamFnDataClient mainClient;
    private final LinkedBlockingQueue<ConsumerAndData<?>> queue;
    // Used as a concurrent set: each client is stored as both key and value.
    private final ConcurrentHashMap<InboundDataClient, Object> inboundDataClients;

    /**
     * Creates a queueing client on top of {@code mainClient}.
     *
     * @param mainClient the client that performs the actual receive/send registration
     */
    public QueueingBeamFnDataClient(BeamFnDataClient mainClient) {
        this.mainClient = mainClient;
        this.queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
        this.inboundDataClients = new ConcurrentHashMap<>();
    }

    /**
     * Registers {@code consumer} with the wrapped client behind a queueing receiver and tracks
     * the resulting {@link InboundDataClient} so {@link #drainAndBlock()} knows when to stop.
     *
     * @param apiServiceDescriptor passed through to the wrapped client
     * @param inputLocation passed through to the wrapped client; also used for debug logging
     * @param coder passed through to the wrapped client
     * @param consumer the consumer that will be invoked from {@link #drainAndBlock()}
     * @return the {@link InboundDataClient} returned by the wrapped client
     */
    @Override
    public <T> InboundDataClient receive(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> consumer) {
        LOG.debug("Registering consumer for instruction {} and target {}", inputLocation.getInstructionId(), inputLocation.getTarget());
        QueueingFnDataReceiver<T> queueingConsumer = new QueueingFnDataReceiver<T>(consumer);
        InboundDataClient inboundDataClient = this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, queueingConsumer);
        // The receiver is already registered with mainClient at this point, so it may see
        // elements before this assignment; QueueingFnDataReceiver.accept tolerates a null client.
        queueingConsumer.inboundDataClient = inboundDataClient;
        this.inboundDataClients.putIfAbsent(inboundDataClient, inboundDataClient);
        return inboundDataClient;
    }

    /**
     * Returns {@code true} when every tracked inbound client reports done and the queue is empty.
     * Clients are checked before the queue on purpose: once all clients are done, an empty queue
     * means there is nothing left to drain.
     */
    private boolean allDone() {
        for (InboundDataClient inboundDataClient : this.inboundDataClients.keySet()) {
            if (!inboundDataClient.isDone()) {
                return false;
            }
        }
        return this.queue.isEmpty();
    }

    /**
     * Dequeues elements and passes them to their consumers on the calling thread until all
     * tracked inbound clients are done and the queue is empty.
     *
     * @throws Exception if polling is interrupted or a consumer throws; before rethrowing,
     *     every tracked inbound client is failed with the same exception
     */
    public void drainAndBlock() throws Exception {
        try {
            while (true) {
                ConsumerAndData<?> tuple = this.queue.poll(QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                if (tuple != null) {
                    QueueingBeamFnDataClient.deliver(tuple);
                } else if (this.allDone()) {
                    // Only consider stopping after a poll timed out with nothing available.
                    break;
                }
            }
        }
        catch (Exception e) {
            LOG.error("Client failed to dequeue and process WindowedValue", e);
            for (InboundDataClient inboundDataClient : this.inboundDataClients.keySet()) {
                inboundDataClient.fail(e);
            }
            throw e;
        }
    }

    /** Hands a queued element to its consumer; exists to capture the queue's wildcard type. */
    private static <T> void deliver(ConsumerAndData<T> tuple) throws Exception {
        tuple.consumer.accept(tuple.data);
    }

    /**
     * Delegates directly to the wrapped client; outbound data is not queued.
     *
     * @param apiServiceDescriptor passed through to the wrapped client
     * @param outputLocation passed through to the wrapped client; also used for debug logging
     * @param coder passed through to the wrapped client
     * @return the receiver returned by the wrapped client
     */
    @Override
    public <T> CloseableFnDataReceiver<WindowedValue<T>> send(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
        LOG.debug("Creating output consumer for instruction {} and target {}", outputLocation.getInstructionId(), outputLocation.getTarget());
        return this.mainClient.send(apiServiceDescriptor, outputLocation, coder);
    }

    /** Queue entry pairing an element with the consumer that must eventually receive it. */
    static class ConsumerAndData<T> {
        public FnDataReceiver<WindowedValue<T>> consumer;
        public WindowedValue<T> data;

        public ConsumerAndData(FnDataReceiver<WindowedValue<T>> receiver, WindowedValue<T> data) {
            this.consumer = receiver;
            this.data = data;
        }
    }

    /**
     * Receiver registered with the wrapped client in place of the real consumer. It never
     * invokes the consumer itself; it only enqueues the element for {@link #drainAndBlock()}.
     */
    public class QueueingFnDataReceiver<T>
    implements FnDataReceiver<WindowedValue<T>> {
        private final FnDataReceiver<WindowedValue<T>> consumer;
        /**
         * Client associated with this receiver. Assigned by {@link #receive} after the wrapped
         * client has returned, so it may still be {@code null} while early elements arrive.
         * Declared {@code volatile} because it is written by the registering thread and read in
         * {@link #accept} — presumably on a different, data-delivering thread.
         */
        public volatile InboundDataClient inboundDataClient;

        public QueueingFnDataReceiver(FnDataReceiver<WindowedValue<T>> consumer) {
            this.consumer = consumer;
        }

        /**
         * Enqueues {@code value}, retrying while the queue is full. Retrying stops (and the
         * element is dropped) once the associated inbound client reports done.
         *
         * @param value the element to enqueue
         * @throws Exception if enqueueing fails (for example on interruption); the associated
         *     inbound client, when already assigned, is failed with the same exception first
         */
        @Override
        public void accept(WindowedValue<T> value) throws Exception {
            try {
                ConsumerAndData<T> offering = new ConsumerAndData<T>(this.consumer, value);
                while (!QueueingBeamFnDataClient.this.queue.offer(offering, QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS)) {
                    // Read once into a local; a not-yet-assigned client is treated as "not done".
                    InboundDataClient client = this.inboundDataClient;
                    if (client != null && client.isDone()) {
                        break;
                    }
                }
            }
            catch (Exception e) {
                LOG.error("Failed to insert WindowedValue into the queue", e);
                // Guard against null so a secondary NullPointerException cannot mask e.
                InboundDataClient client = this.inboundDataClient;
                if (client != null) {
                    client.fail(e);
                }
                throw e;
            }
        }
    }
}

