/*
 * Decompiled with CFR 0.152.
 */
package com.moilioncircle.redis.replicator;

import com.moilioncircle.redis.replicator.AbstractReplicator;
import com.moilioncircle.redis.replicator.AbstractReplicatorRetrier;
import com.moilioncircle.redis.replicator.Configuration;
import com.moilioncircle.redis.replicator.DefaultExceptionListener;
import com.moilioncircle.redis.replicator.Status;
import com.moilioncircle.redis.replicator.cmd.BulkReplyHandler;
import com.moilioncircle.redis.replicator.cmd.CommandName;
import com.moilioncircle.redis.replicator.cmd.CommandParser;
import com.moilioncircle.redis.replicator.cmd.OffsetHandler;
import com.moilioncircle.redis.replicator.cmd.ReplyParser;
import com.moilioncircle.redis.replicator.event.Event;
import com.moilioncircle.redis.replicator.io.AsyncBufferedInputStream;
import com.moilioncircle.redis.replicator.io.RateLimitInputStream;
import com.moilioncircle.redis.replicator.io.RedisInputStream;
import com.moilioncircle.redis.replicator.io.RedisOutputStream;
import com.moilioncircle.redis.replicator.net.RedisSocketFactory;
import com.moilioncircle.redis.replicator.rdb.RdbParser;
import com.moilioncircle.redis.replicator.util.Arrays;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RedisSocketReplicator
extends AbstractReplicator {
    protected static final Log logger = LogFactory.getLog(RedisSocketReplicator.class);
    protected final int port;
    protected Timer heartbeat;
    protected final String host;
    protected volatile Socket socket;
    protected volatile ReplyParser replyParser;
    protected volatile RedisOutputStream outputStream;
    protected final RedisSocketFactory socketFactory;

    public RedisSocketReplicator(String host, int port, Configuration configuration) {
        Objects.requireNonNull(host);
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("illegal argument port: " + port);
        }
        Objects.requireNonNull(configuration);
        this.host = host;
        this.port = port;
        this.configuration = configuration;
        this.socketFactory = new RedisSocketFactory(configuration);
        this.builtInCommandParserRegister();
        if (configuration.isUseDefaultExceptionListener()) {
            this.addExceptionListener(new DefaultExceptionListener());
        }
    }

    @Override
    public void open() throws IOException {
        try {
            new RedisSocketReplicatorRetrier().retry(this);
        }
        finally {
            this.doClose();
            this.doCloseListener(this);
        }
    }

    protected SyncMode trySync(String reply) throws IOException {
        logger.info((Object)reply);
        if (reply.startsWith("FULLRESYNC")) {
            this.parseDump(this);
            String[] ary = reply.split(" ");
            this.configuration.setReplId(ary[1]);
            this.configuration.setReplOffset(Long.parseLong(ary[2]));
            return SyncMode.PSYNC;
        }
        if (reply.startsWith("CONTINUE")) {
            String[] ary = reply.split(" ");
            String masterRunId = this.configuration.getReplId();
            if (ary.length > 1 && masterRunId != null && !masterRunId.equals(ary[1])) {
                this.configuration.setReplId(ary[1]);
            }
            return SyncMode.PSYNC;
        }
        if (reply.startsWith("NOMASTERLINK") || reply.startsWith("LOADING")) {
            return SyncMode.SYNC_LATER;
        }
        logger.info((Object)"SYNC");
        this.send("SYNC".getBytes());
        this.parseDump(this);
        return SyncMode.SYNC;
    }

    protected void parseDump(final AbstractReplicator replicator) throws IOException {
        byte[] rawReply = (byte[])this.reply(new BulkReplyHandler(){

            @Override
            public byte[] handle(long len, RedisInputStream in) throws IOException {
                if (logger.isInfoEnabled()) {
                    if (len != -1L) {
                        logger.info((Object)("RDB dump file size:" + len));
                    } else {
                        logger.info((Object)"Disk-less replication.");
                    }
                }
                if (len != -1L && RedisSocketReplicator.this.configuration.isDiscardRdbEvent()) {
                    if (logger.isInfoEnabled()) {
                        logger.info((Object)("discard " + len + " bytes"));
                    }
                    in.skip(len);
                } else {
                    RdbParser parser = new RdbParser(in, replicator);
                    parser.parse();
                    if (len == -1L) {
                        in.skip(40L, false);
                    }
                }
                return "OK".getBytes();
            }
        });
        String reply = new String(rawReply, StandardCharsets.UTF_8);
        if ("OK".equals(reply)) {
            return;
        }
        throw new IOException("SYNC failed. reason : [" + reply + "]");
    }

    protected void establishConnection() throws IOException {
        this.connect();
        if (this.configuration.getAuthPassword() != null) {
            this.auth(this.configuration.getAuthPassword());
        }
        this.sendPing();
        this.sendSlavePort();
        this.sendSlaveIp();
        this.sendSlaveCapa("eof");
        this.sendSlaveCapa("psync2");
    }

    protected void auth(String password) throws IOException {
        if (password != null) {
            if (logger.isInfoEnabled()) {
                logger.info((Object)("AUTH " + password));
            }
            this.send("AUTH".getBytes(), new byte[][]{password.getBytes()});
            String reply = new String((byte[])this.reply(), StandardCharsets.UTF_8);
            logger.info((Object)reply);
            if ("OK".equals(reply)) {
                return;
            }
            if (reply.contains("no password")) {
                logger.warn((Object)("[AUTH " + password + "] failed. " + reply));
                return;
            }
            throw new AssertionError((Object)("[AUTH " + password + "] failed. " + reply));
        }
    }

    protected void sendPing() throws IOException {
        if (logger.isInfoEnabled()) {
            logger.info((Object)"PING");
        }
        this.send("PING".getBytes());
        String reply = new String((byte[])this.reply(), StandardCharsets.UTF_8);
        logger.info((Object)reply);
        if ("PONG".equalsIgnoreCase(reply)) {
            return;
        }
        if (reply.contains("NOAUTH")) {
            throw new AssertionError((Object)reply);
        }
        if (reply.contains("operation not permitted")) {
            throw new AssertionError((Object)"-NOAUTH Authentication required.");
        }
        logger.warn((Object)("[PING] failed. " + reply));
    }

    protected void sendSlavePort() throws IOException {
        if (logger.isInfoEnabled()) {
            logger.info((Object)("REPLCONF listening-port " + this.socket.getLocalPort()));
        }
        this.send("REPLCONF".getBytes(), "listening-port".getBytes(), String.valueOf(this.socket.getLocalPort()).getBytes());
        String reply = new String((byte[])this.reply(), StandardCharsets.UTF_8);
        logger.info((Object)reply);
        if ("OK".equals(reply)) {
            return;
        }
        if (logger.isWarnEnabled()) {
            logger.warn((Object)("[REPLCONF listening-port " + this.socket.getLocalPort() + "] failed. " + reply));
        }
    }

    protected void sendSlaveIp() throws IOException {
        if (logger.isInfoEnabled()) {
            logger.info((Object)("REPLCONF ip-address " + this.socket.getLocalAddress().getHostAddress()));
        }
        this.send("REPLCONF".getBytes(), "ip-address".getBytes(), this.socket.getLocalAddress().getHostAddress().getBytes());
        String reply = new String((byte[])this.reply(), StandardCharsets.UTF_8);
        logger.info((Object)reply);
        if ("OK".equals(reply)) {
            return;
        }
        if (logger.isWarnEnabled()) {
            logger.warn((Object)("[REPLCONF ip-address " + this.socket.getLocalAddress().getHostAddress() + "] failed. " + reply));
        }
    }

    protected void sendSlaveCapa(String cmd) throws IOException {
        if (logger.isInfoEnabled()) {
            logger.info((Object)("REPLCONF capa " + cmd));
        }
        this.send("REPLCONF".getBytes(), "capa".getBytes(), cmd.getBytes());
        String reply = new String((byte[])this.reply(), StandardCharsets.UTF_8);
        logger.info((Object)reply);
        if ("OK".equals(reply)) {
            return;
        }
        if (logger.isWarnEnabled()) {
            logger.warn((Object)("[REPLCONF capa " + cmd + "] failed. " + reply));
        }
    }

    protected synchronized void heartbeat() {
        this.heartbeat = new Timer("heartbeat", true);
        this.heartbeat.schedule(new TimerTask(){

            @Override
            public void run() {
                try {
                    RedisSocketReplicator.this.send("REPLCONF".getBytes(), "ACK".getBytes(), String.valueOf(RedisSocketReplicator.this.configuration.getReplOffset()).getBytes());
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
        }, this.configuration.getHeartBeatPeriod(), (long)this.configuration.getHeartBeatPeriod());
        logger.info((Object)"heartbeat thread started.");
    }

    protected void send(byte[] command) throws IOException {
        this.send(command, new byte[0][]);
    }

    protected void send(byte[] command, byte[] ... args) throws IOException {
        this.outputStream.write(42);
        this.outputStream.write(String.valueOf(args.length + 1).getBytes());
        this.outputStream.writeCrLf();
        this.outputStream.write(36);
        this.outputStream.write(String.valueOf(command.length).getBytes());
        this.outputStream.writeCrLf();
        this.outputStream.write(command);
        this.outputStream.writeCrLf();
        for (byte[] arg : args) {
            this.outputStream.write(36);
            this.outputStream.write(String.valueOf(arg.length).getBytes());
            this.outputStream.writeCrLf();
            this.outputStream.write(arg);
            this.outputStream.writeCrLf();
        }
        this.outputStream.flush();
    }

    protected <T> T reply() throws IOException {
        return (T)this.replyParser.parse();
    }

    protected <T> T reply(BulkReplyHandler handler) throws IOException {
        return (T)this.replyParser.parse(handler);
    }

    protected void connect() throws IOException {
        if (!this.connected.compareAndSet(Status.DISCONNECTED, Status.CONNECTING)) {
            return;
        }
        try {
            this.socket = this.socketFactory.createSocket(this.host, this.port, this.configuration.getConnectionTimeout());
            this.outputStream = new RedisOutputStream(this.socket.getOutputStream());
            InputStream inputStream = this.socket.getInputStream();
            if (this.configuration.getAsyncCachedBytes() > 0) {
                inputStream = new AsyncBufferedInputStream(inputStream, this.configuration.getAsyncCachedBytes());
            }
            if (this.configuration.getRateLimit() > 0) {
                inputStream = new RateLimitInputStream(inputStream, this.configuration.getRateLimit());
            }
            this.inputStream = new RedisInputStream(inputStream, this.configuration.getBufferSize());
            this.inputStream.setRawByteListeners(this.rawByteListeners);
            this.replyParser = new ReplyParser(this.inputStream);
            logger.info((Object)("Connected to redis-server[" + this.host + ":" + this.port + "]"));
        }
        finally {
            this.connected.set(Status.CONNECTED);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doClose() throws IOException {
        this.connected.compareAndSet(Status.CONNECTED, Status.DISCONNECTING);
        try {
            RedisSocketReplicator redisSocketReplicator = this;
            synchronized (redisSocketReplicator) {
                if (this.heartbeat != null) {
                    this.heartbeat.cancel();
                    this.heartbeat = null;
                    logger.info((Object)"heartbeat canceled.");
                }
            }
            try {
                if (this.inputStream != null) {
                    this.inputStream.setRawByteListeners(null);
                    this.inputStream.close();
                }
            }
            catch (IOException iOException) {
                // empty catch block
            }
            try {
                if (this.outputStream != null) {
                    this.outputStream.close();
                }
            }
            catch (IOException iOException) {
                // empty catch block
            }
            try {
                if (this.socket != null && !this.socket.isClosed()) {
                    this.socket.close();
                }
            }
            catch (IOException iOException) {
                // empty catch block
            }
            logger.info((Object)("socket closed. redis-server[" + this.host + ":" + this.port + "]"));
        }
        finally {
            this.connected.set(Status.DISCONNECTED);
        }
    }

    private class RedisSocketReplicatorRetrier
    extends AbstractReplicatorRetrier {
        private RedisSocketReplicatorRetrier() {
        }

        @Override
        protected boolean connect() throws IOException {
            RedisSocketReplicator.this.establishConnection();
            return true;
        }

        @Override
        protected boolean close(IOException reason) throws IOException {
            if (reason != null) {
                logger.error((Object)("[redis-replicator] socket error. redis-server[" + RedisSocketReplicator.this.host + ":" + RedisSocketReplicator.this.port + "]"), (Throwable)reason);
            }
            RedisSocketReplicator.this.doClose();
            if (reason != null) {
                logger.info((Object)("reconnecting to redis-server[" + RedisSocketReplicator.this.host + ":" + RedisSocketReplicator.this.port + "]. retry times:" + (this.retries + 1)));
            }
            return true;
        }

        @Override
        protected boolean open() throws IOException {
            logger.info((Object)("PSYNC " + RedisSocketReplicator.this.configuration.getReplId() + " " + String.valueOf(RedisSocketReplicator.this.configuration.getReplOffset())));
            RedisSocketReplicator.this.send("PSYNC".getBytes(), RedisSocketReplicator.this.configuration.getReplId().getBytes(), String.valueOf(RedisSocketReplicator.this.configuration.getReplOffset()).getBytes());
            String reply = new String((byte[])RedisSocketReplicator.this.reply(), StandardCharsets.UTF_8);
            SyncMode mode = RedisSocketReplicator.this.trySync(reply);
            if (mode == SyncMode.PSYNC && RedisSocketReplicator.this.getStatus() == Status.CONNECTED) {
                RedisSocketReplicator.this.heartbeat();
            } else if (mode == SyncMode.SYNC_LATER && RedisSocketReplicator.this.getStatus() == Status.CONNECTED) {
                return false;
            }
            final long[] offset = new long[1];
            while (RedisSocketReplicator.this.getStatus() == Status.CONNECTED) {
                Object obj = RedisSocketReplicator.this.replyParser.parse(new OffsetHandler(){

                    @Override
                    public void handle(long len) {
                        offset[0] = len;
                    }
                });
                if (obj instanceof Object[]) {
                    Object[] raw;
                    CommandName name;
                    CommandParser parser;
                    if (RedisSocketReplicator.this.verbose() && logger.isDebugEnabled()) {
                        logger.debug((Object)Arrays.deepToString((Object[])obj));
                    }
                    if ((parser = (CommandParser)RedisSocketReplicator.this.commands.get(name = CommandName.name(new String((byte[])(raw = (Object[])obj)[0], StandardCharsets.UTF_8)))) == null) {
                        logger.warn((Object)("command [" + name + "] not register. raw command:[" + Arrays.deepToString(raw) + "]"));
                        continue;
                    }
                    RedisSocketReplicator.this.submitEvent((Event)parser.parse(raw));
                } else {
                    logger.info((Object)("unexpected redis reply:" + obj));
                }
                RedisSocketReplicator.this.configuration.addOffset(offset[0]);
                offset[0] = 0L;
            }
            return true;
        }
    }

    protected static enum SyncMode {
        SYNC,
        PSYNC,
        SYNC_LATER;

    }
}

