package com.moilioncircle.redis.replicator;

import com.moilioncircle.redis.replicator.cmd.BulkReplyHandler;
import com.moilioncircle.redis.replicator.cmd.Command;
import com.moilioncircle.redis.replicator.cmd.CommandName;
import com.moilioncircle.redis.replicator.cmd.CommandParser;
import com.moilioncircle.redis.replicator.cmd.OffsetHandler;
import com.moilioncircle.redis.replicator.cmd.ReplyParser;
import com.moilioncircle.redis.replicator.io.AsyncBufferedInputStream;
import com.moilioncircle.redis.replicator.io.RateLimitInputStream;
import com.moilioncircle.redis.replicator.io.RedisInputStream;
import com.moilioncircle.redis.replicator.io.RedisOutputStream;
import com.moilioncircle.redis.replicator.net.RedisSocketFactory;
import com.moilioncircle.redis.replicator.rdb.RdbParser;
import com.moilioncircle.redis.replicator.util.Arrays;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/moilioncircle/redis/replicator/RedisSocketReplicator.class */
public class RedisSocketReplicator extends AbstractReplicator {
    /** Class-wide logger used for handshake, sync and lifecycle messages. */
    protected static final Log logger = LogFactory.getLog(RedisSocketReplicator.class);
    /** TCP port of the redis server; the constructor enforces the range 1..65535. */
    protected final int port;
    /** Timer created by {@link #heartbeat()} and cancelled (then nulled) by {@link #close()}; may be null. */
    protected Timer heartbeat;
    /** Host name or address of the redis server; never null. */
    protected final String host;
    /** Socket assigned by {@link #connect()}; null until the first connection attempt succeeds in creating it. */
    protected volatile Socket socket;
    /** Parser reading replies from the current connection's input stream; assigned by {@link #connect()}. */
    protected volatile ReplyParser replyParser;
    /** Output stream of the current socket, used by {@code send} to write commands; assigned by {@link #connect()}. */
    protected volatile RedisOutputStream outputStream;
    /** Factory built from the configuration in the constructor and used by {@link #connect()} to create sockets. */
    protected final RedisSocketFactory socketFactory;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/moilioncircle/redis/replicator/RedisSocketReplicator$SyncMode.class */
    /**
     * Outcome of the synchronisation handshake as reported by {@link #trySync(String)}.
     */
    public enum SyncMode {
        /** The PSYNC reply was not recognised, so a plain {@code SYNC} was sent and the dump was parsed. */
        SYNC,
        /** The server answered {@code FULLRESYNC} or {@code CONTINUE}; {@code doOpen} starts the heartbeat in this mode. */
        PSYNC,
        /** The server answered {@code NOMASTERLINK} or {@code LOADING}; {@code doOpen} closes, waits and tries again. */
        SYNC_LATER
    }

    public RedisSocketReplicator(String str, int i, Configuration configuration) {
        Objects.requireNonNull(str);
        if (i <= 0 || i > 65535) {
            throw new IllegalArgumentException("illegal argument port: " + i);
        }
        Objects.requireNonNull(configuration);
        this.host = str;
        this.port = i;
        this.configuration = configuration;
        this.socketFactory = new RedisSocketFactory(configuration);
        builtInCommandParserRegister();
        if (configuration.isUseDefaultExceptionListener()) {
            addExceptionListener(new DefaultExceptionListener());
        }
    }

    /**
     * Runs the replication session via {@link #doOpen()} and blocks until it returns or throws.
     * Whatever the outcome, the connection is closed and the close listeners are then
     * notified through {@code doCloseListener(this)}.
     *
     * @throws IOException if {@link #doOpen()} propagates one
     */
    @Override // com.moilioncircle.redis.replicator.Replicator
    public void open() throws IOException {
        try {
            doOpen();
        } finally {
            // Order matters: release the socket first, then tell listeners we are closed.
            close();
            doCloseListener(this);
        }
    }

