/*
 * Decompiled with CFR 0.152.
 */
package reactor.net.zmq;

import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZLoop;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.io.Buffer;
import reactor.net.zmq.ZeroMQNetChannel;

public abstract class ZeroMQWorker<IN, OUT>
implements Runnable {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ZLoop zloop = new ZLoop();
    private final UUID id;
    private final int socketType;
    private final int ioThreadCount;
    private final ZLoop.IZLoopHandler inputHandler;
    private volatile boolean closed;
    private volatile boolean shutdownCtx;
    private volatile ZContext zmq;
    private volatile ZMQ.Socket socket;
    private volatile ZMQ.PollItem pollin;

    public ZeroMQWorker(UUID id, int socketType, int ioThreadCount, ZContext zmq) {
        this.id = id;
        this.socketType = socketType;
        this.ioThreadCount = ioThreadCount;
        this.zmq = zmq;
        this.inputHandler = new ZLoop.IZLoopHandler(){

            public int handle(ZLoop loop, ZMQ.PollItem item, Object arg) {
                ZFrame content;
                String connId;
                ZMsg msg = ZMsg.recvMsg((ZMQ.Socket)ZeroMQWorker.this.socket);
                if (null == msg || msg.size() == 0) {
                    return 0;
                }
                if (ZeroMQWorker.this.closed) {
                    return -1;
                }
                switch (ZeroMQWorker.this.socketType) {
                    case 6: {
                        connId = msg.popString();
                        break;
                    }
                    default: {
                        connId = ZeroMQWorker.this.id.toString();
                    }
                }
                ZeroMQNetChannel netChannel = ZeroMQWorker.this.select(connId).setConnectionId(connId).setSocket(ZeroMQWorker.this.socket);
                while (null != (content = msg.pop())) {
                    netChannel.read(Buffer.wrap((byte[])content.getData()));
                }
                msg.destroy();
                return 0;
            }
        };
    }

    @Override
    public void run() {
        if (this.closed) {
            return;
        }
        if (null == this.zmq) {
            this.zmq = new ZContext(this.ioThreadCount);
            this.shutdownCtx = true;
        }
        this.socket = this.zmq.createSocket(this.socketType);
        this.socket.setIdentity(this.id.toString().getBytes());
        this.configure(this.socket);
        this.pollin = new ZMQ.PollItem(this.socket, 1);
        if (this.log.isTraceEnabled()) {
            this.zloop.verbose(true);
        }
        this.zloop.addPoller(this.pollin, this.inputHandler, null);
        this.start(this.socket);
        this.zloop.start();
        this.zmq.destroySocket(this.socket);
    }

    public void shutdown() {
        if (this.closed) {
            return;
        }
        this.zloop.removePoller(this.pollin);
        this.zloop.destroy();
        this.closed = true;
        if (this.shutdownCtx) {
            this.zmq.destroy();
        }
    }

    protected abstract void configure(ZMQ.Socket var1);

    protected abstract void start(ZMQ.Socket var1);

    protected abstract ZeroMQNetChannel<IN, OUT> select(Object var1);
}

