/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;

public class StreamsBrokerDownResilienceTest {
    private static final int KEY = 0;
    private static final int VALUE = 1;
    private static final String SOURCE_TOPIC_1 = "streamsResilienceSource";
    private static final String SINK_TOPIC = "streamsResilienceSink";

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
            System.exit(1);
        }
        System.out.println("StreamsTest instance started");
        String propFileName = args[0];
        String additionalConfigs = args[1];
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        String kafka = streamsProperties.getProperty("bootstrap.servers");
        if (kafka == null) {
            System.err.println("No bootstrap kafka servers specified in bootstrap.servers");
            System.exit(1);
        }
        streamsProperties.put("application.id", "kafka-streams-resilience");
        streamsProperties.put("default.key.serde", Serdes.String().getClass());
        streamsProperties.put("default.value.serde", Serdes.String().getClass());
        streamsProperties.put("commit.interval.ms", (Object)100);
        if (additionalConfigs != null && !additionalConfigs.equalsIgnoreCase("none")) {
            Map<String, String> updated = StreamsBrokerDownResilienceTest.updatedConfigs(additionalConfigs);
            System.out.println("Updating configs with " + updated);
            streamsProperties.putAll(updated);
        }
        if (!StreamsBrokerDownResilienceTest.confirmCorrectConfigs(streamsProperties)) {
            System.err.println(String.format("ERROR: Did not have all required configs expected  to contain %s %s %s %s", StreamsConfig.consumerPrefix((String)"max.poll.interval.ms"), StreamsConfig.producerPrefix((String)"retries"), StreamsConfig.producerPrefix((String)"request.timeout.ms"), StreamsConfig.producerPrefix((String)"max.block.ms")));
            System.exit(1);
        }
        StreamsBuilder builder = new StreamsBuilder();
        Serde stringSerde = Serdes.String();
        builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with((Serde)stringSerde, (Serde)stringSerde)).peek((ForeachAction)new ForeachAction<String, String>(){
            int messagesProcessed = 0;

            public void apply(String key, String value) {
                System.out.println("received key " + key + " and value " + value);
                ++this.messagesProcessed;
                System.out.println("processed" + this.messagesProcessed + "messages");
                System.out.flush();
            }
        }).to(SINK_TOPIC);
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
        streams.setUncaughtExceptionHandler((t, e) -> {
            System.err.println("FATAL: An unexpected exception " + e);
            System.err.flush();
            streams.close(Duration.ofSeconds(30L));
        });
        System.out.println("Start Kafka Streams");
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close(Duration.ofSeconds(30L));
            System.out.println("Complete shutdown of streams resilience test app now");
            System.out.flush();
        }));
    }

    private static boolean confirmCorrectConfigs(Properties properties) {
        return properties.containsKey(StreamsConfig.consumerPrefix((String)"max.poll.interval.ms")) && properties.containsKey(StreamsConfig.producerPrefix((String)"retries")) && properties.containsKey(StreamsConfig.producerPrefix((String)"request.timeout.ms")) && properties.containsKey(StreamsConfig.producerPrefix((String)"max.block.ms"));
    }

    private static Map<String, String> updatedConfigs(String formattedConfigs) {
        String[] parts = formattedConfigs.split(",");
        HashMap<String, String> updatedConfigs = new HashMap<String, String>();
        for (String part : parts) {
            String[] keyValue = part.split("=");
            updatedConfigs.put(keyValue[0], keyValue[1]);
        }
        return updatedConfigs;
    }
}

