package io.siddhi.extension.io.cdc.source.polling;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.extension.io.cdc.source.config.Database;
import io.siddhi.extension.io.cdc.source.config.QueryConfiguration;
import io.siddhi.extension.io.cdc.util.CDCPollingUtil;
import io.siddhi.extension.io.cdc.util.CDCSourceConstants;
import io.siddhi.extension.io.cdc.util.MyYamlConstructor;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
import org.wso2.carbon.datasource.core.api.DataSourceService;
import org.wso2.carbon.datasource.core.exception.DataSourceException;
import org.yaml.snakeyaml.TypeDescription;
import org.yaml.snakeyaml.Yaml;

/* loaded from: input_file:io/siddhi/extension/io/cdc/source/polling/CDCPoller.class */
public class CDCPoller implements Runnable {
    private static final Logger log = Logger.getLogger(CDCPoller.class);
    private static final String PLACE_HOLDER_TABLE_NAME = "{{TABLE_NAME}}";
    private static final String PLACE_HOLDER_COLUMN_LIST = "{{COLUMN_LIST}}";
    private static final String PLACE_HOLDER_CONDITION = "{{CONDITION}}";
    private static final String SELECT_QUERY_CONFIG_FILE = "query-config.yaml";
    private static final String RECORD_SELECT_QUERY = "recordSelectQuery";
    private String url;
    private String tableName;
    private String username;
    private String password;
    private String driverClassName;
    private HikariDataSource dataSource;
    private String lastReadPollingColumnValue;
    private SourceEventListener sourceEventListener;
    private String pollingColumn;
    private String datasourceName;
    private int pollingInterval;
    private CompletionCallback completionCallback;
    private ConfigReader configReader;
    private String poolPropertyString;
    private String jndiResource;
    private String selectQueryStructure = "";
    private boolean paused = false;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition pauseLockCondition = this.pauseLock.newCondition();
    private boolean isLocalDataSource = false;

    /* loaded from: input_file:io/siddhi/extension/io/cdc/source/polling/CDCPoller$CompletionCallback.class */
    public interface CompletionCallback {
        void handle(Throwable th);
    }

    public CDCPoller(String str, String str2, String str3, String str4, String str5, String str6, String str7, String str8, int i, String str9, SourceEventListener sourceEventListener, ConfigReader configReader) {
        this.url = str;
        this.tableName = str4;
        this.username = str2;
        this.password = str3;
        this.driverClassName = str5;
        this.sourceEventListener = sourceEventListener;
        this.pollingColumn = str8;
        this.pollingInterval = i;
        this.configReader = configReader;
        this.poolPropertyString = str9;
        this.datasourceName = str6;
        this.jndiResource = str7;
    }

    public HikariDataSource getDataSource() {
        return this.dataSource;
    }

    public void setCompletionCallback(CompletionCallback completionCallback) {
        this.completionCallback = completionCallback;
    }

    private void initializeDatasource() throws NamingException {
        if (this.datasourceName != null) {
            try {
                BundleContext bundleContext = FrameworkUtil.getBundle(DataSourceService.class).getBundleContext();
                ServiceReference serviceReference = bundleContext.getServiceReference(DataSourceService.class.getName());
                if (serviceReference == null) {
                    throw new CDCPollingModeException("DatasourceService : '" + DataSourceService.class.getCanonicalName() + "' cannot be found.");
                }
                this.dataSource = (HikariDataSource) ((DataSourceService) bundleContext.getService(serviceReference)).getDataSource(this.datasourceName);
                this.isLocalDataSource = false;
                if (log.isDebugEnabled()) {
                    log.debug("Lookup for datasource '" + this.datasourceName + "' completed through DataSource Service lookup. Current mode: " + CDCSourceConstants.MODE_POLLING);
                }
                return;
            } catch (DataSourceException e) {
                throw new CDCPollingModeException("Datasource '" + this.datasourceName + "' cannot be connected. Current mode: " + CDCSourceConstants.MODE_POLLING, e);
            }
        }
        if (this.jndiResource != null) {
            this.dataSource = (HikariDataSource) InitialContext.doLookup(this.jndiResource);
            this.isLocalDataSource = false;
            if (log.isDebugEnabled()) {
                log.debug("Lookup for resource '" + this.jndiResource + "' completed through JNDI lookup.");
                return;
            }
            return;
        }
        Properties properties = new Properties();
        properties.setProperty("jdbcUrl", this.url);
        properties.setProperty("dataSource.user", this.username);
        if (!CDCPollingUtil.isEmpty(this.password)) {
            properties.setProperty("dataSource.password", this.password);
        }
        properties.setProperty("driverClassName", this.driverClassName);
        if (this.poolPropertyString != null) {
            CDCPollingUtil.processKeyValuePairs(this.poolPropertyString).forEach(strArr -> {
                properties.setProperty(strArr[0], strArr[1]);
            });
        }
        this.dataSource = new HikariDataSource(new HikariConfig(properties));
        this.isLocalDataSource = true;
        if (log.isDebugEnabled()) {
            log.debug("Database connection for '" + this.tableName + "' created through connection parameters specified in the query.");
        }
    }

