package com.netflix.eventbus.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.eventbus.impl.EventBusImpl;
import com.netflix.eventbus.spi.EventBus;
import com.netflix.eventbus.spi.EventFilter;
import com.netflix.eventbus.spi.Subscribe;
import com.netflix.eventbus.spi.SubscriberConfigProvider;
import com.netflix.eventbus.spi.SyncSubscribersGatekeeper;
import com.netflix.eventbus.utils.EventBusUtils;
import com.netflix.servo.monitor.Stopwatch;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/netflix/eventbus/impl/EventConsumer.class */
/**
 * Delivers events to a single subscriber method.
 *
 * <p>Each consumer owns a queue (obtained from the supplied
 * {@link EventBusImpl.ConsumerQueueSupplier}) and a single daemon thread that takes entries off
 * that queue and reflectively invokes the subscriber method. Events for which
 * {@link SyncSubscribersGatekeeper#isSyncSubscriber} returns {@code true} bypass the queue and are
 * dispatched on the calling thread instead.
 *
 * <p>Note: the decompiler reported that this class and several of its members were
 * package-private in the original sources; the widened modifiers are kept here so that existing
 * callers continue to compile.
 */
public class EventConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventConsumer.class);

    /** Upper bound on the number of offer retries performed by {@link #enqueue(Object)} (default 5). */
    private static final DynamicIntProperty maxRetriesOnQueueFull = DynamicPropertyFactory.getInstance().getIntProperty(EventBus.CONSUMER_QUEUE_FULL_RETRY_MAX_PROP_NAME, 5);

    /** Source of the numeric suffix used in poller thread names. */
    private static final AtomicLong threadIdCounter = new AtomicLong();

    private final Class<?> targetEventClass;
    private final Method delegateSubscriber;
    private final Object subscriberClassInstance;
    private final CopyOnWriteArraySet<EventFilter> filters;
    private final EventBusImpl.ConsumerQueueSupplier.ConsumerQueue eventQueue;
    private final ExecutorService executor;
    private final Subscribe.BatchingStrategy batchingStrategy;
    private final EventConsumerStats stats;
    private final SubscriberConfigProvider.SubscriberConfig subscriberConfig;

    /**
     * Iterable view over an {@link EventBatch} whose iterator skips (and removes from the
     * underlying batch iterator) every event rejected by this consumer's filters.
     */
    public class BatchDecorator implements Iterable {
        private final EventBatch batch;

        /**
         * Iterator that keeps the invariant "the next element, if any, has passed the filters".
         * The invariant is established in the constructor and re-established after every
         * {@link #next()}.
         */
        private class BatchIterator implements Iterator {
            private final PeekingIterator delegatePeekingIterator;

            BatchIterator(EventBatch eventBatch) {
                this.delegatePeekingIterator = Iterators.peekingIterator(eventBatch.iterator());
                _ensureNextEventIsConsumable();
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                return this.delegatePeekingIterator.hasNext();
            }

            @Override // java.util.Iterator
            public Object next() {
                Object current = this.delegatePeekingIterator.next();
                _ensureNextEventIsConsumable();
                return current;
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException("Event batch iterator does not support remove.");
            }

            /**
             * Drops leading events until the next one passes the filters or the batch is
             * exhausted. Implemented as a loop (rather than recursion) so that a long run of
             * filtered events cannot exhaust the stack.
             */
            private void _ensureNextEventIsConsumable() {
                while (this.delegatePeekingIterator.hasNext()
                        && !EventBusUtils.applyFilters(this.delegatePeekingIterator.peek(), EventConsumer.this.filters, EventConsumer.this.stats.filterStats, "subscriber: " + EventConsumer.this.delegateSubscriber.toGenericString(), EventConsumer.LOGGER)) {
                    // remove() is only legal directly after next(), hence the consume-then-remove order.
                    this.delegatePeekingIterator.next();
                    this.delegatePeekingIterator.remove();
                }
            }
        }

        BatchDecorator(EventBatch eventBatch) {
            this.batch = eventBatch;
        }

        @Override // java.lang.Iterable
        public Iterator iterator() {
            return new BatchIterator(this.batch);
        }
    }

    /**
     * Task run on the consumer's thread: repeatedly takes an entry from the queue and dispatches
     * it, until the thread is interrupted.
     */
    private class EventPoller implements Runnable {
        private EventPoller() {
        }

        @Override // java.lang.Runnable
        public void run() {
            EventConsumer.LOGGER.info("Event consumer: {} started.", EventConsumer.this.delegateSubscriber.toGenericString());
            boolean interrupted = false;
            try {
                while (!interrupted) {
                    try {
                        Object event = EventConsumer.this.eventQueue.blockingTake();
                        if (null != event) {
                            EventConsumer.this.processEvent(event);
                        }
                    } catch (InterruptedException e) {
                        EventConsumer.LOGGER.info("Event consumer: {} interrupted. Can be the result of a stop call, if so, you will see a 'consumer stopped' log.", EventConsumer.this.delegateSubscriber.toGenericString());
                        interrupted = true;
                    }
                }
            } finally {
                // Logged on both normal termination and when an unexpected Throwable escapes the loop.
                EventConsumer.LOGGER.info("Event consumer: {} stopped.", EventConsumer.this.delegateSubscriber.toGenericString());
                if (interrupted) {
                    // Restore the flag only after the final log statement so logging itself is not
                    // affected by a pending interrupt.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Creates the consumer and immediately starts its polling thread.
     *
     * @param method the subscriber method; must be declared by the runtime class of {@code obj}
     * @param obj the instance on which {@code method} is invoked
     * @param eventFilter optional initial filter; {@code null} means no initial filter
     * @param cls the event class this consumer is registered for
     * @param consumerQueueSupplier factory for the queue backing this consumer
     * @throws IllegalArgumentException if {@code method} is not declared by {@code obj}'s class
     */
    public EventConsumer(Method method, Object obj, @Nullable EventFilter eventFilter, Class<?> cls, EventBusImpl.ConsumerQueueSupplier consumerQueueSupplier) {
        Preconditions.checkArgument(method.getDeclaringClass() == obj.getClass(), "The subscriber method does not belong to the subscriber class.");
        this.delegateSubscriber = method;
        this.subscriberClassInstance = obj;
        this.targetEventClass = cls;

        // Used both as the stats name and as the prefix of the poller thread name.
        String consumerName = Joiner.on("_").join(obj.getClass().getName(), this.delegateSubscriber.getName(), this.targetEventClass.getName());
        this.stats = new EventConsumerStats(consumerName, EventBusImpl.STATS_COLLECTION_DURATION_MILLIS.get());
        this.subscriberConfig = EventBusUtils.getSubscriberConfig(method, obj);
        this.batchingStrategy = this.subscriberConfig.getBatchingStrategy();
        this.eventQueue = consumerQueueSupplier.get(this.delegateSubscriber, this.subscriberConfig, this.stats.QUEUE_SIZE_COUNTER);

        this.filters = new CopyOnWriteArraySet<>();
        if (null != eventFilter) {
            this.filters.add(eventFilter);
        }

        // Every field read by the poller is assigned above, before the task is handed to the executor.
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat(consumerName + "-" + threadIdCounter.incrementAndGet()).build());
        this.executor.execute(new EventPoller());
    }

    /**
     * Hands an event to this consumer.
     *
     * <p>Sync subscribers are invoked directly on the calling thread. Otherwise the event is
     * offered to the queue; while the queue refuses it, one entry is taken off the queue and the
     * offer is retried, up to the configured maximum number of retries. If the event is still
     * refused it is dropped and the rejected counter is incremented.
     *
     * @param obj the event to deliver
     */
    public void enqueue(Object obj) {
        // NOTE(review): delegateSubscriber is a java.lang.reflect.Method, so getClass() here is
        // always Method.class rather than the subscriber's class. Left unchanged because the
        // gatekeeper's contract is not visible from this file - confirm whether
        // subscriberClassInstance.getClass() was intended.
        if (SyncSubscribersGatekeeper.isSyncSubscriber(this.subscriberConfig, obj.getClass(), this.delegateSubscriber.getClass())) {
            LOGGER.debug("Sending a sync event to subscriber: {}. Set the property {} to false to disable sync consumption.", this.delegateSubscriber.toGenericString(), SyncSubscribersGatekeeper.ALLOW_SYNC_SUBSCRIBERS);
            processEvent(obj);
            return;
        }

        Stopwatch enqueueTimer = this.stats.enqueueStats.start();
        try {
            final int maxRetries = maxRetriesOnQueueFull.get();
            int retries = 0;
            // The outcome is tracked explicitly: deriving it from the retry counter misreports an
            // event accepted on the very last permitted retry as rejected.
            boolean accepted = this.eventQueue.offer(obj);
            while (!accepted && retries < maxRetries) {
                retries++;
                this.stats.QUEUE_OFFER_RETRY_COUNTER.increment();
                // Presumably evicts a queued entry to make room for the new one (the log message
                // below reports it as rejected) - verify against the queue implementation.
                this.eventQueue.nonBlockingTake();
                LOGGER.info("Subscriber: {} queue full, rejected one {} as a result of retries.",
                        this.delegateSubscriber.toGenericString(),
                        Subscribe.BatchingStrategy.None == this.batchingStrategy ? "event" : "batch");
                accepted = this.eventQueue.offer(obj);
            }

            if (!accepted) {
                LOGGER.info("Subscriber: {} {} one event after {} retries.", this.delegateSubscriber.toGenericString(), "rejected", retries);
                this.stats.EVENT_ENQUEUE_REJECTED_COUNTER.increment();
            } else if (retries > 0) {
                LOGGER.info("Subscriber: {} {} one event after {} retries.", this.delegateSubscriber.toGenericString(), "accepted", retries);
            }
        } finally {
            enqueueTimer.stop();
        }
    }

    /**
     * Attaches additional filters to this consumer.
     *
     * @param eventFilterArr filters to add
     */
    public void addFilters(EventFilter... eventFilterArr) {
        this.filters.addAll(Arrays.asList(eventFilterArr));
    }

    /**
     * Detaches the given filters from this consumer.
     *
     * @param eventFilterArr filters to remove
     */
    public void removeFilters(EventFilter... eventFilterArr) {
        this.filters.removeAll(Arrays.asList(eventFilterArr));
    }

    /** Detaches every filter from this consumer. */
    public void clearFilters() {
        this.filters.clear();
    }

    /**
     * Stops the polling thread (via {@link ExecutorService#shutdownNow()}), then clears the queue
     * and the attached filters.
     */
    public void shutdown() {
        this.executor.shutdownNow();
        this.eventQueue.clear();
        this.filters.clear();
    }

    /** @return the subscriber method this consumer invokes */
    public Method getDelegateSubscriber() {
        return this.delegateSubscriber;
    }

    /** @return the instance on which the subscriber method is invoked */
    public Object getContainerInstance() {
        return this.subscriberClassInstance;
    }

    /** @return the event class this consumer was created for */
    public Class<?> getTargetEventClass() {
        return this.targetEventClass;
    }

    /**
     * @return the live, mutable set of attached filters (not a copy); changes made through
     *     {@link #addFilters}, {@link #removeFilters} and {@link #clearFilters} are visible in it
     */
    public Set<EventFilter> getAttachedFilters() {
        return this.filters;
    }

    @VisibleForTesting
    EventConsumerStats getStats() {
        return this.stats;
    }

    @VisibleForTesting
    SubscriberConfigProvider.SubscriberConfig getSubscriberConfig() {
        return this.subscriberConfig;
    }

    /**
     * Applies the filters to a single event (batches are filtered lazily while being iterated)
     * and, if it passes, invokes the subscriber method. Exceptions raised by the invocation are
     * logged and the event is ignored.
     *
     * @param obj the event or event batch to dispatch
     */
    public void processEvent(Object obj) {
        Stopwatch consumptionTimer = this.stats.consumptionStats.start();
        try {
            Object dispatchable = wrapIfBatched(obj);
            if (!applyFilters(dispatchable)) {
                return;
            }
            try {
                this.delegateSubscriber.invoke(this.subscriberClassInstance, dispatchable);
            } catch (Exception e) {
                LOGGER.error("Failed to dispatch event: " + dispatchable + " to subscriber class: " + this.subscriberClassInstance.getClass() + " and method: " + this.delegateSubscriber.toGenericString() + ". Ignoring the event.", e);
            }
        } finally {
            // Always stop the timer, including when the event is filtered out or filtering throws.
            consumptionTimer.stop();
        }
    }

    /**
     * @return {@code true} if {@code obj} should be dispatched; batches always pass here because
     *     their events are filtered individually by {@link BatchDecorator}
     */
    private boolean applyFilters(Object obj) {
        if (EventBusUtils.isAnEventBatch(obj)) {
            return true;
        }
        return EventBusUtils.applyFilters(obj, this.filters, this.stats.filterStats, "subscriber: " + this.delegateSubscriber.toGenericString(), LOGGER);
    }

    /** Wraps event batches in a filtering {@link BatchDecorator}; other events are returned as is. */
    private Object wrapIfBatched(Object obj) {
        return EventBusUtils.isAnEventBatch(obj) ? new BatchDecorator((EventBatch) obj) : obj;
    }

    /**
     * Two consumers are equal when their subscriber method, attached filters and subscriber
     * instance are all equal.
     *
     * <p>Caveat: the filter set is mutable, so the result of this method and of
     * {@link #hashCode()} changes when filters are added or removed.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        EventConsumer other = (EventConsumer) obj;
        return nullSafeEquals(this.delegateSubscriber, other.delegateSubscriber)
                && nullSafeEquals(this.filters, other.filters)
                && nullSafeEquals(this.subscriberClassInstance, other.subscriberClassInstance);
    }

    @Override
    public int hashCode() {
        int result = nullSafeHash(this.delegateSubscriber);
        result = 31 * result + nullSafeHash(this.subscriberClassInstance);
        result = 31 * result + nullSafeHash(this.filters);
        return result;
    }

    /** Null-tolerant equality: two nulls are equal, a null never equals a non-null. */
    private static boolean nullSafeEquals(Object left, Object right) {
        return left != null ? left.equals(right) : right == null;
    }

    /** Null-tolerant hash: {@code 0} for {@code null}, otherwise the value's own hash code. */
    private static int nullSafeHash(Object value) {
        return value != null ? value.hashCode() : 0;
    }
}
