package org.apache.synapse.message.store.impl.resequencer;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseException;
import org.apache.synapse.config.xml.SynapsePath;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore;
import org.apache.synapse.message.store.impl.jdbc.StoreException;
import org.apache.synapse.message.store.impl.jdbc.util.Statement;

/* loaded from: input_file:WEB-INF/lib/synapse-core-4.0.0-wso2v53.jar:org/apache/synapse/message/store/impl/resequencer/ResequenceMessageStore.class */
public class ResequenceMessageStore extends JDBCMessageStore {
    private static final Log log = LogFactory.getLog(ResequenceMessageStore.class);
    private static final int MILLISECONDS = 1000;
    private SynapsePath xPath;
    private long gapTimeoutInterval;
    private long nextElapsedTime;
    private long nextSequenceId = 0;
    private boolean hasStarted = false;
    private ConcurrentHashMap<String, Long> sequenceIdMapper = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    public List<Map> startIdSelectionResult(ResultSet resultSet) throws SQLException {
        ArrayList arrayList = new ArrayList();
        while (resultSet.next()) {
            HashMap hashMap = new HashMap();
            long j = resultSet.getLong("seq_id");
            hashMap.put("seq_id", Long.valueOf(j));
            arrayList.add(hashMap);
            if (log.isDebugEnabled()) {
                log.debug("DB returned " + j + " as the result");
            }
        }
        return arrayList;
    }

    private long readStartId() {
        Long l = 0L;
        List<Map> processedRows = getProcessedRows(new Statement("SELECT seq_id FROM tbl_lastprocessid WHERE statement=\"" + getName() + "\"") { // from class: org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore.1
            @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
            public List<Map> getResult(ResultSet resultSet) throws SQLException {
                return ResequenceMessageStore.this.startIdSelectionResult(resultSet);
            }
        });
        if (processedRows.size() > 0) {
            l = (Long) processedRows.get(0).get("seq_id");
            log.info("Starting sequence id recorded as:" + l);
        }
        return l.longValue();
    }

    private void initResequenceParams(Map<String, Object> map) {
        this.xPath = (SynapsePath) map.get("store.resequence.id.path");
        this.gapTimeoutInterval = Integer.parseInt((String) map.get("store.resequence.timeout"));
        if (this.gapTimeoutInterval < 0) {
            this.nextElapsedTime = -1L;
            return;
        }
        this.gapTimeoutInterval *= 1000;
        this.nextElapsedTime = System.currentTimeMillis() + this.gapTimeoutInterval;
        if (log.isDebugEnabled()) {
            log.debug("Resequencer initialized with xpath:" + this.xPath.expression + ",the waiting count configured:" + this.gapTimeoutInterval);
        }
    }

    @Override // org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore, org.apache.synapse.message.store.AbstractMessageStore, org.apache.synapse.message.store.MessageStore
    public void setParameters(Map<String, Object> map) {
        initResequenceParams(map);
        super.setParameters(map);
    }

    @Override // org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore, org.apache.synapse.message.store.AbstractMessageStore, org.apache.synapse.ManagedLifecycle
    public void init(SynapseEnvironment synapseEnvironment) {
        super.init(synapseEnvironment);
        this.nextSequenceId = readStartId() + 1;
        if (log.isDebugEnabled()) {
            log.debug("Next sequence which will be processed:" + this.nextSequenceId);
        }
    }

    private Long getMessageSequenceId(MessageContext messageContext) throws StoreException {
        String stringValueOf = this.xPath.stringValueOf(messageContext);
        if (log.isDebugEnabled()) {
            log.debug("Sequence id extracted from the incoming message " + messageContext.getMessageID() + " is:" + stringValueOf);
        }
        return Long.valueOf(Long.parseLong(stringValueOf));
    }

    private MessageContext getNextMessage() {
        MessageContext messageContext = null;
        try {
            Statement statement = new Statement("SELECT message FROM " + getJdbcConfiguration().getTableName() + " WHERE seq_id= ?") { // from class: org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore.2
                @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
                public List<Map> getResult(ResultSet resultSet) throws SQLException {
                    return ResequenceMessageStore.this.messageContentResultSet(resultSet, getStatement());
                }
            };
            statement.addParameter(Long.valueOf(this.nextSequenceId));
            List<Map> processedRows = getProcessedRows(statement);
            if (!processedRows.isEmpty()) {
                messageContext = getMessageContext(processedRows, 0);
                this.nextSequenceId++;
                if (log.isTraceEnabled()) {
                    log.trace("Message with id " + messageContext.getMessageID() + " returned for sequence " + this.nextSequenceId);
                }
            } else if (log.isTraceEnabled()) {
                log.trace("Sequences not returned from DB, next sequence will be:" + this.nextSequenceId);
            }
            return messageContext;
        } catch (SynapseException e) {
            throw new SynapseException("Error while peek the message", e);
        }
    }