    public boolean isLocalDataSource() {
        return this.isLocalDataSource;
    }

    public String getLastReadPollingColumnValue() {
        return this.lastReadPollingColumnValue;
    }

    public void setLastReadPollingColumnValue(String str) {
        this.lastReadPollingColumnValue = str;
    }

    private Connection getConnection() {
        try {
            Connection connection = this.dataSource.getConnection();
            if (log.isDebugEnabled()) {
                log.debug("A connection is initialized ");
            }
            return connection;
        } catch (SQLException e) {
            throw new CDCPollingModeException("Error initializing datasource connection. Current mode: polling", e);
        }
    }

    /* JADX WARN: Finally extract failed */
    private String getSelectQuery(String str, String str2) {
        if (this.selectQueryStructure.isEmpty()) {
            Connection connection = null;
            try {
                try {
                    connection = getConnection();
                    String databaseProductName = connection.getMetaData().getDatabaseProductName();
                    CDCPollingUtil.cleanupConnection(null, null, connection);
                    this.selectQueryStructure = this.configReader.readConfig(databaseProductName + "." + RECORD_SELECT_QUERY, "");
                    if (this.selectQueryStructure.isEmpty()) {
                        InputStream inputStream = null;
                        try {
                            MyYamlConstructor myYamlConstructor = new MyYamlConstructor(QueryConfiguration.class);
                            TypeDescription typeDescription = new TypeDescription(QueryConfiguration.class);
                            typeDescription.putListPropertyType("databases", Database.class);
                            myYamlConstructor.addTypeDescription(typeDescription);
                            Yaml yaml = new Yaml(myYamlConstructor);
                            InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(SELECT_QUERY_CONFIG_FILE);
                            if (resourceAsStream == null) {
                                throw new CDCPollingModeException("query-config.yaml is not found in the classpath. Current mode: polling");
                            }
                            QueryConfiguration queryConfiguration = (QueryConfiguration) yaml.load(resourceAsStream);
                            if (resourceAsStream != null) {
                                try {
                                    resourceAsStream.close();
                                } catch (IOException e) {
                                    log.error("Failed to close the input stream for query-config.yaml. Current mode: polling");
                                }
                            }
                            if (queryConfiguration != null) {
                                Database[] databases = queryConfiguration.getDatabases();
                                int length = databases.length;
                                int i = 0;
                                while (true) {
                                    if (i >= length) {
                                        break;
                                    }
                                    Database database = databases[i];
                                    if (database.getName().equalsIgnoreCase(databaseProductName)) {
                                        this.selectQueryStructure = database.getSelectQuery();
                                        break;
                                    }
                                    i++;
                                }
                            }
                        } catch (Throwable th) {
                            if (0 != 0) {
                                try {
                                    inputStream.close();
                                } catch (IOException e2) {
                                    log.error("Failed to close the input stream for query-config.yaml. Current mode: polling");
                                }
                            }
                            throw th;
                        }
                    }
                    if (this.selectQueryStructure.isEmpty()) {
                        throw new CDCPollingModeException("Unsupported database: " + databaseProductName + ". Configure system parameter: " + databaseProductName + "." + RECORD_SELECT_QUERY + ". Current mode: " + CDCSourceConstants.MODE_POLLING);
                    }
                } catch (SQLException e3) {
                    throw new CDCPollingModeException("Error in looking up database type. Current mode: polling", e3);
                }
            } catch (Throwable th2) {
                CDCPollingUtil.cleanupConnection(null, null, connection);
                throw th2;
            }
        }
        return this.selectQueryStructure.replace(PLACE_HOLDER_TABLE_NAME, this.tableName).replace(PLACE_HOLDER_COLUMN_LIST, str).replace(PLACE_HOLDER_CONDITION, str2);
    }

