/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.Utils;
import org.apache.log4j.Logger;

public class KafkaMigrationTool {
    private static final Logger logger = Logger.getLogger((String)KafkaMigrationTool.class.getName());
    private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
    private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
    private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
    private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
    private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
    private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
    private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
    private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
    private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
    private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";
    private static Class<?> KafkaStaticConsumer_07 = null;
    private static Class<?> ConsumerConfig_07 = null;
    private static Class<?> ConsumerConnector_07 = null;
    private static Class<?> KafkaStream_07 = null;
    private static Class<?> TopicFilter_07 = null;
    private static Class<?> WhiteList_07 = null;
    private static Class<?> BlackList_07 = null;
    private static Class<?> KafkaConsumerIteratorClass_07 = null;
    private static Class<?> KafkaMessageAndMetatDataClass_07 = null;
    private static Class<?> KafkaMessageClass_07 = null;

    public static void main(String[] stringArray) throws InterruptedException, IOException {
        int n;
        OptionParser optionParser = new OptionParser();
        ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec = optionParser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. You man specify multiple of these.").withRequiredArg().describedAs("config file").ofType(String.class);
        ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec2 = optionParser.accepts("producer.config", "Producer config.").withRequiredArg().describedAs("config file").ofType(String.class);
        ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec3 = optionParser.accepts("num.producers", "Number of producer instances").withRequiredArg().describedAs("Number of producers").ofType(Integer.class).defaultsTo((Object)1, (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec4 = optionParser.accepts("zkclient.01.jar", "zkClient 0.1 jar file").withRequiredArg().describedAs("zkClient 0.1 jar file required by Kafka 0.7").ofType(String.class);
        ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec5 = optionParser.accepts("kafka.07.jar", "Kafka 0.7 jar file").withRequiredArg().describedAs("kafka 0.7 jar").ofType(String.class);
        ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec6 = optionParser.accepts("num.streams", "Number of consumer streams").withRequiredArg().describedAs("Number of consumer threads").ofType(Integer.class).defaultsTo((Object)1, (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec7 = optionParser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
        ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec8 = optionParser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
        ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec9 = optionParser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer").withRequiredArg().describedAs("Queue size in terms of number of messages").ofType(Integer.class).defaultsTo((Object)10000, (Object[])new Integer[0]);
        OptionSpecBuilder optionSpecBuilder = optionParser.accepts("help", "Print this message.");
        OptionSet optionSet = optionParser.parse(stringArray);
        if (optionSet.has((OptionSpec)optionSpecBuilder)) {
            optionParser.printHelpOn((OutputStream)System.out);
            System.exit(0);
        }
        KafkaMigrationTool.checkRequiredArgs(optionParser, optionSet, new OptionSpec[]{argumentAcceptingOptionSpec, argumentAcceptingOptionSpec2, argumentAcceptingOptionSpec4, argumentAcceptingOptionSpec5});
        int n2 = optionSet.has((OptionSpec)argumentAcceptingOptionSpec7) ? 1 : 0;
        int n3 = n = optionSet.has((OptionSpec)argumentAcceptingOptionSpec8) ? 1 : 0;
        if (n2 + n != 1) {
            System.err.println("Exactly one of whitelist or blacklist is required.");
            System.exit(1);
        }
        String string = (String)optionSet.valueOf((OptionSpec)argumentAcceptingOptionSpec5);
        String string2 = (String)optionSet.valueOf((OptionSpec)argumentAcceptingOptionSpec4);
        String string3 = (String)optionSet.valueOf((OptionSpec)argumentAcceptingOptionSpec);
        int n4 = (Integer)optionSet.valueOf((OptionSpec)argumentAcceptingOptionSpec6);
        String string4 = (String)optionSet.valueOf((OptionSpec)argumentAcceptingOptionSpec2);
        int n5 = (Integer)optionSet.valueOf((OptionSpec)argumentAcceptingOptionSpec3);
        final ArrayList<MigrationThread> arrayList = new ArrayList<MigrationThread>(n4);
        final ArrayList<ProducerThread> arrayList2 = new ArrayList<ProducerThread>(n5);
        try {
            Object object;
            File file = new File(string);
            File file2 = new File(string2);
            ParentLastURLClassLoader parentLastURLClassLoader = new ParentLastURLClassLoader(new URL[]{file.toURI().toURL(), file2.toURI().toURL()});
            ConsumerConfig_07 = parentLastURLClassLoader.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
            KafkaStaticConsumer_07 = parentLastURLClassLoader.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
            ConsumerConnector_07 = parentLastURLClassLoader.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
            KafkaStream_07 = parentLastURLClassLoader.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
            TopicFilter_07 = parentLastURLClassLoader.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
            WhiteList_07 = parentLastURLClassLoader.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
            BlackList_07 = parentLastURLClassLoader.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
            KafkaMessageClass_07 = parentLastURLClassLoader.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
            KafkaConsumerIteratorClass_07 = parentLastURLClassLoader.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
            KafkaMessageAndMetatDataClass_07 = parentLastURLClassLoader.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);
            Constructor<?> constructor = ConsumerConfig_07.getConstructor(Properties.class);
            Properties properties = new Properties();
            properties.load(new FileInputStream(string3));
            if (properties.getProperty("shallow.iterator.enable", "").equals("true")) {
                logger.warn((Object)"Shallow iterator should not be used in the migration tool");
                properties.setProperty("shallow.iterator.enable", "false");
            }
            Object obj = constructor.newInstance(properties);
            Method method = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
            final Object object2 = method.invoke(null, obj);
            Method method2 = ConsumerConnector_07.getMethod("createMessageStreamsByFilter", TopicFilter_07, Integer.TYPE);
            final Method method3 = ConsumerConnector_07.getMethod("shutdown", new Class[0]);
            Constructor<?> constructor2 = WhiteList_07.getConstructor(String.class);
            Constructor<?> constructor3 = BlackList_07.getConstructor(String.class);
            Object var35_36 = null;
            var35_36 = optionSet.has((OptionSpec)argumentAcceptingOptionSpec7) ? constructor2.newInstance(optionSet.valueOf((OptionSpec)argumentAcceptingOptionSpec7)) : constructor3.newInstance(optionSet.valueOf((OptionSpec)argumentAcceptingOptionSpec8));
            Object object3 = method2.invoke(object2, var35_36, n4);
            Properties properties2 = new Properties();
            properties2.load(new FileInputStream(string4));
            properties2.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
            int n6 = (Integer)optionSet.valueOf((OptionSpec)argumentAcceptingOptionSpec9);
            ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(n6);
            int n7 = 0;
            Runtime.getRuntime().addShutdownHook(new Thread(){

                @Override
                public void run() {
                    try {
                        method3.invoke(object2, new Object[0]);
                    }
                    catch (Exception exception) {
                        logger.error((Object)"Error while shutting down Kafka consumer", (Throwable)exception);
                    }
                    for (Thread thread : arrayList) {
                        ((MigrationThread)thread).shutdown();
                    }
                    for (Thread thread : arrayList2) {
                        ((ProducerThread)thread).shutdown();
                    }
                    for (Thread thread : arrayList2) {
                        ((ProducerThread)thread).awaitShutdown();
                    }
                    logger.info((Object)"Kafka migration tool shutdown successfully");
                }
            });
            for (Object e : (List)object3) {
                object = new MigrationThread(e, producerDataChannel, n7);
                ++n7;
                ((Thread)object).start();
                arrayList.add((MigrationThread)object);
            }
            String string5 = properties2.getProperty("client.id");
            for (int i = 0; i < n5; ++i) {
                properties2.put("client.id", (String)string5 + "-" + i);
                object = new ProducerConfig(properties2);
                Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>((ProducerConfig)object);
                ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
                producerThread.start();
                arrayList2.add(producerThread);
            }
        }
        catch (Throwable throwable) {
            System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(throwable));
            logger.error((Object)"Kafka migration tool failed: ", throwable);
        }
    }

    private static void checkRequiredArgs(OptionParser optionParser, OptionSet optionSet, OptionSpec[] optionSpecArray) throws IOException {
        for (OptionSpec optionSpec : optionSpecArray) {
            if (optionSet.has(optionSpec)) continue;
            System.err.println("Missing required argument \"" + optionSpec + "\"");
            optionParser.printHelpOn((OutputStream)System.err);
            System.exit(1);
        }
    }

    private static class ParentLastURLClassLoader
    extends ClassLoader {
        private ChildURLClassLoader childClassLoader;

        public ParentLastURLClassLoader(URL[] uRLArray) {
            super(Thread.currentThread().getContextClassLoader());
            this.childClassLoader = new ChildURLClassLoader(uRLArray, new FindClassClassLoader(this.getParent()));
        }

        @Override
        protected synchronized Class<?> loadClass(String string, boolean bl) throws ClassNotFoundException {
            try {
                return this.childClassLoader.findClass(string);
            }
            catch (ClassNotFoundException classNotFoundException) {
                return super.loadClass(string, bl);
            }
        }

        private static class ChildURLClassLoader
        extends URLClassLoader {
            private FindClassClassLoader realParent;

            public ChildURLClassLoader(URL[] uRLArray, FindClassClassLoader findClassClassLoader) {
                super(uRLArray, (ClassLoader)null);
                this.realParent = findClassClassLoader;
            }

            @Override
            public Class<?> findClass(String string) throws ClassNotFoundException {
                try {
                    return super.findClass(string);
                }
                catch (ClassNotFoundException classNotFoundException) {
                    return this.realParent.loadClass(string);
                }
            }
        }

        private static class FindClassClassLoader
        extends ClassLoader {
            public FindClassClassLoader(ClassLoader classLoader) {
                super(classLoader);
            }

            @Override
            public Class<?> findClass(String string) throws ClassNotFoundException {
                return super.findClass(string);
            }
        }
    }

    static class ProducerThread
    extends Thread {
        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
        private final Producer<byte[], byte[]> producer;
        private final int threadId;
        private String threadName;
        private Logger logger;
        private CountDownLatch shutdownComplete = new CountDownLatch(1);
        private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage<Object, Object>("shutdown", null, null);

        public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel, Producer<byte[], byte[]> producer, int n) {
            this.producerDataChannel = producerDataChannel;
            this.producer = producer;
            this.threadId = n;
            this.threadName = "ProducerThread-" + this.threadId;
            this.logger = Logger.getLogger((String)ProducerThread.class.getName());
            this.setName(this.threadName);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                KeyedMessage<byte[], byte[]> keyedMessage;
                while (!(keyedMessage = this.producerDataChannel.receiveRequest()).equals(this.shutdownMessage)) {
                    this.producer.send(keyedMessage);
                    if (!this.logger.isDebugEnabled()) continue;
                    this.logger.debug((Object)("Sending message " + new String(keyedMessage.message())));
                }
                this.logger.info((Object)("Producer thread " + this.threadName + " finished running"));
            }
            catch (Throwable throwable) {
                this.logger.fatal((Object)"Producer thread failure due to ", throwable);
            }
            finally {
                this.shutdownComplete.countDown();
            }
        }

        public void shutdown() {
            try {
                this.logger.info((Object)("Producer thread " + this.threadName + " shutting down"));
                this.producerDataChannel.sendRequest(this.shutdownMessage);
            }
            catch (InterruptedException interruptedException) {
                this.logger.warn((Object)"Interrupt during shutdown of ProducerThread", (Throwable)interruptedException);
            }
        }

        public void awaitShutdown() {
            try {
                this.shutdownComplete.await();
                this.producer.close();
                this.logger.info((Object)("Producer thread " + this.threadName + " shutdown complete"));
            }
            catch (InterruptedException interruptedException) {
                this.logger.warn((Object)"Interrupt during shutdown of ProducerThread", (Throwable)interruptedException);
            }
        }
    }

