package org.apache.hama.bsp.ft;

import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobID;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.GroomServerAction;
import org.apache.hama.bsp.GroomServerStatus;
import org.apache.hama.bsp.JobInProgress;
import org.apache.hama.bsp.RecoverTaskAction;
import org.apache.hama.bsp.Task;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.TaskID;
import org.apache.hama.bsp.TaskInProgress;
import org.apache.hama.bsp.TaskStatus;
import org.apache.hama.bsp.message.MessageEventListener;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.sync.MasterSyncClient;
import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.taskallocation.BSPResource;
import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
import org.apache.hama.util.ZKUtil;

/* loaded from: input_file:org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.class */
public class AsyncRcvdMsgCheckpointImpl<M extends Writable> implements BSPFaultTolerantService<M> {
    private static final Log LOG = LogFactory.getLog(AsyncRcvdMsgCheckpointImpl.class);

    /* loaded from: input_file:org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl$CheckpointMasterService.class */
    private static class CheckpointMasterService implements FaultTolerantMasterService {
        // Job configuration; read for the checkpoint path prefix and to obtain the FileSystem.
        private Configuration conf;
        // Task array handed to initialize(); within this class only its length is read.
        private TaskInProgress[] tasks;
        // Job identifier used when building sync-client keys and checkpoint file paths.
        private BSPJobID jobId;
        // Upper bound compared against currentAttemptId in isRecoveryPossible().
        private int maxTaskAttempts;
        // Reset to 0 by initialize() and incremented at the end of every recoverTasks() call.
        private int currentAttemptId;
        // Sync client used to read per-peer checkpoint records and to remove the job's "sync" key.
        private MasterSyncClient masterSyncClient;
        // Strategy asked to choose the groom server on which a failed task is re-run.
        private TaskAllocationStrategy allocationStrategy;

        // Intentionally empty: all state is supplied afterwards through initialize().
        private CheckpointMasterService() {
        }

        /**
         * Stores the collaborators of this master-side service and resets the recovery counter.
         *
         * @param bSPJobID               identifier of the job being protected
         * @param i                      maximum number of recovery attempts allowed
         * @param taskInProgressArr      the job's tasks
         * @param configuration          job configuration
         * @param masterSyncClient       client used to read checkpoint records and clear sync state
         * @param taskAllocationStrategy strategy used to pick grooms for restarted tasks
         */
        public void initialize(BSPJobID bSPJobID, int i, TaskInProgress[] taskInProgressArr, Configuration configuration, MasterSyncClient masterSyncClient, TaskAllocationStrategy taskAllocationStrategy) {
            // Identity and configuration of the job.
            this.jobId = bSPJobID;
            this.conf = configuration;
            this.tasks = taskInProgressArr;

            // Collaborators.
            this.masterSyncClient = masterSyncClient;
            this.allocationStrategy = taskAllocationStrategy;

            // Attempt bookkeeping: nothing has been recovered yet.
            this.maxTaskAttempts = i;
            this.currentAttemptId = 0;
        }

        /**
         * Reports whether another recovery round is still allowed.
         *
         * @param taskInProgress the failed task (not consulted by this implementation)
         * @return {@code true} while the number of recovery rounds performed so far is below the
         *         configured maximum
         */
        @Override // org.apache.hama.bsp.ft.FaultTolerantMasterService
        public boolean isRecoveryPossible(TaskInProgress taskInProgress) {
            final boolean attemptsLeft = this.maxTaskAttempts > this.currentAttemptId;
            return attemptsLeft;
        }

        /**
         * Compares the task's current attempt id with this service's recovery counter.
         *
         * @param taskInProgress the task to inspect
         * @return {@code true} when the id of the task's current attempt is greater than the number
         *         of recovery rounds this service has performed
         */
        @Override // org.apache.hama.bsp.ft.FaultTolerantMasterService
        public boolean isAlreadyRecovered(TaskInProgress taskInProgress) {
            final int taskAttempt = taskInProgress.getCurrentTaskAttemptId().getId();
            return taskAttempt > this.currentAttemptId;
        }

