package reactor.ipc.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline;
import reactor.ipc.netty.ReactorNetty;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

/* loaded from: input_file:reactor/ipc/netty/channel/ChannelOperations.class */
public class ChannelOperations<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> implements NettyInbound, NettyOutbound, NettyContext, CoreSubscriber<Void> {
    /** User-supplied I/O handler; {@link #applyHandler()} invokes it with this instance as both arguments. */
    final BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> handler;
    /** The Netty channel these operations are bound to (never null, checked in the constructor). */
    final Channel channel;
    /** Inbound receiver created in the constructor and exposed through {@link #receiveObject()}. */
    final FluxReceive inbound;
    /**
     * Processor backing {@link #onClose()}: the constructor subscribes it to
     * {@code contextHandler.onCloseOrRelease(channel)} and {@link #onHandlerTerminate()} completes it.
     */
    final DirectProcessor<Void> onInactive;
    /** Parent context handler (never null, checked in the constructor). */
    final ContextHandler<?> context;
    /** Subscription to the handler's result publisher; only updated through {@link #OUTBOUND_CLOSE}. */
    volatile Subscription outboundSubscription;
    /** Channel attribute key under which the current operations instance is looked up and swapped. */
    protected static final AttributeKey<ChannelOperations<?, ?>> OPERATIONS_KEY = AttributeKey.newInstance("nettyOperations");
    static final Logger log = Loggers.getLogger(ChannelOperations.class);
    /** Handler that ignores both arguments and returns an empty Flux; returned by {@link #noopHandler()}. */
    static final BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> PING = (nettyInbound, nettyOutbound) -> {
        return Flux.empty();
    };
    /** Atomic updater for the {@code outboundSubscription} field. */
    static final AtomicReferenceFieldUpdater<ChannelOperations, Subscription> OUTBOUND_CLOSE = AtomicReferenceFieldUpdater.newUpdater(ChannelOperations.class, Subscription.class, "outboundSubscription");

    /**
     * Factory callback producing a {@link ChannelOperations} for a given channel.
     *
     * @param <CHANNEL> the concrete Netty channel type accepted by the factory
     */
    @FunctionalInterface
    /* loaded from: input_file:reactor/ipc/netty/channel/ChannelOperations$OnNew.class */
    public interface OnNew<CHANNEL extends Channel> {
        /**
         * Creates the operations instance for {@code channel}.
         *
         * @param channel the channel to create operations for
         * @param contextHandler the context handler passed to the new operations
         * @param obj an additional argument; NOTE(review): its meaning is not visible in this file — confirm against callers
         * @return the created operations
         */
        ChannelOperations<?, ?> create(CHANNEL channel, ContextHandler<?> contextHandler, Object obj);
    }

    /**
     * Creates a new {@link ChannelOperations} for the given channel, handler and context handler.
     *
     * @param channel the Netty channel to wrap
     * @param biFunction the I/O handler to apply to the new operations
     * @param contextHandler the parent context handler
     * @param <INBOUND> the inbound type seen by the handler
     * @param <OUTBOUND> the outbound type seen by the handler
     * @return the newly constructed operations
     */
    public static <INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> ChannelOperations<INBOUND, OUTBOUND> bind(Channel channel, BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> biFunction, ContextHandler<?> contextHandler) {
        ChannelOperations<INBOUND, OUTBOUND> operations = new ChannelOperations<>(channel, biFunction, contextHandler);
        return operations;
    }

    /**
     * Returns the shared no-op handler ({@code PING}), which yields an empty publisher.
     *
     * @param <INBOUND> the inbound type seen by the handler
     * @param <OUTBOUND> the outbound type seen by the handler
     * @return the shared no-op handler
     */
    public static <INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> noopHandler() {
        // "? super NettyInbound" is contained in "? super INBOUND" because INBOUND extends
        // NettyInbound (same for OUTBOUND), so PING is assignable without a cast.
        return PING;
    }

    /**
     * Looks up the operations currently stored on the channel under {@code OPERATIONS_KEY}.
     *
     * @param channel the channel to inspect
     * @return the stored operations, or {@code null} when the attribute holds none
     */
    public static ChannelOperations<?, ?> get(Channel channel) {
        Attribute<ChannelOperations<?, ?>> attribute = channel.attr(OPERATIONS_KEY);
        return attribute.get();
    }

