package org.apache.hama.bsp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPMaster;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.JobInProgress;
import org.apache.hama.bsp.JobStatus;
import org.apache.hama.bsp.message.AbstractMessageManager;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
import org.apache.hama.bsp.sync.BSPPeerSyncClient;
import org.apache.hama.bsp.sync.SyncClient;
import org.apache.hama.bsp.sync.SyncEvent;
import org.apache.hama.bsp.sync.SyncEventListener;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;

/* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner.class */
public class LocalBSPRunner implements JobSubmissionProtocol {
    /** Identifier handed to every {@link BSPJobID} created by {@link #getNewJobId()}. */
    private static final String IDENTIFIER = "localrunner";
    /** Pool that runs the {@link BSPRunner} tasks; assigned in {@code submitJob} and shut down by {@link ThreadObserver}. */
    private volatile ThreadPoolExecutor threadPool;
    /** Path of the job file given to {@code submitJob}; reported through {@code getJobProfile}. */
    private String jobFile;
    /** Job name reported through {@code getJobProfile}. NOTE(review): never assigned in this class, so it stays null. */
    private String jobName;
    /** Status of the most recently submitted job; {@code null} until {@code submitJob} has been called. */
    private JobStatus currentJobStatus;
    /** Configuration passed to the constructor; job resources and local implementation classes are added to it on submit. */
    private final Configuration conf;
    /** File system lazily obtained from {@link #conf}; {@code null} until then. */
    private FileSystem fs;
    /** Names of all peers of the running job ({@code "local:" + index}); static, hence shared by every runner in the JVM. */
    private static String[] peerNames;
    /** Value of {@code bsp.local.tasks.maximum} (default 20); reported through {@code getClusterStatus}. */
    private final int maxTasks;
    private static final Log LOG = LogFactory.getLog(LocalBSPRunner.class);
    /** System directory returned by {@code getSystemDir}; overwritten by the constructor when {@code bsp.local.dir} is set. */
    private static String WORKING_DIR = "/tmp/hama-bsp/";
    /** Highest superstep number seen by {@link LocalSyncClient#leaveBarrier}; static, so it is not reset between jobs. */
    private static volatile long superStepCount = 0;
    /** Job-level counters handed to the {@link JobStatus}; also receives the launched-task count. */
    private final Counters globalCounters = new Counters();
    /** NOTE(review): constant 0 and not read anywhere in this class. */
    private final int totalTasks = 0;

    /* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner$BSPRunner.class */
    static class BSPRunner implements Callable<BSPPeerImpl> {
        private final HamaConfiguration conf;
        private final BSPJob job;
        private final int id;
        private final BSP bsp;
        private final BSPJobClient.RawSplit[] splits;
        private BSPPeerImpl peer;

        public BSPRunner(HamaConfiguration hamaConfiguration, BSPJob bSPJob, int i, BSPJobClient.RawSplit[] rawSplitArr) {
            this.conf = hamaConfiguration;
            this.job = bSPJob;
            this.id = i;
            this.splits = rawSplitArr;
            hamaConfiguration.setInt(Constants.PEER_PORT, i);
            hamaConfiguration.set(Constants.PEER_HOST, "local");
            this.bsp = (BSP) ReflectionUtils.newInstance(bSPJob.getConfiguration().getClass("bsp.work.class", BSP.class), bSPJob.getConfiguration());
        }

