package com.tencent.trpc.proto.standard.stream;

import com.tencent.trpc.core.common.config.ProtocolConfig;
import com.tencent.trpc.core.exception.TRpcException;
import com.tencent.trpc.core.logger.Logger;
import com.tencent.trpc.core.logger.LoggerFactory;
import com.tencent.trpc.core.stream.transport.RpcConnection;
import com.tencent.trpc.proto.standard.common.TRPCProtocol;
import com.tencent.trpc.proto.standard.common.TRpcFrameType;
import com.tencent.trpc.proto.standard.stream.codec.TRpcStreamFrameCodec;
import com.tencent.trpc.proto.standard.stream.codec.TRpcStreamFrameHeaderCodec;
import com.tencent.trpc.proto.standard.stream.config.TRpcStreamConstants;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:com/tencent/trpc/proto/standard/stream/BaseTRpcStreamRequesterResponder.class */
/**
 * Shared base for both ends of a tRPC stream connection.
 *
 * <p>On construction it subscribes to the connection's inbound frames and close signal. Each
 * inbound frame is dispatched by frame type to a per-stream handler. Per-stream state is kept in
 * two concurrent maps keyed by stream id: {@link #subscribers} (cancelled on reset/termination)
 * and {@link #receivers} (sinks that inbound data frames are emitted into). When the connection
 * closes or fails, every tracked stream is cancelled / errored exactly once.
 */
public abstract class BaseTRpcStreamRequesterResponder {

    private static final Logger logger = LoggerFactory.getLogger(BaseTRpcStreamRequesterResponder.class);

    /**
     * Number of leading bytes skipped before handing a frame to the type-specific handlers.
     * NOTE(review): presumably the fixed stream frame header length — confirm against the codec.
     */
    private static final int FRAME_HEADER_SIZE = 16;

    /** Smallest non-zero window size that {@link #getWindowSize(int)} passes through unchanged. */
    private static final int MIN_WINDOW_SIZE = 65536;

    /** Shared error used to terminate streams when the connection completes without an error. */
    private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

    /** Guards {@link #terminationError} so that termination runs at most once. */
    private static final AtomicReferenceFieldUpdater<BaseTRpcStreamRequesterResponder, Throwable> TERMINATION_ERROR =
            AtomicReferenceFieldUpdater.newUpdater(
                    BaseTRpcStreamRequesterResponder.class, Throwable.class, "terminationError");

    /** Subscribers tracked per stream id; cancelled when the stream is reset or the connection ends. */
    protected final ConcurrentMap<Integer, StreamRemoteSubscriber<?>> subscribers = new ConcurrentHashMap<>();

    /** Sinks per stream id that receive the payload of inbound data frames. */
    protected final ConcurrentMap<Integer, Sinks.Many<ByteBuf>> receivers = new ConcurrentHashMap<>();

    protected final ProtocolConfig protocolConfig;

    protected final RpcConnection connection;

    /** First termination cause; {@code null} while the connection is still considered alive. */
    private volatile Throwable terminationError;

    /**
     * Creates the handler and immediately starts listening on the connection.
     *
     * @param protocolConfig protocol configuration, must not be {@code null}
     * @param rpcConnection connection to read frames from and write frames to, must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public BaseTRpcStreamRequesterResponder(ProtocolConfig protocolConfig, RpcConnection rpcConnection) {
        this.protocolConfig = Objects.requireNonNull(protocolConfig, "protocolConfig is null");
        this.connection = Objects.requireNonNull(rpcConnection, "connection is null");
        // NOTE(review): these subscriptions are set up before subclass constructors finish, so
        // frames may reach overridden handlers on a partially constructed subclass.
        // Close signal: an error terminates with that error, a plain completion with a closed-channel error.
        rpcConnection.onClose().subscribe(null, this::terminate, this::shutdown);
        // Inbound frames: a failure of the receive stream terminates every tracked stream.
        rpcConnection.receive().subscribe(this::handleFrame, this::terminate);
    }

    /**
     * Decodes the stream id and frame type of an inbound frame and dispatches the remaining
     * bytes to the matching handler.
     *
     * <p>Any failure raised while dispatching (including an unknown frame type) is rethrown
     * wrapped in a frame exception with message {@code "handle frame failed"}.
     *
     * @param frame complete inbound frame
     */
    private void handleFrame(ByteBuf frame) {
        int streamId = TRpcStreamFrameHeaderCodec.streamId(frame);
        TRpcFrameType frameType = TRpcStreamFrameHeaderCodec.frameType(frame);
        logger.debug("stream {} got frame: {}", streamId, frameType);
        // The slice holds its own reference, released in the finally block below; handlers that
        // keep the buffer beyond this call must retain it themselves.
        ByteBuf body = frame.retainedSlice(FRAME_HEADER_SIZE, frame.readableBytes() - FRAME_HEADER_SIZE);
        try {
            switch (frameType) {
                case INIT:
                    handleStreamInit(streamId, body);
                    break;
                case DATA:
                    handleStreamData(streamId, body);
                    break;
                case FEEDBACK:
                    handleStreamFeedback(streamId, body);
                    break;
                case CLOSE:
                    handleStreamClose(streamId, body);
                    break;
                default:
                    throw TRpcException.newFrameException(
                            TRPCProtocol.TrpcRetCode.TRPC_STREAM_UNKNOWN_ERR_VALUE,
                            "unknown frameType " + frameType);
            }
        } catch (Throwable cause) {
            throw TRpcException.newFrameException(
                    TRPCProtocol.TrpcRetCode.TRPC_INVOKE_UNKNOWN_ERR_VALUE, "handle frame failed", cause);
        } finally {
            ReferenceCountUtil.safeRelease(body);
        }
    }