        /**
         * Determines the superstep from which the job can be resumed and schedules the restart of
         * all tasks.
         *
         * <p>The per-peer checkpoint records are read from the sync client under the job's
         * {@code "checkpoint"} key. Only when a record exists for every task is the smallest
         * recorded superstep used as the restart point; otherwise (or when any record cannot be
         * read) the job is restarted with superstep {@code -1}.
         *
         * @param jobInProgress      the job being recovered (not consulted by this implementation)
         * @param map                groom server statuses keyed by name
         * @param taskInProgressArr  the tasks that failed
         * @param taskInProgressArr2 all tasks of the job
         * @param map2               number of tasks currently assigned to each groom
         * @param map3               output: actions to send to each groom
         * @throws IOException if the restart could not be scheduled
         */
        @Override // org.apache.hama.bsp.ft.FaultTolerantMasterService
        public void recoverTasks(JobInProgress jobInProgress, Map<String, GroomServerStatus> map, TaskInProgress[] taskInProgressArr, TaskInProgress[] taskInProgressArr2, Map<GroomServerStatus, Integer> map2, Map<GroomServerStatus, List<GroomServerAction>> map3) throws IOException {
            Map<TaskID, TaskInProgress> failedTasks = new HashMap<TaskID, TaskInProgress>(2 * taskInProgressArr.length);
            for (TaskInProgress failed : taskInProgressArr) {
                failedTasks.put(failed.getTaskId(), failed);
            }

            String[] childKeySet = this.masterSyncClient.getChildKeySet(this.masterSyncClient.constructKey(this.jobId, "checkpoint"), null);
            if (childKeySet == null) {
                // NOTE(review): defensive; confirm whether the sync client can return null here.
                // A missing key set is handled like "no checkpoint records".
                childKeySet = new String[0];
            }
            if (AsyncRcvdMsgCheckpointImpl.LOG.isDebugEnabled()) {
                StringBuilder message = new StringBuilder(25 * childKeySet.length);
                message.append("got child key set").append(childKeySet.length).append(ZKUtil.ZK_SEPARATOR).append(this.tasks.length).append(" ");
                for (String key : childKeySet) {
                    message.append(key).append(",");
                }
                AsyncRcvdMsgCheckpointImpl.LOG.debug(message.toString());
            }

            long restartStep = -1L;
            if (childKeySet.length == this.tasks.length) {
                restartStep = findCommonCheckpointStep(childKeySet);
                clearClientForSuperstep(restartStep);
            }
            restartJob(restartStep, map, failedTasks, taskInProgressArr2, map2, map3);
            this.currentAttemptId++;
        }

        /**
         * Returns the smallest checkpointed superstep recorded across the given peer keys, or
         * {@code -1} when a record cannot be read or no record carries a superstep.
         */
        private long findCommonCheckpointStep(String[] peerKeys) throws IOException {
            long minStep = Long.MAX_VALUE;
            for (String peerKey : peerKeys) {
                ArrayWritable progress = new ArrayWritable(LongWritable.class);
                if (!this.masterSyncClient.getInformation(this.masterSyncClient.constructKey(this.jobId, "checkpoint", peerKey), progress)) {
                    return -1L;
                }
                LongWritable step = (LongWritable) progress.get()[0];
                if (step != null && step.get() < minStep) {
                    minStep = step.get();
                    if (AsyncRcvdMsgCheckpointImpl.LOG.isDebugEnabled()) {
                        AsyncRcvdMsgCheckpointImpl.LOG.debug("Got superstep number " + minStep + " from " + peerKey);
                    }
                }
            }
            // Never hand the Long.MAX_VALUE sentinel on as if it were a real superstep.
            return minStep == Long.MAX_VALUE ? -1L : minStep;
        }

        /**
         * Removes the job's {@code "sync"} key from the sync client.
         *
         * @param j the restart superstep; currently unused, the whole key is removed regardless
         */
        private void clearClientForSuperstep(long j) {
            final String syncKey = this.masterSyncClient.constructKey(this.jobId, "sync");
            this.masterSyncClient.remove(syncKey, null);
        }

