package reactor.core.composable;

import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Observable;
import reactor.core.Reactor;
import reactor.core.support.NotifyConsumer;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.event.support.EventConsumer;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.Predicate;
import reactor.tuple.Tuple2;
import reactor.util.Assert;

/* loaded from: input_file:reactor/core/composable/Composable.class */
/**
 * Base class for components that accept values and errors and publish them to
 * registered consumers through an internal {@link Observable}.
 *
 * <p>Values are published under the key of a per-instance anonymous selector;
 * errors are published under the {@link Class} of the error itself. Running
 * counts of accepted values and errors are kept and guarded by {@link #lock}.
 *
 * @param <T> the type of the values accepted by this composable
 */
public abstract class Composable<T> {
    /** Guards the counters and is held while the {@code *Accepted} hooks run. */
    protected final ReentrantLock lock = new ReentrantLock();
    /** Selector/key pair under which accepted values are published. */
    private final Tuple2<Selector, Object> accept = Selectors.anonymous();
    /** Selector/key pair under which flush signals are published. */
    private final Tuple2<Selector, Object> flush = Selectors.anonymous();
    private final Observable events;
    private final Composable<?> parent;
    private long acceptCount;
    private long errorCount;

    /**
     * Creates a composable backed by a new {@link Reactor} on the given dispatcher.
     *
     * <p>When a parent is supplied, the parent's errors are cascaded into this
     * instance. Note that {@code this} is handed to the parent before subclass
     * constructors have finished running.
     *
     * @param dispatcher the dispatcher used by the internal reactor; must not be {@code null}
     * @param composable the parent composable, or {@code null} if there is none
     * @param <U>        the value type of the parent
     */
    public <U> Composable(@Nonnull Dispatcher dispatcher, @Nullable Composable<U> composable) {
        Assert.notNull(dispatcher, "'dispatcher' cannot be null.");
        this.events = new Reactor(dispatcher);
        this.parent = composable;
        if (composable != null) {
            composable.cascadeErrors(this);
        }
    }

    /**
     * Forwards every value event and every error of this composable to the given one.
     *
     * @param composable the composable to notify; must not be this instance
     * @return {@code this}
     * @throws IllegalArgumentException if {@code composable} is this instance
     */
    public Composable<T> consume(@Nonnull final Composable<T> composable) {
        if (composable == this) {
            throw new IllegalArgumentException("Trying to consume itself, leading to erroneous recursive calls");
        }
        consumeEvent(new Consumer<Event<T>>() {
            @Override
            public void accept(Event<T> event) {
                composable.notifyValue(event);
            }
        });
        cascadeErrors(composable);
        return this;
    }

    /**
     * Registers a consumer on the accept selector, wrapped in an {@link EventConsumer}.
     *
     * @param consumer the consumer to register
     * @return {@code this}
     */
    public Composable<T> consume(@Nonnull Consumer<T> consumer) {
        this.events.on(this.accept.getT1(), new EventConsumer(consumer));
        return this;
    }

    /**
     * Registers a consumer of the {@link Event} objects published on the accept selector.
     *
     * @param consumer the event consumer to register
     * @return {@code this}
     */
    public Composable<T> consumeEvent(@Nonnull Consumer<Event<T>> consumer) {
        this.events.on(this.accept.getT1(), consumer);
        return this;
    }

    /**
     * Registers a {@link NotifyConsumer} built from the given key and observable
     * as a value consumer of this composable.
     *
     * @param obj        the key handed to the {@code NotifyConsumer}
     * @param observable the observable handed to the {@code NotifyConsumer}
     * @return {@code this}
     */
    public Composable<T> consume(@Nonnull Object obj, @Nonnull Observable observable) {
        consume(new NotifyConsumer(obj, observable));
        return this;
    }

    /**
     * Registers an error consumer under a type selector for the given class.
     *
     * @param cls      the error class used to build the selector
     * @param consumer the consumer to register
     * @param <E>      the error type
     * @return {@code this}
     */
    public <E extends Throwable> Composable<T> when(@Nonnull Class<E> cls, @Nonnull Consumer<E> consumer) {
        this.events.on(Selectors.T(cls), new EventConsumer(consumer));
        return this;
    }

    /**
     * Returns a new composable that receives the result of applying
     * {@code function} to each value accepted by this one.
     *
     * <p>Anything thrown while transforming or forwarding a value is passed to
     * the new composable's deferred as an error.
     *
     * @param function the transformation; must not be {@code null}
     * @param <V>      the type of the transformed values
     * @return the composable obtained from the newly created deferred
     */
    public <V> Composable<V> map(@Nonnull final Function<T, V> function) {
        Assert.notNull(function, "Map function cannot be null.");
        final Deferred<V, Composable<V>> deferred = createDeferred();
        consumeEvent(new Consumer<Event<T>>() {
            @Override
            public void accept(Event<T> event) {
                try {
                    deferred.acceptEvent(event.copy(function.apply(event.getData())));
                } catch (Throwable th) {
                    deferred.accept(th);
                }
            }
        });
        return deferred.compose();
    }

    /**
     * Filters values using a {@link Function} that returns a {@link Boolean}.
     *
     * @param function the function deciding whether a value passes
     * @return the composable receiving the values that pass
     */
    public Composable<T> filter(@Nonnull final Function<T, Boolean> function) {
        return filter(new Predicate<T>() {
            @Override
            public boolean test(T t) {
                return function.apply(t);
            }
        }, null);
    }

