/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.parse.inbound;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
import com.alibaba.otter.canal.filter.CanalEventFilter;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
import com.alibaba.otter.canal.parse.inbound.BinlogParser;
import com.alibaba.otter.canal.parse.inbound.ErosaConnection;
import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer;
import com.alibaba.otter.canal.parse.inbound.SinkFunction;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.protocol.position.LogIdentity;
import com.alibaba.otter.canal.protocol.position.LogPosition;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.alibaba.otter.canal.sink.exception.CanalSinkException;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class AbstractEventParser<EVENT>
extends AbstractCanalLifeCycle
implements CanalEventParser<EVENT> {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    protected CanalLogPositionManager logPositionManager = null;
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink = null;
    protected CanalEventFilter eventFilter = null;
    protected CanalEventFilter eventBlackFilter = null;
    private CanalAlarmHandler alarmHandler = null;
    protected AtomicBoolean profilingEnabled = new AtomicBoolean(false);
    protected AtomicLong receivedEventCount = new AtomicLong();
    protected AtomicLong parsedEventCount = new AtomicLong();
    protected AtomicLong consumedEventCount = new AtomicLong();
    protected long parsingInterval = -1L;
    protected long processingInterval = -1L;
    protected volatile AuthenticationInfo runningInfo;
    protected String destination;
    protected BinlogParser binlogParser = null;
    protected Thread parseThread = null;
    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler(){

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            AbstractEventParser.this.logger.error("parse events has an error", e);
        }
    };
    protected EventTransactionBuffer transactionBuffer;
    protected int transactionSize = 1024;
    protected AtomicBoolean needTransactionPosition = new AtomicBoolean(false);
    protected long lastEntryTime = 0L;
    protected volatile boolean detectingEnable = true;
    protected Integer detectingIntervalInSeconds = 3;
    protected volatile Timer timer;
    protected TimerTask heartBeatTimerTask;
    protected Throwable exception = null;

    protected abstract BinlogParser buildParser();

    protected abstract ErosaConnection buildErosaConnection();

    protected abstract EntryPosition findStartPosition(ErosaConnection var1) throws IOException;

    protected void preDump(ErosaConnection connection) {
    }

    protected void afterDump(ErosaConnection connection) {
    }

    public void sendAlarm(String destination, String msg) {
        if (this.alarmHandler != null) {
            this.alarmHandler.sendAlarm(destination, msg);
        }
    }

    public AbstractEventParser() {
        this.transactionBuffer = new EventTransactionBuffer(new EventTransactionBuffer.TransactionFlushCallback(){

            @Override
            public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
                boolean successed = AbstractEventParser.this.consumeTheEventAndProfilingIfNecessary(transaction);
                if (!AbstractEventParser.this.running) {
                    return;
                }
                if (!successed) {
                    throw new CanalParseException("consume failed!");
                }
                LogPosition position = AbstractEventParser.this.buildLastTransactionPosition(transaction);
                if (position != null) {
                    AbstractEventParser.this.logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
                }
            }
        });
    }

    public void start() {
        super.start();
        MDC.put((String)"destination", (String)this.destination);
        this.transactionBuffer.setBufferSize(this.transactionSize);
        this.transactionBuffer.start();
        this.binlogParser = this.buildParser();
        this.binlogParser.start();
        this.parseThread = new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Loose catch block
             */
            @Override
            public void run() {
                MDC.put((String)"destination", (String)String.valueOf(AbstractEventParser.this.destination));
                ErosaConnection erosaConnection = null;
                while (AbstractEventParser.this.running) {
                    block30: {
                        block29: {
                            EntryPosition position;
                            erosaConnection = AbstractEventParser.this.buildErosaConnection();
                            AbstractEventParser.this.startHeartBeat(erosaConnection);
                            AbstractEventParser.this.preDump(erosaConnection);
                            erosaConnection.connect();
                            final EntryPosition startPosition = position = AbstractEventParser.this.findStartPosition(erosaConnection);
                            if (startPosition == null) {
                                throw new CanalParseException("can't find start position for " + AbstractEventParser.this.destination);
                            }
                            AbstractEventParser.this.logger.info("find start position : {}", (Object)startPosition.toString());
                            erosaConnection.reconnect();
                            SinkFunction sinkHandler = new SinkFunction<EVENT>(){
                                private LogPosition lastPosition;

                                @Override
                                public boolean sink(EVENT event) {
                                    try {
                                        CanalEntry.Entry entry = AbstractEventParser.this.parseAndProfilingIfNecessary(event);
                                        if (!AbstractEventParser.this.running) {
                                            return false;
                                        }
                                        if (entry != null) {
                                            AbstractEventParser.this.exception = null;
                                            AbstractEventParser.this.transactionBuffer.add(entry);
                                            this.lastPosition = AbstractEventParser.this.buildLastPosition(entry);
                                            AbstractEventParser.this.lastEntryTime = System.currentTimeMillis();
                                        }
                                        return AbstractEventParser.this.running;
                                    }
                                    catch (TableIdNotFoundException e) {
                                        throw e;
                                    }
                                    catch (Throwable e) {
                                        AbstractEventParser.this.processSinkError(e, this.lastPosition, startPosition.getJournalName(), startPosition.getPosition());
                                        throw new CanalParseException(e);
                                    }
                                }
                            };
                            if (StringUtils.isEmpty((String)startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
                                erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
                                break block29;
                            }
                            erosaConnection.dump(startPosition.getJournalName(), startPosition.getPosition(), sinkHandler);
                        }
                        Thread.interrupted();
                        AbstractEventParser.this.afterDump(erosaConnection);
                        try {
                            if (erosaConnection != null) {
                                erosaConnection.disconnect();
                            }
                            break block30;
                        }
                        catch (IOException e1) {
                            if (!AbstractEventParser.this.running) {
                                throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), e1);
                            }
                            AbstractEventParser.this.logger.error("disconnect address {} has an error, retrying., caused by ", (Object)AbstractEventParser.this.runningInfo.getAddress().toString(), (Object)e1);
                        }
                        break block30;
                        catch (TableIdNotFoundException e) {
                            AbstractEventParser.this.exception = e;
                            AbstractEventParser.this.needTransactionPosition.compareAndSet(false, true);
                            AbstractEventParser.this.logger.error(String.format("dump address %s has an error, retrying. caused by ", AbstractEventParser.this.runningInfo.getAddress().toString()), (Throwable)((Object)e));
                            Thread.interrupted();
                            AbstractEventParser.this.afterDump(erosaConnection);
                            try {
                                if (erosaConnection != null) {
                                    erosaConnection.disconnect();
                                }
                            }
                            catch (IOException e1) {
                                if (!AbstractEventParser.this.running) {
                                    throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), e1);
                                }
                                AbstractEventParser.this.logger.error("disconnect address {} has an error, retrying., caused by ", (Object)AbstractEventParser.this.runningInfo.getAddress().toString(), (Object)e1);
                            }
                        }
                        catch (Throwable e2) {
                            block31: {
                                AbstractEventParser.this.processDumpError(e2);
                                AbstractEventParser.this.exception = e2;
                                if (!AbstractEventParser.this.running) {
                                    if (!(e2 instanceof ClosedByInterruptException) && !(e2.getCause() instanceof ClosedByInterruptException)) {
                                        throw new CanalParseException(String.format("dump address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), e2);
                                    }
                                    break block31;
                                }
                                AbstractEventParser.this.logger.error(String.format("dump address %s has an error, retrying. caused by ", AbstractEventParser.this.runningInfo.getAddress().toString()), e2);
                                AbstractEventParser.this.sendAlarm(AbstractEventParser.this.destination, ExceptionUtils.getFullStackTrace((Throwable)e2));
                                {
                                    catch (Throwable throwable) {
                                        Thread.interrupted();
                                        AbstractEventParser.this.afterDump(erosaConnection);
                                        try {
                                            if (erosaConnection != null) {
                                                erosaConnection.disconnect();
                                            }
                                        }
                                        catch (IOException e1) {
                                            if (!AbstractEventParser.this.running) {
                                                throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), e1);
                                            }
                                            AbstractEventParser.this.logger.error("disconnect address {} has an error, retrying., caused by ", (Object)AbstractEventParser.this.runningInfo.getAddress().toString(), (Object)e1);
                                        }
                                        throw throwable;
                                    }
                                }
                            }
                            Thread.interrupted();
                            AbstractEventParser.this.afterDump(erosaConnection);
                            try {
                                if (erosaConnection != null) {
                                    erosaConnection.disconnect();
                                }
                            }
                            catch (IOException e1) {
                                if (!AbstractEventParser.this.running) {
                                    throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), e1);
                                }
                                AbstractEventParser.this.logger.error("disconnect address {} has an error, retrying., caused by ", (Object)AbstractEventParser.this.runningInfo.getAddress().toString(), (Object)e1);
                            }
                        }
                    }
                    AbstractEventParser.this.eventSink.interrupt();
                    AbstractEventParser.this.transactionBuffer.reset();
                    AbstractEventParser.this.binlogParser.reset();
                    if (!AbstractEventParser.this.running) continue;
                    try {
                        Thread.sleep(10000 + RandomUtils.nextInt((int)10000));
                    }
                    catch (InterruptedException interruptedException) {}
                }
                MDC.remove((String)"destination");
            }
        });
        this.parseThread.setUncaughtExceptionHandler(this.handler);
        this.parseThread.setName(String.format("destination = %s , address = %s , EventParser", this.destination, this.runningInfo == null ? null : this.runningInfo.getAddress().toString()));
        this.parseThread.start();
    }

    public void stop() {
        super.stop();
        this.stopHeartBeat();
        this.parseThread.interrupt();
        this.eventSink.interrupt();
        try {
            this.parseThread.join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (this.binlogParser.isStart()) {
            this.binlogParser.stop();
        }
        if (this.transactionBuffer.isStart()) {
            this.transactionBuffer.stop();
        }
    }

    protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException, InterruptedException {
        long startTs = -1L;
        boolean enabled = this.getProfilingEnabled();
        if (enabled) {
            startTs = System.currentTimeMillis();
        }
        boolean result = this.eventSink.sink(entrys, this.runningInfo == null ? null : this.runningInfo.getAddress(), this.destination);
        if (enabled) {
            this.processingInterval = System.currentTimeMillis() - startTs;
        }
        if (this.consumedEventCount.incrementAndGet() < 0L) {
            this.consumedEventCount.set(0L);
        }
        return result;
    }

    protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod) throws Exception {
        long startTs = -1L;
        boolean enabled = this.getProfilingEnabled();
        if (enabled) {
            startTs = System.currentTimeMillis();
        }
        CanalEntry.Entry event = this.binlogParser.parse(bod);
        if (enabled) {
            this.parsingInterval = System.currentTimeMillis() - startTs;
        }
        if (this.parsedEventCount.incrementAndGet() < 0L) {
            this.parsedEventCount.set(0L);
        }
        return event;
    }

    public Boolean getProfilingEnabled() {
        return this.profilingEnabled.get();
    }

    protected LogPosition buildLastTransactionPosition(List<CanalEntry.Entry> entries) {
        for (int i = entries.size() - 1; i > 0; --i) {
            CanalEntry.Entry entry = entries.get(i);
            if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) continue;
            return this.buildLastPosition(entry);
        }
        return null;
    }

    protected LogPosition buildLastPosition(CanalEntry.Entry entry) {
        LogPosition logPosition = new LogPosition();
        EntryPosition position = new EntryPosition();
        position.setJournalName(entry.getHeader().getLogfileName());
        position.setPosition(Long.valueOf(entry.getHeader().getLogfileOffset()));
        position.setTimestamp(Long.valueOf(entry.getHeader().getExecuteTime()));
        position.setServerId(Long.valueOf(entry.getHeader().getServerId()));
        logPosition.setPostion(position);
        LogIdentity identity = new LogIdentity(this.runningInfo.getAddress(), Long.valueOf(-1L));
        logPosition.setIdentity(identity);
        return logPosition;
    }

    protected void processSinkError(Throwable e, LogPosition lastPosition, String startBinlogFile, long startPosition) {
        if (lastPosition != null) {
            this.logger.warn(String.format("ERROR ## parse this event has an error , last position : [%s]", lastPosition.getPostion()), e);
        } else {
            this.logger.warn(String.format("ERROR ## parse this event has an error , last position : [%s,%s]", startBinlogFile, startPosition), e);
        }
    }

    protected void processDumpError(Throwable e) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    protected void startHeartBeat(ErosaConnection connection) {
        this.lastEntryTime = 0L;
        if (this.timer == null) {
            String name = String.format("destination = %s , address = %s , HeartBeatTimeTask", this.destination, this.runningInfo == null ? null : this.runningInfo.getAddress().toString());
            Class<MysqlEventParser> clazz = MysqlEventParser.class;
            // MONITORENTER : com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.class
            if (this.timer == null) {
                this.timer = new Timer(name, true);
            }
            // MONITOREXIT : clazz
        }
        if (this.heartBeatTimerTask != null) return;
        this.heartBeatTimerTask = this.buildHeartBeatTimeTask(connection);
        Integer interval = this.detectingIntervalInSeconds;
        this.timer.schedule(this.heartBeatTimerTask, (long)interval.intValue() * 1000L, (long)interval.intValue() * 1000L);
        this.logger.info("start heart beat.... ");
    }

    protected TimerTask buildHeartBeatTimeTask(ErosaConnection connection) {
        return new TimerTask(){

            @Override
            public void run() {
                try {
                    long now;
                    long inteval;
                    if ((AbstractEventParser.this.exception == null || AbstractEventParser.this.lastEntryTime > 0L) && (inteval = ((now = System.currentTimeMillis()) - AbstractEventParser.this.lastEntryTime) / 1000L) >= (long)AbstractEventParser.this.detectingIntervalInSeconds.intValue()) {
                        CanalEntry.Header.Builder headerBuilder = CanalEntry.Header.newBuilder();
                        headerBuilder.setExecuteTime(now);
                        CanalEntry.Entry.Builder entryBuilder = CanalEntry.Entry.newBuilder();
                        entryBuilder.setHeader(headerBuilder.build());
                        entryBuilder.setEntryType(CanalEntry.EntryType.HEARTBEAT);
                        CanalEntry.Entry entry = entryBuilder.build();
                        AbstractEventParser.this.consumeTheEventAndProfilingIfNecessary(Arrays.asList(entry));
                    }
                }
                catch (Throwable e) {
                    AbstractEventParser.this.logger.warn("heartBeat run failed " + ExceptionUtils.getStackTrace((Throwable)e));
                }
            }
        };
    }

    protected void stopHeartBeat() {
        this.lastEntryTime = 0L;
        if (this.timer != null) {
            this.timer.cancel();
            this.timer = null;
        }
        this.heartBeatTimerTask = null;
    }

    public void setEventFilter(CanalEventFilter eventFilter) {
        this.eventFilter = eventFilter;
    }

    public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
        this.eventBlackFilter = eventBlackFilter;
    }

    public Long getParsedEventCount() {
        return this.parsedEventCount.get();
    }

    public Long getConsumedEventCount() {
        return this.consumedEventCount.get();
    }

    public void setProfilingEnabled(boolean profilingEnabled) {
        this.profilingEnabled = new AtomicBoolean(profilingEnabled);
    }

    public long getParsingInterval() {
        return this.parsingInterval;
    }

    public long getProcessingInterval() {
        return this.processingInterval;
    }

    public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) {
        this.eventSink = eventSink;
    }

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setBinlogParser(BinlogParser binlogParser) {
        this.binlogParser = binlogParser;
    }

    public BinlogParser getBinlogParser() {
        return this.binlogParser;
    }

    public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
        this.alarmHandler = alarmHandler;
    }

    public CanalAlarmHandler getAlarmHandler() {
        return this.alarmHandler;
    }

    public void setLogPositionManager(CanalLogPositionManager logPositionManager) {
        this.logPositionManager = logPositionManager;
    }

    public void setTransactionSize(int transactionSize) {
        this.transactionSize = transactionSize;
    }

    public CanalLogPositionManager getLogPositionManager() {
        return this.logPositionManager;
    }

    public void setDetectingEnable(boolean detectingEnable) {
        this.detectingEnable = detectingEnable;
    }

    public void setDetectingIntervalInSeconds(Integer detectingIntervalInSeconds) {
        this.detectingIntervalInSeconds = detectingIntervalInSeconds;
    }

    public Throwable getException() {
        return this.exception;
    }
}

