/*
 * Decompiled with CFR 0.152.
 */
package reactor.bus.routing;

import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.bus.Event;
import reactor.bus.filter.Filter;
import reactor.bus.registry.Registration;
import reactor.bus.routing.Router;
import reactor.core.support.Assert;
import reactor.core.support.Exceptions;
import reactor.fn.Consumer;
import reactor.fn.support.CancelConsumerException;

/**
 * {@link Router} that passes the candidate registrations through a {@link Filter} and then
 * hands the event to each consumer the filter returned, in list order.
 */
public class ConsumerFilteringRouter
implements Router {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Filter filter;

    /**
     * Creates a router that uses the given filter to select consumers.
     *
     * @param filter the filter applied to the registrations on every {@code route} call;
     *               checked with {@code Assert.notNull}
     */
    public ConsumerFilteringRouter(Filter filter) {
        Assert.notNull(filter, "filter must not be null");
        this.filter = filter;
    }

    /**
     * Routes {@code event} to the filtered consumers, then invokes the completion consumer.
     *
     * <p>Registrations that are {@code null}, cancelled or paused are skipped. A registration
     * whose consumer throws {@link CancelConsumerException} is cancelled. A registration that
     * reports {@code isCancelAfterUse()} is cancelled after its consumer was invoked, whether
     * or not the invocation succeeded.
     *
     * <p>NOTE(review): this source was recovered with a decompiler, which flagged a possible
     * behaviour change in this method's try/catch structure - verify against upstream.
     *
     * @param key                the key forwarded to the filter together with {@code consumers}
     * @param event              the event handed to every selected consumer
     * @param consumers          candidate registrations; {@code null} or empty skips routing
     *                           and goes straight to the completion consumer
     * @param completionConsumer invoked with {@code event} after routing; may be {@code null}
     * @param errorConsumer      receives failures raised by consumers (wrapped through
     *                           {@code Exceptions.addValueAsLastCause}); may be {@code null}
     * @throws RuntimeException      rethrown unchanged when a consumer throws it and no
     *                               {@code errorConsumer} was supplied
     * @throws IllegalStateException wrapping any other throwable raised by a consumer when no
     *                               {@code errorConsumer} was supplied
     */
    @Override
    public <E extends Event<?>> void route(Object key, E event, List<Registration<? extends Consumer<? extends Event<?>>>> consumers, Consumer<E> completionConsumer, Consumer<Throwable> errorConsumer) {
        if (null != consumers && !consumers.isEmpty()) {
            // Keep the wildcard element type of the incoming list instead of the narrower
            // type the decompiler emitted.
            // NOTE(review): assumes Filter.filter returns a list of the element type it is given - confirm.
            List<Registration<? extends Consumer<? extends Event<?>>>> regs = this.filter.filter(consumers, key);
            int size = regs.size();
            // Indexed loop rather than an iterator, as in the original.
            for (int i = 0; i < size; ++i) {
                Registration<? extends Consumer<? extends Event<?>>> reg = regs.get(i);
                if (null == reg || reg.isCancelled() || reg.isPaused()) {
                    continue;
                }
                try {
                    // The registration only promises "a consumer of some event type"; the
                    // caller is trusted to have registered consumers that accept E.
                    // The lookup stays inside the try so failures in getObject() are handled
                    // exactly like failures in accept().
                    @SuppressWarnings("unchecked")
                    Consumer<E> target = (Consumer<E>) reg.getObject();
                    target.accept(event);
                }
                catch (CancelConsumerException cancel) {
                    // The consumer asked to be removed.
                    reg.cancel();
                }
                catch (Throwable t) {
                    if (null != errorConsumer) {
                        errorConsumer.accept(Exceptions.addValueAsLastCause(t, event));
                    } else {
                        // Nobody to report to: log and abort routing by rethrowing.
                        // The throwable is passed last so the logger can treat it as the
                        // exception rather than a format argument.
                        this.logger.error("Event routing failed for {}: {}", new Object[]{reg.getObject(), t.getMessage(), t});
                        if (t instanceof RuntimeException) {
                            throw (RuntimeException) t;
                        }
                        throw new IllegalStateException(t);
                    }
                }
                finally {
                    if (reg.isCancelAfterUse()) {
                        reg.cancel();
                    }
                }
            }
        }
        if (null != completionConsumer) {
            try {
                completionConsumer.accept(event);
            }
            catch (Throwable t) {
                if (null != errorConsumer) {
                    errorConsumer.accept(Exceptions.addValueAsLastCause(t, event));
                }
                // NOTE(review): unlike the routing loop, this failure is logged even when the
                // error consumer was notified, and it is never rethrown - confirm intended.
                this.logger.error("Completion Consumer {} failed: {}", new Object[]{completionConsumer, t.getMessage(), t});
            }
        }
    }

    /**
     * Returns the filter this router was constructed with.
     *
     * @return the filter used to select consumers
     */
    public Filter getFilter() {
        return this.filter;
    }
}