    /* JADX WARN: Finally extract failed */
    private void pollForChanges() {
        try {
            initializeDatasource();
            Connection connection = getConnection();
            ResultSet resultSet = null;
            try {
                try {
                    if (this.lastReadPollingColumnValue == null) {
                        resultSet = connection.prepareStatement(getSelectQuery("MAX(" + this.pollingColumn + ")", "").trim()).executeQuery();
                        if (resultSet.next()) {
                            this.lastReadPollingColumnValue = resultSet.getString(1);
                        }
                        if (this.lastReadPollingColumnValue == null) {
                            this.lastReadPollingColumnValue = "-1";
                        }
                    }
                    PreparedStatement prepareStatement = connection.prepareStatement(getSelectQuery("*", "WHERE " + this.pollingColumn + " > ?"));
                    while (true) {
                        if (this.paused) {
                            this.pauseLock.lock();
                            while (this.paused) {
                                try {
                                    try {
                                        this.pauseLockCondition.await();
                                    } catch (InterruptedException e) {
                                        Thread.currentThread().interrupt();
                                        this.pauseLock.unlock();
                                    }
                                } catch (Throwable th) {
                                    this.pauseLock.unlock();
                                    throw th;
                                }
                            }
                            this.pauseLock.unlock();
                        }
                        try {
                            try {
                                prepareStatement.setString(1, this.lastReadPollingColumnValue);
                                resultSet = prepareStatement.executeQuery();
                                ResultSetMetaData metaData = resultSet.getMetaData();
                                while (resultSet.next()) {
                                    HashMap hashMap = new HashMap();
                                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
                                        String columnName = metaData.getColumnName(i);
                                        hashMap.put(columnName.toLowerCase(Locale.ENGLISH), resultSet.getObject(columnName));
                                    }
                                    this.lastReadPollingColumnValue = resultSet.getString(this.pollingColumn);
                                    handleEvent(hashMap);
                                }
                                CDCPollingUtil.cleanupConnection(resultSet, null, null);
                            } catch (Throwable th2) {
                                CDCPollingUtil.cleanupConnection(resultSet, null, null);
                                throw th2;
                            }
                        } catch (SQLException e2) {
                            log.error(e2);
                            CDCPollingUtil.cleanupConnection(resultSet, null, null);
                        }
                        try {
                            Thread.sleep(this.pollingInterval * 1000);
                        } catch (InterruptedException e3) {
                            log.error("Error while polling. Current mode: polling", e3);
                        }
                    }
                } catch (SQLException e4) {
                    throw new CDCPollingModeException("Error in polling for changes on " + this.tableName + ". Current mode: " + CDCSourceConstants.MODE_POLLING, e4);
                }
            } catch (Throwable th3) {
                CDCPollingUtil.cleanupConnection(null, null, connection);
                throw th3;
            }
        } catch (NamingException e5) {
            throw new CDCPollingModeException("Error in initializing connection for " + this.tableName + ". Current mode: " + CDCSourceConstants.MODE_POLLING, e5);
        }
    }

    private void handleEvent(Map map) {
        this.sourceEventListener.onEvent(map, (String[]) null);
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
        try {
            this.pauseLock.lock();
            this.pauseLockCondition.signal();
        } finally {
            this.pauseLock.unlock();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            pollForChanges();
        } catch (CDCPollingModeException e) {
            this.completionCallback.handle(e);
        }
    }
}