    /**
     * Connects to the server, negotiates synchronisation and then streams replicated
     * commands until the replicator is no longer connected.
     *
     * <p>Each loop iteration is one connection attempt. The loop runs while the attempt
     * counter is below {@code configuration.getRetries()}, or without limit when that
     * value is {@code <= 0}. The counter is reset after every successful handshake and
     * whenever the server asks us to sync later.
     *
     * <p>An I/O failure anywhere in the attempt (handshake, dump transfer or command
     * streaming) closes the connection, waits for the retry interval and starts the next
     * attempt. If the failure is observed while the status is not {@code CONNECTED}
     * (i.e. {@link #close()} was called), the method returns without retrying.
     *
     * @throws IOException declared for subclasses; I/O failures raised inside the loop
     *                     are handled by the retry logic
     */
    protected void doOpen() throws IOException {
        for (int attempt = 0;
             attempt < this.configuration.getRetries() || this.configuration.getRetries() <= 0;
             attempt++) {
            try {
                establishConnection();
                // Handshake succeeded: later failures start counting retries from scratch.
                attempt = 0;
                if (logger.isInfoEnabled()) {
                    logger.info("PSYNC " + this.configuration.getReplId() + " " + String.valueOf(this.configuration.getReplOffset()));
                }
                send("PSYNC".getBytes(),
                        this.configuration.getReplId().getBytes(),
                        String.valueOf(this.configuration.getReplOffset()).getBytes());
                final String psyncReply = new String((byte[]) reply(), StandardCharsets.UTF_8);

                final SyncMode syncMode = trySync(psyncReply);
                if (syncMode == SyncMode.PSYNC && getStatus() == Status.CONNECTED) {
                    // Start acknowledging our offset with periodic REPLCONF ACK.
                    heartbeat();
                } else if (syncMode == SyncMode.SYNC_LATER && getStatus() == Status.CONNECTED) {
                    // Server cannot serve a sync right now: drop the link and try again later.
                    attempt = 0;
                    close();
                    sleepRetryInterval();
                    continue;
                }

                // Command streaming: every parsed reply advances the replication offset.
                final long[] consumed = new long[1];
                while (getStatus() == Status.CONNECTED) {
                    Object parsed = this.replyParser.parse(new OffsetHandler() { // from class: com.moilioncircle.redis.replicator.RedisSocketReplicator.1
                        @Override // com.moilioncircle.redis.replicator.cmd.OffsetHandler
                        public void handle(long j) {
                            consumed[0] = j;
                        }
                    });
                    if (parsed instanceof Object[]) {
                        Object[] rawCommand = (Object[]) parsed;
                        if (this.configuration.isVerbose() && logger.isDebugEnabled()) {
                            logger.debug(Arrays.deepToString(rawCommand));
                        }
                        CommandName name = CommandName.name(new String((byte[]) rawCommand[0], StandardCharsets.UTF_8));
                        CommandParser<? extends Command> commandParser = this.commands.get(name);
                        if (commandParser != null) {
                            submitEvent(commandParser.parse(rawCommand));
                        } else if (logger.isWarnEnabled()) {
                            logger.warn("command [" + name + "] not register. raw command:[" + Arrays.deepToString(rawCommand) + "]");
                        }
                    } else if (logger.isInfoEnabled()) {
                        logger.info("redis reply:" + parsed);
                    }
                    this.configuration.addOffset(consumed[0]);
                    consumed[0] = 0;
                }
                // Status left CONNECTED without an I/O error: we were closed on purpose.
                break;
            } catch (UncheckedIOException | IOException e) {
                // An error seen while not CONNECTED is the side effect of a manual close().
                if (getStatus() != Status.CONNECTED) {
                    break;
                }
                logger.error("[redis-replicator] socket error", e);
                close();
                if (logger.isInfoEnabled()) {
                    logger.info("reconnect to redis-server. retry times:" + (attempt + 1));
                }
                sleepRetryInterval();
            }
        }
    }

