package org.wso2.carbon.sp.jobmanager.core.appCreator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.ZkServer;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StrSubstitutor;
import org.apache.log4j.Logger;
import org.wso2.carbon.sp.jobmanager.core.internal.ServiceDataHolder;
import org.wso2.carbon.sp.jobmanager.core.topology.InputStreamDataHolder;
import org.wso2.carbon.sp.jobmanager.core.topology.OutputStreamDataHolder;
import org.wso2.carbon.sp.jobmanager.core.topology.PublishingStrategyDataHolder;
import org.wso2.carbon.sp.jobmanager.core.topology.SiddhiQueryGroup;
import org.wso2.carbon.sp.jobmanager.core.topology.SubscriptionStrategyDataHolder;
import org.wso2.carbon.sp.jobmanager.core.util.ResourceManagerConstants;
import org.wso2.carbon.sp.jobmanager.core.util.TransportStrategy;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;

/* loaded from: input_file:org/wso2/carbon/sp/jobmanager/core/appCreator/SPSiddhiAppCreator.class */
public class SPSiddhiAppCreator extends AbstractSiddhiAppCreator {
    private static final Logger log = Logger.getLogger(SPSiddhiAppCreator.class);

    @Override // org.wso2.carbon.sp.jobmanager.core.appCreator.AbstractSiddhiAppCreator
    protected List<SiddhiQuery> createApps(String str, SiddhiQueryGroup siddhiQueryGroup) {
        String name = siddhiQueryGroup.getName();
        List<SiddhiQuery> generateQueryList = generateQueryList(siddhiQueryGroup.getSiddhiApp(), str, name, siddhiQueryGroup.getParallelism());
        processInputStreams(str, name, generateQueryList, siddhiQueryGroup.getInputStreams().values());
        processOutputStreams(str, name, generateQueryList, siddhiQueryGroup.getOutputStreams().values());
        return generateQueryList;
    }

    private void processOutputStreams(String str, String str2, List<SiddhiQuery> list, Collection<OutputStreamDataHolder> collection) {
        HashMap hashMap = new HashMap();
        hashMap.put(ResourceManagerConstants.BOOTSTRAP_SERVER_URL, ServiceDataHolder.getDeploymentConfig().getBootstrapURLs());
        for (OutputStreamDataHolder outputStreamDataHolder : collection) {
            HashMap hashMap2 = new HashMap();
            HashMap hashMap3 = new HashMap();
            HashMap hashMap4 = new HashMap();
            for (PublishingStrategyDataHolder publishingStrategyDataHolder : outputStreamDataHolder.getPublishingStrategyList()) {
                hashMap.put(ResourceManagerConstants.TOPIC_LIST, str + "." + outputStreamDataHolder.getStreamName() + (publishingStrategyDataHolder.getGroupingField() == null ? "" : "." + publishingStrategyDataHolder.getGroupingField()));
                if (publishingStrategyDataHolder.getStrategy() == TransportStrategy.FIELD_GROUPING) {
                    if (hashMap3.get(publishingStrategyDataHolder.getGroupingField()) == null || ((Integer) hashMap3.get(publishingStrategyDataHolder.getGroupingField())).intValue() <= publishingStrategyDataHolder.getParallelism()) {
                        hashMap.remove(str + "." + outputStreamDataHolder.getStreamName());
                        hashMap3.put(publishingStrategyDataHolder.getGroupingField(), Integer.valueOf(publishingStrategyDataHolder.getParallelism()));
                        hashMap.put(ResourceManagerConstants.PARTITION_KEY, publishingStrategyDataHolder.getGroupingField());
                        ArrayList arrayList = new ArrayList(publishingStrategyDataHolder.getParallelism());
                        for (int i = 0; i < publishingStrategyDataHolder.getParallelism(); i++) {
                            HashMap hashMap5 = new HashMap(publishingStrategyDataHolder.getParallelism());
                            hashMap5.put(ResourceManagerConstants.PARTITION_NO, String.valueOf(i));
                            arrayList.add(getUpdatedQuery(ResourceManagerConstants.DESTINATION, hashMap5));
                        }
                        hashMap.put(ResourceManagerConstants.DESTINATIONS, StringUtils.join(arrayList, ","));
                        hashMap2.put(hashMap.get(ResourceManagerConstants.TOPIC_LIST), getUpdatedQuery(ResourceManagerConstants.PARTITIONED_KAFKA_SINK_TEMPLATE, hashMap));
                        hashMap4.put(hashMap.get(ResourceManagerConstants.TOPIC_LIST), Integer.valueOf(publishingStrategyDataHolder.getParallelism()));
                    }
                } else if (hashMap3.isEmpty()) {
                    hashMap2.put(hashMap.get(ResourceManagerConstants.TOPIC_LIST), getUpdatedQuery(ResourceManagerConstants.DEFAULT_KAFKA_SINK_TEMPLATE, hashMap));
                }
            }
            HashMap hashMap6 = new HashMap(1);
            hashMap6.put(outputStreamDataHolder.getStreamName(), StringUtils.join(hashMap2.values(), "\n"));
            updateQueryList(list, hashMap6);
            createTopicPartitions(hashMap4);
        }
    }

