/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import ratpack.exec.Downstream;
import ratpack.exec.ExecResult;
import ratpack.exec.Execution;
import ratpack.exec.Result;
import ratpack.exec.Upstream;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.exec.internal.ExecutionBacking;
import ratpack.exec.internal.ResultBackedExecResult;

/**
 * An {@link Upstream} that connects to its wrapped upstream at most once and
 * replays the captured outcome to every later subscriber.
 *
 * <p>The first caller of {@link #connect(Downstream)} wins the {@code fired}
 * flag and is wired to the wrapped upstream. Whatever signal that upstream
 * emits (error, success or complete) is stored in {@code result}. Every other
 * caller is parked in the {@code waiting} queue as a {@link Job} and is handed
 * the stored result by {@link #tryDrain()}.
 *
 * @param <T> the type of value emitted by the wrapped upstream
 */
public class CachingUpstream<T> implements Upstream<T> {
    // Set to null once a result has been captured (see doDrainInNewSegment).
    private Upstream<? extends T> upstream;
    // Flipped by the first connect() call; only that caller touches the wrapped upstream.
    private final AtomicBoolean fired = new AtomicBoolean();
    // Subscribers that arrived after the first one and are awaiting the cached result.
    private final Queue<Job> waiting = new ConcurrentLinkedQueue<>();
    // Guards the drain loop so that only one thread empties the queue at a time.
    private final AtomicBoolean draining = new AtomicBoolean();
    // The captured outcome; null until the wrapped upstream has signalled.
    private final AtomicReference<ExecResult<T>> result = new AtomicReference<>();

    /**
     * Creates a caching wrapper around the given upstream.
     *
     * @param upstream the upstream to connect to on the first {@link #connect(Downstream)} call
     */
    public CachingUpstream(Upstream<? extends T> upstream) {
        this.upstream = upstream;
    }

    /**
     * Delivers the cached result to every queued {@link Job}.
     *
     * <p>Only the thread that wins the {@code draining} flag polls the queue.
     * After the flag is released the queue is checked again, so that a job
     * added while the flag was still held is not left behind. The re-check is
     * a loop rather than a recursive call so the stack does not grow.
     */
    private void tryDrain() {
        do {
            if (this.draining.compareAndSet(false, true)) {
                try {
                    ExecResult<T> cached = this.result.get();
                    Job job = this.waiting.poll();
                    while (job != null) {
                        // A lambda may only capture effectively-final locals, and
                        // 'job' is reassigned below, so capture a per-iteration copy.
                        Job finalJob = job;
                        finalJob.streamHandle.complete(() -> finalJob.downstream.accept(cached));
                        job = this.waiting.poll();
                    }
                } finally {
                    this.draining.set(false);
                }
            }
        } while (!this.draining.get() && !this.waiting.isEmpty());
    }

    /**
     * Connects the given downstream.
     *
     * <p>The first caller is connected to the wrapped upstream through a
     * downstream that records the signal, schedules a drain of queued
     * subscribers and then forwards the signal. Every later caller is
     * registered through {@code ExecutionBacking.require().streamSubscribe(...)},
     * added to the waiting queue, and drained immediately if a result is
     * already available.
     *
     * @param downstream the downstream to signal with the (possibly cached) outcome
     * @throws Exception any exception thrown while connecting to the wrapped upstream
     */
    @Override
    public void connect(final Downstream<? super T> downstream) throws Exception {
        if (this.fired.compareAndSet(false, true)) {
            this.upstream.connect(new Downstream<T>() {

                @Override
                public void error(Throwable throwable) {
                    CachingUpstream.this.result.set(new ResultBackedExecResult(Result.error(throwable), Execution.current()));
                    CachingUpstream.this.doDrainInNewSegment();
                    downstream.error(throwable);
                }

                @Override
                public void success(T value) {
                    CachingUpstream.this.result.set(new ResultBackedExecResult(Result.success(value), Execution.current()));
                    CachingUpstream.this.doDrainInNewSegment();
                    downstream.success(value);
                }

                @Override
                public void complete() {
                    CachingUpstream.this.result.set(new CompleteExecResult(Execution.current()));
                    CachingUpstream.this.doDrainInNewSegment();
                    downstream.complete();
                }
            });
        } else {
            ExecutionBacking.require().streamSubscribe(streamHandle -> {
                this.waiting.add(new Job(downstream, (ExecutionBacking.StreamHandle)streamHandle));
                // If no result exists yet, the job stays queued until a result
                // callback schedules a drain.
                if (this.result.get() != null) {
                    this.tryDrain();
                }
            });
        }
    }

    /**
     * Drops the reference to the wrapped upstream and submits
     * {@link #tryDrain()} to the event loop obtained from
     * {@code ExecutionBacking.require().getEventLoop()}.
     */
    private void doDrainInNewSegment() {
        this.upstream = null;
        ExecutionBacking.require().getEventLoop().execute(this::tryDrain);
    }

    /**
     * A queued subscriber: the downstream to signal, paired with the stream
     * handle through which the signal is delivered.
     */
    private class Job {
        final Downstream<? super T> downstream;
        final ExecutionBacking.StreamHandle streamHandle;

        private Job(Downstream<? super T> downstream, ExecutionBacking.StreamHandle streamHandle) {
            this.downstream = downstream;
            this.streamHandle = streamHandle;
        }
    }
}

