package io.siddhi.extension.store.redis;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.OperationNotSupportedException;
import io.siddhi.core.table.record.AbstractRecordTable;
import io.siddhi.core.table.record.ExpressionBuilder;
import io.siddhi.core.table.record.RecordIterator;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.CompiledExpression;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.extension.store.redis.beans.StoreVariable;
import io.siddhi.extension.store.redis.beans.StreamVariable;
import io.siddhi.extension.store.redis.exceptions.IllegalTtlArgumentException;
import io.siddhi.extension.store.redis.exceptions.RedisTableException;
import io.siddhi.extension.store.redis.utils.RedisClusterInstance;
import io.siddhi.extension.store.redis.utils.RedisInstance;
import io.siddhi.extension.store.redis.utils.RedisSingleNodeInstance;
import io.siddhi.extension.store.redis.utils.RedisTableConstants;
import io.siddhi.extension.store.redis.utils.RedisTableUtils;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.util.AnnotationHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;

@Extension(name = "redis", namespace = "store", description = "This extension assigns data source and connection instructions to event tables. It also implements read write operations on connected datasource. This extension only can be used to read the data which persisted using the same extension since unique implementation has been used to map the relational data in to redis's key and value representation", parameters = {@Parameter(name = RedisTableConstants.ANNOTATION_ELEMENT_TABLE_NAME, description = "The name with which the event table should be persisted in the store. If noname is specified via this parameter, the event table is persisted with the same name as the Siddhi table.", type = {DataType.STRING}, optional = true, defaultValue = "The tale name defined in the siddhi app"), @Parameter(name = RedisTableConstants.ANNOTATION_ELEMENT_CLUSTER_MODE, description = "This will decide the redis mode. if this is false, client will connect to a single redis node.", type = {DataType.BOOL}, defaultValue = "false"), @Parameter(name = RedisTableConstants.ANNOTATION_ELEMENT_NODES, description = "host, port and the password of the node(s).In single node mode node details can be provided as follows- \"node='hosts:port@password'\" \nIn clustered mode host and port of all the master nodes should be provided separated by a comma(,). As an example \"nodes = 'localhost:30001,localhost:30002'\".", type = {DataType.STRING}, optional = true, defaultValue = "localhost:6379@root"), @Parameter(name = RedisTableConstants.ANNOTATION_ELEMENT_TTL_SECS, description = "Time to live in seconds for each record", type = {DataType.LONG}, optional = true, defaultValue = "-1"), @Parameter(name = RedisTableConstants.ANNOTATION_ELEMENT_TTL_ON_UPDATE, description = "Set ttl on row update", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = RedisTableConstants.ANNOTATION_ELEMENT_TTL_ON_READ, description = "Set ttl on read rows", type = {DataType.BOOL}, optional = true, defaultValue = "false")}, examples = {@Example(syntax = "@store(type='redis',nodes='localhost:6379@root',table.name='fooTable',cluster.mode=false)define table fooTable(time long, date String)", description = "Above example will create a redis table with the name fooTable and work on asingle redis node."), @Example(syntax = "@Store(type='redis', table.name='SweetProductionTable', nodes='localhost:30001,localhost:30002,localhost:30003', cluster.mode='true')\n@primaryKey('symbol')\n@index('price') \ndefine table SweetProductionTable (symbol string, price float, volume long);", description = "Above example demonstrate how to use the redis extension to connect in to redis cluster. Please note that, as nodes all the master node's host and port should be provided in order to work correctly. In clustered node password will not besupported"), @Example(syntax = "@store(type='redis',nodes='localhost:6379@root',table.name='fooTable', ttl.seconds='30', ttl.onUpdate='true', ttl.onRead='true')define table fooTable(time long, date String)", description = "Above example will create a redis table with the name fooTable and work on asingle redis node.  All rows inserted, updated or read will have its ttl set to 30 seconds")})
/* loaded from: input_file:io/siddhi/extension/store/redis/RedisTable.class */
public class RedisTable extends AbstractRecordTable {
    private static final Logger log = LoggerFactory.getLogger(RedisTable.class);
    private List<Attribute> attributes;
    private JedisPool jedisPool;
    private char[] password;
    private String tableName;
    private JedisCluster jedisCluster;
    private Jedis jedis;
    private RedisInstance redisInstance;
    private List<String> primaryKeys = new ArrayList();
    private String host = RedisTableConstants.DEFAULT_HOST;
    private int port = RedisTableConstants.DEFAULT_PORT;
    private List<String> indices = new ArrayList();
    private Boolean clusterModeEnabled = false;
    private List<HostAndPort> hostAndPortList = Arrays.asList(new HostAndPort(this.host, this.port));
    private int ttl = -1;
    private boolean ttlOnUpdate = false;
    private boolean ttlOnRead = false;

