package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.ResumeOkFrameCodec;
import io.rsocket.keepalive.KeepAliveSupport;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:io/rsocket/resume/ServerRSocketSession.class */
/**
 * Server-side state for one resumable session.
 *
 * <p>When the active connection of the wrapped {@link ResumableDuplexConnection} is closed, a
 * timer of {@code resumeSessionDuration} is armed with this object as its subscriber. If the
 * timer fires before a resume attempt cancels it, the resumable connection is disposed. Resume
 * attempts arrive through {@link #resumeWith(ByteBuf, DuplexConnection)}, are queued, and are
 * executed one at a time under the {@code wip} counter.
 *
 * <p>{@link #setKeepAliveSupport(KeepAliveSupport)} has to be called before the active connection
 * is lost or a resume is processed: the field is dereferenced without a null check.
 */
public class ServerRSocketSession implements RSocketSession, ResumeStateHolder, CoreSubscriber<Long> {
    private static final Logger logger = LoggerFactory.getLogger(ServerRSocketSession.class);

    static final AtomicIntegerFieldUpdater<ServerRSocketSession> WIP =
            AtomicIntegerFieldUpdater.newUpdater(ServerRSocketSession.class, "wip");
    static final AtomicReferenceFieldUpdater<ServerRSocketSession, Subscription> S =
            AtomicReferenceFieldUpdater.newUpdater(ServerRSocketSession.class, Subscription.class, "s");

    final ResumableDuplexConnection resumableConnection;
    final Duration resumeSessionDuration;
    final ResumableFramesStore resumableFramesStore;
    /** Session token decoded as UTF-8; used only in log messages within this class. */
    final String session;
    final ByteBufAllocator allocator;
    final boolean cleanupStoreOnKeepAlive;

    /** Pending resume attempts; each entry invokes {@code doResume} for one incoming connection. */
    final Queue<Runnable> connectionsQueue = Queues.<Runnable>unboundedMultiproducer().get();

    /**
     * Work counter. Set to 1 by the constructor, incremented by {@code resumeWith} and decremented
     * by {@code tryTimeoutSession}. A resume task is run directly from {@code resumeWith} only
     * when the counter was 0 before the increment.
     */
    volatile int wip;

    /**
     * Subscription of the currently armed timeout timer, {@code null} after a resume attempt has
     * cancelled it, or {@code Operators.cancelledSubscription()} once the session is terminated.
     */
    volatile Subscription s;

    KeepAliveSupport keepAliveSupport;

    /**
     * Creates the session and hooks it to the lifecycle of the resumable connection.
     *
     * @param byteBuf session token; decoded as UTF-8
     * @param resumableDuplexConnection connection whose close disposes this session and whose
     *     active-connection-closed signal arms the session timeout
     * @param duplexConnection initial connection; only its allocator is retained here
     * @param resumableFramesStore store queried for positions and asked to release frames
     * @param duration delay after losing the active connection before the timeout fires
     * @param z whether {@link #onImpliedPosition(long)} releases frames from the store
     */
    public ServerRSocketSession(ByteBuf byteBuf, ResumableDuplexConnection resumableDuplexConnection, DuplexConnection duplexConnection, ResumableFramesStore resumableFramesStore, Duration duration, boolean z) {
        this.session = byteBuf.toString(CharsetUtil.UTF_8);
        this.allocator = duplexConnection.alloc();
        this.resumeSessionDuration = duration;
        this.resumableFramesStore = resumableFramesStore;
        this.cleanupStoreOnKeepAlive = z;
        this.resumableConnection = resumableDuplexConnection;

        // Starting at 1 makes resumeWith() enqueue only, until tryTimeoutSession() decrements.
        WIP.lazySet(this, 1);

        resumableDuplexConnection.onClose().doFinally(signalType -> dispose()).subscribe();
        resumableDuplexConnection.onActiveConnectionClosed().subscribe(num -> tryTimeoutSession());
    }

    /**
     * Reacts to the loss of the active connection: stops keep-alive, arms the session timeout and,
     * if resume attempts are already pending, runs the next one.
     */
    void tryTimeoutSession() {
        this.keepAliveSupport.stop();

        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. Connection is lost. Trying to timeout the active session", this.session);
        }

        // Arm the timer before touching the counter so a queued resume finds a subscription to cancel.
        Mono.delay(this.resumeSessionDuration).subscribe(this);

