/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.redis.service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.util.Tuple;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.Expiration;

/**
 * Map cache client backed by Redis.
 *
 * <p>Every cache operation borrows a connection from the configured {@link RedisConnectionPool},
 * runs against it, and closes it again (see {@link #withConnection(RedisAction)}). The atomic
 * operations ({@link #getAndPutIfAbsent} and {@link #replace}) are built on WATCH / MULTI / EXEC,
 * which is why {@link #customValidate(ValidationContext)} rejects a pool configured for
 * {@link RedisType#CLUSTER}.</p>
 */
@Tags({"redis", "distributed", "cache", "map"})
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service can only be used with a Redis Connection Pool that is configured for standalone or sentinel mode. Sentinel mode can be used to provide high-availability configurations.")
public class RedisDistributedMapCacheClientService
        extends AbstractControllerService
        implements AtomicDistributedMapCacheClient<byte[]> {

    public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
            .name("redis-connection-pool")
            .displayName("Redis Connection Pool")
            .identifiesControllerService(RedisConnectionPool.class)
            .required(true)
            .build();

    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
            .name("redis-cache-ttl")
            .displayName("TTL")
            .description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .required(true)
            .defaultValue("0 secs")
            .build();

    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
            Collections.unmodifiableList(Arrays.asList(REDIS_CONNECTION_POOL, TTL));

    /** Internal TTL value meaning "entries never expire"; a configured TTL of 0 is mapped to this. */
    private static final long NO_EXPIRATION = -1L;

    /** COUNT hint passed to SCAN by {@link #removeByPattern(String)}. */
    private static final long SCAN_COUNT = 100L;

    /** Number of scanned keys accumulated before a DEL is issued by {@link #removeByPattern(String)}. */
    private static final int DELETE_BATCH_SIZE = 1000;

    private volatile RedisConnectionPool redisConnectionPool;

    // TTL in seconds, or NO_EXPIRATION. Volatile to match redisConnectionPool: both are assigned in
    // onEnabled and read by the cache operations.
    private volatile long ttl = NO_EXPIRATION;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Rejects a connection pool that reports {@link RedisType#CLUSTER}.
     *
     * @param validationContext context used to resolve the configured connection pool
     * @return an empty collection when valid, otherwise a single invalid result for the pool property
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();

        final RedisConnectionPool pool = validationContext.getProperty(REDIS_CONNECTION_POOL)
                .asControllerService(RedisConnectionPool.class);
        if (pool != null && pool.getRedisType() == RedisType.CLUSTER) {
            results.add(new ValidationResult.Builder()
                    .subject(REDIS_CONNECTION_POOL.getDisplayName())
                    .valid(false)
                    .explanation(REDIS_CONNECTION_POOL.getDisplayName() + " is configured in clustered mode, and this service requires a non-clustered Redis")
                    .build());
        }
        return results;
    }

    /**
     * Captures the connection pool and the TTL (in seconds) from the configuration. A TTL of 0 is
     * stored as {@link #NO_EXPIRATION}.
     *
     * @param context the configuration the service is being enabled with
     */
    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);

        final long configuredSeconds = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
        this.ttl = configuredSeconds == 0L ? NO_EXPIRATION : configuredSeconds;
    }

    /** Drops the reference to the connection pool. */
    @OnDisabled
    public void onDisabled() {
        this.redisConnectionPool = null;
    }

    /**
     * Stores the value only if the key does not exist yet (SETNX), then applies the configured TTL
     * to a newly created key.
     *
     * @return {@code true} if the key was absent and has been set
     * @throws IOException if serialization or the Redis action fails
     */
    @Override
    public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        return withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
            final long ttlSeconds = ttl;

            final boolean set = redisConnection.setNX(kv.getKey(), kv.getValue());
            if (set && ttlSeconds != NO_EXPIRATION) {
                redisConnection.expire(kv.getKey(), ttlSeconds);
            }
            return set;
        });
    }

    /**
     * Atomically stores the value if the key is absent, otherwise returns the existing value.
     *
     * <p>The key is WATCHed, read, and then SETNX (plus EXPIRE for a new key) is queued in a
     * transaction. If the transaction produces no results it is retried for as long as the
     * service is enabled.</p>
     *
     * @return {@code null} if the value was stored (or the service was disabled while retrying),
     *         otherwise the deserialized existing value
     * @throws IOException if serialization fails or the transaction yields a non-Boolean first result
     */
    @Override
    public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
        return withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
            final byte[] k = kv.getKey();
            final byte[] v = kv.getValue();
            final long ttlSeconds = ttl;

            do {
                redisConnection.watch(k);
                final byte[] existingValue = redisConnection.get(k);

                redisConnection.multi();
                redisConnection.setNX(k, v);
                if (ttlSeconds != NO_EXPIRATION && existingValue == null) {
                    redisConnection.expire(k, ttlSeconds);
                }
                final List<Object> results = redisConnection.exec();

                // A null or empty result means the transaction did not run (Redis EXEC replies null
                // when a WATCHed key changed), so go around again while the service is enabled.
                if (results == null || results.isEmpty()) {
                    continue;
                }

                final Object firstResult = results.get(0);
                if (!(firstResult instanceof Boolean)) {
                    final String resultType = firstResult == null ? "null" : firstResult.getClass().getName();
                    throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got " + resultType + " with value " + firstResult);
                }

                // First result is the SETNX outcome: true means our value was stored.
                final boolean absent = (Boolean) firstResult;
                return absent ? null : valueDeserializer.deserialize(existingValue);
            } while (isEnabled());

            return null;
        });
    }

    /**
     * @return the result of EXISTS for the serialized key
     * @throws IOException if serialization or the Redis action fails
     */
    @Override
    public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
        return withConnection(redisConnection -> redisConnection.exists(serialize(key, keySerializer)));
    }

    /**
     * Unconditionally stores the value (SET with upsert) using the configured TTL.
     *
     * @throws IOException if serialization or the Redis action fails
     */
    @Override
    public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
            redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), RedisStringCommands.SetOption.upsert());
            return null;
        });
    }

    /**
     * Reads the value for the key and hands the raw bytes to the deserializer. The bytes are passed
     * through as returned by Redis, without a null check.
     *
     * @return whatever the deserializer produces for the stored bytes
     * @throws IOException if serialization, deserialization or the Redis action fails
     */
    @Override
    public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
        return withConnection(redisConnection -> {
            final byte[] v = redisConnection.get(serialize(key, keySerializer));
            return valueDeserializer.deserialize(v);
        });
    }

    /** No-op: connections are opened and closed per operation by {@link #withConnection(RedisAction)}. */
    @Override
    public void close() throws IOException {
    }

    /**
     * Deletes the key.
     *
     * @return {@code true} if DEL reported at least one removed key
     * @throws IOException if serialization or the Redis action fails
     */
    @Override
    public <K> boolean remove(K key, Serializer<K> keySerializer) throws IOException {
        return withConnection(redisConnection -> {
            final long numRemoved = redisConnection.del(serialize(key, keySerializer));
            return numRemoved > 0L;
        });
    }

    /**
     * Scans for keys matching the given pattern and deletes them in batches of
     * {@link #DELETE_BATCH_SIZE}.
     *
     * @param regex passed unchanged as the MATCH option of the scan
     * @return the total number of keys DEL reported as removed
     * @throws IOException if the Redis action fails
     */
    @Override
    public long removeByPattern(String regex) throws IOException {
        return withConnection(redisConnection -> {
            long deletedCount = 0L;
            final List<byte[]> batchKeys = new ArrayList<>();

            // NOTE(review): the cursor is intentionally not closed here; the connection it runs on
            // is released by withConnection.
            final Cursor<byte[]> cursor = redisConnection.scan(ScanOptions.scanOptions().count(SCAN_COUNT).match(regex).build());
            while (cursor.hasNext()) {
                batchKeys.add(cursor.next());
                if (batchKeys.size() == DELETE_BATCH_SIZE) {
                    deletedCount += redisConnection.del(getKeys(batchKeys));
                    batchKeys.clear();
                }
            }

            // Flush the final, partially filled batch.
            if (!batchKeys.isEmpty()) {
                deletedCount += redisConnection.del(getKeys(batchKeys));
                batchKeys.clear();
            }
            return deletedCount;
        });
    }

    /** Copies the list of keys into the array form taken by {@code del}. */
    private byte[][] getKeys(List<byte[]> keys) {
        return keys.toArray(new byte[0][]);
    }

    /**
     * Reads the entry for the key, using the raw stored bytes as the entry's revision.
     *
     * @return {@code null} if Redis has no value for the key, otherwise an entry holding the key,
     *         the deserialized value and the raw bytes
     * @throws IOException if serialization, deserialization or the Redis action fails
     */
    @Override
    public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
        return withConnection(redisConnection -> {
            final byte[] v = redisConnection.get(serialize(key, keySerializer));
            if (v == null) {
                return null;
            }
            return new AtomicCacheEntry<>(key, valueDeserializer.deserialize(v), v);
        });
    }

    /**
     * Replaces the stored value only if the bytes currently in Redis equal the entry's revision.
     *
     * <p>The key is WATCHed and read; if the current bytes match the revision, GETSET (and EXPIRE
     * when a TTL is configured) is queued in a transaction. The replace is reported as successful
     * only when the transaction returns at least one result.</p>
     *
     * @return {@code true} if the value was replaced
     * @throws IOException if serialization or the Redis action fails
     */
    @Override
    public <K, V> boolean replace(AtomicCacheEntry<K, V, byte[]> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        return withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer);
            final byte[] k = kv.getKey();
            final byte[] newVal = kv.getValue();
            final byte[] prevVal = entry.getRevision().orElse(null);
            final long ttlSeconds = ttl;

            redisConnection.watch(k);
            final byte[] currValue = redisConnection.get(k);

            redisConnection.multi();
            if (Arrays.equals(prevVal, currValue)) {
                redisConnection.getSet(k, newVal);
                // GETSET discards any TTL on the key (per the Redis command reference), so the
                // configured expiry is re-applied inside the same transaction.
                if (ttlSeconds != NO_EXPIRATION) {
                    redisConnection.expire(k, ttlSeconds);
                }
            }
            final List<Object> results = redisConnection.exec();

            // Null (aborted transaction) and empty (nothing queued) both mean "not replaced".
            return results != null && !results.isEmpty();
        });
    }

    /** Serializes the key and the value into a (key bytes, value bytes) pair. */
    private <K, V> Tuple<byte[], byte[]> serialize(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();

        keySerializer.serialize(key, out);
        final byte[] k = out.toByteArray();

        // Reuse the same buffer for the value.
        out.reset();
        valueSerializer.serialize(value, out);
        final byte[] v = out.toByteArray();

        return new Tuple<>(k, v);
    }

    /** Serializes the key into a byte array. */
    private <K> byte[] serialize(K key, Serializer<K> keySerializer) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        keySerializer.serialize(key, out);
        return out.toByteArray();
    }

    /**
     * Obtains a connection from the pool, runs the action against it and always closes the
     * connection afterwards. A failure while closing is logged as a warning and not rethrown.
     *
     * @param action the work to perform with the connection
     * @return the action's result
     * @throws IOException if the action throws it
     */
    private <T> T withConnection(RedisAction<T> action) throws IOException {
        RedisConnection redisConnection = null;
        try {
            redisConnection = this.redisConnectionPool.getConnection();
            return action.execute(redisConnection);
        } finally {
            if (redisConnection != null) {
                try {
                    redisConnection.close();
                } catch (Exception e) {
                    this.getLogger().warn("Error closing connection: " + e.getMessage(), e);
                }
            }
        }
    }
}