        /**
         * Appends a {@link RecoverTaskAction} for the task to the action list of the given groom,
         * creating and registering that list when the groom has no entry yet.
         *
         * @param task              the task to recover
         * @param j                 the superstep to recover from
         * @param groomServerStatus the groom that should run the task
         * @param map               per-groom action lists, updated in place
         */
        private void populateAction(Task task, long j, GroomServerStatus groomServerStatus, Map<GroomServerStatus, List<GroomServerAction>> map) {
            final List<GroomServerAction> actions;
            if (map.containsKey(groomServerStatus)) {
                actions = map.get(groomServerStatus);
            } else {
                actions = new ArrayList<GroomServerAction>();
                map.put(groomServerStatus, actions);
            }
            actions.add(new RecoverTaskAction(task, j));
        }

        /**
         * Schedules a recovery action for a task on the groom it is already associated with.
         *
         * @param taskInProgress the task to restart
         * @param j              the superstep to recover from
         * @param map            groom statuses by name (not consulted by this implementation)
         * @param map2           per-groom action lists, updated in place
         */
        private void restartTask(TaskInProgress taskInProgress, long j, Map<String, GroomServerStatus> map, Map<GroomServerStatus, List<GroomServerAction>> map2) {
            final GroomServerStatus currentGroom = taskInProgress.getGroomServerStatus();
            final Task restarted = taskInProgress.constructTask(currentGroom);
            populateAction(restarted, j, currentGroom, map2);
        }

        /**
         * Fills the action map with one recovery action per task.
         *
         * <p>Tasks that did not fail are restarted on their current groom. Failed tasks are
         * re-allocated through the allocation strategy: when a checkpoint superstep is available
         * ({@code j >= 0}) the hosts holding the first block of the task's checkpoint file are
         * offered as candidates, otherwise the strategy selects the candidates itself.
         *
         * <p>NOTE(review): the default path prefix here is {@code "/checkpoint/"} while
         * {@code CheckpointPeerService.checkpointPath} defaults to {@code "checkpoint/"}; with no
         * explicit {@code bsp.checkpoint.prefix_path} the two may not resolve to the same file.
         * Left unchanged pending confirmation.
         *
         * @param j                 superstep to recover from, negative to restart from scratch
         * @param map               groom server statuses keyed by name
         * @param map2              failed tasks keyed by task id
         * @param taskInProgressArr all tasks of the job
         * @param map3              number of tasks currently assigned to each groom
         * @param map4              output: actions to send to each groom
         * @throws IOException if the checkpoint file system cannot be queried
         */
        private void restartJob(long j, Map<String, GroomServerStatus> map, Map<TaskID, TaskInProgress> map2, TaskInProgress[] taskInProgressArr, Map<GroomServerStatus, Integer> map3, Map<GroomServerStatus, List<GroomServerAction>> map4) throws IOException {
            if (j < 0) {
                for (TaskInProgress tip : taskInProgressArr) {
                    if (map2.containsKey(tip.getTaskId())) {
                        GroomServerStatus target = this.allocationStrategy.getGroomToAllocate(map, this.allocationStrategy.selectGrooms(map, map3, new BSPResource[0], tip), map3, new BSPResource[0], tip);
                        // Previously the allocated groom was discarded, so a failed task never
                        // received a recovery action when restarting from scratch.
                        populateAction(tip.constructTask(target), j, target, map4);
                    } else {
                        restartTask(tip, j, map, map4);
                    }
                }
                return;
            }

            String prefix = this.conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
            FileSystem fileSystem = FileSystem.get(this.conf);
            for (TaskInProgress tip : taskInProgressArr) {
                if (!map2.containsKey(tip.getTaskId())) {
                    restartTask(tip, j, map, map4);
                    continue;
                }

                // The failed task no longer counts against the groom it was running on.
                Integer assigned = map3.get(tip.getGroomServerStatus());
                if (assigned != null) {
                    map3.put(tip.getGroomServerStatus(), Integer.valueOf(assigned.intValue() - 1));
                }

                StringBuilder location = new StringBuilder(prefix);
                location.append(this.jobId.toString());
                location.append(ZKUtil.ZK_SEPARATOR).append(j).append(ZKUtil.ZK_SEPARATOR).append(tip.getTaskId().getId());
                String[] candidates = candidateHosts(fileSystem, new Path(location.toString()), map);

                GroomServerStatus target = this.allocationStrategy.getGroomToAllocate(map, candidates, map3, new BSPResource[0], tip);
                populateAction(tip.constructTask(target), j, target, map4);
            }
        }

