/*
 * Decompiled with CFR 0.152.
 */
package reactor.net.zmq;

import com.gs.collections.api.block.predicate.Predicate;
import com.gs.collections.api.list.MutableList;
import com.gs.collections.impl.block.predicate.checked.CheckedPredicate;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.list.mutable.SynchronizedMutableList;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.event.dispatch.Dispatcher;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.AbstractNetChannel;
import reactor.net.NetChannel;

/**
 * A {@link NetChannel} implementation that writes outgoing data to a ZeroMQ socket.
 *
 * <p>Outgoing writes are accumulated as frames of a pending {@link ZMsg}, which is sent on the
 * configured socket when a flush is requested. The pending message is held in a volatile field
 * and swapped through an {@link AtomicReferenceFieldUpdater}.
 *
 * <p>NOTE(review): frames are appended to the pending {@code ZMsg} without additional locking;
 * confirm that callers serialise writes on a single channel.
 *
 * @param <IN>  type of decoded inbound data
 * @param <OUT> type of outbound data accepted by the encoder
 */
public class ZeroMQNetChannel<IN, OUT> extends AbstractNetChannel<IN, OUT> {
    // The raw type is unavoidable here: the updater is created from the class literal.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<ZeroMQNetChannel, ZMsg> MSG_UPD =
            AtomicReferenceFieldUpdater.newUpdater(ZeroMQNetChannel.class, ZMsg.class, "currentMsg");

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ZeroMQConsumerSpec eventSpec = new ZeroMQConsumerSpec();
    private final MutableList<Runnable> closeHandlers =
            SynchronizedMutableList.of(FastList.<Runnable>newList());
    private volatile String connectionId;
    private volatile ZMQ.Socket socket;
    /** Message currently being assembled; {@code null} when nothing is pending. Accessed via {@link #MSG_UPD}. */
    private volatile ZMsg currentMsg;

    /**
     * Creates a channel; all arguments are handed to the {@link AbstractNetChannel} constructor.
     *
     * @param env           the environment
     * @param eventsReactor the reactor on which {@link #close(Consumer)} schedules its work
     * @param ioDispatcher  the dispatcher passed to the superclass
     * @param codec         optional codec, may be {@code null}
     */
    public ZeroMQNetChannel(@Nonnull Environment env,
                            @Nonnull Reactor eventsReactor,
                            @Nonnull Dispatcher ioDispatcher,
                            @Nullable Codec<Buffer, IN, OUT> codec) {
        super(env, codec, ioDispatcher, eventsReactor);
    }

    /**
     * Sets the connection id that is prepended as the first frame of messages sent on a
     * {@code ROUTER} socket.
     *
     * @param connectionId the connection id
     * @return {@code this}
     */
    public ZeroMQNetChannel<IN, OUT> setConnectionId(String connectionId) {
        this.connectionId = connectionId;
        return this;
    }

    /**
     * Sets the socket on which assembled messages are sent.
     *
     * @param socket the ZeroMQ socket
     * @return {@code this}
     */
    public ZeroMQNetChannel<IN, OUT> setSocket(ZMQ.Socket socket) {
        this.socket = socket;
        return this;
    }

    /**
     * This implementation does not expose a remote address.
     *
     * @return always {@code null}
     */
    @Override
    public InetSocketAddress remoteAddress() {
        return null;
    }

    /**
     * Copies the remaining bytes of {@code data} into a new frame and appends it to the pending
     * message, creating that message first if none is pending.
     *
     * @param data       bytes to append; its remaining content is consumed
     * @param onComplete completion callback, only used when {@code flush} is {@code true}
     * @param flush      whether to send the pending message immediately after appending
     */
    @Override
    protected void write(ByteBuffer data, Deferred<Void, Promise<Void>> onComplete, boolean flush) {
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes);

        // Obtain the pending message, installing a fresh one only when none exists. Looping on
        // the CAS guarantees a non-null message even if a flush clears the field in between,
        // and avoids allocating a ZMsg on every write.
        ZMsg msg;
        boolean isNewMsg = false;
        while (null == (msg = MSG_UPD.get(this))) {
            ZMsg fresh = new ZMsg();
            if (MSG_UPD.compareAndSet(this, null, fresh)) {
                msg = fresh;
                isNewMsg = true;
                break;
            }
        }

