package io.siddhi.extension.io.cdc.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.cdc.source.listening.CDCSourceObjectKeeper;
import io.siddhi.extension.io.cdc.source.listening.ChangeDataCapture;
import io.siddhi.extension.io.cdc.source.listening.MongoChangeDataCapture;
import io.siddhi.extension.io.cdc.source.listening.RdbmsChangeDataCapture;
import io.siddhi.extension.io.cdc.source.listening.WrongConfigurationException;
import io.siddhi.extension.io.cdc.source.polling.CDCPoller;
import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
import io.siddhi.extension.io.cdc.util.CDCSourceUtil;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.io.File;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;

@Extension(name = "cdc", namespace = "source", description = "The CDC source receives events when change events (i.e., INSERT, UPDATE, DELETE) are triggered for a database table. Events are received in the 'key-value' format.\n\nThere are two modes you could perform CDC: Listening mode and Polling mode.\n\nIn polling mode, the datasource is periodically polled for capturing the changes. The polling period can be configured.\nIn polling mode, you can only capture INSERT and UPDATE changes.\n\nOn listening mode, the Source will keep listening to the Change Log of the database and notify in case a change has taken place. Here, you are immediately notified about the change, compared to polling mode.\n\nThe key values of the map of a CDC change event are as follows.\n\nFor 'listening' mode: \n\tFor insert: Keys are specified as columns of the table.\n\tFor delete: Keys are followed by the specified table columns. This is achieved via 'before_'. e.g., specifying 'before_X' results in the key being added before the column named 'X'.\n\tFor update: Keys are followed followed by the specified table columns. This is achieved via 'before_'. 
e.g., specifying 'before_X' results in the key being added before the column named 'X'.\n\nFor 'polling' mode: Keys are specified as the columns of the table.#### Preparations required for working with Oracle Databases in listening mode\n\nUsing the extension in Windows, Mac OSX and AIX are pretty straight forward inorder to achieve the required behaviour please follow the steps given below\n\n  - Download the compatible version of oracle instantclient for the database version from [here](https://www.oracle.com/database/technologies/instant-client/downloads.html) and extract\n  - Extract and set the environment variable `LD_LIBRARY_PATH` to the location of instantclient which was exstracted as shown below\n  ```\n    export LD_LIBRARY_PATH=<path to the instant client location>\n  ```\n  - Inside the instantclient folder which was download there are two jars `xstreams.jar` and `ojdbc<version>.jar` convert them to OSGi bundles using the tools which were provided in the `<distribution>/bin` for converting the `ojdbc.jar` use the tool `spi-provider.sh|bat` and for the conversion of `xstreams.jar` use the jni-provider.sh as shown below(Note: this way of converting Xstreams jar is applicable only for Linux environments for other OSs this step is not required and converting it through the `jartobundle.sh` tool is enough)\n  ```\n    ./jni-provider.sh <input-jar> <destination> <comma seperated native library names>\n  ```\n  once ojdbc and xstreams jars are converted to OSGi copy the generated jars to the `<distribution>/lib`. Currently siddhi-io-cdc only supports the oracle database distributions 12 and above\n\nSee parameter: mode for supported databases and change events.", parameters = {@Parameter(name = "url", description = "The connection URL to the database.\nF=The format used is: 'jdbc:mysql://<host>:<port>/<database_name>' ", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.MODE, description = "Mode to capture the change data. 
The type of events that can be received, and the required parameters differ based on the mode. The mode can be one of the following:\n'polling': This mode uses a column named 'polling.column' to monitor the given table. It captures change events of the 'RDBMS', 'INSERT, and 'UPDATE' types.\n'listening': This mode uses logs to monitor the given table. It currently supports change events only of the 'MySQL', 'INSERT', 'UPDATE', and 'DELETE' types.", type = {DataType.STRING}, defaultValue = CDCSourceConstants.MODE_LISTENING, optional = true), @Parameter(name = CDCSourceConstants.JDBC_DRIVER_NAME, description = "The driver class name for connecting the database. **It is required to specify a value for this parameter when the mode is 'polling'.**", type = {DataType.STRING}, defaultValue = "<Empty_String>", optional = true), @Parameter(name = CDCSourceConstants.USERNAME, description = "The username to be used for accessing the database. This user needs to have the 'SELECT', 'RELOAD', 'SHOW DATABASES', 'REPLICATION SLAVE', and 'REPLICATION CLIENT'privileges for the change data capturing table (specified via the 'table.name' parameter).\nTo operate in the polling mode, the user needs 'SELECT' privileges.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.PASSWORD, description = "The password of the username you specified for accessing the database.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.POOL_PROPERTIES, description = "The pool parameters for the database connection can be specified as key-value pairs.", type = {DataType.STRING}, optional = true, defaultValue = "<Empty_String>"), @Parameter(name = CDCSourceConstants.DATASOURCE_NAME, description = "Name of the wso2 datasource to connect to the database. When datasource name is provided, the URL, username and password are not needed. 
A datasource based connection is given more priority over the URL based connection.\n This parameter is applicable only when the mode is set to 'polling', and it can be applied only when you use this extension with WSO2 Stream Processor.", type = {DataType.STRING}, defaultValue = "<Empty_String>", optional = true), @Parameter(name = CDCSourceConstants.TABLE_NAME, description = "The name of the table that needs to be monitored for data changes.", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.POLLING_COLUMN, description = "The column name  that is polled to capture the change data. It is recommended to have a TIMESTAMP field as the 'polling.column' in order to capture the inserts and updates.\nNumeric auto-incremental fields and char fields can also be used as 'polling.column'. However, note that fields of these types only support insert change capturing, and the possibility of using a char field also depends on how the data is input.\n**It is required to enter a value for this parameter only when the mode is 'polling'.**", type = {DataType.STRING}, defaultValue = "<Empty_String>", optional = true), @Parameter(name = CDCSourceConstants.POLLING_INTERVAL, description = "The time interval (specified in seconds) to poll the given table for changes.\nThis parameter is applicable only when the mode is set to 'polling'.", type = {DataType.INT}, defaultValue = "1", optional = true), @Parameter(name = CDCSourceConstants.OPERATION, description = "The change event operation you want to carry out. Possible values are 'insert', 'update' or 'delete'. This parameter is not case sensitive. **It is required to specify a value only when the mode is 'listening'.**\n", type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.CONNECTOR_PROPERTIES, description = "Here, you can specify Debezium connector properties as a comma-separated string. \nThe properties specified here are given more priority over the parameters. 
This parameter is applicable only for the 'listening' mode.", type = {DataType.STRING}, optional = true, defaultValue = "Empty_String"), @Parameter(name = CDCSourceConstants.DATABASE_SERVER_ID, description = "An ID to be used when joining MySQL database cluster to read the bin log. This should be a unique integer between 1 to 2^32. This parameter is applicable only when the mode is 'listening'.", type = {DataType.STRING}, optional = true, defaultValue = "Random integer between 5400 and 6400"), @Parameter(name = CDCSourceConstants.DATABASE_SERVER_NAME, description = "A logical name that identifies and provides a namespace for the database server. This parameter is applicable only when the mode is 'listening'.", defaultValue = "{host}_{port}", optional = true, type = {DataType.STRING}), @Parameter(name = CDCSourceConstants.WAIT_ON_MISSED_RECORD, description = "Indicates whether the process needs to wait on missing/out-of-order records. \nWhen this flag is set to 'true' the process will be held once it identifies a missing record. The missing recrod is identified by the sequence of the polling.column value. This can be used only with number fields and not recommended to use with time values as it will not be sequential.\nThis should be enabled ONLY where the records can be written out-of-order, (eg. concurrent writers) as this degrades the performance.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = CDCSourceConstants.MISSED_RECORD_WAITING_TIMEOUT, description = "The timeout (specified in seconds) to retry for missing/out-of-order record. This should be used along with the wait.on.missed.record parameter. 
If the parameter is not set, the process will indefinitely wait for the missing record.", type = {DataType.INT}, optional = true, defaultValue = "-1")}, examples = {@Example(syntax = "@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', operation = 'insert', \n@map(type='keyvalue', @attributes(id = 'id', name = 'name')))\ndefine stream inputStream (id string, name string);", description = "In this example, the CDC source listens to the row insertions that are made in the 'students' table with the column name, and the ID. This table belongs to the 'SimpleDB' MySQL database that can be accessed via the given URL."), @Example(syntax = "@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', operation = 'update', \n@map(type='keyvalue', @attributes(id = 'id', name = 'name', \nbefore_id = 'before_id', before_name = 'before_name')))\ndefine stream inputStream (before_id string, id string, \nbefore_name string , name string);", description = "In this example, the CDC source listens to the row updates that are made in the 'students' table. This table belongs to the 'SimpleDB' MySQL database that can be accessed via the given URL."), @Example(syntax = "@source(type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', operation = 'delete', \n@map(type='keyvalue', @attributes(before_id = 'before_id', before_name = 'before_name')))\ndefine stream inputStream (before_id string, before_name string);", description = "In this example, the CDC source listens to the row deletions made in the 'students' table. 
This table belongs to the 'SimpleDB' database that can be accessed via the given URL."), @Example(syntax = "@source(type = 'cdc', mode='polling', polling.column = 'id', \njdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://localhost:3306/SimpleDB', \nusername = 'cdcuser', password = 'pswd4cdc', \ntable.name = 'students', \n@map(type='keyvalue'), @attributes(id = 'id', name = 'name'))\ndefine stream inputStream (id int, name string);", description = "In this example, the CDC source polls the 'students' table for inserts. 'id' that is specified as the polling colum' is an auto incremental field. The connection to the database is made via the URL, username, password, and the JDBC driver name."), @Example(syntax = "@source(type = 'cdc', mode='polling', polling.column = 'id', datasource.name = 'SimpleDB',\ntable.name = 'students', \n@map(type='keyvalue'), @attributes(id = 'id', name = 'name'))\ndefine stream inputStream (id int, name string);", description = "In this example, the CDC source polls the 'students' table for inserts. The given polling column is a char column with the 'S001, S002, ... .' pattern. The connection to the database is made via a data source named 'SimpleDB'. Note that the 'datasource.name' parameter works only with the Stream Processor."), @Example(syntax = "@source(type = 'cdc', mode='polling', polling.column = 'last_updated', datasource.name = 'SimpleDB',\ntable.name = 'students', \n@map(type='keyvalue'))\ndefine stream inputStream (name string);", description = "In this example, the CDC source polls the 'students' table for inserts and updates. 
The polling column is a timestamp field."), @Example(syntax = "@source(type='cdc', jdbc.driver.name='com.mysql.jdbc.Driver', url='jdbc:mysql://localhost:3306/SimpleDB', username='cdcuser', password='pswd4cdc', table.name='students', mode='polling', polling.column='id', operation='insert', wait.on.missed.record='true', missed.record.waiting.timeout='10',\n@map(type='keyvalue'), \n@attributes(batch_no='batch_no', item='item', qty='qty'))\ndefine stream inputStream (id int, name string);", description = "In this example, the CDC source polls the 'students' table for inserts. The polling column is a numeric field. This source expects the records in the database to be written concurrently/out-of-order so it waits if it encounters a missing record. If the record doesn't appear within 10 seconds it resumes the process."), @Example(syntax = "@source(type = 'cdc', url = 'jdbc:oracle:thin://localhost:1521/ORCLCDB', username='c##xstrm', password='xs', table.name='DEBEZIUM.sweetproductiontable', operation = 'insert', connector.properties='oracle.outserver.name=DBZXOUT,oracle.pdb=ORCLPDB1' @map(type = 'keyvalue'))\ndefine stream insertSweetProductionStream (ID int, NAME string, WEIGHT int);\n", description = "In this example, the CDC source connect to an Oracle database and listens for insert queries of sweetproduction table")})
/* loaded from: input_file:io/siddhi/extension/io/cdc/source/CDCSource.class */
public class CDCSource extends Source<CdcState> {
    // Class-wide logger.
    private static final Logger log = Logger.getLogger(CDCSource.class);
    // Polling interval parsed from the polling-interval option in init(); only set in polling mode.
    private int pollingInterval;
    // Capture mode read from the source options in init(); drives every mode switch in this class.
    private String mode;
    // Change operation read from the source options; only set in listening mode.
    private String operation;
    // Listening-mode capture object (Mongo or RDBMS variant, chosen in init() from the URL).
    private ChangeDataCapture changeDataCapture;
    // Directory built in init() as <carbonHome>/cdc/history/<siddhiAppName>/; only set in listening mode.
    private String historyFileDirectory;
    // Value returned by CDCSourceUtil.getCarbonHome(); only set in listening mode.
    private String carbonHome;
    // Polling-mode worker created in init(); only set in polling mode.
    private CDCPoller cdcPoller;
    // Single-thread executor that runs either the listening engine or the poller; shut down in destroy().
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    // Offset data that CdcState snapshots and restores in listening mode.
    private Map<byte[], byte[]> offsetData = new HashMap();
    // Shared keeper: this source is added to it in connect() and removed in destroy() (listening mode).
    private CDCSourceObjectKeeper cdcSourceObjectKeeper = CDCSourceObjectKeeper.getCdcSourceObjectKeeper();

    /* loaded from: input_file:io/siddhi/extension/io/cdc/source/CDCSource$CdcState.class */
    class CdcState extends State {
        private final String mode;
        private final Map<String, Object> state;

        private CdcState(String str) {
            this.mode = str;
            this.state = new HashMap();
        }

        public boolean canDestroy() {
            return false;
        }

        public Map<String, Object> snapshot() {
            String str = this.mode;
            boolean z = -1;
            switch (str.hashCode()) {
                case -1218715461:
                    if (str.equals(CDCSourceConstants.MODE_LISTENING)) {
                        z = true;
                        break;
                    }
                    break;
                case -397904957:
                    if (str.equals(CDCSourceConstants.MODE_POLLING)) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    this.state.put("last.offset", CDCSource.this.cdcPoller.getLastReadPollingColumnValue());
                    break;
                case true:
                    this.state.put(CDCSourceConstants.CACHE_OBJECT, CDCSource.this.offsetData);
                    break;
            }
            return this.state;
        }

        public void restore(Map<String, Object> map) {
            String str = this.mode;
            boolean z = -1;
            switch (str.hashCode()) {
                case -1218715461:
                    if (str.equals(CDCSourceConstants.MODE_LISTENING)) {
                        z = true;
                        break;
                    }
                    break;
                case -397904957:
                    if (str.equals(CDCSourceConstants.MODE_POLLING)) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    CDCSource.this.cdcPoller.setLastReadPollingColumnValue((String) map.get("last.offset"));
                    return;
                case true:
                    CDCSource.this.offsetData = (HashMap) map.get(CDCSourceConstants.CACHE_OBJECT);
                    return;
                default:
                    return;
            }
        }
    }

    /**
     * This source returns no service deployment information.
     *
     * @return always {@code null}
     */
    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    /**
     * Reads the source options, validates them for the selected mode and prepares either the
     * listening-mode {@link ChangeDataCapture} or the polling-mode {@link CDCPoller}.
     *
     * @param sourceEventListener listener handed to the capture object / poller
     * @param optionHolder        holder of the options declared on the source annotation
     * @param strArr              not used by this method
     * @param configReader        configuration reader, forwarded to the poller in polling mode
     * @param siddhiAppContext    context of the owning Siddhi app; only its name is read here
     * @return a factory that creates a {@link CdcState} bound to the configured mode
     * @throws SiddhiAppValidationException if the mode is unsupported or a mode-specific option is invalid
     * @throws SiddhiAppCreationException   if the listening-mode configuration is rejected
     */
    public StateFactory<CdcState> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.mode = optionHolder.validateAndGetStaticValue(CDCSourceConstants.MODE, CDCSourceConstants.MODE_LISTENING);
        final String tableName = optionHolder.validateAndGetOption(CDCSourceConstants.TABLE_NAME).getValue();
        final String siddhiAppName = siddhiAppContext.getName();

        switch (this.mode) {
            case CDCSourceConstants.MODE_LISTENING:
                initListeningMode(sourceEventListener, optionHolder, tableName, siddhiAppName);
                break;
            case CDCSourceConstants.MODE_POLLING:
                initPollingMode(sourceEventListener, optionHolder, configReader, tableName, siddhiAppName);
                break;
            default:
                throw new SiddhiAppValidationException("Unsupported mode: " + this.mode);
        }
        return () -> new CdcState(this.mode);
    }

    /** Reads and validates the listening-mode options and configures the change data capture object. */
    private void initListeningMode(SourceEventListener sourceEventListener, OptionHolder optionHolder, String tableName, String siddhiAppName) {
        final String url = optionHolder.validateAndGetOption("url").getValue();
        final String username = optionHolder.validateAndGetOption(CDCSourceConstants.USERNAME).getValue();
        final String password = optionHolder.validateAndGetOption(CDCSourceConstants.PASSWORD).getValue();
        final String streamId = sourceEventListener.getStreamDefinition().getId();
        this.operation = optionHolder.validateAndGetOption(CDCSourceConstants.OPERATION).getValue();
        final int serverId = Integer.parseInt(optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATABASE_SERVER_ID, Integer.toString(-1)));
        final String serverName = optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATABASE_SERVER_NAME, "");
        final String connectorProperties = optionHolder.validateAndGetStaticValue(CDCSourceConstants.CONNECTOR_PROPERTIES, "");
        this.carbonHome = CDCSourceUtil.getCarbonHome();
        this.historyFileDirectory = this.carbonHome + File.separator + "cdc" + File.separator + "history" + File.separator + siddhiAppName + File.separator;

        // Validation reads operation, carbonHome and historyFileDirectory, so it must run after they are set.
        validateListeningModeParameters(optionHolder);

        // A MongoDB JDBC URL selects the Mongo capture implementation; everything else is treated as RDBMS.
        if (url.toLowerCase(Locale.ENGLISH).contains("jdbc:mongodb")) {
            this.changeDataCapture = new MongoChangeDataCapture(this.operation, sourceEventListener);
        } else {
            this.changeDataCapture = new RdbmsChangeDataCapture(this.operation, sourceEventListener);
        }

        final File historyDirectory = new File(this.historyFileDirectory);
        if (!historyDirectory.exists() && historyDirectory.mkdirs() && log.isDebugEnabled()) {
            log.debug("Directory created for history file.");
        }

        try {
            this.changeDataCapture.setConfig(CDCSourceUtil.getConfigMap(username, password, url, tableName, this.historyFileDirectory, siddhiAppName, streamId, serverId, serverName, connectorProperties, hashCode()));
        } catch (WrongConfigurationException e) {
            // The password is deliberately masked in the message.
            throw new SiddhiAppCreationException("The cdc source couldn't get started because of invalid configurations. Found configurations: {username='" + username + "', password=******, url='" + url + "', tablename='" + tableName + "', connetorProperties='" + connectorProperties + "'}", e);
        }
    }

    /** Reads and validates the polling-mode options and creates the poller for the configured connection type. */
    private void initPollingMode(SourceEventListener sourceEventListener, OptionHolder optionHolder, ConfigReader configReader, String tableName, String siddhiAppName) {
        final String pollingColumn = optionHolder.validateAndGetStaticValue(CDCSourceConstants.POLLING_COLUMN);
        final boolean hasDatasourceName = optionHolder.isOptionExists(CDCSourceConstants.DATASOURCE_NAME);
        final boolean hasJndiResource = optionHolder.isOptionExists(CDCSourceConstants.JNDI_RESOURCE);
        this.pollingInterval = Integer.parseInt(optionHolder.validateAndGetStaticValue(CDCSourceConstants.POLLING_INTERVAL, Integer.toString(1)));
        validatePollingModeParameters();
        final String poolProperties = optionHolder.validateAndGetStaticValue(CDCSourceConstants.POOL_PROPERTIES, (String) null);
        final boolean waitOnMissedRecord = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(CDCSourceConstants.WAIT_ON_MISSED_RECORD, "false"));
        final int missedRecordWaitingTimeout = Integer.parseInt(optionHolder.validateAndGetStaticValue(CDCSourceConstants.MISSED_RECORD_WAITING_TIMEOUT, "-1"));

        // Connection precedence: datasource name, then JNDI resource, then URL/username/password.
        if (hasDatasourceName) {
            this.cdcPoller = new CDCPoller(null, null, null, tableName, null, optionHolder.validateAndGetStaticValue(CDCSourceConstants.DATASOURCE_NAME), null, pollingColumn, this.pollingInterval, poolProperties, sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout, siddhiAppName);
        } else if (hasJndiResource) {
            this.cdcPoller = new CDCPoller(null, null, null, tableName, null, null, optionHolder.validateAndGetStaticValue(CDCSourceConstants.JNDI_RESOURCE), pollingColumn, this.pollingInterval, poolProperties, sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout, siddhiAppName);
        } else {
            try {
                this.cdcPoller = new CDCPoller(optionHolder.validateAndGetOption("url").getValue(), optionHolder.validateAndGetOption(CDCSourceConstants.USERNAME).getValue(), optionHolder.validateAndGetOption(CDCSourceConstants.PASSWORD).getValue(), tableName, optionHolder.validateAndGetStaticValue(CDCSourceConstants.JDBC_DRIVER_NAME), null, null, pollingColumn, this.pollingInterval, poolProperties, sourceEventListener, configReader, waitOnMissedRecord, missedRecordWaitingTimeout, siddhiAppName);
            } catch (SiddhiAppValidationException e) {
                // Extend the message with the alternative connection options and keep the original cause.
                throw new SiddhiAppValidationException(e.getMessage() + " Alternatively, define " + CDCSourceConstants.DATASOURCE_NAME + " or " + CDCSourceConstants.JNDI_RESOURCE + ". Current mode: " + CDCSourceConstants.MODE_POLLING, e);
            }
        }
    }

    /**
     * Lists the event payload classes this source produces.
     *
     * @return a new single-element array containing {@code Map.class}
     */
    public Class[] getOutputEventClasses() {
        final Class[] outputEventClasses = {Map.class};
        return outputEventClasses;
    }

    /**
     * Starts change capturing on the source's single-thread executor.
     *
     * <p>In listening mode this source is added to the object keeper and the engine obtained from
     * the change data capture object is submitted for execution. In polling mode a completion
     * callback is installed on the poller and the poller is submitted. Any other mode does nothing.
     *
     * @param connectionCallback callback that is notified when the database connection is lost
     * @param cdcState           current state; not read by this method
     * @throws ConnectionUnavailableException declared by the signature; this body reports connection
     *                                        loss through {@code connectionCallback} rather than throwing it
     */
    public void connect(Source<CdcState>.ConnectionCallback connectionCallback, CdcState cdcState) throws ConnectionUnavailableException {
        switch (this.mode) {
            case CDCSourceConstants.MODE_LISTENING:
                this.cdcSourceObjectKeeper.addCdcObject(this);
                this.executorService.execute(this.changeDataCapture.getEngine((success, message, error) -> {
                    // Only a failed completion is reported back.
                    if (!success) {
                        connectionCallback.onError(new ConnectionUnavailableException("Connection to the database lost.", error));
                    }
                }));
                break;
            case CDCSourceConstants.MODE_POLLING:
                this.cdcPoller.setCompletionCallback(error -> {
                    // Exact-class match: subclasses of SQLException take the failure branch below.
                    if (error.getClass().equals(SQLException.class)) {
                        connectionCallback.onError(new ConnectionUnavailableException("Connection to the database lost.", error));
                    } else {
                        destroy();
                        throw new SiddhiAppRuntimeException("CDC Polling mode run failed.", error);
                    }
                });
                this.executorService.execute(this.cdcPoller);
                break;
            default:
                // Unknown mode: nothing to start.
                break;
        }
    }

    /**
     * Pauses the poller and, when the poller reports a local data source, closes that data source.
     * Does nothing unless the source is in polling mode.
     */
    public void disconnect() {
        if (!this.mode.equals(CDCSourceConstants.MODE_POLLING)) {
            return;
        }
        this.cdcPoller.pause();
        if (!this.cdcPoller.isLocalDataSource()) {
            return;
        }
        this.cdcPoller.getDataSource().close();
        if (log.isDebugEnabled()) {
            log.debug("Closing the pool for CDC polling mode.");
        }
    }

    /**
     * Disconnects, removes this source from the object keeper when in listening mode,
     * and shuts down the executor.
     */
    public void destroy() {
        disconnect();
        final boolean listening = this.mode.equals(CDCSourceConstants.MODE_LISTENING);
        if (listening) {
            // The keeper entry is removed by this object's hash code.
            final int keeperKey = hashCode();
            this.cdcSourceObjectKeeper.removeObject(keeperKey);
        }
        this.executorService.shutdown();
    }

    /**
     * Pauses change capturing: delegates to the poller in polling mode and to the change data
     * capture object in listening mode. Any other mode does nothing.
     */
    public void pause() {
        switch (this.mode) {
            case CDCSourceConstants.MODE_POLLING:
                this.cdcPoller.pause();
                break;
            case CDCSourceConstants.MODE_LISTENING:
                this.changeDataCapture.pause();
                break;
            default:
                // Unknown mode: nothing to pause.
                break;
        }
    }

    /**
     * Resumes change capturing: delegates to the poller in polling mode and to the change data
     * capture object in listening mode. Any other mode does nothing.
     */
    public void resume() {
        switch (this.mode) {
            case CDCSourceConstants.MODE_POLLING:
                this.cdcPoller.resume();
                break;
            case CDCSourceConstants.MODE_LISTENING:
                this.changeDataCapture.resume();
                break;
            default:
                // Unknown mode: nothing to resume.
                break;
        }
    }

    /**
     * Returns the current offset data after a 50 ms pause.
     *
     * <p>NOTE(review): the reason for the pause is not visible in this file — presumably it gives a
     * concurrent writer time to publish the offsets; confirm against the callers.
     *
     * @return the offset data map currently held by this source
     */
    public Map<byte[], byte[]> getOffsetData() {
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("Offset data retrieval failed.", e);
        }
        return this.offsetData;
    }

    /**
     * Replaces the offset data held by this source.
     *
     * @param map the new offset data; stored by reference, not copied
     */
    public void setOffsetData(Map<byte[], byte[]> map) {
        this.offsetData = map;
    }

    /**
     * Validates the options that apply to listening mode and makes sure the history directory
     * path ends with a file separator.
     *
     * @param optionHolder holder of the options declared on the source annotation
     * @throws SiddhiAppValidationException if a datasource name is defined, the operation is not
     *                                      insert/update/delete (case-insensitive), or Carbon Home is empty
     */
    private void validateListeningModeParameters(OptionHolder optionHolder) {
        if (optionHolder.isOptionExists(CDCSourceConstants.DATASOURCE_NAME)) {
            throw new SiddhiAppValidationException("Parameter: datasource.name should not be defined for listening mode");
        }

        final boolean supportedOperation = this.operation.equalsIgnoreCase(CDCSourceConstants.INSERT)
                || this.operation.equalsIgnoreCase(CDCSourceConstants.UPDATE)
                || this.operation.equalsIgnoreCase("delete");
        if (!supportedOperation) {
            throw new SiddhiAppValidationException("Unsupported operation: '" + this.operation + "'. operation should be one of 'insert', 'update' or 'delete'");
        }

        if (this.carbonHome.isEmpty()) {
            throw new SiddhiAppValidationException("Couldn't initialize Carbon Home.");
        }

        if (!this.historyFileDirectory.endsWith(File.separator)) {
            this.historyFileDirectory = this.historyFileDirectory + File.separator;
        }
    }

    /**
     * Rejects a negative polling interval.
     *
     * @throws SiddhiAppValidationException if {@code pollingInterval} is below zero
     */
    private void validatePollingModeParameters() {
        if (this.pollingInterval >= 0) {
            return;
        }
        throw new SiddhiAppValidationException("polling.interval should be a non negative integer. Current mode: polling");
    }

    // Bridge overload marked synthetic by the decompiler: it only casts its arguments and
    // delegates to connect(Source<CdcState>.ConnectionCallback, CdcState).
    // NOTE(review): javac normally generates this bridge itself — confirm whether it should exist in source.
    public /* bridge */ /* synthetic */ void connect(Source.ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
        connect((Source<CdcState>.ConnectionCallback) connectionCallback, (CdcState) state);
    }
}