    @Override // org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore
    protected List<Statement> removeMessageStatement(String str) {
        Long remove = this.sequenceIdMapper.remove(str);
        String name = getName();
        if (remove == null) {
            log.error("The message with id " + str + " is not tracked within the memory.");
        }
        ArrayList arrayList = new ArrayList();
        String str2 = "DELETE FROM " + getJdbcConfiguration().getTableName() + " WHERE msg_id=?";
        Statement statement = new Statement("INSERT INTO tbl_lastprocessid (statement,seq_id) VALUES (?,?) ON DUPLICATE KEY UPDATE seq_id = ?") { // from class: org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore.3
            @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
            public List<Map> getResult(ResultSet resultSet) throws SQLException {
                throw new UnsupportedOperationException();
            }
        };
        Statement statement2 = new Statement(str2) { // from class: org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore.4
            @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
            public List<Map> getResult(ResultSet resultSet) throws SQLException {
                throw new UnsupportedOperationException();
            }
        };
        statement2.addParameter(str);
        statement.addParameter(name);
        statement.addParameter(remove);
        statement.addParameter(remove);
        arrayList.add(statement2);
        arrayList.add(statement);
        if (log.isDebugEnabled()) {
            log.debug("Removing message with id:" + str + " and last process id:" + remove);
        }
        return arrayList;
    }

    private MessageContext getMessageContext(List<Map> list, int i) {
        return (MessageContext) list.get(i).get("message");
    }

    private long getSequenceId(List<Map> list, int i) {
        return ((Long) list.get(i).get("seq_id")).longValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<Map> getMessageWithMinimumId(ResultSet resultSet, String str) throws SQLException {
        ArrayList arrayList = new ArrayList();
        while (resultSet.next()) {
            try {
                HashMap hashMap = new HashMap();
                hashMap.put("message", deserializeMessage(resultSet.getBytes("message")));
                hashMap.put("seq_id", Long.valueOf(resultSet.getLong("seq_id")));
                arrayList.add(hashMap);
            } catch (SQLException e) {
                throw new SynapseException("Error executing statement : " + str + " against DataSource : " + getJdbcConfiguration().getDSName(), e);
            }
        }
        return arrayList;
    }

    private MessageContext getMessageWithMinimumSequence() {
        String tableName = getJdbcConfiguration().getTableName();
        MessageContext messageContext = null;
        try {
            List<Map> processedRows = getProcessedRows(new Statement("SELECT message,seq_id FROM " + tableName + " WHERE seq_id=(SELECT min(seq_id) from " + tableName + ")") { // from class: org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore.5
                @Override // org.apache.synapse.message.store.impl.jdbc.util.Statement
                public List<Map> getResult(ResultSet resultSet) throws SQLException {
                    return ResequenceMessageStore.this.getMessageWithMinimumId(resultSet, getStatement());
                }
            });
            if (!processedRows.isEmpty()) {
                messageContext = getMessageContext(processedRows, 0);
                this.nextSequenceId = getSequenceId(processedRows, 0) + 1;
                if (log.isTraceEnabled()) {
                    log.trace("Message with id " + messageContext.getMessageID() + " returned as the minimum, the minimum sequence will be marked as " + this.nextSequenceId);
                }
            }
            return messageContext;
        } catch (SynapseException e) {
            throw new SynapseException("Error while peek the message", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore
    public Statement getStoreMessageStatement(MessageContext messageContext, Long l) throws StoreException {
        return super.getStoreMessageStatement(messageContext, getMessageSequenceId(messageContext));
    }

    private boolean shouldWait() {
        return this.nextElapsedTime < 0 || System.currentTimeMillis() <= this.nextElapsedTime;
    }

    @Override // org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore
    public MessageContext peek() throws SynapseException {
        if (!this.hasStarted) {
            this.nextSequenceId = readStartId() + 1;
            this.hasStarted = true;
        }
        MessageContext nextMessage = getNextMessage();
        if (null == nextMessage && !shouldWait()) {
            nextMessage = getMessageWithMinimumSequence();
        }
        if (null != nextMessage) {
            long j = this.nextSequenceId - 1;
            String messageID = nextMessage.getMessageID();
            this.sequenceIdMapper.put(messageID, Long.valueOf(j));
            if (this.nextElapsedTime > 0) {
                this.nextElapsedTime = System.currentTimeMillis() + this.gapTimeoutInterval;
            }
            if (log.isDebugEnabled()) {
                log.debug("Message with sequence " + j + " and message id " + messageID + " will be returned to the processor.");
                log.debug("Next elapsed time would be marked as:" + this.nextElapsedTime);
            }
        }
        return nextMessage;
    }
}
