package com.cloudera.livy.rsc.driver;

import com.cloudera.livy.JobHandle;
import com.cloudera.livy.client.common.Serializer;
import com.cloudera.livy.rsc.BaseProtocol;
import com.cloudera.livy.rsc.BypassJobStatus;
import com.cloudera.livy.rsc.FutureListener;
import com.cloudera.livy.rsc.RSCConf;
import com.cloudera.livy.rsc.Utils;
import com.cloudera.livy.rsc.rpc.Rpc;
import com.cloudera.livy.rsc.rpc.RpcDispatcher;
import com.cloudera.livy.rsc.rpc.RpcServer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
/* loaded from: input_file:com/cloudera/livy/rsc/driver/RSCDriver.class */
public class RSCDriver extends BaseProtocol {
    private static final Logger LOG = LoggerFactory.getLogger(RSCDriver.class);
    private RpcServer server;
    private volatile JobContextImpl jc;
    private volatile boolean running;
    protected final SparkConf conf;
    protected final RSCConf livyConf;
    private final File localTmpDir = Files.createTempDirectory("rsc-tmp", PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwx------"))).toFile();
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final List<JobWrapper<?>> jobQueue = new LinkedList();
    private final Collection<Rpc> clients = new ConcurrentLinkedDeque();
    private final Serializer serializer = new Serializer(new Class[0]);
    private final Object jcLock = new Object();
    private final Object shutdownLock = new Object();
    final Map<String, JobWrapper<?>> activeJobs = new ConcurrentHashMap();
    private final Collection<BypassJobWrapper> bypassJobs = new ConcurrentLinkedDeque();
    private final AtomicReference<ScheduledFuture<?>> idleTimeout = new AtomicReference<>();

    /* renamed from: com.cloudera.livy.rsc.driver.RSCDriver$4, reason: invalid class name */
    /* loaded from: input_file:com/cloudera/livy/rsc/driver/RSCDriver$4.class */
    static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$com$cloudera$livy$JobHandle$State = new int[JobHandle.State.values().length];

        static {
            try {
                $SwitchMap$com$cloudera$livy$JobHandle$State[JobHandle.State.CANCELLED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$cloudera$livy$JobHandle$State[JobHandle.State.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$cloudera$livy$JobHandle$State[JobHandle.State.SUCCEEDED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public RSCDriver(SparkConf sparkConf, RSCConf rSCConf) throws Exception {
        this.conf = sparkConf;
        this.livyConf = rSCConf;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void shutdown() {
        if (this.running) {
            this.running = false;
            Iterator<JobWrapper<?>> it = this.activeJobs.values().iterator();
            while (it.hasNext()) {
                it.next().cancel();
            }
            try {
                shutdownContext();
            } catch (Exception e) {
                LOG.warn("Error during shutdown.", e);
            }
            try {
                shutdownServer();
            } catch (Exception e2) {
                LOG.warn("Error during shutdown.", e2);
            }
            synchronized (this.shutdownLock) {
                this.shutdownLock.notifyAll();
            }
            synchronized (this.jcLock) {
                this.jcLock.notifyAll();
            }
        }
    }

    private void initializeServer() throws Exception {
        String str = this.livyConf.get(RSCConf.Entry.CLIENT_ID);
        Utils.checkArgument(str != null, "No client ID provided.", new Object[0]);
        String str2 = this.livyConf.get(RSCConf.Entry.CLIENT_SECRET);
        Utils.checkArgument(str2 != null, "No secret provided.", new Object[0]);
        String str3 = this.livyConf.get(RSCConf.Entry.LAUNCHER_ADDRESS);
        Utils.checkArgument(str3 != null, "Missing launcher address.", new Object[0]);
        int i = this.livyConf.getInt(RSCConf.Entry.LAUNCHER_PORT);
        Utils.checkArgument(i > 0, "Missing launcher port.", new Object[0]);
        LOG.info("Connecting to: {}:{}", str3, Integer.valueOf(i));
        this.livyConf.set(RSCConf.Entry.RPC_SERVER_ADDRESS, (Object) null);
        LOG.info("Starting RPC server...");
        this.server = new RpcServer(this.livyConf);
        this.server.registerClient(str, str2, new RpcServer.ClientCallback() { // from class: com.cloudera.livy.rsc.driver.RSCDriver.1
            @Override // com.cloudera.livy.rsc.rpc.RpcServer.ClientCallback
            public RpcDispatcher onNewClient(Rpc rpc) {
                RSCDriver.this.registerClient(rpc);
                return RSCDriver.this;
            }
        });
        Rpc rpc = (Rpc) Rpc.createClient(this.livyConf, this.server.getEventLoopGroup(), str3, i, str, str2, this).get();
        try {
            try {
                rpc.call(new BaseProtocol.RemoteDriverAddress(this.server.getAddress(), this.server.getPort())).get(this.livyConf.getTimeAsMs(RSCConf.Entry.RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
                rpc.close();
                setupIdleTimeout();
            } catch (TimeoutException e) {
                LOG.warn("Timed out sending address to Livy server, shutting down.");
                throw e;
            }
        } catch (Throwable th) {
            rpc.close();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerClient(final Rpc rpc) {
        this.clients.add(rpc);
        stopIdleTimeout();
        Utils.addListener(rpc.getChannel().closeFuture(), new FutureListener<Void>() { // from class: com.cloudera.livy.rsc.driver.RSCDriver.2
            @Override // com.cloudera.livy.rsc.FutureListener
            public void onSuccess(Void r4) {
                RSCDriver.this.clients.remove(rpc);
                RSCDriver.this.setupIdleTimeout();
            }
        });
        LOG.debug("Registered new connection from {}.", rpc.getChannel());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setupIdleTimeout() {
        if (this.clients.size() > 0) {
            return;
        }
        ScheduledFuture<?> schedule = this.server.getEventLoopGroup().schedule(new Runnable() { // from class: com.cloudera.livy.rsc.driver.RSCDriver.3
            @Override // java.lang.Runnable
            public void run() {
                RSCDriver.LOG.warn("Shutting down RSC due to idle timeout ({}).", RSCDriver.this.livyConf.get(RSCConf.Entry.SERVER_IDLE_TIMEOUT));
                RSCDriver.this.shutdown();
            }
        }, this.livyConf.getTimeAsMs(RSCConf.Entry.SERVER_IDLE_TIMEOUT), TimeUnit.MILLISECONDS);
        if (!this.idleTimeout.compareAndSet(null, schedule)) {
            LOG.debug("Timeout task already registered.");
            schedule.cancel(false);
        }
        if (this.clients.size() > 0) {
            stopIdleTimeout();
        }
    }

    private void stopIdleTimeout() {
        ScheduledFuture<?> andSet = this.idleTimeout.getAndSet(null);
        if (andSet != null) {
            LOG.debug("Cancelling idle timeout since new client connected.");
            andSet.cancel(false);
        }
    }

    protected JavaSparkContext initializeContext() throws Exception {
        long nanoTime = System.nanoTime();
        LOG.info("Starting Spark context...");
        JavaSparkContext javaSparkContext = new JavaSparkContext(this.conf);
        LOG.info("Spark context finished initialization in {}ms", Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime)));
        return javaSparkContext;
    }

    protected void shutdownContext() {
        if (this.jc != null) {
            this.jc.stop();
        }
        this.executor.shutdownNow();
        try {
            FileUtils.deleteDirectory(this.localTmpDir);
        } catch (IOException e) {
            LOG.warn("Failed to delete local tmp dir: " + this.localTmpDir, e);
        }
    }

    private void shutdownServer() {
        if (this.server != null) {
            this.server.close();
        }
        Iterator<Rpc> it = this.clients.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    private void broadcast(Object obj) {
        for (Rpc rpc : this.clients) {
            try {
                rpc.call(obj);
            } catch (Exception e) {
                LOG.warn("Failed to send message to client " + rpc, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void run() throws Exception {
        this.running = true;
        Thread.currentThread().setContextClassLoader(new MutableClassLoader(Thread.currentThread().getContextClassLoader()));
        try {
            initializeServer();
            JavaSparkContext initializeContext = initializeContext();
            synchronized (this.jcLock) {
                this.jc = new JobContextImpl(initializeContext, this.localTmpDir);
                this.jcLock.notifyAll();
            }
            synchronized (this.jcLock) {
                Iterator<JobWrapper<?>> it = this.jobQueue.iterator();
                while (it.hasNext()) {
                    submit(it.next());
                }
                this.jobQueue.clear();
            }
            synchronized (this.shutdownLock) {
                while (this.running) {
                    try {
                        this.shutdownLock.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        } finally {
            shutdown();
        }
    }

    public void submit(JobWrapper<?> jobWrapper) {
        if (this.jc != null) {
            jobWrapper.submit(this.executor);
            return;
        }
        synchronized (this.jcLock) {
            if (this.jc != null) {
                jobWrapper.submit(this.executor);
            } else {
                LOG.info("SparkContext not yet up, queueing job request.");
                this.jobQueue.add(jobWrapper);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JobContextImpl jobContext() {
        return this.jc;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Serializer serializer() {
        return this.serializer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> void jobFinished(String str, T t, Throwable th) {
        LOG.debug("Send job({}) result to Client.", str);
        broadcast(new BaseProtocol.JobResult(str, t, th));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void jobStarted(String str) {
        broadcast(new BaseProtocol.JobStarted(str));
    }

    public void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.CancelJob cancelJob) {
        JobWrapper<?> jobWrapper = this.activeJobs.get(cancelJob.id);
        if (jobWrapper == null || !jobWrapper.cancel()) {
            LOG.info("Requested to cancel an already finished job.");
        }
    }

    public void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.EndSession endSession) {
        LOG.debug("Shutting down due to EndSession request.");
        shutdown();
    }

    public void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.JobRequest<?> jobRequest) {
        LOG.info("Received job request {}", jobRequest.id);
        JobWrapper<?> jobWrapper = new JobWrapper<>(this, jobRequest.id, jobRequest.job);
        this.activeJobs.put(jobRequest.id, jobWrapper);
        submit(jobWrapper);
    }

    public void handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.BypassJobRequest bypassJobRequest) throws Exception {
        LOG.info("Received bypass job request {}", bypassJobRequest.id);
        BypassJobWrapper bypassJobWrapper = new BypassJobWrapper(this, bypassJobRequest.id, bypassJobRequest.serializedJob);
        this.bypassJobs.add(bypassJobWrapper);
        this.activeJobs.put(bypassJobRequest.id, bypassJobWrapper);
        if (!bypassJobRequest.synchronous) {
            submit(bypassJobWrapper);
            return;
        }
        waitForJobContext();
        try {
            bypassJobWrapper.call();
        } catch (Throwable th) {
        }
    }

    public Object handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.SyncJobRequest syncJobRequest) throws Exception {
        waitForJobContext();
        return syncJobRequest.job.call(this.jc);
    }

    public BypassJobStatus handle(ChannelHandlerContext channelHandlerContext, BaseProtocol.GetBypassJobStatus getBypassJobStatus) {
        Iterator<BypassJobWrapper> it = this.bypassJobs.iterator();
        while (it.hasNext()) {
            BypassJobWrapper next = it.next();
            if (next.jobId.equals(getBypassJobStatus.id)) {
                BypassJobStatus status = next.getStatus();
                switch (AnonymousClass4.$SwitchMap$com$cloudera$livy$JobHandle$State[status.state.ordinal()]) {
                    case 1:
                    case 2:
                    case 3:
                        it.remove();
                        break;
                }
                return status;
            }
        }
        throw new NoSuchElementException(getBypassJobStatus.id);
    }

    private void waitForJobContext() throws InterruptedException {
        synchronized (this.jcLock) {
            do {
                if (this.jc == null) {
                    this.jcLock.wait();
                }
            } while (this.running);
            throw new IllegalStateException("Remote context is shutting down.");
        }
    }
}
