/*
 * Decompiled with CFR 0.152.
 */
package com.moilioncircle.redis.replicator;

import com.moilioncircle.redis.replicator.AbstractReplicator;
import com.moilioncircle.redis.replicator.Configuration;
import com.moilioncircle.redis.replicator.DefaultExceptionListener;
import com.moilioncircle.redis.replicator.cmd.BulkReplyHandler;
import com.moilioncircle.redis.replicator.cmd.CommandName;
import com.moilioncircle.redis.replicator.cmd.CommandParser;
import com.moilioncircle.redis.replicator.cmd.OffsetHandler;
import com.moilioncircle.redis.replicator.cmd.ReplyParser;
import com.moilioncircle.redis.replicator.event.Event;
import com.moilioncircle.redis.replicator.io.AsyncBufferedInputStream;
import com.moilioncircle.redis.replicator.io.RedisInputStream;
import com.moilioncircle.redis.replicator.io.RedisOutputStream;
import com.moilioncircle.redis.replicator.net.RedisSocketFactory;
import com.moilioncircle.redis.replicator.rdb.RdbParser;
import java.io.IOException;
import java.net.Socket;
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RedisSocketReplicator
extends AbstractReplicator {
    protected static final Log logger = LogFactory.getLog(RedisSocketReplicator.class);
    protected Socket socket;
    protected final int port;
    protected Timer heartBeat;
    protected final String host;
    protected ReplyParser replyParser;
    protected RedisOutputStream outputStream;
    protected final RedisSocketFactory socketFactory;
    protected final AtomicBoolean connected = new AtomicBoolean(false);

    public RedisSocketReplicator(String host, int port, Configuration configuration) {
        this.host = host;
        this.port = port;
        this.configuration = configuration;
        this.socketFactory = new RedisSocketFactory(configuration);
        this.builtInCommandParserRegister();
        this.addExceptionListener(new DefaultExceptionListener());
    }

    @Override
    public void open() throws IOException {
        try {
            this.doOpen();
        }
        finally {
            this.close();
            this.doCloseListener(this);
        }
    }

    protected void doOpen() throws IOException {
        for (int i = 0; i < this.configuration.getRetries() || this.configuration.getRetries() <= 0; ++i) {
            try {
                this.establishConnection();
                i = 0;
                logger.info((Object)("PSYNC " + this.configuration.getReplId() + " " + String.valueOf(this.configuration.getReplOffset())));
                this.send("PSYNC".getBytes(), this.configuration.getReplId().getBytes(), String.valueOf(this.configuration.getReplOffset()).getBytes());
                String reply = (String)this.reply();
                SyncMode syncMode = this.trySync(reply);
                if (syncMode == SyncMode.PSYNC && this.connected.get()) {
                    this.heartBeat();
                } else if (syncMode == SyncMode.SYNC_LATER && this.connected.get()) {
                    i = 0;
                    this.close();
                    try {
                        Thread.sleep(this.configuration.getRetryTimeInterval());
                    }
                    catch (InterruptedException interrupt) {
                        Thread.currentThread().interrupt();
                    }
                    continue;
                }
                while (this.connected.get()) {
                    Object obj = this.replyParser.parse(new OffsetHandler(){

                        @Override
                        public void handle(long len) {
                            RedisSocketReplicator.this.configuration.addOffset(len);
                        }
                    });
                    if (obj instanceof Object[]) {
                        Object[] command;
                        CommandName cmdName;
                        CommandParser operations;
                        if (this.configuration.isVerbose() && logger.isDebugEnabled()) {
                            logger.debug((Object)Arrays.deepToString((Object[])obj));
                        }
                        if ((operations = (CommandParser)this.commands.get(cmdName = CommandName.name((String)(command = (Object[])obj)[0]))) == null) {
                            logger.warn((Object)("command [" + cmdName + "] not register. raw command:[" + Arrays.deepToString((Object[])obj) + "]"));
                            continue;
                        }
                        Object parsedCommand = operations.parse(command);
                        this.submitEvent((Event)parsedCommand);
                        continue;
                    }
                    if (!logger.isInfoEnabled()) continue;
                    logger.info((Object)("Redis reply:" + obj));
                }
                break;
            }
            catch (IOException e) {
                if (!this.connected.get()) break;
                logger.error((Object)"socket error", (Throwable)e);
                this.close();
                logger.info((Object)("reconnect to redis-server. retry times:" + (i + 1)));
                try {
                    Thread.sleep(this.configuration.getRetryTimeInterval());
                    continue;
                }
                catch (InterruptedException interrupt) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    protected SyncMode trySync(String reply) throws IOException {
        logger.info((Object)reply);
        if (reply.startsWith("FULLRESYNC")) {
            this.parseDump(this);
            String[] ary = reply.split(" ");
            this.configuration.setReplId(ary[1]);
            this.configuration.setReplOffset(Long.parseLong(ary[2]));
            return SyncMode.PSYNC;
        }
        if (reply.startsWith("CONTINUE")) {
            String[] ary = reply.split(" ");
            String masterRunId = this.configuration.getReplId();
            if (ary.length > 1 && masterRunId != null && !masterRunId.equals(ary[1])) {
                this.configuration.setReplId(ary[1]);
            }
            return SyncMode.PSYNC;
        }
        if (reply.startsWith("NOMASTERLINK") || reply.startsWith("LOADING")) {
            return SyncMode.SYNC_LATER;
        }
        logger.info((Object)"SYNC");
        this.send("SYNC".getBytes());
        this.parseDump(this);
        return SyncMode.SYNC;
    }

    protected void parseDump(final AbstractReplicator replicator) throws IOException {
        String reply = (String)this.replyParser.parse(new BulkReplyHandler(){

            @Override
            public String handle(long len, RedisInputStream in) throws IOException {
                logger.info((Object)("RDB dump file size:" + len));
                if (RedisSocketReplicator.this.configuration.isDiscardRdbEvent()) {
                    logger.info((Object)("Discard " + len + " bytes"));
                    in.skip(len);
                } else {
                    RdbParser parser = new RdbParser(in, replicator);
                    parser.parse();
                }
                return "OK";
            }
        });
        if (reply.equals("OK")) {
            return;
        }
        throw new AssertionError((Object)("SYNC failed." + reply));
    }

    protected void establishConnection() throws IOException {
        this.connect();
        if (this.configuration.getAuthPassword() != null) {
            this.auth(this.configuration.getAuthPassword());
        }
        this.sendSlavePort();
        this.sendSlaveIp();
        this.sendSlaveCapa("eof");
        this.sendSlaveCapa("psync2");
    }

    protected void auth(String password) throws IOException {
        if (password != null) {
            logger.info((Object)("AUTH " + password));
            this.send("AUTH".getBytes(), new byte[][]{password.getBytes()});
            String reply = (String)this.reply();
            logger.info((Object)reply);
            if (reply.equals("OK")) {
                return;
            }
            throw new AssertionError((Object)("[AUTH " + password + "] failed." + reply));
        }
    }

    protected void sendSlavePort() throws IOException {
        logger.info((Object)("REPLCONF listening-port " + this.socket.getLocalPort()));
        this.send("REPLCONF".getBytes(), "listening-port".getBytes(), String.valueOf(this.socket.getLocalPort()).getBytes());
        String reply = (String)this.reply();
        logger.info((Object)reply);
        if (reply.equals("OK")) {
            return;
        }
        logger.warn((Object)("[REPLCONF listening-port " + this.socket.getLocalPort() + "] failed." + reply));
    }

    protected void sendSlaveIp() throws IOException {
        logger.info((Object)("REPLCONF ip-address " + this.socket.getLocalAddress().getHostAddress()));
        this.send("REPLCONF".getBytes(), "ip-address".getBytes(), this.socket.getLocalAddress().getHostAddress().getBytes());
        String reply = (String)this.reply();
        logger.info((Object)reply);
        if (reply.equals("OK")) {
            return;
        }
        logger.warn((Object)("[REPLCONF ip-address " + this.socket.getLocalAddress().getHostAddress() + "] failed." + reply));
    }

    protected void sendSlaveCapa(String cmd) throws IOException {
        logger.info((Object)("REPLCONF capa " + cmd));
        this.send("REPLCONF".getBytes(), "capa".getBytes(), cmd.getBytes());
        String reply = (String)this.reply();
        logger.info((Object)reply);
        if (reply.equals("OK")) {
            return;
        }
        logger.warn((Object)("[REPLCONF capa " + cmd + "] failed." + reply));
    }

    protected synchronized void heartBeat() {
        this.heartBeat = new Timer("heart beat", true);
        this.heartBeat.schedule(new TimerTask(){

            @Override
            public void run() {
                try {
                    RedisSocketReplicator.this.send("REPLCONF".getBytes(), "ACK".getBytes(), String.valueOf(RedisSocketReplicator.this.configuration.getReplOffset()).getBytes());
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
        }, this.configuration.getHeartBeatPeriod(), (long)this.configuration.getHeartBeatPeriod());
        logger.info((Object)"heart beat started.");
    }

    protected void send(byte[] command) throws IOException {
        this.send(command, new byte[0][]);
    }

    protected void send(byte[] command, byte[] ... args) throws IOException {
        this.outputStream.write(42);
        this.outputStream.write(String.valueOf(args.length + 1).getBytes());
        this.outputStream.writeCrLf();
        this.outputStream.write(36);
        this.outputStream.write(String.valueOf(command.length).getBytes());
        this.outputStream.writeCrLf();
        this.outputStream.write(command);
        this.outputStream.writeCrLf();
        for (byte[] arg : args) {
            this.outputStream.write(36);
            this.outputStream.write(String.valueOf(arg.length).getBytes());
            this.outputStream.writeCrLf();
            this.outputStream.write(arg);
            this.outputStream.writeCrLf();
        }
        this.outputStream.flush();
    }

    protected Object reply() throws IOException {
        return this.replyParser.parse();
    }

    protected Object reply(BulkReplyHandler handler) throws IOException {
        return this.replyParser.parse(handler);
    }

    protected void connect() throws IOException {
        if (!this.connected.compareAndSet(false, true)) {
            return;
        }
        this.socket = this.socketFactory.createSocket(this.host, this.port, this.configuration.getConnectionTimeout());
        this.outputStream = new RedisOutputStream(this.socket.getOutputStream());
        this.inputStream = new RedisInputStream(this.configuration.getAsyncCachedBytes() > 0 ? new AsyncBufferedInputStream(this.socket.getInputStream()) : this.socket.getInputStream(), this.configuration.getBufferSize());
        this.inputStream.addRawByteListener(this);
        this.replyParser = new ReplyParser(this.inputStream);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        if (!this.connected.compareAndSet(true, false)) {
            return;
        }
        RedisSocketReplicator redisSocketReplicator = this;
        synchronized (redisSocketReplicator) {
            if (this.heartBeat != null) {
                this.heartBeat.cancel();
                this.heartBeat = null;
                logger.info((Object)"heart beat canceled.");
            }
        }
        try {
            if (this.inputStream != null) {
                this.inputStream.removeRawByteListener(this);
                this.inputStream.close();
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
        try {
            if (this.outputStream != null) {
                this.outputStream.close();
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
        try {
            if (this.socket != null && !this.socket.isClosed()) {
                this.socket.close();
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
        logger.info((Object)"channel closed");
    }

    protected static enum SyncMode {
        SYNC,
        PSYNC,
        SYNC_LATER;

    }
}

