package com.alibaba.otter.canal.instance.core;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
import com.alibaba.otter.canal.meta.CanalMetaManager;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.ha.HeartBeatHAController;
import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.alibaba.otter.canal.store.CanalEventStore;
import com.alibaba.otter.canal.store.model.Event;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/otter/canal/instance/core/AbstractCanalInstance.class */
public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {
    private static final Logger logger = LoggerFactory.getLogger(AbstractCanalInstance.class);
    protected Long canalId;
    protected String destination;
    protected CanalEventStore<Event> eventStore;
    protected CanalEventParser eventParser;
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink;
    protected CanalMetaManager metaManager;
    protected CanalAlarmHandler alarmHandler;

    @Override // com.alibaba.otter.canal.instance.core.CanalInstance
    public boolean subscribeChange(ClientIdentity clientIdentity) {
        if (!StringUtils.isNotEmpty(clientIdentity.getFilter())) {
            return true;
        }
        logger.info("subscribe filter change to " + clientIdentity.getFilter());
        AviaterRegexFilter aviaterRegexFilter = new AviaterRegexFilter(clientIdentity.getFilter());
        if (!(this.eventParser instanceof GroupEventParser)) {
            this.eventParser.setEventFilter(aviaterRegexFilter);
            return true;
        }
        Iterator it = this.eventParser.getEventParsers().iterator();
        while (it.hasNext()) {
            ((CanalEventParser) it.next()).setEventFilter(aviaterRegexFilter);
        }
        return true;
    }

    public void start() {
        super.start();
        if (!this.metaManager.isStart()) {
            this.metaManager.start();
        }
        if (!this.alarmHandler.isStart()) {
            this.alarmHandler.start();
        }
        if (!this.eventStore.isStart()) {
            this.eventStore.start();
        }
        if (!this.eventSink.isStart()) {
            this.eventSink.start();
        }
        if (!this.eventParser.isStart()) {
            beforeStartEventParser(this.eventParser);
            this.eventParser.start();
            afterStartEventParser(this.eventParser);
        }
        logger.info("start successful....");
    }

    public void stop() {
        super.stop();
        logger.info("stop CannalInstance for {}-{} ", new Object[]{this.canalId, this.destination});
        if (this.eventParser.isStart()) {
            beforeStopEventParser(this.eventParser);
            this.eventParser.stop();
            afterStopEventParser(this.eventParser);
        }
        if (this.eventSink.isStart()) {
            this.eventSink.stop();
        }
        if (this.eventStore.isStart()) {
            this.eventStore.stop();
        }
        if (this.metaManager.isStart()) {
            this.metaManager.stop();
        }
        if (this.alarmHandler.isStart()) {
            this.alarmHandler.stop();
        }
        logger.info("stop successful....");
    }

    protected void beforeStartEventParser(CanalEventParser canalEventParser) {
        if (!(canalEventParser instanceof GroupEventParser)) {
            startEventParserInternal(canalEventParser, false);
            return;
        }
        Iterator it = ((GroupEventParser) canalEventParser).getEventParsers().iterator();
        while (it.hasNext()) {
            startEventParserInternal((CanalEventParser) it.next(), true);
        }
    }

    protected void afterStartEventParser(CanalEventParser canalEventParser) {
        Iterator it = this.metaManager.listAllSubscribeInfo(this.destination).iterator();
        while (it.hasNext()) {
            subscribeChange((ClientIdentity) it.next());
        }
    }

    protected void beforeStopEventParser(CanalEventParser canalEventParser) {
    }

    protected void afterStopEventParser(CanalEventParser canalEventParser) {
        if (!(canalEventParser instanceof GroupEventParser)) {
            stopEventParserInternal(canalEventParser);
            return;
        }
        Iterator it = ((GroupEventParser) canalEventParser).getEventParsers().iterator();
        while (it.hasNext()) {
            stopEventParserInternal((CanalEventParser) it.next());
        }
    }

    protected void startEventParserInternal(CanalEventParser canalEventParser, boolean z) {
        if (canalEventParser instanceof AbstractEventParser) {
            CanalLogPositionManager logPositionManager = ((AbstractEventParser) canalEventParser).getLogPositionManager();
            if (!logPositionManager.isStart()) {
                logPositionManager.start();
            }
        }
        if (canalEventParser instanceof MysqlEventParser) {
            MysqlEventParser mysqlEventParser = (MysqlEventParser) canalEventParser;
            HeartBeatHAController haController = mysqlEventParser.getHaController();
            if (haController instanceof HeartBeatHAController) {
                haController.setCanalHASwitchable(mysqlEventParser);
            }
            if (haController.isStart()) {
                return;
            }
            haController.start();
        }
    }

    protected void stopEventParserInternal(CanalEventParser canalEventParser) {
        if (canalEventParser instanceof AbstractEventParser) {
            CanalLogPositionManager logPositionManager = ((AbstractEventParser) canalEventParser).getLogPositionManager();
            if (logPositionManager.isStart()) {
                logPositionManager.stop();
            }
        }
        if (canalEventParser instanceof MysqlEventParser) {
            CanalHAController haController = ((MysqlEventParser) canalEventParser).getHaController();
            if (haController.isStart()) {
                haController.stop();
            }
        }
    }

    @Override // com.alibaba.otter.canal.instance.core.CanalInstance
    public String getDestination() {
        return this.destination;
    }

    @Override // com.alibaba.otter.canal.instance.core.CanalInstance
    public CanalEventParser getEventParser() {
        return this.eventParser;
    }

    @Override // com.alibaba.otter.canal.instance.core.CanalInstance
    public CanalEventSink getEventSink() {
        return this.eventSink;
    }

    @Override // com.alibaba.otter.canal.instance.core.CanalInstance
    public CanalEventStore getEventStore() {
        return this.eventStore;
    }

    @Override // com.alibaba.otter.canal.instance.core.CanalInstance
    public CanalMetaManager getMetaManager() {
        return this.metaManager;
    }

    @Override // com.alibaba.otter.canal.instance.core.CanalInstance
    public CanalAlarmHandler getAlarmHandler() {
        return this.alarmHandler;
    }
}
