/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import ratpack.exec.Downstream;
import ratpack.exec.ExecResult;
import ratpack.exec.Execution;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.Upstream;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.ResultBackedExecResult;

public class CachingUpstream<T>
implements Upstream<T> {
    private Upstream<? extends T> upstream;
    private final AtomicBoolean fired = new AtomicBoolean();
    private final Queue<Downstream<? super ExecResult<? extends T>>> waiting = new ConcurrentLinkedQueue<Downstream<? super ExecResult<? extends T>>>();
    private final AtomicBoolean draining = new AtomicBoolean();
    private final AtomicReference<ExecResult<? extends T>> result = new AtomicReference();

    public CachingUpstream(Upstream<? extends T> upstream) {
        this.upstream = upstream;
    }

    private void tryDrain() {
        if (this.draining.compareAndSet(false, true)) {
            try {
                ExecResult<? extends T> result = this.result.get();
                Downstream<ExecResult<ExecResult<? extends T>>> downstream = this.waiting.poll();
                while (downstream != null) {
                    downstream.success(result);
                    downstream = this.waiting.poll();
                }
            }
            finally {
                this.draining.set(false);
            }
        }
        if (!this.draining.get() && !this.waiting.isEmpty()) {
            this.tryDrain();
        }
    }

    @Override
    public void connect(final Downstream<? super T> downstream) throws Exception {
        if (this.fired.compareAndSet(false, true)) {
            this.upstream.connect(new Downstream<T>(){

                @Override
                public void error(Throwable throwable) {
                    CachingUpstream.this.result.set(new ResultBackedExecResult(Result.error(throwable), Execution.current()));
                    CachingUpstream.this.doDrainInNewSegment();
                    downstream.error(throwable);
                }

                @Override
                public void success(T value) {
                    CachingUpstream.this.result.set(new ResultBackedExecResult(Result.success(value), Execution.current()));
                    CachingUpstream.this.doDrainInNewSegment();
                    downstream.success(value);
                }

                @Override
                public void complete() {
                    CachingUpstream.this.result.set(new CompleteExecResult(Execution.current()));
                    CachingUpstream.this.doDrainInNewSegment();
                    downstream.complete();
                }
            });
        } else {
            Promise.of(innerDownstream -> {
                ExecResult<? extends T> result = this.result.get();
                if (result == null) {
                    this.waiting.add(innerDownstream);
                } else {
                    innerDownstream.success(result);
                }
            }).then(result -> downstream.accept((ExecResult)result));
        }
    }

    private void doDrainInNewSegment() {
        this.upstream = null;
        DefaultExecution.require().getEventLoop().execute(this::tryDrain);
    }
}