    private void createTopicPartitions(Map<String, Integer> map) {
        String[] split = ServiceDataHolder.getDeploymentConfig().getBootstrapURLs().split(",");
        String zooKeeperURLs = ServiceDataHolder.getDeploymentConfig().getZooKeeperURLs();
        ZkClient zkClient = new ZkClient(zooKeeperURLs, ZkServer.DEFAULT_MIN_SESSION_TIMEOUT, 8000, (ZkSerializer) ZKStringSerializer$.MODULE$);
        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zooKeeperURLs), false);
        Properties properties = new Properties();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            String key = entry.getKey();
            Integer value = entry.getValue();
            if (AdminUtils.topicExists(zkUtils, key)) {
                int size = AdminUtils.fetchTopicMetadataFromZk(key, zkUtils).partitionsMetadata().size();
                if (size < value.intValue()) {
                    AdminUtils.addPartitions(zkUtils, key, value.intValue(), "", true);
                    log.info("Added " + value + " partitions to topic " + key);
                } else if (size > value.intValue()) {
                    log.info("Topic " + key + " has higher number of partitions than expected partition count. Hence have to delete the topic and recreate with " + value + "partitions.");
                    AdminUtils.deleteTopic(zkUtils, key);
                    long currentTimeMillis = System.currentTimeMillis();
                    while (AdminUtils.topicExists(zkUtils, key)) {
                        try {
                            TimeUnit.SECONDS.sleep(1L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        if (System.currentTimeMillis() - currentTimeMillis > 120 * 1000) {
                            break;
                        }
                    }
                    if (AdminUtils.topicExists(zkUtils, key)) {
                        throw new SiddhiAppCreationException("Topic " + key + " deletion failed. Hence Could not create new topic to facilitate new partitions.");
                    }
                    AdminUtils.createTopic(zkUtils, key, value.intValue(), split.length, properties);
                    log.info("Created topic " + key + "with " + value + " partitions.");
                } else {
                    continue;
                }
            } else {
                AdminUtils.createTopic(zkUtils, key, value.intValue(), split.length, properties);
                log.info("Created topic " + key + "with " + value + " partitions.");
            }
        }
        zkClient.close();
    }

    private void processInputStreams(String str, String str2, List<SiddhiQuery> list, Collection<InputStreamDataHolder> collection) {
        HashMap hashMap = new HashMap();
        hashMap.put(ResourceManagerConstants.BOOTSTRAP_SERVER_URL, ServiceDataHolder.getDeploymentConfig().getBootstrapURLs());
        for (InputStreamDataHolder inputStreamDataHolder : collection) {
            SubscriptionStrategyDataHolder subscriptionStrategy = inputStreamDataHolder.getSubscriptionStrategy();
            hashMap.put(ResourceManagerConstants.TOPIC_LIST, str + "." + inputStreamDataHolder.getStreamName() + (inputStreamDataHolder.getSubscriptionStrategy().getPartitionKey() == null ? "" : "." + inputStreamDataHolder.getSubscriptionStrategy().getPartitionKey()));
            if (!inputStreamDataHolder.isUserGiven()) {
                if (subscriptionStrategy.getStrategy() == TransportStrategy.FIELD_GROUPING) {
                    hashMap.put(ResourceManagerConstants.CONSUMER_GROUP_ID, str2);
                    for (int i = 0; i < list.size(); i++) {
                        hashMap.put(ResourceManagerConstants.PARTITION_LIST, StringUtils.join(getPartitionNumbers(list.size(), subscriptionStrategy.getOfferedParallelism(), i), ","));
                        String updatedQuery = getUpdatedQuery(ResourceManagerConstants.PARTITIONED_KAFKA_SOURCE_TEMPLATE, hashMap);
                        HashMap hashMap2 = new HashMap(1);
                        hashMap2.put(inputStreamDataHolder.getStreamName(), updatedQuery);
                        list.get(i).setApp(getUpdatedQuery(list.get(i).getApp(), hashMap2));
                    }
                } else if (subscriptionStrategy.getStrategy() == TransportStrategy.ROUND_ROBIN) {
                    hashMap.put(ResourceManagerConstants.CONSUMER_GROUP_ID, str2);
                    String updatedQuery2 = getUpdatedQuery(ResourceManagerConstants.DEFAULT_KAFKA_SOURCE_TEMPLATE, hashMap);
                    HashMap hashMap3 = new HashMap(1);
                    hashMap3.put(inputStreamDataHolder.getStreamName(), updatedQuery2);
                    updateQueryList(list, hashMap3);
                } else {
                    for (int i2 = 0; i2 < list.size(); i2++) {
                        hashMap.put(ResourceManagerConstants.CONSUMER_GROUP_ID, str2 + "-" + i2);
                        String updatedQuery3 = getUpdatedQuery(ResourceManagerConstants.DEFAULT_KAFKA_SOURCE_TEMPLATE, hashMap);
                        HashMap hashMap4 = new HashMap(1);
                        hashMap4.put(inputStreamDataHolder.getStreamName(), updatedQuery3);
                        list.get(i2).setApp(getUpdatedQuery(list.get(i2).getApp(), hashMap4));
                    }
                }
            }
        }
    }

    private List<Integer> getPartitionNumbers(int i, int i2, int i3) {
        ArrayList arrayList = new ArrayList();
        if (i2 == i) {
            arrayList.add(Integer.valueOf(i3));
            return arrayList;
        }
        int i4 = i2 / i;
        if (i3 + 1 != i) {
            for (int i5 = 0; i5 < i4; i5++) {
                arrayList.add(Integer.valueOf((i3 * i4) + i5));
            }
            return arrayList;
        }
        int i6 = i2 - ((i - 1) * i4);
        for (int i7 = 0; i7 < i6; i7++) {
            arrayList.add(Integer.valueOf((i3 * i4) + i7));
        }
        return arrayList;
    }

    private List<SiddhiQuery> generateQueryList(String str, String str2, String str3, int i) {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            HashMap hashMap = new HashMap(1);
            String str4 = str3 + "-" + (i2 + 1);
            hashMap.put(ResourceManagerConstants.APP_NAME, str4);
            arrayList.add(new SiddhiQuery(str4, new StrSubstitutor(hashMap).replace(str)));
        }
        return arrayList;
    }

    private void updateQueryList(List<SiddhiQuery> list, Map<String, String> map) {
        StrSubstitutor strSubstitutor = new StrSubstitutor(map);
        for (SiddhiQuery siddhiQuery : list) {
            siddhiQuery.setApp(strSubstitutor.replace(siddhiQuery.getApp()));
        }
    }

    private String getUpdatedQuery(String str, Map<String, String> map) {
        return new StrSubstitutor(map).replace(str);
    }
}
