/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.PartitionsSpec;
import org.apache.kafka.trogdor.workload.PayloadGenerator;
import org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec;
import org.apache.kafka.trogdor.workload.SequentialPayloadGenerator;
import org.apache.kafka.trogdor.workload.Throttle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RoundTripWorker
implements TaskWorker {
    private static final int THROTTLE_PERIOD_MS = 100;
    private static final int LOG_INTERVAL_MS = 5000;
    private static final int LOG_NUM_MESSAGES = 10;
    private static final Logger log = LoggerFactory.getLogger(RoundTripWorker.class);
    private static final PayloadGenerator KEY_GENERATOR = new SequentialPayloadGenerator(4, 0L);
    private ToReceiveTracker toReceiveTracker;
    private final String id;
    private final RoundTripWorkloadSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ScheduledExecutorService executor;
    private WorkerStatusTracker status;
    private KafkaFutureImpl<String> doneFuture;
    private KafkaProducer<byte[], byte[]> producer;
    private KafkaConsumer<byte[], byte[]> consumer;
    private CountDownLatch unackedSends;
    private ToSendTracker toSendTracker;

    public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    @Override
    public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> doneFuture) throws Exception {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("RoundTripWorker is already running.");
        }
        log.info("{}: Activating RoundTripWorker.", (Object)this.id);
        this.executor = Executors.newScheduledThreadPool(3, ThreadUtils.createThreadFactory("RoundTripWorker%d", false));
        this.status = status;
        this.doneFuture = doneFuture;
        this.producer = null;
        this.consumer = null;
        this.unackedSends = new CountDownLatch(this.spec.maxMessages());
        this.executor.submit(new Prepare());
    }

    @Override
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("ProduceBenchWorker is not running.");
        }
        log.info("{}: Deactivating RoundTripWorkloadWorker.", (Object)this.id);
        this.doneFuture.complete((Object)"");
        this.executor.shutdownNow();
        this.executor.awaitTermination(1L, TimeUnit.DAYS);
        Utils.closeQuietly(this.consumer, (String)"consumer");
        Utils.closeQuietly(this.producer, (String)"producer");
        this.consumer = null;
        this.producer = null;
        this.unackedSends = null;
        this.executor = null;
        this.doneFuture = null;
    }

    public static class StatusData {
        private final long totalUniqueSent;
        private final long totalReceived;

        @JsonCreator
        public StatusData(@JsonProperty(value="totalUniqueSent") long totalUniqueSent, @JsonProperty(value="totalReceived") long totalReceived) {
            this.totalUniqueSent = totalUniqueSent;
            this.totalReceived = totalReceived;
        }

        @JsonProperty
        public long totalUniqueSent() {
            return this.totalUniqueSent;
        }

        @JsonProperty
        public long totalReceived() {
            return this.totalReceived;
        }
    }

    public class StatusUpdater
    implements Runnable {
        @Override
        public void run() {
            try {
                this.update();
            }
            catch (Exception e) {
                WorkerUtils.abort(log, "StatusUpdater", e, (KafkaFutureImpl<String>)RoundTripWorker.this.doneFuture);
            }
        }

        StatusData update() {
            StatusData statusData = new StatusData(RoundTripWorker.this.toSendTracker.frontier(), RoundTripWorker.this.toReceiveTracker.totalReceived());
            RoundTripWorker.this.status.update(JsonUtil.JSON_SERDE.valueToTree((Object)statusData));
            return statusData;
        }
    }

    class ConsumerRunnable
    implements Runnable {
        private final Properties props = new Properties();

        ConsumerRunnable(HashSet<TopicPartition> partitions) {
            this.props.put("bootstrap.servers", RoundTripWorker.this.spec.bootstrapServers());
            this.props.put("client.id", "consumer." + RoundTripWorker.this.id);
            this.props.put("group.id", "round-trip-consumer-group-1");
            this.props.put("auto.offset.reset", "earliest");
            this.props.put("request.timeout.ms", (Object)105000);
            this.props.put("max.poll.interval.ms", (Object)100000);
            WorkerUtils.addConfigsToProperties(this.props, RoundTripWorker.this.spec.commonClientConf(), RoundTripWorker.this.spec.consumerConf());
            RoundTripWorker.this.consumer = new KafkaConsumer(this.props, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
            RoundTripWorker.this.consumer.assign(partitions);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
         * Unable to fully structure code
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            block11: {
                uniqueMessagesReceived = 0L;
                messagesReceived = 0L;
                pollInvoked = 0L;
                RoundTripWorker.access$200().debug("{}: Starting RoundTripWorker#ConsumerRunnable.", (Object)RoundTripWorker.access$800(RoundTripWorker.this));
                try {
                    lastLogTimeMs = Time.SYSTEM.milliseconds();
lbl7:
                    // 3 sources

                    while (true) {
                        try {
                            ++pollInvoked;
                            records = RoundTripWorker.access$1200(RoundTripWorker.this).poll(Duration.ofMillis(50L));
                            for (ConsumerRecord record : records) {
                                messageIndex = ByteBuffer.wrap((byte[])record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
                                ++messagesReceived;
                                if (!RoundTripWorker.access$400(RoundTripWorker.this).removePending(messageIndex) || ++uniqueMessagesReceived < (long)RoundTripWorker.access$000(RoundTripWorker.this).maxMessages()) continue;
                                RoundTripWorker.access$200().info("{}: Consumer received the full count of {} unique messages.  Waiting for all sends to be acked...", (Object)RoundTripWorker.access$800(RoundTripWorker.this), (Object)RoundTripWorker.access$000(RoundTripWorker.this).maxMessages());
                                RoundTripWorker.access$1100(RoundTripWorker.this).await();
                                RoundTripWorker.access$200().info("{}: all sends have been acked.", (Object)RoundTripWorker.access$800(RoundTripWorker.this));
                                new StatusUpdater().update();
                                RoundTripWorker.access$700(RoundTripWorker.this).complete((Object)"");
                                break block11;
                            }
                            ** GOTO lbl-1000
                        }
                        catch (WakeupException e) {
                            RoundTripWorker.access$200().debug("{}: Consumer got WakeupException", (Object)RoundTripWorker.access$800(RoundTripWorker.this), (Object)e);
                        }
                        catch (TimeoutException e) {
                            RoundTripWorker.access$200().debug("{}: Consumer got TimeoutException", (Object)RoundTripWorker.access$800(RoundTripWorker.this), (Object)e);
                        }
                        break;
                    }
                }
                catch (Throwable e) {
                    WorkerUtils.abort(RoundTripWorker.access$200(), "ConsumerRunnable", e, (KafkaFutureImpl<String>)RoundTripWorker.access$700(RoundTripWorker.this));
                    RoundTripWorker.access$200().info("{}: ConsumerRunnable is exiting.  Invoked poll {} time(s).  messagesReceived = {}; uniqueMessagesReceived = {}.", new Object[]{RoundTripWorker.access$800(RoundTripWorker.this), pollInvoked, messagesReceived, uniqueMessagesReceived});
                    return;
                }
                catch (Throwable var13_13) {
                    RoundTripWorker.access$200().info("{}: ConsumerRunnable is exiting.  Invoked poll {} time(s).  messagesReceived = {}; uniqueMessagesReceived = {}.", new Object[]{RoundTripWorker.access$800(RoundTripWorker.this), pollInvoked, messagesReceived, uniqueMessagesReceived});
                    throw var13_13;
                }
            }
            RoundTripWorker.access$200().info("{}: ConsumerRunnable is exiting.  Invoked poll {} time(s).  messagesReceived = {}; uniqueMessagesReceived = {}.", new Object[]{RoundTripWorker.access$800(RoundTripWorker.this), pollInvoked, messagesReceived, uniqueMessagesReceived});
            return;
lbl-1000:
            // 1 sources

            {
                curTimeMs = Time.SYSTEM.milliseconds();
                if (curTimeMs <= lastLogTimeMs + 5000L) ** GOTO lbl7
                RoundTripWorker.access$400(RoundTripWorker.this).log();
                lastLogTimeMs = curTimeMs;
                ** continue;
            }
        }
    }

    private class ToReceiveTracker {
        private final TreeSet<Integer> pending = new TreeSet();
        private int totalReceived = 0;

        private ToReceiveTracker() {
        }

        synchronized void addPending(int messageIndex) {
            this.pending.add(messageIndex);
        }

        synchronized boolean removePending(int messageIndex) {
            if (this.pending.remove(messageIndex)) {
                ++this.totalReceived;
                return true;
            }
            return false;
        }

        synchronized int totalReceived() {
            return this.totalReceived;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void log() {
            int numToReceive;
            ArrayList<Integer> list = new ArrayList<Integer>(10);
            ToReceiveTracker toReceiveTracker = this;
            synchronized (toReceiveTracker) {
                numToReceive = this.pending.size();
                Iterator<Integer> iter = this.pending.iterator();
                while (iter.hasNext() && list.size() < 10) {
                    Integer i = iter.next();
                    list.add(i);
                }
            }
            log.info("{}: consumer waiting for {} message(s), starting with: {}", new Object[]{RoundTripWorker.this.id, numToReceive, Utils.join(list, (String)", ")});
        }
    }

    class ProducerRunnable
    implements Runnable {
        private final HashSet<TopicPartition> partitions;
        private final Throttle throttle;

        ProducerRunnable(HashSet<TopicPartition> partitions) {
            this.partitions = partitions;
            Properties props = new Properties();
            props.put("bootstrap.servers", RoundTripWorker.this.spec.bootstrapServers());
            props.put("batch.size", (Object)16384);
            props.put("buffer.memory", (Object)65536L);
            props.put("max.block.ms", (Object)1000L);
            props.put("client.id", "producer." + RoundTripWorker.this.id);
            props.put("acks", "all");
            props.put("request.timeout.ms", (Object)105000);
            WorkerUtils.addConfigsToProperties(props, RoundTripWorker.this.spec.commonClientConf(), RoundTripWorker.this.spec.producerConf());
            RoundTripWorker.this.producer = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
            int perPeriod = WorkerUtils.perSecToPerPeriod(RoundTripWorker.this.spec.targetMessagesPerSec(), 100L);
            this.throttle = new Throttle(perPeriod, 100);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            long messagesSent = 0L;
            long uniqueMessagesSent = 0L;
            log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", (Object)RoundTripWorker.this.id);
            try {
                ToSendTrackerResult result;
                Iterator<TopicPartition> iter = this.partitions.iterator();
                while ((result = RoundTripWorker.this.toSendTracker.next()) != null) {
                    this.throttle.increment();
                    final int messageIndex = result.index;
                    if (result.firstSend) {
                        RoundTripWorker.this.toReceiveTracker.addPending(messageIndex);
                        ++uniqueMessagesSent;
                    }
                    ++messagesSent;
                    if (!iter.hasNext()) {
                        iter = this.partitions.iterator();
                    }
                    TopicPartition partition = iter.next();
                    ProducerRecord record = new ProducerRecord(partition.topic(), Integer.valueOf(partition.partition()), (Object)KEY_GENERATOR.generate(messageIndex), (Object)RoundTripWorker.this.spec.valueGenerator().generate(messageIndex));
                    RoundTripWorker.this.producer.send(record, new Callback(){

                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception == null) {
                                RoundTripWorker.this.unackedSends.countDown();
                            } else {
                                log.info("{}: Got exception when sending message {}: {}", new Object[]{RoundTripWorker.this.id, messageIndex, exception.getMessage()});
                                RoundTripWorker.this.toSendTracker.addFailed(messageIndex);
                            }
                        }
                    });
                }
            }
            catch (Throwable e) {
                try {
                    WorkerUtils.abort(log, "ProducerRunnable", e, (KafkaFutureImpl<String>)RoundTripWorker.this.doneFuture);
                }
                catch (Throwable throwable) {
                    log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; ackedSends={}.", new Object[]{RoundTripWorker.this.id, messagesSent, uniqueMessagesSent, (long)RoundTripWorker.this.spec.maxMessages() - RoundTripWorker.this.unackedSends.getCount()});
                    throw throwable;
                }
                log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; ackedSends={}.", new Object[]{RoundTripWorker.this.id, messagesSent, uniqueMessagesSent, (long)RoundTripWorker.this.spec.maxMessages() - RoundTripWorker.this.unackedSends.getCount()});
            }
            log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; ackedSends={}.", new Object[]{RoundTripWorker.this.id, messagesSent, uniqueMessagesSent, (long)RoundTripWorker.this.spec.maxMessages() - RoundTripWorker.this.unackedSends.getCount()});
        }
    }

    private static class ToSendTracker {
        private final int maxMessages;
        private final List<Integer> failed = new ArrayList<Integer>();
        private int frontier = 0;

        ToSendTracker(int maxMessages) {
            this.maxMessages = maxMessages;
        }

        synchronized void addFailed(int index) {
            this.failed.add(index);
        }

        synchronized int frontier() {
            return this.frontier;
        }

        synchronized ToSendTrackerResult next() {
            if (this.failed.isEmpty()) {
                if (this.frontier >= this.maxMessages) {
                    return null;
                }
                return new ToSendTrackerResult(this.frontier++, true);
            }
            return new ToSendTrackerResult(this.failed.remove(0), false);
        }
    }

    private static class ToSendTrackerResult {
        final int index;
        final boolean firstSend;

        ToSendTrackerResult(int index, boolean firstSend) {
            this.index = index;
            this.firstSend = firstSend;
        }
    }

    class Prepare
    implements Runnable {
        Prepare() {
        }

        @Override
        public void run() {
            try {
                if (RoundTripWorker.this.spec.targetMessagesPerSec() <= 0) {
                    throw new ConfigException("Can't have targetMessagesPerSec <= 0.");
                }
                HashMap<String, NewTopic> newTopics = new HashMap<String, NewTopic>();
                HashSet<TopicPartition> active = new HashSet<TopicPartition>();
                for (Map.Entry<String, PartitionsSpec> entry : RoundTripWorker.this.spec.activeTopics().materialize().entrySet()) {
                    String topicName = entry.getKey();
                    PartitionsSpec partSpec = entry.getValue();
                    newTopics.put(topicName, partSpec.newTopic(topicName));
                    for (Integer partitionNumber : partSpec.partitionNumbers()) {
                        active.add(new TopicPartition(topicName, partitionNumber.intValue()));
                    }
                }
                if (active.isEmpty()) {
                    throw new RuntimeException("You must specify at least one active topic.");
                }
                RoundTripWorker.this.status.update((JsonNode)new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
                WorkerUtils.createTopics(log, RoundTripWorker.this.spec.bootstrapServers(), RoundTripWorker.this.spec.commonClientConf(), RoundTripWorker.this.spec.adminClientConf(), newTopics, true);
                RoundTripWorker.this.status.update((JsonNode)new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
                RoundTripWorker.this.toSendTracker = new ToSendTracker(RoundTripWorker.this.spec.maxMessages());
                RoundTripWorker.this.toReceiveTracker = new ToReceiveTracker();
                RoundTripWorker.this.executor.submit(new ProducerRunnable(active));
                RoundTripWorker.this.executor.submit(new ConsumerRunnable(active));
                RoundTripWorker.this.executor.submit(new StatusUpdater());
                RoundTripWorker.this.executor.scheduleWithFixedDelay(new StatusUpdater(), 30L, 30L, TimeUnit.SECONDS);
            }
            catch (Throwable e) {
                WorkerUtils.abort(log, "Prepare", e, (KafkaFutureImpl<String>)RoundTripWorker.this.doneFuture);
            }
        }
    }
}