        // On a ROUTER socket the first frame of a new message carries the connection id.
        if (isNewMsg && this.socket.getType() == ZMQ.ROUTER) {
            msg.add(new ZFrame(this.connectionId));
        }
        msg.add(new ZFrame(bytes));

        if (flush) {
            this.doFlush(onComplete);
        }
    }

    /**
     * Encodes {@code data} with the channel's encoder and writes the resulting bytes.
     *
     * @param data       the object to encode; passed to the encoder as {@code OUT}
     * @param onComplete completion callback, only used when {@code flush} is {@code true}
     * @param flush      whether to send the pending message immediately after appending
     */
    @Override
    @SuppressWarnings("unchecked")
    protected void write(Object data, Deferred<Void, Promise<Void>> onComplete, boolean flush) {
        Buffer buff = this.getEncoder().apply((OUT) data);
        this.write(buff.byteBuffer(), onComplete, flush);
    }

    /** Sends the pending message, if any, without a completion callback. */
    @Override
    protected synchronized void flush() {
        this.doFlush(null);
    }

    /**
     * Atomically takes the pending message and sends it on the socket. If no message is pending,
     * nothing is sent and {@code onComplete} is not invoked.
     *
     * @param onComplete optional callback; receives {@code null} on success or a
     *                   {@link RuntimeException} when the send reports failure
     */
    private void doFlush(Deferred<Void, Promise<Void>> onComplete) {
        // getAndSet hands the message to exactly one flusher, so it cannot be sent twice.
        ZMsg msg = MSG_UPD.getAndSet(this, null);
        if (null == msg) {
            return;
        }
        boolean success = msg.send(this.socket);
        if (null == onComplete) {
            return;
        }
        if (success) {
            onComplete.accept((Void) null);
        } else {
            onComplete.accept(new RuntimeException("ZeroMQ Message could not be sent"));
        }
    }

    /**
     * Schedules a task on the events reactor that runs and removes every registered close
     * handler, then notifies {@code onClose} with {@code true}.
     *
     * @param onClose optional callback invoked after the close handlers have run
     */
    @Override
    public void close(final Consumer<Boolean> onClose) {
        this.getEventsReactor().schedule(new Consumer<Void>() {
            @Override
            public void accept(Void v) {
                // Each handler is run once and dropped from the list (the predicate returns true).
                ZeroMQNetChannel.this.closeHandlers.removeIf(new CheckedPredicate<Runnable>() {
                    @Override
                    public boolean safeAccept(Runnable r) throws Exception {
                        r.run();
                        return true;
                    }
                });
                if (null != onClose) {
                    onClose.accept(true);
                }
            }
        }, null);
    }

    /**
     * @return the spec used to register channel event handlers
     */
    @Override
    public NetChannel.ConsumerSpec on() {
        return this.eventSpec;
    }

    @Override
    public String toString() {
        return "ZeroMQNetChannel{closeHandlers=" + this.closeHandlers
                + ", connectionId='" + this.connectionId + '\''
                + ", socket=" + this.socket + '}';
    }

    /**
     * Event registration for this channel. Only close handlers are recorded; the idle callbacks
     * are accepted and ignored.
     */
    private class ZeroMQConsumerSpec implements NetChannel.ConsumerSpec {
        private ZeroMQConsumerSpec() {
        }

        /** Registers {@code onClose} to be run when the channel is closed. */
        @Override
        public NetChannel.ConsumerSpec close(Runnable onClose) {
            ZeroMQNetChannel.this.closeHandlers.add(onClose);
            return this;
        }

        /** No-op: the handler is not registered. */
        @Override
        public NetChannel.ConsumerSpec readIdle(long idleTimeout, Runnable onReadIdle) {
            return this;
        }

        /** No-op: the handler is not registered. */
        @Override
        public NetChannel.ConsumerSpec writeIdle(long idleTimeout, Runnable onWriteIdle) {
            return this;
        }
    }
}

