package org.apache.samza.coordinator.metadatastore;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamPartitionIterator;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.class */
/**
 * A {@link MetadataStore} backed by the Samza coordinator stream.
 *
 * <p>Writes are produced to the coordinator stream. Reads are served from an in-memory map that
 * is populated by consuming partition 0 of that stream, starting from its oldest offset. Every
 * read first drains whatever messages are currently available from the consumer so the map is
 * brought up to date before it is queried.
 *
 * <p>Keys exposed by this store are JSON strings that combine a namespace (the coordinator message
 * type) and a key; see {@link #serializeCoordinatorMessageKeyToJson(String, String)} and
 * {@link #deserializeCoordinatorMessageKeyFromJson(String)}.
 *
 * <p>{@link #init()} must be called before {@link #get(String)} or {@link #all()}: the stream
 * iterator is only created there, and reading without it fails with a
 * {@link NullPointerException}.
 */
public class CoordinatorStreamStore implements MetadataStore {
    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorStreamStore.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Source name under which the producer is registered and messages are sent/flushed. */
    private static final String SOURCE = "SamzaContainer";

    private final Config config;
    private final SystemStream coordinatorSystemStream;
    private final SystemStreamPartition coordinatorSystemStreamPartition;
    private final SystemProducer systemProducer;
    private final SystemConsumer systemConsumer;
    private final SystemAdmin systemAdmin;

    /** Local view of the coordinator stream: namespaced JSON key to latest non-null payload. */
    private final Map<String, byte[]> messagesReadFromCoordinatorStream = new ConcurrentHashMap<>();

    /** Guards consumption of the coordinator stream so only one thread drains the iterator. */
    private final Object bootstrapLock = new Object();
    private final AtomicBoolean isInitialized = new AtomicBoolean(false);

    /**
     * Deserializes coordinator stream message keys. Created once instead of once per consumed
     * message; only used while holding {@link #bootstrapLock}.
     */
    private final JsonSerde<List<?>> keySerde = new JsonSerde<>();

    /** Iterator over the coordinator stream partition; assigned in {@link #init()}. */
    private SystemStreamPartitionIterator iterator;

    /**
     * Value object pairing a key with its namespace. Its JSON form (properties {@code key} and
     * {@code namespace}) is the string key used by the enclosing store.
     */
    public static class CoordinatorMessageKey {
        private final String key;
        private final String namespace;

        /**
         * @param str  the key within the namespace (JSON property {@code key})
         * @param str2 the namespace of the key (JSON property {@code namespace})
         */
        CoordinatorMessageKey(@JsonProperty("key") String str, @JsonProperty("namespace") String str2) {
            this.key = str;
            this.namespace = str2;
        }

        /** @return the key within the namespace */
        public String getKey() {
            return this.key;
        }

        /** @return the namespace of the key */
        public String getNamespace() {
            return this.namespace;
        }
    }

    /**
     * Creates a store whose producer, consumer and admin are obtained from the coordinator system
     * factory resolved from {@code config}.
     *
     * @param config          job configuration used to resolve the coordinator system and stream
     * @param metricsRegistry registry handed to the created producer and consumer
     */
    public CoordinatorStreamStore(Config config, MetricsRegistry metricsRegistry) {
        this.config = config;
        this.coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
        this.coordinatorSystemStreamPartition = new SystemStreamPartition(this.coordinatorSystemStream, new Partition(0));
        SystemFactory coordinatorSystemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
        String systemName = this.coordinatorSystemStream.getSystem();
        String ownerName = getClass().getSimpleName();
        this.systemProducer = coordinatorSystemFactory.getProducer(systemName, config, metricsRegistry, ownerName);
        this.systemConsumer = coordinatorSystemFactory.getConsumer(systemName, config, metricsRegistry, ownerName);
        this.systemAdmin = coordinatorSystemFactory.getAdmin(systemName, config, ownerName);
    }

