package org.wso2.extension.siddhi.io.cdc.source;

import java.io.File;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.cdc.source.listening.CDCSourceObjectKeeper;
import org.wso2.extension.siddhi.io.cdc.source.listening.ChangeDataCapture;
import org.wso2.extension.siddhi.io.cdc.source.listening.WrongConfigurationException;
import org.wso2.extension.siddhi.io.cdc.source.polling.CDCPoller;
import org.wso2.extension.siddhi.io.cdc.util.CDCSourceConstants;
import org.wso2.extension.siddhi.io.cdc.util.CDCSourceUtil;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

@Extension(name = "cdc", namespace = "source", description = "The CDC source receives events when a Database table's change event (INSERT, UPDATE, DELETE) is triggered. The events are received in key-value format.\nThe following are key values of the map of a CDC change event and their descriptions.\n\tFor insert: Keys will be specified table's columns.\n\tFor delete: Keys will be 'before_' followed by specified table's columns. Eg: before_X.\n\tFor update: Keys will be specified table's columns and 'before_' followed by specified table's columns.\nFor 'polling' mode: Keys will be specified table's columns.\nSee parameter: mode for supported databases and change events.", parameters = {@Parameter(name = "url", description = "Connection url to the database.\nuse format: jdbc:mysql://<host>:<port>/<database_name> ", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.MODE, description = "Mode to capture the change data. Mode 'polling' uses a polling.column to monitor the given table. Mode 'listening' uses logs to monitor the given table.\nThe required parameters are different for each modes.\nmode 'listening' currently supports only MySQL. INSERT, UPDATE, DELETE events can be received.\nmode 'polling' supports RDBMS. INSERT, UPDATE events can be received.", type = {DataType.STRING}, defaultValue = CDCSourceConstants.MODE_LISTENING, optional = true), @Parameter(name = CDCSourceConstants.JDBC_DRIVER_NAME, description = "The driver class name for connecting the database. **Required for 'polling' mode.**", type = {DataType.STRING}, defaultValue = "<Empty_String>", optional = true), @Parameter(name = CDCSourceConstants.USERNAME, description = "Username of a user with SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT privileges on Change Data Capturing table.\nFor polling mode, a user with SELECT privileges.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.PASSWORD, description = "Password for the above user.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.POOL_PROPERTIES, description = "Any pool parameters for the database connection must be specified as key-value pairs.", type = {DataType.STRING}, optional = true, defaultValue = "<Empty_String>"), @Parameter(name = CDCSourceConstants.DATASOURCE_NAME, description = "Name of the wso2 datasource to connect to the database. When datasource.name is provided, the url, username and password are not needed. Has a more priority over url based connection.\nAccepted only when mode is set to 'polling'.", type = {DataType.STRING}, defaultValue = "<Empty_String>", optional = true), @Parameter(name = CDCSourceConstants.TABLE_NAME, description = "Name of the table which needs to be monitored for data changes.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.POLLING_COLUMN, description = "Column name on which the polling is done to capture the change data. It is recommend to have a TIMESTAMP field as the polling.column in order to capture inserts and updates.\nNumeric auto incremental fields and char fields can be also used as polling.column. Note that it will only support insert change capturing and depends on how the char field's data is input.\n**Mandatory when mode is 'polling'.**", type = {DataType.STRING}, defaultValue = "<Empty_String>", optional = true), @Parameter(name = CDCSourceConstants.POLLING_INTERVAL, description = "The interval in seconds to poll the given table for changes.\nAccepted only when mode is set to 'polling'.", type = {DataType.INT}, defaultValue = "1", optional = true), @Parameter(name = CDCSourceConstants.OPERATION, description = "Interested change event operation. 'insert', 'update' or 'delete'. Required for 'listening' mode.\nNot case sensitive.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.CONNECTOR_PROPERTIES, description = "Debezium connector specified properties as a comma separated string. \nThis properties will have more priority over the parameters. Only for 'listening' mode", type = {DataType.STRING}, optional = true, defaultValue = "Empty_String"), @Parameter(name = CDCSourceConstants.DATABASE_SERVER_ID, description = "For MySQL, a unique integer between 1 to 2^32 as the ID, This is used when joining MySQL database cluster to read binlog. Only for 'listening'mode.", type = {DataType.STRING}, optional = true, defaultValue = "Random integer between 5400 and 6400"), @Parameter(name = CDCSourceConstants.DATABASE_SERVER_NAME, description = "Logical name that identifies and provides a namespace for the particular database server. Only for 'listening' mode.", defaultValue = "{host}_{port}", optional = true, type = {DataType.STRING})}, examples = {@Example(syntax = "@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', operation = 'insert', \n@map(type='keyvalue', @attributes(id = 'id', name = 'name')))\ndefine stream inputStream (id string, name string);", description = "In this example, the cdc source starts listening to the row insertions  on students table with columns name and id which is under MySQL SimpleDB database that can be accessed with the given url"), @Example(syntax = "@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', operation = 'update', \n@map(type='keyvalue', @attributes(id = 'id', name = 'name', \nbefore_id = 'before_id', before_name = 'before_name')))\ndefine stream inputStream (before_id string, id string, \nbefore_name string , name string);", description = "In this example, the cdc source starts listening to the row updates on students table which is under MySQL SimpleDB database that can be accessed with the given url."), @Example(syntax = "@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', operation = 'delete', \n@map(type='keyvalue', @attributes(before_id = 'before_id', before_name = 'before_name')))\ndefine stream inputStream (before_id string, before_name string);", description = "In this example, the cdc source starts listening to the row deletions on students table which is under MySQL SimpleDB database that can be accessed with the given url."), @Example(syntax = "@source(type = 'cdc', mode='polling', polling.column = 'id', \njdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', \n@map(type='keyvalue'), @attributes(id = 'id', name = 'name'))\ndefine stream inputStream (id int, name string);", description = "In this example, the cdc source starts polling students table for inserts. polling.column is an auto incremental field. url, username, password, and jdbc.driver.name are used to connect to the database."), @Example(syntax = "@source(type = 'cdc', mode='polling', polling.column = 'id', datasource.name = 'SimpleDB',\ntable.name = 'students', \n@map(type='keyvalue'), @attributes(id = 'id', name = 'name'))\ndefine stream inputStream (id int, name string);", description = "In this example, the cdc source starts polling students table for inserts. polling.column is a char column with the pattern S001, S002, ... . datasource.name is used to connect to the database. Note that the datasource.name works only with the Stream Processor."), @Example(syntax = "@source(type = 'cdc', mode='polling', polling.column = 'last_updated', datasource.name = 'SimpleDB',\ntable.name = 'students', \n@map(type='keyvalue'))\ndefine stream inputStream (name string);", description = "In this example, the cdc source starts polling students table for inserts and updates. polling.column is a timestamp field.")})
/* loaded from: input_file:org/wso2/extension/siddhi/io/cdc/source/CDCSource.class */
public class CDCSource extends Source {
    private static final Logger log = Logger.getLogger(CDCSource.class);
    private int pollingInterval;
    private String mode;
    private String operation;
    private ChangeDataCapture changeDataCapture;
    private String historyFileDirectory;
    private String carbonHome;
    private CDCPoller cdcPoller;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private Map<byte[], byte[]> offsetData = new HashMap();
    private CDCSourceObjectKeeper cdcSourceObjectKeeper = CDCSourceObjectKeeper.getCdcSourceObjectKeeper();

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.mode = optionHolder.validateAndGetStaticValue(CDCSourceConstants.MODE, CDCSourceConstants.MODE_LISTENING);
        String value = optionHolder.validateAndGetOption(CDCSourceConstants.TABLE_NAME).getValue();
        String str = this.mode;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1218715461:
                if (str.equals(CDCSourceConstants.MODE_LISTENING)) {
                    z = false;
                    break;
                }
                break;
            case -397904957:
                if (str.equals(CDCSourceConstants.MODE_POLLING)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                String value2 = optionHolder.validateAndGetOption("url").getValue();
                String value3 = optionHolder.validateAndGetOption(CDCSourceConstants.USERNAME).getValue();
                String value4 = optionHolder.validateAndGetOption(CDCSourceConstants.PASSWORD).getValue();
                String name = siddhiAppContext.getName();
                String id = sourceEventListener.getStreamDefinition().getId();
                this.operation = optionHolder.validateAndGetOption(CDCSourceConstants.OPERATION).getValue();
                int parseInt = Integer.parseInt(optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATABASE_SERVER_ID, Integer.toString(-1)));
                String validateAndGetStaticValue = optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATABASE_SERVER_NAME, "");
                String validateAndGetStaticValue2 = optionHolder.validateAndGetStaticValue(CDCSourceConstants.CONNECTOR_PROPERTIES, "");
                this.carbonHome = CDCSourceUtil.getCarbonHome();
                this.historyFileDirectory = this.carbonHome + File.separator + "cdc" + File.separator + "history" + File.separator + name + File.separator;
                validateListeningModeParameters(optionHolder);
                this.changeDataCapture = new ChangeDataCapture(this.operation, sourceEventListener);
                File file = new File(this.historyFileDirectory);
                if (!file.exists() && file.mkdirs() && log.isDebugEnabled()) {
                    log.debug("Directory created for history file.");
                }
                try {
                    this.changeDataCapture.setConfig(CDCSourceUtil.getConfigMap(value3, value4, value2, value, this.historyFileDirectory, name, id, parseInt, validateAndGetStaticValue, validateAndGetStaticValue2, hashCode()));
                    return;
                } catch (WrongConfigurationException e) {
                    throw new SiddhiAppCreationException("The cdc source couldn't get started because of invalid configurations. Found configurations: {username='" + value3 + "', password=******, url='" + value2 + "', tablename='" + value + "', connetorProperties='" + validateAndGetStaticValue2 + "'}", e);
                }
            case true:
                String validateAndGetStaticValue3 = optionHolder.validateAndGetStaticValue(CDCSourceConstants.POLLING_COLUMN);
                boolean isOptionExists = optionHolder.isOptionExists(CDCSourceConstants.DATASOURCE_NAME);
                boolean isOptionExists2 = optionHolder.isOptionExists(CDCSourceConstants.JNDI_RESOURCE);
                this.pollingInterval = Integer.parseInt(optionHolder.validateAndGetStaticValue(CDCSourceConstants.POLLING_INTERVAL, Integer.toString(1)));
                validatePollingModeParameters();
                String validateAndGetStaticValue4 = optionHolder.validateAndGetStaticValue(CDCSourceConstants.POOL_PROPERTIES, (String) null);
                if (isOptionExists) {
                    this.cdcPoller = new CDCPoller(null, null, null, value, null, optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATASOURCE_NAME), null, validateAndGetStaticValue3, this.pollingInterval, validateAndGetStaticValue4, sourceEventListener, configReader);
                    return;
                }
                if (isOptionExists2) {
                    this.cdcPoller = new CDCPoller(null, null, null, value, null, null, optionHolder.validateAndGetStaticValue(CDCSourceConstants.JNDI_RESOURCE), validateAndGetStaticValue3, this.pollingInterval, validateAndGetStaticValue4, sourceEventListener, configReader);
                    return;
                }
                try {
                    this.cdcPoller = new CDCPoller(optionHolder.validateAndGetOption("url").getValue(), optionHolder.validateAndGetOption(CDCSourceConstants.USERNAME).getValue(), optionHolder.validateAndGetOption(CDCSourceConstants.PASSWORD).getValue(), value, optionHolder.validateAndGetStaticValue(CDCSourceConstants.JDBC_DRIVER_NAME), null, null, validateAndGetStaticValue3, this.pollingInterval, validateAndGetStaticValue4, sourceEventListener, configReader);
                    return;
                } catch (SiddhiAppValidationException e2) {
                    throw new SiddhiAppValidationException(e2.getMessage() + " Alternatively, define " + CDCSourceConstants.DATASOURCE_NAME + " or " + CDCSourceConstants.JNDI_RESOURCE + ". Current mode: " + CDCSourceConstants.MODE_POLLING);
                }
            default:
                throw new SiddhiAppValidationException("Unsupported mode: " + this.mode);
        }
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{Map.class};
    }

    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        String str = this.mode;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1218715461:
                if (str.equals(CDCSourceConstants.MODE_LISTENING)) {
                    z = false;
                    break;
                }
                break;
            case -397904957:
                if (str.equals(CDCSourceConstants.MODE_POLLING)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                this.cdcSourceObjectKeeper.addCdcObject(this);
                this.executorService.execute(this.changeDataCapture.getEngine((z2, str2, th) -> {
                    if (z2) {
                        return;
                    }
                    connectionCallback.onError(new ConnectionUnavailableException("Connection to the database lost.", th));
                }));
                return;
            case true:
                this.cdcPoller.setCompletionCallback(th2 -> {
                    if (th2.getClass().equals(SQLException.class)) {
                        connectionCallback.onError(new ConnectionUnavailableException("Connection to the database lost.", th2));
                    } else {
                        destroy();
                        throw new SiddhiAppRuntimeException("CDC Polling mode run failed.", th2);
                    }
                });
                this.executorService.execute(this.cdcPoller);
                return;
            default:
                return;
        }
    }

    public void disconnect() {
        if (this.mode.equals(CDCSourceConstants.MODE_POLLING)) {
            this.cdcPoller.pause();
            if (this.cdcPoller.isLocalDataSource()) {
                this.cdcPoller.getDataSource().close();
                if (log.isDebugEnabled()) {
                    log.debug("Closing the pool for CDC polling mode.");
                }
            }
        }
    }

    public void destroy() {
        disconnect();
        if (this.mode.equals(CDCSourceConstants.MODE_LISTENING)) {
            this.cdcSourceObjectKeeper.removeObject(hashCode());
        }
        this.executorService.shutdown();
    }

    public void pause() {
        String str = this.mode;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1218715461:
                if (str.equals(CDCSourceConstants.MODE_LISTENING)) {
                    z = true;
                    break;
                }
                break;
            case -397904957:
                if (str.equals(CDCSourceConstants.MODE_POLLING)) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                this.cdcPoller.pause();
                return;
            case true:
                this.changeDataCapture.pause();
                return;
            default:
                return;
        }
    }

    public void resume() {
        String str = this.mode;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1218715461:
                if (str.equals(CDCSourceConstants.MODE_LISTENING)) {
                    z = true;
                    break;
                }
                break;
            case -397904957:
                if (str.equals(CDCSourceConstants.MODE_POLLING)) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                this.cdcPoller.resume();
                return;
            case true:
                this.changeDataCapture.resume();
                return;
            default:
                return;
        }
    }

    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        String str = this.mode;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1218715461:
                if (str.equals(CDCSourceConstants.MODE_LISTENING)) {
                    z = true;
                    break;
                }
                break;
            case -397904957:
                if (str.equals(CDCSourceConstants.MODE_POLLING)) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                hashMap.put("last.offset", this.cdcPoller.getLastReadPollingColumnValue());
                break;
            case true:
                hashMap.put(CDCSourceConstants.CACHE_OBJECT, this.offsetData);
                break;
        }
        return hashMap;
    }

    public void restoreState(Map<String, Object> map) {
        String str = this.mode;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1218715461:
                if (str.equals(CDCSourceConstants.MODE_LISTENING)) {
                    z = true;
                    break;
                }
                break;
            case -397904957:
                if (str.equals(CDCSourceConstants.MODE_POLLING)) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                this.cdcPoller.setLastReadPollingColumnValue((String) map.get("last.offset"));
                return;
            case true:
                this.offsetData = (HashMap) map.get(CDCSourceConstants.CACHE_OBJECT);
                return;
            default:
                return;
        }
    }

    public Map<byte[], byte[]> getOffsetData() {
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            log.error("Offset data retrieval failed.", e);
        }
        return this.offsetData;
    }

    public void setOffsetData(Map<byte[], byte[]> map) {
        this.offsetData = map;
    }

    private void validateListeningModeParameters(OptionHolder optionHolder) {
        if (optionHolder.isOptionExists(CDCSourceConstants.DATASOURCE_NAME)) {
            throw new SiddhiAppValidationException("Parameter: datasource.name should not be defined for listening mode");
        }
        if (!this.operation.equalsIgnoreCase(CDCSourceConstants.INSERT) && !this.operation.equalsIgnoreCase(CDCSourceConstants.UPDATE) && !this.operation.equalsIgnoreCase(CDCSourceConstants.DELETE)) {
            throw new SiddhiAppValidationException("Unsupported operation: '" + this.operation + "'. operation should be one of 'insert', 'update' or 'delete'");
        }
        if (this.carbonHome.isEmpty()) {
            throw new SiddhiAppValidationException("Couldn't initialize Carbon Home.");
        }
        if (this.historyFileDirectory.endsWith(File.separator)) {
            return;
        }
        this.historyFileDirectory += File.separator;
    }

    private void validatePollingModeParameters() {
        if (this.pollingInterval < 0) {
            throw new SiddhiAppValidationException("polling.interval should be a non negative integer. Current mode: polling");
        }
    }
}