    private static class MigrationThread
    extends Thread {
        private final Object stream;
        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
        private final int threadId;
        private final String threadName;
        private final Logger logger;
        private CountDownLatch shutdownComplete = new CountDownLatch(1);
        private final AtomicBoolean isRunning = new AtomicBoolean(true);

        MigrationThread(Object object, ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel, int n) {
            this.stream = object;
            this.producerDataChannel = producerDataChannel;
            this.threadId = n;
            this.threadName = "MigrationThread-" + this.threadId;
            this.logger = Logger.getLogger((String)MigrationThread.class.getName());
            this.setName(this.threadName);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                Method method = KafkaMessageClass_07.getMethod("payload", new Class[0]);
                Method method2 = KafkaMessageAndMetatDataClass_07.getMethod("message", new Class[0]);
                Method method3 = KafkaMessageAndMetatDataClass_07.getMethod("topic", new Class[0]);
                Method method4 = KafkaStream_07.getMethod("iterator", new Class[0]);
                Method method5 = KafkaConsumerIteratorClass_07.getMethod("hasNext", new Class[0]);
                Method method6 = KafkaConsumerIteratorClass_07.getMethod("next", new Class[0]);
                Object object = method4.invoke(this.stream, new Object[0]);
                while (((Boolean)method5.invoke(object, new Object[0])).booleanValue()) {
                    Object object2 = method6.invoke(object, new Object[0]);
                    Object object3 = method2.invoke(object2, new Object[0]);
                    Object object4 = method3.invoke(object2, new Object[0]);
                    Object object5 = method.invoke(object3, new Object[0]);
                    int n = ((ByteBuffer)object5).remaining();
                    byte[] byArray = new byte[n];
                    ((ByteBuffer)object5).get(byArray);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("Migration thread " + this.threadId + " sending message of size " + byArray.length + " to topic " + object4));
                    }
                    KeyedMessage<Object, byte[]> keyedMessage = new KeyedMessage<Object, byte[]>((String)object4, null, byArray);
                    this.producerDataChannel.sendRequest(keyedMessage);
                }
                this.logger.info((Object)("Migration thread " + this.threadName + " finished running"));
            }
            catch (InvocationTargetException invocationTargetException) {
                this.logger.fatal((Object)"Migration thread failure due to root cause ", invocationTargetException.getCause());
            }
            catch (Throwable throwable) {
                this.logger.fatal((Object)"Migration thread failure due to ", throwable);
            }
            finally {
                this.shutdownComplete.countDown();
            }
        }

        public void shutdown() {
            this.logger.info((Object)("Migration thread " + this.threadName + " shutting down"));
            this.isRunning.set(false);
            this.interrupt();
            try {
                this.shutdownComplete.await();
            }
            catch (InterruptedException interruptedException) {
                this.logger.warn((Object)"Interrupt during shutdown of MigrationThread", (Throwable)interruptedException);
            }
            this.logger.info((Object)("Migration thread " + this.threadName + " shutdown complete"));
        }
    }

    static class ProducerDataChannel<T> {
        private final int producerQueueSize;
        private final BlockingQueue<T> producerRequestQueue;

        public ProducerDataChannel(int n) {
            this.producerQueueSize = n;
            this.producerRequestQueue = new ArrayBlockingQueue<T>(this.producerQueueSize);
        }

        public void sendRequest(T t) throws InterruptedException {
            this.producerRequestQueue.put(t);
        }

        public T receiveRequest() throws InterruptedException {
            return this.producerRequestQueue.take();
        }
    }
}

