package org.apache.flume.channel.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/channel/kafka/KafkaChannel.class */
public class KafkaChannel extends BasicChannelSemantics {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaChannel.class);
    private Producer<String, byte[]> producer;
    private KafkaChannelCounter counter;
    private final Properties kafkaConf = new Properties();
    private final String channelUUID = UUID.randomUUID().toString();
    private AtomicReference<String> topic = new AtomicReference<>();
    private boolean parseAsFlumeEvent = true;
    private final Map<String, Integer> topicCountMap = Collections.synchronizedMap(new HashMap());
    private final List<ConsumerAndIterator> consumers = Collections.synchronizedList(new LinkedList());
    private final ThreadLocal<ConsumerAndIterator> consumerAndIter = new ThreadLocal<ConsumerAndIterator>() { // from class: org.apache.flume.channel.kafka.KafkaChannel.1
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public ConsumerAndIterator initialValue() {
            return KafkaChannel.this.createConsumerAndIter();
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/channel/kafka/KafkaChannel$ConsumerAndIterator.class */
    public class ConsumerAndIterator {
        final ConsumerConnector consumer;
        final ConsumerIterator<byte[], byte[]> iterator;
        final String uuid;
        final LinkedList<Event> failedEvents = new LinkedList<>();

        ConsumerAndIterator(ConsumerConnector consumerConnector, ConsumerIterator<byte[], byte[]> consumerIterator, String str) {
            this.consumer = consumerConnector;
            this.iterator = consumerIterator;
            this.uuid = str;
        }
    }

    /* loaded from: input_file:org/apache/flume/channel/kafka/KafkaChannel$KafkaTransaction.class */
    private class KafkaTransaction extends BasicTransactionSemantics {
        private TransactionType type;
        private Optional<ByteArrayOutputStream> tempOutStream;
        private Optional<LinkedList<byte[]>> serializedEvents;
        private Optional<LinkedList<Event>> events;
        private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer;
        private Optional<SpecificDatumReader<AvroFlumeEvent>> reader;
        private BinaryEncoder encoder;
        private BinaryDecoder decoder;
        private final String batchUUID;
        private boolean eventTaken;

        private KafkaTransaction() {
            this.type = TransactionType.NONE;
            this.tempOutStream = Optional.absent();
            this.serializedEvents = Optional.absent();
            this.events = Optional.absent();
            this.writer = Optional.absent();
            this.reader = Optional.absent();
            this.encoder = null;
            this.decoder = null;
            this.batchUUID = UUID.randomUUID().toString();
            this.eventTaken = false;
        }

        protected void doPut(Event event) throws InterruptedException {
            this.type = TransactionType.PUT;
            if (!this.serializedEvents.isPresent()) {
                this.serializedEvents = Optional.of(new LinkedList());
            }
            try {
                if (!this.tempOutStream.isPresent()) {
                    this.tempOutStream = Optional.of(new ByteArrayOutputStream());
                }
                if (!this.writer.isPresent()) {
                    this.writer = Optional.of(new SpecificDatumWriter(AvroFlumeEvent.class));
                }
                ((ByteArrayOutputStream) this.tempOutStream.get()).reset();
                AvroFlumeEvent avroFlumeEvent = new AvroFlumeEvent(KafkaChannel.toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
                this.encoder = EncoderFactory.get().directBinaryEncoder((OutputStream) this.tempOutStream.get(), this.encoder);
                ((SpecificDatumWriter) this.writer.get()).write(avroFlumeEvent, this.encoder);
                ((LinkedList) this.serializedEvents.get()).add(((ByteArrayOutputStream) this.tempOutStream.get()).toByteArray());
            } catch (Exception e) {
                throw new ChannelException("Error while serializing event", e);
            }
        }

        protected Event doTake() throws InterruptedException {
            Event withBody;
            this.type = TransactionType.TAKE;
            try {
                if (!((ConsumerAndIterator) KafkaChannel.this.consumerAndIter.get()).uuid.equals(KafkaChannel.this.channelUUID)) {
                    KafkaChannel.LOGGER.info("UUID mismatch, creating new consumer");
                    KafkaChannel.this.decommissionConsumerAndIterator((ConsumerAndIterator) KafkaChannel.this.consumerAndIter.get());
                    KafkaChannel.this.consumerAndIter.remove();
                }
            } catch (Exception e) {
                KafkaChannel.LOGGER.warn("Error while shutting down consumer", e);
            }
            if (!this.events.isPresent()) {
                this.events = Optional.of(new LinkedList());
            }
            if (((ConsumerAndIterator) KafkaChannel.this.consumerAndIter.get()).failedEvents.isEmpty()) {
                try {
                    ConsumerIterator<byte[], byte[]> consumerIterator = ((ConsumerAndIterator) KafkaChannel.this.consumerAndIter.get()).iterator;
                    long nanoTime = System.nanoTime();
                    consumerIterator.hasNext();
                    KafkaChannel.this.counter.addToKafkaEventGetTimer((System.nanoTime() - nanoTime) / 1000000);
                    if (KafkaChannel.this.parseAsFlumeEvent) {
                        this.decoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream((byte[]) consumerIterator.next().message()), this.decoder);
                        if (!this.reader.isPresent()) {
                            this.reader = Optional.of(new SpecificDatumReader(AvroFlumeEvent.class));
                        }
                        AvroFlumeEvent avroFlumeEvent = (AvroFlumeEvent) ((SpecificDatumReader) this.reader.get()).read((Object) null, this.decoder);
                        withBody = EventBuilder.withBody(avroFlumeEvent.getBody().array(), KafkaChannel.toStringMap(avroFlumeEvent.getHeaders()));
                    } else {
                        withBody = EventBuilder.withBody((byte[]) consumerIterator.next().message(), Collections.EMPTY_MAP);
                    }
                } catch (Exception e2) {
                    KafkaChannel.LOGGER.warn("Error while getting events from Kafka", e2);
                    throw new ChannelException("Error while getting events from Kafka", e2);
                } catch (ConsumerTimeoutException e3) {
                    if (!KafkaChannel.LOGGER.isDebugEnabled()) {
                        return null;
                    }
                    KafkaChannel.LOGGER.debug("Timed out while waiting for data to come from Kafka", e3);
                    return null;
                }
            } else {
                withBody = ((ConsumerAndIterator) KafkaChannel.this.consumerAndIter.get()).failedEvents.removeFirst();
            }
            this.eventTaken = true;
            ((LinkedList) this.events.get()).add(withBody);
            return withBody;
        }

        protected void doCommit() throws InterruptedException {
            if (this.type.equals(TransactionType.NONE)) {
                return;
            }
            if (!this.type.equals(TransactionType.PUT)) {
                if (((ConsumerAndIterator) KafkaChannel.this.consumerAndIter.get()).failedEvents.isEmpty() && this.eventTaken) {
                    long nanoTime = System.nanoTime();
                    ((ConsumerAndIterator) KafkaChannel.this.consumerAndIter.get()).consumer.commitOffsets();
                    KafkaChannel.this.counter.addToKafkaCommitTimer((System.nanoTime() - nanoTime) / 1000000);
                }
                KafkaChannel.this.counter.addToEventTakeSuccessCount(Long.valueOf(((LinkedList) this.events.get()).size()).longValue());
                ((LinkedList) this.events.get()).clear();
                return;
            }
            try {
                ArrayList arrayList = new ArrayList(((LinkedList) this.serializedEvents.get()).size());
                Iterator it = ((LinkedList) this.serializedEvents.get()).iterator();
                while (it.hasNext()) {
                    arrayList.add(new KeyedMessage((String) KafkaChannel.this.topic.get(), (Object) null, this.batchUUID, (byte[]) it.next()));
                }
                long nanoTime2 = System.nanoTime();
                KafkaChannel.this.producer.send(arrayList);
                KafkaChannel.this.counter.addToKafkaEventSendTimer((System.nanoTime() - nanoTime2) / 1000000);
                KafkaChannel.this.counter.addToEventPutSuccessCount(Long.valueOf(arrayList.size()).longValue());
                ((LinkedList) this.serializedEvents.get()).clear();
            } catch (Exception e) {
                KafkaChannel.LOGGER.warn("Sending events to Kafka failed", e);
                throw new ChannelException("Commit failed as send to Kafka failed", e);
            }
        }

        protected void doRollback() throws InterruptedException {
            if (this.type.equals(TransactionType.NONE)) {
                return;
            }
            if (this.type.equals(TransactionType.PUT)) {
                ((LinkedList) this.serializedEvents.get()).clear();
                return;
            }
            KafkaChannel.this.counter.addToRollbackCounter(Long.valueOf(((LinkedList) this.events.get()).size()).longValue());
            ((ConsumerAndIterator) KafkaChannel.this.consumerAndIter.get()).failedEvents.addAll((Collection) this.events.get());
            ((LinkedList) this.events.get()).clear();
        }
    }

    /* loaded from: input_file:org/apache/flume/channel/kafka/KafkaChannel$TransactionType.class */
    private enum TransactionType {
        PUT,
        TAKE,
        NONE
    }

    public void start() {
        try {
            LOGGER.info("Starting Kafka Channel: " + getName());
            this.producer = new Producer<>(new ProducerConfig(this.kafkaConf));
            LOGGER.info("Topic = " + this.topic.get());
            this.topicCountMap.put(this.topic.get(), 1);
            this.counter.start();
            super.start();
        } catch (Exception e) {
            LOGGER.error("Could not start producer");
            throw new FlumeException("Unable to create Kafka Connections. Check whether Kafka Brokers are up and that the Flume agent can connect to it.", e);
        }
    }

    public void stop() {
        Iterator<ConsumerAndIterator> it = this.consumers.iterator();
        while (it.hasNext()) {
            try {
                decommissionConsumerAndIterator(it.next());
            } catch (Exception e) {
                LOGGER.warn("Error while shutting down consumer.", e);
            }
        }
        this.producer.close();
        this.counter.stop();
        super.stop();
        LOGGER.info("Kafka channel {} stopped. Metrics: {}", getName(), this.counter);
    }

    protected BasicTransactionSemantics createTransaction() {
        return new KafkaTransaction();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized ConsumerAndIterator createConsumerAndIter() {
        try {
            ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.kafkaConf));
            ConsumerAndIterator consumerAndIterator = new ConsumerAndIterator(createJavaConsumerConnector, ((KafkaStream) ((List) createJavaConsumerConnector.createMessageStreams(this.topicCountMap).get(this.topic.get())).remove(0)).iterator(), this.channelUUID);
            this.consumers.add(consumerAndIterator);
            LOGGER.info("Created new consumer to connect to Kafka");
            return consumerAndIterator;
        } catch (Exception e) {
            throw new FlumeException("Unable to connect to Kafka", e);
        }
    }

    Properties getKafkaConf() {
        return this.kafkaConf;
    }

    public void configure(Context context) {
        String string = context.getString(KafkaChannelConfiguration.TOPIC);
        if (string == null || string.isEmpty()) {
            string = KafkaChannelConfiguration.DEFAULT_TOPIC;
            LOGGER.info("Topic was not specified. Using " + string + " as the topic.");
        }
        this.topic.set(string);
        String string2 = context.getString(KafkaChannelConfiguration.GROUP_ID_FLUME);
        if (string2 == null || string2.isEmpty()) {
            string2 = KafkaChannelConfiguration.DEFAULT_GROUP_ID;
            LOGGER.info("Group ID was not specified. Using " + string2 + " as the group id.");
        }
        String string3 = context.getString(KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY);
        if (string3 == null || string3.isEmpty()) {
            throw new ConfigurationException("Broker List must be specified");
        }
        String string4 = context.getString(KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY);
        if (string4 == null || string4.isEmpty()) {
            throw new ConfigurationException("Zookeeper Connection must be specified");
        }
        Long l = context.getLong(KafkaChannelConfiguration.TIMEOUT, Long.valueOf(KafkaChannelConfiguration.DEFAULT_TIMEOUT));
        this.kafkaConf.putAll(context.getSubProperties(KafkaChannelConfiguration.KAFKA_PREFIX));
        this.kafkaConf.put(KafkaChannelConfiguration.GROUP_ID, string2);
        this.kafkaConf.put(KafkaChannelConfiguration.BROKER_LIST_KEY, string3);
        this.kafkaConf.put(KafkaChannelConfiguration.ZOOKEEPER_CONNECT, string4);
        this.kafkaConf.put(KafkaChannelConfiguration.AUTO_COMMIT_ENABLED, String.valueOf(false));
        this.kafkaConf.put(KafkaChannelConfiguration.CONSUMER_TIMEOUT, String.valueOf(l));
        this.kafkaConf.put(KafkaChannelConfiguration.REQUIRED_ACKS_KEY, "-1");
        LOGGER.info(this.kafkaConf.toString());
        this.parseAsFlumeEvent = context.getBoolean(KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT, true).booleanValue();
        boolean booleanValue = context.getBoolean(KafkaChannelConfiguration.READ_SMALLEST_OFFSET, false).booleanValue();
        if (this.parseAsFlumeEvent || booleanValue) {
            this.kafkaConf.put("auto.offset.reset", "smallest");
        }
        if (this.counter == null) {
            this.counter = new KafkaChannelCounter(getName());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void decommissionConsumerAndIterator(ConsumerAndIterator consumerAndIterator) {
        if (consumerAndIterator.failedEvents.isEmpty()) {
            consumerAndIterator.consumer.commitOffsets();
        }
        consumerAndIterator.failedEvents.clear();
        consumerAndIterator.consumer.shutdown();
    }

    @VisibleForTesting
    void registerThread() {
        this.consumerAndIter.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue());
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<String, String> toStringMap(Map<CharSequence, CharSequence> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<CharSequence, CharSequence> entry : map.entrySet()) {
            hashMap.put(entry.getKey().toString(), entry.getValue().toString());
        }
        return hashMap;
    }
}
