package reactor.net.zmq;

import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZLoop;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.io.Buffer;

/* loaded from: input_file:reactor/net/zmq/ZeroMQWorker.class */
public abstract class ZeroMQWorker<IN, OUT> implements Runnable {
    private final UUID id;
    private final int socketType;
    private final int ioThreadCount;
    private volatile boolean closed;
    private volatile boolean shutdownCtx;
    private volatile ZContext zmq;
    private volatile ZMQ.Socket socket;
    private volatile ZMQ.PollItem pollin;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final ZLoop zloop = new ZLoop();
    private final ZLoop.IZLoopHandler inputHandler = new ZLoop.IZLoopHandler() { // from class: reactor.net.zmq.ZeroMQWorker.1
        public int handle(ZLoop zLoop, ZMQ.PollItem pollItem, Object obj) {
            String uuid;
            ZMsg recvMsg = ZMsg.recvMsg(ZeroMQWorker.this.socket);
            if (null == recvMsg || recvMsg.size() == 0) {
                return 0;
            }
            if (ZeroMQWorker.this.closed) {
                return -1;
            }
            switch (ZeroMQWorker.this.socketType) {
                case 6:
                    uuid = recvMsg.popString();
                    break;
                default:
                    uuid = ZeroMQWorker.this.id.toString();
                    break;
            }
            ZeroMQNetChannel<IN, OUT> socket = ZeroMQWorker.this.select(uuid).setConnectionId(uuid).setSocket(ZeroMQWorker.this.socket);
            while (true) {
                ZFrame pop = recvMsg.pop();
                if (null == pop) {
                    recvMsg.destroy();
                    return 0;
                }
                socket.read(Buffer.wrap(pop.getData()));
            }
        }
    };

    public ZeroMQWorker(UUID uuid, int i, int i2, ZContext zContext) {
        this.id = uuid;
        this.socketType = i;
        this.ioThreadCount = i2;
        this.zmq = zContext;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.closed) {
            return;
        }
        if (null == this.zmq) {
            this.zmq = new ZContext(this.ioThreadCount);
            this.shutdownCtx = true;
        }
        this.socket = this.zmq.createSocket(this.socketType);
        this.socket.setIdentity(this.id.toString().getBytes());
        configure(this.socket);
        this.pollin = new ZMQ.PollItem(this.socket, 1);
        if (this.log.isTraceEnabled()) {
            this.zloop.verbose(true);
        }
        this.zloop.addPoller(this.pollin, this.inputHandler, (Object) null);
        start(this.socket);
        this.zloop.start();
        this.zmq.destroySocket(this.socket);
    }

    public void shutdown() {
        if (this.closed) {
            return;
        }
        this.zloop.removePoller(this.pollin);
        this.zloop.destroy();
        this.closed = true;
        if (this.shutdownCtx) {
            this.zmq.destroy();
        }
    }

    protected abstract void configure(ZMQ.Socket socket);

    protected abstract void start(ZMQ.Socket socket);

    protected abstract ZeroMQNetChannel<IN, OUT> select(Object obj);
}