    /**
     * Creates a store around externally supplied system components.
     *
     * @param config         job configuration used to resolve the coordinator stream
     * @param systemProducer producer used to write to the coordinator stream
     * @param systemConsumer consumer used to read from the coordinator stream
     * @param systemAdmin    admin used to look up the stream's metadata
     */
    @VisibleForTesting
    protected CoordinatorStreamStore(Config config, SystemProducer systemProducer, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
        this.config = config;
        this.systemConsumer = systemConsumer;
        this.systemProducer = systemProducer;
        this.systemAdmin = systemAdmin;
        this.coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
        this.coordinatorSystemStreamPartition = new SystemStreamPartition(this.coordinatorSystemStream, new Partition(0));
    }

    /**
     * Registers and starts the consumer and producer, then reads the messages currently available
     * on the coordinator stream into the local map. Only the first invocation does any work;
     * later invocations log and return.
     */
    public void init() {
        if (!this.isInitialized.compareAndSet(false, true)) {
            LOG.info("Store had already been initialized. Skipping.");
            return;
        }
        LOG.info("Starting the coordinator stream system consumer with config: {}.", this.config);
        registerConsumer();
        this.systemConsumer.start();
        this.systemProducer.register(SOURCE);
        this.systemProducer.start();
        this.iterator = new SystemStreamPartitionIterator(this.systemConsumer, this.coordinatorSystemStreamPartition);
        readMessagesFromCoordinatorStream();
    }

    /**
     * Returns the payload last read from the coordinator stream for the given key.
     *
     * @param str namespaced key in the JSON form produced by
     *            {@link #serializeCoordinatorMessageKeyToJson(String, String)}
     * @return the stored bytes, or {@code null} if the local map has no entry for the key
     */
    public byte[] get(String str) {
        readMessagesFromCoordinatorStream();
        return this.messagesReadFromCoordinatorStream.get(str);
    }

    /**
     * Sends a message for the given key to the coordinator stream. The local map is not updated
     * here; it only changes when messages are read back from the stream.
     *
     * @param str  namespaced key in the JSON form produced by
     *             {@link #serializeCoordinatorMessageKeyToJson(String, String)}
     * @param bArr payload to write; {@code null} is written as-is and is treated as a deletion
     *             when the message is read back
     * @throws SamzaException if {@code str} cannot be parsed as a {@link CoordinatorMessageKey}
     */
    public void put(String str, byte[] bArr) {
        CoordinatorMessageKey messageKey = deserializeCoordinatorMessageKeyFromJson(str);
        CoordinatorStreamKeySerde streamKeySerde = new CoordinatorStreamKeySerde(messageKey.getNamespace());
        byte[] streamKeyBytes = streamKeySerde.toBytes(messageKey.getKey());
        // Partition key is the constant 0; the store itself only consumes partition 0.
        OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(this.coordinatorSystemStream, 0, streamKeyBytes, bArr);
        this.systemProducer.send(SOURCE, envelope);
    }

    /**
     * Deletes a key by writing a message with a {@code null} payload for it.
     *
     * @param str namespaced key to delete
     */
    public void delete(String str) {
        put(str, null);
    }

    /**
     * Returns every entry read from the coordinator stream so far.
     *
     * @return an unmodifiable view of the store's local map (not a snapshot copy)
     */
    public Map<String, byte[]> all() {
        readMessagesFromCoordinatorStream();
        return Collections.unmodifiableMap(this.messagesReadFromCoordinatorStream);
    }

