/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.cdc.source.polling;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.extension.io.cdc.source.config.Database;
import io.siddhi.extension.io.cdc.source.config.QueryConfiguration;
import io.siddhi.extension.io.cdc.source.polling.CDCPollingModeException;
import io.siddhi.extension.io.cdc.util.CDCPollingUtil;
import io.siddhi.extension.io.cdc.util.MyYamlConstructor;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
import org.wso2.carbon.datasource.core.api.DataSourceService;
import org.wso2.carbon.datasource.core.exception.DataSourceException;
import org.yaml.snakeyaml.TypeDescription;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.BaseConstructor;

public class CDCPoller
implements Runnable {
    private static final Logger log = Logger.getLogger(CDCPoller.class);
    private static final String PLACE_HOLDER_TABLE_NAME = "{{TABLE_NAME}}";
    private static final String PLACE_HOLDER_COLUMN_LIST = "{{COLUMN_LIST}}";
    private static final String PLACE_HOLDER_CONDITION = "{{CONDITION}}";
    private static final String SELECT_QUERY_CONFIG_FILE = "query-config.yaml";
    private static final String RECORD_SELECT_QUERY = "recordSelectQuery";
    private String selectQueryStructure = "";
    private String url;
    private String tableName;
    private String username;
    private String password;
    private String driverClassName;
    private HikariDataSource dataSource;
    private String lastReadPollingColumnValue;
    private SourceEventListener sourceEventListener;
    private String pollingColumn;
    private String datasourceName;
    private int pollingInterval;
    private CompletionCallback completionCallback;
    private boolean paused = false;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition pauseLockCondition = this.pauseLock.newCondition();
    private ConfigReader configReader;
    private String poolPropertyString;
    private String jndiResource;
    private boolean isLocalDataSource = false;

    public CDCPoller(String url, String username, String password, String tableName, String driverClassName, String datasourceName, String jndiResource, String pollingColumn, int pollingInterval, String poolPropertyString, SourceEventListener sourceEventListener, ConfigReader configReader) {
        this.url = url;
        this.tableName = tableName;
        this.username = username;
        this.password = password;
        this.driverClassName = driverClassName;
        this.sourceEventListener = sourceEventListener;
        this.pollingColumn = pollingColumn;
        this.pollingInterval = pollingInterval;
        this.configReader = configReader;
        this.poolPropertyString = poolPropertyString;
        this.datasourceName = datasourceName;
        this.jndiResource = jndiResource;
    }

    public HikariDataSource getDataSource() {
        return this.dataSource;
    }

    public void setCompletionCallback(CompletionCallback completionCallback) {
        this.completionCallback = completionCallback;
    }

    private void initializeDatasource() throws NamingException {
        if (this.datasourceName == null) {
            if (this.jndiResource == null) {
                Properties connectionProperties = new Properties();
                connectionProperties.setProperty("jdbcUrl", this.url);
                connectionProperties.setProperty("dataSource.user", this.username);
                if (!CDCPollingUtil.isEmpty(this.password)) {
                    connectionProperties.setProperty("dataSource.password", this.password);
                }
                connectionProperties.setProperty("driverClassName", this.driverClassName);
                if (this.poolPropertyString != null) {
                    List<String[]> poolProps = CDCPollingUtil.processKeyValuePairs(this.poolPropertyString);
                    poolProps.forEach(pair -> connectionProperties.setProperty(pair[0], pair[1]));
                }
                HikariConfig config = new HikariConfig(connectionProperties);
                this.dataSource = new HikariDataSource(config);
                this.isLocalDataSource = true;
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Database connection for '" + this.tableName + "' created through connection parameters specified in the query."));
                }
            } else {
                this.dataSource = (HikariDataSource)InitialContext.doLookup(this.jndiResource);
                this.isLocalDataSource = false;
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Lookup for resource '" + this.jndiResource + "' completed through JNDI lookup."));
                }
            }
        } else {
            try {
                BundleContext bundleContext = FrameworkUtil.getBundle(DataSourceService.class).getBundleContext();
                ServiceReference serviceRef = bundleContext.getServiceReference(DataSourceService.class.getName());
                if (serviceRef == null) {
                    throw new CDCPollingModeException("DatasourceService : '" + DataSourceService.class.getCanonicalName() + "' cannot be found.");
                }
                DataSourceService dataSourceService = (DataSourceService)bundleContext.getService(serviceRef);
                this.dataSource = (HikariDataSource)dataSourceService.getDataSource(this.datasourceName);
                this.isLocalDataSource = false;
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Lookup for datasource '" + this.datasourceName + "' completed through DataSource Service lookup. Current mode: " + "polling"));
                }
            }
            catch (DataSourceException e) {
                throw new CDCPollingModeException("Datasource '" + this.datasourceName + "' cannot be connected. Current mode: " + "polling", e);
            }
        }
    }

    public boolean isLocalDataSource() {
        return this.isLocalDataSource;
    }

    public String getLastReadPollingColumnValue() {
        return this.lastReadPollingColumnValue;
    }

    public void setLastReadPollingColumnValue(String lastReadPollingColumnValue) {
        this.lastReadPollingColumnValue = lastReadPollingColumnValue;
    }

    private Connection getConnection() {
        Connection conn;
        try {
            conn = this.dataSource.getConnection();
            if (log.isDebugEnabled()) {
                log.debug((Object)"A connection is initialized ");
            }
        }
        catch (SQLException e) {
            throw new CDCPollingModeException("Error initializing datasource connection. Current mode: polling", e);
        }
        return conn;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private String getSelectQuery(String columnList, String condition) {
        if (this.selectQueryStructure.isEmpty()) {
            String databaseName;
            Connection conn = null;
            try {
                conn = this.getConnection();
                DatabaseMetaData dmd = conn.getMetaData();
                databaseName = dmd.getDatabaseProductName();
            }
            catch (SQLException e) {
                throw new CDCPollingModeException("Error in looking up database type. Current mode: polling", e);
            }
            finally {
                CDCPollingUtil.cleanupConnection(null, null, conn);
            }
            this.selectQueryStructure = this.configReader.readConfig(databaseName + "." + RECORD_SELECT_QUERY, "");
            if (this.selectQueryStructure.isEmpty()) {
                QueryConfiguration queryConfiguration;
                InputStream inputStream = null;
                try {
                    MyYamlConstructor constructor = new MyYamlConstructor(QueryConfiguration.class);
                    TypeDescription queryTypeDescription = new TypeDescription(QueryConfiguration.class);
                    queryTypeDescription.putListPropertyType("databases", Database.class);
                    constructor.addTypeDescription(queryTypeDescription);
                    Yaml yaml = new Yaml((BaseConstructor)constructor);
                    ClassLoader classLoader = this.getClass().getClassLoader();
                    inputStream = classLoader.getResourceAsStream(SELECT_QUERY_CONFIG_FILE);
                    if (inputStream == null) {
                        throw new CDCPollingModeException("query-config.yaml is not found in the classpath. Current mode: polling");
                    }
                    queryConfiguration = (QueryConfiguration)yaml.load(inputStream);
                }
                finally {
                    if (inputStream != null) {
                        try {
                            inputStream.close();
                        }
                        catch (IOException e) {
                            log.error((Object)"Failed to close the input stream for query-config.yaml. Current mode: polling");
                        }
                    }
                }
                if (queryConfiguration != null) {
                    for (Database database : queryConfiguration.getDatabases()) {
                        if (!database.getName().equalsIgnoreCase(databaseName)) continue;
                        this.selectQueryStructure = database.getSelectQuery();
                        break;
                    }
                }
            }
            if (this.selectQueryStructure.isEmpty()) {
                throw new CDCPollingModeException("Unsupported database: " + databaseName + ". Configure system parameter: " + databaseName + "." + RECORD_SELECT_QUERY + ". Current mode: " + "polling");
            }
        }
        String selectQuery = this.selectQueryStructure.replace(PLACE_HOLDER_TABLE_NAME, this.tableName).replace(PLACE_HOLDER_COLUMN_LIST, columnList).replace(PLACE_HOLDER_CONDITION, condition);
        return selectQuery;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void pollForChanges() {
        try {
            this.initializeDatasource();
        }
        catch (NamingException e) {
            throw new CDCPollingModeException("Error in initializing connection for " + this.tableName + ". Current mode: " + "polling", e);
        }
        Connection connection = this.getConnection();
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        try {
            try {
                String selectQuery;
                if (this.lastReadPollingColumnValue == null) {
                    selectQuery = this.getSelectQuery("MAX(" + this.pollingColumn + ")", "").trim();
                    statement = connection.prepareStatement(selectQuery);
                    resultSet = statement.executeQuery();
                    if (resultSet.next()) {
                        this.lastReadPollingColumnValue = resultSet.getString(1);
                    }
                    if (this.lastReadPollingColumnValue == null) {
                        this.lastReadPollingColumnValue = "-1";
                    }
                }
                selectQuery = this.getSelectQuery("*", "WHERE " + this.pollingColumn + " > ?");
                statement = connection.prepareStatement(selectQuery);
                while (true) {
                    if (this.paused) {
                        this.pauseLock.lock();
                        try {
                            while (this.paused) {
                                this.pauseLockCondition.await();
                            }
                        }
                        catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                        finally {
                            this.pauseLock.unlock();
                        }
                    }
                    try {
                        statement.setString(1, this.lastReadPollingColumnValue);
                        resultSet = statement.executeQuery();
                        ResultSetMetaData metadata = resultSet.getMetaData();
                        while (resultSet.next()) {
                            HashMap<String, Object> detailsMap = new HashMap<String, Object>();
                            for (int i = 1; i <= metadata.getColumnCount(); ++i) {
                                String key = metadata.getColumnName(i);
                                Object value = resultSet.getObject(key);
                                detailsMap.put(key.toLowerCase(Locale.ENGLISH), value);
                            }
                            this.lastReadPollingColumnValue = resultSet.getString(this.pollingColumn);
                            this.handleEvent(detailsMap);
                        }
                    }
                    catch (SQLException ex) {
                        log.error((Object)ex);
                    }
                    finally {
                        CDCPollingUtil.cleanupConnection(resultSet, null, null);
                    }
                    try {
                        Thread.sleep((long)this.pollingInterval * 1000L);
                    }
                    catch (InterruptedException e) {
                        log.error((Object)"Error while polling. Current mode: polling", (Throwable)e);
                    }
                }
            }
            catch (SQLException ex) {
                throw new CDCPollingModeException("Error in polling for changes on " + this.tableName + ". Current mode: " + "polling", ex);
            }
        }
        catch (Throwable throwable) {
            CDCPollingUtil.cleanupConnection(resultSet, statement, connection);
            throw throwable;
        }
    }

    private void handleEvent(Map detailsMap) {
        this.sourceEventListener.onEvent((Object)detailsMap, null);
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
        try {
            this.pauseLock.lock();
            this.pauseLockCondition.signal();
        }
        finally {
            this.pauseLock.unlock();
        }
    }

    @Override
    public void run() {
        try {
            this.pollForChanges();
        }
        catch (CDCPollingModeException e) {
            this.completionCallback.handle((Throwable)((Object)e));
        }
    }

    public static interface CompletionCallback {
        public void handle(Throwable var1);
    }
}

