package org.wso2.siddhi.core.table;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.util.collection.AddingStreamEventExtractor;
import org.wso2.siddhi.core.util.collection.UpdateAttributeMapper;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.BackoffRetryCounter;
import org.wso2.siddhi.query.api.definition.TableDefinition;

/* loaded from: input_file:org/wso2/siddhi/core/table/Table.class */
/**
 * Base class for tables whose data operations are guarded by a connection state.
 *
 * <p>Every public data operation ({@link #addEvents}, {@link #find(StateEvent, CompiledCondition)},
 * {@link #deleteEvents}, {@link #updateEvents}, {@link #updateOrAddEvents},
 * {@link #containsEvent}) follows the same protocol:
 * <ol>
 *   <li>If the table is not connected and a reconnect is already in progress, the operation is
 *       abandoned and an error is logged.</li>
 *   <li>If the table is not connected and no reconnect is in progress, {@link #connectWithRetry()}
 *       is invoked and the operation is attempted again.</li>
 *   <li>If the concrete operation throws {@link ConnectionUnavailableException}, the table is
 *       marked disconnected, {@link #connectWithRetry()} is invoked and the operation is
 *       attempted again.</li>
 * </ol>
 *
 * <p>NOTE(review): the "attempt again" steps are implemented by recursion with no depth limit, so
 * an implementation whose {@link #connect()} succeeds while the operation keeps throwing
 * {@link ConnectionUnavailableException} will recurse until the stack is exhausted.
 */
public abstract class Table implements FindableProcessor {
    private static final Logger LOG = Logger.getLogger(Table.class);

    /** Definition handed to {@link #initTable}; its id is used in every log message. */
    protected TableDefinition tableDefinition;

    /** Set while a connection attempt has been started and has not yet succeeded. */
    private final AtomicBoolean isTryingToConnect = new AtomicBoolean(false);

    /** Supplies the delay before the next scheduled connection attempt. */
    private final BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();

    /** Set once {@link #connect()} has returned normally; cleared on connection loss and shutdown. */
    private final AtomicBoolean isConnected = new AtomicBoolean(false);

    /** Executor taken from the app context in {@link #initTable}; runs delayed reconnect attempts. */
    private ScheduledExecutorService scheduledExecutorService;

    /**
     * Stores the definition and the context's scheduled executor, then delegates to
     * {@link #init}.
     *
     * @param tableDefinition   definition of this table
     * @param streamEventPool   passed through to {@link #init}
     * @param streamEventCloner passed through to {@link #init}
     * @param configReader      passed through to {@link #init}
     * @param siddhiAppContext  source of the scheduled executor; also passed through to {@link #init}
     */
    public void initTable(TableDefinition tableDefinition, StreamEventPool streamEventPool, StreamEventCloner streamEventCloner, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.tableDefinition = tableDefinition;
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        init(tableDefinition, streamEventPool, streamEventCloner, configReader, siddhiAppContext);
    }

    /**
     * Implementation-specific initialisation, called by {@link #initTable} after the definition
     * and executor have been stored.
     */
    protected abstract void init(TableDefinition tableDefinition, StreamEventPool streamEventPool, StreamEventCloner streamEventCloner, ConfigReader configReader, SiddhiAppContext siddhiAppContext);

    /**
     * @return the definition supplied to {@link #initTable}
     */
    public TableDefinition getTableDefinition() {
        return this.tableDefinition;
    }

    /**
     * Adds the given events through {@link #add}, following the connection protocol described on
     * this class. The events are dropped (with an error log) while a reconnect is in progress.
     *
     * @param complexEventChunk events to add
     */
    public void addEvents(ComplexEventChunk<StreamEvent> complexEventChunk) {
        if (!this.isConnected.get()) {
            if (this.isTryingToConnect.get()) {
                logDroppedEvents(complexEventChunk);
                return;
            }
            connectWithRetry();
            // Re-enter: either the connect succeeded, or the "trying" flag now causes a drop.
            addEvents(complexEventChunk);
            return;
        }
        try {
            add(complexEventChunk);
        } catch (ConnectionUnavailableException e) {
            handleConnectionLoss(e);
            addEvents(complexEventChunk);
        }
    }

    /**
     * Implementation-specific add.
     *
     * @throws ConnectionUnavailableException to signal that the connection was lost
     */
    protected abstract void add(ComplexEventChunk<StreamEvent> complexEventChunk) throws ConnectionUnavailableException;

