package org.apache.samza.coordinator.stream;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamPartitionIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.class */
public class CoordinatorStreamSystemConsumer {
    private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemConsumer.class);
    private final Serde<List<?>> keySerde;
    private final Serde<Map<String, Object>> messageSerde;
    private final SystemStreamPartition coordinatorSystemStreamPartition;
    private final SystemConsumer systemConsumer;
    private final SystemAdmin systemAdmin;
    private final Map<String, String> configMap;
    private volatile boolean isStarted;
    private volatile boolean isBootstrapped;
    private final Object bootstrapLock;
    private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet;

    public CoordinatorStreamSystemConsumer(SystemStream systemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> serde, Serde<Map<String, Object>> serde2) {
        this.bootstrapLock = new Object();
        this.bootstrappedStreamSet = Collections.emptySet();
        this.coordinatorSystemStreamPartition = new SystemStreamPartition(systemStream, new Partition(0));
        this.systemConsumer = systemConsumer;
        this.systemAdmin = systemAdmin;
        this.configMap = new HashMap();
        this.isBootstrapped = false;
        this.keySerde = serde;
        this.messageSerde = serde2;
    }

    public CoordinatorStreamSystemConsumer(SystemStream systemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
        this(systemStream, systemConsumer, systemAdmin, new JsonSerde(), new JsonSerde());
    }

    public void register() {
        if (this.isStarted) {
            log.info("Coordinator stream partition {} has already been registered. Skipping.", this.coordinatorSystemStreamPartition);
            return;
        }
        log.debug("Attempting to register: {}", this.coordinatorSystemStreamPartition);
        HashSet hashSet = new HashSet();
        String stream = this.coordinatorSystemStreamPartition.getStream();
        hashSet.add(stream);
        Map systemStreamMetadata = this.systemAdmin.getSystemStreamMetadata(hashSet);
        log.info(String.format("Got metadata %s", systemStreamMetadata.toString()));
        if (systemStreamMetadata == null) {
            throw new SamzaException("Received a null systemStreamMetadataMap from the systemAdmin. This is illegal.");
        }
        SystemStreamMetadata systemStreamMetadata2 = (SystemStreamMetadata) systemStreamMetadata.get(stream);
        if (systemStreamMetadata2 == null) {
            throw new SamzaException("Expected " + stream + " to be in system stream metadata.");
        }
        SystemStreamMetadata.SystemStreamPartitionMetadata systemStreamPartitionMetadata = (SystemStreamMetadata.SystemStreamPartitionMetadata) systemStreamMetadata2.getSystemStreamPartitionMetadata().get(this.coordinatorSystemStreamPartition.getPartition());
        if (systemStreamPartitionMetadata == null) {
            throw new SamzaException("Expected metadata for " + this.coordinatorSystemStreamPartition + " to exist.");
        }
        String oldestOffset = systemStreamPartitionMetadata.getOldestOffset();
        log.debug("Registering {} with offset {}", this.coordinatorSystemStreamPartition, oldestOffset);
        this.systemConsumer.register(this.coordinatorSystemStreamPartition, oldestOffset);
    }

    public void start() {
        if (this.isStarted) {
            log.info("Coordinator stream consumer already started");
            return;
        }
        log.info("Starting coordinator stream system consumer.");
        this.systemConsumer.start();
        this.isStarted = true;
    }

    public void stop() {
        log.info("Stopping coordinator stream system consumer.");
        this.systemConsumer.stop();
        this.isStarted = false;
    }

    public void bootstrap() {
        synchronized (this.bootstrapLock) {
            LinkedHashSet linkedHashSet = new LinkedHashSet(this.bootstrappedStreamSet);
            log.info("Bootstrapping configuration from coordinator stream.");
            SystemStreamPartitionIterator systemStreamPartitionIterator = new SystemStreamPartitionIterator(this.systemConsumer, this.coordinatorSystemStreamPartition);
            while (systemStreamPartitionIterator.hasNext()) {
                try {
                    IncomingMessageEnvelope next = systemStreamPartitionIterator.next();
                    Object[] array = ((List) this.keySerde.fromBytes((byte[]) next.getKey())).toArray();
                    Map map = null;
                    if (next.getMessage() != null) {
                        map = (Map) this.messageSerde.fromBytes((byte[]) next.getMessage());
                    }
                    CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(array, map);
                    log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
                    if (linkedHashSet.remove(coordinatorStreamMessage)) {
                        log.debug("Removed duplicate message: {}", coordinatorStreamMessage);
                    }
                    linkedHashSet.add(coordinatorStreamMessage);
                    if ("set-config".equals(coordinatorStreamMessage.getType())) {
                        String key = coordinatorStreamMessage.getKey();
                        if (coordinatorStreamMessage.isDelete()) {
                            this.configMap.remove(key);
                        } else {
                            this.configMap.put(key, new SetConfig(coordinatorStreamMessage).getConfigValue());
                        }
                    }
                } catch (Exception e) {
                    throw new SamzaException(e);
                }
            }
            this.bootstrappedStreamSet = Collections.unmodifiableSet(linkedHashSet);
            log.debug("Bootstrapped configuration: {}", this.configMap);
            this.isBootstrapped = true;
        }
    }

    public Set<CoordinatorStreamMessage> getBoostrappedStream() {
        log.info("Returning the bootstrapped data from the stream");
        if (!this.isBootstrapped) {
            bootstrap();
        }
        return this.bootstrappedStreamSet;
    }

    public Set<CoordinatorStreamMessage> getBootstrappedStream(String str) {
        log.debug("Bootstrapping coordinator stream for messages of type {}", str);
        bootstrap();
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        for (CoordinatorStreamMessage coordinatorStreamMessage : this.bootstrappedStreamSet) {
            log.trace("Considering message: {}", coordinatorStreamMessage);
            if (str.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
                log.trace("Adding message: {}", coordinatorStreamMessage);
                linkedHashSet.add(coordinatorStreamMessage);
            }
        }
        return linkedHashSet;
    }

    public Config getConfig() {
        if (this.isBootstrapped) {
            return new MapConfig(this.configMap);
        }
        throw new SamzaException("Must call bootstrap before retrieving config.");
    }

    public SystemStreamPartitionIterator getStartIterator() {
        return new SystemStreamPartitionIterator(this.systemConsumer, this.coordinatorSystemStreamPartition);
    }

    public Set<CoordinatorStreamMessage> getUnreadMessages(SystemStreamPartitionIterator systemStreamPartitionIterator) {
        return getUnreadMessages(systemStreamPartitionIterator, null);
    }

    public Set<CoordinatorStreamMessage> getUnreadMessages(SystemStreamPartitionIterator systemStreamPartitionIterator, String str) {
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        while (systemStreamPartitionIterator.hasNext()) {
            IncomingMessageEnvelope next = systemStreamPartitionIterator.next();
            Object[] array = ((List) this.keySerde.fromBytes((byte[]) next.getKey())).toArray();
            Map map = null;
            if (next.getMessage() != null) {
                map = (Map) this.messageSerde.fromBytes((byte[]) next.getMessage());
            }
            CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(array, map);
            if (str == null || str.equals(coordinatorStreamMessage.getType())) {
                linkedHashSet.add(coordinatorStreamMessage);
            }
        }
        return linkedHashSet;
    }

    public boolean hasNewMessages(SystemStreamPartitionIterator systemStreamPartitionIterator) {
        if (systemStreamPartitionIterator == null) {
            return false;
        }
        return systemStreamPartitionIterator.hasNext();
    }
}
