/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.ResumeOkFrameCodec;
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.resume.RSocketSession;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import reactor.util.retry.Retry;

/**
 * Client-side resumption session.
 *
 * <p>Whenever the active connection of the wrapped {@link ResumableDuplexConnection} is closed,
 * this session obtains a new {@link DuplexConnection} from the connection factory, sends a RESUME
 * frame on it, and validates the first frame received in response (see
 * {@link #tryReestablishSession(Tuple2)}). The instance itself is the subscriber of the reconnect
 * pipeline; the subscription it stores in {@link #s} is what gets cancelled on a successful
 * resumption or on disposal.
 */
public class ClientRSocketSession
implements RSocketSession,
ResumeStateHolder,
CoreSubscriber<Tuple2<ByteBuf, DuplexConnection>> {
    private static final Logger logger = LoggerFactory.getLogger(ClientRSocketSession.class);
    final ResumableDuplexConnection resumableConnection;
    /** Produces, per subscription, a new connection on which a RESUME frame has already been sent. */
    final Mono<Tuple2<ByteBuf, DuplexConnection>> connectionFactory;
    final ResumableFramesStore resumableFramesStore;
    final ByteBufAllocator allocator;
    /** Upper bound applied (via {@code timeout}) to a whole reconnect attempt sequence. */
    final Duration resumeSessionDuration;
    final Retry retry;
    /** When {@code true}, {@link #onImpliedPosition(long)} releases frames from the store. */
    final boolean cleanupStoreOnKeepAlive;
    final ByteBuf resumeToken;
    /** UTF-8 rendering of the resume token; used only in log messages. */
    final String session;
    final Disposable reconnectDisposable;
    /**
     * Subscription of the reconnect pipeline currently in flight; holds
     * {@link Operators#cancelledSubscription()} once the session is terminated.
     */
    volatile Subscription s;
    static final AtomicReferenceFieldUpdater<ClientRSocketSession, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(ClientRSocketSession.class, Subscription.class, "s");
    KeepAliveSupport keepAliveSupport;

    /**
     * Creates the session and starts observing the given resumable connection.
     *
     * @param resumeToken token encoded into every RESUME frame; it is retained once per frame and
     *     released by {@link #dispose()} if it still has references
     * @param resumableDuplexConnection connection to resume; closing it disposes this session, and
     *     each closure of its active connection triggers {@link #reconnect(int)}
     * @param connectionFactory source of a new {@link DuplexConnection} for every attempt
     * @param connectionTransformer applied to the new connection after the RESUME frame has been
     *     sent; its result is consumed by {@link #tryReestablishSession(Tuple2)}, which treats the
     *     first tuple element as the first frame received on that connection
     * @param resumableFramesStore store queried for positions and asked to release frames
     * @param resumeSessionDuration timeout for the reconnect pipeline
     * @param retry retry specification for the reconnect pipeline
     * @param cleanupStoreOnKeepAlive whether {@link #onImpliedPosition(long)} releases frames
     */
    public ClientRSocketSession(ByteBuf resumeToken, ResumableDuplexConnection resumableDuplexConnection, Mono<DuplexConnection> connectionFactory, Function<DuplexConnection, Mono<Tuple2<ByteBuf, DuplexConnection>>> connectionTransformer, ResumableFramesStore resumableFramesStore, Duration resumeSessionDuration, Retry retry, boolean cleanupStoreOnKeepAlive) {
        this.resumeToken = resumeToken;
        this.session = resumeToken.toString(CharsetUtil.UTF_8);
        this.connectionFactory = connectionFactory.flatMap(dc -> {
            // Positions are read at subscription time so every attempt reports the current state.
            long impliedPosition = resumableFramesStore.frameImpliedPosition();
            long position = resumableFramesStore.framePosition();
            // NOTE(review): the token is retained before encoding, presumably because the encoded
            // frame takes over one reference - confirm against ResumeFrameCodec.
            dc.sendFrame(0, ResumeFrameCodec.encode(dc.alloc(), resumeToken.retain(), impliedPosition, position));
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. ResumeFrame[impliedPosition[{}], position[{}]] has been sent.", this.session, impliedPosition, position);
            }
            return connectionTransformer.apply(dc);
        });
        this.resumableFramesStore = resumableFramesStore;
        this.allocator = resumableDuplexConnection.alloc();
        this.resumeSessionDuration = resumeSessionDuration;
        this.retry = retry;
        this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive;
        this.resumableConnection = resumableDuplexConnection;
        resumableDuplexConnection.onClose().doFinally(__ -> this.dispose()).subscribe();
        this.reconnectDisposable = resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect);
    }

    /**
     * Starts a reconnect attempt after the active connection was lost.
     *
     * <p>Does nothing (besides logging) when the session has already been terminated. Otherwise
     * stops keep-alive and subscribes this instance to the reconnect pipeline: connection factory,
     * then {@link #tryReestablishSession(Tuple2)}, then retry, then timeout.
     *
     * @param index value emitted by {@code onActiveConnectionClosed()}; used only in log messages
     */
    void reconnect(int index) {
        if (this.s == Operators.cancelledSubscription()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting rejected since session is closed", this.session, index);
            }
            return;
        }
        this.keepAliveSupport.stop();
        if (logger.isDebugEnabled()) {
            logger.debug("Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting to resume...", this.session, index);
        }
        this.connectionFactory
            .doOnNext(this::tryReestablishSession)
            .retryWhen(this.retry)
            .timeout(this.resumeSessionDuration)
            .subscribe(this);
    }

    /** Returns the implied position currently reported by the frames store. */
    @Override
    public long impliedPosition() {
        return this.resumableFramesStore.frameImpliedPosition();
    }

    /**
     * Releases stored frames up to the remote implied position, but only when
     * {@code cleanupStoreOnKeepAlive} is enabled. Any failure while releasing is sent to the
     * resumable connection as a {@link ConnectionErrorException} and closes it.
     *
     * @param remoteImpliedPos implied position reported by the remote side
     */
    @Override
    public void onImpliedPosition(long remoteImpliedPos) {
        if (!this.cleanupStoreOnKeepAlive) {
            return;
        }
        try {
            this.resumableFramesStore.releaseFrames(remoteImpliedPos);
        }
        catch (Throwable e) {
            this.resumableConnection.sendErrorAndClose(new ConnectionErrorException(e.getMessage(), e));
        }
    }

    /**
     * Terminates the session: cancels any in-flight reconnect subscription, stops listening for
     * connection closures, disposes the resumable connection and the frames store, and releases the
     * resume token if it is still referenced.
     */
    public void dispose() {
        Operators.terminate(S, this);
        this.reconnectDisposable.dispose();
        this.resumableConnection.dispose();
        this.resumableFramesStore.dispose();
        if (this.resumeToken.refCnt() > 0) {
            this.resumeToken.release();
        }
    }

    /** Reports whether the underlying resumable connection has been disposed. */
    public boolean isDisposed() {
        return this.resumableConnection.isDisposed();
    }

    /**
     * Validates the first frame received on a freshly established connection and, if it is an
     * acceptable RESUME_OK, attaches the connection to the resumable connection.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>Frame on a non-zero stream: the resumable connection is disposed, the new connection is
     *       rejected, and the error is thrown.</li>
     *   <li>RESUME_OK with local position not greater than the remote implied position: frames are
     *       released if the positions differ, the session timeout is cancelled, keep-alive is
     *       restarted and the new connection is connected. If the session was already terminated
     *       the new connection is rejected instead.</li>
     *   <li>RESUME_OK with mismatching positions, or any frame other than RESUME_OK/ERROR: the
     *       resumable connection is disposed and the new connection is rejected.</li>
     *   <li>ERROR carrying a {@link RejectedResumeException}: the resumable connection is disposed
     *       with that error and the new connection is disposed.</li>
     *   <li>Any other ERROR: the new connection is disposed and the decoded exception is thrown.</li>
     * </ul>
     *
     * <p>Exceptions thrown here surface inside the {@code doOnNext} stage installed by
     * {@link #reconnect(int)}, i.e. upstream of {@code retryWhen}.
     *
     * @param tuple2 first received frame paired with the connection it arrived on
     * @throws ConnectionErrorException if the first frame does not belong to stream 0
     * @throws RuntimeException the exception decoded from an ERROR frame that is not a
     *     {@link RejectedResumeException}
     */
    void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
        ByteBuf shouldBeResumeOKFrame = tuple2.getT1();
        DuplexConnection nextDuplexConnection = tuple2.getT2();
        int streamId = FrameHeaderCodec.streamId(shouldBeResumeOKFrame);
        if (streamId != 0) {
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection", this.session);
            }
            ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others");
            this.resumableConnection.dispose(connectionErrorException);
            rejectConnection(nextDuplexConnection, connectionErrorException);
            throw connectionErrorException;
        }
        FrameType frameType = FrameHeaderCodec.nativeFrameType(shouldBeResumeOKFrame);
        if (frameType == FrameType.RESUME_OK) {
            long remoteImpliedPos = ResumeOkFrameCodec.lastReceivedClientPos(shouldBeResumeOKFrame);
            long position = this.resumableFramesStore.framePosition();
            long impliedPosition = this.resumableFramesStore.frameImpliedPosition();
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. ResumeOK FRAME received. ServerResumeState[remoteImpliedPosition[{}]]. ClientResumeState[impliedPosition[{}], position[{}]]", this.session, remoteImpliedPos, impliedPosition, position);
            }
            if (position > remoteImpliedPos) {
                // The remote side claims to have received less than what the local store can still
                // replay from; resumption is not possible.
                if (logger.isDebugEnabled()) {
                    logger.debug("Side[client]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection", this.session, remoteImpliedPos, position);
                }
                ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]");
                this.resumableConnection.dispose(connectionErrorException);
                rejectConnection(nextDuplexConnection, connectionErrorException);
                return;
            }
            try {
                if (position != remoteImpliedPos) {
                    this.resumableFramesStore.releaseFrames(remoteImpliedPos);
                }
            }
            catch (IllegalStateException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Side[client]|Session[{}]. Exception occurred while releasing frames in the frameStore", this.session, e);
                }
                ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e);
                this.resumableConnection.dispose(t);
                rejectConnection(nextDuplexConnection, t);
                return;
            }
            if (!this.tryCancelSessionTimeout()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Side[client]|Session[{}]. Session has already been expired. Terminating received connection", this.session);
                }
                rejectConnection(nextDuplexConnection, new ConnectionErrorException("resumption_server=[Session Expired]"));
                return;
            }
            this.keepAliveSupport.start();
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. Session has been resumed successfully", this.session);
            }
            if (!this.resumableConnection.connect(nextDuplexConnection)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Side[client]|Session[{}]. Session has already been expired. Terminating received connection", this.session);
                }
                rejectConnection(nextDuplexConnection, new ConnectionErrorException("resumption_server_pos=[Session Expired]"));
            }
        } else if (frameType == FrameType.ERROR) {
            RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame);
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. Received error frame. Terminating received connection", this.session, exception);
            }
            if (exception instanceof RejectedResumeException) {
                this.resumableConnection.dispose(exception);
                nextDuplexConnection.dispose();
                nextDuplexConnection.receive().subscribe().dispose();
                return;
            }
            nextDuplexConnection.dispose();
            // Same subscribe-and-cancel of the inbound stream as on every other path that discards
            // the connection; this path previously skipped it.
            nextDuplexConnection.receive().subscribe().dispose();
            throw exception;
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection", this.session);
            }
            ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others");
            this.resumableConnection.dispose(connectionErrorException);
            rejectConnection(nextDuplexConnection, connectionErrorException);
        }
    }

    /**
     * Sends {@code cause} on the connection and closes it, then subscribes to its inbound stream
     * and immediately cancels that subscription.
     */
    // NOTE(review): the subscribe-then-dispose presumably lets the connection discard frames it
    // has already buffered - confirm against the DuplexConnection implementations.
    private static void rejectConnection(DuplexConnection connection, ConnectionErrorException cause) {
        connection.sendErrorAndClose(cause);
        connection.receive().subscribe().dispose();
    }

    /**
     * Atomically clears the stored reconnect subscription and cancels it, which also cancels the
     * timeout stage that {@link #reconnect(int)} placed directly upstream of this subscriber.
     *
     * @return {@code false} if the session has already been terminated, {@code true} otherwise
     */
    boolean tryCancelSessionTimeout() {
        for (;;) {
            Subscription subscription = this.s;
            if (subscription == Operators.cancelledSubscription()) {
                return false;
            }
            // Retry if another thread replaced the subscription between the read and the swap.
            if (S.compareAndSet(this, subscription, null)) {
                subscription.cancel();
                return true;
            }
        }
    }

    /** Stores the reconnect subscription (once) and requests an unbounded amount. */
    @Override
    public void onSubscribe(Subscription s) {
        if (Operators.setOnce(S, this, s)) {
            s.request(Long.MAX_VALUE);
        }
    }

    /** No-op: the emitted tuple is already handled by the {@code doOnNext} stage in {@link #reconnect(int)}. */
    @Override
    public void onNext(Tuple2<ByteBuf, DuplexConnection> objects) {
    }

    /**
     * Terminates the stored subscription, reporting {@code t} as dropped if the session was already
     * terminated, and then disposes the resumable connection: without a cause when {@code t} is a
     * {@link TimeoutException}, with {@code t} as the cause otherwise.
     */
    @Override
    public void onError(Throwable t) {
        if (!Operators.terminate(S, this)) {
            Operators.onErrorDropped(t, this.currentContext());
        }
        if (t instanceof TimeoutException) {
            this.resumableConnection.dispose();
        } else {
            this.resumableConnection.dispose(t);
        }
    }

    /** No-op. */
    @Override
    public void onComplete() {
    }

    /** Sets the keep-alive support that is stopped on reconnect and restarted after a successful resume. */
    @Override
    public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) {
        this.keepAliveSupport = keepAliveSupport;
    }
}