    /** Sleeps for the configured retry interval, restoring the interrupt flag if interrupted. */
    private void sleepRetryInterval() {
        try {
            Thread.sleep(this.configuration.getRetryTimeInterval());
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Interprets the server's reply to {@code PSYNC} and performs the matching follow-up.
     *
     * <ul>
     *   <li>{@code FULLRESYNC <id> <offset>}: parses the dump, then stores id and offset.</li>
     *   <li>{@code CONTINUE [<id>]}: stores the id when one is given and it differs from
     *       the configured one.</li>
     *   <li>{@code NOMASTERLINK} / {@code LOADING}: nothing is sent; the caller should retry.</li>
     *   <li>anything else: sends {@code SYNC} and parses the dump.</li>
     * </ul>
     *
     * @param str the decoded reply to the PSYNC command
     * @return the negotiated {@link SyncMode}
     * @throws IOException if sending SYNC or reading the dump fails
     */
    protected SyncMode trySync(String str) throws IOException {
        logger.info(str);
        final SyncMode mode;
        if (str.startsWith("FULLRESYNC")) {
            parseDump(this);
            // Remember the replication id/offset only once the dump has been consumed.
            final String[] tokens = str.split(" ");
            this.configuration.setReplId(tokens[1]);
            this.configuration.setReplOffset(Long.parseLong(tokens[2]));
            mode = SyncMode.PSYNC;
        } else if (str.startsWith("CONTINUE")) {
            final String[] tokens = str.split(" ");
            final String knownId = this.configuration.getReplId();
            final boolean idChanged = tokens.length > 1 && knownId != null && !knownId.equals(tokens[1]);
            if (idChanged) {
                this.configuration.setReplId(tokens[1]);
            }
            mode = SyncMode.PSYNC;
        } else if (str.startsWith("NOMASTERLINK") || str.startsWith("LOADING")) {
            mode = SyncMode.SYNC_LATER;
        } else {
            // Unrecognised reply: fall back to SYNC.
            logger.info("SYNC");
            send("SYNC".getBytes());
            parseDump(this);
            mode = SyncMode.SYNC;
        }
        return mode;
    }

    /**
     * Reads the bulk reply carrying the RDB dump and either parses it or discards it.
     *
     * <p>A length of {@code -1} is logged as disk-less replication; such a dump is always
     * parsed and 40 extra bytes are skipped after it. A dump with a known length is
     * skipped unparsed when {@code configuration.isDiscardRdbEvent()} is set.
     *
     * @param abstractReplicator replicator handed to the {@link RdbParser}
     * @throws IOException if reading fails or the handler result is not {@code "OK"}
     */
    protected void parseDump(final AbstractReplicator abstractReplicator) throws IOException {
        final BulkReplyHandler dumpHandler = new BulkReplyHandler() { // from class: com.moilioncircle.redis.replicator.RedisSocketReplicator.2
            @Override // com.moilioncircle.redis.replicator.cmd.BulkReplyHandler
            public byte[] handle(long j, RedisInputStream redisInputStream) throws IOException {
                final boolean diskless = (j == -1);
                if (RedisSocketReplicator.logger.isInfoEnabled()) {
                    if (diskless) {
                        RedisSocketReplicator.logger.info("Disk-less replication.");
                    } else {
                        RedisSocketReplicator.logger.info("RDB dump file size:" + j);
                    }
                }

                if (!diskless && RedisSocketReplicator.this.configuration.isDiscardRdbEvent()) {
                    // Known length and the caller does not want RDB events: skip the payload.
                    if (RedisSocketReplicator.logger.isInfoEnabled()) {
                        RedisSocketReplicator.logger.info("discard " + j + " bytes");
                    }
                    redisInputStream.skip(j);
                    return "OK".getBytes();
                }

                new RdbParser(redisInputStream, abstractReplicator).parse();
                if (diskless) {
                    // Skip the 40 bytes that follow a disk-less dump.
                    redisInputStream.skip(40L, false);
                }
                return "OK".getBytes();
            }
        };

        final String outcome = new String((byte[]) reply(dumpHandler), StandardCharsets.UTF_8);
        if ("OK".equals(outcome)) {
            return;
        }
        throw new IOException("SYNC failed. reason : [" + outcome + "]");
    }

    /**
     * Opens the socket and performs the pre-sync handshake: optional AUTH, PING, then
     * REPLCONF for listening port, ip address and the {@code eof} / {@code psync2}
     * capabilities, in that order.
     *
     * @throws IOException if connecting or any handshake exchange fails
     */
    protected void establishConnection() throws IOException {
        connect();

        final String password = this.configuration.getAuthPassword();
        if (password != null) {
            auth(password);
        }

        sendPing();
        sendSlavePort();
        sendSlaveIp();
        for (String capability : new String[] {"eof", "psync2"}) {
            sendSlaveCapa(capability);
        }
    }

    /**
     * Authenticates against the server with {@code AUTH <password>}.
     *
     * <p>The password is never written to the log or to exception messages; a fixed
     * mask is used instead.
     *
     * <p>A reply of {@code OK} is success. A reply containing {@code "no password"}
     * is only logged as a warning. Any other reply is treated as a hard failure.
     *
     * @param str the password; when null no command is sent
     * @throws IOException    if sending the command or reading the reply fails
     * @throws AssertionError if the server rejects the password
     */
    protected void auth(String str) throws IOException {
        if (str == null) {
            return;
        }
        // Credentials must not leak into log files or stack traces.
        final String masked = "******";
        if (logger.isInfoEnabled()) {
            logger.info("AUTH " + masked);
        }
        send("AUTH".getBytes(), str.getBytes());
        final String answer = new String((byte[]) reply(), StandardCharsets.UTF_8);
        logger.info(answer);
        if ("OK".equals(answer)) {
            return;
        }
        if (!answer.contains("no password")) {
            throw new AssertionError("[AUTH " + masked + "] failed. " + answer);
        }
        logger.warn("[AUTH " + masked + "] failed. " + answer);
    }

    /**
     * Sends {@code PING} and checks the reply.
     *
     * <p>{@code PONG} (any case) is success. A reply containing {@code NOAUTH} or
     * {@code "operation not permitted"} is reported as an authentication failure. Any
     * other reply is only logged as a warning.
     *
     * @throws IOException    if sending the command or reading the reply fails
     * @throws AssertionError if the server demands authentication
     */
    protected void sendPing() throws IOException {
        if (logger.isInfoEnabled()) {
            logger.info("PING");
        }
        send("PING".getBytes());
        final String answer = new String((byte[]) reply(), StandardCharsets.UTF_8);
        logger.info(answer);

        if (!"PONG".equalsIgnoreCase(answer)) {
            if (answer.contains("NOAUTH")) {
                throw new AssertionError(answer);
            }
            if (answer.contains("operation not permitted")) {
                throw new AssertionError("-NOAUTH Authentication required.");
            }
            logger.warn("[PING] failed. " + answer);
        }
    }

    /**
     * Announces the local port of the replication socket with
     * {@code REPLCONF listening-port <port>}. A reply other than {@code OK} is only
     * logged as a warning.
     *
     * @throws IOException if sending the command or reading the reply fails
     */
    protected void sendSlavePort() throws IOException {
        final int localPort = this.socket.getLocalPort();
        if (logger.isInfoEnabled()) {
            logger.info("REPLCONF listening-port " + localPort);
        }
        // Arguments are passed as varargs; each one becomes its own bulk string.
        send("REPLCONF".getBytes(), "listening-port".getBytes(), String.valueOf(localPort).getBytes());
        final String answer = new String((byte[]) reply(), StandardCharsets.UTF_8);
        logger.info(answer);
        if (!"OK".equals(answer) && logger.isWarnEnabled()) {
            logger.warn("[REPLCONF listening-port " + localPort + "] failed. " + answer);
        }
    }

    /**
     * Announces the local address of the replication socket with
     * {@code REPLCONF ip-address <address>}. A reply other than {@code OK} is only
     * logged as a warning.
     *
     * @throws IOException if sending the command or reading the reply fails
     */
    protected void sendSlaveIp() throws IOException {
        final String localAddress = this.socket.getLocalAddress().getHostAddress();
        if (logger.isInfoEnabled()) {
            logger.info("REPLCONF ip-address " + localAddress);
        }
        // Arguments are passed as varargs; each one becomes its own bulk string.
        send("REPLCONF".getBytes(), "ip-address".getBytes(), localAddress.getBytes());
        final String answer = new String((byte[]) reply(), StandardCharsets.UTF_8);
        logger.info(answer);
        if (!"OK".equals(answer) && logger.isWarnEnabled()) {
            logger.warn("[REPLCONF ip-address " + localAddress + "] failed. " + answer);
        }
    }

    /**
     * Advertises one replication capability with {@code REPLCONF capa <name>}.
     * A reply other than {@code OK} is only logged as a warning.
     *
     * @param str the capability name to advertise
     * @throws IOException if sending the command or reading the reply fails
     */
    protected void sendSlaveCapa(String str) throws IOException {
        if (logger.isInfoEnabled()) {
            logger.info("REPLCONF capa " + str);
        }
        // Arguments are passed as varargs; each one becomes its own bulk string.
        send("REPLCONF".getBytes(), "capa".getBytes(), str.getBytes());
        final String answer = new String((byte[]) reply(), StandardCharsets.UTF_8);
        logger.info(answer);
        if (!"OK".equals(answer) && logger.isWarnEnabled()) {
            logger.warn("[REPLCONF capa " + str + "] failed. " + answer);
        }
    }

    /**
     * Starts a daemon timer that periodically reports the configured replication
     * offset to the server with {@code REPLCONF ACK <offset>}. Both the initial delay
     * and the period are {@code configuration.getHeartBeatPeriod()}.
     *
     * <p>The timer is stored in {@link #heartbeat} so that {@link #close()} can cancel it.
     */
    protected synchronized void heartbeat() {
        this.heartbeat = new Timer("heartbeat", true);
        this.heartbeat.schedule(new TimerTask() { // from class: com.moilioncircle.redis.replicator.RedisSocketReplicator.3
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    final byte[] offset = String.valueOf(RedisSocketReplicator.this.configuration.getReplOffset()).getBytes();
                    // Arguments are passed as varargs; each one becomes its own bulk string.
                    RedisSocketReplicator.this.send("REPLCONF".getBytes(), "ACK".getBytes(), offset);
                } catch (IOException ignored) {
                    // Best effort: a failed ACK must not escape the timer task; the next
                    // tick simply tries again.
                }
            }
        }, this.configuration.getHeartBeatPeriod(), this.configuration.getHeartBeatPeriod());
        logger.info("heartbeat thread started.");
    }

