package org.apache.beam.fn.harness.data;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.class */
public class QueueingBeamFnDataClient implements BeamFnDataClient {
    private static final int QUEUE_SIZE = 1000;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) QueueingBeamFnDataClient.class);
    private final BeamFnDataClient mainClient;
    private final LinkedBlockingQueue<ConsumerAndData> queue = new LinkedBlockingQueue<>(1000);
    private final ConcurrentHashMap<InboundDataClient, Object> inboundDataClients = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/fn/harness/data/QueueingBeamFnDataClient$ConsumerAndData.class */
    public static class ConsumerAndData<T> {
        public FnDataReceiver<WindowedValue<T>> consumer;
        public WindowedValue<T> data;

        public ConsumerAndData(FnDataReceiver<WindowedValue<T>> fnDataReceiver, WindowedValue<T> windowedValue) {
            this.consumer = fnDataReceiver;
            this.data = windowedValue;
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/data/QueueingBeamFnDataClient$QueueingFnDataReceiver.class */
    public class QueueingFnDataReceiver<T> implements FnDataReceiver<WindowedValue<T>> {
        private final FnDataReceiver<WindowedValue<T>> consumer;
        public InboundDataClient inboundDataClient;

        public QueueingFnDataReceiver(FnDataReceiver<WindowedValue<T>> fnDataReceiver) {
            this.consumer = fnDataReceiver;
        }

        @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
        public void accept(WindowedValue<T> windowedValue) throws Exception {
            try {
                ConsumerAndData consumerAndData = new ConsumerAndData(this.consumer, windowedValue);
                while (!QueueingBeamFnDataClient.this.queue.offer(consumerAndData, 200L, TimeUnit.MILLISECONDS) && !this.inboundDataClient.isDone()) {
                }
            } catch (Exception e) {
                QueueingBeamFnDataClient.LOG.error("Failed to insert WindowedValue into the queue", (Throwable) e);
                this.inboundDataClient.fail(e);
                throw e;
            }
        }
    }

    public QueueingBeamFnDataClient(BeamFnDataClient beamFnDataClient) {
        this.mainClient = beamFnDataClient;
    }

    @Override // org.apache.beam.fn.harness.data.BeamFnDataClient
    public <T> InboundDataClient receive(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint logicalEndpoint, Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> fnDataReceiver) {
        LOG.debug("Registering consumer for instruction {} and transform {}", logicalEndpoint.getInstructionId(), logicalEndpoint.getPTransformId());
        QueueingFnDataReceiver queueingFnDataReceiver = new QueueingFnDataReceiver(fnDataReceiver);
        InboundDataClient receive = this.mainClient.receive(apiServiceDescriptor, logicalEndpoint, coder, queueingFnDataReceiver);
        queueingFnDataReceiver.inboundDataClient = receive;
        this.inboundDataClients.computeIfAbsent(receive, inboundDataClient -> {
            return inboundDataClient;
        });
        return receive;
    }

    private boolean allDone() {
        Iterator it = this.inboundDataClients.keySet().iterator();
        while (it.hasNext()) {
            if (!((InboundDataClient) it.next()).isDone()) {
                return false;
            }
        }
        return this.queue.isEmpty();
    }

    public void drainAndBlock() throws Exception {
        while (true) {
            try {
                ConsumerAndData poll = this.queue.poll(200L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    poll.consumer.accept(poll.data);
                } else if (allDone()) {
                    return;
                }
            } catch (Exception e) {
                LOG.error("Client failed to dequeue and process WindowedValue", (Throwable) e);
                Iterator it = this.inboundDataClients.keySet().iterator();
                while (it.hasNext()) {
                    ((InboundDataClient) it.next()).fail(e);
                }
                throw e;
            }
        }
    }

    @Override // org.apache.beam.fn.harness.data.BeamFnDataClient
    public <T> CloseableFnDataReceiver<WindowedValue<T>> send(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint logicalEndpoint, Coder<WindowedValue<T>> coder) {
        LOG.debug("Creating output consumer for instruction {} and transform {}", logicalEndpoint.getInstructionId(), logicalEndpoint.getPTransformId());
        return this.mainClient.send(apiServiceDescriptor, logicalEndpoint, coder);
    }
}
