/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.tcp.reactor;

import io.netty5.buffer.Buffer;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.reactor.TcpMessageCodec;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty5.NettyInbound;
import reactor.netty5.NettyOutbound;

public class ReactorNetty2TcpConnection<P>
implements TcpConnection<P> {
    private final NettyInbound inbound;
    private final NettyOutbound outbound;
    private final TcpMessageCodec<P> codec;
    private final Sinks.Empty<Void> completionSink;

    public ReactorNetty2TcpConnection(NettyInbound inbound, NettyOutbound outbound, TcpMessageCodec<P> codec, Sinks.Empty<Void> completionSink) {
        this.inbound = inbound;
        this.outbound = outbound;
        this.codec = codec;
        this.completionSink = completionSink;
    }

    @Override
    public CompletableFuture<Void> sendAsync(Message<P> message) {
        ByteBuffer byteBuffer = this.codec.encode(message);
        Buffer buffer = this.outbound.alloc().copyOf(byteBuffer);
        return this.outbound.send((Publisher)Mono.just((Object)buffer)).then().toFuture();
    }

    @Override
    public void onReadInactivity(Runnable runnable, long inactivityDuration) {
        this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable));
    }

    @Override
    public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
        this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable));
    }

    @Override
    public void close() {
        this.completionSink.tryEmitEmpty();
    }
}