        /**
         * Returns the hosts of the checkpoint file's first block, or every known groom name when
         * the file is missing or reports no block locations (for example a zero-length file).
         */
        private String[] candidateHosts(FileSystem fileSystem, Path checkpointFile, Map<String, GroomServerStatus> grooms) throws IOException {
            if (fileSystem.exists(checkpointFile)) {
                FileStatus fileStatus = fileSystem.getFileStatus(checkpointFile);
                org.apache.hadoop.fs.BlockLocation[] blocks = fileSystem.getFileBlockLocations(fileStatus, 0L, fileStatus.getLen());
                if (blocks != null && blocks.length > 0) {
                    return blocks[0].getHosts();
                }
            }
            return grooms.keySet().toArray(new String[grooms.keySet().size()]);
        }
    }

    /* loaded from: input_file:org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl$CheckpointPeerService.class */
    public static class CheckpointPeerService<M extends Writable> implements FaultTolerantPeerService<M>, MessageEventListener<M> {
        // Job this peer belongs to; its job id is used in sync keys and checkpoint paths.
        private BSPJob job;
        // The local peer; queried for its peer index and current superstep count.
        private BSPPeer peer;
        // Sync client used to store and read this peer's checkpoint record.
        private PeerSyncClient syncClient;
        // Superstep value passed to initialize(); onPeerInitialized() only restores when it is >= 0.
        private long superstep;
        // Job configuration (checkpoint prefix, interval and enabled flag are read from it).
        private Configuration conf;
        // Messenger that receives restored messages and on which this service registers as listener.
        private MessageManager<M> messenger;
        // File system obtained from the configuration; checkpoint files are read and created on it.
        private FileSystem fs;
        // Checkpoint interval; re-read from the configuration on every isReadyToCheckpoint() call.
        private int checkPointInterval;
        // Superstep count recorded at the most recent checkpoint (see afterBarrier()).
        private volatile long lastCheckPointStep;
        // True while received messages are being written to the checkpoint stream.
        private volatile boolean checkpointState;
        // Output stream of the current checkpoint file; opened on first message, closed in afterBarrier().
        private volatile FSDataOutputStream checkpointStream;
        // Messages written since the last barrier; reset to 0 in afterBarrier().
        private volatile long checkpointMessageCount;

        /**
         * Stores the collaborators of this peer-side service and initialises the checkpoint state
         * from the configuration.
         *
         * @param bSPJob            the job this peer belongs to
         * @param bSPPeer           the local peer
         * @param peerSyncClient    client used to store and read checkpoint records
         * @param inetSocketAddress not used by this implementation
         * @param taskAttemptID     not used by this implementation
         * @param j                 superstep handed in by the caller; when positive it also seeds
         *                          the last-checkpoint step, otherwise that step starts at 1
         * @param configuration     job configuration
         * @param messageManager    messenger of the local peer
         * @throws IOException if the file system cannot be obtained
         */
        public void initialize(BSPJob bSPJob, BSPPeer bSPPeer, PeerSyncClient peerSyncClient, InetSocketAddress inetSocketAddress, TaskAttemptID taskAttemptID, long j, Configuration configuration, MessageManager<M> messageManager) throws IOException {
            this.job = bSPJob;
            this.peer = bSPPeer;
            this.syncClient = peerSyncClient;
            this.superstep = j;
            this.conf = configuration;
            this.messenger = messageManager;
            this.fs = FileSystem.get(configuration);
            // The interval was previously read and assigned twice in a row; once is enough.
            this.checkPointInterval = configuration.getInt(Constants.CHECKPOINT_INTERVAL, 1);
            this.checkpointState = configuration.getBoolean(Constants.CHECKPOINT_ENABLED, false);
            this.lastCheckPointStep = j > 0 ? j : 1L;
            this.checkpointMessageCount = 0L;
        }

