package org.apache.kafka.streams;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRegister;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.test.TestRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver.class */
public class TopologyTestDriver implements Closeable {
    // Log context created in the constructor and passed to the components built there.
    private final LogContext logContext;
    // Mock wall-clock (a MockTime); advanced by advanceWallClockTime(Duration).
    private final Time mockWallClockTime;
    // Assigned in setupTopology(); source of the processor and global topologies.
    private InternalTopologyBuilder internalTopologyBuilder;
    // The single stream task; null when the topology has no source topics (see setupTask()).
    StreamTask task;
    // Global task and its state manager; both null when there is no global topology (see setupGlobalTask()).
    private GlobalStateUpdateTask globalStateTask;
    private GlobalStateManager globalStateManager;
    // Assigned in setupTopology().
    private StateDirectory stateDirectory;
    // Assigned in setupMetrics(); exposed read-only through metrics().
    private Metrics metrics;
    // Topologies built from internalTopologyBuilder in setupTopology().
    ProcessorTopology processorTopology;
    ProcessorTopology globalTopology;
    // Mock consumer handed to the stream task; also used for offset commits outside exactly-once modes.
    private final MockConsumer<byte[], byte[]> consumer;
    // Mock producer whose history is drained by captureOutputsAndReEnqueueInternalResults().
    private final MockProducer<byte[], byte[]> producer;
    // StreamsProducer wrapper around `producer`; used for transactional commits.
    private final TestDriverProducer testDriverProducer;
    // Source topic name (or pattern string) -> its single partition, for the stream task.
    private final Map<String, TopicPartition> partitionsByInputTopic;
    // Source topic name -> its single partition, for the global task.
    private final Map<String, TopicPartition> globalPartitionsByInputTopic;
    // Next offset to assign for each partition registered in the two maps above.
    private final Map<TopicPartition, AtomicLong> offsetsByTopicOrPatternPartition;
    // Records captured from the mock producer, queued per output topic until read.
    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic;
    // Processing mode derived from the config; decides how commit() commits.
    private final StreamsConfigUtils.ProcessingMode processingMode;
    // No-op restore listener passed to the global state manager.
    private final StateRestoreListener stateRestoreListener;
    private static final Logger log = LoggerFactory.getLogger(TopologyTestDriver.class);
    // Every topic handled by the driver uses this one partition id.
    private static final int PARTITION_ID = 0;
    // Id of the single task run by the driver.
    private static final TaskId TASK_ID = new TaskId(PARTITION_ID, PARTITION_ID);

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver$KeyValueStoreFacade.class */
    /**
     * Read-write view over a {@link TimestampedKeyValueStore} that exposes plain values.
     * Writes wrap the value with a fixed timestamp of {@code -1}; reads of previous values
     * unwrap the stored {@link ValueAndTimestamp}.
     */
    static class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
        /** Timestamp attached to every value written through this facade. */
        private static final long NO_TIMESTAMP = -1L;

        /**
         * @param store the timestamped store that receives all calls
         */
        public KeyValueStoreFacade(TimestampedKeyValueStore<K, V> store) {
            super(store);
        }

        /** Delegates initialization to the wrapped store. */
        @Deprecated
        public void init(ProcessorContext context, StateStore root) {
            inner.init(context, root);
        }

        /** Delegates initialization to the wrapped store. */
        public void init(StateStoreContext context, StateStore root) {
            inner.init(context, root);
        }

        /** Stores {@code value} under {@code key}, wrapped with the fixed timestamp. */
        public void put(K key, V value) {
            inner.put(key, ValueAndTimestamp.make(value, NO_TIMESTAMP));
        }

        /**
         * @return the plain value previously stored under {@code key}, or {@code null}
         */
        public V putIfAbsent(K key, V value) {
            return ValueAndTimestamp.getValueOrNull(
                inner.putIfAbsent(key, ValueAndTimestamp.make(value, NO_TIMESTAMP)));
        }

        /** Writes each entry to the wrapped store, one {@code put} per entry, in list order. */
        public void putAll(List<KeyValue<K, V>> entries) {
            for (KeyValue<K, V> entry : entries) {
                inner.put(entry.key, ValueAndTimestamp.make(entry.value, NO_TIMESTAMP));
            }
        }

        /**
         * @return the plain value that was removed, or {@code null}
         */
        public V delete(K key) {
            return ValueAndTimestamp.getValueOrNull(inner.delete(key));
        }

        public void flush() {
            inner.flush();
        }

        public void close() {
            inner.close();
        }

        public String name() {
            return inner.name();
        }

        public boolean persistent() {
            return inner.persistent();
        }

        public boolean isOpen() {
            return inner.isOpen();
        }

