/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.error;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.bus.registry.Registration;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.timer.Timer;
import reactor.rx.action.error.FallbackAction;

/**
 * Action that arms a timer on subscription and re-arms it after every value it
 * relays. When the timer fires and the timeout is still armed, the action either
 * switches to the fallback publisher (when one was supplied) or signals a
 * {@link TimeoutException}.
 *
 * <p>The current timer registration is kept in a volatile field that
 * {@link #cancel()} and {@link #doComplete()} reset to {@code null}. The timer
 * callbacks therefore read the field once into a local and treat {@code null}
 * as "timeout disarmed" instead of dereferencing it blindly.
 *
 * @param <T> type of the values relayed by this action
 */
public final class TimeoutAction<T> extends FallbackAction<T> {
    private final Timer timer;
    /** Delay handed to the timer, in milliseconds. */
    private final long timeout;
    /** Callback submitted to the timer; forwards the timeout to the dispatcher. */
    private final Consumer<Long> timeoutTask;
    /**
     * Callback run through the dispatcher once the timer has fired. It does
     * nothing when the registration has been cleared or cancelled in the
     * meantime; otherwise it switches to the fallback or raises the timeout error.
     */
    private final Consumer<Void> timeoutRequest = new Consumer<Void>(){

        @Override
        public void accept(Void aVoid) {
            // Single read of the volatile field: cancel()/doComplete() may have
            // cleared it between the dispatch and this invocation.
            final Registration<? extends Consumer<Long>> registration = TimeoutAction.this.timeoutRegistration;
            if (registration == null || registration.isCancelled()) {
                return;
            }
            if (TimeoutAction.this.fallback != null) {
                TimeoutAction.this.doSwitch();
            } else {
                TimeoutAction.this.doError(new TimeoutException("No data signaled for " + TimeoutAction.this.timeout + "ms"));
            }
        }
    };
    /** Registration of the currently armed timeout, or {@code null} once disarmed. */
    private volatile Registration<? extends Consumer<Long>> timeoutRegistration;

    /**
     * Creates the action; the timeout itself is only armed in
     * {@link #doSubscribe(Subscription)}.
     *
     * @param dispatcher dispatcher on which the timeout handling is executed
     * @param fallback   publisher passed to the parent {@link FallbackAction};
     *                   when {@code null} a {@link TimeoutException} is signalled instead
     * @param timer      timer used to schedule the timeout, must not be {@code null}
     * @param timeout    delay in milliseconds without a value before the timeout triggers
     * @throws IllegalStateException presumably raised by {@code Assert.state} when
     *                               {@code timer} is {@code null} (verify against Assert)
     */
    public TimeoutAction(final Dispatcher dispatcher, Publisher<? extends T> fallback, Timer timer, long timeout) {
        super(fallback);
        Assert.state(timer != null, "Timer must be supplied");
        this.timeoutTask = new Consumer<Long>(){

            @Override
            public void accept(Long aLong) {
                // The field may already be null if the action was cancelled or
                // completed while this tick was in flight; ignore the tick then.
                final Registration<? extends Consumer<Long>> registration = TimeoutAction.this.timeoutRegistration;
                if (registration != null && registration.getObject() == this) {
                    dispatcher.dispatch(null, TimeoutAction.this.timeoutRequest, null);
                }
            }
        };
        this.timer = timer;
        this.timeout = timeout;
    }

    /** Delegates to the parent and then arms the first timeout. */
    @Override
    protected void doSubscribe(Subscription subscription) {
        super.doSubscribe(subscription);
        this.timeoutRegistration = this.scheduleTimeout();
    }

    /**
     * Cancels the pending timeout, relays the value and arms a fresh timeout.
     *
     * <p>NOTE(review): this still expects a registration to be present, i.e. that
     * it runs after {@code doSubscribe} and before cancel/complete; confirm
     * against the upstream signalling contract.
     */
    @Override
    protected void doNormalNext(T ev) {
        this.timeoutRegistration.cancel();
        this.broadcastNext(ev);
        this.timeoutRegistration = this.scheduleTimeout();
    }

    /** Disarms the pending timeout, if any, before delegating to the parent. */
    @Override
    public void cancel() {
        this.cancelTimeout();
        super.cancel();
    }

    /** Disarms the pending timeout, if any, before delegating to the parent. */
    @Override
    public void doComplete() {
        this.cancelTimeout();
        super.doComplete();
    }

    /** Submits the timeout task to the timer with the configured delay. */
    private Registration<? extends Consumer<Long>> scheduleTimeout() {
        return this.timer.submit(this.timeoutTask, this.timeout, TimeUnit.MILLISECONDS);
    }

    /** Cancels and clears the current registration, reading the volatile field only once. */
    private void cancelTimeout() {
        final Registration<? extends Consumer<Long>> registration = this.timeoutRegistration;
        if (registration != null) {
            registration.cancel();
            this.timeoutRegistration = null;
        }
    }
}