    /**
     * Looks up events through {@link #find(CompiledCondition, StateEvent)}, following the
     * connection protocol described on this class.
     *
     * @param stateEvent        event the lookup is performed for
     * @param compiledCondition condition handed to the implementation
     * @return the implementation's result, or {@code null} when the lookup is abandoned because a
     *         reconnect is in progress
     */
    @Override
    public StreamEvent find(StateEvent stateEvent, CompiledCondition compiledCondition) {
        if (!this.isConnected.get()) {
            if (this.isTryingToConnect.get()) {
                LOG.error("Find operation failed for event '" + stateEvent + "', at Table '" + this.tableDefinition.getId() + "' as its still trying to reconnect!");
                return null;
            }
            connectWithRetry();
            return find(stateEvent, compiledCondition);
        }
        try {
            return find(compiledCondition, stateEvent);
        } catch (ConnectionUnavailableException e) {
            handleConnectionLoss(e);
            return find(stateEvent, compiledCondition);
        }
    }

    /**
     * Implementation-specific lookup. Note the parameter order is the reverse of the public
     * {@link #find(StateEvent, CompiledCondition)}.
     *
     * @throws ConnectionUnavailableException to signal that the connection was lost
     */
    protected abstract StreamEvent find(CompiledCondition compiledCondition, StateEvent stateEvent) throws ConnectionUnavailableException;

    /**
     * Deletes through {@link #delete}, following the connection protocol described on this class.
     * The events are dropped (with an error log) while a reconnect is in progress.
     *
     * @param complexEventChunk events driving the delete
     * @param compiledCondition condition handed to the implementation
     */
    public void deleteEvents(ComplexEventChunk<StateEvent> complexEventChunk, CompiledCondition compiledCondition) {
        if (!this.isConnected.get()) {
            if (this.isTryingToConnect.get()) {
                logDroppedEvents(complexEventChunk);
                return;
            }
            connectWithRetry();
            deleteEvents(complexEventChunk, compiledCondition);
            return;
        }
        try {
            delete(complexEventChunk, compiledCondition);
        } catch (ConnectionUnavailableException e) {
            handleConnectionLoss(e);
            deleteEvents(complexEventChunk, compiledCondition);
        }
    }

    /**
     * Implementation-specific delete.
     *
     * @throws ConnectionUnavailableException to signal that the connection was lost
     */
    protected abstract void delete(ComplexEventChunk<StateEvent> complexEventChunk, CompiledCondition compiledCondition) throws ConnectionUnavailableException;

    /**
     * Updates through {@link #update}, following the connection protocol described on this class.
     * The events are dropped (with an error log) while a reconnect is in progress.
     *
     * @param complexEventChunk        events driving the update
     * @param compiledCondition        condition handed to the implementation
     * @param updateAttributeMapperArr mappers handed to the implementation
     */
    public void updateEvents(ComplexEventChunk<StateEvent> complexEventChunk, CompiledCondition compiledCondition, UpdateAttributeMapper[] updateAttributeMapperArr) {
        if (!this.isConnected.get()) {
            if (this.isTryingToConnect.get()) {
                logDroppedEvents(complexEventChunk);
                return;
            }
            connectWithRetry();
            updateEvents(complexEventChunk, compiledCondition, updateAttributeMapperArr);
            return;
        }
        try {
            update(complexEventChunk, compiledCondition, updateAttributeMapperArr);
        } catch (ConnectionUnavailableException e) {
            handleConnectionLoss(e);
            updateEvents(complexEventChunk, compiledCondition, updateAttributeMapperArr);
        }
    }

    /**
     * Implementation-specific update.
     *
     * @throws ConnectionUnavailableException to signal that the connection was lost
     */
    protected abstract void update(ComplexEventChunk<StateEvent> complexEventChunk, CompiledCondition compiledCondition, UpdateAttributeMapper[] updateAttributeMapperArr) throws ConnectionUnavailableException;

    /**
     * Updates or adds through {@link #updateOrAdd}, following the connection protocol described on
     * this class. The events are dropped (with an error log) while a reconnect is in progress.
     *
     * @param complexEventChunk          events driving the operation
     * @param compiledCondition          condition handed to the implementation
     * @param updateAttributeMapperArr   mappers handed to the implementation
     * @param addingStreamEventExtractor extractor handed to the implementation
     */
    public void updateOrAddEvents(ComplexEventChunk<StateEvent> complexEventChunk, CompiledCondition compiledCondition, UpdateAttributeMapper[] updateAttributeMapperArr, AddingStreamEventExtractor addingStreamEventExtractor) {
        if (!this.isConnected.get()) {
            if (this.isTryingToConnect.get()) {
                logDroppedEvents(complexEventChunk);
                return;
            }
            connectWithRetry();
            updateOrAddEvents(complexEventChunk, compiledCondition, updateAttributeMapperArr, addingStreamEventExtractor);
            return;
        }
        try {
            updateOrAdd(complexEventChunk, compiledCondition, updateAttributeMapperArr, addingStreamEventExtractor);
        } catch (ConnectionUnavailableException e) {
            handleConnectionLoss(e);
            updateOrAddEvents(complexEventChunk, compiledCondition, updateAttributeMapperArr, addingStreamEventExtractor);
        }
    }

