package org.wso2.extension.siddhi.io.cdc.source;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants;
import org.wso2.extension.siddhi.io.cdc.util.CDCSourceUtil;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

@Extension(name = "cdc", namespace = "source", description = "The CDC source receives events when a specified MySQL table's change event (INSERT, UPDATE, DELETE) is triggered. The events are received in key-value map format.\nThe following are key values of the map of a CDC change event and their descriptions.\n\tFor insert: Keys will be specified table's columns\n\tFor delete: Keys will be 'before_' followed by specified table's columns. Eg: before_X\n\tFor update: Keys will be specified table's columns and 'before_' followed by specified table's columns.", parameters = {@Parameter(name = "url", description = "Connection url to the database.\nuse format: jdbc:mysql://<host>:<port>/<database_name> ", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.USERNAME, description = "Username of a user with SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT privileges on Change Data Capturing table.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.PASSWORD, description = "Password for the above user.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.TABLE_NAME, description = "Name of the table which needs to be monitored for data changes.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.OPERATION, description = "Interested change event operation. 'insert', 'update' or 'delete'. \nNot case sensitive.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.CONNECTOR_PROPERTIES, description = "Debezium connector specified properties as a comma separated string. \nThis properties will have more priority over the parameters.", type = {DataType.STRING}, optional = true, defaultValue = "Empty_String"), @Parameter(name = CDCSourceConstants.DATABASE_SERVER_ID, description = "For MySQL, a unique integer between 1 to 2^32 as the ID, This is used when joining MySQL database cluster to read binlog", type = {DataType.STRING}, optional = true, defaultValue = "Random integer between 5400 and 6400"), @Parameter(name = CDCSourceConstants.DATABASE_SERVER_NAME, description = "Logical name that identifies and provides a namespace for the particular database server", defaultValue = "{host}_{port}", optional = true, type = {DataType.STRING})}, examples = {@Example(syntax = "@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', operation = 'insert', \n@map(type='keyvalue', @attributes(id = 'id', name = 'name')))\ndefine stream inputStream (id string, name string);", description = "In this example, the cdc source starts listening to the row insertions  on students table with columns name and id which is under MySQL SimpleDB database that can be accessed with the given url"), @Example(syntax = "@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', operation = 'update', \n@map(type='keyvalue', @attributes(id = 'id', name = 'name', \nbefore_id = 'before_id', before_name = 'before_name')))\ndefine stream inputStream (before_id string, id string, \nbefore_name string, , name string);", description = "In this example, the cdc source starts listening to the row updates on students table which is under MySQL SimpleDB database that can be accessed with the given url"), @Example(syntax = "@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', operation = 'delete', \n@map(type='keyvalue', @attributes(before_id = 'before_id', before_name = 'before_name')))\ndefine stream inputStream (before_id string, before_name string);", description = "In this example, the cdc source starts listening to the row deletions on students table which is under MySQL SimpleDB database that can be accessed with the given url")})
/* loaded from: input_file:org/wso2/extension/siddhi/io/cdc/source/CDCSource.class */
public class CDCSource extends Source {
    private static final Logger log = Logger.getLogger(CDCSource.class);
    private String operation;
    private ChangeDataCapture changeDataCapture;
    private String historyFileDirectory;
    private String carbonHome;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private Map<byte[], byte[]> offsetData = new HashMap();
    private CDCSourceObjectKeeper cdcSourceObjectKeeper = CDCSourceObjectKeeper.getCdcSourceObjectKeeper();

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        String name = siddhiAppContext.getName();
        String id = sourceEventListener.getStreamDefinition().getId();
        String value = optionHolder.validateAndGetOption("url").getValue();
        String value2 = optionHolder.validateAndGetOption(CDCSourceConstants.TABLE_NAME).getValue();
        String value3 = optionHolder.validateAndGetOption(CDCSourceConstants.USERNAME).getValue();
        String value4 = optionHolder.validateAndGetOption(CDCSourceConstants.PASSWORD).getValue();
        this.operation = optionHolder.validateAndGetOption(CDCSourceConstants.OPERATION).getValue();
        int parseInt = Integer.parseInt(optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATABASE_SERVER_ID, Integer.toString(-1)));
        String validateAndGetStaticValue = optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATABASE_SERVER_NAME, "");
        String validateAndGetStaticValue2 = optionHolder.validateAndGetStaticValue(CDCSourceConstants.CONNECTOR_PROPERTIES, "");
        this.carbonHome = CDCSourceUtil.getCarbonHome();
        this.historyFileDirectory = this.carbonHome + File.separator + "cdc" + File.separator + "history" + File.separator + name + File.separator;
        validateParameter();
        this.changeDataCapture = new ChangeDataCapture(this.operation, sourceEventListener);
        File file = new File(this.historyFileDirectory);
        if (!file.exists() && file.mkdirs() && log.isDebugEnabled()) {
            log.debug("Directory created for history file.");
        }
        try {
            this.changeDataCapture.setConfig(CDCSourceUtil.getConfigMap(value3, value4, value, value2, this.historyFileDirectory, name, id, parseInt, validateAndGetStaticValue, validateAndGetStaticValue2, hashCode()));
        } catch (WrongConfigurationException e) {
            throw new SiddhiAppCreationException("The cdc source couldn't get started because of invalid configurations. Found configurations: {username='" + value3 + "', password=******, url='" + value + "', tablename='" + value2 + "', connetorProperties='" + validateAndGetStaticValue2 + "'}", e);
        }
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{Map.class};
    }

    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        this.cdcSourceObjectKeeper.addCdcObject(this);
        this.executorService.execute(this.changeDataCapture.getEngine((z, str, th) -> {
            if (z) {
                return;
            }
            connectionCallback.onError(new ConnectionUnavailableException("Connection to the database lost.", th));
        }));
    }

    public void disconnect() {
    }

    public void destroy() {
        this.cdcSourceObjectKeeper.removeObject(hashCode());
        this.executorService.shutdown();
    }

    public void pause() {
        this.changeDataCapture.pause();
    }

    public void resume() {
        this.changeDataCapture.resume();
    }

    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        hashMap.put(CDCSourceConstants.CACHE_OBJECT, this.offsetData);
        return hashMap;
    }

    public void restoreState(Map<String, Object> map) {
        this.offsetData = (HashMap) map.get(CDCSourceConstants.CACHE_OBJECT);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<byte[], byte[]> getOffsetData() {
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            log.error("Offset data retrieval failed.", e);
        }
        return this.offsetData;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setOffsetData(Map<byte[], byte[]> map) {
        this.offsetData = map;
    }

    private void validateParameter() {
        if (!this.operation.equalsIgnoreCase(CDCSourceConstants.INSERT) && !this.operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) && !this.operation.equalsIgnoreCase("delete")) {
            throw new SiddhiAppValidationException("Unsupported operation: '" + this.operation + "'. operation should be one of 'insert', 'update' or 'delete'");
        }
        if (this.carbonHome.isEmpty()) {
            throw new SiddhiAppValidationException("Couldn't initialize Carbon Home.");
        }
        if (this.historyFileDirectory.endsWith(File.separator)) {
            return;
        }
        this.historyFileDirectory += File.separator;
    }
}
