/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.cdc.source.polling.strategies;

import com.zaxxer.hikari.HikariDataSource;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.extension.io.cdc.source.metrics.CDCStatus;
import io.siddhi.extension.io.cdc.source.metrics.PollingMetrics;
import io.siddhi.extension.io.cdc.source.polling.CDCPollingModeException;
import io.siddhi.extension.io.cdc.source.polling.strategies.PollingStrategy;
import io.siddhi.extension.io.cdc.util.CDCPollingUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Locale;
import org.apache.log4j.Logger;

public class WaitOnMissingRecordPollingStrategy
extends PollingStrategy {
    private static final Logger log = Logger.getLogger(WaitOnMissingRecordPollingStrategy.class);
    private String pollingColumn;
    private int pollingInterval;
    private int waitTimeout;
    private Integer lastReadPollingColumnValue;

    public WaitOnMissingRecordPollingStrategy(HikariDataSource dataSource, ConfigReader configReader, SourceEventListener sourceEventListener, String tableName, String pollingColumn, int pollingInterval, int waitTimeout, String appName, PollingMetrics pollingMetrics) {
        super(dataSource, configReader, sourceEventListener, tableName, appName, pollingMetrics);
        this.pollingColumn = pollingColumn;
        this.pollingInterval = pollingInterval;
        this.waitTimeout = waitTimeout;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void poll() {
        Connection connection = this.getConnection();
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        int waitingFor = -1;
        long waitingFrom = -1L;
        try {
            try {
                String selectQuery;
                long startedTime = System.currentTimeMillis();
                if (this.lastReadPollingColumnValue == null) {
                    selectQuery = this.getSelectQuery("MAX(" + this.pollingColumn + ")", "").trim();
                    statement = connection.prepareStatement(selectQuery);
                    resultSet = statement.executeQuery();
                    if (resultSet.next()) {
                        this.lastReadPollingColumnValue = resultSet.getInt(1);
                    }
                    if (this.lastReadPollingColumnValue == null) {
                        this.lastReadPollingColumnValue = -1;
                    }
                }
                selectQuery = this.getSelectQuery("*", "WHERE " + this.pollingColumn + " > ?");
                statement = connection.prepareStatement(selectQuery);
                while (true) {
                    if (this.paused) {
                        this.pauseLock.lock();
                        try {
                            while (this.paused) {
                                this.pauseLockCondition.await();
                            }
                        }
                        catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                        finally {
                            this.pauseLock.unlock();
                        }
                    }
                    int eventsPerPollingInterval = 0;
                    boolean isError = false;
                    try {
                        statement.setInt(1, this.lastReadPollingColumnValue);
                        resultSet = statement.executeQuery();
                        ResultSetMetaData metadata = resultSet.getMetaData();
                        while (resultSet.next()) {
                            ++eventsPerPollingInterval;
                            boolean isTimedout = false;
                            int currentPollingColumnValue = resultSet.getInt(this.pollingColumn);
                            if (currentPollingColumnValue - this.lastReadPollingColumnValue > 1) {
                                if (waitingFor == -1) {
                                    waitingFor = this.lastReadPollingColumnValue + 1;
                                    waitingFrom = System.currentTimeMillis();
                                }
                                long waitEndTimestamp = this.waitTimeout > -1 ? waitingFrom + (long)this.waitTimeout * 1000L : Long.MAX_VALUE;
                                boolean bl = isTimedout = waitEndTimestamp < System.currentTimeMillis();
                                if (!isTimedout) {
                                    log.debug((Object)("Missed record found at " + waitingFor + " in table " + this.tableName + ". Hence pausing the process and retry in " + this.pollingInterval + " seconds."));
                                    break;
                                }
                            }
                            if (waitingFor > -1) {
                                if (isTimedout) {
                                    log.debug((Object)("Waiting for missed record " + waitingFor + " in table " + this.tableName + " timed-out. Hence resuming the process."));
                                } else {
                                    log.debug((Object)("Received the missed record " + waitingFor + " in table " + this.tableName + ". Hence resuming the process."));
                                }
                                waitingFor = -1;
                                waitingFrom = -1L;
                            }
                            HashMap<String, Object> detailsMap = new HashMap<String, Object>();
                            for (int i = 1; i <= metadata.getColumnCount(); ++i) {
                                String key = metadata.getColumnName(i);
                                Object value = resultSet.getObject(key);
                                detailsMap.put(key.toLowerCase(Locale.ENGLISH), value);
                            }
                            this.lastReadPollingColumnValue = resultSet.getInt(this.pollingColumn);
                            this.handleEvent(detailsMap);
                        }
                    }
                    catch (SQLException e) {
                        if (this.metrics != null) {
                            isError = true;
                            this.metrics.setCDCStatus(CDCStatus.ERROR);
                        }
                        log.error((Object)this.buildError("Error occurred while processing records in table %s.", this.tableName), (Throwable)e);
                    }
                    finally {
                        CDCPollingUtil.cleanupConnection(resultSet, null, null);
                    }
                    try {
                        if (this.metrics != null) {
                            this.metrics.setReceiveEventsPerPollingInterval(eventsPerPollingInterval);
                            CDCStatus cdcStatus = isError ? CDCStatus.ERROR : CDCStatus.SUCCESS;
                            this.metrics.pollingDetailsMetric(eventsPerPollingInterval, startedTime, System.currentTimeMillis() - startedTime, cdcStatus);
                        }
                        Thread.sleep((long)this.pollingInterval * 1000L);
                    }
                    catch (InterruptedException e) {
                        if (this.metrics != null) {
                            this.metrics.setCDCStatus(CDCStatus.ERROR);
                        }
                        log.error((Object)this.buildError("Error while polling the table %s.", this.tableName), (Throwable)e);
                    }
                }
            }
            catch (SQLException ex) {
                throw new CDCPollingModeException(this.buildError("Error in polling for changes on %s.", this.tableName), ex);
            }
        }
        catch (Throwable throwable) {
            CDCPollingUtil.cleanupConnection(resultSet, statement, connection);
            throw throwable;
        }
    }

    @Override
    public String getLastReadPollingColumnValue() {
        return String.valueOf(this.lastReadPollingColumnValue);
    }

    @Override
    public void setLastReadPollingColumnValue(String lastReadPollingColumnValue) {
        this.lastReadPollingColumnValue = Integer.parseInt(lastReadPollingColumnValue);
    }
}