        public void run() throws Exception {
            String str = null;
            BytesWritable bytesWritable = null;
            if (this.splits != null && this.splits.length > this.id) {
                str = this.splits[this.id].getClassName();
                bytesWritable = this.splits[this.id].getBytes();
            }
            this.peer = new BSPPeerImpl(this.job, this.conf, new TaskAttemptID(new TaskID(this.job.getJobID(), this.id), this.id), new LocalUmbilical(), this.id, str, bytesWritable, new Counters());
            Exception exc = null;
            try {
                try {
                    try {
                        this.bsp.setup(this.peer);
                        this.bsp.bsp(this.peer);
                        try {
                            this.bsp.cleanup(this.peer);
                            try {
                                try {
                                    this.peer.clear();
                                    this.peer.close();
                                    if (0 != 0) {
                                        throw null;
                                    }
                                } catch (Exception e) {
                                    LocalBSPRunner.LOG.error("Exception closing BSP peer,", e);
                                    if (0 == 0) {
                                        exc = e;
                                    }
                                    if (exc != null) {
                                        throw exc;
                                    }
                                }
                            } catch (Throwable th) {
                                if (0 == 0) {
                                    throw th;
                                }
                                throw null;
                            }
                        } catch (Exception e2) {
                            LocalBSPRunner.LOG.error("Error cleaning up after bsp execution.", e2);
                            if (0 == 0) {
                                exc = e2;
                            }
                            try {
                                try {
                                    this.peer.clear();
                                    this.peer.close();
                                    if (exc != null) {
                                        throw exc;
                                    }
                                } catch (Exception e3) {
                                    LocalBSPRunner.LOG.error("Exception closing BSP peer,", e3);
                                    if (exc == null) {
                                        exc = e3;
                                    }
                                    if (exc != null) {
                                        throw exc;
                                    }
                                }
                            } catch (Throwable th2) {
                                if (exc == null) {
                                    throw th2;
                                }
                                throw exc;
                            }
                        }
                    } catch (Throwable th3) {
                        try {
                            try {
                                this.peer.clear();
                                this.peer.close();
                                if (exc != null) {
                                    throw exc;
                                }
                            } catch (Exception e4) {
                                LocalBSPRunner.LOG.error("Exception closing BSP peer,", e4);
                                if (exc == null) {
                                    exc = e4;
                                }
                                if (exc != null) {
                                    throw exc;
                                }
                                throw th3;
                            }
                            throw th3;
                        } catch (Throwable th4) {
                            if (exc == null) {
                                throw th4;
                            }
                            throw exc;
                        }
                    }
                } catch (Exception e5) {
                    try {
                        LocalBSPRunner.LOG.error("Exception during BSP execution!", e5);
                        exc = e5;
                        try {
                            try {
                                this.bsp.cleanup(this.peer);
                                try {
                                    this.peer.clear();
                                    this.peer.close();
                                    if (exc != null) {
                                        throw exc;
                                    }
                                } catch (Exception e6) {
                                    LocalBSPRunner.LOG.error("Exception closing BSP peer,", e6);
                                    if (exc == null) {
                                        exc = e6;
                                    }
                                    if (exc != null) {
                                        throw exc;
                                    }
                                }
                            } catch (Throwable th5) {
                                if (exc == null) {
                                    throw th5;
                                }
                                throw exc;
                            }
                        } catch (Exception e7) {
                            LocalBSPRunner.LOG.error("Error cleaning up after bsp execution.", e7);
                            if (exc == null) {
                                exc = e7;
                            }
                            try {
                                try {
                                    this.peer.clear();
                                    this.peer.close();
                                    if (exc != null) {
                                        throw exc;
                                    }
                                } catch (Exception e8) {
                                    LocalBSPRunner.LOG.error("Exception closing BSP peer,", e8);
                                    if (exc == null) {
                                        exc = e8;
                                    }
                                    if (exc != null) {
                                        throw exc;
                                    }
                                }
                            } catch (Throwable th6) {
                                if (exc == null) {
                                    throw th6;
                                }
                                throw exc;
                            }
                        }
                    } catch (Throwable th7) {
                        try {
                            try {
                                this.peer.clear();
                                this.peer.close();
                                if (exc != null) {
                                    throw exc;
                                }
                            } catch (Exception e9) {
                                LocalBSPRunner.LOG.error("Exception closing BSP peer,", e9);
                                if (exc == null) {
                                    exc = e9;
                                }
                                if (exc != null) {
                                    throw exc;
                                }
                                throw th7;
                            }
                            throw th7;
                        } catch (Throwable th8) {
                            if (exc == null) {
                                throw th8;
                            }
                            throw exc;
                        }
                    }
                }
            } catch (Throwable th9) {
                try {
                    try {
                        this.bsp.cleanup(this.peer);
                    } catch (Exception e10) {
                        LocalBSPRunner.LOG.error("Error cleaning up after bsp execution.", e10);
                        if (exc == null) {
                            exc = e10;
                        }
                        try {
                            try {
                                this.peer.clear();
                                this.peer.close();
                                if (exc != null) {
                                    throw exc;
                                }
                            } catch (Throwable th10) {
                                if (exc == null) {
                                    throw th10;
                                }
                                throw exc;
                            }
                        } catch (Exception e11) {
                            LocalBSPRunner.LOG.error("Exception closing BSP peer,", e11);
                            if (exc == null) {
                                exc = e11;
                            }
                            if (exc != null) {
                                throw exc;
                            }
                        }
                        throw th9;
                    }
                    try {
                        try {
                            this.peer.clear();
                            this.peer.close();
                            if (exc != null) {
                                throw exc;
                            }
                        } catch (Exception e12) {
                            LocalBSPRunner.LOG.error("Exception closing BSP peer,", e12);
                            if (exc == null) {
                                exc = e12;
                            }
                            if (exc != null) {
                                throw exc;
                            }
                            throw th9;
                        }
                        throw th9;
                    } catch (Throwable th11) {
                        if (exc == null) {
                            throw th11;
                        }
                        throw exc;
                    }
                } catch (Throwable th12) {
                    try {
                        try {
                            this.peer.clear();
                            this.peer.close();
                            if (exc != null) {
                                throw exc;
                            }
                        } catch (Exception e13) {
                            LocalBSPRunner.LOG.error("Exception closing BSP peer,", e13);
                            if (exc == null) {
                                exc = e13;
                            }
                            if (exc != null) {
                                throw exc;
                            }
                        }
                        throw th12;
                    } catch (Throwable th13) {
                        if (exc == null) {
                            throw th13;
                        }
                        throw exc;
                    }
                }
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public BSPPeerImpl call() throws Exception {
            run();
            return this.peer;
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner$LocalMessageManager.class */
    /**
     * Message manager for local mode: messages are delivered by writing straight
     * into the next-iteration queue of the receiving manager, which is looked up
     * in a JVM-wide registry keyed by peer address.
     *
     * @param <M> message type
     */
    public static class LocalMessageManager<M extends Writable> extends AbstractMessageManager<M> {
        /**
         * Registry of all initialised managers. Raw element type on purpose: the
         * map is static and therefore cannot refer to the type parameter {@code M}.
         */
        private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> MANAGER_MAP = new ConcurrentHashMap<>();
        private InetSocketAddress selfAddress;

        /**
         * Initialises the base manager and registers this instance under
         * {@code inetSocketAddress} so other peers can deliver to it.
         */
        @Override
        public void init(TaskAttemptID taskAttemptID, BSPPeer<?, ?, ?, ?, M> bSPPeer, HamaConfiguration hamaConfiguration, InetSocketAddress inetSocketAddress) {
            super.init(taskAttemptID, bSPPeer, hamaConfiguration, inetSocketAddress);
            MANAGER_MAP.put(inetSocketAddress, this);
            this.selfAddress = inetSocketAddress;
        }

        /**
         * Appends a whole bundle to the receiver's queue for the next superstep
         * and adds the bundle size to this peer's received-messages counter.
         *
         * @throws IOException if no manager is registered for {@code inetSocketAddress}
         */
        @Override
        public void transfer(InetSocketAddress inetSocketAddress, BSPMessageBundle<M> bSPMessageBundle) throws IOException {
            managerFor(inetSocketAddress).localQueueForNextIteration.addBundle(bSPMessageBundle);
            this.peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, bSPMessageBundle.size());
        }

        /** @return the address this manager was initialised with, or {@code null} before {@code init} */
        @Override
        public InetSocketAddress getListenerAddress() {
            return this.selfAddress;
        }

        /**
         * Appends a single message to the receiver's queue for the next superstep
         * and adds one to this peer's received-messages counter.
         *
         * @throws IOException if no manager is registered for {@code inetSocketAddress}
         */
        @Override
        public void transfer(InetSocketAddress inetSocketAddress, M m) throws IOException {
            managerFor(inetSocketAddress).localQueueForNextIteration.add(m);
            this.peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
        }

        /**
         * Looks up the manager registered for an address.
         *
         * @throws IOException instead of a bare NullPointerException when the address is unknown
         */
        private static LocalMessageManager managerFor(InetSocketAddress address) throws IOException {
            LocalMessageManager target = MANAGER_MAP.get(address);
            if (target == null) {
                throw new IOException("No local message manager registered for peer address " + address);
            }
            return target;
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner$LocalSyncClient.class */
    /**
     * Sync client for local mode. Barrier synchronisation is done with one
     * JVM-wide {@link CyclicBarrier}; all key/value style operations are
     * unsupported stubs.
     */
    public static class LocalSyncClient extends BSPPeerSyncClient {
        /** Barrier shared by all peers; created by the first {@code init} and dropped by {@code close}. */
        private static CyclicBarrier barrier;
        private int tasks;

        /**
         * Reads the number of peers from {@code Constants.JOB_PEERS_COUNT} (default 1)
         * and creates the shared barrier if it does not exist yet.
         */
        @Override
        public void init(Configuration configuration, BSPJobID bSPJobID, TaskAttemptID taskAttemptID) throws Exception {
            this.tasks = configuration.getInt(Constants.JOB_PEERS_COUNT, 1);
            synchronized (LocalSyncClient.class) {
                if (barrier == null) {
                    barrier = new CyclicBarrier(this.tasks);
                    LocalBSPRunner.LOG.info("Setting up a new barrier for " + this.tasks + " tasks!");
                }
            }
        }

        /**
         * Waits until all peers have reached the barrier.
         *
         * @throws SyncException if the wait fails or the barrier does not exist
         */
        @Override
        public void enterBarrier(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, long j) throws SyncException {
            awaitBarrier();
        }

        /**
         * Waits until all peers have reached the barrier, then records {@code j}
         * as the runner's superstep count if it is larger than the current value.
         *
         * @throws SyncException if the wait fails or the barrier does not exist
         */
        @Override
        public void leaveBarrier(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, long j) throws SyncException {
            awaitBarrier();
            if (j > LocalBSPRunner.superStepCount) {
                LocalBSPRunner.superStepCount = j;
            }
        }

        /** No-op in local mode. */
        @Override
        public void register(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, String str, long j) {
        }

        /** No-op in local mode. */
        @Override
        public void deregisterFromBarrier(BSPJobID bSPJobID, TaskAttemptID taskAttemptID, String str, long j) {
        }

        /** No-op in local mode. */
        @Override
        public void stopServer() {
        }

        /** Drops the shared barrier so that a later {@code init} creates a fresh one. */
        @Override
        public void close() {
            barrier = null;
        }

        /** Unsupported in local mode; always returns {@code null}. */
        @Override
        public String constructKey(BSPJobID bSPJobID, String... strArr) {
            return null;
        }

        /** Unsupported in local mode; always returns {@code false}. */
        @Override
        public boolean storeInformation(String str, Writable writable, boolean z, SyncEventListener syncEventListener) {
            return false;
        }

        /** Unsupported in local mode; always returns {@code false}. */
        @Override
        public boolean getInformation(String str, Writable writable) {
            return false;
        }

        /** Unsupported in local mode; always returns {@code false}. */
        @Override
        public boolean addKey(String str, boolean z, SyncEventListener syncEventListener) {
            return false;
        }

        /** Unsupported in local mode; always returns {@code false}. */
        @Override
        public boolean hasKey(String str) {
            return false;
        }

        /** Unsupported in local mode; always returns {@code null}. */
        @Override
        public String[] getChildKeySet(String str, SyncEventListener syncEventListener) {
            return null;
        }

        /** Unsupported in local mode; always returns {@code false}. */
        @Override
        public boolean registerListener(String str, SyncEvent syncEvent, SyncEventListener syncEventListener) {
            return false;
        }

        /** Unsupported in local mode; always returns {@code false}. */
        @Override
        public boolean remove(String str, SyncEventListener syncEventListener) {
            return false;
        }

        /** @return the peer name array maintained by the enclosing runner */
        @Override
        public String[] getAllPeerNames(BSPJobID bSPJobID) {
            return LocalBSPRunner.peerNames;
        }

        /**
         * Waits on the shared barrier and converts any failure into a
         * {@link SyncException} that keeps the original exception as its cause.
         */
        private static void awaitBarrier() throws SyncException {
            // Snapshot the static field: close() on another peer may null it concurrently.
            CyclicBarrier current = barrier;
            if (current == null) {
                throw new SyncException("Local barrier is not initialised (init not called or client already closed)");
            }
            try {
                current.await();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Do not swallow the interruption; let code further up observe it.
                    Thread.currentThread().interrupt();
                }
                SyncException wrapped = new SyncException(e.toString());
                wrapped.initCause(e);
                throw wrapped;
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner$LocalUmbilical.class */
    /**
     * Umbilical used by peers in local mode. There is no groom server to talk
     * to, so every operation is a stub that does nothing and returns a fixed value.
     */
    public static class LocalUmbilical implements BSPPeerProtocol {

        // ---- queries: fixed answers ----

        /** @return always {@code 0} */
        @Override
        public long getProtocolVersion(String str, long j) throws IOException {
            return 0L;
        }

        /** @return always {@code null}; no task description is available locally */
        @Override
        public Task getTask(TaskAttemptID taskAttemptID) throws IOException {
            return null;
        }

        /** @return always {@code 0} */
        @Override
        public int getAssignedPortNum(TaskAttemptID taskAttemptID) {
            return 0;
        }

        /** @return always {@code false} */
        @Override
        public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
            return false;
        }

        /** Ignores the status and reports success. @return always {@code true} */
        @Override
        public boolean statusUpdate(TaskAttemptID taskAttemptID, TaskStatus taskStatus) throws IOException, InterruptedException {
            return true;
        }

        // ---- notifications: ignored ----

        /** Ignored. */
        @Override
        public void done(TaskAttemptID taskAttemptID) throws IOException {
        }

        /** Ignored. */
        @Override
        public void fsError(TaskAttemptID taskAttemptID, String str) throws IOException {
        }

        /** Ignored. */
        @Override
        public void fatalError(TaskAttemptID taskAttemptID, String str) throws IOException {
        }

        /** Nothing to release. */
        @Override
        public void close() throws IOException {
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/LocalBSPRunner$ThreadObserver.class */
    /**
     * Waits for all submitted tasks to finish, merges their counters into the
     * job status, records the final job state and shuts the thread pool down.
     */
    class ThreadObserver implements Runnable {
        private final ExecutorCompletionService<BSPPeerImpl> completionService;
        private final int numTasks;

        /**
         * @param i number of tasks that were submitted to the completion service
         * @param executorCompletionService service the tasks were submitted to
         */
        public ThreadObserver(int i, ExecutorCompletionService<BSPPeerImpl> executorCompletionService) {
            this.numTasks = i;
            this.completionService = executorCompletionService;
        }

        /**
         * Collects {@code numTasks} results. Any exception while waiting for or
         * reading a result is logged and marks the job as failed, but the
         * remaining results are still collected.
         */
        @Override
        public void run() {
            boolean allSucceeded = true;
            int outstanding = this.numTasks;
            while (outstanding > 0) {
                outstanding--;
                try {
                    Future<BSPPeerImpl> finished = this.completionService.take();
                    if (finished == null) {
                        continue;
                    }
                    LocalBSPRunner.this.currentJobStatus.getCounter().incrAllCounters(finished.get().getCounters());
                } catch (Exception e) {
                    LocalBSPRunner.LOG.error("Exception during BSP execution!", e);
                    allSucceeded = false;
                }
            }

            // NOTE(review): run states 2 and 3 presumably mirror SUCCEEDED and FAILED constants of JobStatus — confirm.
            if (!allSucceeded) {
                LocalBSPRunner.this.currentJobStatus.setState(JobStatus.State.FAILED);
                LocalBSPRunner.this.currentJobStatus.setRunState(3);
            } else {
                LocalBSPRunner.this.currentJobStatus.setState(JobStatus.State.SUCCEEDED);
                LocalBSPRunner.this.currentJobStatus.setRunState(2);
            }
            LocalBSPRunner.this.threadPool.shutdownNow();
        }
    }

    /**
     * Creates a local runner.
     *
     * <p>Reads {@code bsp.local.tasks.maximum} (default 20) as the task limit.
     * When {@code bsp.local.dir} is set to a non-empty value it replaces the
     * static working directory shared by all runner instances.
     *
     * @param configuration configuration to run jobs with
     * @throws IOException declared for callers; not thrown by the code in this constructor
     */
    public LocalBSPRunner(Configuration configuration) throws IOException {
        this.conf = configuration;
        this.maxTasks = configuration.getInt("bsp.local.tasks.maximum", 20);
        final String localDir = configuration.get("bsp.local.dir");
        if (localDir != null && !localDir.isEmpty()) {
            WORKING_DIR = localDir;
        }
    }

    /**
     * Reports the protocol version of this runner.
     *
     * @param str ignored
     * @param j ignored
     * @return always {@code 3}
     */
    @Override // org.apache.hama.ipc.VersionedProtocol
    public long getProtocolVersion(String str, long j) throws IOException {
        return 3L;
    }

    /**
     * Creates a job id for the local runner.
     *
     * @return a new id built from the local-runner identifier and the fixed number 1;
     *         every call yields an equal-valued id
     */
    @Override
    public BSPJobID getNewJobId() throws IOException {
        final int jobNumber = 1;
        return new BSPJobID(IDENTIFIER, jobNumber);
    }

    /**
     * Starts a job in this JVM.
     *
     * <p>Loads the job file into the configuration, switches message manager and
     * sync client to the local implementations, reads the optional split file,
     * and submits one {@link BSPRunner} per BSP task to a fixed-size thread
     * pool. A separate observer thread collects the results; this method
     * returns immediately with the (still running) job status.
     *
     * @param bSPJobID id of the job
     * @param str path of the job file
     * @return the status object that is updated while the job runs
     * @throws IOException if the file system, the job file or the split file cannot be accessed
     */
    @Override
    public JobStatus submitJob(BSPJobID bSPJobID, String str) throws IOException {
        this.jobFile = str;
        if (this.fs == null) {
            this.fs = FileSystem.get(this.conf);
        }
        // NOTE(review): the stream is handed over to the Configuration and not closed here —
        // presumably it is consumed lazily; confirm before adding a close.
        this.conf.addResource(this.fs.open(new Path(str)));
        this.conf.setClass(MessageManagerFactory.MESSAGE_MANAGER_CLASS, LocalMessageManager.class, MessageManager.class);
        this.conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, LocalSyncClient.class, SyncClient.class);

        BSPJob bSPJob = new BSPJob(new HamaConfiguration(this.conf), bSPJobID);
        this.currentJobStatus = new JobStatus(bSPJobID, System.getProperty("user.name"), 0L, 1, this.globalCounters);
        int numBspTask = bSPJob.getNumBspTask();

        BSPJobClient.RawSplit[] rawSplitArr = null;
        String splitFile = this.conf.get("bsp.job.split.file");
        if (splitFile != null) {
            // try-with-resources closes the stream exactly once, also when reading fails.
            try (FSDataInputStream in = this.fs.open(new Path(splitFile))) {
                rawSplitArr = BSPJobClient.readSplitFile(in);
            }
        }

        this.threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(numBspTask);
        ExecutorCompletionService<BSPPeerImpl> completionService = new ExecutorCompletionService<>(this.threadPool);

        // Publish the complete peer list before the first task starts, so no
        // running task can observe a partially filled array.
        String[] names = new String[numBspTask];
        for (int i = 0; i < numBspTask; i++) {
            names[i] = "local:" + i;
        }
        peerNames = names;

        for (int i = 0; i < numBspTask; i++) {
            completionService.submit(new BSPRunner(new HamaConfiguration(this.conf), bSPJob, i, rawSplitArr));
            this.globalCounters.incrCounter(JobInProgress.JobCounter.LAUNCHED_TASKS, 1L);
        }
        new Thread(new ThreadObserver(numBspTask, completionService)).start();
        return this.currentJobStatus;
    }

    /**
     * Describes the "cluster" formed by this local runner.
     *
     * @param z ignored
     * @return a status built from the configured task limit (used as first and
     *         third argument), 0 as second argument and the RUNNING master state.
     *         NOTE(review): argument meaning is defined by ClusterStatus — confirm there.
     */
    @Override
    public ClusterStatus getClusterStatus(boolean z) throws IOException {
        final int capacity = this.maxTasks;
        return new ClusterStatus(capacity, 0, capacity, BSPMaster.State.RUNNING);
    }

    /**
     * Builds the profile of a job.
     *
     * @param bSPJobID id to put into the profile
     * @return a profile made of the current {@code user.name} system property,
     *         the given id, the last submitted job file and the job name
     */
    @Override
    public JobProfile getJobProfile(BSPJobID bSPJobID) throws IOException {
        final String user = System.getProperty("user.name");
        return new JobProfile(user, bSPJobID, this.jobFile, this.jobName);
    }

    /**
     * Returns the status of the submitted job, refreshed with the latest superstep count.
     *
     * @param bSPJobID ignored; the runner tracks a single job
     * @return the current job status with superstep count and progress both set
     *         to the latest superstep number, or {@code null} when no job has
     *         been submitted yet
     */
    @Override
    public JobStatus getJobStatus(BSPJobID bSPJobID) throws IOException {
        JobStatus status = this.currentJobStatus;
        if (status == null) {
            return null;
        }
        // Read the volatile counter once so both fields get the same value.
        long supersteps = superStepCount;
        status.setSuperstepCount(supersteps);
        status.setProgress(supersteps);
        return status;
    }

    /**
     * Returns the URI of the file system used by this runner.
     *
     * <p>The file system is obtained from the configuration on first use, so
     * the method also works before any job has been submitted.
     *
     * @return the file system URI as a string
     * @throws IOException if the file system cannot be obtained
     */
    @Override
    public String getFilesystemName() throws IOException {
        if (this.fs == null) {
            this.fs = FileSystem.get(this.conf);
        }
        return this.fs.getUri().toString();
    }

    /**
     * Not supported by the local runner.
     *
     * @return always {@code null}
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus[] jobsToComplete() throws IOException {
        return null;
    }

    /**
     * Not supported by the local runner.
     *
     * @return always {@code null}
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public JobStatus[] getAllJobs() throws IOException {
        return null;
    }

    /**
     * Returns the working directory of the local runner.
     *
     * @return the static working directory: {@code /tmp/hama-bsp/} unless a
     *         constructor call replaced it with the value of {@code bsp.local.dir}
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol, org.apache.hama.ipc.MasterProtocol
    public String getSystemDir() {
        return WORKING_DIR;
    }

    /**
     * Not supported by the local runner; the call is ignored and running tasks continue.
     *
     * @param bSPJobID ignored
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public void killJob(BSPJobID bSPJobID) throws IOException {
    }

    /**
     * Not supported by the local runner; no task is killed.
     *
     * @param taskAttemptID ignored
     * @param z ignored
     * @return always {@code false}
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public boolean killTask(TaskAttemptID taskAttemptID, boolean z) throws IOException {
        return false;
    }

    /**
     * The local runner does not record task completion events.
     *
     * @param bSPJobID ignored
     * @param i ignored
     * @param i2 ignored
     * @return always {@code TaskCompletionEvent.EMPTY_ARRAY}
     */
    @Override // org.apache.hama.ipc.JobSubmissionProtocol
    public TaskCompletionEvent[] getTaskCompletionEvents(BSPJobID bSPJobID, int i, int i2) {
        return TaskCompletionEvent.EMPTY_ARRAY;
    }
}
