package com.tencent.trpc.proto.standard.stream;

import com.tencent.trpc.core.logger.Logger;
import com.tencent.trpc.core.logger.LoggerFactory;
import com.tencent.trpc.core.stream.transport.RpcConnection;
import com.tencent.trpc.core.utils.PreconditionUtils;
import com.tencent.trpc.core.worker.spi.WorkerPool;
import com.tencent.trpc.proto.standard.common.TRPCProtocol;
import com.tencent.trpc.proto.standard.stream.codec.TRpcStreamFrameCodec;
import com.tencent.trpc.proto.standard.stream.config.TRpcStreamConstants;
import io.netty.buffer.ByteBuf;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

@ThreadSafe
/* loaded from: input_file:com/tencent/trpc/proto/standard/stream/StreamRemoteSubscriber.class */
public class StreamRemoteSubscriber<T> extends BaseSubscriber<T> {
    private final WorkerPool workerPool;
    private final RpcConnection connection;
    private final TRpcStreamFrameCodec frameCodec;
    private final int streamId;
    private final int initialWindowSize;
    private volatile int windowSize;
    private final boolean noFlowControl;
    private static final Logger logger = LoggerFactory.getLogger(StreamRemoteSubscriber.class);
    private static final AtomicIntegerFieldUpdater<StreamRemoteSubscriber> WINDOW_SIZE = AtomicIntegerFieldUpdater.newUpdater(StreamRemoteSubscriber.class, "windowSize");

    public StreamRemoteSubscriber(WorkerPool workerPool, RpcConnection rpcConnection, TRpcStreamFrameCodec tRpcStreamFrameCodec, int i, int i2) {
        this.workerPool = (WorkerPool) Objects.requireNonNull(workerPool, "workerPool is null");
        this.connection = (RpcConnection) Objects.requireNonNull(rpcConnection, "connection is null");
        this.frameCodec = (TRpcStreamFrameCodec) Objects.requireNonNull(tRpcStreamFrameCodec, "frameCodec is null");
        PreconditionUtils.checkArgument(i2 >= 0 && i >= 0, "create remote flow subscriber failed, windowSize=%d, streamId=%d", new Object[]{Integer.valueOf(i2), Integer.valueOf(i)});
        this.streamId = i;
        this.initialWindowSize = i2;
        this.windowSize = i2;
        this.noFlowControl = i2 == 0;
    }

    protected void hookOnSubscribe(@Nonnull Subscription subscription) {
        if (this.noFlowControl) {
            subscription.request(Long.MAX_VALUE);
        } else {
            subscription.request(1L);
        }
    }

    protected void hookOnNext(@Nonnull T t) {
        ByteBuf encodeStreamDataFrame = this.frameCodec.encodeStreamDataFrame(this.streamId, t);
        this.connection.send(encodeStreamDataFrame);
        if (this.noFlowControl) {
            return;
        }
        consumeWindow(encodeStreamDataFrame.readableBytes() - 16);
    }

    protected void hookOnComplete() {
        this.connection.send(TRpcStreamFrameCodec.encodeStreamCloseFrame(this.connection.alloc(), this.streamId, 0, TRpcStreamConstants.RPC_DEFAULT_RET_CODE_OK));
    }

    protected void hookOnError(Throwable th) {
        this.connection.send(TRpcStreamFrameCodec.encodeStreamCloseFrame(this.connection.alloc(), this.streamId, TRPCProtocol.TrpcRetCode.TRPC_STREAM_UNKNOWN_ERR_VALUE, th.toString()));
    }

    public void incrementWindow(int i) {
        long addAndGet = WINDOW_SIZE.addAndGet(this, i);
        if (addAndGet - i > 0 || addAndGet <= 0) {
            return;
        }
        logger.info("stream {} has recovered {} windowSize, reconsuming", new Object[]{Integer.valueOf(this.streamId), Long.valueOf(addAndGet)});
        this.workerPool.execute(() -> {
            request(1L);
        });
    }

    private void consumeWindow(int i) {
        if (WINDOW_SIZE.addAndGet(this, -i) > 0) {
            request(1L);
        } else {
            logger.warn("stream {} has used all {} windowSize, stop consuming", new Object[]{Integer.valueOf(this.streamId), Integer.valueOf(this.initialWindowSize)});
        }
    }
}