        /**
         * Builds the location of this peer's checkpoint file for the given superstep: the
         * configured {@code bsp.checkpoint.prefix_path} (default {@code "checkpoint/"}) followed by
         * the job id, the superstep and the peer index, joined with {@code ZKUtil.ZK_SEPARATOR}.
         *
         * @param j the superstep the checkpoint belongs to
         * @return the checkpoint file location as a string
         */
        private String checkpointPath(long j) {
            final StringBuilder location = new StringBuilder();
            location.append(this.conf.get("bsp.checkpoint.prefix_path", "checkpoint/"));
            location.append(this.job.getJobID().toString());
            location.append(ZKUtil.ZK_SEPARATOR).append(j);
            location.append(ZKUtil.ZK_SEPARATOR).append(this.peer.getPeerIndex());

            final String result = location.toString();
            if (AsyncRcvdMsgCheckpointImpl.LOG.isDebugEnabled()) {
                AsyncRcvdMsgCheckpointImpl.LOG.debug("Received Messages are to be saved to " + result);
            }
            return result;
        }

        /**
         * Restores checkpointed messages when the peer starts in the {@code RECOVERING} state and
         * registers this service as a message listener.
         *
         * <p>The peer's checkpoint record (superstep and message count) is read from the sync
         * client; when the count is positive, that many messages are read back from the checkpoint
         * file and looped back into the messenger.
         *
         * @param state the state the peer was started in
         * @return always {@code TaskStatus.State.RUNNING}
         * @throws IOException if no checkpoint record exists or the checkpoint file ends early
         * @throws Exception   on any other failure while restoring messages
         */
        @Override // org.apache.hama.bsp.ft.FaultTolerantPeerService
        public TaskStatus.State onPeerInitialized(TaskStatus.State state) throws Exception {
            if (this.superstep >= 0 && state.equals(TaskStatus.State.RECOVERING)) {
                ArrayWritable progress = new ArrayWritable(LongWritable.class);
                if (!this.syncClient.getInformation(this.syncClient.constructKey(this.job.getJobID(), "checkpoint", String.valueOf(this.peer.getPeerIndex())), progress)) {
                    throw new IOException("No data found to restore peer state.");
                }
                Writable[] record = progress.get();
                long checkpointedStep = ((LongWritable) record[0]).get();
                long messageCount = ((LongWritable) record[1]).get();
                if (AsyncRcvdMsgCheckpointImpl.LOG.isDebugEnabled()) {
                    AsyncRcvdMsgCheckpointImpl.LOG.debug("Got sstep =" + checkpointedStep + " numMessages = " + messageCount + " this.superstep = " + this.superstep);
                }
                if (messageCount > 0) {
                    BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
                    FSDataInputStream in = this.fs.open(new Path(checkpointPath(checkpointedStep)));
                    try {
                        for (long i = 0; i < messageCount; i++) {
                            @SuppressWarnings("unchecked") // the class name was written from an M instance
                            M message = (M) ReflectionUtils.newInstance(Class.forName(in.readUTF()), this.conf);
                            message.readFields(in);
                            bundle.addMessage(message);
                        }
                    } catch (EOFException e) {
                        AsyncRcvdMsgCheckpointImpl.LOG.error("Error recovering from checkpointing", e);
                        throw new IOException(e);
                    } finally {
                        // Close only the stream. The FileSystem handle is still needed to create
                        // checkpoint files in onMessageReceived(), so it must stay open.
                        in.close();
                    }
                    this.messenger.loopBackMessages(bundle);
                }
            }
            this.messenger.registerListener(this);
            return TaskStatus.State.RUNNING;
        }

        /**
         * Re-reads the checkpoint interval from the configuration, logs the current schedule and
         * decides whether the next superstep should be checkpointed.
         *
         * @return {@code true} when checkpointing is enabled, the interval is non-zero and at least
         *         {@code checkPointInterval} supersteps (counting the upcoming one) have passed
         *         since the last checkpoint
         */
        public final boolean isReadyToCheckpoint() {
            this.checkPointInterval = this.conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);

