package io.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.MapInputRowParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;

/* loaded from: input_file:io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.class */
/**
 * Factory for firehoses that are fed by events pushed over HTTP.
 *
 * <p>Each connected {@link EventReceiverFirehose} is registered with the injected
 * {@code ChatHandlerProvider} (when one is present) under the configured service name, and
 * additionally under a short alias when the service name contains a {@code ':'}. Pushed events
 * are parsed and held in a bounded in-memory buffer until the firehose consumer reads them.
 */
public class EventReceiverFirehoseFactory implements FirehoseFactory<MapInputRowParser> {
    private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);

    /** Buffer capacity used when the configured size is absent or not positive. */
    private static final int DEFAULT_BUFFER_SIZE = 100000;

    /** Time a single blocking offer/poll waits before the {@code closed} flag is re-checked. */
    private static final long QUEUE_WAIT_MILLIS = 500L;

    private final String serviceName;
    private final int bufferSize;
    private final Optional<ChatHandlerProvider> chatHandlerProvider;

    /**
     * Firehose backed by a bounded queue that is filled through the {@code /push-events}
     * endpoint and drained through {@link #hasMore()} / {@link #nextRow()}.
     */
    public class EventReceiverFirehose implements ChatHandler, Firehose {
        private final BlockingQueue<InputRow> buffer;
        private final MapInputRowParser parser;

        /** Guards the hand-off of {@link #nextRow} between {@link #hasMore()} and {@link #nextRow()}. */
        private final Object readLock = new Object();

        /** Row fetched by {@link #hasMore()} and not yet handed out by {@link #nextRow()}. */
        private volatile InputRow nextRow = null;
        private volatile boolean closed = false;

        /**
         * @param mapInputRowParser parser applied to every pushed event map
         */
        public EventReceiverFirehose(MapInputRowParser mapInputRowParser) {
            this.buffer = new ArrayBlockingQueue<InputRow>(bufferSize);
            this.parser = mapInputRowParser;
        }

        /**
         * Parses the posted events and appends them to the buffer, blocking while the buffer is
         * full and the firehose is still open.
         *
         * @param collection the pushed events, one map per event
         * @return a 200 response whose entity is {@code {"eventCount": <number of events posted>}}
         * @throws IllegalStateException if the firehose is closed before every row was buffered
         */
        @POST
        @Produces({"application/json"})
        @Path("/push-events")
        public Response addAll(Collection<Map<String, Object>> collection) {
            log.debug("Adding %,d events to firehose: %s", collection.size(), serviceName);

            // Parse the whole batch first: a parse failure then surfaces before any row of this
            // batch has been enqueued.
            final Collection<InputRow> rows = new ArrayList<InputRow>(collection.size());
            for (Map<String, Object> event : collection) {
                rows.add(parser.parse(event));
            }

            try {
                for (InputRow row : rows) {
                    boolean added = false;
                    // Offer with a timeout so a concurrent close() is noticed while the buffer is full.
                    while (!closed && !added) {
                        added = buffer.offer(row, QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                    }
                    if (!added) {
                        throw new IllegalStateException("Cannot add events to closed firehose!");
                    }
                }
                return Response.ok().entity(ImmutableMap.of("eventCount", collection.size())).build();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
        }

        /**
         * Blocks until a row is available or the firehose is closed.
         *
         * @return {@code true} if a row is ready for {@link #nextRow()}; {@code false} once the
         *         firehose is closed and no row had already been fetched
         */
        public boolean hasMore() {
            synchronized (readLock) {
                try {
                    // Poll with a timeout so the closed flag is re-checked periodically.
                    while (!closed && nextRow == null) {
                        nextRow = buffer.poll(QUEUE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                }
                return nextRow != null;
            }
        }

        /**
         * Hands out the row previously fetched by {@link #hasMore()}.
         *
         * @return the pending row
         * @throws NoSuchElementException if no row is pending
         */
        public InputRow nextRow() {
            synchronized (readLock) {
                final InputRow row = nextRow;
                if (row == null) {
                    throw new NoSuchElementException();
                }
                nextRow = null;
                return row;
            }
        }

        /**
         * @return a no-op {@link Runnable}; this firehose has nothing to commit
         */
        public Runnable commit() {
            return new Runnable() {
                @Override
                public void run() {
                    // Intentionally empty.
                }
            };
        }

        /**
         * Marks the firehose closed and removes every chat-handler registration that
         * {@link EventReceiverFirehoseFactory#connect} created for it: the full service name
         * and, when present, the short alias.
         */
        public void close() throws IOException {
            log.info("Firehose closing.");
            closed = true;

            if (chatHandlerProvider.isPresent()) {
                final ChatHandlerProvider provider = chatHandlerProvider.get();
                provider.unregister(serviceName);
                // connect() also registers the short alias; unregister it as well so the
                // provider does not keep a reference to this closed firehose.
                final String alias = shortAlias(serviceName);
                if (alias != null) {
                    provider.unregister(alias);
                }
            }
        }
    }

    /**
     * @param serviceName         name under which firehoses are registered; must not be null
     * @param bufferSize          capacity of each firehose buffer; {@code null} or a non-positive
     *                            value selects the default of {@value #DEFAULT_BUFFER_SIZE}
     * @param chatHandlerProvider provider used to expose firehoses as chat handlers; may be null,
     *                            in which case no registration takes place
     */
    @JsonCreator
    public EventReceiverFirehoseFactory(@JsonProperty("serviceName") String serviceName, @JsonProperty("bufferSize") Integer bufferSize, @JacksonInject ChatHandlerProvider chatHandlerProvider) {
        Preconditions.checkNotNull(serviceName, "serviceName");
        this.serviceName = serviceName;
        this.bufferSize = (bufferSize == null || bufferSize.intValue() <= 0) ? DEFAULT_BUFFER_SIZE : bufferSize.intValue();
        this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
    }

    /**
     * Creates a new firehose and, if a chat handler provider is present, registers it under the
     * service name and under its short alias (when the name contains a {@code ':'}).
     *
     * @param mapInputRowParser parser the firehose applies to pushed events
     * @return the newly created firehose
     */
    public Firehose connect(MapInputRowParser mapInputRowParser) throws IOException {
        log.info("Connecting firehose: %s", serviceName);
        final EventReceiverFirehose firehose = new EventReceiverFirehose(mapInputRowParser);

        if (chatHandlerProvider.isPresent()) {
            final ChatHandlerProvider provider = chatHandlerProvider.get();
            log.info("Found chathandler of class[%s]", provider.getClass().getName());
            provider.register(serviceName, firehose);
            final String alias = shortAlias(serviceName);
            if (alias != null) {
                provider.register(alias, firehose);
            }
        } else {
            log.info("No chathandler detected");
        }
        return firehose;
    }

    /**
     * Derives the short alias of a service name: the name with everything up to and including
     * the last {@code ':'} removed.
     *
     * @return the alias, or {@code null} if {@code name} contains no {@code ':'}
     */
    private static String shortAlias(String name) {
        return name.contains(":") ? name.replaceAll(".*:", "") : null;
    }

    @JsonProperty
    public String getServiceName() {
        return serviceName;
    }

    @JsonProperty
    public int getBufferSize() {
        return bufferSize;
    }
}
