package org.apache.samza.storage;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/samza/storage/TaskSideInputHandler.class */
/**
 * Drives side-input processing for a single task.
 *
 * <p>For every side-input {@link SystemStreamPartition} (SSP) of the task this class
 * <ul>
 *   <li>works out the offset from which consumption should start (see {@link #init()}),</li>
 *   <li>routes each incoming envelope through the {@link SideInputsProcessor} of every store fed by
 *       that SSP and applies the resulting puts/deletes to the store (see {@link #process}),</li>
 *   <li>remembers the last processed offset per SSP and hands those offsets to the
 *       {@link TaskSideInputStorageManager} on {@link #flush()} and {@link #stop()}, and</li>
 *   <li>counts down the supplied latch once every SSP has reached the offsets captured in
 *       {@link #init()}.</li>
 * </ul>
 *
 * <p>{@link #process} and {@link #flush()} are {@code synchronized} on this instance; the other
 * methods are not.
 */
public class TaskSideInputHandler {
    private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);

    private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
    /** Offset of the most recent envelope processed for each SSP, seeded from the file offsets in {@link #init()}. */
    private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
    private final TaskName taskName;
    private final TaskSideInputStorageManager taskSideInputStorageManager;
    /** Inverse of the store-to-SSPs constructor argument: the names of the stores fed by each SSP. */
    private final Map<SystemStreamPartition, Set<String>> sspToStores;
    private final Map<String, SideInputsProcessor> storeToProcessor;
    private final SystemAdmins systemAdmins;
    private final StreamMetadataCache streamMetadataCache;
    private final CountDownLatch taskCaughtUpLatch;
    /** SSPs that have not caught up yet, with the metadata captured in {@link #init()}; entries are removed as SSPs catch up. */
    private Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata;
    private Map<SystemStreamPartition, String> startingOffsets;

    /**
     * Creates the handler and its backing {@link TaskSideInputStorageManager}.
     *
     * @param taskName name of the task this handler serves
     * @param taskMode mode of the task, forwarded to the storage manager
     * @param storeDir directory forwarded to the storage manager (presumably the base directory of
     *     the store data; confirm against {@link TaskSideInputStorageManager})
     * @param storeToStorageEngines storage engines keyed by a string (presumably the store name),
     *     forwarded to the storage manager
     * @param storeToSSPs side-input SSPs of each store, keyed by store name
     * @param storeToProcessor side-inputs processor of each store, keyed by store name
     * @param systemAdmins used to look up the system admin that compares and resolves offsets
     * @param streamMetadataCache source of stream and partition metadata
     * @param taskCaughtUpLatch counted down once, when all side-input SSPs have caught up
     * @param clock forwarded to the storage manager
     * @throws SamzaException if a store in {@code storeToSSPs} has no processor in {@code storeToProcessor}
     */
    public TaskSideInputHandler(TaskName taskName, TaskMode taskMode, File storeDir, Map<String, StorageEngine> storeToStorageEngines, Map<String, Set<SystemStreamPartition>> storeToSSPs, Map<String, SideInputsProcessor> storeToProcessor, SystemAdmins systemAdmins, StreamMetadataCache streamMetadataCache, CountDownLatch taskCaughtUpLatch, Clock clock) {
        validateProcessorConfiguration(storeToSSPs.keySet(), storeToProcessor);
        this.taskName = taskName;
        this.systemAdmins = systemAdmins;
        this.streamMetadataCache = streamMetadataCache;
        this.storeToProcessor = storeToProcessor;
        this.taskCaughtUpLatch = taskCaughtUpLatch;

        // Invert store -> SSPs into SSP -> stores so that an envelope can be routed by its SSP.
        this.sspToStores = new HashMap<>();
        for (Map.Entry<String, Set<SystemStreamPartition>> storeAndSSPs : storeToSSPs.entrySet()) {
            String storeName = storeAndSSPs.getKey();
            for (SystemStreamPartition ssp : storeAndSSPs.getValue()) {
                this.sspToStores.computeIfAbsent(ssp, key -> new HashSet<>()).add(storeName);
            }
        }

        this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName, taskMode, storeDir, storeToStorageEngines, storeToSSPs, clock);
    }

    /**
     * @return the name of the task this handler serves
     */
    public TaskName getTaskName() {
        return this.taskName;
    }

    /**
     * Initializes the storage manager, seeds the last processed offsets from the file offsets it
     * reports, computes the starting offset of every side-input SSP, and captures the partition
     * metadata each SSP has to catch up to.
     *
     * <p>Each starting offset is immediately compared against the captured upcoming offset, so an
     * SSP whose starting offset is already at or beyond it is marked as caught up right away.
     */
    public void init() {
        this.taskSideInputStorageManager.init();

        Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets();
        LOG.info("File offsets for the task {}: {}", this.taskName, fileOffsets);

        this.lastProcessedOffsets.putAll(fileOffsets);
        LOG.info("Last processed offsets for the task {}: {}", this.taskName, this.lastProcessedOffsets);

        this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
        LOG.info("Starting offsets for the task {}: {}", this.taskName, this.startingOffsets);

        this.initialSideInputSSPMetadata = getInitialSideInputSSPMetadata();
        LOG.info("Task {} will catch up to offsets {}", this.taskName, this.initialSideInputSSPMetadata);

        this.startingOffsets.forEach((ssp, startingOffset) ->
            checkCaughtUp(ssp, startingOffset, SystemStreamMetadata.OffsetType.UPCOMING));
    }

    /**
     * Looks up the current partition metadata of every side-input SSP in the stream metadata cache.
     *
     * <p>SSPs whose stream has no metadata are left out. An SSP whose stream has metadata but whose
     * partition does not is mapped to {@code null}.
     *
     * @return a new mutable map from SSP to its partition metadata
     */
    private Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getInitialSideInputSSPMetadata() {
        Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> sspMetadata = new HashMap<>();
        for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
            SystemStreamMetadata streamMetadata = this.streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
            if (streamMetadata != null) {
                sspMetadata.put(ssp, streamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition()));
            }
        }
        return sspMetadata;
    }

    /**
     * Applies one side-input envelope to every store fed by the envelope's SSP, records the
     * envelope's offset as the last processed offset of that SSP, and checks whether the SSP has
     * caught up to the newest offset captured in {@link #init()}.
     *
     * <p>For each entry returned by a store's processor: a {@code null} key is ignored, a
     * {@code null} value deletes the key, anything else is written with {@code put}.
     *
     * @param incomingMessageEnvelope the envelope to process
     */
    // Raw KeyValueStore/Entry: the key and value types of a store are not known to this class.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public synchronized void process(IncomingMessageEnvelope incomingMessageEnvelope) {
        SystemStreamPartition ssp = incomingMessageEnvelope.getSystemStreamPartition();
        String envelopeOffset = incomingMessageEnvelope.getOffset();

        for (String storeName : this.sspToStores.get(ssp)) {
            SideInputsProcessor processor = this.storeToProcessor.get(storeName);
            KeyValueStore store = this.taskSideInputStorageManager.getStore(storeName);
            for (Entry entry : processor.process(incomingMessageEnvelope, store)) {
                if (entry.getKey() == null) {
                    continue;
                }
                if (entry.getValue() != null) {
                    store.put(entry.getKey(), entry.getValue());
                } else {
                    store.delete(entry.getKey());
                }
            }
        }

        this.lastProcessedOffsets.put(ssp, envelopeOffset);
        checkCaughtUp(ssp, envelopeOffset, SystemStreamMetadata.OffsetType.NEWEST);
    }

    /**
     * Asks the storage manager to flush, passing it the last processed offset of every SSP.
     */
    public synchronized void flush() {
        this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
    }

    /**
     * @param systemStreamPartition the SSP to look up
     * @return the starting offset computed for the SSP in {@link #init()}, or {@code null} if none
     *     was recorded for it
     */
    public String getStartingOffset(SystemStreamPartition systemStreamPartition) {
        return this.startingOffsets.get(systemStreamPartition);
    }

    /**
     * @param systemStreamPartition the SSP to look up
     * @return the last processed offset recorded for the SSP, or {@code null} if there is none
     */
    public String getLastProcessedOffset(SystemStreamPartition systemStreamPartition) {
        return this.lastProcessedOffsets.get(systemStreamPartition);
    }

    /**
     * Stops the storage manager, passing it the last processed offset of every SSP.
     */
    public void stop() {
        this.taskSideInputStorageManager.stop(this.lastProcessedOffsets);
    }

    /**
     * Computes the starting offset of every side-input SSP by delegating to
     * {@link StorageManagerUtil#getStartingOffset} with the SSP's system admin, its file offset and
     * its oldest offset (either of which may be {@code null} when absent from the given maps).
     *
     * @param fileOffsets file offsets keyed by SSP
     * @param oldestOffsets oldest available offsets keyed by SSP
     * @return a new map with one entry per side-input SSP
     */
    @VisibleForTesting
    Map<SystemStreamPartition, String> getStartingOffsets(Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) {
        Map<SystemStreamPartition, String> offsets = new HashMap<>();
        for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
            String startingOffset = this.storageManagerUtil.getStartingOffset(
                ssp,
                this.systemAdmins.getSystemAdmin(ssp.getSystem()),
                fileOffsets.get(ssp),
                oldestOffsets.get(ssp));
            offsets.put(ssp, startingOffset);
        }
        return offsets;
    }

    /**
     * Fetches, in a single metadata-cache call covering all side-input streams, the oldest offset
     * of every side-input SSP.
     *
     * <p>SSPs whose stream is missing from the cache response get no entry in the result.
     *
     * @return a new map from SSP to its oldest offset
     */
    @VisibleForTesting
    Map<SystemStreamPartition, String> getOldestOffsets() {
        Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
        Set<SystemStreamPartition> sideInputSSPs = this.sspToStores.keySet();

        // One lookup for the distinct set of streams; the cache API takes and returns Scala collections.
        Map<?, SystemStreamMetadata> metadataByStream = JavaConverters.mapAsJavaMapConverter(
            this.streamMetadataCache.getStreamMetadata(
                JavaConverters.asScalaSetConverter(
                    sideInputSSPs.stream()
                        .map(SystemStreamPartition::getSystemStream)
                        .collect(Collectors.toSet()))
                    .asScala()
                    .toSet(),
                false))
            .asJava();

        for (SystemStreamPartition ssp : sideInputSSPs) {
            SystemStreamMetadata streamMetadata = metadataByStream.get(ssp.getSystemStream());
            if (streamMetadata == null) {
                // No metadata returned for this stream: leave the SSP without an oldest offset.
                continue;
            }
            oldestOffsets.put(ssp, streamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition()).getOldestOffset());
        }
        return oldestOffsets;
    }

    /**
     * Marks the SSP as caught up when {@code currentOffset} is at or beyond the offset of the given
     * type captured for it in {@link #init()}; counts down the latch when the last pending SSP
     * catches up.
     *
     * <p>The SSP is treated as still lagging when it has no pending metadata, when either offset is
     * {@code null}, or when the system admin's comparator returns {@code null}.
     *
     * @param ssp the SSP being checked
     * @param currentOffset the offset reached so far, may be {@code null}
     * @param offsetType which of the captured offsets to compare against
     */
    private void checkCaughtUp(SystemStreamPartition ssp, String currentOffset, SystemStreamMetadata.OffsetType offsetType) {
        SystemStreamMetadata.SystemStreamPartitionMetadata pendingMetadata = this.initialSideInputSSPMetadata.get(ssp);
        String targetOffset = pendingMetadata == null ? null : pendingMetadata.getOffset(offsetType);
        LOG.trace("Checking offset {} against {} offset {} for {}.", currentOffset, targetOffset, offsetType, ssp);

        // Deliberately not a ternary: "cond ? -1 : <Integer>" has type int, which would unbox the
        // comparator result and throw a NullPointerException when the comparator returns null.
        Integer comparison = null;
        if (currentOffset != null && targetOffset != null) {
            comparison = this.systemAdmins.getSystemAdmin(ssp.getSystem()).offsetComparator(currentOffset, targetOffset);
        }
        if (comparison == null || comparison < 0) {
            return;
        }

        LOG.info("Side input ssp {} has caught up to offset {}.", ssp, targetOffset);
        this.initialSideInputSSPMetadata.remove(ssp);
        if (this.initialSideInputSSPMetadata.isEmpty()) {
            // Every pending SSP has caught up. Later calls find no pending metadata and return
            // early above, so the latch is counted down only once.
            this.taskCaughtUpLatch.countDown();
        }
    }

    /**
     * Verifies that every store with side inputs has a processor configured.
     *
     * @param storeNames names of the stores that have side inputs
     * @param processors processors keyed by store name
     * @throws SamzaException for the first store found without a processor
     */
    private void validateProcessorConfiguration(Set<String> storeNames, Map<String, SideInputsProcessor> processors) {
        for (String storeName : storeNames) {
            if (!processors.containsKey(storeName)) {
                throw new SamzaException(String.format("Side inputs processor missing for store: %s.", storeName));
            }
        }
    }
}
