/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration.utils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import scala.Option;

public class IntegrationTestUtils {
    public static final long DEFAULT_TIMEOUT = 30000L;
    private static final long DEFAULT_COMMIT_INTERVAL = 100L;
    public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";

    public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException {
        File node;
        String tmpDir = TestUtils.IO_TMP_DIR.getPath();
        String path = streamsConfiguration.getProperty("state.dir");
        if (path != null && (node = Paths.get(path, new String[0]).normalize().toFile()).getAbsolutePath().startsWith(tmpDir)) {
            Utils.delete((File)new File(node.getAbsolutePath()));
        }
    }

    public static void cleanStateBeforeTest(EmbeddedKafkaCluster cluster, String ... topics) {
        try {
            cluster.deleteAllTopicsAndWait(30000L);
            for (String topic : topics) {
                cluster.createTopic(topic, 1, 1);
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void cleanStateAfterTest(EmbeddedKafkaCluster cluster, KafkaStreams driver) {
        driver.cleanUp();
        try {
            cluster.deleteAllTopicsAndWait(30000L);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static <K, V> void produceKeyValuesSynchronously(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Time time) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, false);
    }

    public static <K, V> void produceKeyValuesSynchronously(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Headers headers, Time time) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false);
    }

    public static <K, V> void produceKeyValuesSynchronously(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Time time, boolean enableTransactions) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions);
    }

    public static <K, V> void produceKeyValuesSynchronously(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Headers headers, Time time, boolean enableTransactions) throws ExecutionException, InterruptedException {
        for (KeyValue<K, V> record : records) {
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, Collections.singleton(record), producerConfig, headers, time.milliseconds(), enableTransactions);
            time.sleep(1L);
        }
    }

    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Long timestamp) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, false);
    }

    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Long timestamp, boolean enableTransactions) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
    }

    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Headers headers, Long timestamp, boolean enabledTransactions) throws ExecutionException, InterruptedException {
        try (KafkaProducer producer = new KafkaProducer(producerConfig);){
            if (enabledTransactions) {
                producer.initTransactions();
                producer.beginTransaction();
            }
            for (KeyValue<K, V> record : records) {
                Future f = producer.send(new ProducerRecord(topic, null, timestamp, record.key, record.value, (Iterable)headers));
                f.get();
            }
            if (enabledTransactions) {
                producer.commitTransaction();
            }
            producer.flush();
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public static <V, K> void produceSynchronously(Properties producerConfig, boolean eos, String topic, List<KeyValueTimestamp<K, V>> toProduce) {
        try (KafkaProducer producer = new KafkaProducer(producerConfig);){
            LinkedList<Future> futures = new LinkedList<Future>();
            for (KeyValueTimestamp<K, V> record : toProduce) {
                Future f = producer.send(new ProducerRecord(topic, null, Long.valueOf(record.timestamp()), record.key(), record.value(), null));
                futures.add(f);
            }
            if (eos) {
                producer.commitTransaction();
            } else {
                producer.flush();
            }
            for (Future future : futures) {
                try {
                    future.get();
                }
                catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                    return;
                }
            }
        }
    }

    public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(String topic, Collection<KeyValue<K, V>> records, Properties producerConfig, Long timestamp) throws ExecutionException, InterruptedException {
        try (KafkaProducer producer = new KafkaProducer(producerConfig);){
            producer.initTransactions();
            for (KeyValue<K, V> record : records) {
                producer.beginTransaction();
                Future f = producer.send(new ProducerRecord(topic, null, timestamp, record.key, record.value));
                f.get();
                producer.abortTransaction();
            }
        }
    }

    public static <V> void produceValuesSynchronously(String topic, Collection<V> records, Properties producerConfig, Time time) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false);
    }

    public static <V> void produceValuesSynchronously(String topic, Collection<V> records, Properties producerConfig, Time time, boolean enableTransactions) throws ExecutionException, InterruptedException {
        ArrayList keyedRecords = new ArrayList();
        for (V value : records) {
            KeyValue kv = new KeyValue(null, value);
            keyedRecords.add(kv);
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions);
    }

    public static void waitForCompletion(KafkaStreams streams, int expectedPartitions, int timeoutMilliseconds) {
        double totalLag;
        int lagMetrics;
        long start = System.currentTimeMillis();
        do {
            lagMetrics = 0;
            totalLag = 0.0;
            for (Metric metric : streams.metrics().values()) {
                if (!metric.metricName().name().equals("records-lag")) continue;
                ++lagMetrics;
                totalLag += ((Number)metric.metricValue()).doubleValue();
            }
            if (lagMetrics < expectedPartitions || totalLag != 0.0) continue;
            return;
        } while (System.currentTimeMillis() - start < (long)timeoutMilliseconds);
        throw new RuntimeException(String.format("Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]", lagMetrics, expectedPartitions, totalLag));
    }

    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords) throws InterruptedException {
        return IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, expectedNumRecords, 30000L);
    }

    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords) throws InterruptedException {
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, 30000L);
    }

    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords, long waitTime) throws InterruptedException {
        ArrayList accumData = new ArrayList();
        try (KafkaConsumer consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            TestCondition valuesRead = () -> {
                List readData = IntegrationTestUtils.readKeyValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            };
            String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
            TestUtils.waitForCondition((TestCondition)valuesRead, (long)waitTime, (String)conditionDetails);
        }
        return accumData;
    }

    public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords, long waitTime) throws InterruptedException {
        ArrayList accumData = new ArrayList();
        try (KafkaConsumer consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            TestCondition valuesRead = () -> {
                List readData = IntegrationTestUtils.readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            };
            String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
            TestUtils.waitForCondition((TestCondition)valuesRead, (long)waitTime, (String)conditionDetails);
        }
        return accumData;
    }

    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(Properties consumerConfig, String topic, List<KeyValue<K, V>> expectedRecords) throws InterruptedException {
        return IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, 30000L);
    }

    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(Properties consumerConfig, String topic, List<KeyValue<K, V>> expectedRecords, long waitTime) throws InterruptedException {
        ArrayList accumData = new ArrayList();
        try (KafkaConsumer consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            TestCondition valuesRead = () -> {
                List readData = IntegrationTestUtils.readKeyValues(topic, consumer, waitTime, expectedRecords.size());
                accumData.addAll(readData);
                List accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
                HashMap<Object, List> finalAccumData = new HashMap<Object, List>();
                for (KeyValue kv : accumulatedActual) {
                    finalAccumData.computeIfAbsent(kv.key, key -> new ArrayList()).add(kv);
                }
                HashMap<Object, List> finalExpected = new HashMap<Object, List>();
                for (KeyValue kv : expectedRecords) {
                    finalExpected.computeIfAbsent(kv.key, key -> new ArrayList()).add(kv);
                }
                return finalAccumData.equals(finalExpected);
            };
            String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic;
            TestUtils.waitForCondition((TestCondition)valuesRead, (long)waitTime, (String)conditionDetails);
        }
        return accumData;
    }

    public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords, long waitTime) throws InterruptedException {
        ArrayList accumData = new ArrayList();
        try (KafkaConsumer consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            TestCondition valuesRead = () -> {
                List readData = IntegrationTestUtils.readRecords(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            };
            String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
            TestUtils.waitForCondition((TestCondition)valuesRead, (long)waitTime, (String)conditionDetails);
        }
        return accumData;
    }

    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords) throws InterruptedException {
        return IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, 30000L);
    }

    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig, String topic, int expectedNumRecords, long waitTime) throws InterruptedException {
        ArrayList accumData = new ArrayList();
        try (KafkaConsumer consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            TestCondition valuesRead = () -> {
                List readData = IntegrationTestUtils.readValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            };
            String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
            TestUtils.waitForCondition((TestCondition)valuesRead, (long)waitTime, (String)conditionDetails);
        }
        return accumData;
    }

    public static void waitForTopicPartitions(List<KafkaServer> servers, List<TopicPartition> partitions, long timeout) throws InterruptedException {
        long end = System.currentTimeMillis() + timeout;
        for (TopicPartition partition : partitions) {
            long remaining = end - System.currentTimeMillis();
            if (remaining <= 0L) {
                throw new AssertionError((Object)("timed out while waiting for partitions to become available. Timeout=" + timeout));
            }
            IntegrationTestUtils.waitUntilMetadataIsPropagated(servers, partition.topic(), partition.partition(), remaining);
        }
    }

    public static void waitUntilMetadataIsPropagated(List<KafkaServer> servers, String topic, int partition, long timeout) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            for (KafkaServer server : servers) {
                MetadataCache metadataCache = server.apis().metadataCache();
                Option partitionInfo = metadataCache.getPartitionInfo(topic, partition);
                if (partitionInfo.isEmpty()) {
                    return false;
                }
                UpdateMetadataRequest.PartitionState metadataPartitionState = (UpdateMetadataRequest.PartitionState)partitionInfo.get();
                if (Request.isValidBrokerId((int)metadataPartitionState.basePartitionState.leader)) continue;
                return false;
            }
            return true;
        }, (long)timeout, (String)("metadata for topic=" + topic + " partition=" + partition + " not propagated to all brokers"));
    }

    public static void verifyKeyValueTimestamps(Properties consumerConfig, String topic, List<KeyValueTimestamp<String, Long>> expected) {
        List results;
        try {
            results = IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, expected.size());
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        if (results.size() != expected.size()) {
            throw new AssertionError((Object)(IntegrationTestUtils.printRecords(results) + " != " + expected));
        }
        Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator();
        for (ConsumerRecord result : results) {
            KeyValueTimestamp<String, Long> expected1 = expectedIterator.next();
            try {
                IntegrationTestUtils.compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp());
            }
            catch (AssertionError e) {
                throw new AssertionError(IntegrationTestUtils.printRecords(results) + " != " + expected, (Throwable)((Object)e));
            }
        }
    }

    private static <K, V> void compareKeyValueTimestamp(ConsumerRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp) {
        Objects.requireNonNull(record);
        Object recordKey = record.key();
        Object recordValue = record.value();
        long recordTimestamp = record.timestamp();
        AssertionError error = new AssertionError((Object)("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp + " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp));
        if (recordKey != null ? !recordKey.equals(expectedKey) : expectedKey != null) {
            throw error;
        }
        if (recordValue != null ? !recordValue.equals(expectedValue) : expectedValue != null) {
            throw error;
        }
        if (recordTimestamp != expectedTimestamp) {
            throw error;
        }
    }

    private static <K, V> String printRecords(List<ConsumerRecord<K, V>> result) {
        StringBuilder resultStr = new StringBuilder();
        resultStr.append("[\n");
        for (ConsumerRecord<K, V> record : result) {
            resultStr.append("  ").append(record.toString()).append("\n");
        }
        resultStr.append("]");
        return resultStr.toString();
    }

    public static <V> List<V> readValues(String topic, Properties consumerConfig, long waitTime, int maxMessages) {
        List<V> returnList;
        try (KafkaConsumer consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            returnList = IntegrationTestUtils.readValues(topic, consumer, waitTime, maxMessages);
        }
        return returnList;
    }

    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, long waitTime, int maxMessages) {
        List<KeyValue<K, V>> consumedValues;
        try (KafkaConsumer<K, V> consumer = IntegrationTestUtils.createConsumer(consumerConfig);){
            consumedValues = IntegrationTestUtils.readKeyValues(topic, consumer, waitTime, maxMessages);
        }
        return consumedValues;
    }

    public static KafkaStreams getStartedStreams(Properties streamsConfig, StreamsBuilder builder, boolean clean) {
        KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
        if (clean) {
            driver.cleanUp();
        }
        driver.start();
        return driver;
    }

    private static <V> List<V> readValues(String topic, Consumer<Object, V> consumer, long waitTime, int maxMessages) {
        ArrayList<Object> returnList = new ArrayList<Object>();
        List<KeyValue<Object, V>> kvs = IntegrationTestUtils.readKeyValues(topic, consumer, waitTime, maxMessages);
        for (KeyValue<Object, V> kv : kvs) {
            returnList.add(kv.value);
        }
        return returnList;
    }

    private static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Consumer<K, V> consumer, long waitTime, int maxMessages) {
        ArrayList<KeyValue<K, V>> consumedValues = new ArrayList<KeyValue<K, V>>();
        List<ConsumerRecord<K, V>> records = IntegrationTestUtils.readRecords(topic, consumer, waitTime, maxMessages);
        for (ConsumerRecord<K, V> record : records) {
            consumedValues.add(new KeyValue(record.key(), record.value()));
        }
        return consumedValues;
    }

    private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(String topic, Consumer<K, V> consumer, long waitTime, int maxMessages) {
        ArrayList<KeyValue<K, KeyValue<V, Long>>> consumedValues = new ArrayList<KeyValue<K, KeyValue<V, Long>>>();
        List<ConsumerRecord<K, V>> records = IntegrationTestUtils.readRecords(topic, consumer, waitTime, maxMessages);
        for (ConsumerRecord<K, V> record : records) {
            consumedValues.add(new KeyValue(record.key(), (Object)KeyValue.pair((Object)record.value(), (Object)record.timestamp())));
        }
        return consumedValues;
    }

    private static <K, V> List<ConsumerRecord<K, V>> readRecords(String topic, Consumer<K, V> consumer, long waitTime, int maxMessages) {
        consumer.subscribe(Collections.singletonList(topic));
        int pollIntervalMs = 100;
        ArrayList<ConsumerRecord<K, V>> consumerRecords = new ArrayList<ConsumerRecord<K, V>>();
        int totalPollTimeMs = 0;
        while ((long)totalPollTimeMs < waitTime && IntegrationTestUtils.continueConsuming(consumerRecords.size(), maxMessages)) {
            totalPollTimeMs += 100;
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
            for (ConsumerRecord record : records) {
                consumerRecords.add(record);
            }
        }
        return consumerRecords;
    }

    private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
        return maxMessages <= 0 || messagesConsumed < maxMessages;
    }

    private static <K, V> KafkaConsumer<K, V> createConsumer(Properties consumerConfig) {
        Properties filtered = new Properties();
        filtered.putAll((Map<?, ?>)consumerConfig);
        filtered.setProperty("auto.offset.reset", "earliest");
        filtered.setProperty("enable.auto.commit", "true");
        return new KafkaConsumer(filtered);
    }

    public static class StateListenerStub
    implements StreamThread.StateListener {
        boolean runningToRevokedSeen = false;
        boolean revokedToPendingShutdownSeen = false;

        public void onChange(Thread thread, ThreadStateTransitionValidator newState, ThreadStateTransitionValidator oldState) {
            if (oldState == StreamThread.State.RUNNING && newState == StreamThread.State.PARTITIONS_REVOKED) {
                this.runningToRevokedSeen = true;
            } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && newState == StreamThread.State.PENDING_SHUTDOWN) {
                this.revokedToPendingShutdownSeen = true;
            }
        }

        public boolean revokedToPendingShutdownSeen() {
            return this.revokedToPendingShutdownSeen;
        }

        public boolean runningToRevokedSeen() {
            return this.runningToRevokedSeen;
        }
    }
}

