package org.apache.samza.system.inmemory;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/inmemory/InMemorySystemConsumer.class */
public class InMemorySystemConsumer implements SystemConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(InMemorySystemConsumer.class);
    private final InMemoryManager memoryManager;
    private final Map<SystemStreamPartition, String> sspToOffset = new ConcurrentHashMap();

    public InMemorySystemConsumer(InMemoryManager inMemoryManager) {
        this.memoryManager = inMemoryManager;
    }

    public void start() {
        LOG.info("Starting in memory system consumer...");
    }

    public void stop() {
        LOG.info("Stopping in memory system consumer...");
    }

    public void register(SystemStreamPartition systemStreamPartition, String str) {
        String str2;
        if (str == null) {
            LOG.info("Registering ssp {} with starting offset null, overriding to 0", systemStreamPartition);
            str2 = "0";
        } else {
            LOG.info("Registering ssp {} with starting offset {}", systemStreamPartition, str);
            str2 = str;
        }
        this.sspToOffset.put(systemStreamPartition, str2);
    }

    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long j) throws InterruptedException {
        Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll = this.memoryManager.poll((Map) this.sspToOffset.entrySet().stream().filter(entry -> {
            return set.contains(entry.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
        for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> entry2 : poll.entrySet()) {
            this.sspToOffset.computeIfPresent(entry2.getKey(), (systemStreamPartition, str) -> {
                return String.valueOf(Integer.parseInt(str) + ((List) entry2.getValue()).size());
            });
        }
        return poll;
    }
}