    /**
     * Sends a command that takes no arguments.
     *
     * @param bArr the command name bytes
     * @throws IOException if writing to the socket fails
     */
    protected void send(byte[] bArr) throws IOException {
        // Must be an empty byte[][]: a plain "new byte[0]" would bind to the varargs
        // parameter as ONE empty argument and put a spurious "$0" bulk string on the wire.
        send(bArr, new byte[0][]);
    }

    /**
     * Writes one command as an array of bulk strings and flushes the stream:
     * {@code *<count>CRLF} followed by {@code $<length>CRLF<bytes>CRLF} for the command
     * name and then for every argument.
     *
     * @param bArr  the command name bytes
     * @param bArr2 the argument bytes, one entry per argument
     * @throws IOException if writing to the socket fails
     */
    protected void send(byte[] bArr, byte[]... bArr2) throws IOException {
        // Array header: element count is the command itself plus its arguments.
        this.outputStream.write('*');
        final int elementCount = bArr2.length + 1;
        this.outputStream.write(String.valueOf(elementCount).getBytes());
        this.outputStream.writeCrLf();

        // Element 0 is the command name, elements 1..n are the arguments.
        for (int index = 0; index < elementCount; index++) {
            final byte[] element = (index == 0) ? bArr : bArr2[index - 1];
            this.outputStream.write('$');
            this.outputStream.write(String.valueOf(element.length).getBytes());
            this.outputStream.writeCrLf();
            this.outputStream.write(element);
            this.outputStream.writeCrLf();
        }
        this.outputStream.flush();
    }

