/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.ConnectionStressSpec;
import org.apache.kafka.trogdor.workload.Throttle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionStressWorker
implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(ConnectionStressWorker.class);
    private static final Time TIME = Time.SYSTEM;
    private static final int THROTTLE_PERIOD_MS = 100;
    private static final int REPORT_INTERVAL_MS = 20000;
    private final String id;
    private final ConnectionStressSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private KafkaFutureImpl<String> doneFuture;
    private WorkerStatusTracker status;
    private long totalConnections;
    private long totalFailedConnections;
    private long startTimeMs;
    private Throttle throttle;
    private long nextReportTime;
    private ExecutorService workerExecutor;

    public ConnectionStressWorker(String id, ConnectionStressSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    @Override
    public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> doneFuture) throws Exception {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("ConnectionStressWorker is already running.");
        }
        log.info("{}: Activating ConnectionStressWorker with {}", (Object)this.id, (Object)this.spec);
        this.doneFuture = doneFuture;
        this.status = status;
        this.totalConnections = 0L;
        this.totalFailedConnections = 0L;
        this.startTimeMs = TIME.milliseconds();
        this.throttle = new ConnectStressThrottle(WorkerUtils.perSecToPerPeriod(this.spec.targetConnectionsPerSec(), 100L));
        this.nextReportTime = 0L;
        this.workerExecutor = Executors.newFixedThreadPool(this.spec.numThreads(), ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false));
        for (int i = 0; i < this.spec.numThreads(); ++i) {
            this.workerExecutor.submit(new ConnectLoop());
        }
    }

    @Override
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("ConnectionStressWorker is not running.");
        }
        log.info("{}: Deactivating ConnectionStressWorker.", (Object)this.id);
        this.doneFuture.complete((Object)"");
        this.workerExecutor.shutdownNow();
        this.workerExecutor.awaitTermination(1L, TimeUnit.DAYS);
        this.workerExecutor = null;
        this.status = null;
    }

    public static class StatusData {
        private final long totalConnections;
        private final long totalFailedConnections;
        private final double connectsPerSec;

        @JsonCreator
        StatusData(@JsonProperty(value="totalConnections") long totalConnections, @JsonProperty(value="totalFailedConnections") long totalFailedConnections, @JsonProperty(value="connectsPerSec") double connectsPerSec) {
            this.totalConnections = totalConnections;
            this.totalFailedConnections = totalFailedConnections;
            this.connectsPerSec = connectsPerSec;
        }

        @JsonProperty
        public long totalConnections() {
            return this.totalConnections;
        }

        @JsonProperty
        public long totalFailedConnections() {
            return this.totalFailedConnections;
        }

        @JsonProperty
        public double connectsPerSec() {
            return this.connectsPerSec;
        }
    }

    public class ConnectLoop
    implements Runnable {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            try {
                Properties props = new Properties();
                props.put("bootstrap.servers", ConnectionStressWorker.this.spec.bootstrapServers());
                WorkerUtils.addConfigsToProperties(props, ConnectionStressWorker.this.spec.commonClientConf(), ConnectionStressWorker.this.spec.commonClientConf());
                AdminClientConfig conf = new AdminClientConfig((Map)props);
                List addresses = ClientUtils.parseAndValidateAddresses((List)conf.getList("bootstrap.servers"), (String)conf.getString("client.dns.lookup"));
                ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap((List)addresses).nodes());
                while (true) {
                    if (ConnectionStressWorker.this.doneFuture.isDone()) {
                        return;
                    }
                    ConnectionStressWorker.this.throttle.increment();
                    long lastTimeMs = ConnectionStressWorker.this.throttle.lastTimeMs();
                    boolean success = false;
                    switch (ConnectionStressWorker.this.spec.action()) {
                        case CONNECT: {
                            success = this.attemptConnection(conf, updater);
                            break;
                        }
                        case FETCH_METADATA: {
                            success = this.attemptMetadataFetch(props);
                            break;
                        }
                    }
                    ConnectionStressWorker connectionStressWorker = ConnectionStressWorker.this;
                    synchronized (connectionStressWorker) {
                        ConnectionStressWorker.this.totalConnections++;
                        if (!success) {
                            ConnectionStressWorker.this.totalFailedConnections++;
                        }
                        if (lastTimeMs > ConnectionStressWorker.this.nextReportTime) {
                            ConnectionStressWorker.this.status.update(JsonUtil.JSON_SERDE.valueToTree((Object)new StatusData(ConnectionStressWorker.this.totalConnections, ConnectionStressWorker.this.totalFailedConnections, (double)ConnectionStressWorker.this.totalConnections * 1000.0 / (double)(lastTimeMs - ConnectionStressWorker.this.startTimeMs))));
                            ConnectionStressWorker.this.nextReportTime = lastTimeMs + 20000L;
                        }
                    }
                }
            }
            catch (Exception e) {
                WorkerUtils.abort(log, "ConnectionStressRunnable", e, (KafkaFutureImpl<String>)ConnectionStressWorker.this.doneFuture);
            }
        }

        private boolean attemptConnection(AdminClientConfig conf, ManualMetadataUpdater updater) throws Exception {
            try {
                List nodes = updater.fetchNodes();
                Node targetNode = (Node)nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
                try (ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder((AbstractConfig)conf, (Time)TIME);
                     Metrics metrics = new Metrics();){
                    LogContext logContext = new LogContext();
                    try (Selector selector = new Selector(conf.getLong("connections.max.idle.ms").longValue(), metrics, TIME, "", channelBuilder, logContext);
                         NetworkClient client = new NetworkClient((Selectable)selector, (MetadataUpdater)updater, "ConnectionStressWorker", 1, 1000L, 1000L, 4096, 4096, 1000, ClientDnsLookup.forConfig((String)conf.getString("client.dns.lookup")), TIME, false, new ApiVersions(), logContext);){
                        NetworkClientUtils.awaitReady((KafkaClient)client, (Node)targetNode, (Time)TIME, (long)100L);
                    }
                }
                return true;
            }
            catch (IOException e) {
                return false;
            }
        }

        private boolean attemptMetadataFetch(Properties conf) {
            try (AdminClient client = AdminClient.create((Properties)conf);){
                client.describeCluster().nodes().get();
            }
            catch (RuntimeException e) {
                return false;
            }
            catch (Exception e) {
                return false;
            }
            return true;
        }
    }

    private static class ConnectStressThrottle
    extends Throttle {
        ConnectStressThrottle(int maxPerPeriod) {
            super(maxPerPeriod, 100);
        }
    }
}

