package org.apache.storm.starter.trident;

import java.util.Properties;
import kafka.api.OffsetRequest;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.hadoop.fs.shell.Count;
import org.apache.hadoop.fs.shell.Test;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.starter.spout.RandomSentenceSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.FilterNull;
import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.testing.Split;
import org.apache.storm.tuple.Fields;

/* loaded from: input_file:org/apache/storm/starter/trident/TridentKafkaWordCount.class */
public class TridentKafkaWordCount {
    private String zkUrl;
    private String brokerUrl;

    TridentKafkaWordCount(String str, String str2) {
        this.zkUrl = str;
        this.brokerUrl = str2;
    }

    private TransactionalTridentKafkaSpout createKafkaSpout() {
        TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(new ZkHosts(this.zkUrl), Test.NAME);
        tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        tridentKafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
        return new TransactionalTridentKafkaSpout(tridentKafkaConfig);
    }

    private Stream addDRPCStream(TridentTopology tridentTopology, TridentState tridentState, LocalDRPC localDRPC) {
        return tridentTopology.newDRPCStream("words", localDRPC).each(new Fields(new String[]{"args"}), new Split(), new Fields(new String[]{"word"})).groupBy(new Fields(new String[]{"word"})).stateQuery(tridentState, new Fields(new String[]{"word"}), new MapGet(), new Fields(new String[]{Count.NAME})).each(new Fields(new String[]{Count.NAME}), new FilterNull()).project(new Fields(new String[]{"word", Count.NAME}));
    }

    private TridentState addTridentState(TridentTopology tridentTopology) {
        return tridentTopology.newStream("spout1", createKafkaSpout()).parallelismHint(1).each(new Fields(new String[]{"str"}), new Split(), new Fields(new String[]{"word"})).groupBy(new Fields(new String[]{"word"})).persistentAggregate(new MemoryMapState.Factory(), new org.apache.storm.trident.operation.builtin.Count(), new Fields(new String[]{Count.NAME})).parallelismHint(1);
    }

    public StormTopology buildConsumerTopology(LocalDRPC localDRPC) {
        TridentTopology tridentTopology = new TridentTopology();
        addDRPCStream(tridentTopology, addTridentState(tridentTopology), localDRPC);
        return tridentTopology.build();
    }

    public Config getConsumerConfig() {
        Config config = new Config();
        config.setMaxSpoutPending(20);
        return config;
    }

    public StormTopology buildProducerTopology(Properties properties) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new RandomSentenceSpout(), 2);
        topologyBuilder.setBolt("forwardToKafka", new KafkaBolt().withProducerProperties(properties).withTopicSelector(new DefaultTopicSelector(Test.NAME)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word")), 1).shuffleGrouping("spout");
        return topologyBuilder.createTopology();
    }

    public Properties getProducerConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerUrl);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", "storm-kafka-producer");
        return properties;
    }

    public static void main(String[] strArr) throws Exception {
        String str = "localhost:2181";
        String str2 = "localhost:9092";
        if (strArr.length > 2 || (strArr.length == 1 && strArr[0].matches("^-h|--help$"))) {
            System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]");
            System.out.println("   E.g TridentKafkaWordCount [" + str + DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END + " [" + str2 + DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END);
            System.exit(1);
        } else if (strArr.length == 1) {
            str = strArr[0];
        } else if (strArr.length == 2) {
            str = strArr[0];
            str2 = strArr[1];
        }
        System.out.println("Using Kafka zookeeper url: " + str + " broker url: " + str2);
        TridentKafkaWordCount tridentKafkaWordCount = new TridentKafkaWordCount(str, str2);
        LocalDRPC localDRPC = new LocalDRPC();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordCounter", tridentKafkaWordCount.getConsumerConfig(), tridentKafkaWordCount.buildConsumerTopology(localDRPC));
        Config config = new Config();
        config.setMaxSpoutPending(20);
        localCluster.submitTopology("kafkaBolt", config, tridentKafkaWordCount.buildProducerTopology(tridentKafkaWordCount.getProducerConfig()));
        for (int i = 0; i < 60; i++) {
            System.out.println("DRPC RESULT: " + localDRPC.execute("words", "the and apple snow jumped"));
            Thread.sleep(1000L);
        }
        localCluster.killTopology("kafkaBolt");
        localCluster.killTopology("wordCounter");
        localCluster.shutdown();
    }
}