            AsyncRcvdMsgCheckpointImpl.LOG.info(describeCheckpointSchedule());
            if (AsyncRcvdMsgCheckpointImpl.LOG.isDebugEnabled()) {
                AsyncRcvdMsgCheckpointImpl.LOG.debug(describeCheckpointSchedule());
            }

            if (!this.conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)) {
                return false;
            }
            if (this.checkPointInterval == 0) {
                return false;
            }
            final int stepsSinceCheckpoint = (int) ((this.peer.getSuperstepCount() + 1) - this.lastCheckPointStep);
            return stepsSinceCheckpoint >= this.checkPointInterval;
        }

        /** Renders the enabled flag, interval, last checkpoint step and superstep count for logging. */
        private String describeCheckpointSchedule() {
            final StringBuilder text = new StringBuilder(1000);
            text.append("Enabled = ").append(this.conf.getBoolean(Constants.CHECKPOINT_ENABLED, false));
            text.append(" checkPointInterval = ").append(this.checkPointInterval);
            text.append(" lastCheckPointStep = ").append(this.lastCheckPointStep);
            text.append(" getSuperstepCount() = ").append(this.peer.getSuperstepCount());
            return text.toString();
        }

        /** Barrier hook; this implementation does nothing before the barrier. */
        @Override // org.apache.hama.bsp.ft.FaultTolerantPeerService
        public void beforeBarrier() throws Exception {
        }

        /** Barrier hook; this implementation does nothing during the barrier. */
        @Override // org.apache.hama.bsp.ft.FaultTolerantPeerService
        public void duringBarrier() throws Exception {
        }

        /**
         * Finishes the checkpoint of the superstep that just ended and decides whether the next
         * superstep is checkpointed.
         *
         * <p>When checkpointing was active, the checkpoint stream (if one was opened) is closed,
         * the current superstep count becomes the last checkpoint step, and the pair
         * (last checkpoint step, message count) is stored in the sync client under this peer's
         * {@code "checkpoint"} key. Afterwards the checkpoint flag is recomputed via
         * {@link #isReadyToCheckpoint()} and the message counter is reset.
         *
         * @throws Exception if closing the stream or storing the record fails
         */
        @Override // org.apache.hama.bsp.ft.FaultTolerantPeerService
        public void afterBarrier() throws Exception {
            synchronized (this) {
                if (this.checkpointState) {
                    if (this.checkpointStream != null) {
                        this.checkpointStream.close();
                        this.checkpointStream = null;
                    }
                    this.lastCheckPointStep = this.peer.getSuperstepCount();
                    // Declared as ArrayWritable (not Writable) because set() is specific to it.
                    ArrayWritable progress = new ArrayWritable(LongWritable.class);
                    progress.set(new Writable[]{new LongWritable(this.lastCheckPointStep), new LongWritable(this.checkpointMessageCount)});
                    if (AsyncRcvdMsgCheckpointImpl.LOG.isDebugEnabled()) {
                        AsyncRcvdMsgCheckpointImpl.LOG.debug("Storing lastCheckPointStep = " + this.lastCheckPointStep + " checkpointMessageCount = " + this.checkpointMessageCount + " for peer = " + String.valueOf(this.peer.getPeerIndex()));
                    }
                    this.syncClient.storeInformation(this.syncClient.constructKey(this.job.getJobID(), "checkpoint", String.valueOf(this.peer.getPeerIndex())), progress, true, null);
                }
                this.checkpointState = isReadyToCheckpoint();
                this.checkpointMessageCount = 0L;
            }
            AsyncRcvdMsgCheckpointImpl.LOG.info("checkpointNext = " + this.checkpointState + " checkpointMessageCount = " + this.checkpointMessageCount);
        }

        /** Listener callback; this implementation takes no action. */
        @Override // org.apache.hama.bsp.message.MessageEventListener
        public void onInitialized() {
        }

        /** Listener callback; sent messages are ignored, only received messages are checkpointed. */
        @Override // org.apache.hama.bsp.message.MessageEventListener
        public void onMessageSent(String str, M m) {
        }

        /**
         * Appends a received message to the checkpoint file while checkpointing is active.
         *
         * <p>The file for superstep {@code getSuperstepCount() + 1} is created on the first
         * message. Each message is written as its canonical class name followed by its serialized
         * fields, and the message counter is advanced once the write has succeeded.
         *
         * @param m the received message; a {@code null} message is logged and skipped
         * @throws RuntimeException if the checkpoint file cannot be created or written
         */
        @Override // org.apache.hama.bsp.message.MessageEventListener
        public void onMessageReceived(M m) {
            if (m == null) {
                // Nothing can be serialized for a null message, and it must not be counted.
                AsyncRcvdMsgCheckpointImpl.LOG.error("Message M is found to be null");
                return;
            }
            synchronized (this) {
                if (!this.checkpointState) {
                    return;
                }
                String str = null;
                if (this.checkpointStream == null) {
                    str = checkpointPath(this.peer.getSuperstepCount() + 1);
                    try {
                        AsyncRcvdMsgCheckpointImpl.LOG.info("Creating path " + str);
                        if (AsyncRcvdMsgCheckpointImpl.LOG.isDebugEnabled()) {
                            AsyncRcvdMsgCheckpointImpl.LOG.debug("Creating path " + str);
                        }
                        this.checkpointStream = this.fs.create(new Path(str));
                    } catch (IOException e) {
                        AsyncRcvdMsgCheckpointImpl.LOG.error("Fail checkpointing messages to " + str, e);
                        throw new RuntimeException("Failed opening HDFS file " + str, e);
                    }
                }
                try {
                    this.checkpointStream.writeUTF(m.getClass().getCanonicalName());
                    m.write(this.checkpointStream);
                    // Count only messages that were actually written.
                    this.checkpointMessageCount++;
                    if (AsyncRcvdMsgCheckpointImpl.LOG.isDebugEnabled()) {
                        AsyncRcvdMsgCheckpointImpl.LOG.debug("message count = " + this.checkpointMessageCount);
                    }
                } catch (IOException e2) {
                    // When the stream was opened by an earlier call the path is not known here;
                    // recompute it so the error does not report "null".
                    String target = (str != null) ? str : checkpointPath(this.peer.getSuperstepCount() + 1);
                    AsyncRcvdMsgCheckpointImpl.LOG.error("Fail checkpointing messages to " + target, e2);
                    throw new RuntimeException("Failed writing to HDFS file " + target, e2);
                }
            }
        }

        /** Listener callback; this implementation takes no action (the stream is closed in afterBarrier()). */
        @Override // org.apache.hama.bsp.message.MessageEventListener
        public void onClose() {
        }
    }

    /**
     * Creates and initialises the peer-side checkpoint service.
     *
     * @return a {@link CheckpointPeerService} initialised with the given arguments
     * @throws Exception if initialisation fails
     */
    @Override // org.apache.hama.bsp.ft.BSPFaultTolerantService
    public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob bSPJob, BSPPeer bSPPeer, PeerSyncClient peerSyncClient, InetSocketAddress inetSocketAddress, TaskAttemptID taskAttemptID, long j, Configuration configuration, MessageManager<M> messageManager) throws Exception {
        final CheckpointPeerService<M> peerService = new CheckpointPeerService<M>();
        peerService.initialize(bSPJob, bSPPeer, peerSyncClient, inetSocketAddress, taskAttemptID, j, configuration, messageManager);
        return peerService;
    }

    /**
     * Creates and initialises the master-side checkpoint recovery service.
     *
     * @return a {@code CheckpointMasterService} initialised with the given arguments
     * @throws Exception declared by the interface; not thrown by this implementation's own code
     */
    @Override // org.apache.hama.bsp.ft.BSPFaultTolerantService
    public FaultTolerantMasterService constructMasterFaultTolerance(BSPJobID bSPJobID, int i, TaskInProgress[] taskInProgressArr, Configuration configuration, MasterSyncClient masterSyncClient, TaskAllocationStrategy taskAllocationStrategy) throws Exception {
        final CheckpointMasterService masterService = new CheckpointMasterService();
        masterService.initialize(bSPJobID, i, taskInProgressArr, configuration, masterSyncClient, taskAllocationStrategy);
        return masterService;
    }
}