    /**
     * Filters values using the given predicate; rejected values are not forwarded anywhere.
     *
     * @param predicate the predicate deciding whether a value passes
     * @return the composable receiving the values that pass
     */
    public Composable<T> filter(@Nonnull Predicate<T> predicate) {
        return filter(predicate, null);
    }

    /**
     * Filters values using the given predicate, optionally routing rejected
     * values to another composable.
     *
     * @param predicate  the predicate deciding whether a value passes
     * @param composable receives the events whose data fails the predicate; may be {@code null}
     * @return the composable receiving the values that pass
     */
    public Composable<T> filter(@Nonnull final Predicate<T> predicate, final Composable<T> composable) {
        final Deferred<T, Composable<T>> deferred = createDeferred();
        consumeEvent(new Consumer<Event<T>>() {
            @Override
            public void accept(Event<T> event) {
                if (predicate.test(event.getData())) {
                    deferred.acceptEvent(event);
                } else if (composable != null) {
                    composable.notifyValue(event);
                }
            }
        });
        return deferred.compose();
    }

    /**
     * Returns the number of values accepted so far, read under {@link #lock}.
     *
     * @return the accepted-value count
     */
    public long getAcceptCount() {
        this.lock.lock();
        try {
            return this.acceptCount;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns the number of errors accepted so far, read under {@link #lock}.
     *
     * @return the accepted-error count
     */
    public long getErrorCount() {
        this.lock.lock();
        try {
            return this.errorCount;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Flushes the parent (if any) first, then publishes an event with
     * {@code null} data under this composable's flush key.
     *
     * @return {@code this}
     */
    public Composable<T> flush() {
        if (this.parent != null) {
            this.parent.flush();
        }
        this.events.notify(this.flush.getT2(), new Event<Void>(null));
        return this;
    }

    /**
     * Wraps the value in an {@link Event} and delegates to {@link #notifyValue(Event)}.
     *
     * <p>NOTE(review): a decompiler note recorded that this method's access was
     * widened from package-private; it is left public so existing callers keep compiling.
     *
     * @param t the value to publish
     */
    public void notifyValue(T t) {
        Event<T> event = Event.wrap(t);
        notifyValue(event);
    }

    /**
     * Records the value via {@link #valueAccepted(Object)}, increments the accept
     * count, and then publishes the event under the accept key.
     *
     * <p>NOTE(review): a decompiler note recorded that this method's access was
     * widened from package-private; it is left public so existing callers keep compiling.
     *
     * @param event the event to publish
     */
    public void notifyValue(Event<T> event) {
        this.lock.lock();
        try {
            valueAccepted(event.getData());
            this.acceptCount++;
        } finally {
            this.lock.unlock();
        }
        // Dispatch happens after the lock is released, so a failing consumer
        // can neither run under the lock nor trigger a second unlock().
        this.events.notify(this.accept.getT2(), event);
    }

    /**
     * Records the error via {@link #errorAccepted(Throwable)}, increments the
     * error count, and then publishes the error under its runtime class.
     *
     * <p>NOTE(review): a decompiler note recorded that this method's access was
     * widened from package-private; it is left public so existing callers keep compiling.
     *
     * @param th the error to publish
     */
    public void notifyError(Throwable th) {
        this.lock.lock();
        try {
            errorAccepted(th);
            this.errorCount++;
        } finally {
            this.lock.unlock();
        }
        // The key is the error's class; the payload is the wrapping event.
        this.events.notify(th.getClass(), Event.wrap(th));
    }

    /**
     * Creates the deferred used by {@link #map(Function)} and the
     * {@code filter} methods to build their downstream composable.
     *
     * @param <V> the value type of the new deferred
     * @param <C> the composable type produced by the new deferred
     * @return a new deferred
     */
    protected abstract <V, C extends Composable<V>> Deferred<V, C> createDeferred();

    /**
     * Hook invoked with {@link #lock} held, before an error is published.
     *
     * @param th the accepted error
     */
    protected abstract void errorAccepted(Throwable th);

    /**
     * Hook invoked with {@link #lock} held, before a value is published.
     *
     * @param t the accepted value
     */
    protected abstract void valueAccepted(T t);

    /**
     * Forwards every {@link Throwable} published by this composable to the given one.
     *
     * @param composable the composable whose {@code notifyError} receives the errors
     */
    protected void cascadeErrors(final Composable<?> composable) {
        when(Throwable.class, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable th) {
                composable.notifyError(th);
            }
        });
    }

    /**
     * Returns the observable on which this composable publishes its events.
     *
     * @return the internal observable
     */
    public Observable getObservable() {
        return this.events;
    }

    /**
     * Returns the selector/key pair used for accepted values.
     *
     * @return the accept tuple
     */
    protected Tuple2<Selector, Object> getAccept() {
        return this.accept;
    }

    /**
     * Returns the parent passed at construction time.
     *
     * @return the parent composable, or {@code null} if there is none
     */
    protected Composable<?> getParent() {
        return this.parent;
    }

    /**
     * Returns the selector/key pair used for flush signals.
     *
     * @return the flush tuple
     */
    public Tuple2<Selector, Object> getFlush() {
        return this.flush;
    }
}
