package org.apache.storm.redis.state;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import sun.misc.BASE64Decoder;
import sun.misc.BASE64Encoder;

/* loaded from: input_file:org/apache/storm/redis/state/RedisKeyValueState.class */
public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(RedisKeyValueState.class);
    private static final String COMMIT_TXID_KEY = "commit";
    private static final String PREPARE_TXID_KEY = "prepare";
    private final BASE64Encoder base64Encoder;
    private final BASE64Decoder base64Decoder;
    private final String namespace;
    private final String prepareNamespace;
    private final String txidNamespace;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final JedisCommandsInstanceContainer jedisContainer;
    private Map<String, String> pendingPrepare;
    private Map<String, String> pendingCommit;
    private Map<String, String> txIds;

    public RedisKeyValueState(String str) {
        this(str, new JedisPoolConfig.Builder().build());
    }

    public RedisKeyValueState(String str, JedisPoolConfig jedisPoolConfig) {
        this(str, jedisPoolConfig, (Serializer) new DefaultStateSerializer(), (Serializer) new DefaultStateSerializer());
    }

    public RedisKeyValueState(String str, JedisPoolConfig jedisPoolConfig, Serializer<K> serializer, Serializer<V> serializer2) {
        this(str, JedisCommandsContainerBuilder.build(jedisPoolConfig), serializer, serializer2);
    }

    public RedisKeyValueState(String str, JedisCommandsInstanceContainer jedisCommandsInstanceContainer, Serializer<K> serializer, Serializer<V> serializer2) {
        this.base64Encoder = new BASE64Encoder();
        this.base64Decoder = new BASE64Decoder();
        this.namespace = str;
        this.prepareNamespace = str + "$prepare";
        this.txidNamespace = str + "$txid";
        this.keySerializer = serializer;
        this.valueSerializer = serializer2;
        this.jedisContainer = jedisCommandsInstanceContainer;
        this.pendingPrepare = new ConcurrentHashMap();
        initTxids();
        initPendingCommit();
    }

    private void initTxids() {
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = this.jedisContainer.getInstance();
            if (jedisCommands.exists(this.txidNamespace).booleanValue()) {
                this.txIds = jedisCommands.hgetAll(this.txidNamespace);
            } else {
                this.txIds = new HashMap();
            }
            LOG.debug("initTxids, txIds {}", this.txIds);
            this.jedisContainer.returnInstance(jedisCommands);
        } catch (Throwable th) {
            this.jedisContainer.returnInstance(jedisCommands);
            throw th;
        }
    }

    private void initPendingCommit() {
        try {
            JedisCommands jedisCommandsInstanceContainer = this.jedisContainer.getInstance();
            if (jedisCommandsInstanceContainer.exists(this.prepareNamespace).booleanValue()) {
                LOG.debug("Loading previously prepared commit from {}", this.prepareNamespace);
                this.pendingCommit = Collections.unmodifiableMap(jedisCommandsInstanceContainer.hgetAll(this.prepareNamespace));
            } else {
                LOG.debug("No previously prepared commits.");
                this.pendingCommit = Collections.emptyMap();
            }
            this.jedisContainer.returnInstance(jedisCommandsInstanceContainer);
        } catch (Throwable th) {
            this.jedisContainer.returnInstance(null);
            throw th;
        }
    }

    public void put(K k, V v) {
        LOG.debug("put key '{}', value '{}'", k, v);
        this.pendingPrepare.put(encode(this.keySerializer.serialize(k)), encode(this.valueSerializer.serialize(v)));
    }

    public V get(K k) {
        String hget;
        LOG.debug("get key '{}'", k);
        String encode = encode(this.keySerializer.serialize(k));
        if (this.pendingPrepare.containsKey(encode)) {
            hget = this.pendingPrepare.get(encode);
        } else if (this.pendingCommit.containsKey(encode)) {
            hget = this.pendingCommit.get(encode);
        } else {
            JedisCommands jedisCommands = null;
            try {
                jedisCommands = this.jedisContainer.getInstance();
                hget = jedisCommands.hget(this.namespace, encode);
                this.jedisContainer.returnInstance(jedisCommands);
            } catch (Throwable th) {
                this.jedisContainer.returnInstance(jedisCommands);
                throw th;
            }
        }
        Object obj = null;
        if (hget != null) {
            obj = this.valueSerializer.deserialize(decode(hget));
        }
        LOG.debug("Value for key '{}' is '{}'", k, obj);
        return (V) obj;
    }

    public V get(K k, V v) {
        V v2 = get(k);
        return v2 != null ? v2 : v;
    }

    public void prepareCommit(long j) {
        LOG.debug("prepareCommit txid {}", Long.valueOf(j));
        validatePrepareTxid(j);
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = this.jedisContainer.getInstance();
            if (jedisCommands.exists(this.prepareNamespace).booleanValue()) {
                LOG.debug("Prepared txn already exists, will merge", Long.valueOf(j));
                this.pendingPrepare.putAll(this.pendingCommit);
            }
            if (this.pendingPrepare.isEmpty()) {
                LOG.debug("Nothing to save for prepareCommit, txid {}.", Long.valueOf(j));
            } else {
                jedisCommands.hmset(this.prepareNamespace, this.pendingPrepare);
            }
            this.txIds.put(PREPARE_TXID_KEY, String.valueOf(j));
            jedisCommands.hmset(this.txidNamespace, this.txIds);
            this.pendingCommit = Collections.unmodifiableMap(this.pendingPrepare);
            this.pendingPrepare = new ConcurrentHashMap();
            this.jedisContainer.returnInstance(jedisCommands);
        } catch (Throwable th) {
            this.jedisContainer.returnInstance(jedisCommands);
            throw th;
        }
    }

    public void commit(long j) {
        LOG.debug("commit txid {}", Long.valueOf(j));
        validateCommitTxid(j);
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = this.jedisContainer.getInstance();
            if (this.pendingCommit.isEmpty()) {
                LOG.debug("Nothing to save for commit, txid {}.", Long.valueOf(j));
            } else {
                jedisCommands.hmset(this.namespace, this.pendingCommit);
            }
            this.txIds.put(COMMIT_TXID_KEY, String.valueOf(j));
            jedisCommands.hmset(this.txidNamespace, this.txIds);
            jedisCommands.del(this.prepareNamespace);
            this.pendingCommit = Collections.emptyMap();
            this.jedisContainer.returnInstance(jedisCommands);
        } catch (Throwable th) {
            this.jedisContainer.returnInstance(jedisCommands);
            throw th;
        }
    }

    public void commit() {
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = this.jedisContainer.getInstance();
            if (this.pendingPrepare.isEmpty()) {
                LOG.debug("Nothing to save for commit");
            } else {
                jedisCommands.hmset(this.namespace, this.pendingPrepare);
            }
            this.pendingPrepare = new ConcurrentHashMap();
            this.jedisContainer.returnInstance(jedisCommands);
        } catch (Throwable th) {
            this.jedisContainer.returnInstance(jedisCommands);
            throw th;
        }
    }

    public void rollback() {
        LOG.debug("rollback");
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = this.jedisContainer.getInstance();
            if (jedisCommands.exists(this.prepareNamespace).booleanValue()) {
                jedisCommands.del(this.prepareNamespace);
            } else {
                LOG.debug("Nothing to rollback, prepared data is empty");
            }
            Long lastCommittedTxid = lastCommittedTxid();
            if (lastCommittedTxid != null) {
                this.txIds.put(PREPARE_TXID_KEY, String.valueOf(lastCommittedTxid));
            } else {
                this.txIds.remove(PREPARE_TXID_KEY);
            }
            if (!this.txIds.isEmpty()) {
                LOG.debug("hmset txidNamespace {}, txIds {}", this.txidNamespace, this.txIds);
                jedisCommands.hmset(this.txidNamespace, this.txIds);
            }
            this.pendingCommit = Collections.emptyMap();
            this.pendingPrepare = new ConcurrentHashMap();
            this.jedisContainer.returnInstance(jedisCommands);
        } catch (Throwable th) {
            this.jedisContainer.returnInstance(jedisCommands);
            throw th;
        }
    }

    private void validatePrepareTxid(long j) {
        Long lastCommittedTxid = lastCommittedTxid();
        if (lastCommittedTxid != null && j <= lastCommittedTxid.longValue()) {
            throw new RuntimeException("Invalid txid '" + j + "' for prepare. Txid '" + lastCommittedTxid + "' is already committed");
        }
    }

    private void validateCommitTxid(long j) {
        Long lastCommittedTxid = lastCommittedTxid();
        if (lastCommittedTxid != null && j < lastCommittedTxid.longValue()) {
            throw new RuntimeException("Invalid txid '" + j + "' txid '" + lastCommittedTxid + "' is already committed");
        }
        Long lastPreparedTxid = lastPreparedTxid();
        if (lastPreparedTxid != null && j != lastPreparedTxid.longValue()) {
            throw new RuntimeException("Invalid txid '" + j + "' not same as prepared txid '" + lastPreparedTxid + "'");
        }
    }

    private Long lastCommittedTxid() {
        return lastId(COMMIT_TXID_KEY);
    }

    private Long lastPreparedTxid() {
        return lastId(PREPARE_TXID_KEY);
    }

    private Long lastId(String str) {
        Long l = null;
        String str2 = this.txIds.get(str);
        if (str2 != null) {
            l = Long.valueOf(str2);
        }
        return l;
    }

    private String encode(byte[] bArr) {
        return this.base64Encoder.encode(bArr);
    }

    private byte[] decode(String str) {
        try {
            return this.base64Decoder.decodeBuffer(str);
        } catch (IOException e) {
            throw new RuntimeException("Error while decoding string " + str);
        }
    }
}