    protected void init(TableDefinition tableDefinition, ConfigReader configReader) {
        this.attributes = tableDefinition.getAttributeList();
        Annotation annotation = AnnotationHelper.getAnnotation("PrimaryKey", tableDefinition.getAnnotations());
        Annotation annotation2 = AnnotationHelper.getAnnotation("Store", tableDefinition.getAnnotations());
        Annotation annotation3 = AnnotationHelper.getAnnotation("Index", tableDefinition.getAnnotations());
        if (annotation != null) {
            this.primaryKeys = new ArrayList();
            annotation.getElements().forEach(element -> {
                this.primaryKeys.add(element.getValue().trim());
            });
        }
        if (annotation3 != null) {
            annotation3.getElements().forEach(element2 -> {
                this.indices.add(element2.getValue().trim());
            });
        }
        if (annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_CLUSTER_MODE) != null) {
            this.clusterModeEnabled = Boolean.valueOf(Boolean.parseBoolean(annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_CLUSTER_MODE)));
        }
        if (annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_HOST) != null) {
            this.host = annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_HOST);
        }
        if (this.clusterModeEnabled.booleanValue()) {
            this.hostAndPortList = new ArrayList();
            if (annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_NODES) != null) {
                for (String str : annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_NODES).split(",")) {
                    String[] split = str.split(":");
                    if (str.split(":").length == 2) {
                        this.hostAndPortList.add(new HostAndPort(split[0], Integer.parseInt(split[1])));
                    }
                }
            }
        } else if (annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_NODES) != null) {
            String[] split2 = annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_NODES).split("@");
            if (split2.length > 0) {
                String[] split3 = split2[0].split(":");
                if (split2[0].split(":").length == 2) {
                    this.host = split3[0];
                    this.port = Integer.parseInt(split3[1]);
                }
                if (split2.length == 2) {
                    this.password = split2[1].toCharArray();
                }
            }
        }
        if (annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_PORT) != null) {
            this.port = Integer.parseInt(annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_PORT));
        }
        if (annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_PASSWORD) != null) {
            this.password = annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_PASSWORD).toCharArray();
        }
        if (annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_TABLE_NAME) != null) {
            this.tableName = annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_TABLE_NAME);
        } else {
            this.tableName = tableDefinition.getId();
        }
        if (annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_TTL_SECS) != null) {
            this.ttl = Integer.parseInt(annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_TTL_SECS));
            if (1 > this.ttl) {
                throw new IllegalTtlArgumentException("ttl must be greater than 1");
            }
        } else {
            this.ttl = -1;
        }
        if (this.ttl <= 0 || annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_TTL_ON_UPDATE) == null) {
            this.ttlOnUpdate = false;
        } else {
            this.ttlOnUpdate = Boolean.parseBoolean(annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_TTL_ON_UPDATE));
        }
        if (this.ttl <= 0 || annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_TTL_ON_READ) == null) {
            this.ttlOnRead = false;
        } else {
            this.ttlOnRead = Boolean.parseBoolean(annotation2.getElement(RedisTableConstants.ANNOTATION_ELEMENT_TTL_ON_READ));
        }
        if (log.isDebugEnabled()) {
            log.debug("ttl " + this.ttl + " update " + this.ttlOnUpdate + " read " + this.ttlOnRead);
        }
    }

    protected void add(List<Object[]> list) {
        HashMap hashMap = new HashMap();
        try {
            list.forEach(objArr -> {
                StringBuilder sb = new StringBuilder(this.tableName);
                int i = 0;
                Iterator<Attribute> it = this.attributes.iterator();
                while (it.hasNext()) {
                    int i2 = i;
                    i++;
                    hashMap.put(it.next().getName(), objArr[i2].toString());
                }
                if (this.primaryKeys != null && !this.primaryKeys.isEmpty()) {
                    if (isRecordExists(hashMap)) {
                        throw new RedisTableException("Error While adding record to the table '" + this.tableName + "'. Record exists for primary key '" + this.primaryKeys.get(0) + "' with the value of '" + ((String) hashMap.get(this.primaryKeys.get(0))) + "'. ");
                    }
                    createTablesWithPrimaryKey(hashMap, sb);
                    return;
                }
                sb.append(":").append(RedisTableUtils.generateRecordID());
                this.redisInstance.hmset(sb.toString(), hashMap);
                if (this.indices != null && !this.indices.isEmpty()) {
                    createIndexTable(hashMap, sb);
                } else if (log.isDebugEnabled()) {
                    log.debug("There are no indexed columns defined in table " + this.tableName);
                }
                if (0 < this.ttl) {
                    RedisTableUtils.setExpire(this.redisInstance, this.tableName, this.indices, sb.toString(), this.ttl, hashMap);
                }
            });
        } catch (JedisException e) {
            throw new RedisTableException("Error while inserting records to Redis Event Table : " + this.tableName + ". ", e);
        }
    }

    protected RecordIterator<Object[]> find(Map<String, Object> map, CompiledCondition compiledCondition) throws ConnectionUnavailableException {
        try {
            return new RedisIterator(this.redisInstance, this.attributes, RedisTableUtils.resolveCondition((RedisCompliedCondition) compiledCondition, map), this.tableName, this.primaryKeys, this.indices, this.hostAndPortList, this.ttl, this.ttlOnRead);
        } catch (JedisDataException e) {
            throw new RedisTableException("Error while searching the records in Redis Event Table : " + this.tableName, e);
        }
    }

    protected boolean contains(Map<String, Object> map, CompiledCondition compiledCondition) throws ConnectionUnavailableException {
        return find(map, compiledCondition).hasNext();
    }

    protected void delete(List<Map<String, Object>> list, CompiledCondition compiledCondition) throws ConnectionUnavailableException {
        try {
            Iterator<Map<String, Object>> it = list.iterator();
            while (it.hasNext()) {
                deleteFromTable(RedisTableUtils.resolveCondition((RedisCompliedCondition) compiledCondition, it.next()));
            }
        } catch (JedisDataException e) {
            throw new RedisTableException("Error while deleting records from table : " + this.tableName, e);
        }
    }

    protected void update(CompiledCondition compiledCondition, List<Map<String, Object>> list, Map<String, CompiledExpression> map, List<Map<String, Object>> list2) throws ConnectionUnavailableException {
        try {
            updateTable(compiledCondition, list2, list, true);
        } catch (JedisDataException e) {
            throw new RedisTableException("Error while updating records from table : " + this.tableName, e);
        }
    }

    protected void updateOrAdd(CompiledCondition compiledCondition, List<Map<String, Object>> list, Map<String, CompiledExpression> map, List<Map<String, Object>> list2, List<Object[]> list3) throws ConnectionUnavailableException {
        try {
            updateTable(compiledCondition, list2, list, false);
        } catch (JedisDataException e) {
            throw new RedisTableException("Error while updating records from table : " + this.tableName, e);
        }
    }

    protected CompiledCondition compileCondition(ExpressionBuilder expressionBuilder) {
        RedisConditionVisitor redisConditionVisitor = new RedisConditionVisitor();
        expressionBuilder.build(redisConditionVisitor);
        return new RedisCompliedCondition(redisConditionVisitor.returnCondition());
    }

    protected CompiledExpression compileSetAttribute(ExpressionBuilder expressionBuilder) {
        return compileCondition(expressionBuilder);
    }

    protected void connect() throws ConnectionUnavailableException {
        try {
            if (this.jedisPool == null) {
                if (this.clusterModeEnabled.booleanValue()) {
                    this.jedisCluster = new JedisCluster(new HashSet(this.hostAndPortList));
                    this.redisInstance = new RedisClusterInstance(this.jedisCluster);
                } else {
                    this.jedisPool = new JedisPool(new GenericObjectPoolConfig(), this.host, this.port);
                    this.jedis = this.jedisPool.getResource();
                    if (this.password != null) {
                        this.jedis.auth(String.valueOf(this.password));
                    }
                    this.redisInstance = new RedisSingleNodeInstance(this.jedis);
                }
            }
        } catch (JedisConnectionException e) {
            throw new ConnectionUnavailableException("Error while initializing the Redis event table: " + this.tableName + " : " + e.getMessage(), e);
        }
    }

    protected void disconnect() {
        try {
            if (this.clusterModeEnabled.booleanValue() && this.jedisCluster != null) {
                this.jedisCluster.close();
            }
            if (this.jedisPool != null && !this.jedisPool.isClosed()) {
                this.jedisPool.close();
            }
        } catch (JedisConnectionException | IOException e) {
            log.error("Error while closing the redis client for table: " + this.tableName + " : ", e);
        }
    }

    protected void destroy() {
        try {
            if (this.jedisPool != null) {
                this.jedisPool.destroy();
            }
        } catch (JedisConnectionException e) {
            log.error("Error while closing the redis client for table: " + this.tableName + " : ", e);
        }
    }

    private void createIndexTable(Map<String, String> map, StringBuilder sb) {
        for (String str : this.indices) {
            this.redisInstance.sadd(this.tableName + ":" + str + ":" + map.get(str), sb.toString());
        }
    }

    private void createTablesWithPrimaryKey(Map<String, String> map, StringBuilder sb) {
        sb.append(":").append(map.get(this.primaryKeys.get(0)));
        this.redisInstance.hmset(sb.toString(), map);
        if (this.indices != null && !this.indices.isEmpty()) {
            createIndexTable(map, sb);
        } else if (log.isDebugEnabled()) {
            log.debug("There are no indexed columns defined in table " + this.tableName);
        }
        if (-1 != this.ttl) {
            RedisTableUtils.setExpire(this.redisInstance, this.tableName, this.indices, sb.toString(), this.ttl, map);
        }
    }

    private void deleteFromTable(BasicCompareOperation basicCompareOperation) {
        StoreVariable storeVariable = basicCompareOperation.getStoreVariable();
        StreamVariable streamVariable = basicCompareOperation.getStreamVariable();
        if (streamVariable.getName().equalsIgnoreCase("true")) {
            deleteAllRecords();
            return;
        }
        if (this.primaryKeys.contains(storeVariable.getName())) {
            Map<String, String> hgetAll = this.redisInstance.hgetAll(this.tableName + ":" + streamVariable.getName());
            for (String str : this.indices) {
                this.redisInstance.srem(this.tableName + ":" + str + ":" + hgetAll.get(str), this.tableName + ":" + streamVariable.getName());
            }
            this.redisInstance.del(this.tableName + ":" + streamVariable.getName());
            return;
        }
        if (!this.indices.contains(storeVariable.getName())) {
            throw new OperationNotSupportedException("Cannot delete records by  '" + storeVariable.getName() + "' since the field is nether indexed nor primary key");
        }
        Iterator it = this.redisInstance.sscan(this.tableName + ":" + storeVariable.getName() + ":" + streamVariable.getName(), "0").getResult().iterator();
        while (it.hasNext()) {
            this.redisInstance.del(it.next().toString());
        }
        this.redisInstance.del(this.tableName + ":" + storeVariable.getName() + ":" + streamVariable.getName());
    }

    private void deleteAllRecords() {
        ScanParams scanParams = new ScanParams();
        scanParams.match(this.tableName + ":*");
        scanParams.count(Integer.valueOf(RedisTableConstants.REDIS_BATCH_SIZE));
        List<String> scan = this.redisInstance.scan(this.hostAndPortList, scanParams);
        while (true) {
            List<String> list = scan;
            if (list.isEmpty()) {
                return;
            }
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                this.redisInstance.del(it.next().toString());
            }
            scan = this.redisInstance.scan(this.hostAndPortList, scanParams);
        }
    }

    private void updateTable(CompiledCondition compiledCondition, List<Map<String, Object>> list, List<Map<String, Object>> list2, boolean z) {
        int i = 0;
        Iterator<Map<String, Object>> it = list2.iterator();
        while (it.hasNext()) {
            BasicCompareOperation resolveCondition = RedisTableUtils.resolveCondition((RedisCompliedCondition) compiledCondition, it.next());
            StoreVariable storeVariable = resolveCondition.getStoreVariable();
            StreamVariable streamVariable = resolveCondition.getStreamVariable();
            if (z && this.primaryKeys.contains(storeVariable.getName())) {
                if (this.redisInstance.hgetAll(this.tableName + ":" + streamVariable.getName()).isEmpty()) {
                    log.warn("Record " + storeVariable.getName() + " = " + streamVariable.getName() + " that trying to update does not exist in table : " + this.tableName + ". ");
                } else {
                    updateOnPrimaryKey(list.get(i), resolveCondition);
                }
            } else if (!z && this.primaryKeys.contains(storeVariable.getName())) {
                updateOnPrimaryKey(list.get(i), resolveCondition);
            } else if (z && this.indices.contains(storeVariable.getName())) {
                updateOrAddOnIndex(list, resolveCondition);
            } else if (!z && this.indices.contains(storeVariable.getName())) {
                if (log.isDebugEnabled()) {
                    log.debug("Existing records will be updated where " + storeVariable.getName() + " = " + streamVariable.getName() + ". ");
                }
                updateOrAddOnIndex(list, resolveCondition);
            }
            i++;
        }
    }

    private void updateOnPrimaryKey(Map<String, Object> map, BasicCompareOperation basicCompareOperation) {
        StreamVariable streamVariable = basicCompareOperation.getStreamVariable();
        String str = this.tableName + ":" + streamVariable.getName();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            if (this.indices.contains(entry.getKey())) {
                String hget = this.redisInstance.hget(str, entry.getKey());
                String str2 = this.tableName + ":" + entry.getKey() + ":";
                this.redisInstance.srem(str2 + hget, this.tableName + ":" + streamVariable.getName());
                this.redisInstance.sadd(str2 + entry.getValue(), this.tableName + ":" + streamVariable.getName());
            }
            this.redisInstance.hset(str, entry.getKey(), entry.getValue().toString());
        }
        if (this.ttlOnUpdate) {
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            for (Map.Entry<String, Object> entry2 : map.entrySet()) {
                linkedHashMap.put(entry2.getKey(), "" + entry2.getValue());
            }
            RedisTableUtils.setExpire(this.redisInstance, this.tableName, this.indices, str, this.ttl, linkedHashMap);
        }
    }

    private void updateOrAddOnIndex(List<Map<String, Object>> list, BasicCompareOperation basicCompareOperation) {
        String str = this.tableName + ":" + basicCompareOperation.getStoreVariable().getName() + ":" + basicCompareOperation.getStreamVariable().getName();
        ScanResult<String> sscan = this.redisInstance.sscan(str, "0");
        List result = sscan.getResult();
        if (Long.parseLong(sscan.getStringCursor()) <= 0) {
            executeUpdateOperation(list, result);
            return;
        }
        while (Long.parseLong(sscan.getStringCursor()) > 0) {
            sscan = this.redisInstance.sscan(str, sscan.getStringCursor());
            result.addAll(sscan.getResult());
            executeUpdateOperation(list, result);
        }
    }

    private void executeUpdateOperation(List<Map<String, Object>> list, List list2) {
        for (Object obj : list2) {
            Iterator<Map<String, Object>> it = list.iterator();
            while (it.hasNext()) {
                for (Map.Entry<String, Object> entry : it.next().entrySet()) {
                    if (entry.getValue() == null) {
                        entry.setValue("");
                    }
                    if (this.primaryKeys.contains(entry.getKey())) {
                        throw new OperationNotSupportedException("Primary Key cannot be update in table : " + this.tableName);
                    }
                    executeUpdateOperationOnIndexedValues(entry, obj);
                }
            }
        }
    }

    private void executeUpdateOperationOnIndexedValues(Map.Entry<String, Object> entry, Object obj) {
        String hget = this.redisInstance.hget(obj.toString(), entry.getKey());
        this.redisInstance.hset(obj.toString(), entry.getKey(), entry.getValue().toString());
        this.redisInstance.sadd(this.tableName + ":" + entry.getKey() + ":" + entry.getValue(), obj.toString());
        if (hget.isEmpty()) {
            return;
        }
        this.redisInstance.srem(this.tableName + ":" + entry.getKey() + hget, obj.toString());
    }

    private boolean isRecordExists(Map<String, String> map) {
        String str = this.tableName + ":" + map.get(this.primaryKeys.get(0));
        Map<String, String> hgetAll = this.redisInstance.hgetAll(str);
        if (-1 != this.ttl && this.ttlOnRead && !hgetAll.isEmpty()) {
            RedisTableUtils.setExpire(this.redisInstance, this.tableName, this.indices, str, this.ttl, hgetAll);
        }
        return !hgetAll.isEmpty();
    }
}
