/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.instance.manager;

import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
import com.alibaba.otter.canal.common.alarm.LogAlarmHandler;
import com.alibaba.otter.canal.common.utils.JsonUtils;
import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import com.alibaba.otter.canal.filter.CanalEventFilter;
import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
import com.alibaba.otter.canal.instance.core.AbstractCanalInstance;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.meta.MemoryMetaManager;
import com.alibaba.otter.canal.meta.PeriodMixedMetaManager;
import com.alibaba.otter.canal.meta.ZooKeeperMetaManager;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.ha.HeartBeatHAController;
import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.sink.entry.EntryEventSink;
import com.alibaba.otter.canal.sink.entry.group.GroupEventSink;
import com.alibaba.otter.canal.store.AbstractCanalStoreScavenge;
import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
import com.alibaba.otter.canal.store.model.BatchMode;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

public class CanalInstanceWithManager
extends AbstractCanalInstance {
    private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithManager.class);
    protected String filter;
    protected CanalParameter parameters;

    public CanalInstanceWithManager(Canal canal, String filter) {
        this.parameters = canal.getCanalParameter();
        this.canalId = canal.getId();
        this.destination = canal.getName();
        this.filter = filter;
        logger.info("init CanalInstance for {}-{} with parameters:{}", new Object[]{this.canalId, this.destination, this.parameters});
        this.initAlarmHandler();
        this.initMetaManager();
        this.initEventStore();
        this.initEventSink();
        this.initEventParser();
        if (!this.alarmHandler.isStart()) {
            this.alarmHandler.start();
        }
        if (!this.metaManager.isStart()) {
            this.metaManager.start();
        }
        logger.info("init successful....");
    }

    public void start() {
        logger.info("start CannalInstance for {}-{} with parameters:{}", new Object[]{this.canalId, this.destination, this.parameters});
        super.start();
    }

    protected void initAlarmHandler() {
        logger.info("init alarmHandler begin...");
        this.alarmHandler = new LogAlarmHandler();
        logger.info("init alarmHandler end! \n\t load CanalAlarmHandler:{} ", (Object)this.alarmHandler.getClass().getName());
    }

    protected void initMetaManager() {
        logger.info("init metaManager begin...");
        CanalParameter.MetaMode mode = this.parameters.getMetaMode();
        if (mode.isMemory()) {
            this.metaManager = new MemoryMetaManager();
        } else if (mode.isZookeeper()) {
            this.metaManager = new ZooKeeperMetaManager();
            ((ZooKeeperMetaManager)this.metaManager).setZkClientx(this.getZkclientx());
        } else if (mode.isMixed()) {
            this.metaManager = new PeriodMixedMetaManager();
            ZooKeeperMetaManager zooKeeperMetaManager = new ZooKeeperMetaManager();
            zooKeeperMetaManager.setZkClientx(this.getZkclientx());
            ((PeriodMixedMetaManager)this.metaManager).setZooKeeperMetaManager(zooKeeperMetaManager);
        } else {
            throw new CanalException("unsupport MetaMode for " + (Object)((Object)mode));
        }
        logger.info("init metaManager end! \n\t load CanalMetaManager:{} ", (Object)this.metaManager.getClass().getName());
    }

    protected void initEventStore() {
        logger.info("init eventStore begin...");
        CanalParameter.StorageMode mode = this.parameters.getStorageMode();
        if (!mode.isMemory()) {
            if (mode.isFile()) {
                throw new CanalException("unsupport MetaMode for " + (Object)((Object)mode));
            }
            if (mode.isMixed()) {
                throw new CanalException("unsupport MetaMode for " + (Object)((Object)mode));
            }
            throw new CanalException("unsupport MetaMode for " + (Object)((Object)mode));
        }
        MemoryEventStoreWithBuffer memoryEventStore = new MemoryEventStoreWithBuffer();
        memoryEventStore.setBufferSize(this.parameters.getMemoryStorageBufferSize().intValue());
        memoryEventStore.setBufferMemUnit(this.parameters.getMemoryStorageBufferMemUnit().intValue());
        memoryEventStore.setBatchMode(BatchMode.valueOf((String)this.parameters.getStorageBatchMode().name()));
        memoryEventStore.setDdlIsolation(this.parameters.getDdlIsolation().booleanValue());
        this.eventStore = memoryEventStore;
        if (this.eventStore instanceof AbstractCanalStoreScavenge) {
            CanalParameter.StorageScavengeMode scavengeMode = this.parameters.getStorageScavengeMode();
            AbstractCanalStoreScavenge eventScavengeStore = (AbstractCanalStoreScavenge)this.eventStore;
            eventScavengeStore.setDestination(this.destination);
            eventScavengeStore.setCanalMetaManager(this.metaManager);
            eventScavengeStore.setOnAck(scavengeMode.isOnAck());
            eventScavengeStore.setOnFull(scavengeMode.isOnFull());
            eventScavengeStore.setOnSchedule(scavengeMode.isOnSchedule());
            if (scavengeMode.isOnSchedule()) {
                eventScavengeStore.setScavengeSchedule(this.parameters.getScavengeSchdule());
            }
        }
        logger.info("init eventStore end! \n\t load CanalEventStore:{}", (Object)this.eventStore.getClass().getName());
    }

    protected void initEventSink() {
        logger.info("init eventSink begin...");
        int groupSize = this.getGroupSize();
        this.eventSink = groupSize <= 1 ? new EntryEventSink() : new GroupEventSink(groupSize);
        if (this.eventSink instanceof EntryEventSink) {
            ((EntryEventSink)this.eventSink).setFilterTransactionEntry(false);
            ((EntryEventSink)this.eventSink).setEventStore(this.getEventStore());
        }
        logger.info("init eventSink end! \n\t load CanalEventSink:{}", (Object)this.eventSink.getClass().getName());
    }

    protected void initEventParser() {
        logger.info("init eventParser begin...");
        CanalParameter.SourcingType type = this.parameters.getSourcingType();
        List<List<CanalParameter.DataSourcing>> groupDbAddresses = this.parameters.getGroupDbAddresses();
        if (!CollectionUtils.isEmpty(groupDbAddresses)) {
            int size = groupDbAddresses.get(0).size();
            ArrayList<CanalEventParser> eventParsers = new ArrayList<CanalEventParser>();
            for (int i = 0; i < size; ++i) {
                ArrayList<InetSocketAddress> dbAddress = new ArrayList<InetSocketAddress>();
                CanalParameter.SourcingType lastType = null;
                for (List<CanalParameter.DataSourcing> groupDbAddress : groupDbAddresses) {
                    if (lastType != null && !lastType.equals((Object)groupDbAddress.get(i).getType())) {
                        throw new CanalException(String.format("master/slave Sourcing type is unmatch. %s vs %s", new Object[]{lastType, groupDbAddress.get(i).getType()}));
                    }
                    lastType = groupDbAddress.get(i).getType();
                    dbAddress.add(groupDbAddress.get(i).getDbAddress());
                }
                eventParsers.add(this.doInitEventParser(lastType, dbAddress));
            }
            if (eventParsers.size() > 1) {
                GroupEventParser groupEventParser = new GroupEventParser();
                groupEventParser.setEventParsers(eventParsers);
                this.eventParser = groupEventParser;
            } else {
                this.eventParser = (CanalEventParser)eventParsers.get(0);
            }
        } else {
            this.eventParser = this.doInitEventParser(type, new ArrayList<InetSocketAddress>());
        }
        logger.info("init eventParser end! \n\t load CanalEventParser:{}", (Object)this.eventParser.getClass().getName());
    }

    private CanalEventParser doInitEventParser(CanalParameter.SourcingType type, List<InetSocketAddress> dbAddresses) {
        LocalBinlogEventParser eventParser;
        MysqlEventParser mysqlEventParser;
        if (type.isMysql()) {
            mysqlEventParser = new MysqlEventParser();
            mysqlEventParser.setDestination(this.destination);
            mysqlEventParser.setConnectionCharset(Charset.forName(this.parameters.getConnectionCharset()));
            mysqlEventParser.setConnectionCharsetNumber(this.parameters.getConnectionCharsetNumber().byteValue());
            mysqlEventParser.setDefaultConnectionTimeoutInSeconds(this.parameters.getDefaultConnectionTimeoutInSeconds().intValue());
            mysqlEventParser.setSendBufferSize(this.parameters.getSendBufferSize().intValue());
            mysqlEventParser.setReceiveBufferSize(this.parameters.getReceiveBufferSize().intValue());
            mysqlEventParser.setDetectingEnable(this.parameters.getDetectingEnable().booleanValue());
            mysqlEventParser.setDetectingSQL(this.parameters.getDetectingSQL());
            mysqlEventParser.setDetectingIntervalInSeconds(this.parameters.getDetectingIntervalInSeconds());
            mysqlEventParser.setSlaveId(this.parameters.getSlaveId().longValue());
            if (!CollectionUtils.isEmpty(dbAddresses)) {
                mysqlEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0), this.parameters.getDbUsername(), this.parameters.getDbPassword(), this.parameters.getDefaultDatabaseName()));
                if (dbAddresses.size() > 1) {
                    mysqlEventParser.setStandbyInfo(new AuthenticationInfo(dbAddresses.get(1), this.parameters.getDbUsername(), this.parameters.getDbPassword(), this.parameters.getDefaultDatabaseName()));
                }
            }
            if (!CollectionUtils.isEmpty(this.parameters.getPositions())) {
                EntryPosition masterPosition = (EntryPosition)JsonUtils.unmarshalFromString((String)this.parameters.getPositions().get(0), EntryPosition.class);
                mysqlEventParser.setMasterPosition(masterPosition);
                if (this.parameters.getPositions().size() > 1) {
                    EntryPosition standbyPosition = (EntryPosition)JsonUtils.unmarshalFromString((String)this.parameters.getPositions().get(0), EntryPosition.class);
                    mysqlEventParser.setStandbyPosition(standbyPosition);
                }
            }
            mysqlEventParser.setFallbackIntervalInSeconds(this.parameters.getFallbackIntervalInSeconds().intValue());
            mysqlEventParser.setProfilingEnabled(false);
            mysqlEventParser.setFilterTableError(this.parameters.getFilterTableError().booleanValue());
            eventParser = mysqlEventParser;
        } else if (type.isLocalBinlog()) {
            LocalBinlogEventParser localBinlogEventParser = new LocalBinlogEventParser();
            localBinlogEventParser.setDestination(this.destination);
            localBinlogEventParser.setBufferSize(this.parameters.getReceiveBufferSize().intValue());
            localBinlogEventParser.setConnectionCharset(Charset.forName(this.parameters.getConnectionCharset()));
            localBinlogEventParser.setConnectionCharsetNumber(this.parameters.getConnectionCharsetNumber().byteValue());
            localBinlogEventParser.setDirectory(this.parameters.getLocalBinlogDirectory());
            localBinlogEventParser.setProfilingEnabled(false);
            localBinlogEventParser.setDetectingEnable(this.parameters.getDetectingEnable().booleanValue());
            localBinlogEventParser.setDetectingIntervalInSeconds(this.parameters.getDetectingIntervalInSeconds());
            localBinlogEventParser.setFilterTableError(this.parameters.getFilterTableError().booleanValue());
            if (!CollectionUtils.isEmpty(dbAddresses)) {
                localBinlogEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0), this.parameters.getDbUsername(), this.parameters.getDbPassword(), this.parameters.getDefaultDatabaseName()));
            }
            eventParser = localBinlogEventParser;
        } else {
            if (type.isOracle()) {
                throw new CanalException("unsupport SourcingType for " + (Object)((Object)type));
            }
            throw new CanalException("unsupport SourcingType for " + (Object)((Object)type));
        }
        if (eventParser instanceof AbstractEventParser) {
            AviaterRegexFilter aviaterFilter;
            AbstractEventParser abstractEventParser = (AbstractEventParser)eventParser;
            abstractEventParser.setTransactionSize(this.parameters.getTransactionSize().intValue());
            abstractEventParser.setLogPositionManager(this.initLogPositionManager());
            abstractEventParser.setAlarmHandler(this.getAlarmHandler());
            abstractEventParser.setEventSink(this.getEventSink());
            if (StringUtils.isNotEmpty((String)this.filter)) {
                aviaterFilter = new AviaterRegexFilter(this.filter);
                abstractEventParser.setEventFilter((CanalEventFilter)aviaterFilter);
            }
            if (StringUtils.isNotEmpty((String)this.parameters.getBlackFilter())) {
                aviaterFilter = new AviaterRegexFilter(this.parameters.getBlackFilter());
                abstractEventParser.setEventBlackFilter((CanalEventFilter)aviaterFilter);
            }
        }
        if (eventParser instanceof MysqlEventParser) {
            mysqlEventParser = (MysqlEventParser)eventParser;
            CanalHAController haController = this.initHaController();
            mysqlEventParser.setHaController(haController);
        }
        return eventParser;
    }

    protected CanalHAController initHaController() {
        logger.info("init haController begin...");
        CanalParameter.HAMode haMode = this.parameters.getHaMode();
        HeartBeatHAController haController = null;
        if (!haMode.isHeartBeat()) {
            throw new CanalException("unsupport HAMode for " + (Object)((Object)haMode));
        }
        haController = new HeartBeatHAController();
        haController.setDetectingRetryTimes(this.parameters.getDetectingRetryTimes().intValue());
        haController.setSwitchEnable(this.parameters.getHeartbeatHaEnable().booleanValue());
        logger.info("init haController end! \n\t load CanalHAController:{}", (Object)haController.getClass().getName());
        return haController;
    }

    protected CanalLogPositionManager initLogPositionManager() {
        logger.info("init logPositionPersistManager begin...");
        CanalParameter.IndexMode indexMode = this.parameters.getIndexMode();
        MemoryLogPositionManager logPositionManager = null;
        if (indexMode.isMemory()) {
            logPositionManager = new MemoryLogPositionManager();
        } else if (indexMode.isZookeeper()) {
            logPositionManager = new ZooKeeperLogPositionManager();
            ((ZooKeeperLogPositionManager)logPositionManager).setZkClientx(this.getZkclientx());
        } else if (indexMode.isMixed()) {
            logPositionManager = new PeriodMixedLogPositionManager();
            ZooKeeperLogPositionManager zooKeeperLogPositionManager = new ZooKeeperLogPositionManager();
            zooKeeperLogPositionManager.setZkClientx(this.getZkclientx());
            ((PeriodMixedLogPositionManager)logPositionManager).setZooKeeperLogPositionManager(zooKeeperLogPositionManager);
        } else if (indexMode.isMeta()) {
            logPositionManager = new MetaLogPositionManager();
            ((MetaLogPositionManager)logPositionManager).setMetaManager(this.metaManager);
        } else if (indexMode.isMemoryMetaFailback()) {
            MemoryLogPositionManager primaryLogPositionManager = new MemoryLogPositionManager();
            MetaLogPositionManager failbackLogPositionManager = new MetaLogPositionManager();
            failbackLogPositionManager.setMetaManager(this.metaManager);
            logPositionManager = new FailbackLogPositionManager();
            ((FailbackLogPositionManager)logPositionManager).setPrimary((CanalLogPositionManager)primaryLogPositionManager);
            ((FailbackLogPositionManager)logPositionManager).setFailback((CanalLogPositionManager)failbackLogPositionManager);
        } else {
            throw new CanalException("unsupport indexMode for " + (Object)((Object)indexMode));
        }
        logger.info("init logPositionManager end! \n\t load CanalLogPositionManager:{}", (Object)logPositionManager.getClass().getName());
        return logPositionManager;
    }

    protected void startEventParserInternal(CanalEventParser eventParser, boolean isGroup) {
        if (eventParser instanceof AbstractEventParser) {
            AbstractEventParser abstractEventParser = (AbstractEventParser)eventParser;
            abstractEventParser.setAlarmHandler(this.getAlarmHandler());
        }
        super.startEventParserInternal(eventParser, isGroup);
    }

    private int getGroupSize() {
        List<List<CanalParameter.DataSourcing>> groupDbAddresses = this.parameters.getGroupDbAddresses();
        if (!CollectionUtils.isEmpty(groupDbAddresses)) {
            return groupDbAddresses.get(0).size();
        }
        return 1;
    }

    private synchronized ZkClientx getZkclientx() {
        ArrayList<String> zkClusters = new ArrayList<String>(this.parameters.getZkClusters());
        Collections.sort(zkClusters);
        return ZkClientx.getZkClient((String)StringUtils.join(zkClusters, (String)";"));
    }

    public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
        this.alarmHandler = alarmHandler;
    }
}