    /**
     * Handles an INIT frame for the given stream.
     *
     * @param i stream id
     * @param byteBuf frame content after the skipped header bytes; released by the caller after this
     *     method returns
     */
    protected abstract void handleStreamInit(int i, ByteBuf byteBuf);

    /**
     * Forwards the payload of a DATA frame to the receiver sink registered for the stream.
     *
     * <p>If no receiver is registered, or the sink rejects the element, the stream is reset with
     * {@code TRPC_STREAM_UNKNOWN_ERR}.
     *
     * @param streamId stream id
     * @param frame frame content after the skipped header bytes
     */
    protected void handleStreamData(int streamId, ByteBuf frame) {
        Sinks.Many<ByteBuf> receiver = this.receivers.get(streamId);
        if (receiver == null) {
            logger.error("cannot find receiver for stream {}", streamId);
            errorResetStream(streamId, TRPCProtocol.TrpcRetCode.TRPC_STREAM_UNKNOWN_ERR_VALUE, "not found stream");
            return;
        }
        // Extra reference for the downstream consumer; the caller releases its own reference.
        ByteBuf retained = frame.retain().touch();
        Sinks.EmitResult result = receiver.tryEmitNext(retained);
        if (result.isFailure()) {
            // The sink did not accept the element, so nobody downstream will release it:
            // drop the extra reference here to avoid leaking the buffer.
            ReferenceCountUtil.safeRelease(retained);
            errorResetStream(streamId, TRPCProtocol.TrpcRetCode.TRPC_STREAM_UNKNOWN_ERR_VALUE,
                    "receive data failed: " + result);
        }
    }

    /**
     * Applies a FEEDBACK frame by growing the send window of the stream's subscriber.
     * Feedback for an unknown stream is logged and ignored.
     *
     * @param streamId stream id
     * @param frame frame content after the skipped header bytes
     */
    protected void handleStreamFeedback(int streamId, ByteBuf frame) {
        StreamRemoteSubscriber<?> subscriber = this.subscribers.get(streamId);
        if (subscriber == null) {
            logger.warn("feedback msg not found stream {}", streamId);
            return;
        }
        TRPCProtocol.TrpcStreamFeedBackMeta feedback = TRpcStreamFrameCodec.decodeStreamFeedbackFrame(frame);
        logger.info("got feedback msg: {}", feedback);
        subscriber.incrementWindow(feedback.getWindowSizeIncrement());
    }