        if (WIP.decrementAndGet(this) == 0) {
            return;
        }
        runNextQueuedResume();
    }

    /**
     * Registers a resume attempt made over a new connection.
     *
     * @param byteBuf RESUME frame; the client positions are read from it immediately
     * @param duplexConnection connection the resume attempt arrived on
     */
    public void resumeWith(ByteBuf byteBuf, DuplexConnection duplexConnection) {
        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. New DuplexConnection received.", this.session);
        }

        final long remotePos = ResumeFrameCodec.firstAvailableClientPos(byteBuf);
        final long remoteImpliedPos = ResumeFrameCodec.lastReceivedServerPos(byteBuf);

        this.connectionsQueue.offer(() -> doResume(remotePos, remoteImpliedPos, duplexConnection));

        if (WIP.getAndIncrement(this) != 0) {
            // Counter was non-zero: the task stays queued for tryTimeoutSession() to pick up.
            return;
        }
        runNextQueuedResume();
    }

    /**
     * Performs one resume attempt.
     *
     * @param j first position still available on the client (remote position)
     * @param j2 last server position received by the client (remote implied position)
     * @param duplexConnection connection to resume on; rejected with a
     *     {@link RejectedResumeException} if the session expired, the positions do not match, or
     *     releasing frames fails
     */
    void doResume(long j, long j2, DuplexConnection duplexConnection) {
        // Readable aliases for the positional parameters.
        final long remotePos = j;
        final long remoteImpliedPos = j2;

        if (!tryCancelSessionTimeout()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Side[server]|Session[{}]. Session has already been expired. Terminating received connection", this.session);
            }
            rejectResume(duplexConnection, new RejectedResumeException("resume_internal_error: Session Expired"));
            return;
        }

        final long impliedPosition = this.resumableFramesStore.frameImpliedPosition();
        final long position = this.resumableFramesStore.framePosition();

        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. Resume FRAME received. ServerResumeState[impliedPosition[{}], position[{}]]. ClientResumeState[remoteImpliedPosition[{}], remotePosition[{}]]", this.session, impliedPosition, position, remoteImpliedPos, remotePos);
        }

        final boolean positionsMatch = remotePos <= impliedPosition && position <= remoteImpliedPos;
        if (!positionsMatch) {
            if (logger.isDebugEnabled()) {
                logger.debug("Side[server]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}] and RemotePosition[{}] to be less or equal to LocalImpliedPosition[{}]. Terminating received connection", this.session, remoteImpliedPos, position, remotePos, impliedPosition);
            }
            // The session itself is torn down first, then the offending connection is rejected.
            dispose();
            rejectResume(duplexConnection, new RejectedResumeException(String.format("resumption_pos=[ remote: { pos: %d, impliedPos: %d }, local: { pos: %d, impliedPos: %d }]", remotePos, remoteImpliedPos, position, impliedPosition)));
            return;
        }

        if (position != remoteImpliedPos) {
            try {
                this.resumableFramesStore.releaseFrames(remoteImpliedPos);
            } catch (Throwable th) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Side[server]|Session[{}]. Exception occurred while releasing frames in the frameStore", this.session, th);
                }
                dispose();
                rejectResume(duplexConnection, new RejectedResumeException(th.getMessage(), th));
                return;
            }
        }

        duplexConnection.sendFrame(0, ResumeOkFrameCodec.encode(this.allocator, impliedPosition));
        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. ResumeOKFrame[impliedPosition[{}]] has been sent", this.session, impliedPosition);
        }

        this.keepAliveSupport.start();
        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. Session has been resumed successfully", this.session);
        }

        if (this.resumableConnection.connect(duplexConnection)) {
            return;
        }

        // connect() refused the connection; reject it the same way as an expired session.
        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. Session has already been expired. Terminating received connection", this.session);
        }
        rejectResume(duplexConnection, new RejectedResumeException("resume_internal_error: Session Expired"));
    }

    /**
     * Cancels the armed timeout timer, if the session has not been terminated yet.
     *
     * <p>NOTE(review): the current value is cancelled without a null check, so this presumably
     * relies on a timer always being armed before a resume task runs - confirm against callers.
     *
     * @return {@code false} if the session is already terminated, {@code true} once the timer
     *     subscription has been swapped out and cancelled
     */
    boolean tryCancelSessionTimeout() {
        for (;;) {
            final Subscription current = this.s;
            if (current == Operators.cancelledSubscription()) {
                return false;
            }
            if (S.compareAndSet(this, current, null)) {
                current.cancel();
                return true;
            }
        }
    }

    /** Returns the implied position currently reported by the frames store. */
    @Override // io.rsocket.resume.ResumeStateHolder
    public long impliedPosition() {
        return this.resumableFramesStore.frameImpliedPosition();
    }

    /**
     * Releases frames up to the given position when cleanup on keep-alive is enabled; a failure
     * to release closes the resumable connection with a {@link ConnectionErrorException}.
     *
     * @param j position passed to the frames store
     */
    @Override // io.rsocket.resume.ResumeStateHolder
    public void onImpliedPosition(long j) {
        if (!this.cleanupStoreOnKeepAlive) {
            return;
        }
        try {
            this.resumableFramesStore.releaseFrames(j);
        } catch (Throwable th) {
            this.resumableConnection.sendErrorAndClose(new ConnectionErrorException(th.getMessage(), th));
        }
    }

    /** Stores the timer subscription (once) and requests the timer signal. */
    @Override
    public void onSubscribe(Subscription subscription) {
        if (Operators.setOnce(S, this, subscription)) {
            subscription.request(Long.MAX_VALUE);
        }
    }

    /** Timer fired: terminate the session and, if this call did the termination, dispose the connection. */
    @Override
    public void onNext(Long l) {
        if (Operators.terminate(S, this)) {
            this.resumableConnection.dispose();
        }
    }

    /** Nothing to do; the timer's single value is handled in {@link #onNext(Long)}. */
    @Override
    public void onComplete() {
    }

    /**
     * Reports a failure of the timeout timer instead of dropping it silently. Session state is
     * left untouched, exactly as before; only the diagnostic is new.
     */
    @Override
    public void onError(Throwable th) {
        logger.warn("Side[server]|Session[{}]. Session timeout timer failed", this.session, th);
    }

    @Override // io.rsocket.resume.RSocketSession
    public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) {
        this.keepAliveSupport = keepAliveSupport;
    }

    /** Terminates any timeout subscription and disposes the resumable connection. */
    public void dispose() {
        Operators.terminate(S, this);
        this.resumableConnection.dispose();
    }

    /** Reports the disposed state of the resumable connection. */
    public boolean isDisposed() {
        return this.resumableConnection.isDisposed();
    }

    /** Runs the next queued resume task, if there is one. */
    private void runNextQueuedResume() {
        final Runnable next = this.connectionsQueue.poll();
        if (next != null) {
            next.run();
        }
    }

    /** Sends the error to the given connection, closes it, and subscribes to its receive stream. */
    private static void rejectResume(DuplexConnection connection, RejectedResumeException cause) {
        connection.sendErrorAndClose(cause);
        connection.receive().subscribe();
    }
}