        public Position getPosition() {
            return inner.getPosition();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver$MockChangelogRegister.class */
    /**
     * {@link ChangelogRegister} that only remembers which partitions have been registered.
     */
    public static class MockChangelogRegister implements ChangelogRegister {
        private final Set<TopicPartition> restoringPartitions = new HashSet<>();

        MockChangelogRegister() {
        }

        /** Records a single partition; the state manager argument is not used. */
        public void register(TopicPartition partition, ProcessorStateManager stateManager) {
            restoringPartitions.add(partition);
        }

        /** Records every partition of the set via the single-partition overload. */
        public void register(Set<TopicPartition> partitions, ProcessorStateManager stateManager) {
            for (TopicPartition partition : partitions) {
                register(partition, stateManager);
            }
        }

        /** Forgets all of the given partitions. */
        public void unregister(Collection<TopicPartition> partitions) {
            restoringPartitions.removeAll(partitions);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver$MockTime.class */
    /**
     * Manually driven {@link Time}: the clock only moves when {@link #sleep(long)} is called.
     * Millisecond and nanosecond readings are kept in two separate counters.
     */
    static class MockTime implements Time {
        private final AtomicLong timeMs;
        private final AtomicLong highResTimeNs;

        /**
         * @param startTimestampMs initial clock value in milliseconds
         */
        MockTime(long startTimestampMs) {
            timeMs = new AtomicLong(startTimestampMs);
            highResTimeNs = new AtomicLong(startTimestampMs * 1_000_000L);
        }

        public long milliseconds() {
            return timeMs.get();
        }

        public long nanoseconds() {
            return highResTimeNs.get();
        }

        /** Millisecond reading derived from the nanosecond counter. */
        public long hiResClockMs() {
            long nowNs = nanoseconds();
            return TimeUnit.NANOSECONDS.toMillis(nowNs);
        }

        /**
         * Advances both counters by {@code ms} milliseconds without blocking.
         *
         * @throws IllegalArgumentException if {@code ms} is negative
         */
        public void sleep(long ms) {
            if (ms < 0) {
                throw new IllegalArgumentException("Sleep ms cannot be negative.");
            }
            timeMs.addAndGet(ms);
            long deltaNs = TimeUnit.MILLISECONDS.toNanos(ms);
            highResTimeNs.addAndGet(deltaNs);
        }

        /** Not supported by this mock. */
        public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs) {
            throw new UnsupportedOperationException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver$TestDriverProducer.class */
    /**
     * {@link StreamsProducer} used by the test driver, created with a fixed thread id,
     * a task id built from the driver's partition id, and a random process id.
     */
    public static class TestDriverProducer extends StreamsProducer {
        public TestDriverProducer(StreamsConfig config, KafkaClientSupplier clientSupplier, LogContext logContext, Time time) {
            super(
                config,
                "TopologyTestDriver-StreamThread-1",
                clientSupplier,
                new TaskId(TopologyTestDriver.PARTITION_ID, TopologyTestDriver.PARTITION_ID),
                UUID.randomUUID(),
                logContext,
                time
            );
        }

        /** Publicly declared pass-through to the superclass implementation. */
        public void commitTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
            super.commitTransaction(offsets, consumerGroupMetadata);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/TopologyTestDriver$WindowStoreFacade.class */
    /**
     * Read-write view over a {@link TimestampedWindowStore} that exposes plain values.
     * Writes wrap the value with a fixed timestamp of {@code -1}; the {@code long}-based
     * fetch methods convert their bounds to {@link Instant} and call the
     * {@link Instant}-based overloads.
     */
    static class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> implements WindowStore<K, V> {
        /** Timestamp attached to every value written through this facade. */
        private static final long NO_TIMESTAMP = -1L;

        /**
         * @param store the timestamped window store that receives all calls
         */
        public WindowStoreFacade(TimestampedWindowStore<K, V> store) {
            super(store);
        }

        /** Delegates initialization to the wrapped store. */
        @Deprecated
        public void init(ProcessorContext context, StateStore root) {
            inner.init(context, root);
        }

        /** Delegates initialization to the wrapped store. */
        public void init(StateStoreContext context, StateStore root) {
            inner.init(context, root);
        }

        /** Stores {@code value}, wrapped with the fixed timestamp, for the given window start. */
        public void put(K key, V value, long windowStartTimestamp) {
            inner.put(key, ValueAndTimestamp.make(value, NO_TIMESTAMP), windowStartTimestamp);
        }

        public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
            Instant from = Instant.ofEpochMilli(timeFrom);
            Instant to = Instant.ofEpochMilli(timeTo);
            return fetch(key, from, to);
        }

        public WindowStoreIterator<V> backwardFetch(K key, long timeFrom, long timeTo) {
            Instant from = Instant.ofEpochMilli(timeFrom);
            Instant to = Instant.ofEpochMilli(timeTo);
            return backwardFetch(key, from, to);
        }

        public KeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, long timeFrom, long timeTo) {
            Instant from = Instant.ofEpochMilli(timeFrom);
            Instant to = Instant.ofEpochMilli(timeTo);
            return fetch(keyFrom, keyTo, from, to);
        }

        public KeyValueIterator<Windowed<K>, V> backwardFetch(K keyFrom, K keyTo, long timeFrom, long timeTo) {
            Instant from = Instant.ofEpochMilli(timeFrom);
            Instant to = Instant.ofEpochMilli(timeTo);
            return backwardFetch(keyFrom, keyTo, from, to);
        }

        public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
            Instant from = Instant.ofEpochMilli(timeFrom);
            Instant to = Instant.ofEpochMilli(timeTo);
            return fetchAll(from, to);
        }

        public KeyValueIterator<Windowed<K>, V> backwardFetchAll(long timeFrom, long timeTo) {
            Instant from = Instant.ofEpochMilli(timeFrom);
            Instant to = Instant.ofEpochMilli(timeTo);
            return backwardFetchAll(from, to);
        }

        public void flush() {
            inner.flush();
        }

        public void close() {
            inner.close();
        }

        public String name() {
            return inner.name();
        }

        public boolean persistent() {
            return inner.persistent();
        }

        public boolean isOpen() {
            return inner.isOpen();
        }

        public Position getPosition() {
            return inner.getPosition();
        }
    }

    /**
     * Creates a driver for {@code topology} using an empty {@link Properties} configuration.
     *
     * @param topology the topology to test
     */
    public TopologyTestDriver(Topology topology) {
        this(topology, new Properties());
    }

    /**
     * Creates a driver for {@code topology} with the given configuration and no explicit
     * initial wall-clock time.
     *
     * @param topology the topology to test
     * @param config   configuration overrides for the driver
     */
    public TopologyTestDriver(Topology topology, Properties config) {
        this(topology, config, (Instant) null);
    }

    /**
     * Creates a driver for {@code topology} using an empty {@link Properties} configuration
     * and the given initial wall-clock time.
     *
     * @param topology             the topology to test
     * @param initialWallClockTime initial mock wall-clock time; may be {@code null}
     */
    public TopologyTestDriver(Topology topology, Instant initialWallClockTime) {
        this(topology, new Properties(), initialWallClockTime);
    }

    /**
     * Creates a driver for {@code topology} with the given configuration and initial
     * wall-clock time.
     *
     * @param topology             the topology to test
     * @param config               configuration overrides for the driver
     * @param initialWallClockTime initial mock wall-clock time; when {@code null} the current
     *                             system time is used
     */
    public TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime) {
        this(
            topology.internalTopologyBuilder,
            config,
            initialWallClockTime != null ? initialWallClockTime.toEpochMilli() : System.currentTimeMillis()
        );
    }

    /**
     * Builds the whole driver: bookkeeping maps, effective configuration, mock clock,
     * metrics, topologies, mock consumer/producer, and finally the global task and the
     * stream task.
     *
     * The statement order matters: the setup helpers read fields assigned earlier in this
     * constructor (for example the mock clock, the log context and the mock clients).
     *
     * @param internalTopologyBuilder builder of the topology under test
     * @param properties              user configuration; copied, never modified
     * @param j                       initial mock wall-clock time in epoch milliseconds
     */
    private TopologyTestDriver(InternalTopologyBuilder internalTopologyBuilder, Properties properties, long j) {
        this.partitionsByInputTopic = new HashMap();
        this.globalPartitionsByInputTopic = new HashMap();
        this.offsetsByTopicOrPatternPartition = new HashMap();
        this.outputRecordsByTopic = new HashMap();
        // Restore listener that ignores every callback.
        this.stateRestoreListener = new StateRestoreListener() { // from class: org.apache.kafka.streams.TopologyTestDriver.1
            public void onRestoreStart(TopicPartition topicPartition, String str, long j2, long j3) {
            }

            public void onBatchRestored(TopicPartition topicPartition, String str, long j2, long j3) {
            }

            public void onRestoreEnd(TopicPartition topicPartition, String str, long j2) {
            }
        };
        // Work on a copy so the caller's Properties are left untouched, and fill in
        // placeholder values for the two settings the caller may have omitted.
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.putIfAbsent("bootstrap.servers", "dummy-bootstrap-host:0");
        properties2.putIfAbsent("application.id", "dummy-topology-test-driver-app-id-" + ThreadLocalRandom.current().nextInt());
        ClientUtils.QuietStreamsConfig quietStreamsConfig = new ClientUtils.QuietStreamsConfig(properties2);
        logIfTaskIdleEnabled(quietStreamsConfig);
        this.logContext = new LogContext("topology-test-driver ");
        this.mockWallClockTime = new MockTime(j);
        this.processingMode = StreamsConfigUtils.processingMode(quietStreamsConfig);
        StreamsMetricsImpl streamsMetricsImpl = setupMetrics(quietStreamsConfig);
        setupTopology(internalTopologyBuilder, quietStreamsConfig);
        // A negative configured cache size is treated as 0.
        ThreadCache threadCache = new ThreadCache(this.logContext, Math.max(0L, quietStreamsConfig.getLong("statestore.cache.max.bytes").longValue()), streamsMetricsImpl);
        this.consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
        // Mock producer that reports exactly one partition (PARTITION_ID) for any topic.
        this.producer = new MockProducer<byte[], byte[]>(true, byteArraySerializer, byteArraySerializer) { // from class: org.apache.kafka.streams.TopologyTestDriver.2
            public List<PartitionInfo> partitionsFor(String str) {
                return Collections.singletonList(new PartitionInfo(str, TopologyTestDriver.PARTITION_ID, (Node) null, (Node[]) null, (Node[]) null));
            }
        };
        // Client supplier that only hands out the mock producer; asking it for any
        // consumer throws IllegalStateException.
        this.testDriverProducer = new TestDriverProducer(quietStreamsConfig, new KafkaClientSupplier() { // from class: org.apache.kafka.streams.TopologyTestDriver.3
            public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
                return TopologyTestDriver.this.producer;
            }

            public Consumer<byte[], byte[]> getConsumer(Map<String, Object> map) {
                throw new IllegalStateException();
            }

            public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> map) {
                throw new IllegalStateException();
            }

            public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> map) {
                throw new IllegalStateException();
            }
        }, this.logContext, this.mockWallClockTime);
        // The global task is set up before the stream task.
        setupGlobalTask(this.mockWallClockTime, quietStreamsConfig, streamsMetricsImpl, threadCache);
        setupTask(quietStreamsConfig, streamsMetricsImpl, threadCache, this.internalTopologyBuilder.topologyConfigs().getTaskConfig());
    }

    /**
     * Logs an informational hint when {@code max.task.idle.ms} is configured with a positive
     * value, because enqueued records may then stay unprocessed until wall-clock time is
     * advanced or records arrive on all partitions.
     *
     * @param streamsConfig effective configuration of the driver
     */
    private static void logIfTaskIdleEnabled(StreamsConfig streamsConfig) {
        final String configName = "max.task.idle.ms";
        final long maxTaskIdleMs = streamsConfig.getLong(configName);
        if (maxTaskIdleMs > 0) {
            // The message previously read "Steams"; corrected to "Streams".
            log.info("Detected {} config in use with TopologyTestDriver (set to {}ms). This means you might need to use TopologyTestDriver#advanceWallClockTime() or enqueue records on all partitions to allow Streams to make progress. TopologyTestDriver will log a message each time it cannot process enqueued records due to {}.", configName, maxTaskIdleMs, configName);
        }
    }

    /**
     * Creates the {@link Metrics} registry from the metrics settings of the configuration,
     * wraps it in a {@link StreamsMetricsImpl}, and requests the dropped-records sensor for
     * the current thread name and the driver's task id.
     *
     * @param streamsConfig effective configuration of the driver
     * @return the streams metrics wrapper around the new registry
     */
    private StreamsMetricsImpl setupMetrics(StreamsConfig streamsConfig) {
        String threadId = Thread.currentThread().getName();

        MetricConfig metricConfig = new MetricConfig()
            .samples(streamsConfig.getInt("metrics.num.samples"))
            .recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString("metrics.recording.level")))
            .timeWindow(streamsConfig.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
        this.metrics = new Metrics(metricConfig, this.mockWallClockTime);

        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
            this.metrics,
            "test-client",
            streamsConfig.getString("built.in.metrics.version"),
            this.mockWallClockTime
        );
        TaskMetrics.droppedRecordsSensor(threadId, TASK_ID.toString(), streamsMetrics);
        return streamsMetrics;
    }

    /**
     * Stores the builder, rewrites it with the configuration, builds the processor and
     * global topologies, registers partition 0 with a fresh offset counter for every source
     * topic of the processor topology, and creates the state directory.
     *
     * @param builder       builder of the topology under test
     * @param streamsConfig effective configuration of the driver
     */
    private void setupTopology(InternalTopologyBuilder builder, StreamsConfig streamsConfig) {
        this.internalTopologyBuilder = builder;
        builder.rewriteTopology(streamsConfig);
        this.processorTopology = builder.buildTopology();
        this.globalTopology = builder.buildGlobalStateTopology();

        this.processorTopology.sourceTopics().forEach(sourceTopic -> {
            TopicPartition partition = new TopicPartition(sourceTopic, PARTITION_ID);
            this.partitionsByInputTopic.put(sourceTopic, partition);
            this.offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());
        });

        boolean hasPersistentStores = builder.hasPersistentStores();
        this.stateDirectory = new StateDirectory(streamsConfig, this.mockWallClockTime, hasPersistentStores, false);
    }

    /**
     * Creates and initializes the global state manager and global update task, or sets both
     * to {@code null} when the topology has no global part.
     *
     * For every source topic of the global topology, partition 0 is registered with a fresh
     * offset counter and announced to a dedicated mock consumer with beginning and end
     * offsets of 0.
     *
     * @param time               clock handed to the global components
     * @param streamsConfig      effective configuration of the driver
     * @param streamsMetricsImpl metrics wrapper for the global processor context
     * @param threadCache        cache for the global processor context
     */
    private void setupGlobalTask(Time time, StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, ThreadCache threadCache) {
        if (this.globalTopology == null) {
            this.globalStateManager = null;
            this.globalStateTask = null;
            return;
        }

        MockConsumer<byte[], byte[]> globalConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
        for (String globalTopic : this.globalTopology.sourceTopics()) {
            TopicPartition partition = new TopicPartition(globalTopic, PARTITION_ID);
            this.globalPartitionsByInputTopic.put(globalTopic, partition);
            this.offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());

            PartitionInfo partitionInfo = new PartitionInfo(globalTopic, PARTITION_ID, null, null, null);
            globalConsumer.updatePartitions(globalTopic, Collections.singletonList(partitionInfo));
            globalConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
            globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
        }

        this.globalStateManager = new GlobalStateManagerImpl(
            this.logContext,
            time,
            this.globalTopology,
            globalConsumer,
            this.stateDirectory,
            this.stateRestoreListener,
            streamsConfig
        );
        GlobalProcessorContextImpl globalContext =
            new GlobalProcessorContextImpl(streamsConfig, this.globalStateManager, streamsMetricsImpl, threadCache, time);
        this.globalStateManager.setGlobalProcessorContext(globalContext);

        this.globalStateTask = new GlobalStateUpdateTask(
            this.logContext,
            this.globalTopology,
            globalContext,
            this.globalStateManager,
            new LogAndContinueExceptionHandler(),
            time,
            streamsConfig.getLong("commit.interval.ms")
        );
        this.globalStateTask.initialize();
        // Clear the record context once initialization is done.
        globalContext.setRecordContext((ProcessorRecordContext) null);
    }

    /**
     * Creates and initializes the single stream task, or sets it to {@code null} when the
     * topology has no source topics.
     *
     * The mock consumer is assigned all input partitions with a beginning offset of 0. The
     * exactly-once flag given to the state manager is derived from {@code processingMode},
     * the same field {@code commit()} uses to decide whether to commit transactionally.
     * It used to be true only for the literal {@code "exactly_once"} guarantee, so the
     * state manager and the commit path could disagree.
     *
     * @param streamsConfig      effective configuration of the driver
     * @param streamsMetricsImpl metrics wrapper shared with the task
     * @param threadCache        cache shared with the task
     * @param taskConfig         task-level configuration of the topology
     */
    private void setupTask(StreamsConfig streamsConfig, StreamsMetricsImpl streamsMetricsImpl, ThreadCache threadCache, TopologyConfig.TaskConfig taskConfig) {
        if (this.partitionsByInputTopic.isEmpty()) {
            this.task = null;
            return;
        }

        this.consumer.assign(this.partitionsByInputTopic.values());
        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        for (TopicPartition partition : this.partitionsByInputTopic.values()) {
            beginningOffsets.put(partition, 0L);
        }
        this.consumer.updateBeginningOffsets(beginningOffsets);

        // Same condition commit() uses to choose a transactional commit.
        boolean eosEnabled = this.processingMode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA
            || this.processingMode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;

        ProcessorStateManager stateManager = new ProcessorStateManager(
            TASK_ID,
            Task.TaskType.ACTIVE,
            eosEnabled,
            this.logContext,
            this.stateDirectory,
            new MockChangelogRegister(),
            this.processorTopology.storeToChangelogTopic(),
            new HashSet<>(this.partitionsByInputTopic.values()),
            false
        );
        RecordCollectorImpl recordCollector = new RecordCollectorImpl(
            this.logContext,
            TASK_ID,
            this.testDriverProducer,
            streamsConfig.defaultProductionExceptionHandler(),
            streamsMetricsImpl,
            this.processorTopology
        );
        ProcessorContextImpl context =
            new ProcessorContextImpl(TASK_ID, streamsConfig, stateManager, streamsMetricsImpl, threadCache);

        this.task = new StreamTask(
            TASK_ID,
            new HashSet<>(this.partitionsByInputTopic.values()),
            this.processorTopology,
            this.consumer,
            taskConfig,
            streamsMetricsImpl,
            this.stateDirectory,
            threadCache,
            this.mockWallClockTime,
            stateManager,
            recordCollector,
            context,
            this.logContext,
            false
        );
        this.task.initializeIfNeeded();
        // The offset-reset callback is a no-op.
        this.task.completeRestoration(set -> {
        });
        // Clear the record context once initialization is done.
        this.task.processorContext().setRecordContext((ProcessorRecordContext) null);
    }

    /**
     * Returns the metrics of the driver's registry.
     *
     * @return an unmodifiable view of the registered metrics, keyed by metric name
     */
    public Map<MetricName, ? extends Metric> metrics() {
        Map<MetricName, ? extends Metric> registered = this.metrics.metrics();
        return Collections.unmodifiableMap(registered);
    }

    /**
     * Feeds one serialized record into the topology. A topic known to the stream task is
     * enqueued and processed first; a topic known to the global task is then applied to the
     * global state.
     *
     * @param topicName name of the input topic
     * @param timestamp record timestamp
     * @param key       serialized key, may be {@code null}
     * @param value     serialized value, may be {@code null}
     * @param headers   record headers
     * @throws IllegalArgumentException if the topic is known to neither task
     */
    private void pipeRecord(String topicName, long timestamp, byte[] key, byte[] value, Headers headers) {
        TopicPartition inputPartition = getInputTopicOrPatternPartition(topicName);
        TopicPartition globalPartition = this.globalPartitionsByInputTopic.get(topicName);

        boolean feedsTask = inputPartition != null;
        boolean feedsGlobal = globalPartition != null;
        if (!feedsTask && !feedsGlobal) {
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }

        if (feedsTask) {
            enqueueTaskRecord(topicName, inputPartition, timestamp, key, value, headers);
            completeAllProcessableWork();
        }
        if (feedsGlobal) {
            processGlobalRecord(globalPartition, timestamp, key, value, headers);
        }
    }

    /**
     * Wraps the serialized key/value in a {@link ConsumerRecord} carrying the next offset of
     * the partition and hands it to the stream task.
     *
     * @param inputTopic              topic name stored in the record
     * @param topicOrPatternPartition partition the record is added to
     * @param timestamp               record timestamp
     * @param key                     serialized key, may be {@code null} (size recorded as -1)
     * @param value                   serialized value, may be {@code null} (size recorded as -1)
     * @param headers                 record headers
     */
    private void enqueueTaskRecord(String inputTopic, TopicPartition topicOrPatternPartition, long timestamp, byte[] key, byte[] value, Headers headers) {
        long offset = this.offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).getAndIncrement();
        int serializedKeySize = key == null ? -1 : key.length;
        int serializedValueSize = value == null ? -1 : value.length;

        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            inputTopic,
            topicOrPatternPartition.partition(),
            offset,
            timestamp,
            TimestampType.CREATE_TIME,
            serializedKeySize,
            serializedValueSize,
            key,
            value,
            headers,
            Optional.empty()
        );
        this.task.addRecords(topicOrPatternPartition, Collections.singleton(record));
    }

    /**
     * Captures pending producer output, then lets the stream task (if any) process records
     * for as long as it has queued records and reports itself processable at the current
     * mock time. Each processed record is followed by a stream-time punctuation attempt, a
     * commit, and another output capture. Logs a hint when records remain queued afterwards.
     */
    private void completeAllProcessableWork() {
        captureOutputsAndReEnqueueInternalResults();
        if (this.task == null) {
            return;
        }

        this.task.resumePollingForPartitionsWithAvailableSpace();
        this.task.updateLags();
        while (this.task.hasRecordsQueued() && this.task.isProcessable(this.mockWallClockTime.milliseconds())) {
            this.task.process(this.mockWallClockTime.milliseconds());
            this.task.maybePunctuateStreamTime();
            commit(this.task.prepareCommit());
            this.task.postCommit(true);
            captureOutputsAndReEnqueueInternalResults();
        }

        boolean recordsLeft = this.task.hasRecordsQueued();
        if (recordsLeft) {
            log.info("Due to the {} configuration, there are currently some records that cannot be processed. Advancing wall-clock time or enqueuing records on the empty topics will allow Streams to process more.", "max.task.idle.ms");
        }
    }

    /**
     * Commits the given offsets: through the driver's producer as a transaction in the
     * exactly-once processing modes, otherwise synchronously through the mock consumer.
     *
     * @param offsets offsets to commit, per partition
     */
    private void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        boolean exactlyOnce = this.processingMode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA
            || this.processingMode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
        if (!exactlyOnce) {
            this.consumer.commitSync(offsets);
            return;
        }
        this.testDriverProducer.commitTransaction(offsets, new ConsumerGroupMetadata("dummy-app-id"));
    }

    /**
     * Wraps the serialized key/value in a {@link ConsumerRecord} carrying the next offset of
     * the global partition, applies it to the global state task, and flushes the state.
     *
     * @param globalInputTopicPartition partition of the global input topic
     * @param timestamp                 record timestamp
     * @param key                       serialized key, may be {@code null} (size recorded as -1)
     * @param value                     serialized value, may be {@code null} (size recorded as -1)
     * @param headers                   record headers
     */
    private void processGlobalRecord(TopicPartition globalInputTopicPartition, long timestamp, byte[] key, byte[] value, Headers headers) {
        String topic = globalInputTopicPartition.topic();
        int partition = globalInputTopicPartition.partition();
        long offset = this.offsetsByTopicOrPatternPartition.get(globalInputTopicPartition).getAndIncrement();
        int serializedKeySize = key == null ? -1 : key.length;
        int serializedValueSize = value == null ? -1 : value.length;

        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            topic,
            partition,
            offset,
            timestamp,
            TimestampType.CREATE_TIME,
            serializedKeySize,
            serializedValueSize,
            key,
            value,
            headers,
            Optional.empty()
        );
        this.globalStateTask.update(record);
        this.globalStateTask.flushState();
    }

    /**
     * Rejects an input topic name that is not itself a declared source topic name but fully
     * matches one of them when that name is interpreted as a regular expression.
     *
     * @param inputRecordTopic topic name of the incoming record
     * @throws TopologyException if such a source topic name exists
     */
    private void validateSourceTopicNameRegexPattern(String inputRecordTopic) {
        for (String sourceTopicName : this.internalTopologyBuilder.fullSourceTopicNames()) {
            if (sourceTopicName.equals(inputRecordTopic)) {
                continue;
            }
            boolean matchesAsRegex = Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches();
            if (matchesAsRegex) {
                throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName + " cannot contain regex pattern for input record topic: " + inputRecordTopic + " and hence cannot process the message.");
            }
        }
    }

    /**
     * Finds the stream-task partition for a topic: first by exact name, then by treating
     * each registered name as a regular expression that must match the whole topic name.
     * When any source topic names are declared, the name is validated first.
     *
     * @param topicName topic name to resolve
     * @return the matching partition, or {@code null} when nothing matches
     */
    private TopicPartition getInputTopicOrPatternPartition(String topicName) {
        if (!this.internalTopologyBuilder.fullSourceTopicNames().isEmpty()) {
            validateSourceTopicNameRegexPattern(topicName);
        }

        TopicPartition exactMatch = this.partitionsByInputTopic.get(topicName);
        if (exactMatch != null) {
            return exactMatch;
        }

        for (Map.Entry<String, TopicPartition> candidate : this.partitionsByInputTopic.entrySet()) {
            boolean patternMatches = Pattern.compile(candidate.getKey()).matcher(topicName).matches();
            if (patternMatches) {
                return candidate.getValue();
            }
        }
        return null;
    }

    /**
     * Drains the mock producer's history. Every record is appended to the output queue of
     * its topic; a record whose topic is also an input of the stream task is re-enqueued
     * there, and one whose topic is a global input is applied to the global state.
     */
    private void captureOutputsAndReEnqueueInternalResults() {
        List<ProducerRecord<byte[], byte[]>> produced = this.producer.history();
        this.producer.clear();

        for (ProducerRecord<byte[], byte[]> record : produced) {
            String outputTopic = record.topic();
            Queue<ProducerRecord<byte[], byte[]>> outputQueue =
                this.outputRecordsByTopic.computeIfAbsent(outputTopic, topic -> new LinkedList<>());
            outputQueue.add(record);

            String topicName = record.topic();
            TopicPartition inputPartition = getInputTopicOrPatternPartition(topicName);
            TopicPartition globalPartition = this.globalPartitionsByInputTopic.get(topicName);

            if (inputPartition != null) {
                enqueueTaskRecord(topicName, inputPartition, record.timestamp(), record.key(), record.value(), record.headers());
            }
            if (globalPartition != null) {
                processGlobalRecord(globalPartition, record.timestamp(), record.key(), record.value(), record.headers());
            }
        }
    }

    /**
     * Moves the mock wall-clock forward, gives the stream task (if any) a chance to run
     * system-time punctuations followed by a commit, and then processes whatever work has
     * become processable.
     *
     * @param advance amount of time to advance
     * @throws NullPointerException if {@code advance} is {@code null}
     */
    public void advanceWallClockTime(Duration advance) {
        Objects.requireNonNull(advance, "advance cannot be null");
        long advanceMs = advance.toMillis();
        this.mockWallClockTime.sleep(advanceMs);

        if (this.task != null) {
            this.task.maybePunctuateSystemTime();
            Map<TopicPartition, OffsetAndMetadata> offsets = this.task.prepareCommit();
            commit(offsets);
            this.task.postCommit(true);
        }
        completeAllProcessableWork();
    }

    /**
     * Looks up the queue of captured output records for a topic. Logs a warning when no
     * queue exists and the topic is not a sink topic of the processor topology.
     *
     * @param topicName output topic name
     * @return the queue, or {@code null} when nothing has been captured for the topic
     */
    private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(String topicName) {
        Queue<ProducerRecord<byte[], byte[]>> records = this.outputRecordsByTopic.get(topicName);
        boolean recognized = records != null || this.processorTopology.sinkTopics().contains(topicName);
        if (!recognized) {
            log.warn("Unrecognized topic: {}, this can occur if dynamic routing is used and no output has been sent to this topic yet. If not using a TopicNameExtractor, check that the output topic is correct.", topicName);
        }
        return records;
    }

    /**
     * Creates a {@link TestInputTopic} bound to this driver, passing {@link Instant#now()}
     * and {@link Duration#ZERO} as its timing arguments.
     *
     * @param topicName       name of the input topic
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     * @return the new input topic handle
     */
    public final <K, V> TestInputTopic<K, V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        Instant startTimestamp = Instant.now();
        return new TestInputTopic<>(this, topicName, keySerializer, valueSerializer, startTimestamp, Duration.ZERO);
    }

    /**
     * Creates a {@link TestInputTopic} bound to this driver with explicit timing arguments.
     *
     * @param topicName       name of the input topic
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     * @param startTimestamp  passed through to the input topic
     * @param autoAdvance     passed through to the input topic
     * @return the new input topic handle
     */
    public final <K, V> TestInputTopic<K, V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Instant startTimestamp, Duration autoAdvance) {
        return new TestInputTopic<>(this, topicName, keySerializer, valueSerializer, startTimestamp, autoAdvance);
    }

    /**
     * Creates a {@link TestOutputTopic} bound to this driver.
     *
     * @param topicName         name of the output topic
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @return the new output topic handle
     */
    public final <K, V> TestOutputTopic<K, V> createOutputTopic(String topicName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        return new TestOutputTopic<>(this, topicName, keyDeserializer, valueDeserializer);
    }

    /**
     * Returns the names of all topics for which output has been captured so far.
     *
     * @return an unmodifiable view of the captured topic names
     */
    public final Set<String> producedTopicNames() {
        Set<String> topicNames = this.outputRecordsByTopic.keySet();
        return Collections.unmodifiableSet(topicNames);
    }

    /**
     * Removes and returns the oldest captured record of a topic.
     *
     * @param topicName output topic name
     * @return the next record, or {@code null} when the topic has no queue or the queue is empty
     */
    ProducerRecord<byte[], byte[]> readRecord(String topicName) {
        Queue<ProducerRecord<byte[], byte[]>> records = getRecordsQueue(topicName);
        return records == null ? null : records.poll();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the oldest captured record of a topic and deserializes its key and value.
     *
     * @param topicName         output topic name
     * @param keyDeserializer   deserializer for the key
     * @param valueDeserializer deserializer for the value
     * @return the deserialized record with the original headers and timestamp
     * @throws NoSuchElementException if the topic has no queue yet or the queue is empty
     */
    public <K, V> TestRecord<K, V> readRecord(String topicName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        Queue<ProducerRecord<byte[], byte[]>> records = getRecordsQueue(topicName);
        if (records == null) {
            throw new NoSuchElementException("Uninitialized topic: " + topicName);
        }

        ProducerRecord<byte[], byte[]> next = records.poll();
        if (next == null) {
            throw new NoSuchElementException("Empty topic: " + topicName);
        }

        K key = keyDeserializer.deserialize(next.topic(), next.headers(), next.key());
        V value = valueDeserializer.deserialize(next.topic(), next.headers(), next.value());
        return new TestRecord<>(key, value, next.headers(), next.timestamp());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Serializes a {@link TestRecord} and pipes it into the topology.
     *
     * @param topicName       name of the input topic
     * @param record          record to pipe
     * @param keySerializer   serializer for the key
     * @param valueSerializer serializer for the value
     * @param time            timestamp override; when {@code null} the record's own timestamp is used
     * @throws IllegalStateException if neither {@code time} nor the record provides a timestamp
     */
    public <K, V> void pipeRecord(String topicName, TestRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, Instant time) {
        byte[] serializedKey = keySerializer.serialize(topicName, record.headers(), record.key());
        byte[] serializedValue = valueSerializer.serialize(topicName, record.headers(), record.value());

        long timestamp;
        if (time == null) {
            if (record.timestamp() == null) {
                throw new IllegalStateException("Provided `TestRecord` does not have a timestamp and no timestamp overwrite was provided via `time` parameter.");
            }
            timestamp = record.timestamp();
        } else {
            timestamp = time.toEpochMilli();
        }

        pipeRecord(topicName, timestamp, serializedKey, serializedValue, record.headers());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the number of records currently queued for the given topic.
     *
     * @param str topic whose queue is looked up via {@code getRecordsQueue}
     * @return the queue's size, or {@code 0} if there is no queue for the topic
     */
    public final long getQueueSize(String str) {
        // Hold the queue in a local so the null check and the size() call use the same reference.
        final Queue<ProducerRecord<byte[], byte[]>> queue = getRecordsQueue(str);
        if (queue == null) {
            return 0L;
        }
        return queue.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether no records are queued for the given topic.
     *
     * @param str topic to inspect
     * @return {@code true} if {@link #getQueueSize(String)} reports zero for the topic,
     *         which includes the case where the topic has no queue at all
     */
    public final boolean isEmpty(String str) {
        final long queuedRecords = getQueueSize(str);
        return queuedRecords == 0L;
    }

    /**
     * Collects every state store named by the topology builder into a fresh map.
     *
     * <p>Stores are looked up without the built-in-store type check, so built-in store
     * types are returned as-is. A name for which no store can be found is mapped to
     * {@code null}.
     *
     * @return a new mutable map from store name to store (values may be {@code null})
     */
    public Map<String, StateStore> getAllStateStores() {
        final Map<String, StateStore> storesByName = new HashMap<>();
        for (final String storeName : this.internalTopologyBuilder.allStateStoreNames()) {
            storesByName.put(storeName, getStateStore(storeName, false));
        }
        return storesByName;
    }

    /**
     * Looks up a state store by name, rejecting built-in store types.
     *
     * <p>Delegates to the two-argument overload with the built-in-store check enabled.
     *
     * @param str name of the store to look up
     * @return the store, or {@code null} if neither the task nor the global state manager knows it
     * @throws IllegalArgumentException if the store is one of the built-in store types that
     *         have a dedicated typed accessor on this class
     */
    public StateStore getStateStore(String str) throws IllegalArgumentException {
        final boolean rejectBuiltInStores = true;
        return getStateStore(str, rejectBuiltInStores);
    }

    /**
     * Looks up a state store by name, first in the task's state manager and then in the
     * global state manager.
     *
     * @param str name of the store to look up
     * @param z   when {@code true}, the found store is passed to {@code throwIfBuiltInStore}
     *            before being returned
     * @return the first store found, or {@code null} if neither source has one
     *         (including when the task or the global state manager is absent)
     */
    private StateStore getStateStore(String str, boolean z) {
        // Task-local stores take precedence over global ones.
        if (this.task != null) {
            final StateStore taskStore = this.task.processorContext().stateManager().getStore(str);
            if (taskStore != null) {
                if (z) {
                    throwIfBuiltInStore(taskStore);
                }
                return taskStore;
            }
        }

        if (this.globalStateManager != null) {
            final StateStore globalStore = this.globalStateManager.getStore(str);
            if (globalStore != null) {
                if (z) {
                    throwIfBuiltInStore(globalStore);
                }
                return globalStore;
            }
        }

        return null;
    }

    /**
     * Rejects stores that have a dedicated typed accessor on this class.
     *
     * <p>The checks run from most specific to most general (e.g. timestamped key-value
     * before read-only key-value), so the message names the most specific accessor.
     *
     * @param stateStore store to inspect
     * @throws IllegalArgumentException if the store is a versioned/timestamped/plain key-value
     *         store, a timestamped/plain window store, or a session store
     */
    private void throwIfBuiltInStore(StateStore stateStore) {
        final String reason;
        if (stateStore instanceof VersionedKeyValueStore) {
            reason = " is a versioned key-value store and should be accessed via `getVersionedKeyValueStore()`";
        } else if (stateStore instanceof TimestampedKeyValueStore) {
            reason = " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`";
        } else if (stateStore instanceof ReadOnlyKeyValueStore) {
            reason = " is a key-value store and should be accessed via `getKeyValueStore()`";
        } else if (stateStore instanceof TimestampedWindowStore) {
            reason = " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`";
        } else if (stateStore instanceof ReadOnlyWindowStore) {
            reason = " is a window store and should be accessed via `getWindowStore()`";
        } else if (stateStore instanceof ReadOnlySessionStore) {
            reason = " is a session store and should be accessed via `getSessionStore()`";
        } else {
            // Not a built-in store type: nothing to reject.
            return;
        }
        throw new IllegalArgumentException("Store " + stateStore.name() + reason);
    }

    /**
     * Returns the named store as a {@link KeyValueStore}.
     *
     * <p>If the store is a {@link TimestampedKeyValueStore}, an informational message is logged
     * and the store is wrapped in a {@code KeyValueStoreFacade}; a plain {@link KeyValueStore}
     * is returned directly.
     *
     * @param str name of the store to look up
     * @param <K> key type (unchecked — the caller is responsible for choosing matching types)
     * @param <V> value type (unchecked — the caller is responsible for choosing matching types)
     * @return the store (or a facade over it), or {@code null} if no store with that name
     *         exists or it is not a key-value store
     */
    @SuppressWarnings("unchecked") // generic parameters cannot be verified at runtime; store kind is checked via instanceof
    public <K, V> KeyValueStore<K, V> getKeyValueStore(String str) {
        // Keep the lookup result as the general StateStore type; narrow only after instanceof.
        final StateStore store = getStateStore(str, false);
        if (store instanceof TimestampedKeyValueStore) {
            log.info("Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
            return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
        }
        if (store instanceof KeyValueStore) {
            return (KeyValueStore<K, V>) store;
        }
        return null;
    }

    /**
     * Returns the named store if it is a {@link TimestampedKeyValueStore}.
     *
     * @param str name of the store to look up
     * @param <K> key type (unchecked — the caller is responsible for choosing matching types)
     * @param <V> value type (unchecked — the caller is responsible for choosing matching types)
     * @return the store, or {@code null} if no store with that name exists or it is not a
     *         timestamped key-value store
     */
    @SuppressWarnings("unchecked") // generic parameters cannot be verified at runtime; store kind is checked via instanceof
    public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(String str) {
        // Keep the lookup result as the general StateStore type; narrow only after instanceof.
        final StateStore store = getStateStore(str, false);
        if (store instanceof TimestampedKeyValueStore) {
            return (TimestampedKeyValueStore<K, V>) store;
        }
        return null;
    }

    /**
     * Returns the named store if it is a {@link VersionedKeyValueStore}.
     *
     * @param str name of the store to look up
     * @param <K> key type (unchecked — the caller is responsible for choosing matching types)
     * @param <V> value type (unchecked — the caller is responsible for choosing matching types)
     * @return the store, or {@code null} if no store with that name exists or it is not a
     *         versioned key-value store
     */
    @SuppressWarnings("unchecked") // generic parameters cannot be verified at runtime; store kind is checked via instanceof
    public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(String str) {
        // Keep the lookup result as the general StateStore type; narrow only after instanceof.
        final StateStore store = getStateStore(str, false);
        if (store instanceof VersionedKeyValueStore) {
            return (VersionedKeyValueStore<K, V>) store;
        }
        return null;
    }

    /**
     * Returns the named store as a {@link WindowStore}.
     *
     * <p>If the store is a {@link TimestampedWindowStore}, an informational message is logged
     * and the store is wrapped in a {@code WindowStoreFacade}; a plain {@link WindowStore}
     * is returned directly.
     *
     * @param str name of the store to look up
     * @param <K> key type (unchecked — the caller is responsible for choosing matching types)
     * @param <V> value type (unchecked — the caller is responsible for choosing matching types)
     * @return the store (or a facade over it), or {@code null} if no store with that name
     *         exists or it is not a window store
     */
    @SuppressWarnings("unchecked") // generic parameters cannot be verified at runtime; store kind is checked via instanceof
    public <K, V> WindowStore<K, V> getWindowStore(String str) {
        // Keep the lookup result as the general StateStore type; narrow only after instanceof.
        final StateStore store = getStateStore(str, false);
        if (store instanceof TimestampedWindowStore) {
            log.info("Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
            return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) store);
        }
        if (store instanceof WindowStore) {
            return (WindowStore<K, V>) store;
        }
        return null;
    }

    /**
     * Returns the named store if it is a {@link TimestampedWindowStore}.
     *
     * @param str name of the store to look up
     * @param <K> key type (unchecked — the caller is responsible for choosing matching types)
     * @param <V> value type (unchecked — the caller is responsible for choosing matching types)
     * @return the store, or {@code null} if no store with that name exists or it is not a
     *         timestamped window store
     */
    @SuppressWarnings("unchecked") // generic parameters cannot be verified at runtime; store kind is checked via instanceof
    public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(String str) {
        // Keep the lookup result as the general StateStore type; narrow only after instanceof.
        final StateStore store = getStateStore(str, false);
        if (store instanceof TimestampedWindowStore) {
            return (TimestampedWindowStore<K, V>) store;
        }
        return null;
    }

    /**
     * Returns the named store if it is a {@link SessionStore}.
     *
     * @param str name of the store to look up
     * @param <K> key type (unchecked — the caller is responsible for choosing matching types)
     * @param <V> value type (unchecked — the caller is responsible for choosing matching types)
     * @return the store, or {@code null} if no store with that name exists or it is not a
     *         session store
     */
    @SuppressWarnings("unchecked") // generic parameters cannot be verified at runtime; store kind is checked via instanceof
    public <K, V> SessionStore<K, V> getSessionStore(String str) {
        // Keep the lookup result as the general StateStore type; narrow only after instanceof.
        final StateStore store = getStateStore(str, false);
        if (store instanceof SessionStore) {
            return (SessionStore<K, V>) store;
        }
        return null;
    }

    /**
     * Shuts the driver down and cleans its state directory.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>if a task exists: suspend it, prepare and post a commit, then close it cleanly;</li>
     *   <li>if a global state task exists: close it, logging (not propagating) any {@link IOException};</li>
     *   <li>call {@code completeAllProcessableWork()};</li>
     *   <li>warn if the task still reports queued records;</li>
     *   <li>close the producer when the processing mode is {@code AT_LEAST_ONCE};</li>
     *   <li>clean the state directory.</li>
     * </ol>
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.task != null) {
            this.task.suspend();
            this.task.prepareCommit();
            this.task.postCommit(true);
            this.task.closeClean();
        }
        if (this.globalStateTask != null) {
            try {
                this.globalStateTask.close(false);
            } catch (IOException e) {
                // Best-effort: a failure here must not abort the rest of the shutdown,
                // but it should not vanish without a trace either.
                log.warn("Failed to close the global state task during TopologyTestDriver#close(); continuing shutdown.", e);
            }
        }
        completeAllProcessableWork();
        if (this.task != null && this.task.hasRecordsQueued()) {
            log.warn("Found some records that cannot be processed due to the {} configuration during TopologyTestDriver#close().", "max.task.idle.ms");
        }
        // The producer is only closed here in at-least-once mode; other modes are left untouched by this method.
        if (this.processingMode == StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE) {
            this.producer.close();
        }
        this.stateDirectory.clean();
    }
}
