package org.apache.drill.exec.work;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.drill.common.SelfCleaningRunnable;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.user.UserWorker;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/WorkManager.class */
public class WorkManager implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(WorkManager.class);
    private static final int EXIT_TIMEOUT_MS = 5000;
    private final BootStrapContext bContext;
    private DrillbitContext dContext;
    private final Executor executor;
    private Condition isEmptyCondition;
    private static final int STATUS_PERIOD_SECONDS = 5;
    private final ConcurrentMap<ExecProtos.FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap();
    private final ConcurrentMap<UserBitShared.QueryId, Foreman> queries = Maps.newConcurrentMap();
    private final Lock isEmptyLock = new ReentrantLock();
    private final WorkerBee bee = new WorkerBee();
    private final WorkEventBus workBus = new WorkEventBus();
    private final ControlMessageHandler controlMessageWorker = new ControlMessageHandler(this.bee);
    private final UserWorker userWorker = new UserWorker(this.bee);
    private final StatusThread statusThread = new StatusThread();

    /* loaded from: input_file:org/apache/drill/exec/work/WorkManager$StatusThread.class */
    private class StatusThread extends Thread {
        StatusThread() {
            setName("WorkManager.StatusThread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Controller controller = WorkManager.this.dContext.getController();
            CoordinationProtos.DrillbitEndpoint endpoint = WorkManager.this.dContext.getEndpoint();
            while (true) {
                ArrayList newArrayList = Lists.newArrayList();
                for (FragmentExecutor fragmentExecutor : WorkManager.this.runningFragments.values()) {
                    BitControl.FragmentStatus status = fragmentExecutor.getStatus();
                    if (status != null) {
                        CoordinationProtos.DrillbitEndpoint foremanEndpoint = fragmentExecutor.getContext().getForemanEndpoint();
                        if (endpoint.equals(foremanEndpoint)) {
                            WorkManager.this.workBus.statusUpdate(status);
                        } else {
                            newArrayList.add(controller.getTunnel(foremanEndpoint).sendFragmentStatus(status));
                        }
                    }
                }
                Iterator it = newArrayList.iterator();
                while (it.hasNext()) {
                    try {
                        ((DrillRpcFuture) it.next()).checkedGet();
                    } catch (RpcException e) {
                        WorkManager.logger.info("Failure while sending intermediate fragment status to Foreman", e);
                    }
                }
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/drill/exec/work/WorkManager$WorkerBee.class */
    public class WorkerBee {
        public WorkerBee() {
        }

        public void addNewForeman(Foreman foreman) {
            WorkManager.this.queries.put(foreman.getQueryId(), foreman);
            WorkManager.this.executor.execute(foreman);
        }

        public void addNewWork(Runnable runnable) {
            WorkManager.this.executor.execute(runnable);
        }

        public boolean cancelForeman(UserBitShared.QueryId queryId, DrillUserPrincipal drillUserPrincipal) {
            Preconditions.checkNotNull(queryId);
            Foreman foreman = (Foreman) WorkManager.this.queries.get(queryId);
            if (foreman == null) {
                return false;
            }
            String queryId2 = QueryIdHelper.getQueryId(queryId);
            if (drillUserPrincipal != null && !drillUserPrincipal.canManageQueryOf(foreman.getQueryContext().getQueryUserName())) {
                throw UserException.permissionError().message("Not authorized to cancel the query '%s'", new Object[]{queryId2}).build(WorkManager.logger);
            }
            WorkManager.this.executor.execute(() -> {
                Thread currentThread = Thread.currentThread();
                String name = currentThread.getName();
                try {
                    try {
                        currentThread.setName(queryId2 + ":foreman:cancel");
                        WorkManager.logger.debug("Canceling foreman. Thread: {}", name);
                        foreman.cancel();
                        currentThread.setName(name);
                    } catch (Throwable th) {
                        WorkManager.logger.warn("Exception while canceling foreman", th);
                        currentThread.setName(name);
                    }
                } catch (Throwable th2) {
                    currentThread.setName(name);
                    throw th2;
                }
            });
            return true;
        }

        public void retireForeman(Foreman foreman) {
            Preconditions.checkNotNull(foreman);
            UserBitShared.QueryId queryId = foreman.getQueryId();
            if (!WorkManager.this.queries.remove(queryId, foreman)) {
                WorkManager.logger.warn("Couldn't find retiring Foreman for query " + queryId);
            }
            WorkManager.this.indicateIfSafeToExit();
        }

        public Foreman getForemanForQueryId(UserBitShared.QueryId queryId) {
            return (Foreman) WorkManager.this.queries.get(queryId);
        }

        public DrillbitContext getContext() {
            return WorkManager.this.dContext;
        }

        public void addFragmentRunner(FragmentExecutor fragmentExecutor) {
            final ExecProtos.FragmentHandle handle = fragmentExecutor.getContext().getHandle();
            WorkManager.this.runningFragments.put(handle, fragmentExecutor);
            WorkManager.this.executor.execute(new SelfCleaningRunnable(fragmentExecutor) { // from class: org.apache.drill.exec.work.WorkManager.WorkerBee.1
                protected void cleanup() {
                    WorkManager.this.runningFragments.remove(handle);
                    WorkManager.this.indicateIfSafeToExit();
                }
            });
        }

        public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
            final ExecProtos.FragmentHandle handle = fragmentManager.getHandle();
            FragmentExecutor runnable = fragmentManager.getRunnable();
            if (runnable == null) {
                return;
            }
            WorkManager.this.runningFragments.put(handle, runnable);
            WorkManager.this.executor.execute(new SelfCleaningRunnable(runnable) { // from class: org.apache.drill.exec.work.WorkManager.WorkerBee.2
                protected void cleanup() {
                    WorkManager.this.runningFragments.remove(handle);
                    if (!fragmentManager.isCancelled()) {
                        WorkManager.this.workBus.removeFragmentManager(handle, false);
                    }
                    WorkManager.this.indicateIfSafeToExit();
                }
            });
        }

        public FragmentExecutor getFragmentRunner(ExecProtos.FragmentHandle fragmentHandle) {
            return (FragmentExecutor) WorkManager.this.runningFragments.get(fragmentHandle);
        }

        public void receiveRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
            BitData.RuntimeFilterBDef runtimeFilterBDef = runtimeFilterWritable.getRuntimeFilterBDef();
            boolean toForeman = runtimeFilterBDef.getToForeman();
            UserBitShared.QueryId queryId = runtimeFilterBDef.getQueryId();
            String queryId2 = QueryIdHelper.getQueryId(queryId);
            runtimeFilterWritable.retainBuffers(1);
            if (toForeman) {
                Foreman foreman = (Foreman) WorkManager.this.queries.get(queryId);
                if (foreman != null) {
                    WorkManager.this.executor.execute(() -> {
                        Thread currentThread = Thread.currentThread();
                        String name = currentThread.getName();
                        currentThread.setName(queryId2 + ":foreman:routeRuntimeFilter");
                        try {
                            try {
                                foreman.getRuntimeFilterRouter().register(runtimeFilterWritable);
                                currentThread.setName(name);
                                runtimeFilterWritable.close();
                            } catch (Exception e) {
                                WorkManager.logger.warn("Exception while registering the RuntimeFilter", e);
                                currentThread.setName(name);
                                runtimeFilterWritable.close();
                            }
                        } catch (Throwable th) {
                            currentThread.setName(name);
                            runtimeFilterWritable.close();
                            throw th;
                        }
                    });
                    return;
                }
                return;
            }
            int majorFragmentId = runtimeFilterBDef.getMajorFragmentId();
            FragmentExecutor fragmentExecutor = (FragmentExecutor) WorkManager.this.runningFragments.get(ExecProtos.FragmentHandle.newBuilder().setMajorFragmentId(majorFragmentId).setMinorFragmentId(runtimeFilterBDef.getMinorFragmentId()).setQueryId(queryId).build());
            if (fragmentExecutor != null) {
                fragmentExecutor.getContext().addRuntimeFilter(runtimeFilterWritable);
            }
        }
    }

    public WorkManager(BootStrapContext bootStrapContext) {
        this.bContext = bootStrapContext;
        this.executor = bootStrapContext.getExecutor();
    }

    public void start(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, Controller controller, DataConnectionCreator dataConnectionCreator, ClusterCoordinator clusterCoordinator, PersistentStoreProvider persistentStoreProvider, PersistentStoreProvider persistentStoreProvider2) {
        this.dContext = new DrillbitContext(drillbitEndpoint, this.bContext, clusterCoordinator, controller, dataConnectionCreator, this.workBus, persistentStoreProvider, persistentStoreProvider2);
        this.statusThread.start();
        ConcurrentMap<ExecProtos.FragmentHandle, FragmentExecutor> concurrentMap = this.runningFragments;
        Objects.requireNonNull(concurrentMap);
        DrillMetrics.register("drill.fragments.running", concurrentMap::size);
    }

    public Executor getExecutor() {
        return this.executor;
    }

    public WorkEventBus getWorkBus() {
        return this.workBus;
    }

    public ControlMessageHandler getControlMessageHandler() {
        return this.controlMessageWorker;
    }

    public UserWorker getUserWorker() {
        return this.userWorker;
    }

    public WorkerBee getBee() {
        return this.bee;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.statusThread.interrupt();
        long size = this.runningFragments.size();
        if (size != 0) {
            logger.warn("Closing WorkManager but there are {} running fragments.", Long.valueOf(size));
            if (logger.isDebugEnabled()) {
                for (ExecProtos.FragmentHandle fragmentHandle : this.runningFragments.keySet()) {
                    logger.debug("Fragment still running: {} status: {}", QueryIdHelper.getQueryIdentifier(fragmentHandle), this.runningFragments.get(fragmentHandle).getStatus());
                }
            }
        }
        if (getContext() != null) {
            getContext().close();
        }
    }

    public DrillbitContext getContext() {
        return this.dContext;
    }

    public void waitToExit(boolean z) {
        this.isEmptyLock.lock();
        this.isEmptyCondition = this.isEmptyLock.newCondition();
        try {
            if (z) {
                long currentTimeMillis = System.currentTimeMillis() + 5000;
                while (!areQueriesAndFragmentsEmpty()) {
                    long currentTimeMillis2 = System.currentTimeMillis();
                    if (currentTimeMillis2 >= currentTimeMillis) {
                        break;
                    }
                    try {
                    } catch (InterruptedException e) {
                        logger.error("Interrupted while waiting to exit");
                    }
                    if (!this.isEmptyCondition.await(currentTimeMillis - currentTimeMillis2, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                }
                if (!areQueriesAndFragmentsEmpty()) {
                    logger.warn("Timed out after {} millis. Shutting down before all fragments and foremen have completed.", Integer.valueOf(EXIT_TIMEOUT_MS));
                    Iterator<UserBitShared.QueryId> it = this.queries.keySet().iterator();
                    while (it.hasNext()) {
                        logger.warn("Query {} is still running.", QueryIdHelper.getQueryId(it.next()));
                    }
                    Iterator<ExecProtos.FragmentHandle> it2 = this.runningFragments.keySet().iterator();
                    while (it2.hasNext()) {
                        logger.warn("Fragment {} is still running.", QueryIdHelper.getQueryIdentifier(it2.next()));
                    }
                }
            } else {
                while (!areQueriesAndFragmentsEmpty()) {
                    this.isEmptyCondition.awaitUninterruptibly();
                }
            }
        } finally {
            this.isEmptyLock.unlock();
        }
    }

    private boolean areQueriesAndFragmentsEmpty() {
        return this.queries.isEmpty() && this.runningFragments.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void indicateIfSafeToExit() {
        this.isEmptyLock.lock();
        try {
            if (this.isEmptyCondition != null) {
                logger.info("Waiting for {} running queries before shutting down.", Integer.valueOf(this.queries.size()));
                logger.info("Waiting for {} running fragments before shutting down.", Integer.valueOf(this.runningFragments.size()));
                if (areQueriesAndFragmentsEmpty()) {
                    this.isEmptyCondition.signal();
                }
            }
        } finally {
            this.isEmptyLock.unlock();
        }
    }

    public synchronized Map<String, Integer> getRemainingQueries() {
        HashMap hashMap = new HashMap();
        hashMap.put("queriesCount", Integer.valueOf(this.queries.size()));
        hashMap.put("fragmentsCount", Integer.valueOf(this.runningFragments.size()));
        return hashMap;
    }
}