    /**
     * Implementation-specific update-or-add.
     *
     * @throws ConnectionUnavailableException to signal that the connection was lost
     */
    protected abstract void updateOrAdd(ComplexEventChunk<StateEvent> complexEventChunk, CompiledCondition compiledCondition, UpdateAttributeMapper[] updateAttributeMapperArr, AddingStreamEventExtractor addingStreamEventExtractor) throws ConnectionUnavailableException;

    /**
     * Checks for a match through {@link #contains}, following the connection protocol described
     * on this class.
     *
     * @param stateEvent        event to match
     * @param compiledCondition condition handed to the implementation
     * @return the implementation's result, or {@code false} when the check is abandoned because a
     *         reconnect is in progress
     */
    public boolean containsEvent(StateEvent stateEvent, CompiledCondition compiledCondition) {
        if (!this.isConnected.get()) {
            if (this.isTryingToConnect.get()) {
                LOG.error("Dropping event at Table '" + this.tableDefinition.getId() + "' as its still trying to reconnect!, event matching failed for event '" + stateEvent + "'");
                return false;
            }
            connectWithRetry();
            return containsEvent(stateEvent, compiledCondition);
        }
        try {
            return contains(stateEvent, compiledCondition);
        } catch (ConnectionUnavailableException e) {
            handleConnectionLoss(e);
            return containsEvent(stateEvent, compiledCondition);
        }
    }

    /**
     * Implementation-specific containment check.
     *
     * @throws ConnectionUnavailableException to signal that the connection was lost
     */
    protected abstract boolean contains(StateEvent stateEvent, CompiledCondition compiledCondition) throws ConnectionUnavailableException;

    /**
     * Attempts to connect unless the table is already connected.
     *
     * <p>On success the table is marked connected, the "trying" flag is cleared and the retry
     * counter is reset. On a {@link ConnectionUnavailableException} or {@link RuntimeException}
     * the failure is logged, another call to this method is scheduled on the executor after the
     * counter's current interval, and the counter is incremented; the "trying" flag stays set.
     */
    public void connectWithRetry() {
        if (this.isConnected.get()) {
            return;
        }
        this.isTryingToConnect.set(true);
        try {
            connect();
            this.isConnected.set(true);
            this.isTryingToConnect.set(false);
            this.backoffRetryCounter.reset();
        } catch (RuntimeException | ConnectionUnavailableException e) {
            LOG.error("Error while connecting to Table '" + this.tableDefinition.getId() + "', " + e.getMessage() + ", will retry in '" + this.backoffRetryCounter.getTimeInterval() + "'.", e);
            // Schedule with the interval that was just logged, then advance the counter.
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    Table.this.connectWithRetry();
                }
            }, this.backoffRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
            this.backoffRetryCounter.increment();
        }
    }

    /**
     * Implementation-specific connect.
     *
     * @throws ConnectionUnavailableException when the connection cannot be established
     */
    protected abstract void connect() throws ConnectionUnavailableException;

    /** Implementation-specific disconnect; invoked first by {@link #shutdown()}. */
    protected abstract void disconnect();

    /** Implementation-specific cleanup; invoked by {@link #shutdown()} after {@link #disconnect()}. */
    protected abstract void destroy();

    /**
     * Calls {@link #disconnect()}, then {@link #destroy()}, then clears both connection flags.
     *
     * <p>The steps are chained with {@code finally} so that {@link #destroy()} still runs and the
     * flags are still cleared when an earlier step throws. The exception is not swallowed; if both
     * steps throw, the one from {@link #destroy()} is the one that propagates.
     */
    public void shutdown() {
        try {
            disconnect();
        } finally {
            try {
                destroy();
            } finally {
                this.isConnected.set(false);
                this.isTryingToConnect.set(false);
            }
        }
    }

    /**
     * Marks the table disconnected, logs the failure and starts a reconnect attempt. Shared by the
     * {@link ConnectionUnavailableException} handlers of all data operations.
     *
     * @param e the exception raised by the concrete operation
     */
    private void handleConnectionLoss(ConnectionUnavailableException e) {
        this.isConnected.set(false);
        LOG.error("Connection unavailable at Table '" + this.tableDefinition.getId() + "', " + e.getMessage() + ", will retry connection immediately.", e);
        connectWithRetry();
    }

    /**
     * Logs that a write operation was abandoned because a reconnect is in progress.
     *
     * @param droppedChunk the event chunk being dropped; only its string form is used
     */
    private void logDroppedEvents(Object droppedChunk) {
        LOG.error("Dropping event at Table '" + this.tableDefinition.getId() + "' as its still trying to reconnect!, events dropped '" + droppedChunk + "'");
    }
}
