/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.tests.SmokeTestUtil;

public class EosTestDriver
extends SmokeTestUtil {
    /** Size of the key space for generated records: keys are drawn from {@code [0, 20000)}. */
    private static final int MAX_NUMBER_OF_KEYS = 20000;

    /** Ten minutes in milliseconds; the wait deadlines in this class are based on this duration. */
    private static final long MAX_IDLE_TIME_MS = 600000L;

    /**
     * Cleared by the shutdown hook to stop the generator loop. Declared {@code volatile} because it
     * is written by the hook thread and read by the thread running {@code generate}.
     */
    private static volatile boolean isRunning = true;

    /** Running count of produced records; updates go through {@link #updateNumRecordsProduces(int)}. */
    private static int numRecordsProduced = 0;

    /**
     * Adjusts the produced-record counter while holding the class monitor, so that updates from
     * different threads are not lost.
     *
     * @param delta amount to add to the counter; may be negative
     */
    private static synchronized void updateNumRecordsProduces(int delta) {
        numRecordsProduced += delta;
    }

    /** Maximum time, in minutes, the shutdown hook waits for {@link #generate(String)} to finish. */
    private static final long GENERATE_SHUTDOWN_WAIT_MINUTES = 5L;

    /**
     * Produces random records to the {@code data} topic until JVM shutdown is requested, then closes
     * the producer and prints the end offset of every {@code data} partition.
     *
     * <p>A shutdown hook clears the running flag and then waits (bounded) for this method to finish.
     * Without that wait the JVM may halt as soon as the hook returns, i.e. before the producer has
     * been closed and the end offsets have been printed.
     *
     * @param kafka bootstrap servers used by both the producer and the offset-reporting consumer
     */
    static void generate(String kafka) {
        // Released once the producer is closed and the end offsets have been reported.
        CountDownLatch finished = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Terminating");
            System.out.flush();
            isRunning = false;
            try {
                if (!finished.await(GENERATE_SHUTDOWN_WAIT_MINUTES, TimeUnit.MINUTES)) {
                    System.err.println("Timed out waiting for the record generator to finish");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.err.flush();
            System.out.flush();
        }));

        try {
            produceUntilStopped(kafka);
            printDataEndOffsets(kafka);
            System.out.flush();
        } finally {
            finished.countDown();
        }
    }

    /** Sends random key/value records to the {@code data} topic until {@code isRunning} is cleared. */
    private static void produceUntilStopped(String kafka) {
        Properties producerProps = new Properties();
        producerProps.put("client.id", "EosTest");
        producerProps.put("bootstrap.servers", kafka);
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);
        producerProps.put("enable.idempotence", true);

        Random rand = new Random(System.currentTimeMillis());
        // try-with-resources guarantees the producer is closed even if the loop throws.
        try (KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps)) {
            while (isRunning) {
                String key = Integer.toString(rand.nextInt(MAX_NUMBER_OF_KEYS));
                int value = rand.nextInt(10000);
                ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace(System.err);
                        System.err.flush();
                        if (exception instanceof org.apache.kafka.common.errors.TimeoutException) {
                            try {
                                // The third whitespace-separated token of the message is parsed as the
                                // number of expired records (presumably "Expiring N record(s) ..." --
                                // TODO confirm against the producer's message format).
                                int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
                                updateNumRecordsProduces(-expired);
                            } catch (RuntimeException ignored) {
                                // Best effort only: if the message cannot be parsed the counter is
                                // simply left unadjusted.
                            }
                        }
                    }
                });
                updateNumRecordsProduces(1);
                // Take one consistent snapshot; producer callbacks may change the counter concurrently.
                int produced = currentNumRecordsProduced();
                if (produced % 1000 == 0) {
                    System.out.println(produced + " records produced");
                    System.out.flush();
                }
                Utils.sleep(rand.nextInt(10));
            }
        }
        System.out.println("Producer closed: " + currentNumRecordsProduced() + " records produced");
    }

    /** Prints the read-committed end offset of every partition of the {@code data} topic. */
    private static void printDataEndOffsets(String kafka) {
        Properties props = new Properties();
        props.put("client.id", "verifier");
        props.put("bootstrap.servers", kafka);
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("isolation.level", IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = getAllPartitions(consumer, "data");
            System.out.println("Partitions: " + partitions);
            consumer.assign(partitions);
            consumer.seekToEnd(partitions);
            for (TopicPartition tp : partitions) {
                System.out.println("End-offset for " + tp + " is " + consumer.position(tp));
            }
        }
    }

    /** Reads the produced-record counter under the same lock that guards its updates. */
    private static synchronized int currentNumRecordsProduced() {
        return numRecordsProduced;
    }

    /**
     * Verifies the output of the EOS test application against its input.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>wait until the {@code EosTest} consumer group has no members and fetch its committed
     *       offsets;</li>
     *   <li>read the input topics up to those committed offsets;</li>
     *   <li>read the output topics up to their current end offsets;</li>
     *   <li>compare output against input (echo, min, sum and, with repartitioning, max and cnt);</li>
     *   <li>check that a verification record written to every output partition can be read back.</li>
     * </ol>
     *
     * <p>Prints {@code ALL-RECORDS-DELIVERED} on success. If reading the topics or the final
     * read-back check throws, the stack trace is printed and {@code FAILED} is written to stdout.
     * Exceptions from step 1 and from the comparison step are not caught here and propagate.
     *
     * @param kafka bootstrap servers
     * @param withRepartitioning whether the {@code repartition}, {@code max} and {@code cnt} topics
     *     are part of the verification
     */
    public static void verify(String kafka, boolean withRepartitioning) {
        Properties props = new Properties();
        props.put("client.id", "verifier");
        props.put("bootstrap.servers", kafka);
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("isolation.level", IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

        Map<TopicPartition, Long> committedOffsets;
        try (AdminClient adminClient = KafkaAdminClient.create(props)) {
            ensureStreamsApplicationDown(adminClient);
            committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
        }

        String[] allInputTopics;
        String[] allOutputTopics;
        if (withRepartitioning) {
            allInputTopics = new String[]{"data", "repartition"};
            allOutputTopics = new String[]{"echo", "min", "sum", "repartition", "max", "cnt"};
        } else {
            allInputTopics = new String[]{"data"};
            allOutputTopics = new String[]{"echo", "min", "sum"};
        }

        // Input is only read up to the offsets the application committed.
        Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> inputRecordsPerTopicPerPartition;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = getAllPartitions(consumer, allInputTopics);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            inputRecordsPerTopicPerPartition = getRecords(consumer, committedOffsets, withRepartitioning, true);
        } catch (Exception e) {
            e.printStackTrace(System.err);
            System.out.println("FAILED");
            return;
        }

        // Output is read up to the current end offsets of the output partitions.
        Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> outputRecordsPerTopicPerPartition;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            outputRecordsPerTopicPerPartition = getRecords(consumer, consumer.endOffsets(partitions), withRepartitioning, false);
        } catch (Exception e) {
            e.printStackTrace(System.err);
            System.out.println("FAILED");
            return;
        }

        verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("echo"));
        if (withRepartitioning) {
            verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("repartition"));
        }
        verifyMin(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("min"));
        verifySum(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("sum"));
        if (withRepartitioning) {
            verifyMax(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("max"));
            verifyCnt(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("cnt"));
        }

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            verifyAllTransactionFinished(consumer, kafka, withRepartitioning);
        } catch (Exception e) {
            e.printStackTrace(System.err);
            System.out.println("FAILED");
            return;
        }

        System.out.println("ALL-RECORDS-DELIVERED");
        System.out.flush();
    }

    /**
     * Polls the consumer group description roughly once per second until the group has no members.
     *
     * @param adminClient admin client used to describe the group
     * @throws RuntimeException if the group still has members after the ten-minute deadline
     */
    private static void ensureStreamsApplicationDown(AdminClient adminClient) {
        final long deadline = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
        while (true) {
            final ConsumerGroupDescription group = getConsumerGroupDescription(adminClient);
            final boolean stillRunning = !group.members().isEmpty();
            if (stillRunning && System.currentTimeMillis() > deadline) {
                throw new RuntimeException("Streams application not down after 600 seconds. Group: " + group);
            }
            // The pause happens on every iteration, including the one that observes an empty group.
            sleep(1000L);
            if (!stillRunning) {
                return;
            }
        }
    }

    /**
     * Fetches the offsets committed by consumer group {@code EosTest} and keeps those of the input
     * topics: {@code data} always, {@code repartition} only when {@code withRepartitioning} is set.
     *
     * @param adminClient admin client used to list the group's offsets
     * @param withRepartitioning whether offsets of the {@code repartition} topic are included
     * @return committed offset per input partition
     * @throws RuntimeException if the offsets cannot be fetched within 10 seconds, the request
     *     fails, or the calling thread is interrupted (the interrupt status is restored)
     */
    private static Map<TopicPartition, Long> getCommittedOffsets(AdminClient adminClient, boolean withRepartitioning) {
        final Map<TopicPartition, OffsetAndMetadata> groupOffsets;
        try {
            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets("EosTest");
            groupOffsets = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        Map<TopicPartition, Long> committedOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : groupOffsets.entrySet()) {
            String topic = entry.getKey().topic();
            boolean isInputTopic = "data".equals(topic) || (withRepartitioning && "repartition".equals(topic));
            if (isInputTopic) {
                committedOffsets.put(entry.getKey(), entry.getValue().offset());
            }
        }
        return committedOffsets;
    }

    /**
     * Polls {@code consumer} until every partition in {@code readEndOffsets} has reached its end
     * offset, collecting all records below that offset grouped by topic and partition.
     *
     * <p>A partition is paused once the consumer position reaches its end offset; reading is complete
     * when the number of paused partitions equals the number of entries in {@code readEndOffsets}.
     * The ten-minute idle deadline is reset whenever a record is received.
     *
     * @param consumer consumer that is already assigned and positioned by the caller
     * @param readEndOffsets exclusive end offset per partition
     * @param withRepartitioning forwarded to the topic check applied to every collected record
     * @param isInputTopic if {@code true}, records at or beyond the end offset are silently skipped;
     *     otherwise such a record is treated as a failure
     * @return collected records per topic and partition
     * @throws RuntimeException if a record arrives for a partition without a known end offset, if an
     *     output partition contains more records than expected, or if not all records arrive
     *     before the idle deadline
     */
    private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getRecords(KafkaConsumer<byte[], byte[]> consumer, Map<TopicPartition, Long> readEndOffsets, boolean withRepartitioning, boolean isInputTopic) {
        System.err.println("read end offset: " + readEndOffsets);
        Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
        Map<TopicPartition, Long> maxReceivedOffsetPerPartition = new HashMap<>();
        Map<TopicPartition, Long> maxConsumerPositionPerPartition = new HashMap<>();

        long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
        boolean allRecordsReceived = false;
        while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
            ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(Duration.ofMillis(100L));

            for (ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                // Any received record counts as progress and restarts the idle timer.
                maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                maxReceivedOffsetPerPartition.put(tp, record.offset());

                Long readEndOffset = readEndOffsets.get(tp);
                if (readEndOffset == null) {
                    // Fail with context instead of an anonymous NullPointerException on unboxing.
                    throw new RuntimeException("FAIL: received a record for " + tp + " but no read end offset is known for that partition (current offset: " + record.offset() + ")");
                }
                if (record.offset() < readEndOffset) {
                    addRecord(record, recordPerTopicPerPartition, withRepartitioning);
                } else if (!isInputTopic) {
                    throw new RuntimeException("FAIL: did receive more records than expected for " + tp + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset() + ")");
                }
            }

            for (Map.Entry<TopicPartition, Long> endOffset : readEndOffsets.entrySet()) {
                TopicPartition tp = endOffset.getKey();
                long position = consumer.position(tp);
                maxConsumerPositionPerPartition.put(tp, position);
                if (position >= endOffset.getValue()) {
                    consumer.pause(Collections.singletonList(tp));
                }
            }
            allRecordsReceived = consumer.paused().size() == readEndOffsets.size();
        }

        if (!allRecordsReceived) {
            System.err.println("Pause partitions (ie, received all data): " + consumer.paused());
            System.err.println("Max received offset per partition: " + maxReceivedOffsetPerPartition);
            System.err.println("Max consumer position per partition: " + maxConsumerPositionPerPartition);
            throw new RuntimeException("FAIL: did not receive all records after 600 sec idle time.");
        }
        return recordPerTopicPerPartition;
    }

    /**
     * Appends {@code record} to the list kept for its topic and partition, creating the nested map
     * and list on first use.
     *
     * @param record record to store
     * @param recordPerTopicPerPartition target structure, keyed by topic and then by partition
     * @param withRepartitioning forwarded to the topic check
     * @throws RuntimeException if the record's topic is rejected by {@code verifyTopic}
     */
    private static void addRecord(ConsumerRecord<byte[], byte[]> record, Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition, boolean withRepartitioning) {
        final String topic = record.topic();
        if (!verifyTopic(topic, withRepartitioning)) {
            throw new RuntimeException("FAIL: received data from unexpected topic: " + record);
        }
        recordPerTopicPerPartition
            .computeIfAbsent(topic, t -> new HashMap<>())
            .computeIfAbsent(new TopicPartition(topic, record.partition()), tp -> new ArrayList<>())
            .add(record);
    }

    /**
     * Tells whether {@code topic} is one of the topics this driver expects records from.
     *
     * @param topic topic name; {@code null} is never accepted
     * @param withRepartitioning when {@code true}, {@code repartition}, {@code max} and {@code cnt}
     *     are accepted in addition to {@code data}, {@code echo}, {@code min} and {@code sum}
     * @return {@code true} if the topic is expected
     */
    private static boolean verifyTopic(String topic, boolean withRepartitioning) {
        if (topic == null) {
            // A switch on a null string would throw; the contract is simply "not a valid topic".
            return false;
        }
        switch (topic) {
            case "data":
            case "echo":
            case "min":
            case "sum":
                return true;
            case "repartition":
            case "max":
            case "cnt":
                return withRepartitioning;
            default:
                return false;
        }
    }

    /**
     * Checks that every received partition contains exactly the records of the {@code data}
     * partition with the same partition number, in the same order and with equal key and value.
     *
     * <p>Keys are decoded as strings and values as integers.
     *
     * @param expectedRecords records of the {@code data} topic per partition
     * @param receivedRecords records of the topic under verification per partition
     * @throws RuntimeException if either map is missing, the number of partitions differs, a
     *     partition has no counterpart in the input, the per-partition record counts differ, or a
     *     record does not match its expected counterpart
     */
    private static void verifyReceivedAllRecords(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> expectedRecords, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords) {
        if (expectedRecords == null || receivedRecords == null) {
            throw new RuntimeException("Result verification failed. No " + (expectedRecords == null ? "expected" : "received") + " records available.");
        }
        // The map sizes count partitions, not records.
        if (expectedRecords.size() != receivedRecords.size()) {
            throw new RuntimeException("Result verification failed. Received records for " + receivedRecords.size() + " partitions but expected " + expectedRecords.size());
        }

        StringDeserializer stringDeserializer = new StringDeserializer();
        IntegerDeserializer integerDeserializer = new IntegerDeserializer();
        for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
            TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
            List<ConsumerRecord<byte[], byte[]>> partitionExpected = expectedRecords.get(inputTopicPartition);
            List<ConsumerRecord<byte[], byte[]>> partitionReceived = partitionRecords.getValue();
            if (partitionExpected == null) {
                throw new RuntimeException("Result verification failed: received " + partitionReceived.size() + " records for " + partitionRecords.getKey() + " but there are no input records for " + inputTopicPartition);
            }
            // Without this check missing output records would go unnoticed and surplus ones would
            // surface as a NoSuchElementException from the iterator below.
            if (partitionExpected.size() != partitionReceived.size()) {
                throw new RuntimeException("Result verification failed: expected " + partitionExpected.size() + " records for " + partitionRecords.getKey() + " but received " + partitionReceived.size());
            }

            Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = partitionExpected.iterator();
            for (ConsumerRecord<byte[], byte[]> receivedRecord : partitionReceived) {
                ConsumerRecord<byte[], byte[]> expected = expectedRecord.next();
                String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
                int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
                if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
                }
            }
        }
    }

    /**
     * Replays the {@code data} records of each partition and checks that the matching output
     * partition carries, record by record, the same key together with the smallest value seen so far
     * for that key.
     *
     * @param inputPerTopicPerPartition records of the {@code data} topic per partition
     * @param minPerTopicPerPartition records of the min output per partition
     * @throws RuntimeException if record counts differ for a partition or an output record does not
     *     carry the expected key and running minimum
     */
    private static void verifyMin(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
        final StringDeserializer keyDeserializer = new StringDeserializer();
        final IntegerDeserializer valueDeserializer = new IntegerDeserializer();
        // Shared across partitions, exactly like the per-key state it mirrors.
        final Map<String, Integer> runningMin = new HashMap<>();

        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> outputEntry : minPerTopicPerPartition.entrySet()) {
            final TopicPartition outputPartition = outputEntry.getKey();
            final List<ConsumerRecord<byte[], byte[]>> outputs = outputEntry.getValue();
            final List<ConsumerRecord<byte[], byte[]>> inputs = inputPerTopicPerPartition.get(new TopicPartition("data", outputPartition.partition()));
            if (inputs.size() != outputs.size()) {
                throw new RuntimeException("Result verification failed: expected " + inputs.size() + " records for " + outputPartition + " but received " + outputs.size());
            }

            for (int i = 0; i < outputs.size(); i++) {
                final ConsumerRecord<byte[], byte[]> receivedRecord = outputs.get(i);
                final ConsumerRecord<byte[], byte[]> input = inputs.get(i);
                final String receivedKey = keyDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                final int receivedValue = valueDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                final String key = keyDeserializer.deserialize(input.topic(), input.key());
                final int value = valueDeserializer.deserialize(input.topic(), input.value());

                final int expectedMin = runningMin.merge(key, value, Integer::min);
                final boolean matches = receivedKey.equals(key) && receivedValue == expectedMin;
                if (!matches) {
                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + expectedMin + "> but was <" + receivedKey + "," + receivedValue + ">");
                }
            }
        }
    }

    /**
     * Replays the {@code data} records of each partition and checks that the matching output
     * partition carries, record by record, the same key together with the running sum of that key's
     * values. Input values are decoded as integers, output values as longs.
     *
     * @param inputPerTopicPerPartition records of the {@code data} topic per partition
     * @param minPerTopicPerPartition records of the sum output per partition
     * @throws RuntimeException if record counts differ for a partition or an output record does not
     *     carry the expected key and running sum
     */
    private static void verifySum(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
        final StringDeserializer keyDeserializer = new StringDeserializer();
        final IntegerDeserializer inputValueDeserializer = new IntegerDeserializer();
        final LongDeserializer outputValueDeserializer = new LongDeserializer();
        final Map<String, Long> runningSum = new HashMap<>();

        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> outputEntry : minPerTopicPerPartition.entrySet()) {
            final TopicPartition outputPartition = outputEntry.getKey();
            final List<ConsumerRecord<byte[], byte[]>> outputs = outputEntry.getValue();
            final List<ConsumerRecord<byte[], byte[]>> inputs = inputPerTopicPerPartition.get(new TopicPartition("data", outputPartition.partition()));
            if (inputs.size() != outputs.size()) {
                throw new RuntimeException("Result verification failed: expected " + inputs.size() + " records for " + outputPartition + " but received " + outputs.size());
            }

            for (int i = 0; i < outputs.size(); i++) {
                final ConsumerRecord<byte[], byte[]> receivedRecord = outputs.get(i);
                final ConsumerRecord<byte[], byte[]> input = inputs.get(i);
                final String receivedKey = keyDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                final long receivedValue = outputValueDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                final String key = keyDeserializer.deserialize(input.topic(), input.key());
                final int value = inputValueDeserializer.deserialize(input.topic(), input.value());

                final long expectedSum = runningSum.merge(key, (long) value, Long::sum);
                final boolean matches = receivedKey.equals(key) && receivedValue == expectedSum;
                if (!matches) {
                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + expectedSum + "> but was <" + receivedKey + "," + receivedValue + ">");
                }
            }
        }
    }

    /**
     * Replays the {@code repartition} records of each partition and checks that the matching output
     * partition carries, record by record, the same key together with the largest value seen so far
     * for that key.
     *
     * @param inputPerTopicPerPartition records of the {@code repartition} topic per partition
     * @param maxPerTopicPerPartition records of the max output per partition
     * @throws RuntimeException if record counts differ for a partition or an output record does not
     *     carry the expected key and running maximum
     */
    private static void verifyMax(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> maxPerTopicPerPartition) {
        final StringDeserializer keyDeserializer = new StringDeserializer();
        final IntegerDeserializer valueDeserializer = new IntegerDeserializer();
        final Map<String, Integer> runningMax = new HashMap<>();

        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> outputEntry : maxPerTopicPerPartition.entrySet()) {
            final TopicPartition outputPartition = outputEntry.getKey();
            final List<ConsumerRecord<byte[], byte[]>> outputs = outputEntry.getValue();
            final List<ConsumerRecord<byte[], byte[]>> inputs = inputPerTopicPerPartition.get(new TopicPartition("repartition", outputPartition.partition()));
            if (inputs.size() != outputs.size()) {
                throw new RuntimeException("Result verification failed: expected " + inputs.size() + " records for " + outputPartition + " but received " + outputs.size());
            }

            for (int i = 0; i < outputs.size(); i++) {
                final ConsumerRecord<byte[], byte[]> receivedRecord = outputs.get(i);
                final ConsumerRecord<byte[], byte[]> input = inputs.get(i);
                final String receivedKey = keyDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                final int receivedValue = valueDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                final String key = keyDeserializer.deserialize(input.topic(), input.key());
                final int value = valueDeserializer.deserialize(input.topic(), input.value());

                final int expectedMax = runningMax.merge(key, value, Integer::max);
                final boolean matches = receivedKey.equals(key) && receivedValue == expectedMax;
                if (!matches) {
                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + expectedMax + "> but was <" + receivedKey + "," + receivedValue + ">");
                }
            }
        }
    }

    /**
     * Replays the {@code repartition} records of each partition and checks that the matching output
     * partition carries, record by record, the same key together with the number of records seen so
     * far for that key. Output values are decoded as longs; input values are not inspected.
     *
     * @param inputPerTopicPerPartition records of the {@code repartition} topic per partition
     * @param cntPerTopicPerPartition records of the count output per partition
     * @throws RuntimeException if record counts differ for a partition or an output record does not
     *     carry the expected key and running count
     */
    private static void verifyCnt(Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> cntPerTopicPerPartition) {
        final StringDeserializer keyDeserializer = new StringDeserializer();
        final LongDeserializer countDeserializer = new LongDeserializer();
        final Map<String, Long> runningCount = new HashMap<>();

        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> outputEntry : cntPerTopicPerPartition.entrySet()) {
            final TopicPartition outputPartition = outputEntry.getKey();
            final List<ConsumerRecord<byte[], byte[]>> outputs = outputEntry.getValue();
            final List<ConsumerRecord<byte[], byte[]>> inputs = inputPerTopicPerPartition.get(new TopicPartition("repartition", outputPartition.partition()));
            if (inputs.size() != outputs.size()) {
                throw new RuntimeException("Result verification failed: expected " + inputs.size() + " records for " + outputPartition + " but received " + outputs.size());
            }

            for (int i = 0; i < outputs.size(); i++) {
                final ConsumerRecord<byte[], byte[]> receivedRecord = outputs.get(i);
                final ConsumerRecord<byte[], byte[]> input = inputs.get(i);
                final String receivedKey = keyDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
                final long receivedValue = countDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
                final String key = keyDeserializer.deserialize(input.topic(), input.key());

                final long expectedCount = runningCount.merge(key, 1L, Long::sum);
                final boolean matches = receivedKey.equals(key) && receivedValue == expectedCount;
                if (!matches) {
                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + expectedCount + "> but was <" + receivedKey + "," + receivedValue + ">");
                }
            }
        }
    }

    /**
     * Writes one {@code <"key","value">} record to every output partition and then polls until each
     * of those records has been read back by {@code consumer}, which is re-assigned to the output
     * partitions and positioned at their ends first.
     *
     * <p>The ten-minute idle deadline is reset whenever a record is received. A failed send
     * terminates the process with exit code 1.
     *
     * @param consumer consumer used to read the verification records back
     * @param kafka bootstrap servers for the verification producer
     * @param withRepartitioning whether {@code repartition}, {@code max} and {@code cnt} are checked
     *     in addition to {@code echo}, {@code min} and {@code sum}
     * @throws RuntimeException if an unexpected record is read or not all verification records
     *     arrive before the idle deadline
     */
    private static void verifyAllTransactionFinished(KafkaConsumer<byte[], byte[]> consumer, String kafka, boolean withRepartitioning) {
        final String[] topics;
        if (withRepartitioning) {
            topics = new String[]{"echo", "min", "sum", "repartition", "max", "cnt"};
        } else {
            topics = new String[]{"echo", "min", "sum"};
        }

        // Partitions still waiting for their verification record; shrinks as records are read back.
        final List<TopicPartition> partitions = getAllPartitions(consumer, topics);
        consumer.assign(partitions);
        consumer.seekToEnd(partitions);
        for (final TopicPartition tp : partitions) {
            System.out.println(tp + " at position " + consumer.position(tp));
        }

        final Properties producerProps = new Properties();
        producerProps.put("client.id", "VerifyProducer");
        producerProps.put("bootstrap.servers", kafka);
        producerProps.put("key.serializer", StringSerializer.class);
        producerProps.put("value.serializer", StringSerializer.class);
        producerProps.put("enable.idempotence", true);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            for (final TopicPartition tp : partitions) {
                final ProducerRecord<String, String> marker = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value");
                producer.send(marker, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace(System.err);
                        System.err.flush();
                        Exit.exit(1);
                    }
                });
            }
        }

        final StringDeserializer stringDeserializer = new StringDeserializer();
        long deadline = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
        while (!partitions.isEmpty() && System.currentTimeMillis() < deadline) {
            final ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(100L));
            if (batch.isEmpty()) {
                System.out.println("No data received.");
                for (final TopicPartition tp : partitions) {
                    System.out.println(tp + " at position " + consumer.position(tp));
                }
            }
            for (final ConsumerRecord<byte[], byte[]> record : batch) {
                deadline = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
                final String topic = record.topic();
                final TopicPartition tp = new TopicPartition(topic, record.partition());

                final String key;
                final String value;
                try {
                    key = stringDeserializer.deserialize(topic, record.key());
                    value = stringDeserializer.deserialize(topic, record.value());
                } catch (SerializationException e) {
                    throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: Expected record <'key','value'> from one of " + partitions + " but got " + record, e);
                }

                // remove() is only attempted for a well-formed marker; it returns false when the
                // partition is not (or no longer) pending.
                final boolean isMarker = "key".equals(key) && "value".equals(value);
                if (!isMarker || !partitions.remove(tp)) {
                    throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: Expected record <'key','value'> from one of " + partitions + " but got <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]");
                }
                System.out.println("Verifying " + tp + " successful.");
            }
        }

        if (!partitions.isEmpty()) {
            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last 600 sec.");
        }
    }

    /**
     * Collects the partitions of all given topics, in the order the topics are passed and the
     * consumer reports their partitions.
     *
     * @param consumer consumer used to look up partition metadata
     * @param topics topics to look up
     * @return a new mutable list with one entry per partition
     */
    private static List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String ... topics) {
        final List<TopicPartition> result = new ArrayList<>();
        for (int i = 0; i < topics.length; i++) {
            consumer.partitionsFor(topics[i])
                .forEach(info -> result.add(new TopicPartition(info.topic(), info.partition())));
        }
        return result;
    }

    /**
     * Fetches the description of consumer group {@code EosTest}, waiting at most 10 seconds.
     *
     * @param adminClient admin client used for the lookup
     * @return the group description
     * @throws RuntimeException if the lookup fails, times out, or the calling thread is interrupted
     *     (the interrupt status is restored)
     */
    private static ConsumerGroupDescription getConsumerGroupDescription(AdminClient adminClient) {
        try {
            return adminClient
                .describeConsumerGroups(Collections.singleton("EosTest"))
                .describedGroups()
                .get("EosTest")
                .get(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            throw new RuntimeException("Unexpected Exception getting group description", e);
        } catch (ExecutionException | TimeoutException e) {
            e.printStackTrace();
            throw new RuntimeException("Unexpected Exception getting group description", e);
        }
    }
}

