package com.tencent.trpc.transport.netty.stream;

import com.tencent.trpc.core.stream.transport.RpcConnection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;

/* loaded from: input_file:com/tencent/trpc/transport/netty/stream/TcpRpcConnection.class */
/**
 * {@link RpcConnection} implementation backed by a Reactor Netty {@link Connection}.
 *
 * <p>Outgoing frames are pushed into a unicast sink whose flux is subscribed exactly once, in the
 * constructor, by the connection's outbound side. Incoming frames are exposed directly from the
 * connection's inbound side.
 */
public class TcpRpcConnection implements RpcConnection {
    /** Buffers outgoing frames until the outbound pipeline requests them. */
    private final Sinks.Many<ByteBuf> sender = Sinks.many().unicast().onBackpressureBuffer();
    /** Completed by {@link #dispose()}; its termination triggers {@link #doOnClose()}. */
    private final Sinks.Empty<Void> onClose = Sinks.empty();
    /** The wrapped Reactor Netty connection. */
    private final Connection connection;

    /**
     * Wraps the given connection and wires up the close and send pipelines.
     *
     * @param connection the Reactor Netty connection to wrap
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    public TcpRpcConnection(Connection connection) {
        this.connection = Objects.requireNonNull(connection, "connection is null");
        // Run doOnClose() once the close sink terminates (i.e. after dispose() has been called).
        this.onClose.asMono().doFinally(signalType -> {
            doOnClose();
        }).subscribe();
        // Propagate a channel close to this wrapper unless it is already reported as disposed.
        connection.channel().closeFuture().addListener(future -> {
            if (isDisposed()) {
                return;
            }
            dispose();
        });
        // The unicast sender can only ever have this one subscriber. If the outbound pipeline
        // fails, nothing will drain the sender again, so close the connection instead of leaving
        // it open but unable to send.
        connection.outbound()
                .send(this.sender.asFlux())
                .then()
                .doOnError(error -> dispose())
                .subscribe();
    }

    /**
     * Queues a frame for sending on the underlying connection.
     *
     * <p>The frame is emitted with {@code FAIL_FAST}, so a failed emission is not retried.
     * NOTE(review): presumably callers are expected to serialize calls to this method — confirm.
     *
     * @param byteBuf the frame to send
     */
    public void send(ByteBuf byteBuf) {
        this.sender.emitNext(byteBuf, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * Returns the stream of buffers received on the underlying connection.
     *
     * @return the inbound {@link ByteBuf} flux of the wrapped connection
     */
    public Flux<ByteBuf> receive() {
        return this.connection.inbound().receive();
    }

    /**
     * Returns the buffer allocator of the underlying channel.
     *
     * @return the channel's {@link ByteBufAllocator}
     */
    public ByteBufAllocator alloc() {
        return this.connection.channel().alloc();
    }

    /** Disposes the wrapped connection; invoked when the close sink terminates. */
    public void doOnClose() {
        this.connection.dispose();
    }

    /**
     * Returns a signal tied to the disposal of the wrapped connection.
     *
     * @return the wrapped connection's {@code onDispose()} mono
     */
    public Mono<Void> onClose() {
        return this.connection.onDispose();
    }

    /** Completes the close sink, which in turn runs {@link #doOnClose()}. */
    public void dispose() {
        this.onClose.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * Reports whether the wrapped connection is disposed.
     *
     * @return {@code true} if the wrapped connection reports itself as disposed
     */
    public boolean isDisposed() {
        return this.connection.isDisposed();
    }
}