    /**
     * Reads the next reply from the server.
     *
     * @param <T> the type the caller expects; the cast is unchecked
     * @return the parsed reply
     * @throws IOException if reading from the socket fails
     */
    @SuppressWarnings("unchecked")
    protected <T> T reply() throws IOException {
        final Object parsed = this.replyParser.parse();
        return (T) parsed;
    }

    /**
     * Reads the next reply from the server, passing the given handler to the parser.
     *
     * @param bulkReplyHandler handler passed through to {@code ReplyParser.parse}
     * @param <T>              the type the caller expects; the cast is unchecked
     * @return the parsed reply
     * @throws IOException if reading from the socket fails
     */
    @SuppressWarnings("unchecked")
    protected <T> T reply(BulkReplyHandler bulkReplyHandler) throws IOException {
        final Object parsed = this.replyParser.parse(bulkReplyHandler);
        return (T) parsed;
    }

    /**
     * Opens the socket and builds the stream stack on top of it. Does nothing unless the
     * status can be moved from {@code DISCONNECTED} to {@code CONNECTING}.
     *
     * <p>The input stream is optionally wrapped for async buffering and rate limiting
     * (each enabled by a positive configuration value) before being wrapped in a
     * {@link RedisInputStream}. The status is set to {@code CONNECTED} afterwards even
     * when setup throws.
     *
     * @throws IOException if the socket or its streams cannot be created
     */
    protected void connect() throws IOException {
        if (!this.connected.compareAndSet(Status.DISCONNECTED, Status.CONNECTING)) {
            return;
        }
        try {
            this.socket = this.socketFactory.createSocket(this.host, this.port, this.configuration.getConnectionTimeout());
            this.outputStream = new RedisOutputStream(this.socket.getOutputStream());

            // Build the input side from the inside out: raw -> async buffer -> rate limit.
            InputStream source = this.socket.getInputStream();
            if (this.configuration.getAsyncCachedBytes() > 0) {
                source = new AsyncBufferedInputStream(source, this.configuration.getAsyncCachedBytes());
            }
            if (this.configuration.getRateLimit() > 0) {
                source = new RateLimitInputStream(source, this.configuration.getRateLimit());
            }

            this.inputStream = new RedisInputStream(source, this.configuration.getBufferSize());
            this.inputStream.setRawByteListeners(this.rawByteListeners);
            this.replyParser = new ReplyParser(this.inputStream);
        } finally {
            // Set unconditionally: close() only acts when the status is CONNECTED.
            this.connected.set(Status.CONNECTED);
        }
    }