    /**
     * Handles a CLOSE frame for the given stream.
     *
     * <p>Close type {@code 0} completes the receiver when the return code is {@code 0} and errors
     * it otherwise; close type {@code 1} resets the stream via {@link #errorResetStream}. Any
     * other close type is only logged. A close for an unknown stream is logged and ignored.
     *
     * @param streamId stream id
     * @param frame frame content after the skipped header bytes
     */
    protected void handleStreamClose(int streamId, ByteBuf frame) {
        Sinks.Many<ByteBuf> receiver = this.receivers.get(streamId);
        if (receiver == null) {
            logger.error("cannot find receiver for stream {}", streamId);
            return;
        }
        TRPCProtocol.TrpcStreamCloseMeta closeMeta = TRpcStreamFrameCodec.decodeStreamCloseFrame(frame);
        Sinks.EmitResult result = null;
        // NOTE(review): 0 / 1 presumably correspond to the protocol's "close" / "reset" close
        // types — confirm against the protocol definition.
        switch (closeMeta.getCloseType()) {
            case 0:
                if (closeMeta.getRet() == 0) {
                    result = receiver.tryEmitComplete();
                } else {
                    result = receiver.tryEmitError(
                            TRpcException.newFrameException(closeMeta.getRet(), closeMeta.getMsg().toStringUtf8()));
                }
                break;
            case 1:
                errorResetStream(streamId, closeMeta.getRet(), closeMeta.getMsg().toStringUtf8());
                break;
            default:
                logger.error("got unknown close type: {}", closeMeta);
                break;
        }
        // A cancelled sink is not reported as a failed close.
        if (result != null && result.isFailure() && result != Sinks.EmitResult.FAIL_CANCELLED) {
            logger.error("close stream {} failed: {}", streamId, result);
        }
    }

    /**
     * Rejects a stream during initialization: sends an init response frame carrying the error
     * and forgets any state registered for the stream.
     *
     * @param streamId stream id
     * @param ret return code to send
     * @param msg error message to send
     */
    public void errorInitStream(int streamId, int ret, String msg) {
        logger.error("init stream {} failed, ret: {}, msg: {}", streamId, ret, msg);
        this.connection.send(TRpcStreamFrameCodec.encodeStreamInitResponseFrame(
                this.connection.alloc(), streamId, 0, this.protocolConfig, ret, msg));
        this.subscribers.remove(streamId);
        this.receivers.remove(streamId);
    }

    /**
     * Resets a stream with an error.
     *
     * <p>If a subscriber is registered it is removed, cancelled, and a reset frame is sent on the
     * connection. If a receiver is registered it is removed and terminated with a frame
     * exception built from {@code ret} and {@code msg}.
     *
     * @param streamId stream id
     * @param ret return code describing the failure
     * @param msg error message describing the failure
     */
    public void errorResetStream(int streamId, int ret, String msg) {
        logger.error("error reset stream {}, ret: {}, msg: {}", streamId, ret, msg);
        StreamRemoteSubscriber<?> subscriber = this.subscribers.remove(streamId);
        if (subscriber != null) {
            subscriber.cancel();
            this.connection.send(
                    TRpcStreamFrameCodec.encodeStreamResetFrame(this.connection.alloc(), streamId, ret, msg));
        }
        Sinks.Many<ByteBuf> receiver = this.receivers.remove(streamId);
        if (receiver != null) {
            Sinks.EmitResult result = receiver.tryEmitError(TRpcException.newFrameException(ret, msg));
            if (result.isFailure()) {
                logger.error("reset receiver stream {} failed: {}", streamId, result);
            }
        }
    }

    /** Terminates all streams with a closed-channel error; invoked when the connection completes. */
    protected void shutdown() {
        terminate(CLOSED_CHANNEL_EXCEPTION);
    }

    /**
     * Cancels every tracked subscriber and errors every tracked receiver with the given cause,
     * then clears both maps. Only the first call has any effect.
     *
     * @param th termination cause
     */
    protected void terminate(Throwable th) {
        // Cheap volatile read first; the compare-and-set decides which caller actually terminates.
        if (this.terminationError != null || !TERMINATION_ERROR.compareAndSet(this, null, th)) {
            return;
        }
        this.subscribers.values().forEach(subscriber -> subscriber.cancel());
        this.receivers.values().forEach(receiver -> receiver.tryEmitError(th));
        this.subscribers.clear();
        this.receivers.clear();
    }

    /**
     * Normalizes a configured stream window size.
     *
     * @param windowSize requested window size
     * @return {@code windowSize} itself when it is {@code 0} or at least {@code 65536};
     *     otherwise {@link TRpcStreamConstants#DEFAULT_STREAM_WINDOW_SIZE}
     */
    public int getWindowSize(int windowSize) {
        if (windowSize == 0 || windowSize >= MIN_WINDOW_SIZE) {
            return windowSize;
        }
        return TRpcStreamConstants.DEFAULT_STREAM_WINDOW_SIZE;
    }
}
