/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm;

import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyAssignException;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side entry point for submitting topologies to Nimbus.
 *
 * <p>When a local Nimbus handler has been registered through
 * {@link #setLocalNimbus(Nimbus.Iface)} the topology is handed to it directly;
 * otherwise the topology jar (and any extra library jars) is uploaded through a
 * {@link NimbusClient} and the topology is submitted to the configured cluster.
 *
 * <p>The class keeps its upload state ({@code submittedJar}, {@code path}) in
 * static fields that are read and written without any synchronization.
 */
public class StormSubmitter {
    public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);

    /** Conf key holding a map of library jar name to its absolute local path. */
    private static final String CONF_LIB_PATH = "topology.lib.path";
    /** Conf key holding the list of library jar names to upload. */
    private static final String CONF_LIB_NAME = "topology.lib.name";
    /** Conf key whose value, halved, is used as the upload chunk size. */
    private static final String CONF_MAX_BUFFER_SIZE = "nimbus.thrift.max_buffer_size";
    private static final String CONF_USER_GROUP = "user.group";
    private static final String CONF_USER_NAME = "user.name";
    private static final String CONF_USER_PASSWORD = "user.password";
    /** System property naming the local topology jar to upload. */
    private static final String STORM_JAR_PROPERTY = "storm.jar";
    /** Upload chunk size used when the conf does not provide a max buffer size. */
    private static final int DEFAULT_UPLOAD_BUFFER_SIZE = 512 * 1024;
    /** Placeholder written to the log instead of the real password. */
    private static final String REDACTED = "******";

    private static Nimbus.Iface localNimbus = null;
    /** Upload location of the topology jar once it has been uploaded; {@code null} before that. */
    private static String submittedJar = null;
    /** Upload directory returned by the most recent {@code beginFileUpload()} call. */
    private static String path = null;

    /**
     * Registers an in-process Nimbus handler. While it is non-null every
     * submission goes to this handler instead of a remote cluster.
     *
     * @param localNimbusHandler handler to use, or {@code null} to submit remotely
     */
    public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
        localNimbus = localNimbusHandler;
    }

    /**
     * Submits a topology without submit options.
     *
     * @param name      topology name
     * @param stormConf topology-specific configuration
     * @param topology  topology to run
     * @throws AlreadyAliveException    propagated from Nimbus
     * @throws InvalidTopologyException propagated from Nimbus
     */
    public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
        StormSubmitter.submitTopology(name, stormConf, topology, (SubmitOptions) null);
    }

    /**
     * Submits a topology together with extra library jars.
     *
     * <p>Every existing file in {@code jarFiles} is recorded in the caller's
     * {@code stormConf} under {@code topology.lib.path} (name to absolute path)
     * and {@code topology.lib.name} (list of names). Files that do not exist are
     * logged and skipped. Note that {@code stormConf} itself is modified.
     *
     * @param name      topology name
     * @param stormConf topology-specific configuration; receives the two library entries
     * @param topology  topology to run
     * @param opts      submit options, may be {@code null}
     * @param jarFiles  library jars to upload, may be {@code null} (treated as empty)
     * @throws AlreadyAliveException    propagated from Nimbus
     * @throws InvalidTopologyException propagated from Nimbus
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, List<File> jarFiles) throws AlreadyAliveException, InvalidTopologyException {
        if (jarFiles == null) {
            jarFiles = new ArrayList<File>();
        }
        Map<String, String> jars = new HashMap<String, String>(jarFiles.size());
        List<String> names = new ArrayList<String>(jarFiles.size());
        for (File f : jarFiles) {
            if (!f.exists()) {
                LOG.info("{} is not existed: {}", f.getName(), f.getAbsolutePath());
                continue;
            }
            jars.put(f.getName(), f.getAbsolutePath());
            names.add(f.getName());
        }
        LOG.info("Files: {} will be loaded", names);
        stormConf.put(CONF_LIB_PATH, jars);
        stormConf.put(CONF_LIB_NAME, names);
        StormSubmitter.submitTopology(name, stormConf, topology, opts);
    }

    /**
     * Submits a topology. The {@code listener} argument is accepted but is not
     * used by this implementation; no progress callbacks are made.
     *
     * @param name      topology name
     * @param stormConf topology-specific configuration
     * @param topology  topology to run
     * @param opts      submit options, may be {@code null}
     * @param listener  ignored
     * @throws AlreadyAliveException    propagated from Nimbus
     * @throws InvalidTopologyException propagated from Nimbus
     */
    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener listener) throws AlreadyAliveException, InvalidTopologyException {
        StormSubmitter.submitTopology(name, stormConf, topology, opts);
    }

    /**
     * Submits a topology to the local Nimbus handler if one is registered,
     * otherwise to the cluster described by the merged configuration.
     *
     * <p>The caller's {@code stormConf} is copied, overlaid with command-line
     * options and enriched with the user entries before being serialized to
     * JSON. In distributed mode the jar is uploaded first (unless a jar was
     * already uploaded by this JVM) and a {@link RuntimeException} is thrown if
     * the topology-name lookup reports that the name is already in use.
     *
     * @param name      topology name
     * @param stormConf topology-specific configuration; must pass {@code Utils.isValidConf}
     * @param topology  topology to run
     * @param opts      submit options, may be {@code null}; not used in local mode
     * @throws IllegalArgumentException if {@code stormConf} is not valid
     * @throws AlreadyAliveException    propagated from Nimbus
     * @throws InvalidTopologyException propagated from Nimbus
     * @throws RuntimeException         wrapping {@code TopologyAssignException} or
     *                                  {@code TException}, or when the name already exists
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
        if (!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        // Work on a copy so the command-line overlay does not leak into the caller's map.
        Map topologyConf = new HashMap(stormConf);
        topologyConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(topologyConf);
        StormSubmitter.putUserInfo(conf, topologyConf);
        try {
            String serConf = Utils.to_json(topologyConf);
            if (localNimbus != null) {
                LOG.info("Submitting topology {} in local mode", name);
                localNimbus.submitTopology(name, null, serConf, topology);
            } else {
                try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
                    if (StormSubmitter.topologyNameExists(client, conf, name)) {
                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                    }
                    StormSubmitter.submitJar(client, conf);
                    // The serialized conf carries user.password; never write it to the log verbatim.
                    LOG.info("Submitting topology {} in distributed mode with conf {}", name,
                            StormSubmitter.loggableConf(topologyConf, serConf));
                    if (opts != null) {
                        client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts);
                    } else {
                        client.getClient().submitTopology(name, path, serConf, topology);
                    }
                }
            }
            LOG.info("Finished submitting topology: {}", name);
        }
        catch (InvalidTopologyException e) {
            LOG.warn("Topology submission exception", e);
            throw e;
        }
        catch (AlreadyAliveException e) {
            LOG.warn("Topology already alive exception", e);
            throw e;
        }
        catch (TopologyAssignException e) {
            LOG.warn("Failed to assign " + e.get_msg(), e);
            throw new RuntimeException(e);
        }
        catch (TException e) {
            LOG.warn("Failed to assign ", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Same as {@link #submitTopology(String, Map, StormTopology)}; no progress
     * bar is rendered by this implementation.
     *
     * @param name      topology name
     * @param stormConf topology-specific configuration
     * @param topology  topology to run
     * @throws AlreadyAliveException    propagated from Nimbus
     * @throws InvalidTopologyException propagated from Nimbus
     */
    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
        StormSubmitter.submitTopologyWithProgressBar(name, stormConf, topology, null);
    }

    /**
     * Same as {@link #submitTopology(String, Map, StormTopology, SubmitOptions)};
     * no progress bar is rendered by this implementation.
     *
     * @param name      topology name
     * @param stormConf topology-specific configuration
     * @param topology  topology to run
     * @param opts      submit options, may be {@code null}
     * @throws AlreadyAliveException    propagated from Nimbus
     * @throws InvalidTopologyException propagated from Nimbus
     */
    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
        StormSubmitter.submitTopology(name, stormConf, topology, opts);
    }

    /**
     * Reports whether Nimbus can return topology info for {@code name}.
     *
     * <p>Any exception raised by the lookup is treated as "does not exist", so a
     * failed lookup for another reason (for example a transport error) also
     * yields {@code false}. The cause is logged at debug level.
     *
     * @param client connected Nimbus client
     * @param conf   unused
     * @param name   topology name to look up
     * @return {@code true} if the lookup succeeded, {@code false} if it threw
     */
    public static boolean topologyNameExists(NimbusClient client, Map conf, String name) {
        try {
            client.getClient().getTopologyInfoByName(name);
            return true;
        }
        catch (Exception e) {
            // Deliberate best effort: the lookup failing is how "not found" is signalled here.
            LOG.debug("Lookup of topology " + name + " failed; treating it as not existing", e);
            return false;
        }
    }

    /**
     * Uploads the library jars listed in {@code conf} and the jar named by the
     * {@code storm.jar} system property, unless a topology jar has already been
     * uploaded by this JVM.
     *
     * @param client connected Nimbus client
     * @param conf   merged configuration; read for {@code topology.lib.name} and {@code topology.lib.path}
     * @throws RuntimeException if there is neither a library jar nor a topology jar,
     *                          or wrapping any failure during the upload
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void submitJar(NimbusClient client, Map conf) {
        if (submittedJar != null) {
            LOG.info("Jar already uploaded to master. Not submitting jar.");
            return;
        }
        try {
            LOG.info("Jar not uploaded to master yet. Submitting jar...");
            String localJar = System.getProperty(STORM_JAR_PROPERTY);
            path = client.getClient().beginFileUpload();
            // The jar is named after the last segment of the upload directory.
            String[] pathCache = path.split("/");
            String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
            List<String> lib = (List<String>) conf.get(CONF_LIB_NAME);
            Map<String, String> libPath = (Map<String, String>) conf.get(CONF_LIB_PATH);
            if (lib != null && !lib.isEmpty()) {
                for (String libName : lib) {
                    String jarPath = path + "/lib/" + libName;
                    client.getClient().beginLibUpload(jarPath);
                    StormSubmitter.submitJar(conf, libPath.get(libName), jarPath, client);
                }
            } else if (localJar == null) {
                throw new RuntimeException("No client app jar, please upload it");
            }
            if (localJar != null) {
                submittedJar = StormSubmitter.submitJar(conf, localJar, uploadLocation, client);
            }
            // NOTE(review): when localJar != null the four-argument submitJar above has already
            // called finishFileUpload(uploadLocation); confirm Nimbus tolerates this second call.
            client.getClient().finishFileUpload(uploadLocation);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Streams a local jar to Nimbus in chunks and finishes the upload.
     *
     * <p>The chunk size is half of {@code nimbus.thrift.max_buffer_size} when
     * that key is present in {@code conf}, otherwise 512 KiB. The input stream is
     * always closed, including when reading or uploading a chunk fails.
     *
     * @param conf           configuration consulted for the max buffer size
     * @param localJar       path of the jar on the local file system
     * @param uploadLocation destination path on Nimbus
     * @param client         connected Nimbus client
     * @return {@code uploadLocation}
     * @throws RuntimeException if {@code localJar} is {@code null}, or wrapping any upload failure
     */
    public static String submitJar(Map conf, String localJar, String uploadLocation, NimbusClient client) {
        if (localJar == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
        }
        try {
            LOG.info("Uploading topology jar {} to assigned location: {}", localJar, uploadLocation);
            int bufferSize = DEFAULT_UPLOAD_BUFFER_SIZE;
            Object maxBufSizeObject = conf.get(CONF_MAX_BUFFER_SIZE);
            if (maxBufSizeObject != null) {
                bufferSize = Utils.getInt(maxBufSizeObject) / 2;
            }
            BufferFileInputStream is = new BufferFileInputStream(localJar, bufferSize);
            try {
                byte[] toSubmit;
                // An empty chunk marks the end of the file.
                while ((toSubmit = is.read()).length != 0) {
                    client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
                }
            }
            finally {
                StormSubmitter.closeQuietly(is, localJar);
            }
            client.getClient().finishFileUpload(uploadLocation);
            LOG.info("Successfully uploaded topology jar to assigned location: {}", uploadLocation);
            return uploadLocation;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Closes the jar stream, logging instead of throwing so an upload failure is not masked. */
    private static void closeQuietly(BufferFileInputStream is, String localJar) {
        try {
            is.close();
        }
        catch (Exception e) {
            LOG.warn("Failed to close " + localJar, e);
        }
    }

    /** Returns the JSON to log for the conf: {@code serConf} itself, or a copy with the password masked. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static String loggableConf(Map stormConf, String serConf) {
        if (stormConf.get(CONF_USER_PASSWORD) == null) {
            return serConf;
        }
        Map masked = new HashMap(stormConf);
        masked.put(CONF_USER_PASSWORD, REDACTED);
        return Utils.to_json(masked);
    }

    /**
     * Copies the user group, name and password entries from the merged
     * configuration into the topology configuration. Missing entries are copied
     * as {@code null} values.
     *
     * @param conf      merged configuration to read from
     * @param stormConf topology configuration to write to
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void putUserInfo(Map conf, Map stormConf) {
        stormConf.put(CONF_USER_GROUP, conf.get(CONF_USER_GROUP));
        stormConf.put(CONF_USER_NAME, conf.get(CONF_USER_NAME));
        stormConf.put(CONF_USER_PASSWORD, conf.get(CONF_USER_PASSWORD));
    }

    /**
     * Callback interface for upload progress. Nothing in this class invokes it.
     *
     * <p>NOTE(review): parameter names were lost in decompilation; presumably the
     * two strings are the source and target file and the longs are byte counts
     * -- confirm against the original sources.
     */
    public static interface ProgressListener {
        public void onStart(String var1, String var2, long var3);

        public void onProgress(String var1, String var2, long var3, long var5);

        public void onCompleted(String var1, String var2, long var3);
    }
}

