/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.Random;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.log.FileMessageSet;
import kafka.log.Log$;
import kafka.message.MessageAndMetadata;
import kafka.message.MessageAndOffset;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import kafka.tools.TestLogCleaning$;
import kafka.tools.TestRecord;
import kafka.utils.CommandLineUtils$;
import kafka.utils.IteratorTemplate;
import kafka.utils.Utils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.ScalaObject;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.mutable.StringBuilder;
import scala.io.Codec$;
import scala.io.Source$;
import scala.math.Ordering;
import scala.reflect.ClassManifest$;
import scala.reflect.Manifest$;
import scala.reflect.OptManifest;
import scala.runtime.BoxesRunTime;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public final class TestLogCleaning$
implements ScalaObject {
    public static final TestLogCleaning$ MODULE$;

    static {
        new TestLogCleaning$();
    }

    public void main(String[] args) {
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.").withRequiredArg().describedAs("count").ofType(Long.class).defaultsTo((Object)Predef$.MODULE$.long2Long(Long.MAX_VALUE), (Object[])new Long[0]);
        ArgumentAcceptingOptionSpec numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(5), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec brokerOpt = parser.accepts("broker", "Url to connect to.").withRequiredArg().describedAs("url").ofType(String.class);
        ArgumentAcceptingOptionSpec topicsOpt = parser.accepts("topics", "The number of topics to test.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.").withRequiredArg().describedAs("percent").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec zkConnectOpt = parser.accepts("zk", "Zk url.").withRequiredArg().describedAs("url").ofType(String.class);
        ArgumentAcceptingOptionSpec sleepSecsOpt = parser.accepts("sleep", "Time to sleep between production and consumption.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec dumpOpt = parser.accepts("dump", "Dump the message contents of a topic partition that contains test data from this test to standard out.").withRequiredArg().describedAs("directory").ofType(String.class);
        OptionSet options = parser.parse(args);
        if (args.length == 0) {
            CommandLineUtils$.MODULE$.printUsageAndDie(parser, "An integration test for log cleaning.");
        }
        if (options.has((OptionSpec)dumpOpt)) {
            this.dumpLog(new File((String)options.valueOf((OptionSpec)dumpOpt)));
            System.exit(0);
        }
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, (Seq<OptionSpec<?>>)Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{brokerOpt, zkConnectOpt, numMessagesOpt}));
        long messages = (Long)options.valueOf((OptionSpec)numMessagesOpt);
        int percentDeletes = (Integer)options.valueOf((OptionSpec)percentDeletesOpt);
        int dups = (Integer)options.valueOf((OptionSpec)numDupsOpt);
        String brokerUrl = (String)options.valueOf((OptionSpec)brokerOpt);
        int topicCount = (Integer)options.valueOf((OptionSpec)topicsOpt);
        String zkUrl = (String)options.valueOf((OptionSpec)zkConnectOpt);
        int sleepSecs = (Integer)options.valueOf((OptionSpec)sleepSecsOpt);
        int testId$1 = new Random().nextInt(Integer.MAX_VALUE);
        String[] topics = (String[])((TraversableOnce)Predef$.MODULE$.intWrapper(0).until(topicCount).map((Function1)new Serializable(testId$1){
            public static final long serialVersionUID;
            private final int testId$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final String apply(int n) {
                return new StringBuilder().append((Object)"log-cleaner-test-").append((Object)BoxesRunTime.boxToInteger((int)this.testId$1)).append((Object)"-").append((Object)BoxesRunTime.boxToInteger((int)n)).toString();
            }
            {
                this.testId$1 = n;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassManifest$.MODULE$.classType(String.class));
        Predef$.MODULE$.println((Object)Predef$.MODULE$.augmentString("Producing %d messages...").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)messages)})));
        File producedDataFile = this.produceMessages(brokerUrl, topics, messages, dups, percentDeletes);
        Predef$.MODULE$.println((Object)Predef$.MODULE$.augmentString("Sleeping for %d seconds...").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)sleepSecs)})));
        Thread.sleep(sleepSecs * 1000);
        Predef$.MODULE$.println((Object)"Consuming messages...");
        File consumedDataFile = this.consumeMessages(zkUrl, topics);
        int producedLines = this.lineCount(producedDataFile);
        int consumedLines = this.lineCount(consumedDataFile);
        double reduction = 1.0 - (double)consumedLines / (double)producedLines;
        Predef$.MODULE$.println((Object)Predef$.MODULE$.augmentString("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)producedLines), BoxesRunTime.boxToInteger((int)consumedLines), BoxesRunTime.boxToDouble((double)((double)100 * reduction))})));
        Predef$.MODULE$.println((Object)"De-duplicating and validating output files...");
        this.validateOutput(producedDataFile, consumedDataFile);
        producedDataFile.delete();
        consumedDataFile.delete();
    }

    public void dumpLog(File dir$1) {
        Predef$.MODULE$.require(dir$1.exists(), (Function0)new Serializable(dir$1){
            public static final long serialVersionUID;
            private final File dir$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final String apply() {
                return new StringBuilder().append((Object)"Non-existent directory: ").append((Object)this.dir$1.getAbsolutePath()).toString();
            }
            {
                this.dir$1 = file;
            }
        });
        Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])dir$1.list()).sorted((Ordering)Ordering.String$.MODULE$)).withFilter((Function1)new Serializable(){
            public static final long serialVersionUID;

            static {
                long l = serialVersionUID = 0L;
            }

            public final boolean apply(String file) {
                return file.endsWith(Log$.MODULE$.LogFileSuffix());
            }
        }).foreach((Function1)new Serializable(dir$1){
            public static final long serialVersionUID;
            private final File dir$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final void apply(String file) {
                FileMessageSet ms = new FileMessageSet(new File(this.dir$1, file));
                ms.foreach((Function1)new Serializable(this){
                    public static final long serialVersionUID;

                    static {
                        long l = serialVersionUID = 0L;
                    }

                    public final void apply(MessageAndOffset entry2) {
                        String key = Utils$.MODULE$.readString(entry2.message().key(), Utils$.MODULE$.readString$default$2());
                        String content = entry2.message().isNull() ? null : Utils$.MODULE$.readString(entry2.message().payload(), Utils$.MODULE$.readString$default$2());
                        Predef$.MODULE$.println((Object)Predef$.MODULE$.augmentString("offset = %s, key = %s, content = %s").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)entry2.offset()), key, content})));
                    }
                });
            }
            {
                this.dir$1 = file;
            }
        });
    }

    public int lineCount(File file) {
        return Source$.MODULE$.fromFile(file, Codec$.MODULE$.fallbackSystemCodec()).getLines().size();
    }

    public void validateOutput(File producedDataFile, File consumedDataFile) {
        BufferedReader producedReader = this.externalSort(producedDataFile);
        BufferedReader consumedReader = this.externalSort(consumedDataFile);
        IteratorTemplate<TestRecord> produced = this.valuesIterator(producedReader);
        IteratorTemplate<TestRecord> consumed = this.valuesIterator(consumedReader);
        File producedDedupedFile = new File(new StringBuilder().append((Object)producedDataFile.getAbsolutePath()).append((Object)".deduped").toString());
        BufferedWriter producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 0x100000);
        File consumedDedupedFile = new File(new StringBuilder().append((Object)consumedDataFile.getAbsolutePath()).append((Object)".deduped").toString());
        BufferedWriter consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 0x100000);
        int total = 0;
        int mismatched = 0;
        while (produced.hasNext() && consumed.hasNext()) {
            TestRecord p = produced.next();
            producedDeduped.write(p.toString());
            producedDeduped.newLine();
            TestRecord c = consumed.next();
            consumedDeduped.write(c.toString());
            consumedDeduped.newLine();
            TestRecord testRecord = p;
            TestRecord testRecord2 = c;
            if (testRecord == null ? testRecord2 != null : !((Object)testRecord).equals(testRecord2)) {
                ++mismatched;
            }
            ++total;
        }
        producedDeduped.close();
        consumedDeduped.close();
        Predef$.MODULE$.require(!produced.hasNext(), (Function0)new Serializable(){
            public static final long serialVersionUID;

            static {
                long l = serialVersionUID = 0L;
            }

            public final String apply() {
                return "Additional values produced not found in consumer log.";
            }
        });
        Predef$.MODULE$.require(!consumed.hasNext(), (Function0)new Serializable(){
            public static final long serialVersionUID;

            static {
                long l = serialVersionUID = 0L;
            }

            public final String apply() {
                return "Additional values consumed not found in producer log.";
            }
        });
        Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"Validated ").append((Object)BoxesRunTime.boxToInteger((int)total)).append((Object)" values, ").append((Object)BoxesRunTime.boxToInteger((int)mismatched)).append((Object)" mismatches.").toString());
        Predef$.MODULE$.require(mismatched == 0, (Function0)new Serializable(){
            public static final long serialVersionUID;

            static {
                long l = serialVersionUID = 0L;
            }

            public final String apply() {
                return "Non-zero number of row mismatches.";
            }
        });
        producedDedupedFile.delete();
        consumedDedupedFile.delete();
    }

    public IteratorTemplate<TestRecord> valuesIterator(BufferedReader reader$1) {
        return new IteratorTemplate<TestRecord>(reader$1){
            private final BufferedReader reader$1;

            public TestRecord makeNext() {
                TestRecord next2 = TestLogCleaning$.MODULE$.readNext(this.reader$1);
                while (next2 != null && next2.delete()) {
                    next2 = TestLogCleaning$.MODULE$.readNext(this.reader$1);
                }
                return next2 == null ? (TestRecord)this.allDone() : next2;
            }
            {
                this.reader$1 = bufferedReader;
            }
        };
    }

    public TestRecord readNext(BufferedReader reader) {
        String line = reader.readLine();
        if (line == null) {
            return null;
        }
        TestRecord curr = new TestRecord(line);
        while (true) {
            if ((line = this.peekLine(reader)) == null) {
                return curr;
            }
            TestRecord next2 = new TestRecord(line);
            if (next2 == null) break;
            String string = next2.topicAndKey();
            String string2 = curr.topicAndKey();
            if (string != null ? !string.equals(string2) : string2 != null) break;
            curr = next2;
            reader.readLine();
        }
        return curr;
    }

    /*
     * WARNING - void declaration
     */
    public String peekLine(BufferedReader reader) {
        void var2_2;
        reader.mark(4096);
        String line = reader.readLine();
        reader.reset();
        return var2_2;
    }

    public BufferedReader externalSort(File file) {
        ProcessBuilder builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", new StringBuilder().append((Object)"--temporary-directory=").append((Object)System.getProperty("java.io.tmpdir")).toString(), file.getAbsolutePath());
        Process process$1 = builder.start();
        new Thread(process$1){
            private final Process process$1;

            public void run() {
                int exitCode = this.process$1.waitFor();
                if (exitCode != 0) {
                    System.err.println("Process exited abnormally.");
                    while (this.process$1.getErrorStream().available() > 0) {
                        System.err.write(this.process$1.getErrorStream().read());
                    }
                }
            }
            {
                this.process$1 = process;
            }
        }.start();
        return new BufferedReader(new InputStreamReader(process$1.getInputStream()), 0xA00000);
    }

    public File produceMessages(String brokerUrl, String[] topics$1, long messages, int dups, int percentDeletes$1) {
        Properties producerProps = new Properties();
        producerProps.setProperty("block.on.buffer.full", "true");
        producerProps.setProperty("bootstrap.servers", brokerUrl);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer producer$1 = new KafkaProducer(producerProps);
        Random rand$1 = new Random(1L);
        int keyCount$1 = (int)(messages / (long)dups);
        File producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt");
        Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"Logging produce requests to ").append((Object)producedFile.getAbsolutePath()).toString());
        BufferedWriter producedWriter$1 = new BufferedWriter(new FileWriter(producedFile), 0x100000);
        Predef$.MODULE$.longWrapper(0L).until((Object)BoxesRunTime.boxToLong((long)(messages * (long)topics$1.length))).foreach((Function1)new Serializable(topics$1, percentDeletes$1, producer$1, rand$1, keyCount$1, producedWriter$1){
            public static final long serialVersionUID;
            private final String[] topics$1;
            private final int percentDeletes$1;
            private final KafkaProducer producer$1;
            private final Random rand$1;
            private final int keyCount$1;
            private final BufferedWriter producedWriter$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final void apply(long i) {
                this.apply$mcVJ$sp(i);
            }

            public void apply$mcVJ$sp(long v1) {
                boolean delete2;
                String topic = this.topics$1[(int)(v1 % (long)this.topics$1.length)];
                int key = this.rand$1.nextInt(this.keyCount$1);
                boolean bl = delete2 = v1 % 100L < (long)this.percentDeletes$1;
                ProducerRecord msg = delete2 ? new ProducerRecord(topic, (Object)((Object)BoxesRunTime.boxToInteger((int)key)).toString().getBytes(), null) : new ProducerRecord(topic, (Object)((Object)BoxesRunTime.boxToInteger((int)key)).toString().getBytes(), (Object)((Object)BoxesRunTime.boxToLong((long)v1)).toString().getBytes());
                this.producer$1.send(msg);
                this.producedWriter$1.write(new TestRecord(topic, key, v1, delete2).toString());
                this.producedWriter$1.newLine();
            }
            {
                this.topics$1 = stringArray;
                this.percentDeletes$1 = n;
                this.producer$1 = kafkaProducer;
                this.rand$1 = random;
                this.keyCount$1 = n2;
                this.producedWriter$1 = bufferedWriter;
            }
        });
        producedWriter$1.close();
        producer$1.close();
        return producedFile;
    }

    public ZookeeperConsumerConnector makeConsumer(String zkUrl, String[] topics) {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", new StringBuilder().append((Object)"log-cleaner-test-").append((Object)BoxesRunTime.boxToInteger((int)new Random().nextInt(Integer.MAX_VALUE))).toString());
        consumerProps.setProperty("zookeeper.connect", zkUrl);
        consumerProps.setProperty("consumer.timeout.ms", ((Object)BoxesRunTime.boxToInteger((int)20000)).toString());
        consumerProps.setProperty("auto.offset.reset", "smallest");
        return new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps));
    }

    public File consumeMessages(String zkUrl, String[] topics) {
        ZookeeperConsumerConnector connector = this.makeConsumer(zkUrl, topics);
        Map<String, List<KafkaStream<String, String>>> streams$1 = connector.createMessageStreams((Map<String, Object>)Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])topics).map((Function1)new Serializable(){
            public static final long serialVersionUID;

            static {
                long l = serialVersionUID = 0L;
            }

            public final Tuple2<String, Object> apply(String topic) {
                return new Tuple2((Object)topic, (Object)BoxesRunTime.boxToInteger((int)1));
            }
        }, Array$.MODULE$.canBuildFrom(ClassManifest$.MODULE$.classType(Tuple2.class, (OptManifest)ClassManifest$.MODULE$.classType(String.class), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new OptManifest[]{Manifest$.MODULE$.Int()}))))).toMap(Predef$.MODULE$.conforms()), new StringDecoder(StringDecoder$.MODULE$.init$default$1()), new StringDecoder(StringDecoder$.MODULE$.init$default$1()));
        File consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt");
        Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"Logging consumed messages to ").append((Object)consumedFile.getAbsolutePath()).toString());
        BufferedWriter consumedWriter$1 = new BufferedWriter(new FileWriter(consumedFile));
        Predef$.MODULE$.refArrayOps((Object[])topics).foreach((Function1)new Serializable(streams$1, consumedWriter$1){
            public static final long serialVersionUID;
            private final Map streams$1;
            public final BufferedWriter consumedWriter$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final void apply(String topic$2) {
                KafkaStream stream = (KafkaStream)((IterableLike)this.streams$1.apply((Object)topic$2)).head();
                try {
                    stream.foreach((Function1)new Serializable(this, topic$2){
                        public static final long serialVersionUID;
                        private final anonfun.consumeMessages.1 $outer;
                        private final String topic$2;

                        static {
                            long l = serialVersionUID = 0L;
                        }

                        public final void apply(MessageAndMetadata<String, String> item) {
                            boolean delete2 = item.message() == null;
                            long value2 = delete2 ? -1L : Predef$.MODULE$.augmentString(item.message()).toLong();
                            this.$outer.consumedWriter$1.write(new TestRecord(this.topic$2, Predef$.MODULE$.augmentString(item.key()).toInt(), value2, delete2).toString());
                            this.$outer.consumedWriter$1.newLine();
                        }
                        {
                            if ($outer == null) {
                                throw new NullPointerException();
                            }
                            this.$outer = $outer;
                            this.topic$2 = string;
                        }
                    });
                }
                catch (ConsumerTimeoutException consumerTimeoutException) {
                    // empty catch block
                }
            }
            {
                this.streams$1 = map;
                this.consumedWriter$1 = bufferedWriter;
            }
        });
        consumedWriter$1.close();
        connector.shutdown();
        return consumedFile;
    }

    private TestLogCleaning$() {
        MODULE$ = this;
    }
}