    /**
     * Installs {@code channelOperations} on the channel only if no operations are stored yet.
     * Decompiler note: access was widened from package-private.
     *
     * @param channel the channel whose {@code OPERATIONS_KEY} attribute is updated
     * @param channelOperations the operations to install when the attribute is empty
     * @return the operations already present, or {@code null} when the given instance was installed
     */
    public static ChannelOperations<?, ?> tryGetAndSet(Channel channel, ChannelOperations<?, ?> channelOperations) {
        Attribute<ChannelOperations<?, ?>> attribute = channel.attr(OPERATIONS_KEY);
        for (;;) {
            ChannelOperations<?, ?> existing = attribute.get();
            if (existing != null) {
                return existing;
            }
            // Retry when another thread changed the attribute between get() and the CAS.
            if (attribute.compareAndSet(null, channelOperations)) {
                return null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates operations for {@code channel} that reuse the handler, context handler and
     * {@code onInactive} processor of an existing operations instance.
     *
     * @param channel the channel to bind to
     * @param channelOperations the instance to copy from; dereferenced immediately, so it must not be null
     */
    public ChannelOperations(Channel channel, ChannelOperations<INBOUND, OUTBOUND> channelOperations) {
        this(channel, channelOperations.handler, channelOperations.context, channelOperations.onInactive);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates operations with a freshly created {@link DirectProcessor} as the {@code onInactive} processor.
     *
     * @param channel the channel to bind to
     * @param biFunction the I/O handler
     * @param contextHandler the parent context handler
     */
    public ChannelOperations(Channel channel, BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> biFunction, ContextHandler<?> contextHandler) {
        this(channel, biFunction, contextHandler, DirectProcessor.create());
    }

    /**
     * Primary constructor: stores the collaborators, creates the inbound receiver and subscribes
     * the {@code onInactive} processor to {@code contextHandler.onCloseOrRelease(channel)}.
     *
     * @param channel the channel to bind to (must not be null)
     * @param biFunction the I/O handler (must not be null)
     * @param contextHandler the parent context handler (must not be null)
     * @param directProcessor the processor used as the {@code onInactive} signal
     * @throws NullPointerException if {@code biFunction}, {@code channel} or {@code contextHandler} is null
     */
    protected ChannelOperations(Channel channel, BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> biFunction, ContextHandler<?> contextHandler, DirectProcessor<Void> directProcessor) {
        this.handler = Objects.requireNonNull(biFunction, "handler");
        this.channel = Objects.requireNonNull(channel, "channel");
        this.context = Objects.requireNonNull(contextHandler, "context");
        this.inbound = new FluxReceive(this);
        this.onInactive = directProcessor;

        // Single-slot holder so the lambda can hand the upstream subscription back to this scope.
        final Subscription[] upstreamHolder = new Subscription[1];
        Mono.fromDirect(contextHandler.onCloseOrRelease(channel))
            .doOnSubscribe(s -> upstreamHolder[0] = s)
            .subscribe(this.onInactive);

        Subscription upstream = upstreamHolder[0];
        if (upstream != null) {
            // Cancel the close/release subscription once onInactive completes.
            this.onInactive.subscribe(null, null, upstream::cancel);
        }
    }

    /**
     * Returns the socket address associated with the underlying channel.
     *
     * <p>For a {@link SocketChannel} this is the remote address. For a {@link DatagramChannel}
     * it is the remote address when one is set, otherwise the local address.
     *
     * @return the channel's address
     * @throws IllegalStateException if the channel is neither a socket nor a datagram channel
     */
    @Override // reactor.ipc.netty.NettyContext
    public InetSocketAddress address() {
        // Hold the value as the general Channel type: channel() returns Channel, and each
        // branch below narrows it only after the instanceof check has succeeded.
        Channel c = channel();
        if (c instanceof SocketChannel) {
            return ((SocketChannel) c).remoteAddress();
        }
        if (c instanceof DatagramChannel) {
            DatagramChannel datagram = (DatagramChannel) c;
            InetSocketAddress remote = datagram.remoteAddress();
            return remote != null ? remote : datagram.localAddress();
        }
        throw new IllegalStateException("Does not have an InetSocketAddress");
    }

    /**
     * @return the Netty channel these operations are bound to
     */
    @Override // reactor.ipc.netty.NettyContext
    public final Channel channel() {
        return channel;
    }

    /**
     * Returns this instance, which itself implements {@link NettyContext}.
     *
     * @return {@code this}
     */
    @Override // reactor.ipc.netty.NettyInbound, reactor.ipc.netty.NettyOutbound
    public final NettyContext context() {
        return this;
    }

    /**
     * Passes this instance (as a {@link NettyContext}) to the given callback.
     *
     * @param consumer the callback to invoke with this context
     * @return {@code this}, to allow chaining
     */
    @Override // reactor.ipc.netty.NettyInbound, reactor.ipc.netty.NettyOutbound
    public ChannelOperations<INBOUND, OUTBOUND> context(Consumer<NettyContext> consumer) {
        // context() is final and returns this, so the callback receives this instance directly.
        NettyContext self = this;
        consumer.accept(self);
        return this;
    }

    /**
     * Cancels the inbound receiver, then closes the channel.
     */
    @Override // reactor.ipc.netty.NettyContext
    public void dispose() {
        inbound.cancel();
        channel.close();
    }

    /**
     * Reports whether these operations are no longer the live operations of the channel.
     *
     * @return {@code true} when the channel is not active, or when the operations stored on the
     *         channel under {@code OPERATIONS_KEY} are not this instance
     */
    @Override // reactor.ipc.netty.NettyContext
    public final boolean isDisposed() {
        return !channel().isActive() || get(channel()) != this;
    }

    /**
     * @return a {@link Mono} wrapping the {@code onInactive} processor
     */
    @Override // reactor.ipc.netty.NettyContext
    public final Mono<Void> onClose() {
        return Mono.fromDirect(onInactive);
    }

    /**
     * Registers a callback that runs when the {@code onInactive} processor terminates, whether it
     * completes or signals an error.
     *
     * @param runnable the callback to run on termination
     * @return {@code this}
     */
    @Override // reactor.ipc.netty.NettyContext
    public NettyContext onClose(Runnable runnable) {
        // No value consumer; the error path ignores the throwable and runs the same callback.
        onInactive.subscribe(null, error -> runnable.run(), runnable);
        return this;
    }

    /**
     * Completion signal of the handler's result publisher. Marks the outbound subscription as
     * cancelled and, unless it was already cancelled or these operations are disposed, calls
     * {@link #onOutboundComplete()}.
     */
    public final void onComplete() {
        Subscription previous = OUTBOUND_CLOSE.getAndSet(this, Operators.cancelledSubscription());
        if (previous != Operators.cancelledSubscription() && !isDisposed()) {
            onOutboundComplete();
        }
    }

    /**
     * Error signal of the handler's result publisher. Marks the outbound subscription as
     * cancelled; when it was not already cancelled and these operations are not disposed the
     * error is passed to {@link #onOutboundError(Throwable)}, otherwise it is only logged at
     * debug level.
     *
     * @param th the error signalled by the handler's publisher
     */
    public final void onError(Throwable th) {
        Subscription previous = OUTBOUND_CLOSE.getAndSet(this, Operators.cancelledSubscription());
        boolean alreadyTerminated = previous == Operators.cancelledSubscription() || isDisposed();
        if (!alreadyTerminated) {
            onOutboundError(th);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(channel(), "An outbound error could not be processed"), th);
        }
    }

    /**
     * Intentionally empty: the subscribed publisher is typed {@code Void}, so there is nothing to handle.
     *
     * @param r2 ignored
     */
    public final void onNext(Void r2) {
    }

    /**
     * Stores the subscription through {@code OUTBOUND_CLOSE} and, when
     * {@code Operators.setOnce} accepts it, requests an unbounded amount.
     *
     * @param subscription the subscription to the handler's result publisher
     */
    public final void onSubscribe(Subscription subscription) {
        if (!Operators.setOnce(OUTBOUND_CLOSE, this, subscription)) {
            return;
        }
        subscription.request(Long.MAX_VALUE);
    }

    /**
     * @return the inbound receiver created for these operations
     */
    @Override // reactor.ipc.netty.NettyInbound
    public Flux<?> receiveObject() {
        return inbound;
    }

    /**
     * Returns the channel's remote address, cast to {@link InetSocketAddress}.
     *
     * @return the remote address of the channel
     * @throws ClassCastException if the channel reports a non-null address of another type
     */
    @Override // reactor.ipc.netty.NettyInbound
    public final InetSocketAddress remoteAddress() {
        return (InetSocketAddress) channel.remoteAddress();
    }

    /**
     * @return the string form of the underlying channel
     */
    @Override
    public String toString() {
        return channel.toString();
    }

    /**
     * Decompiler note: access was widened from {@code protected}.
     *
     * @return whether the inbound receiver reports itself as cancelled
     */
    public final boolean isInboundCancelled() {
        return inbound.isCancelled();
    }

    /**
     * Decompiler note: access was widened from {@code protected}.
     *
     * @return whether the inbound receiver reports itself as disposed
     */
    public final boolean isInboundDisposed() {
        return inbound.isDisposed();
    }

    /**
     * @return the I/O handler given at construction time
     */
    protected final BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> handler() {
        return handler;
    }

    /**
     * Applies the I/O handler, then notifies the parent context handler that this context is active.
     * Decompiler note: access was widened from {@code protected}.
     */
    public void onHandlerStart() {
        applyHandler();
        context.fireContextActive(this);
    }

    /**
     * Forwards an inbound message to the inbound receiver. A {@code null} message is reported
     * as an inbound {@link NullPointerException} instead.
     * Decompiler note: access was widened from {@code protected}.
     *
     * @param channelHandlerContext the Netty handler context (not used by this implementation)
     * @param obj the received message
     */
    public void onInboundNext(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (obj != null) {
            inbound.onInboundNext(obj);
            return;
        }
        onInboundError(new NullPointerException("msg is null"));
    }

    /**
     * Atomically swaps the operations stored on the channel from this instance to the given one.
     * Decompiler note: access was widened from {@code protected}.
     *
     * @param channelOperations the replacement; {@code null} is passed by {@link #onHandlerTerminate()}
     * @return {@code true} if this instance was the stored value and the swap took place
     */
    public final boolean replace(ChannelOperations<?, ?> channelOperations) {
        Attribute<ChannelOperations<?, ?>> attribute = channel.attr(OPERATIONS_KEY);
        return attribute.compareAndSet(this, channelOperations);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook for inbound cancellation; this base implementation does nothing.
     */
    public void onInboundCancel() {
    }

    /**
     * Signals completion to the inbound receiver.
     * Decompiler note: access was widened from {@code protected}.
     */
    public void onInboundComplete() {
        inbound.onInboundComplete();
    }

    /**
     * Handles inbound close: cancels the inbound receiver when it has no receiver attached,
     * then terminates the handler.
     * Decompiler note: access was widened from {@code protected}.
     */
    public void onInboundClose() {
        boolean noReceiver = inbound.receiver == null;
        if (noReceiver) {
            inbound.cancel();
        }
        onHandlerTerminate();
    }

    /**
     * Called when the handler's publisher completes: logs at debug level, calls
     * {@code markPersistent(false)} and terminates the handler.
     */
    protected void onOutboundComplete() {
        if (log.isDebugEnabled()) {
            String message = ReactorNetty.format(channel(), "[{}] User handler requesting close connection");
            log.debug(message, formatName());
        }
        markPersistent(false);
        onHandlerTerminate();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Called when the handler's publisher fails: calls {@code markPersistent(false)} and
     * terminates the handler. This base implementation does not use the error itself.
     *
     * @param th the outbound error (unused here)
     */
    public void onOutboundError(Throwable th) {
        // NOTE(review): markPersistent is defined outside this file; presumably it flags the
        // connection as not reusable — confirm.
        markPersistent(false);
        onHandlerTerminate();
    }

    /**
     * Invokes the I/O handler with this instance as both inbound and outbound, and subscribes
     * this instance to the publisher it returns. Anything thrown while doing so is logged at
     * error level and the channel is closed.
     * Decompiler note: access was widened from {@code protected}.
     */
    @SuppressWarnings("unchecked")
    public final void applyHandler() {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(channel(), "[{}] Handler is being applied: {}"), formatName(), this.handler);
        }
        try {
            // The handler is typed against INBOUND/OUTBOUND, so "this" must be cast to those type
            // variables for the call to type-check. The casts are unchecked and have no runtime effect.
            Mono.fromDirect(this.handler.apply((INBOUND) this, (OUTBOUND) this)).subscribe(this);
        } catch (Throwable th) {
            log.error(ReactorNetty.format(channel(), ""), th);
            this.channel.close();
        }
    }

    /**
     * Terminates these operations, at most once per instance: it only proceeds when this
     * instance can be swapped out of the channel attribute. It then terminates the outbound
     * subscription, completes {@code onInactive}, completes the inbound side and finally fires
     * the handler-terminated user event on the pipeline.
     * Decompiler note: access was widened from {@code protected}.
     */
    public final void onHandlerTerminate() {
        if (!replace(null)) {
            // Another call already detached this instance from the channel.
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace(ReactorNetty.format(channel(), "Disposing ChannelOperation from a channel"), new Exception("ChannelOperation terminal stack"));
        }
        try {
            Operators.terminate(OUTBOUND_CLOSE, this);
            onInactive.onComplete();
            onInboundComplete();
        } finally {
            // Fired even when one of the steps above throws.
            channel.pipeline().fireUserEventTriggered(NettyPipeline.handlerTerminatedEvent());
        }
    }

    /**
     * Logs at debug level and asks the inbound receiver to discard its content.
     * Decompiler note: access was widened from {@code protected}.
     */
    public final void discard() {
        if (log.isDebugEnabled()) {
            String message = ReactorNetty.format(channel(), "Discarding inbound content");
            log.debug(message);
        }
        inbound.discard();
    }

    /**
     * Forwards an inbound error to the inbound receiver.
     * Decompiler note: access was widened from {@code protected}.
     *
     * @param th the error to propagate
     */
    public final void onInboundError(Throwable th) {
        inbound.onInboundError(th);
    }

    /**
     * Decompiler note: access was widened from {@code protected}.
     *
     * @return the parent context handler given at construction time
     */
    public final ContextHandler<?> parentContext() {
        return context;
    }

    /**
     * Builds the short name used in log messages: the runtime class's simple name with every
     * occurrence of {@code "Operations"} removed.
     *
     * @return the shortened class name
     */
    protected final String formatName() {
        String simpleName = getClass().getSimpleName();
        return simpleName.replace("Operations", "");
    }

    /**
     * @return the Reactor {@link Context} reported by the parent context handler's sink
     */
    public Context currentContext() {
        return context.sink.currentContext();
    }

    // Bridge method (marked bridge/synthetic by the decompiler) that delegates to
    // context(Consumer<NettyContext>) and returns the result typed as NettyInbound.
    // NOTE(review): it has the same name and erased parameter type as
    // context(Consumer<NettyContext>), so it probably has to be removed before this file
    // compiles as source — confirm.
    @Override // reactor.ipc.netty.NettyInbound, reactor.ipc.netty.NettyOutbound
    public /* bridge */ /* synthetic */ NettyInbound context(Consumer consumer) {
        return context((Consumer<NettyContext>) consumer);
    }

    // Bridge method (marked bridge/synthetic by the decompiler) that delegates to
    // context(Consumer<NettyContext>) and returns the result typed as NettyOutbound.
    // NOTE(review): it has the same name and erased parameter type as
    // context(Consumer<NettyContext>), so it probably has to be removed before this file
    // compiles as source — confirm.
    @Override // reactor.ipc.netty.NettyOutbound
    public /* bridge */ /* synthetic */ NettyOutbound context(Consumer consumer) {
        return context((Consumer<NettyContext>) consumer);
    }
}
