/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;

public class KafkaStreams
implements AutoCloseable {
    private static final String JMX_PREFIX = "kafka.streams";
    private final Time time;
    private final Logger log;
    private final String clientId;
    private final Metrics metrics;
    private final StreamsConfig config;
    protected final List<StreamThread> threads;
    private final StateDirectory stateDirectory;
    private final StreamsMetadataState streamsMetadataState;
    private final ScheduledExecutorService stateDirCleaner;
    private final ScheduledExecutorService rocksDBMetricsRecordingService;
    private final Admin adminClient;
    private final StreamsMetricsImpl streamsMetrics;
    private final ProcessorTopology taskTopology;
    private final ProcessorTopology globalTaskTopology;
    private final long totalCacheSize;
    private final StreamStateListener streamStateListener;
    private final StateRestoreListener delegatingStateRestoreListener;
    private final Map<Long, StreamThread.State> threadState;
    private final UUID processId;
    private final KafkaClientSupplier clientSupplier;
    private final InternalTopologyBuilder internalTopologyBuilder;
    private final QueryableStoreProvider queryableStoreProvider;
    GlobalStreamThread globalStreamThread;
    private StateListener stateListener;
    private StateRestoreListener globalStateRestoreListener;
    private boolean oldHandler;
    private Consumer<Throwable> streamsUncaughtExceptionHandler;
    private final Object changeThreadCount = new Object();
    private final Object stateLock = new Object();
    protected volatile State state = State.CREATED;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private boolean waitOnState(State targetState, long waitMs) {
        long begin = this.time.milliseconds();
        Object object = this.stateLock;
        synchronized (object) {
            boolean interrupted = false;
            long elapsedMs = 0L;
            try {
                while (this.state != targetState) {
                    if (waitMs <= elapsedMs) {
                        this.log.debug("Cannot transit to {} within {}ms", (Object)targetState, (Object)waitMs);
                        boolean bl = false;
                        return bl;
                    }
                    long remainingMs = waitMs - elapsedMs;
                    try {
                        this.stateLock.wait(remainingMs);
                    }
                    catch (InterruptedException e) {
                        interrupted = true;
                    }
                    elapsedMs = this.time.milliseconds() - begin;
                }
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            return true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean setState(State newState) {
        State oldState;
        Object object = this.stateLock;
        synchronized (object) {
            oldState = this.state;
            if (this.state == State.PENDING_SHUTDOWN && newState != State.NOT_RUNNING) {
                return false;
            }
            if (this.state == State.NOT_RUNNING && (newState == State.PENDING_SHUTDOWN || newState == State.NOT_RUNNING)) {
                return false;
            }
            if (this.state == State.REBALANCING && newState == State.REBALANCING) {
                return false;
            }
            if (this.state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR)) {
                return false;
            }
            if (this.state == State.PENDING_ERROR && newState != State.ERROR) {
                return false;
            }
            if (!this.state.isValidTransition(newState)) {
                throw new IllegalStateException("Stream-client " + this.clientId + ": Unexpected state transition from " + (Object)((Object)oldState) + " to " + (Object)((Object)newState));
            }
            this.log.info("State transition from {} to {}", (Object)oldState, (Object)newState);
            this.state = newState;
            this.stateLock.notifyAll();
        }
        if (this.stateListener != null) {
            this.stateListener.onChange(newState, oldState);
        }
        return true;
    }

    public State state() {
        return this.state;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean isRunningOrRebalancing() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.isRunningOrRebalancing();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void validateIsRunningOrRebalancing() {
        Object object = this.stateLock;
        synchronized (object) {
            if (this.state == State.CREATED) {
                throw new StreamsNotStartedException("KafkaStreams has not been started, you can retry after calling start()");
            }
            if (!this.isRunningOrRebalancing()) {
                throw new IllegalStateException("KafkaStreams is not running. State is " + (Object)((Object)this.state) + ".");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setStateListener(StateListener listener) {
        Object object = this.stateLock;
        synchronized (object) {
            if (this.state != State.CREATED) {
                throw new IllegalStateException("Can only set StateListener in CREATED state. Current state is: " + (Object)((Object)this.state));
            }
            this.stateListener = listener;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Deprecated
    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        Object object = this.stateLock;
        synchronized (object) {
            if (this.state == State.CREATED) {
                this.oldHandler = true;
                this.processStreamThread(thread -> thread.setUncaughtExceptionHandler(uncaughtExceptionHandler));
                if (this.globalStreamThread != null) {
                    this.globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
                }
            } else {
                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. Current state is: " + (Object)((Object)this.state));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        Consumer<Throwable> handler = exception -> this.handleStreamsUncaughtException((Throwable)exception, streamsUncaughtExceptionHandler);
        Object object = this.stateLock;
        synchronized (object) {
            if (this.state == State.CREATED) {
                this.streamsUncaughtExceptionHandler = handler;
                Objects.requireNonNull(streamsUncaughtExceptionHandler);
                this.processStreamThread(thread -> thread.setStreamsUncaughtExceptionHandler(handler));
                if (this.globalStreamThread != null) {
                    this.globalStreamThread.setUncaughtExceptionHandler(handler);
                }
            } else {
                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. Current state is: " + (Object)((Object)this.state));
            }
        }
    }

    private void defaultStreamsUncaughtExceptionHandler(Throwable throwable) {
        if (this.oldHandler) {
            this.threads.remove(Thread.currentThread());
            if (throwable instanceof RuntimeException) {
                throw (RuntimeException)throwable;
            }
            if (throwable instanceof Error) {
                throw (Error)throwable;
            }
            throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
        }
        this.handleStreamsUncaughtException(throwable, t -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
    }

    private void replaceStreamThread(Throwable throwable) {
        if (this.globalStreamThread != null && Thread.currentThread().getName().equals(this.globalStreamThread.getName())) {
            this.log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
            this.log.error("Encountered the following exception during processing  The streams client is going to shut down now. ", throwable);
            this.closeToError();
        }
        StreamThread deadThread = (StreamThread)Thread.currentThread();
        deadThread.shutdown();
        this.addStreamThread();
        if (throwable instanceof RuntimeException) {
            throw (RuntimeException)throwable;
        }
        if (throwable instanceof Error) {
            throw (Error)throwable;
        }
        throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
    }

    private void handleStreamsUncaughtException(Throwable throwable, StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
        if (this.oldHandler) {
            this.log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler.The old handler will be ignored as long as a new handler is set.");
        }
        switch (action) {
            case REPLACE_THREAD: {
                this.replaceStreamThread(throwable);
                break;
            }
            case SHUTDOWN_CLIENT: {
                this.log.error("Encountered the following exception during processing and the registered exception handler opted to " + (Object)((Object)action) + ". The streams client is going to shut down now. ", throwable);
                this.closeToError();
                break;
            }
            case SHUTDOWN_APPLICATION: {
                if (this.getNumLiveStreamThreads() == 1) {
                    this.log.warn("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread");
                    this.addStreamThread();
                }
                if (throwable instanceof Error) {
                    this.log.error("This option requires running threads to shut down the application.but the uncaught exception was an Error, which means this runtime is no longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                }
                if (Thread.currentThread().equals(this.globalStreamThread) && this.getNumLiveStreamThreads() == 0) {
                    this.log.error("Exception in global thread caused the application to attempt to shutdown. This action will succeed only if there is at least one StreamThread running on this client. Currently there are no running threads so will now close the client.");
                    this.closeToError();
                    break;
                }
                this.processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED));
                this.log.error("Encountered the following exception during processing and sent shutdown request for the entire application.", throwable);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setGlobalStateRestoreListener(StateRestoreListener globalStateRestoreListener) {
        Object object = this.stateLock;
        synchronized (object) {
            if (this.state != State.CREATED) {
                throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED state. Current state is: " + (Object)((Object)this.state));
            }
            this.globalStateRestoreListener = globalStateRestoreListener;
        }
    }

    public Map<MetricName, ? extends Metric> metrics() {
        LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<MetricName, Metric>();
        this.processStreamThread(thread -> {
            result.putAll(thread.producerMetrics());
            result.putAll(thread.consumerMetrics());
            result.putAll(thread.adminClientMetrics());
        });
        if (this.globalStreamThread != null) {
            result.putAll(this.globalStreamThread.consumerMetrics());
        }
        result.putAll(this.metrics.metrics());
        return Collections.unmodifiableMap(result);
    }

    public KafkaStreams(Topology topology, Properties props) {
        this(topology.internalTopologyBuilder, new StreamsConfig(props), (KafkaClientSupplier)new DefaultKafkaClientSupplier());
    }

    public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier) {
        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, Time.SYSTEM);
    }

    public KafkaStreams(Topology topology, Properties props, Time time) {
        this(topology.internalTopologyBuilder, new StreamsConfig(props), (KafkaClientSupplier)new DefaultKafkaClientSupplier(), time);
    }

    public KafkaStreams(Topology topology, Properties props, KafkaClientSupplier clientSupplier, Time time) {
        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, time);
    }

    public KafkaStreams(Topology topology, StreamsConfig config) {
        this(topology, config, (KafkaClientSupplier)new DefaultKafkaClientSupplier());
    }

    public KafkaStreams(Topology topology, StreamsConfig config, KafkaClientSupplier clientSupplier) {
        this(topology.internalTopologyBuilder, config, clientSupplier);
    }

    public KafkaStreams(Topology topology, StreamsConfig config, Time time) {
        this(topology.internalTopologyBuilder, config, (KafkaClientSupplier)new DefaultKafkaClientSupplier(), time);
    }

    private KafkaStreams(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig config, KafkaClientSupplier clientSupplier) throws StreamsException {
        this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
    }

    private KafkaStreams(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig config, KafkaClientSupplier clientSupplier, Time time) throws StreamsException {
        this.config = config;
        this.time = time;
        this.internalTopologyBuilder = internalTopologyBuilder;
        internalTopologyBuilder.rewriteTopology(config);
        this.taskTopology = internalTopologyBuilder.buildTopology();
        this.globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
        boolean hasGlobalTopology = this.globalTaskTopology != null;
        boolean hasPersistentStores = this.taskTopology.hasPersistentLocalStore() || hasGlobalTopology && this.globalTaskTopology.hasPersistentGlobalStore();
        try {
            this.stateDirectory = new StateDirectory(config, time, hasPersistentStores, internalTopologyBuilder.hasNamedTopologies());
            this.processId = this.stateDirectory.initializeProcessId();
        }
        catch (ProcessorStateException fatal) {
            throw new StreamsException((Throwable)((Object)fatal));
        }
        String userClientId = config.getString("client.id");
        String applicationId = config.getString("application.id");
        this.clientId = userClientId.length() <= 0 ? applicationId + "-" + this.processId : userClientId;
        LogContext logContext = new LogContext(String.format("stream-client [%s] ", this.clientId));
        this.log = logContext.logger(this.getClass());
        this.clientSupplier = clientSupplier;
        this.adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(this.clientId)));
        this.log.info("Kafka Streams version: {}", (Object)ClientMetrics.version());
        this.log.info("Kafka Streams commit ID: {}", (Object)ClientMetrics.commitId());
        this.metrics = KafkaStreams.getMetrics(config, time, this.clientId);
        this.streamsMetrics = new StreamsMetricsImpl(this.metrics, this.clientId, config.getString("built.in.metrics.version"), time);
        ClientMetrics.addVersionMetric(this.streamsMetrics);
        ClientMetrics.addCommitIdMetric(this.streamsMetrics);
        ClientMetrics.addApplicationIdMetric(this.streamsMetrics, config.getString("application.id"));
        ClientMetrics.addTopologyDescriptionMetric(this.streamsMetrics, internalTopologyBuilder.describe().toString());
        ClientMetrics.addStateMetric(this.streamsMetrics, (Gauge<State>)((Gauge)(metricsConfig, now) -> this.state));
        this.threads = Collections.synchronizedList(new LinkedList());
        ClientMetrics.addNumAliveStreamThreadMetric(this.streamsMetrics, (Gauge<Integer>)((Gauge)(metricsConfig, now) -> this.getNumLiveStreamThreads()));
        this.streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, KafkaStreams.parseHostInfo(config.getString("application.server")));
        this.oldHandler = false;
        this.streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
        this.delegatingStateRestoreListener = new DelegatingStateRestoreListener();
        this.totalCacheSize = config.getLong("cache.max.bytes.buffering");
        int numStreamThreads = this.getNumStreamThreads(hasGlobalTopology);
        long cacheSizePerThread = this.getCacheSizePerThread(numStreamThreads);
        GlobalStreamThread.State globalThreadState = null;
        if (hasGlobalTopology) {
            String globalThreadId = this.clientId + "-GlobalStreamThread";
            this.globalStreamThread = new GlobalStreamThread(this.globalTaskTopology, config, clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(this.clientId)), this.stateDirectory, cacheSizePerThread, this.streamsMetrics, time, globalThreadId, this.delegatingStateRestoreListener, this.streamsUncaughtExceptionHandler);
            globalThreadState = this.globalStreamThread.state();
        }
        this.threadState = new HashMap<Long, StreamThread.State>(numStreamThreads);
        this.streamStateListener = new StreamStateListener(this.threadState, globalThreadState);
        GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
        if (hasGlobalTopology) {
            this.globalStreamThread.setStateListener(this.streamStateListener);
        }
        this.queryableStoreProvider = new QueryableStoreProvider(globalStateStoreProvider);
        for (int i = 1; i <= numStreamThreads; ++i) {
            this.createAndAddStreamThread(cacheSizePerThread, i);
        }
        this.stateDirCleaner = this.setupStateDirCleaner();
        this.rocksDBMetricsRecordingService = KafkaStreams.maybeCreateRocksDBMetricsRecordingService(this.clientId, config);
    }

    private StreamThread createAndAddStreamThread(long cacheSizePerThread, int threadIdx) {
        StreamThread streamThread = StreamThread.create(this.internalTopologyBuilder, this.config, this.clientSupplier, this.adminClient, this.processId, this.clientId, this.streamsMetrics, this.time, this.streamsMetadataState, cacheSizePerThread, this.stateDirectory, this.delegatingStateRestoreListener, threadIdx, this::closeToError, this.streamsUncaughtExceptionHandler);
        streamThread.setStateListener(this.streamStateListener);
        this.threads.add(streamThread);
        this.threadState.put(streamThread.getId(), streamThread.state());
        this.queryableStoreProvider.addStoreProviderForThread(streamThread.getName(), new StreamThreadStateStoreProvider(streamThread));
        return streamThread;
    }

    private static Metrics getMetrics(StreamsConfig config, Time time, String clientId) {
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt("metrics.num.samples").intValue()).recordLevel(Sensor.RecordingLevel.forName((String)config.getString("metrics.recording.level"))).timeWindow(config.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS);
        List reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", clientId));
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(config.originals());
        reporters.add(jmxReporter);
        KafkaMetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, config.originalsWithPrefix("metrics.context."));
        return new Metrics(metricConfig, reporters, time, (MetricsContext)metricsContext);
    }

    private int getNumStreamThreads(boolean hasGlobalTopology) {
        int numStreamThreads;
        if (this.internalTopologyBuilder.hasNoNonGlobalTopology()) {
            this.log.info("Overriding number of StreamThreads to zero for global-only topology");
            numStreamThreads = 0;
        } else {
            numStreamThreads = this.config.getInt("num.stream.threads");
        }
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            this.log.error("Topology with no input topics will create no stream threads and no global thread.");
            throw new TopologyException("Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.");
        }
        return numStreamThreads;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Optional<String> addStreamThread() {
        if (this.isRunningOrRebalancing()) {
            StreamThread streamThread;
            Object object = this.changeThreadCount;
            synchronized (object) {
                int threadIdx = this.getNextThreadIndex();
                int numLiveThreads = this.getNumLiveStreamThreads();
                long cacheSizePerThread = this.getCacheSizePerThread(numLiveThreads + 1);
                this.log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}", new Object[]{threadIdx, numLiveThreads + 1, cacheSizePerThread});
                this.resizeThreadCache(cacheSizePerThread);
                streamThread = this.createAndAddStreamThread(cacheSizePerThread, threadIdx);
            }
            object = this.stateLock;
            synchronized (object) {
                if (this.isRunningOrRebalancing()) {
                    streamThread.start();
                    return Optional.of(streamThread.getName());
                }
                this.log.warn("Terminating the new thread because the Kafka Streams client is in state {}", (Object)this.state);
                streamThread.shutdown();
                this.threads.remove(streamThread);
                long cacheSizePerThread = this.getCacheSizePerThread(this.getNumLiveStreamThreads());
                this.log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", (Object)cacheSizePerThread);
                this.resizeThreadCache(cacheSizePerThread);
                return Optional.empty();
            }
        }
        this.log.warn("Cannot add a stream thread when Kafka Streams client is in state {}", (Object)this.state);
        return Optional.empty();
    }

    public Optional<String> removeStreamThread() {
        return this.removeStreamThread(Long.MAX_VALUE);
    }

    public Optional<String> removeStreamThread(Duration timeout) {
        String msgPrefix = ApiUtils.prepareMillisCheckFailMsgPrefix(timeout, "timeout");
        long timeoutMs = ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
        return this.removeStreamThread(timeoutMs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<String> removeStreamThread(long timeoutMs) throws org.apache.kafka.common.errors.TimeoutException {
        long startMs = this.time.milliseconds();
        if (this.isRunningOrRebalancing()) {
            Object object = this.changeThreadCount;
            synchronized (object) {
                for (StreamThread streamThread : new ArrayList<StreamThread>(this.threads)) {
                    long remainingTimeMs;
                    boolean callingThreadIsNotCurrentStreamThread;
                    boolean bl = callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
                    if (!streamThread.isAlive() || !callingThreadIsNotCurrentStreamThread && this.getNumLiveStreamThreads() != 1) continue;
                    this.log.info("Removing StreamThread " + streamThread.getName());
                    Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
                    streamThread.requestLeaveGroupDuringShutdown();
                    streamThread.shutdown();
                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {
                        long remainingTimeMs2 = timeoutMs - (this.time.milliseconds() - startMs);
                        if (remainingTimeMs2 <= 0L || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs2)) {
                            this.log.warn("{} did not shutdown in the allotted time.", (Object)streamThread.getName());
                        } else {
                            this.log.info("Successfully removed {} in {}ms", (Object)streamThread.getName(), (Object)(this.time.milliseconds() - startMs));
                            this.threads.remove(streamThread);
                            this.queryableStoreProvider.removeStoreProviderForThread(streamThread.getName());
                        }
                    } else {
                        this.log.info("{} is the last remaining thread and must remove itself, therefore we cannot wait for it to complete shutdown as this will result in deadlock.", (Object)streamThread.getName());
                    }
                    long cacheSizePerThread = this.getCacheSizePerThread(this.getNumLiveStreamThreads());
                    this.log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", (Object)cacheSizePerThread);
                    this.resizeThreadCache(cacheSizePerThread);
                    if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
                        MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
                        List<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
                        RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = this.adminClient.removeMembersFromConsumerGroup(this.config.getString("application.id"), new RemoveMembersFromConsumerGroupOptions(membersToRemove));
                        try {
                            long remainingTimeMs3 = timeoutMs - (this.time.milliseconds() - startMs);
                            removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs3, TimeUnit.MILLISECONDS);
                        }
                        catch (TimeoutException e) {
                            this.log.error("Could not remove static member {} from consumer group {} due to a timeout: {}", new Object[]{groupInstanceID.get(), this.config.getString("application.id"), e});
                            throw new org.apache.kafka.common.errors.TimeoutException(e.getMessage(), (Throwable)e);
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        catch (ExecutionException e) {
                            this.log.error("Could not remove static member {} from consumer group {} due to: {}", new Object[]{groupInstanceID.get(), this.config.getString("application.id"), e});
                            throw new StreamsException("Could not remove static member " + groupInstanceID.get() + " from consumer group " + this.config.getString("application.id") + " for the following reason: ", e.getCause());
                        }
                    }
                    if ((remainingTimeMs = timeoutMs - (this.time.milliseconds() - startMs)) <= 0L) {
                        throw new org.apache.kafka.common.errors.TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");
                    }
                    return Optional.of(streamThread.getName());
                }
            }
            this.log.warn("There are no threads eligible for removal");
        } else {
            this.log.warn("Cannot remove a stream thread when Kafka Streams client is in state  " + (Object)((Object)this.state()));
        }
        return Optional.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int getNumLiveStreamThreads() {
        AtomicInteger numLiveThreads = new AtomicInteger(0);
        List<StreamThread> list = this.threads;
        synchronized (list) {
            this.processStreamThread(thread -> {
                if (thread.state() == StreamThread.State.DEAD) {
                    this.log.debug("Trimming thread {} from the threads list since it's state is {}", (Object)thread.getName(), (Object)StreamThread.State.DEAD);
                    this.threads.remove(thread);
                } else if (thread.state() == StreamThread.State.PENDING_SHUTDOWN) {
                    this.log.debug("Skipping thread {} from num live threads computation since it's state is {}", (Object)thread.getName(), (Object)StreamThread.State.PENDING_SHUTDOWN);
                } else {
                    numLiveThreads.incrementAndGet();
                }
            });
            return numLiveThreads.get();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int getNextThreadIndex() {
        HashSet allLiveThreadNames = new HashSet();
        AtomicInteger maxThreadId = new AtomicInteger(1);
        List<StreamThread> list = this.threads;
        synchronized (list) {
            this.processStreamThread(thread -> {
                if (thread.state() == StreamThread.State.DEAD) {
                    this.threads.remove(thread);
                } else {
                    allLiveThreadNames.add(thread.getName());
                    int threadId = Integer.parseInt(thread.getName().substring(thread.getName().lastIndexOf("-") + 1));
                    if (threadId > maxThreadId.get()) {
                        maxThreadId.set(threadId);
                    }
                }
            });
            String baseName = this.clientId + "-StreamThread-";
            for (int i = 1; i <= maxThreadId.get(); ++i) {
                String name = baseName + i;
                if (allLiveThreadNames.contains(name)) continue;
                return i;
            }
            return this.threads.size() + 1;
        }
    }

    private long getCacheSizePerThread(int numStreamThreads) {
        if (numStreamThreads == 0) {
            return this.totalCacheSize;
        }
        return this.totalCacheSize / (long)(numStreamThreads + (this.globalTaskTopology != null ? 1 : 0));
    }

    private void resizeThreadCache(long cacheSizePerThread) {
        this.processStreamThread(thread -> thread.resizeCache(cacheSizePerThread));
        if (this.globalStreamThread != null) {
            this.globalStreamThread.resize(cacheSizePerThread);
        }
    }

    private ScheduledExecutorService setupStateDirCleaner() {
        return Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, this.clientId + "-CleanupThread");
            thread.setDaemon(true);
            return thread;
        });
    }

    private static ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(String clientId, StreamsConfig config) {
        if (Sensor.RecordingLevel.forName((String)config.getString("metrics.recording.level")) == Sensor.RecordingLevel.DEBUG) {
            return Executors.newSingleThreadScheduledExecutor(r -> {
                Thread thread = new Thread(r, clientId + "-RocksDBMetricsRecordingTrigger");
                thread.setDaemon(true);
                return thread;
            });
        }
        return null;
    }

    private static HostInfo parseHostInfo(String endPoint) {
        HostInfo hostInfo = HostInfo.buildFromEndpoint(endPoint);
        if (hostInfo == null) {
            return StreamsMetadataState.UNKNOWN_HOST;
        }
        return hostInfo;
    }

    public synchronized void start() throws IllegalStateException, StreamsException {
        if (this.setState(State.REBALANCING)) {
            this.log.debug("Starting Streams client");
            if (this.globalStreamThread != null) {
                this.globalStreamThread.start();
            }
            this.processStreamThread(Thread::start);
            Long cleanupDelay = this.config.getLong("state.cleanup.delay.ms");
            this.stateDirCleaner.scheduleAtFixedRate(() -> {
                if (this.state == State.RUNNING) {
                    this.stateDirectory.cleanRemovedTasks(cleanupDelay);
                }
            }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
            long recordingDelay = 0L;
            long recordingInterval = 1L;
            if (this.rocksDBMetricsRecordingService != null) {
                this.rocksDBMetricsRecordingService.scheduleAtFixedRate(this.streamsMetrics.rocksDBMetricsRecordingTrigger(), 0L, 1L, TimeUnit.MINUTES);
            }
        } else {
            throw new IllegalStateException("The client is either already started or already stopped, cannot re-start");
        }
    }

    @Override
    public void close() {
        this.close(Long.MAX_VALUE);
    }

    private Thread shutdownHelper(boolean error) {
        this.stateDirCleaner.shutdownNow();
        if (this.rocksDBMetricsRecordingService != null) {
            this.rocksDBMetricsRecordingService.shutdownNow();
        }
        return new Thread(() -> {
            this.processStreamThread(StreamThread::shutdown);
            this.processStreamThread(thread -> {
                try {
                    if (!thread.isRunning()) {
                        thread.join();
                    }
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            if (this.globalStreamThread != null) {
                this.globalStreamThread.shutdown();
            }
            if (this.globalStreamThread != null && !this.globalStreamThread.stillRunning()) {
                try {
                    this.globalStreamThread.join();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                this.globalStreamThread = null;
            }
            this.stateDirectory.close();
            this.adminClient.close();
            this.streamsMetrics.removeAllClientLevelSensorsAndMetrics();
            this.metrics.close();
            if (!error) {
                this.setState(State.NOT_RUNNING);
            } else {
                this.setState(State.ERROR);
            }
        }, "kafka-streams-close-thread");
    }

    private boolean close(long timeoutMs) {
        if (this.state == State.ERROR || this.state == State.NOT_RUNNING) {
            this.log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", (Object)this.state);
            return true;
        }
        if (this.state == State.PENDING_ERROR || this.state == State.PENDING_SHUTDOWN) {
            this.log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", (Object)this.state);
            if (this.state == State.PENDING_ERROR && this.waitOnState(State.ERROR, timeoutMs)) {
                this.log.info("Streams client stopped to ERROR completely");
                return true;
            }
            if (this.state == State.PENDING_SHUTDOWN && this.waitOnState(State.NOT_RUNNING, timeoutMs)) {
                this.log.info("Streams client stopped to NOT_RUNNING completely");
                return true;
            }
            this.log.warn("Streams client cannot transition to {}} completely within the timeout", (Object)(this.state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING : State.ERROR));
            return false;
        }
        if (!this.setState(State.PENDING_SHUTDOWN)) {
            this.log.error("Failed to transition to PENDING_SHUTDOWN, current state is {}", (Object)this.state);
            throw new StreamsException("Failed to shut down while in state " + (Object)((Object)this.state));
        }
        Thread shutdownThread = this.shutdownHelper(false);
        shutdownThread.setDaemon(true);
        shutdownThread.start();
        if (this.waitOnState(State.NOT_RUNNING, timeoutMs)) {
            this.log.info("Streams client stopped completely");
            return true;
        }
        this.log.info("Streams client cannot stop completely within the timeout");
        return false;
    }

    private void closeToError() {
        if (!this.setState(State.PENDING_ERROR)) {
            this.log.info("Skipping shutdown since we are already in " + (Object)((Object)this.state()));
        } else {
            Thread shutdownThread = this.shutdownHelper(true);
            shutdownThread.setDaemon(true);
            shutdownThread.start();
        }
    }

    public synchronized boolean close(Duration timeout) throws IllegalArgumentException {
        String msgPrefix = ApiUtils.prepareMillisCheckFailMsgPrefix(timeout, "timeout");
        long timeoutMs = ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
        if (timeoutMs < 0L) {
            throw new IllegalArgumentException("Timeout can't be negative.");
        }
        this.log.debug("Stopping Streams client with timeoutMillis = {} ms.", (Object)timeoutMs);
        return this.close(timeoutMs);
    }

    public void cleanUp() {
        if (this.state != State.CREATED && this.state != State.NOT_RUNNING && this.state != State.ERROR) {
            throw new IllegalStateException("Cannot clean up while running.");
        }
        this.stateDirectory.clean();
    }

    @Deprecated
    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
        this.validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getAllMetadata().stream().map(streamsMetadata -> new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(), streamsMetadata.stateStoreNames(), streamsMetadata.topicPartitions(), streamsMetadata.standbyStateStoreNames(), streamsMetadata.standbyTopicPartitions())).collect(Collectors.toSet());
    }

    public Collection<StreamsMetadata> metadataForAllStreamsClients() {
        this.validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getAllMetadata();
    }

    @Deprecated
    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(String storeName) {
        this.validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata -> new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(), streamsMetadata.stateStoreNames(), streamsMetadata.topicPartitions(), streamsMetadata.standbyStateStoreNames(), streamsMetadata.standbyTopicPartitions())).collect(Collectors.toSet());
    }

    public Collection<StreamsMetadata> streamsMetadataForStore(String storeName) {
        this.validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getAllMetadataForStore(storeName);
    }

    public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer) {
        this.validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, keySerializer);
    }

    public <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner) {
        this.validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, partitioner);
    }

    public <T> T store(StoreQueryParameters<T> storeQueryParameters) {
        this.validateIsRunningOrRebalancing();
        String storeName = storeQueryParameters.storeName();
        if (!(this.taskTopology != null && this.taskTopology.hasStore(storeName) || this.globalTaskTopology != null && this.globalTaskTopology.hasStore(storeName))) {
            throw new UnknownStateStoreException("Cannot get state store " + storeName + " because no such store is registered in the topology.");
        }
        return this.queryableStoreProvider.getStore(storeQueryParameters);
    }

    private void processStreamThread(Consumer<StreamThread> consumer) {
        ArrayList<StreamThread> copy = new ArrayList<StreamThread>(this.threads);
        for (StreamThread thread : copy) {
            consumer.accept(thread);
        }
    }

    @Deprecated
    public Set<ThreadMetadata> localThreadsMetadata() {
        return this.metadataForLocalThreads().stream().map(threadMetadata -> new ThreadMetadata(threadMetadata.threadName(), threadMetadata.threadState(), threadMetadata.consumerClientId(), threadMetadata.restoreConsumerClientId(), threadMetadata.producerClientIds(), threadMetadata.adminClientId(), threadMetadata.activeTasks().stream().map(taskMetadata -> new TaskMetadata(taskMetadata.taskId().toString(), taskMetadata.topicPartitions(), taskMetadata.committedOffsets(), taskMetadata.endOffsets(), taskMetadata.timeCurrentIdlingStarted())).collect(Collectors.toSet()), threadMetadata.standbyTasks().stream().map(taskMetadata -> new TaskMetadata(taskMetadata.taskId().toString(), taskMetadata.topicPartitions(), taskMetadata.committedOffsets(), taskMetadata.endOffsets(), taskMetadata.timeCurrentIdlingStarted())).collect(Collectors.toSet()))).collect(Collectors.toSet());
    }

    public Set<org.apache.kafka.streams.ThreadMetadata> metadataForLocalThreads() {
        HashSet<org.apache.kafka.streams.ThreadMetadata> threadMetadata = new HashSet<org.apache.kafka.streams.ThreadMetadata>();
        this.processStreamThread(thread -> {
            Object object = thread.getStateLock();
            synchronized (object) {
                if (thread.state() != StreamThread.State.DEAD) {
                    threadMetadata.add(thread.threadMetadata());
                }
            }
        });
        return threadMetadata;
    }

    public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {
        TreeMap<String, Map> localStorePartitionLags = new TreeMap<String, Map>();
        LinkedList<TopicPartition> allPartitions = new LinkedList<TopicPartition>();
        HashMap allChangelogPositions = new HashMap();
        this.processStreamThread(thread -> {
            for (Task task : thread.allTasks().values()) {
                allPartitions.addAll(task.changelogPartitions());
                allChangelogPositions.putAll(task.changelogOffsets());
            }
        });
        this.log.debug("Current changelog positions: {}", allChangelogPositions);
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> allEndOffsets = ClientUtils.fetchEndOffsets(allPartitions, this.adminClient);
        this.log.debug("Current end offsets :{}", allEndOffsets);
        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry : allEndOffsets.entrySet()) {
            long earliestOffset = 0L;
            long changelogPosition = allChangelogPositions.getOrDefault(entry.getKey(), 0L);
            long latestOffset = entry.getValue().offset();
            LagInfo lagInfo = new LagInfo(changelogPosition == -2L ? latestOffset : changelogPosition, latestOffset);
            String storeName = this.streamsMetadataState.getStoreForChangelogTopic(entry.getKey().topic());
            localStorePartitionLags.computeIfAbsent(storeName, ignored -> new TreeMap()).put(entry.getKey().partition(), lagInfo);
        }
        return Collections.unmodifiableMap(localStorePartitionLags);
    }

    final class DelegatingStateRestoreListener
    implements StateRestoreListener {
        DelegatingStateRestoreListener() {
        }

        private void throwOnFatalException(Exception fatalUserException, TopicPartition topicPartition, String storeName) {
            throw new StreamsException(String.format("Fatal user code error in store restore listener for store %s, partition %s.", storeName, topicPartition), fatalUserException);
        }

        @Override
        public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
            if (KafkaStreams.this.globalStateRestoreListener != null) {
                try {
                    KafkaStreams.this.globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
                }
                catch (Exception fatalUserException) {
                    this.throwOnFatalException(fatalUserException, topicPartition, storeName);
                }
            }
        }

        @Override
        public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
            if (KafkaStreams.this.globalStateRestoreListener != null) {
                try {
                    KafkaStreams.this.globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
                }
                catch (Exception fatalUserException) {
                    this.throwOnFatalException(fatalUserException, topicPartition, storeName);
                }
            }
        }

        @Override
        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
            if (KafkaStreams.this.globalStateRestoreListener != null) {
                try {
                    KafkaStreams.this.globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
                }
                catch (Exception fatalUserException) {
                    this.throwOnFatalException(fatalUserException, topicPartition, storeName);
                }
            }
        }
    }

    final class StreamStateListener
    implements StreamThread.StateListener {
        private final Map<Long, StreamThread.State> threadState;
        private GlobalStreamThread.State globalThreadState;
        private final Object threadStatesLock;

        StreamStateListener(Map<Long, StreamThread.State> threadState, GlobalStreamThread.State globalThreadState) {
            this.threadState = threadState;
            this.globalThreadState = globalThreadState;
            this.threadStatesLock = new Object();
        }

        private void maybeSetRunning() {
            for (StreamThread.State state : this.threadState.values()) {
                if (state == StreamThread.State.RUNNING || state == StreamThread.State.DEAD) continue;
                return;
            }
            if (this.globalThreadState != null && this.globalThreadState != GlobalStreamThread.State.RUNNING) {
                return;
            }
            KafkaStreams.this.setState(State.RUNNING);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public synchronized void onChange(Thread thread, ThreadStateTransitionValidator abstractNewState, ThreadStateTransitionValidator abstractOldState) {
            Object object = this.threadStatesLock;
            synchronized (object) {
                if (thread instanceof StreamThread) {
                    StreamThread.State newState = (StreamThread.State)abstractNewState;
                    this.threadState.put(thread.getId(), newState);
                    if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.PARTITIONS_ASSIGNED) {
                        KafkaStreams.this.setState(State.REBALANCING);
                    } else if (newState == StreamThread.State.RUNNING) {
                        this.maybeSetRunning();
                    }
                } else if (thread instanceof GlobalStreamThread) {
                    GlobalStreamThread.State newState;
                    this.globalThreadState = newState = (GlobalStreamThread.State)abstractNewState;
                    if (newState == GlobalStreamThread.State.RUNNING) {
                        this.maybeSetRunning();
                    } else if (newState == GlobalStreamThread.State.DEAD) {
                        KafkaStreams.this.log.error("Global thread has died. The streams application or client will now close to ERROR.");
                        KafkaStreams.this.closeToError();
                    }
                }
            }
        }
    }

    public static interface StateListener {
        public void onChange(State var1, State var2);
    }

    public static enum State {
        CREATED(1, 3),
        REBALANCING(2, 3, 5),
        RUNNING(1, 2, 3, 5),
        PENDING_SHUTDOWN(4),
        NOT_RUNNING(new Integer[0]),
        PENDING_ERROR(6),
        ERROR(new Integer[0]);

        private final Set<Integer> validTransitions = new HashSet<Integer>();

        private State(Integer ... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        public boolean isRunningOrRebalancing() {
            return this.equals((Object)RUNNING) || this.equals((Object)REBALANCING);
        }

        public boolean isValidTransition(State newState) {
            return this.validTransitions.contains(newState.ordinal());
        }
    }
}

