/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.parse.inbound.mysql;

import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
import com.alibaba.otter.canal.parse.inbound.SinkFunction;
import com.alibaba.otter.canal.parse.inbound.mysql.local.BinLogFileQueue;
import com.taobao.tddl.dbsync.binlog.FileLogFetcher;
import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogContext;
import com.taobao.tddl.dbsync.binlog.LogDecoder;
import com.taobao.tddl.dbsync.binlog.LogEvent;
import com.taobao.tddl.dbsync.binlog.LogPosition;
import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ErosaConnection} implementation that reads binlog events from files
 * located in a local directory instead of from a live server connection.
 *
 * <p>The set of files is managed by a {@link BinLogFileQueue} created in
 * {@link #connect()} and released in {@link #disconnect()}. The two
 * {@code dump} methods block while reading and poll the {@code running} flag,
 * so they stop once {@link #disconnect()} has been called.
 */
public class LocalBinLogConnection implements ErosaConnection {
    private static final Logger logger = LoggerFactory.getLogger(LocalBinLogConnection.class);

    /** Default buffer size handed to every {@link FileLogFetcher} this class creates. */
    private static final int DEFAULT_BUFFER_SIZE = 16384;

    /**
     * Header type code of the events that are cast to {@link QueryLogEvent}
     * during the timestamp search (MySQL binlog QUERY_EVENT).
     */
    private static final int QUERY_EVENT_TYPE = 2;

    /**
     * Header type code treated as a transaction end marker during the
     * timestamp search (MySQL binlog XID_EVENT).
     */
    private static final int XID_EVENT_TYPE = 16;

    private BinLogFileQueue binlogs = null;
    private boolean needWait;
    private String directory;
    private int bufferSize = DEFAULT_BUFFER_SIZE;

    // volatile: polled inside the blocking dump loops while disconnect() is
    // presumably invoked from a different thread, so the write must be visible.
    private volatile boolean running = false;

    /** Creates an unconfigured connection; set the directory before connecting. */
    public LocalBinLogConnection() {
    }

    /**
     * @param directory directory that contains the binlog files
     * @param needWait  when {@code true}, {@link #dump(String, Long, SinkFunction)}
     *                  asks the file queue to wait for the next file instead of
     *                  stopping at the last available one
     */
    public LocalBinLogConnection(String directory, boolean needWait) {
        this.needWait = needWait;
        this.directory = directory;
    }

    /**
     * Creates the file queue for the configured directory (if not already
     * present) and marks the connection as running.
     */
    @Override
    public void connect() throws IOException {
        if (this.binlogs == null) {
            this.binlogs = new BinLogFileQueue(this.directory);
        }
        this.running = true;
    }

    /** Disconnects and then connects again. */
    @Override
    public void reconnect() throws IOException {
        this.disconnect();
        this.connect();
    }

    /**
     * Clears the running flag first, so that in-flight dump loops stop, and
     * then releases the file queue.
     */
    @Override
    public void disconnect() throws IOException {
        this.running = false;
        if (this.binlogs != null) {
            // "destory" is the spelling used by BinLogFileQueue's API.
            this.binlogs.destory();
        }
        this.binlogs = null;
    }

    /** @return {@code true} between {@link #connect()} and {@link #disconnect()} */
    @Override
    public boolean isConnected() {
        return this.running;
    }

    /** Not implemented for local files: this method intentionally does nothing. */
    @Override
    public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    }

    /**
     * Reads events starting at {@code binlogPosition} of {@code binlogfilename}
     * and passes each one to {@code func}, moving on to following files until
     * the sink returns {@code false}, no further file is available, or the
     * connection is no longer running.
     *
     * <p>If the thread is interrupted, a warning is logged, the interrupt
     * status is restored and the method returns normally.
     *
     * @param binlogfilename name of the first file, resolved against the directory
     * @param binlogPosition offset inside the first file; must not be {@code null}
     * @param func           receives every decoded event; returning {@code false} stops the dump
     * @throws IOException         if reading a binlog file fails
     * @throws CanalParseException if the decoder yields no event for fetched data
     */
    @Override
    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
        File current = new File(this.directory, binlogfilename);
        FileLogFetcher fetcher = new FileLogFetcher(this.bufferSize);
        // NOTE(review): 0..164 presumably spans every known event type - confirm against LogDecoder.
        LogDecoder decoder = new LogDecoder(0, 164);
        LogContext context = new LogContext();
        try {
            long startPosition = binlogPosition.longValue();
            fetcher.open(current, startPosition);
            context.setLogPosition(new LogPosition(binlogfilename, startPosition));
            while (this.running) {
                boolean sinkWantsMore = true;
                while (fetcher.fetch()) {
                    LogEvent event = decoder.decode(fetcher, context);
                    if (event == null) {
                        throw new CanalParseException("parse failed");
                    }
                    if (!func.sink(event)) {
                        sinkWantsMore = false;
                        break;
                    }
                }
                if (!sinkWantsMore) {
                    break;
                }

                // Current file is exhausted: release it before looking for the next one.
                fetcher.close();

                // Snapshot the queue; a concurrent disconnect() may have cleared the field.
                BinLogFileQueue queue = this.binlogs;
                if (queue == null) {
                    break;
                }
                File nextFile = this.needWait ? queue.waitForNextFile(current) : queue.getNextFile(current);
                if (nextFile == null) {
                    break;
                }
                current = nextFile;
                fetcher.open(current);
                context.setLogPosition(new LogPosition(nextFile.getName()));
            }
        } catch (InterruptedException e) {
            logger.warn("LocalBinLogConnection dump interrupted");
            // Restore the interrupt status so callers can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            fetcher.close();
        }
    }

    /**
     * Locates a start position for the given timestamp and then delegates to
     * {@link #dump(String, Long, SinkFunction)}.
     *
     * <p>The search starts with the last file reported by the queue and walks
     * towards earlier files. Within a file, only query and XID events are
     * decoded. The end offset of the latest XID/COMMIT event is remembered, and
     * each BEGIN query adopts that offset as the candidate start position.
     * Scanning of a file stops at the first event whose time is later than the
     * timestamp. The search ends with the first file that contained at least
     * one event not later than the timestamp, or when no earlier file exists.
     *
     * @param timestampMills target time in milliseconds; compared in seconds
     * @param func           sink passed on to the position based dump
     * @throws IOException         if reading a binlog file fails
     * @throws CanalParseException if the connection is not connected or the
     *                             directory contains no binlog file
     */
    @Override
    public void dump(long timestampMills, SinkFunction func) throws IOException {
        BinLogFileQueue queue = this.binlogs;
        if (queue == null) {
            throw new CanalParseException("LocalBinLogConnection is not connected, directory: " + this.directory);
        }
        List<File> currentBinlogs = queue.currentBinlogs();
        if (currentBinlogs == null || currentBinlogs.isEmpty()) {
            throw new CanalParseException("no binlog file found in directory: " + this.directory);
        }

        File current = currentBinlogs.get(currentBinlogs.size() - 1);
        long timestampSeconds = timestampMills / 1000L;
        String binlogFilename = null;
        long binlogFileOffset = 0L;

        FileLogFetcher fetcher = new FileLogFetcher(this.bufferSize);
        LogDecoder decoder = new LogDecoder();
        decoder.handle(QUERY_EVENT_TYPE);
        decoder.handle(XID_EVENT_TYPE);
        LogContext context = new LogContext();
        try {
            fetcher.open(current);
            context.setLogPosition(new LogPosition(current.getName()));
            while (this.running) {
                // Stays true only if this file holds no event at or before the timestamp.
                boolean searchEarlierFile = true;
                String lastXidLogFilename = current.getName();
                long lastXidLogFileOffset = 0L;
                binlogFilename = lastXidLogFilename;
                binlogFileOffset = lastXidLogFileOffset;

                scan:
                while (fetcher.fetch()) {
                    LogEvent event;
                    // A null event ends the inner loop and triggers the next fetch.
                    while ((event = decoder.decode(fetcher, context)) != null) {
                        if (event.getWhen() > timestampSeconds) {
                            break scan;
                        }
                        searchEarlierFile = false;
                        int type = event.getHeader().getType();
                        if (type == QUERY_EVENT_TYPE) {
                            String query = ((QueryLogEvent) event).getQuery();
                            if (StringUtils.endsWithIgnoreCase(query, "BEGIN")) {
                                // A transaction starts here: begin after the previous transaction end.
                                binlogFilename = lastXidLogFilename;
                                binlogFileOffset = lastXidLogFileOffset;
                            } else if (StringUtils.endsWithIgnoreCase(query, "COMMIT")) {
                                lastXidLogFilename = current.getName();
                                lastXidLogFileOffset = event.getLogPos();
                            }
                        } else if (type == XID_EVENT_TYPE) {
                            lastXidLogFilename = current.getName();
                            lastXidLogFileOffset = event.getLogPos();
                        }
                    }
                }
                if (!searchEarlierFile) {
                    break;
                }

                fetcher.close();
                File previousFile = queue.getBefore(current);
                if (previousFile == null) {
                    break;
                }
                current = previousFile;
                fetcher.open(current);
                context.setLogPosition(new LogPosition(current.getName()));
            }
        } finally {
            fetcher.close();
        }

        if (binlogFilename == null) {
            // The connection stopped running before any file was examined; nothing to dump.
            return;
        }
        this.dump(binlogFilename, binlogFileOffset, func);
    }

    /**
     * @return a new, not yet connected connection that copies this instance's
     *         buffer size, directory and wait setting
     */
    @Override
    public ErosaConnection fork() {
        LocalBinLogConnection connection = new LocalBinLogConnection();
        connection.setBufferSize(this.bufferSize);
        connection.setDirectory(this.directory);
        connection.setNeedWait(this.needWait);
        return connection;
    }

    public boolean isNeedWait() {
        return this.needWait;
    }

    public void setNeedWait(boolean needWait) {
        this.needWait = needWait;
    }

    public String getDirectory() {
        return this.directory;
    }

    public void setDirectory(String directory) {
        this.directory = directory;
    }

    public int getBufferSize() {
        return this.bufferSize;
    }

    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }
}

