/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.table;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.DatabaseRuntimeException;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.stream.window.FindableProcessor;
import io.siddhi.core.table.CompiledUpdateSet;
import io.siddhi.core.table.record.RecordTableHandler;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.StringUtil;
import io.siddhi.core.util.collection.AddingStreamEventExtractor;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.error.handler.model.ErroneousEvent;
import io.siddhi.core.util.error.handler.model.ReplayableTableRecord;
import io.siddhi.core.util.error.handler.util.ErrorHandlerUtils;
import io.siddhi.core.util.error.handler.util.ErrorOccurrence;
import io.siddhi.core.util.error.handler.util.ErrorStoreHelper;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.statistics.LatencyTracker;
import io.siddhi.core.util.statistics.MemoryCalculable;
import io.siddhi.core.util.statistics.ThroughputTracker;
import io.siddhi.core.util.statistics.metrics.Level;
import io.siddhi.core.util.transport.BackoffRetryCounter;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.execution.query.output.stream.UpdateSet;
import io.siddhi.query.api.util.AnnotationHelper;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public abstract class Table
implements FindableProcessor,
MemoryCalculable {
    private static final Logger LOG = LogManager.getLogger(Table.class);
    public Map<String, Table> tableMap;
    protected TableDefinition tableDefinition;
    protected SiddhiAppContext siddhiAppContext;
    private AtomicBoolean isTryingToConnect = new AtomicBoolean(false);
    private BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();
    private AtomicBoolean isConnected = new AtomicBoolean(false);
    private ScheduledExecutorService scheduledExecutorService;
    private RecordTableHandler recordTableHandler;
    private OnErrorAction onErrorAction = OnErrorAction.RETRY;
    private boolean isObjectColumnPresent;
    private LatencyTracker latencyTrackerFind;
    private LatencyTracker latencyTrackerInsert;
    private LatencyTracker latencyTrackerUpdate;
    private LatencyTracker latencyTrackerDelete;
    private LatencyTracker latencyTrackerUpdateOrInsert;
    private LatencyTracker latencyTrackerContains;
    private ThroughputTracker throughputTrackerFind;
    private ThroughputTracker throughputTrackerInsert;
    private ThroughputTracker throughputTrackerUpdate;
    private ThroughputTracker throughputTrackerDelete;
    private ThroughputTracker throughputTrackerUpdateOrInsert;
    private ThroughputTracker throughputTrackerContains;

    public void initTable(TableDefinition tableDefinition, StreamEventFactory storeEventPool, StreamEventCloner storeEventCloner, ConfigReader configReader, SiddhiAppContext siddhiAppContext, RecordTableHandler recordTableHandler) {
        this.tableDefinition = tableDefinition;
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        this.siddhiAppContext = siddhiAppContext;
        this.recordTableHandler = recordTableHandler;
        Element onErrorElement = AnnotationHelper.getAnnotationElement((String)"Store", (String)"on.error", (List)tableDefinition.getAnnotations());
        if (onErrorElement != null) {
            this.onErrorAction = OnErrorAction.valueOf(onErrorElement.getValue());
        }
        if (this.onErrorAction == OnErrorAction.STORE && siddhiAppContext.getSiddhiContext().getErrorStore() == null) {
            LOG.error("On error action is 'STORE' for table " + tableDefinition.getId() + " in Siddhi App " + siddhiAppContext.getName() + " but error store is not configured in Siddhi Manager");
        }
        this.isObjectColumnPresent = this.isObjectColumnPresent(tableDefinition);
        if (siddhiAppContext.getStatisticsManager() != null) {
            this.latencyTrackerFind = QueryParserHelper.createLatencyTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "find");
            this.latencyTrackerInsert = QueryParserHelper.createLatencyTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "insert");
            this.latencyTrackerUpdate = QueryParserHelper.createLatencyTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "update");
            this.latencyTrackerDelete = QueryParserHelper.createLatencyTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "delete");
            this.latencyTrackerUpdateOrInsert = QueryParserHelper.createLatencyTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "updateOrInsert");
            this.latencyTrackerContains = QueryParserHelper.createLatencyTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "contains");
            this.throughputTrackerFind = QueryParserHelper.createThroughputTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "find");
            this.throughputTrackerInsert = QueryParserHelper.createThroughputTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "insert");
            this.throughputTrackerUpdate = QueryParserHelper.createThroughputTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "update");
            this.throughputTrackerDelete = QueryParserHelper.createThroughputTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "delete");
            this.throughputTrackerUpdateOrInsert = QueryParserHelper.createThroughputTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "updateOrInsert");
            this.throughputTrackerContains = QueryParserHelper.createThroughputTracker(siddhiAppContext, tableDefinition.getId(), "Tables", "contains");
        }
        this.init(tableDefinition, storeEventPool, storeEventCloner, configReader, siddhiAppContext, recordTableHandler);
    }

    protected abstract void init(TableDefinition var1, StreamEventFactory var2, StreamEventCloner var3, ConfigReader var4, SiddhiAppContext var5, RecordTableHandler var6);

    public TableDefinition getTableDefinition() {
        return this.tableDefinition;
    }

    private boolean isObjectColumnPresent(TableDefinition tableDefinition) {
        return tableDefinition.getAttributeList().stream().anyMatch(attribute -> attribute.getType() == Attribute.Type.OBJECT);
    }

    protected void onAddError(ComplexEventChunk<StreamEvent> addingEventChunk, Exception e) {
        OnErrorAction errorAction = this.onErrorAction;
        if (e instanceof ConnectionUnavailableException) {
            this.isConnected.set(false);
            if (errorAction == OnErrorAction.STORE) {
                this.handleStoreAddError(addingEventChunk, true, e);
                LOG.error("Error on '" + this.siddhiAppContext.getName() + "' while performing add for events  at '" + this.tableDefinition.getId() + "'. Events saved '" + addingEventChunk.toString() + "'");
                if (LOG.isDebugEnabled()) {
                    LOG.debug(e);
                }
                if (!this.isTryingToConnect.get()) {
                    this.connectWithRetry();
                }
            } else if (this.isTryingToConnect.get()) {
                LOG.warn("Error on '" + this.siddhiAppContext.getName() + "' while performing add for events '" + addingEventChunk + "', operation busy waiting at Table '" + this.tableDefinition.getId() + "' as its trying to reconnect!");
                this.waitWhileConnect();
                LOG.info("SiddhiApp '" + this.siddhiAppContext.getName() + "' table '" + this.tableDefinition.getId() + "' has become available for add operation for events '" + addingEventChunk + "'");
                this.add(addingEventChunk);
            } else {
                this.connectWithRetry();
                this.add(addingEventChunk);
            }
        } else if (e instanceof DatabaseRuntimeException) {
            if (errorAction == OnErrorAction.STORE) {
                this.handleStoreAddError(addingEventChunk, false, e);
                LOG.error("Error on '" + this.siddhiAppContext.getName() + "' while performing add for events  at '" + this.tableDefinition.getId() + "'. Events saved '" + addingEventChunk.toString() + "'");
                if (LOG.isDebugEnabled()) {
                    LOG.debug(e);
                }
            } else {
                throw (DatabaseRuntimeException)e;
            }
        }
    }

    private void handleStoreAddError(ComplexEventChunk addingEventChunk, boolean isFromConnectionUnavailableException, Exception e) {
        addingEventChunk.reset();
        ReplayableTableRecord record = new ReplayableTableRecord(addingEventChunk);
        record.setFromConnectionUnavailableException(isFromConnectionUnavailableException);
        record.setEditable(!this.isObjectColumnPresent);
        ErroneousEvent erroneousEvent = new ErroneousEvent(record, e, e.getMessage());
        erroneousEvent.setOriginalPayload(ErrorHandlerUtils.constructAddErrorRecordString(addingEventChunk, isFromConnectionUnavailableException, this.tableDefinition, e));
        ErrorStoreHelper.storeErroneousEvent(this.siddhiAppContext.getSiddhiContext().getErrorStore(), ErrorOccurrence.STORE_ON_TABLE_ADD, this.siddhiAppContext.getName(), erroneousEvent, this.tableDefinition.getId());
    }

    public void addEvents(ComplexEventChunk<StreamEvent> addingEventChunk, int noOfEvents) {
        try {
            if (this.throughputTrackerInsert != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.throughputTrackerInsert.eventsIn(noOfEvents);
            }
            if (this.latencyTrackerInsert != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.latencyTrackerInsert.markIn();
            }
            addingEventChunk.reset();
            this.add(addingEventChunk);
        }
        finally {
            if (this.latencyTrackerInsert != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.latencyTrackerInsert.markOut();
            }
        }
    }

    public abstract void add(ComplexEventChunk<StreamEvent> var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        if (this.isConnected.get()) {
            try {
                if (this.latencyTrackerFind != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                    this.latencyTrackerFind.markIn();
                }
                StreamEvent results = this.find(compiledCondition, matchingEvent);
                if (this.throughputTrackerFind != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                    this.throughputTrackerFind.eventIn();
                }
                StreamEvent streamEvent = results;
                return streamEvent;
            }
            catch (ConnectionUnavailableException e) {
                this.isConnected.set(false);
                LOG.error(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + " Connection unavailable at Table '" + this.tableDefinition.getId() + "', will retry connection immediately.", (Throwable)e);
                this.connectWithRetry();
                StreamEvent streamEvent = this.find(matchingEvent, compiledCondition);
                return streamEvent;
            }
            finally {
                if (this.latencyTrackerFind != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                    this.latencyTrackerFind.markOut();
                }
            }
        }
        if (this.isTryingToConnect.get()) {
            LOG.warn("Error on '" + this.siddhiAppContext.getName() + "' while performing find for events '" + matchingEvent + "', operation busy waiting at Table '" + this.tableDefinition.getId() + "' as its trying to reconnect!");
            this.waitWhileConnect();
            LOG.info("SiddhiApp '" + this.siddhiAppContext.getName() + "' table '" + this.tableDefinition.getId() + "' has become available for find operation for events '" + matchingEvent + "'");
            return this.find(matchingEvent, compiledCondition);
        }
        this.connectWithRetry();
        return this.find(matchingEvent, compiledCondition);
    }

    protected abstract StreamEvent find(CompiledCondition var1, StateEvent var2) throws ConnectionUnavailableException;

    protected void onDeleteError(ComplexEventChunk<StateEvent> deletingEventChunk, CompiledCondition compiledCondition, Exception e) {
        OnErrorAction errorAction = this.onErrorAction;
        if (e instanceof ConnectionUnavailableException) {
            this.isConnected.set(false);
            if (errorAction == OnErrorAction.STORE) {
                deletingEventChunk.reset();
                ErroneousEvent erroneousEvent = new ErroneousEvent(new ReplayableTableRecord(deletingEventChunk, compiledCondition), e, e.getMessage());
                erroneousEvent.setOriginalPayload(ErrorHandlerUtils.constructErrorRecordString(deletingEventChunk, this.isObjectColumnPresent, this.tableDefinition, e));
                ErrorStoreHelper.storeErroneousEvent(this.siddhiAppContext.getSiddhiContext().getErrorStore(), ErrorOccurrence.STORE_ON_TABLE_DELETE, this.siddhiAppContext.getName(), erroneousEvent, this.tableDefinition.getId());
                LOG.error("Error on '" + this.siddhiAppContext.getName() + "' while performing delete for events  at '" + this.tableDefinition.getId() + "'. Events saved '" + deletingEventChunk.toString() + "'");
                if (LOG.isDebugEnabled()) {
                    LOG.debug(e);
                }
                if (!this.isTryingToConnect.get()) {
                    this.connectWithRetry();
                }
            } else if (this.isTryingToConnect.get()) {
                LOG.warn("Error on '" + this.siddhiAppContext.getName() + "' while performing delete for events '" + deletingEventChunk + "', operation busy waiting at Table '" + this.tableDefinition.getId() + "' as its trying to reconnect!");
                this.waitWhileConnect();
                LOG.info("SiddhiApp '" + this.siddhiAppContext.getName() + "' table '" + this.tableDefinition.getId() + "' has become available for delete operation for events '" + deletingEventChunk + "'");
                this.delete(deletingEventChunk, compiledCondition);
            } else {
                this.connectWithRetry();
                this.delete(deletingEventChunk, compiledCondition);
            }
        } else if (e instanceof DatabaseRuntimeException && errorAction == OnErrorAction.STORE) {
            deletingEventChunk.reset();
            ReplayableTableRecord record = new ReplayableTableRecord(deletingEventChunk, compiledCondition);
            record.setFromConnectionUnavailableException(false);
            ErroneousEvent erroneousEvent = new ErroneousEvent(record, e, e.getMessage());
            erroneousEvent.setOriginalPayload(ErrorHandlerUtils.constructErrorRecordString(deletingEventChunk, this.isObjectColumnPresent, this.tableDefinition, e));
            ErrorStoreHelper.storeErroneousEvent(this.siddhiAppContext.getSiddhiContext().getErrorStore(), ErrorOccurrence.STORE_ON_TABLE_DELETE, this.siddhiAppContext.getName(), erroneousEvent, this.tableDefinition.getId());
            LOG.error("Error on '" + this.siddhiAppContext.getName() + "' while performing delete for events  at '" + this.tableDefinition.getId() + "'. Events saved '" + deletingEventChunk.toString() + "'");
            if (LOG.isDebugEnabled()) {
                LOG.debug(e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void deleteEvents(ComplexEventChunk<StateEvent> deletingEventChunk, CompiledCondition compiledCondition, int noOfEvents) {
        try {
            if (this.throughputTrackerDelete != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.throughputTrackerDelete.eventsIn(noOfEvents);
            }
            if (this.latencyTrackerDelete != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.latencyTrackerDelete.markIn();
            }
            this.delete(deletingEventChunk, compiledCondition);
        }
        finally {
            if (this.latencyTrackerDelete != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.latencyTrackerDelete.markOut();
            }
        }
    }

    public abstract void delete(ComplexEventChunk<StateEvent> var1, CompiledCondition var2);

    protected void onUpdateError(ComplexEventChunk<StateEvent> updatingEventChunk, CompiledCondition compiledCondition, CompiledUpdateSet compiledUpdateSet, Exception e) {
        OnErrorAction errorAction = this.onErrorAction;
        if (e instanceof ConnectionUnavailableException) {
            this.isConnected.set(false);
            if (errorAction == OnErrorAction.STORE) {
                updatingEventChunk.reset();
                ErroneousEvent erroneousEvent = new ErroneousEvent(new ReplayableTableRecord(updatingEventChunk, compiledCondition, compiledUpdateSet), e, e.getMessage());
                erroneousEvent.setOriginalPayload(ErrorHandlerUtils.constructErrorRecordString(updatingEventChunk, this.isObjectColumnPresent, this.tableDefinition, e));
                ErrorStoreHelper.storeErroneousEvent(this.siddhiAppContext.getSiddhiContext().getErrorStore(), ErrorOccurrence.STORE_ON_TABLE_UPDATE, this.siddhiAppContext.getName(), erroneousEvent, this.tableDefinition.getId());
                LOG.error("Error on '" + this.siddhiAppContext.getName() + "' while performing update for events  at '" + this.tableDefinition.getId() + "'. Events saved '" + updatingEventChunk.toString() + "'");
                if (LOG.isDebugEnabled()) {
                    LOG.debug(e);
                }
                if (!this.isTryingToConnect.get()) {
                    this.connectWithRetry();
                }
            } else if (this.isTryingToConnect.get()) {
                LOG.warn("Error on '" + this.siddhiAppContext.getName() + "' while performing update for events '" + updatingEventChunk + "', operation busy waiting at Table '" + this.tableDefinition.getId() + "' as its trying to reconnect!");
                this.waitWhileConnect();
                LOG.info("SiddhiApp '" + this.siddhiAppContext.getName() + "' table '" + this.tableDefinition.getId() + "' has become available for update operation for events '" + updatingEventChunk + "'");
                this.update(updatingEventChunk, compiledCondition, compiledUpdateSet);
            } else {
                this.connectWithRetry();
                this.update(updatingEventChunk, compiledCondition, compiledUpdateSet);
            }
        } else if (e instanceof DatabaseRuntimeException && errorAction == OnErrorAction.STORE) {
            updatingEventChunk.reset();
            ReplayableTableRecord record = new ReplayableTableRecord(updatingEventChunk, compiledCondition, compiledUpdateSet);
            record.setFromConnectionUnavailableException(false);
            ErroneousEvent erroneousEvent = new ErroneousEvent(record, e, e.getMessage());
            erroneousEvent.setOriginalPayload(ErrorHandlerUtils.constructErrorRecordString(updatingEventChunk, this.isObjectColumnPresent, this.tableDefinition, e));
            ErrorStoreHelper.storeErroneousEvent(this.siddhiAppContext.getSiddhiContext().getErrorStore(), ErrorOccurrence.STORE_ON_TABLE_UPDATE, this.siddhiAppContext.getName(), erroneousEvent, this.tableDefinition.getId());
            LOG.error("Error on '" + this.siddhiAppContext.getName() + "' while performing update for events  at '" + this.tableDefinition.getId() + "'. Events saved '" + updatingEventChunk.toString() + "'");
            if (LOG.isDebugEnabled()) {
                LOG.debug(e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateEvents(ComplexEventChunk<StateEvent> updatingEventChunk, CompiledCondition compiledCondition, CompiledUpdateSet compiledUpdateSet, int noOfEvents) {
        try {
            if (this.throughputTrackerUpdate != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.throughputTrackerUpdate.eventsIn(noOfEvents);
            }
            if (this.latencyTrackerUpdate != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.latencyTrackerUpdate.markIn();
            }
            this.update(updatingEventChunk, compiledCondition, compiledUpdateSet);
        }
        finally {
            if (this.latencyTrackerUpdate != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.latencyTrackerUpdate.markOut();
            }
        }
    }

    public abstract void update(ComplexEventChunk<StateEvent> var1, CompiledCondition var2, CompiledUpdateSet var3);

    protected void onUpdateOrAddError(ComplexEventChunk<StateEvent> updateOrAddingEventChunk, CompiledCondition compiledCondition, CompiledUpdateSet compiledUpdateSet, AddingStreamEventExtractor addingStreamEventExtractor, Exception e) {
        OnErrorAction errorAction = this.onErrorAction;
        if (e instanceof ConnectionUnavailableException) {
            this.isConnected.set(false);
            if (errorAction == OnErrorAction.STORE) {
                updateOrAddingEventChunk.reset();
                ErroneousEvent erroneousEvent = new ErroneousEvent(new ReplayableTableRecord(updateOrAddingEventChunk, compiledCondition, compiledUpdateSet, addingStreamEventExtractor), e, e.getMessage());
                erroneousEvent.setOriginalPayload(ErrorHandlerUtils.constructErrorRecordString(updateOrAddingEventChunk, this.isObjectColumnPresent, this.tableDefinition, e));
                ErrorStoreHelper.storeErroneousEvent(this.siddhiAppContext.getSiddhiContext().getErrorStore(), ErrorOccurrence.STORE_ON_TABLE_UPDATE_OR_ADD, this.siddhiAppContext.getName(), erroneousEvent, this.tableDefinition.getId());
                LOG.error("Error on '" + this.siddhiAppContext.getName() + "' while performing update or add for events  at '" + this.tableDefinition.getId() + "'. Events saved '" + updateOrAddingEventChunk.toString() + "'");
                if (LOG.isDebugEnabled()) {
                    LOG.debug(e);
                }
                if (!this.isTryingToConnect.get()) {
                    this.connectWithRetry();
                }
            } else if (this.isTryingToConnect.get()) {
                LOG.warn("Error on '" + this.siddhiAppContext.getName() + "' while performing update or add for events '" + updateOrAddingEventChunk + "', operation busy waiting at Table '" + this.tableDefinition.getId() + "' as its trying to reconnect!");
                this.waitWhileConnect();
                LOG.info("SiddhiApp '" + this.siddhiAppContext.getName() + "' table '" + this.tableDefinition.getId() + "' has become available for update or add operation for events '" + updateOrAddingEventChunk + "'");
                this.updateOrAdd(updateOrAddingEventChunk, compiledCondition, compiledUpdateSet, addingStreamEventExtractor);
            } else {
                this.connectWithRetry();
                this.updateOrAdd(updateOrAddingEventChunk, compiledCondition, compiledUpdateSet, addingStreamEventExtractor);
            }
        } else if (e instanceof DatabaseRuntimeException && errorAction == OnErrorAction.STORE) {
            updateOrAddingEventChunk.reset();
            ReplayableTableRecord record = new ReplayableTableRecord(updateOrAddingEventChunk, compiledCondition, compiledUpdateSet, addingStreamEventExtractor);
            record.setFromConnectionUnavailableException(false);
            ErroneousEvent erroneousEvent = new ErroneousEvent(record, e, e.getMessage());
            erroneousEvent.setOriginalPayload(ErrorHandlerUtils.constructErrorRecordString(updateOrAddingEventChunk, this.isObjectColumnPresent, this.tableDefinition, e));
            ErrorStoreHelper.storeErroneousEvent(this.siddhiAppContext.getSiddhiContext().getErrorStore(), ErrorOccurrence.STORE_ON_TABLE_UPDATE_OR_ADD, this.siddhiAppContext.getName(), erroneousEvent, this.tableDefinition.getId());
            LOG.error("Error on '" + this.siddhiAppContext.getName() + "' while performing update or add for events  at '" + this.tableDefinition.getId() + "'. Events saved '" + updateOrAddingEventChunk.toString() + "'");
            if (LOG.isDebugEnabled()) {
                LOG.debug(e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateOrAddEvents(ComplexEventChunk<StateEvent> updateOrAddingEventChunk, CompiledCondition compiledCondition, CompiledUpdateSet compiledUpdateSet, AddingStreamEventExtractor addingStreamEventExtractor, int noOfEvents) {
        try {
            if (this.throughputTrackerUpdateOrInsert != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.throughputTrackerUpdateOrInsert.eventsIn(noOfEvents);
            }
            if (this.latencyTrackerUpdateOrInsert != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.latencyTrackerUpdateOrInsert.markIn();
            }
            this.updateOrAdd(updateOrAddingEventChunk, compiledCondition, compiledUpdateSet, addingStreamEventExtractor);
        }
        finally {
            if (this.latencyTrackerUpdateOrInsert != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.latencyTrackerUpdateOrInsert.markOut();
            }
        }
    }

    public abstract void updateOrAdd(ComplexEventChunk<StateEvent> var1, CompiledCondition var2, CompiledUpdateSet var3, AddingStreamEventExtractor var4);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean containsEvent(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        if (this.isConnected.get()) {
            try {
                if (this.latencyTrackerContains != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                    this.latencyTrackerContains.markIn();
                }
                boolean results = this.contains(matchingEvent, compiledCondition);
                if (this.throughputTrackerContains != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                    this.throughputTrackerContains.eventIn();
                }
                boolean bl = results;
                return bl;
            }
            catch (ConnectionUnavailableException e) {
                this.isConnected.set(false);
                LOG.error(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + " Connection unavailable at Table '" + this.tableDefinition.getId() + "', will retry connection immediately.", (Throwable)e);
                this.connectWithRetry();
                boolean bl = this.containsEvent(matchingEvent, compiledCondition);
                return bl;
            }
            finally {
                if (this.latencyTrackerContains != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                    this.latencyTrackerContains.markOut();
                }
            }
        }
        if (this.isTryingToConnect.get()) {
            LOG.warn("Error on '" + this.siddhiAppContext.getName() + "' while performing contains check for event '" + matchingEvent + "', operation busy waiting at Table '" + this.tableDefinition.getId() + "' as its trying to reconnect!");
            this.waitWhileConnect();
            LOG.info("SiddhiApp '" + this.siddhiAppContext.getName() + "' table '" + this.tableDefinition.getId() + "' has become available for contains check operation for matching event '" + matchingEvent + "'");
            return this.containsEvent(matchingEvent, compiledCondition);
        }
        this.connectWithRetry();
        return this.containsEvent(matchingEvent, compiledCondition);
    }

    protected abstract boolean contains(StateEvent var1, CompiledCondition var2) throws ConnectionUnavailableException;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void connectWithRetry() {
        if (!this.isConnected.get()) {
            this.isTryingToConnect.set(true);
            try {
                this.connectAndLoadCache();
                this.isConnected.set(true);
                Table table = this;
                synchronized (table) {
                    this.isTryingToConnect.set(false);
                    this.notifyAll();
                }
                this.backoffRetryCounter.reset();
            }
            catch (ConnectionUnavailableException e) {
                LOG.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext)) + " Error while connecting to Table '" + StringUtil.removeCRLFCharacters(this.tableDefinition.getId()) + "', will retry in '" + StringUtil.removeCRLFCharacters(this.backoffRetryCounter.getTimeInterval()) + "'.", (Throwable)e);
                this.scheduledExecutorService.schedule(new Runnable(){

                    @Override
                    public void run() {
                        Table.this.connectWithRetry();
                    }
                }, this.backoffRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
                this.backoffRetryCounter.increment();
            }
            catch (RuntimeException e) {
                LOG.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext)) + " . Error while connecting to Table '" + StringUtil.removeCRLFCharacters(this.tableDefinition.getId()) + "'.", (Throwable)e);
                throw e;
            }
        }
    }

    public void setIsConnectedToFalse() {
        this.isConnected.set(false);
    }

    public boolean getIsTryingToConnect() {
        return this.isTryingToConnect.get();
    }

    public boolean getIsConnected() {
        return this.isConnected.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitWhileConnect() {
        try {
            Table table = this;
            synchronized (table) {
                while (this.isTryingToConnect.get()) {
                    this.wait();
                }
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Error on SiddhiApp '" + this.siddhiAppContext.getName() + "', interrupted while busy wait on connection retrying condition " + e.getMessage(), e);
        }
    }

    public abstract CompiledUpdateSet compileUpdateSet(UpdateSet var1, MatchingMetaInfoHolder var2, List<VariableExpressionExecutor> var3, Map<String, Table> var4, SiddhiQueryContext var5);

    protected abstract void connectAndLoadCache() throws ConnectionUnavailableException;

    protected abstract void disconnect();

    protected abstract void destroy();

    public RecordTableHandler getHandler() {
        return this.recordTableHandler;
    }

    public void shutdown() {
        this.disconnect();
        this.destroy();
        this.isConnected.set(false);
        this.isTryingToConnect.set(false);
    }

    public abstract boolean isStateful();

    public static enum OnErrorAction {
        LOG,
        STORE,
        RETRY;

    }
}