    /**
     * Drains the messages currently available from the stream iterator into the local map.
     * A message with a non-null payload overwrites the entry for its key; a message with a
     * {@code null} payload removes the entry.
     */
    private void readMessagesFromCoordinatorStream() {
        synchronized (this.bootstrapLock) {
            while (this.iterator.hasNext()) {
                IncomingMessageEnvelope envelope = this.iterator.next();
                Object[] keyArray = this.keySerde.fromBytes((byte[]) envelope.getKey()).toArray();
                // Only the key is decoded; the message's type and key are derived from it.
                CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, new HashMap<String, Object>());
                String namespacedKey = serializeCoordinatorMessageKeyToJson(coordinatorStreamMessage.getType(), coordinatorStreamMessage.getKey());
                Object payload = envelope.getMessage();
                if (payload != null) {
                    this.messagesReadFromCoordinatorStream.put(namespacedKey, (byte[]) payload);
                } else {
                    this.messagesReadFromCoordinatorStream.remove(namespacedKey);
                }
            }
        }
    }

    /**
     * Stops the admin, producer and consumer. Each component is stopped independently, so a
     * failure while stopping one is logged and does not prevent the others from being stopped.
     * No exception is propagated to the caller.
     */
    public void close() {
        LOG.info("Stopping the coordinator stream system consumer.");
        stopComponent("system admin", this.systemAdmin::stop);
        stopComponent("system producer", this.systemProducer::stop);
        stopComponent("system consumer", this.systemConsumer::stop);
    }

    /** Runs one stop action, logging (not rethrowing) any exception it raises. */
    private static void stopComponent(String componentName, Runnable stopAction) {
        try {
            stopAction.run();
        } catch (Exception e) {
            LOG.error("Exception occurred when closing the metadata store: failed to stop the {}.", componentName, e);
        }
    }

    /**
     * Flushes the producer for this store's source.
     *
     * @throws SamzaException wrapping any exception raised by the producer
     */
    public void flush() {
        try {
            this.systemProducer.flush(SOURCE);
        } catch (Exception e) {
            LOG.error("Exception occurred when flushing the metadata store:", e);
            throw new SamzaException("Exception occurred when flushing the metadata store:", e);
        }
    }

    /**
     * Registers the coordinator stream partition with the consumer at the partition's oldest
     * offset, as reported by the system admin.
     *
     * @throws NullPointerException if the admin returns no metadata for the stream or partition
     */
    private void registerConsumer() {
        LOG.debug("Attempting to register system stream partition: {}", this.coordinatorSystemStreamPartition);
        String stream = this.coordinatorSystemStreamPartition.getStream();
        SystemStreamMetadata systemStreamMetadata = this.systemAdmin.getSystemStreamMetadata(Sets.newHashSet(stream)).get(stream);
        Preconditions.checkNotNull(systemStreamMetadata, "System stream metadata does not exist for stream: %s.", stream);
        SystemStreamMetadata.SystemStreamPartitionMetadata systemStreamPartitionMetadata =
            systemStreamMetadata.getSystemStreamPartitionMetadata().get(this.coordinatorSystemStreamPartition.getPartition());
        Preconditions.checkNotNull(systemStreamPartitionMetadata, "System stream partition metadata does not exist for: %s.", this.coordinatorSystemStreamPartition);
        String oldestOffset = systemStreamPartitionMetadata.getOldestOffset();
        LOG.info("Registering system stream partition: {} with offset: {}.", this.coordinatorSystemStreamPartition, oldestOffset);
        this.systemConsumer.register(this.coordinatorSystemStreamPartition, oldestOffset);
    }

    /**
     * Builds the JSON string key used by this store from a message type and key.
     *
     * @param str  the message type, stored as the {@code namespace} property
     * @param str2 the key within that type, stored as the {@code key} property
     * @return the JSON representation of the corresponding {@link CoordinatorMessageKey}
     * @throws SamzaException if JSON serialization fails
     */
    public static String serializeCoordinatorMessageKeyToJson(String str, String str2) {
        try {
            return OBJECT_MAPPER.writeValueAsString(new CoordinatorMessageKey(str2, str));
        } catch (IOException e) {
            throw new SamzaException(String.format("Exception occurred when serializing metadata for type: %s, key: %s", str, str2), e);
        }
    }

    /**
     * Parses a JSON string key back into its namespace and key parts.
     *
     * @param str JSON produced by {@link #serializeCoordinatorMessageKeyToJson(String, String)}
     * @return the parsed {@link CoordinatorMessageKey}
     * @throws SamzaException if the string cannot be parsed
     */
    public static CoordinatorMessageKey deserializeCoordinatorMessageKeyFromJson(String str) {
        try {
            return OBJECT_MAPPER.readValue(str, CoordinatorMessageKey.class);
        } catch (IOException e) {
            throw new SamzaException(String.format("Exception occurred when deserializing the coordinatorMsgKey: %s", str), e);
        }
    }
}
