package org.apache.samza.system.inmemory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/samza/system/inmemory/InMemoryManager.class */
public class InMemoryManager {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryManager.class);
    private static final int DEFAULT_PARTITION_COUNT = 1;
    private final ConcurrentHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>> bufferedMessages = new ConcurrentHashMap<>();
    private final Map<SystemStream, Integer> systemStreamToPartitions = new ConcurrentHashMap();

    private List<IncomingMessageEnvelope> newSynchronizedLinkedList() {
        return Collections.synchronizedList(new LinkedList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void put(SystemStreamPartition systemStreamPartition, Object obj, Object obj2) {
        String valueOf = String.valueOf(this.bufferedMessages.get(systemStreamPartition).size());
        if (shouldUseEndOfStreamOffset(obj2)) {
            valueOf = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
        }
        this.bufferedMessages.get(systemStreamPartition).add(new IncomingMessageEnvelope(systemStreamPartition, valueOf, obj, obj2, 0, 0L, Instant.now().toEpochMilli()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope incomingMessageEnvelope) {
        Preconditions.checkNotNull(incomingMessageEnvelope);
        Preconditions.checkNotNull(incomingMessageEnvelope.getOffset());
        String valueOf = String.valueOf(this.bufferedMessages.get(systemStreamPartition).size());
        if (!incomingMessageEnvelope.getOffset().equals(valueOf)) {
            throw new SamzaException(String.format("Offset mismatch for ssp %s, expected %s found %s, please set the correct offset", systemStreamPartition, valueOf, incomingMessageEnvelope.getOffset()));
        }
        this.bufferedMessages.get(systemStreamPartition).add(incomingMessageEnvelope);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Map<SystemStreamPartition, String> map) {
        return (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return poll((SystemStreamPartition) entry.getKey(), (String) entry.getValue());
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean initializeStream(StreamSpec streamSpec) {
        LOG.info("Initializing the stream for {}", streamSpec.getId());
        this.systemStreamToPartitions.put(streamSpec.toSystemStream(), Integer.valueOf(streamSpec.getPartitionCount()));
        for (int i = 0; i < streamSpec.getPartitionCount(); i++) {
            this.bufferedMessages.put(new SystemStreamPartition(streamSpec.toSystemStream(), new Partition(i)), newSynchronizedLinkedList());
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, SystemStreamMetadata> getSystemStreamMetadata(String str, Set<String> set) {
        return (Map) ((Map) this.bufferedMessages.entrySet().stream().filter(entry -> {
            return str.equals(((SystemStreamPartition) entry.getKey()).getSystem()) && set.contains(((SystemStreamPartition) entry.getKey()).getStream());
        }).collect(Collectors.groupingBy(entry2 -> {
            return ((SystemStreamPartition) entry2.getKey()).getStream();
        }, Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })))).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry3 -> {
            return constructSystemStreamMetadata((String) entry3.getKey(), (Map) entry3.getValue());
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getPartitionCountForSystemStream(SystemStream systemStream) {
        return this.systemStreamToPartitions.getOrDefault(systemStream, 1).intValue();
    }

    private SystemStreamMetadata constructSystemStreamMetadata(String str, Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map) {
        return new SystemStreamMetadata(str, (Map) map.entrySet().stream().collect(Collectors.toMap(entry -> {
            return ((SystemStreamPartition) entry.getKey()).getPartition();
        }, entry2 -> {
            Integer num;
            Integer valueOf;
            int size;
            List list = (List) entry2.getValue();
            if (list.isEmpty()) {
                num = null;
                valueOf = null;
                size = 0;
            } else if (!((IncomingMessageEnvelope) list.get(list.size() - 1)).isEndOfStream()) {
                num = 0;
                valueOf = Integer.valueOf(list.size() - 1);
                size = list.size();
            } else if (list.size() > 1) {
                num = 0;
                valueOf = Integer.valueOf(list.size() - 2);
                size = list.size() - 1;
            } else {
                num = null;
                valueOf = null;
                size = 0;
            }
            return new SystemStreamMetadata.SystemStreamPartitionMetadata(num == null ? null : num.toString(), valueOf == null ? null : valueOf.toString(), Integer.toString(size));
        })));
    }

    private List<IncomingMessageEnvelope> poll(SystemStreamPartition systemStreamPartition, String str) {
        int parseInt = Integer.parseInt(str);
        List<IncomingMessageEnvelope> orDefault = this.bufferedMessages.getOrDefault(systemStreamPartition, new LinkedList());
        return parseInt >= orDefault.size() ? new ArrayList() : ImmutableList.copyOf(orDefault.subList(parseInt, orDefault.size()));
    }

    private static boolean shouldUseEndOfStreamOffset(Object obj) {
        return (obj instanceof EndOfStreamMessage) && ((EndOfStreamMessage) obj).getTaskName() == null;
    }
}
