package org.springframework.messaging.tcp.reactor;

import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.fn.Functions;
import reactor.io.net.ChannelStream;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Streams;

/* loaded from: input_file:BOOT-INF/lib/spring-messaging-4.3.12.RELEASE.jar:org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.class */
public class Reactor2TcpConnection<P> implements TcpConnection<P> {
    private final ChannelStream<Message<P>, Message<P>> channelStream;
    private final Promise<Void> closePromise;

    public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream, Promise<Void> promise) {
        this.channelStream = channelStream;
        this.closePromise = promise;
    }

    @Override // org.springframework.messaging.tcp.TcpConnection
    public ListenableFuture<Void> send(Message<P> message) {
        Promise prepare = Promises.prepare();
        this.channelStream.writeWith(Streams.just(message)).subscribe(prepare);
        return new PassThroughPromiseToListenableFutureAdapter(prepare);
    }

    @Override // org.springframework.messaging.tcp.TcpConnection
    public void onReadInactivity(Runnable runnable, long j) {
        this.channelStream.on().readIdle(j, Functions.consumer(runnable));
    }

    @Override // org.springframework.messaging.tcp.TcpConnection
    public void onWriteInactivity(Runnable runnable, long j) {
        this.channelStream.on().writeIdle(j, Functions.consumer(runnable));
    }

    @Override // org.springframework.messaging.tcp.TcpConnection, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closePromise.onComplete();
    }
}
