/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.rsocket.resume.ResumableFramesStore;
import java.util.Queue;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.util.concurrent.Queues;

/**
 * {@link ResumableFramesStore} that keeps sent frames in an in-memory queue so that they can be
 * replayed through {@link #resumeStream()}.
 *
 * <p>The cache is bounded by a byte budget ({@code cacheSizeBytes}). When a new frame does not fit,
 * the oldest cached frames are released and {@link #framePosition()} is advanced by their size.
 *
 * <p>NOTE(review): counters are {@code volatile} but are updated with non-atomic compound
 * assignments ({@code +=}, {@code -=}); this presumably relies on each counter having a single
 * writer at a time - confirm against the callers before using this class from several threads.
 */
public class InMemoryResumableFramesStore
implements ResumableFramesStore {
    private static final Logger logger = LoggerFactory.getLogger(InMemoryResumableFramesStore.class);
    /** Demand requested from the publisher handed to {@link #saveFrames(Flux)}. */
    private static final long SAVE_REQUEST_SIZE = Long.MAX_VALUE;
    /** Completed by {@link #dispose()}; exposed through {@link #onClose()}. */
    private final MonoProcessor<Void> disposed = MonoProcessor.create();
    /** Total size in bytes of the frames that have been released or dropped from the cache. */
    volatile long position;
    /** Total size in bytes of the frames reported through {@link #resumableFrameReceived(ByteBuf)}. */
    volatile long impliedPosition;
    /** Total size in bytes of the frames currently held in {@link #cachedFrames}. */
    volatile int cacheSize;
    final Queue<ByteBuf> cachedFrames;
    /** Prefix used in log messages. */
    private final String tag;
    /** Maximum number of bytes the cache may hold. */
    private final int cacheLimit;
    /** Reference count observed on the first saved frame, before the cache retained it; 0 until then. */
    private volatile int upstreamFrameRefCnt;

    /**
     * Creates an empty store.
     *
     * @param tag prefix used in log messages
     * @param cacheSizeBytes byte budget of the cache; also passed as the size hint of the backing queue
     */
    public InMemoryResumableFramesStore(String tag, int cacheSizeBytes) {
        this.tag = tag;
        this.cacheLimit = cacheSizeBytes;
        this.cachedFrames = InMemoryResumableFramesStore.cachedFramesQueue(cacheSizeBytes);
    }

    /**
     * Subscribes to {@code frames} and passes every emitted frame to {@link #saveFrame(ByteBuf)}.
     *
     * @param frames frames to cache
     * @return a {@link Mono} that completes when {@code frames} terminates for any reason
     *     (completion, error or cancellation), as signalled by {@code doFinally}
     */
    @Override
    public Mono<Void> saveFrames(Flux<ByteBuf> frames) {
        MonoProcessor<Void> completed = MonoProcessor.create();
        frames.doFinally(signal -> completed.onComplete()).subscribe(new FramesSubscriber(SAVE_REQUEST_SIZE));
        return completed;
    }

    /**
     * Releases cached frames from the head of the queue until the local position reaches
     * {@code remoteImpliedPos}.
     *
     * @param remoteImpliedPos position reported by the remote side
     * @throws IllegalStateException if the cache runs empty before enough bytes were removed, or if
     *     the removed frames overshoot {@code remoteImpliedPos} (frame boundaries do not line up).
     *     Frames polled before the check are already released when this is thrown.
     */
    @Override
    public void releaseFrames(long remoteImpliedPos) {
        long pos = this.position;
        logger.debug("{} Removing frames for local: {}, remote implied: {}", this.tag, pos, remoteImpliedPos);
        // A remote position behind the local one means there is nothing to remove.
        long removeSize = Math.max(0L, remoteImpliedPos - pos);
        while (removeSize > 0L) {
            ByteBuf cachedFrame = this.cachedFrames.poll();
            if (cachedFrame == null) {
                break;
            }
            removeSize -= this.releaseTailFrame(cachedFrame);
        }
        if (removeSize > 0L) {
            throw new IllegalStateException(String.format("Local and remote state disagreement: need to remove additional %d bytes, but cache is empty", removeSize));
        }
        if (removeSize < 0L) {
            throw new IllegalStateException("Local and remote state disagreement: local and remote frame sizes are not equal");
        }
        logger.debug("{} Removed frames. Current cache size: {}", this.tag, this.cacheSize);
    }

    /**
     * Returns a {@link Flux} that emits the frames cached at subscription time, oldest first.
     *
     * <p>Frames are visited by polling the head of the queue and re-offering it at the tail, so the
     * frames stay cached and a full pass leaves the queue in its original order.
     *
     * <p>NOTE(review): the number of frames is captured once per subscriber; if the queue shrinks
     * while the stream is being generated, {@code poll()} returns {@code null} and the generator
     * fails with a {@link NullPointerException} - confirm callers never overlap this with
     * {@link #releaseFrames(long)}.
     *
     * @return the replay stream
     */
    @Override
    public Flux<ByteBuf> resumeStream() {
        return Flux.generate(() -> new ResumeStreamState(this.cachedFrames.size(), this.upstreamFrameRefCnt), (state, sink) -> {
            if (state.next()) {
                ByteBuf frame = this.cachedFrames.poll();
                // Presumably a frame still at the upstream reference count is held only by the
                // cache, so an extra reference is taken for the consumer - TODO confirm.
                if (state.shouldRetain(frame)) {
                    frame.retain();
                }
                this.cachedFrames.offer(frame);
                sink.next(frame);
            } else {
                sink.complete();
                logger.debug("{} Resuming stream completed", this.tag);
            }
            return state;
        });
    }

    /** @return the current local position, in bytes */
    @Override
    public long framePosition() {
        return this.position;
    }

    /** @return the current implied position, in bytes */
    @Override
    public long frameImpliedPosition() {
        return this.impliedPosition;
    }

    /**
     * Advances the implied position by the readable size of {@code frame}.
     *
     * @param frame the received frame; it is only measured, not retained or released
     */
    @Override
    public void resumableFrameReceived(ByteBuf frame) {
        this.impliedPosition += frame.readableBytes();
    }

    /** @return a {@link Mono} that completes once {@link #dispose()} has run */
    @Override
    public Mono<Void> onClose() {
        return this.disposed;
    }

    /** Releases every cached frame, resets the cache size and completes {@link #onClose()}. */
    public void dispose() {
        this.cacheSize = 0;
        ByteBuf frame;
        while ((frame = this.cachedFrames.poll()) != null) {
            frame.release();
        }
        this.disposed.onComplete();
    }

    /** @return {@code true} once {@link #dispose()} has completed the close signal */
    public boolean isDisposed() {
        return this.disposed.isTerminated();
    }

    /**
     * Releases a frame that was already removed from the queue and updates the counters: the cache
     * shrinks and the position grows by the frame's readable size.
     *
     * @param content the frame removed from the cache
     * @return the frame's readable size in bytes
     */
    private int releaseTailFrame(ByteBuf content) {
        // Read the size before release(): the buffer must not be touched afterwards.
        int frameSize = content.readableBytes();
        this.cacheSize -= frameSize;
        this.position += frameSize;
        content.release();
        return frameSize;
    }

    /**
     * Caches {@code frame}, evicting the oldest cached frames if needed to make room.
     *
     * <p>If the frame still does not fit after the cache has been emptied, it is not cached and the
     * position is advanced by its size instead.
     *
     * @param frame the frame to cache; it is retained only when it is stored
     */
    void saveFrame(ByteBuf frame) {
        if (this.upstreamFrameRefCnt == 0) {
            // Remember the reference count frames arrive with, before the cache retains them.
            this.upstreamFrameRefCnt = frame.refCnt();
        }
        int frameSize = frame.readableBytes();
        long availableSize = this.cacheLimit - this.cacheSize;
        while (availableSize < frameSize) {
            ByteBuf cachedFrame = this.cachedFrames.poll();
            if (cachedFrame == null) {
                break;
            }
            availableSize += this.releaseTailFrame(cachedFrame);
        }
        if (availableSize >= frameSize) {
            this.cachedFrames.offer(frame.retain());
            this.cacheSize += frameSize;
        } else {
            this.position += frameSize;
        }
    }

    /**
     * Creates the queue backing the cache.
     *
     * @param size size hint handed to {@link Queues#get(int)}
     * @return a new, empty queue
     */
    static Queue<ByteBuf> cachedFramesQueue(int size) {
        return Queues.<ByteBuf>get(size).get();
    }

    /**
     * Subscriber that stores every received frame in the enclosing store and replenishes demand in
     * batches of half the initial request, unless the initial request is {@code Long.MAX_VALUE}.
     */
    class FramesSubscriber
    implements Subscriber<ByteBuf> {
        private final long firstRequestSize;
        private final long refillSize;
        // long so that it can reach any refillSize; an int counter could never equal a refill
        // size above Integer.MAX_VALUE and the stream would stall.
        private long received;
        private Subscription s;

        /**
         * @param requestSize initial demand; {@code Long.MAX_VALUE} disables replenishing
         */
        public FramesSubscriber(long requestSize) {
            this.firstRequestSize = requestSize;
            // At least 1: with requestSize == 1 the halved value would be 0, the refill condition
            // would never be met and no further demand would ever be signalled.
            this.refillSize = Math.max(1L, requestSize / 2L);
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.s = s;
            s.request(this.firstRequestSize);
        }

        @Override
        public void onNext(ByteBuf byteBuf) {
            InMemoryResumableFramesStore.this.saveFrame(byteBuf);
            if (this.firstRequestSize != Long.MAX_VALUE && ++this.received == this.refillSize) {
                this.received = 0L;
                this.s.request(this.refillSize);
            }
        }

        @Override
        public void onError(Throwable t) {
            logger.info("unexpected onError signal: {}, {}", t.getClass(), t.getMessage());
        }

        @Override
        public void onComplete() {
            // Nothing to do: completion is reported to the caller through doFinally in saveFrames.
        }
    }

    /** Per-subscriber cursor used by {@link #resumeStream()} to emit a fixed number of frames. */
    static class ResumeStreamState {
        /** Number of frames to emit, captured when the subscriber arrived. */
        private final int cacheSize;
        private final int expectedRefCnt;
        /** Number of frames emitted so far. */
        private int cacheCounter;

        /**
         * @param cacheSize number of frames to emit
         * @param expectedRefCnt reference count at which a frame gets an extra {@code retain()}
         */
        public ResumeStreamState(int cacheSize, int expectedRefCnt) {
            this.cacheSize = cacheSize;
            this.expectedRefCnt = expectedRefCnt;
        }

        /**
         * Advances the cursor.
         *
         * @return {@code true} if another frame should be emitted, {@code false} once
         *     {@code cacheSize} frames have been emitted
         */
        public boolean next() {
            if (this.cacheCounter >= this.cacheSize) {
                return false;
            }
            ++this.cacheCounter;
            return true;
        }

        /**
         * @param frame the frame about to be emitted
         * @return {@code true} if the frame's reference count equals the expected one
         */
        public boolean shouldRetain(ByteBuf frame) {
            return frame.refCnt() == this.expectedRefCnt;
        }
    }
}

