/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.worker.FunctionAction;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes {@link FunctionAction}s from a queue on a dedicated thread and starts or stops
 * the function instance each action refers to.
 *
 * <p>Starting an instance downloads its package into a per-instance directory under the
 * worker's download directory and launches a {@link RuntimeSpawner} for it. Stopping an
 * instance closes its spawner (if any) and deletes that directory.
 *
 * <p>Lifecycle: {@link #start()} launches the thread, {@link #close()} asks it to finish
 * (it notices within one poll timeout), and {@link #join()} waits for it to terminate.
 * Interrupting the thread also makes it exit.
 */
public class FunctionActioner implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionActioner.class);

    /** How long a single queue poll waits, so that a cleared {@code running} flag is noticed. */
    private static final long POLL_TIMEOUT_SECONDS = 1L;

    /** Value handed to {@code InstanceConfig.setMaxBufferedTuples} for every started instance. */
    private static final int MAX_BUFFERED_TUPLES = 1024;

    private final WorkerConfig workerConfig;
    private final RuntimeFactory runtimeFactory;
    private final Namespace dlogNamespace;
    private LinkedBlockingQueue<FunctionAction> actionQueue;
    private volatile boolean running;
    private Thread actioner;

    /**
     * Creates the actioner and its (not yet started) worker thread.
     *
     * @param workerConfig   supplies the download directory and the liveness check frequency
     * @param runtimeFactory passed to every {@link RuntimeSpawner} created by this actioner
     * @param dlogNamespace  namespace the function packages are downloaded from
     * @param actionQueue    queue the worker thread polls for actions; the thread keeps using
     *                       this queue even if {@link #setActionQueue} is called later
     */
    public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, LinkedBlockingQueue<FunctionAction> actionQueue) {
        this.workerConfig = workerConfig;
        this.runtimeFactory = runtimeFactory;
        this.dlogNamespace = dlogNamespace;
        this.actionQueue = actionQueue;
        this.actioner = new Thread(() -> this.processActions(actionQueue));
        this.actioner.setName("FunctionActionerThread");
    }

    /**
     * Body of the worker thread: polls {@code queue} until {@code running} is cleared or the
     * thread is interrupted, dispatching every action to start or stop handling.
     *
     * @param queue the queue to take actions from
     */
    private void processActions(LinkedBlockingQueue<FunctionAction> queue) {
        log.info("Starting Actioner Thread...");
        while (this.running) {
            FunctionAction action;
            try {
                action = queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (InterruptedException ie) {
                // Restore the flag for whoever inspects the thread and stop; looping on would
                // make every subsequent poll fail immediately.
                log.info("Actioner thread interrupted, exiting");
                Thread.currentThread().interrupt();
                return;
            }
            if (action == null) {
                // Poll timed out; go around again so the running flag is re-checked.
                continue;
            }
            if (action.getAction() == FunctionAction.Action.START) {
                try {
                    this.startFunction(action.getFunctionRuntimeInfo());
                } catch (Exception ex) {
                    log.info("Error starting function", ex);
                    action.getFunctionRuntimeInfo().setStartupException(ex);
                }
            } else {
                try {
                    this.stopFunction(action.getFunctionRuntimeInfo());
                } catch (RuntimeException ex) {
                    // A failure while stopping one function must not kill the thread that
                    // serves every other queued action.
                    log.error("Error stopping function", ex);
                }
            }
        }
    }

    /** Marks the actioner as running and starts the worker thread. */
    public void start() {
        this.running = true;
        this.actioner.start();
    }

    /**
     * Clears the running flag; the worker thread exits once its current iteration finishes.
     * Does not wait for the thread - use {@link #join()} for that.
     */
    @Override
    public void close() {
        this.running = false;
    }

    /**
     * Waits for the worker thread to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        this.actioner.join();
    }

    /**
     * Downloads the function package for the given instance and launches its runtime.
     *
     * @param functionRuntimeInfo describes the instance to start; receives the created spawner
     * @throws Exception if preparing the package or starting the runtime fails
     */
    private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
        Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
        Function.FunctionMetaData functionMetaData = instance.getFunctionMetaData();
        int instanceId = instance.getInstanceId();
        log.info("Starting function {} - {} ...", functionMetaData.getFunctionDetails().getName(), instanceId);

        File pkgDir = new File(this.workerConfig.getDownloadDirectory(), this.getDownloadPackagePath(functionMetaData, instanceId));
        pkgDir.mkdirs();

        // Keep only the last path component of the download file name.
        String pkgFileName = new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails())).getName();
        File pkgFile = new File(pkgDir, pkgFileName);
        if (pkgFile.exists()) {
            log.warn("Function package exists already {} deleting it", pkgFile);
            pkgFile.delete();
        }

        this.downloadPackage(functionMetaData, pkgDir, pkgFile, instanceId);

        InstanceConfig instanceConfig = new InstanceConfig();
        instanceConfig.setFunctionDetails(functionMetaData.getFunctionDetails());
        instanceConfig.setFunctionId(UUID.randomUUID().toString());
        instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
        instanceConfig.setInstanceId(String.valueOf(instanceId));
        instanceConfig.setMaxBufferedTuples(MAX_BUFFERED_TUPLES);

        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), this.runtimeFactory, this.workerConfig.getInstanceLivenessCheckFreqMs());
        // Registered before start() so the spawner can be closed by stopFunction even if
        // starting it fails.
        functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
        runtimeSpawner.start();
    }

    /**
     * Downloads the package into a uniquely named temporary file in {@code pkgDir} and then
     * hard-links it to {@code pkgFile}. The temporary file is always deleted afterwards.
     *
     * @param functionMetaData metadata holding the package location
     * @param pkgDir           directory that receives both the temporary and the final file
     * @param pkgFile          final location of the package
     * @param instanceId       instance id, embedded in the temporary file name
     * @throws Exception if the download or the link creation fails
     */
    private void downloadPackage(Function.FunctionMetaData functionMetaData, File pkgDir, File pkgFile, int instanceId) throws Exception {
        File tempPkgFile = createTempPackageFile(pkgDir, pkgFile.getName(), instanceId);
        try {
            log.info("Function package file {} will be downloaded from {}", tempPkgFile, functionMetaData.getPackageLocation());
            // Close the stream before linking so the descriptor is never leaked and the
            // content is fully written out when the final name becomes visible.
            try (FileOutputStream out = new FileOutputStream(tempPkgFile)) {
                Utils.downloadFromBookkeeper(this.dlogNamespace, out, functionMetaData.getPackageLocation().getPackagePath());
            }
            try {
                Files.createLink(Paths.get(pkgFile.toURI()), Paths.get(tempPkgFile.toURI()));
                log.info("Function package file is linked from {} to {}", tempPkgFile, pkgFile);
            } catch (FileAlreadyExistsException faee) {
                // The final file appeared in the meantime; treat the package as available.
                log.warn("Function package has been downloaded from {} and saved at {}", functionMetaData.getPackageLocation(), pkgFile);
            }
        } finally {
            tempPkgFile.delete();
        }
    }

    /**
     * Creates a new empty file named {@code <baseName>.<instanceId>.<random UUID>} in
     * {@code dir}, retrying with a fresh UUID until a name is free.
     *
     * @param dir        directory to create the file in
     * @param baseName   prefix of the temporary file name
     * @param instanceId instance id embedded in the name
     * @return the newly created file
     * @throws IOException if the file cannot be created
     */
    private static File createTempPackageFile(File dir, String baseName, int instanceId) throws IOException {
        File candidate;
        do {
            candidate = new File(dir, baseName + "." + instanceId + "." + UUID.randomUUID().toString());
        } while (candidate.exists() || !candidate.createNewFile());
        return candidate;
    }

    /**
     * Closes the instance's runtime spawner, if one is set, and deletes its package directory.
     * A failure to delete the directory is logged and otherwise ignored.
     *
     * @param functionRuntimeInfo describes the instance to stop; its spawner is reset to null
     */
    private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
        Function.FunctionMetaData functionMetaData = instance.getFunctionMetaData();
        log.info("Stopping function {} - {}...", functionMetaData.getFunctionDetails().getName(), instance.getInstanceId());

        if (functionRuntimeInfo.getRuntimeSpawner() != null) {
            functionRuntimeInfo.getRuntimeSpawner().close();
            functionRuntimeInfo.setRuntimeSpawner(null);
        }

        File pkgDir = new File(this.workerConfig.getDownloadDirectory(), this.getDownloadPackagePath(functionMetaData, instance.getInstanceId()));
        if (!pkgDir.exists()) {
            return;
        }
        try {
            FileUtils.deleteDirectory(pkgDir);
        } catch (IOException e) {
            log.warn("Failed to delete package for function: {}", FunctionDetailsUtils.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
        }
    }

    /**
     * Builds the relative package directory {@code tenant/namespace/name/instanceId}, joined
     * with the platform file separator.
     *
     * @param functionMetaData metadata providing tenant, namespace and function name
     * @param instanceId       id of the instance
     * @return the relative directory path
     */
    private String getDownloadPackagePath(Function.FunctionMetaData functionMetaData, int instanceId) {
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        return StringUtils.join(new String[]{details.getTenant(), details.getNamespace(), details.getName(), Integer.toString(instanceId)}, File.separatorChar);
    }

    /**
     * Replaces the queue reference held by this object. The worker thread created by the
     * constructor keeps polling the queue it was constructed with.
     *
     * @param actionQueue the new queue reference
     */
    public void setActionQueue(LinkedBlockingQueue<FunctionAction> actionQueue) {
        this.actionQueue = actionQueue;
    }

    /**
     * Sets the running flag read by the worker loop.
     *
     * @param running {@code false} makes the loop exit after its current iteration
     */
    public void setRunning(boolean running) {
        this.running = running;
    }

    /**
     * Replaces the thread that {@link #start()} and {@link #join()} operate on.
     *
     * @param actioner the new thread
     */
    public void setActioner(Thread actioner) {
        this.actioner = actioner;
    }

    /** @return the worker configuration given at construction */
    public WorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    /** @return the runtime factory given at construction */
    public RuntimeFactory getRuntimeFactory() {
        return this.runtimeFactory;
    }

    /** @return the namespace given at construction */
    public Namespace getDlogNamespace() {
        return this.dlogNamespace;
    }

    /** @return the queue reference currently held by this object */
    public LinkedBlockingQueue<FunctionAction> getActionQueue() {
        return this.actionQueue;
    }

    /** @return the current value of the running flag */
    public boolean isRunning() {
        return this.running;
    }

    /** @return the thread currently held by this object */
    public Thread getActioner() {
        return this.actioner;
    }

    /**
     * Compares all six fields (configuration, factory, namespace, queue, running flag and
     * thread) for equality.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof FunctionActioner)) {
            return false;
        }
        FunctionActioner other = (FunctionActioner) o;
        if (!other.canEqual(this)) {
            return false;
        }
        return nullSafeEquals(this.getWorkerConfig(), other.getWorkerConfig())
                && nullSafeEquals(this.getRuntimeFactory(), other.getRuntimeFactory())
                && nullSafeEquals(this.getDlogNamespace(), other.getDlogNamespace())
                && nullSafeEquals(this.getActionQueue(), other.getActionQueue())
                && this.isRunning() == other.isRunning()
                && nullSafeEquals(this.getActioner(), other.getActioner());
    }

    /** Null-tolerant {@code a.equals(b)}: two nulls are equal, a null never equals a non-null. */
    private static boolean nullSafeEquals(Object a, Object b) {
        return a == null ? b == null : a.equals(b);
    }

    /**
     * Hook that lets a subclass refuse equality with instances of other types.
     *
     * @param other the object that is attempting the comparison
     * @return whether {@code other} is a {@code FunctionActioner}
     */
    protected boolean canEqual(Object other) {
        return other instanceof FunctionActioner;
    }

    /** Combines the hashes of the same six fields that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        final int prime = 59;
        int result = 1;
        result = result * prime + nullSafeHash(this.getWorkerConfig());
        result = result * prime + nullSafeHash(this.getRuntimeFactory());
        result = result * prime + nullSafeHash(this.getDlogNamespace());
        result = result * prime + nullSafeHash(this.getActionQueue());
        result = result * prime + (this.isRunning() ? 79 : 97);
        result = result * prime + nullSafeHash(this.getActioner());
        return result;
    }

    /** Hash of {@code value}, or the fixed stand-in 43 when it is null. */
    private static int nullSafeHash(Object value) {
        return value == null ? 43 : value.hashCode();
    }

    /** Lists every field as {@code name=value} inside {@code FunctionActioner(...)}. */
    @Override
    public String toString() {
        return "FunctionActioner(workerConfig=" + this.getWorkerConfig()
                + ", runtimeFactory=" + this.getRuntimeFactory()
                + ", dlogNamespace=" + this.getDlogNamespace()
                + ", actionQueue=" + this.getActionQueue()
                + ", running=" + this.isRunning()
                + ", actioner=" + this.getActioner() + ")";
    }
}