    /**
     * Cancels the heartbeat and closes the input stream, output stream and socket.
     * Does nothing unless the status can be moved from {@code CONNECTED} to
     * {@code DISCONNECTING}; the status always ends up {@code DISCONNECTED} afterwards.
     *
     * <p>Each resource is closed independently and I/O errors are ignored, so a failure
     * on one does not stop the others from being closed.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.connected.compareAndSet(Status.CONNECTED, Status.DISCONNECTING)) {
            return;
        }
        try {
            // Same monitor as heartbeat(), so timer creation and cancellation cannot interleave.
            synchronized (this) {
                if (this.heartbeat != null) {
                    this.heartbeat.cancel();
                    this.heartbeat = null;
                    logger.info("heartbeat canceled.");
                }
            }

            try {
                if (this.inputStream != null) {
                    this.inputStream.setRawByteListeners(null);
                    this.inputStream.close();
                }
            } catch (IOException ignored) {
                // best effort: keep closing the remaining resources
            }

            try {
                if (this.outputStream != null) {
                    this.outputStream.close();
                }
            } catch (IOException ignored) {
                // best effort: keep closing the remaining resources
            }

            try {
                if (this.socket != null && !this.socket.isClosed()) {
                    this.socket.close();
                }
            } catch (IOException ignored) {
                // best effort: nothing left to clean up
            }

            logger.info("socket closed");
        } finally {
            this.connected.set(Status.DISCONNECTED);
        }
    }
}
