/*
 * Decompiled with CFR 0.152.
 */
package com.google.appengine.tools.pipeline.impl;

import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.KeyFactory;
import com.google.appengine.api.taskqueue.TaskAlreadyExistsException;
import com.google.appengine.tools.pipeline.FutureList;
import com.google.appengine.tools.pipeline.ImmediateValue;
import com.google.appengine.tools.pipeline.Job;
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.OrphanedObjectException;
import com.google.appengine.tools.pipeline.Value;
import com.google.appengine.tools.pipeline.impl.FutureValueImpl;
import com.google.appengine.tools.pipeline.impl.backend.AppEngineBackEnd;
import com.google.appengine.tools.pipeline.impl.backend.PipelineBackEnd;
import com.google.appengine.tools.pipeline.impl.backend.UpdateSpec;
import com.google.appengine.tools.pipeline.impl.model.Barrier;
import com.google.appengine.tools.pipeline.impl.model.JobInstanceRecord;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
import com.google.appengine.tools.pipeline.impl.model.PipelineModelObject;
import com.google.appengine.tools.pipeline.impl.model.PipelineObjects;
import com.google.appengine.tools.pipeline.impl.model.Slot;
import com.google.appengine.tools.pipeline.impl.model.SlotDescriptor;
import com.google.appengine.tools.pipeline.impl.servlets.PipelineServlet;
import com.google.appengine.tools.pipeline.impl.tasks.DeletePipelineTask;
import com.google.appengine.tools.pipeline.impl.tasks.FanoutTask;
import com.google.appengine.tools.pipeline.impl.tasks.FinalizeJobTask;
import com.google.appengine.tools.pipeline.impl.tasks.HandleSlotFilledTask;
import com.google.appengine.tools.pipeline.impl.tasks.ObjRefTask;
import com.google.appengine.tools.pipeline.impl.tasks.RunJobTask;
import com.google.appengine.tools.pipeline.impl.tasks.Task;
import com.google.appengine.tools.pipeline.impl.util.GUIDGenerator;
import com.google.appengine.tools.pipeline.impl.util.StringUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class PipelineManager {
    private static final Logger logger = Logger.getLogger(PipelineManager.class.getName());
    private static PipelineBackEnd backEnd = new AppEngineBackEnd();

    public static String startNewPipeline(JobSetting[] settings, Job<?> jobInstance, Object ... params) {
        UpdateSpec updateSpec = new UpdateSpec(null);
        String graphGUID = null;
        JobRecord parentJobKey = null;
        JobRecord jobRecord = PipelineManager.registerNewJobRecord(updateSpec, settings, parentJobKey, graphGUID, jobInstance, params);
        updateSpec.setRootJobKey(jobRecord.getRootJobKey());
        backEnd.save(updateSpec);
        return jobRecord.getKey().getName();
    }

    public static JobRecord registerNewJobRecord(UpdateSpec updateSpec, JobSetting[] settings, JobRecord generatorJob, String graphGUID, Job<?> jobInstance, Object[] params) {
        Key rootKey = null == generatorJob ? null : generatorJob.getRootJobKey();
        Key generatorKey = null == generatorJob ? null : generatorJob.getKey();
        JobRecord jobRecord = new JobRecord(rootKey, generatorKey, graphGUID, jobInstance, settings);
        updateSpec.setRootJobKey(jobRecord.getRootJobKey());
        for (Object param : params) {
            Value<Object> value = null != param && param instanceof Value ? (Value)param : new ImmediateValue<Object>(param);
            PipelineManager.registerSlotsWithBarrier(updateSpec, value, jobRecord.getRootJobKey(), generatorKey, graphGUID, jobRecord.getRunBarrierInflated());
        }
        if (0 == jobRecord.getRunBarrierInflated().getWaitingOnKeys().size()) {
            Slot slot = new Slot(jobRecord.getRootJobKey(), generatorKey, graphGUID);
            jobRecord.getRunBarrierInflated().addPhantomArgumentSlot(slot);
            PipelineManager.registerSlotFilled(updateSpec, slot, null);
        }
        UpdateSpec.Group updateGroup = updateSpec.getNonTransactionalGroup();
        updateGroup.includeBarrier(jobRecord.getRunBarrierInflated());
        updateGroup.includeBarrier(jobRecord.getFinalizeBarrierInflated());
        updateGroup.includeSlot(jobRecord.getOutputSlotInflated());
        updateGroup.includeJob(jobRecord);
        updateGroup.includeJobInstanceRecord(jobRecord.getJobInstanceInflated());
        return jobRecord;
    }

    private static void registerSlotsWithBarrier(UpdateSpec updateSpec, Value<?> value, Key rootJobKey, Key generatorJobKey, String graphGUID, Barrier barrier) {
        if (null == value || value instanceof ImmediateValue) {
            Object concreteValue = null;
            if (null != value) {
                ImmediateValue iv = (ImmediateValue)value;
                concreteValue = iv.getValue();
            }
            Slot slot = new Slot(rootJobKey, generatorJobKey, graphGUID);
            PipelineManager.registerSlotFilled(updateSpec, slot, concreteValue);
            barrier.addRegularArgumentSlot(slot);
        } else if (value instanceof FutureValueImpl) {
            FutureValueImpl futureValue = (FutureValueImpl)value;
            Slot slot = futureValue.getSlot();
            barrier.addRegularArgumentSlot(slot);
            updateSpec.getNonTransactionalGroup().includeSlot(slot);
        } else if (value instanceof FutureList) {
            FutureList futureList = (FutureList)value;
            ArrayList<Slot> slotList = new ArrayList<Slot>(futureList.getListOfValues().size());
            Slot dummyListSlot = new Slot(rootJobKey, generatorJobKey, graphGUID);
            PipelineManager.registerSlotFilled(updateSpec, dummyListSlot, null);
            for (Value valFromList : futureList.getListOfValues()) {
                Slot slot = null;
                if (valFromList instanceof ImmediateValue) {
                    ImmediateValue ivFromList = (ImmediateValue)valFromList;
                    slot = new Slot(rootJobKey, generatorJobKey, graphGUID);
                    PipelineManager.registerSlotFilled(updateSpec, slot, ivFromList.getValue());
                } else if (valFromList instanceof FutureValueImpl) {
                    FutureValueImpl futureValFromList = (FutureValueImpl)valFromList;
                    slot = futureValFromList.getSlot();
                } else {
                    if (value instanceof FutureList) {
                        throw new IllegalArgumentException("The Pipeline framework does not currently support FutureLists of FutureLists");
                    }
                    PipelineManager.throwUnrecognizedValueException(valFromList);
                }
                slotList.add(slot);
                updateSpec.getNonTransactionalGroup().includeSlot(slot);
            }
            barrier.addListArgumentSlots(dummyListSlot, slotList);
        } else {
            PipelineManager.throwUnrecognizedValueException(value);
        }
    }

    private static void throwUnrecognizedValueException(Value<?> value) {
        throw new RuntimeException("Internal logic error: Unrecognized implementation of Value interface: " + value.getClass().getName());
    }

    private static void registerSlotFilled(UpdateSpec updateSpec, Slot slot, Object value) {
        slot.fill(value);
        updateSpec.getNonTransactionalGroup().includeSlot(slot);
        updateSpec.getFinalTransaction().registerTask(new HandleSlotFilledTask(slot));
    }

    public static PipelineObjects queryFullPipeline(String rootJobHandle) {
        Key rootJobKey = KeyFactory.createKey((String)"pipeline-job", (String)rootJobHandle);
        return backEnd.queryFullPipeline(rootJobKey);
    }

    private static void checkNonEmpty(String s, String name) {
        if (null == s || s.trim().length() == 0) {
            throw new IllegalArgumentException(name + " is empty.");
        }
    }

    public static JobRecord getJob(String jobHandle) throws NoSuchObjectException {
        PipelineManager.checkNonEmpty(jobHandle, "jobHandle");
        Key key = KeyFactory.createKey((String)"pipeline-job", (String)jobHandle);
        logger.finest("getJob: " + key.getName());
        return backEnd.queryJob(key, JobRecord.InflationType.FOR_OUTPUT);
    }

    public static void stopJob(String jobHandle) throws NoSuchObjectException {
        PipelineManager.checkNonEmpty(jobHandle, "jobHandle");
        Key key = KeyFactory.createKey((String)"pipeline-job", (String)jobHandle);
        JobRecord jobRecord = backEnd.queryJob(key, JobRecord.InflationType.NONE);
        jobRecord.setState(JobRecord.State.STOPPED);
        UpdateSpec updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
        updateSpec.getTransaction("stopJob").includeJob(jobRecord);
        backEnd.save(updateSpec);
    }

    public static void deletePipelineRecords(String pipelineHandle, boolean force, boolean async) throws NoSuchObjectException, IllegalStateException {
        PipelineManager.checkNonEmpty(pipelineHandle, "pipelineHandle");
        Key key = KeyFactory.createKey((String)"pipeline-job", (String)pipelineHandle);
        backEnd.deletePipeline(key, force, async);
    }

    public static void acceptPromisedValue(String promiseHandle, Object value) throws NoSuchObjectException, OrphanedObjectException {
        PipelineManager.checkNonEmpty(promiseHandle, "promiseHandle");
        Key key = KeyFactory.stringToKey((String)promiseHandle);
        PipelineModelObject slot = null;
        for (int i = 0; i < 5; ++i) {
            try {
                slot = backEnd.querySlot(key, false);
                continue;
            }
            catch (NoSuchObjectException e) {
                try {
                    Thread.sleep((long)Math.pow(2.0, i) * 1000L);
                    continue;
                }
                catch (InterruptedException f) {
                    // empty catch block
                }
            }
        }
        if (null == slot) {
            throw new NoSuchObjectException("There is no promise with handle " + promiseHandle);
        }
        Key generatorJobKey = slot.getGeneratorJobKey();
        if (null == generatorJobKey) {
            throw new RuntimeException("Pipeline is fatally corrupted. Slot for promised value has no generatorJobKey: " + slot);
        }
        JobRecord generatorJob = backEnd.queryJob(generatorJobKey, JobRecord.InflationType.NONE);
        if (null == generatorJob) {
            throw new RuntimeException("Pipeline is fatally corrupted. The generator job for a promised value slot was not found: " + generatorJobKey);
        }
        String childGraphGuid = generatorJob.getChildGraphGuid();
        if (null == childGraphGuid) {
            throw new NoSuchObjectException("The framework is not ready to accept the promised value yet. Please try again after the job that generated the promis handle has completed.");
        }
        if (!childGraphGuid.equals(slot.getGraphGuid())) {
            throw new OrphanedObjectException(promiseHandle);
        }
        UpdateSpec updateSpec = new UpdateSpec(slot.getRootJobKey());
        PipelineManager.registerSlotFilled(updateSpec, (Slot)slot, value);
        backEnd.save(updateSpec);
    }

    public static void processTask(Task task) {
        logger.finest("Processing task " + task);
        try {
            switch (task.getType()) {
                case RUN_JOB: {
                    RunJobTask runJobTask = (RunJobTask)task;
                    PipelineManager.runJob(runJobTask.getJobKey());
                    break;
                }
                case HANDLE_SLOT_FILLED: {
                    HandleSlotFilledTask hsfTask = (HandleSlotFilledTask)task;
                    PipelineManager.handleSlotFilled(hsfTask.getSlotKey());
                    break;
                }
                case FINALIZE_JOB: {
                    FinalizeJobTask finalizeJobTask = (FinalizeJobTask)task;
                    PipelineManager.finalizeJob(finalizeJobTask.getJobKey());
                    break;
                }
                case FAN_OUT: {
                    FanoutTask fanoutTask = (FanoutTask)task;
                    PipelineManager.handleFanoutTaskOrAbandonTask(fanoutTask);
                    break;
                }
                case DELETE_PIPELINE: {
                    DeletePipelineTask deletePipelineTask = (DeletePipelineTask)task;
                    try {
                        backEnd.deletePipeline(deletePipelineTask.getRootJobKey(), deletePipelineTask.shouldForce(), false);
                    }
                    catch (Exception e) {
                        logger.log(Level.WARNING, "DeletePipeline operation failed.", e);
                    }
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unrecognized task type: " + (Object)((Object)task.getType()));
                }
            }
        }
        catch (AbandonTaskException abandonTaskException) {
            // empty catch block
        }
    }

    public static PipelineBackEnd getBackEnd() {
        return backEnd;
    }

    private static void invokePrivateJobMethod(String methodName, Job<?> jobObject, Object ... params) {
        Class[] signature = new Class[params.length];
        int i = 0;
        for (Object param : params) {
            signature[i++] = param.getClass();
        }
        PipelineManager.invokePrivateJobMethod(methodName, jobObject, signature, params);
    }

    private static void invokePrivateJobMethod(String methodName, Job<?> jobObject, Class<?>[] signature, Object ... params) {
        Class<Job> jobClass = Job.class;
        try {
            Method method = jobClass.getDeclaredMethod(methodName, signature);
            method.setAccessible(true);
            method.invoke(jobObject, params);
        }
        catch (NoSuchMethodException e) {
            throw new RuntimeException(e);
        }
        catch (InvocationTargetException e) {
            throw new RuntimeException(e);
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
    }

    private static Method findAppropriateRunMethod(Class<?> klass, Object ... params) {
        Method runMethod = null;
        for (Method method : klass.getMethods()) {
            if (!"run".equals(method.getName())) continue;
            runMethod = method;
            break;
        }
        return runMethod;
    }

    private static void setJobRecord(Job<?> jobObject, JobRecord jobRecord) {
        PipelineManager.invokePrivateJobMethod("setJobRecord", jobObject, jobRecord);
    }

    private static void setCurrentRunGuid(Job<?> jobObject, String guid) {
        PipelineManager.invokePrivateJobMethod("setCurrentRunGuid", jobObject, guid);
    }

    private static void setUpdateSpec(Job<?> jobObject, UpdateSpec updateSpec) {
        PipelineManager.invokePrivateJobMethod("setUpdateSpec", jobObject, updateSpec);
    }

    private static void runJob(Key jobKey) {
        JobRecord jobRecord = null;
        jobRecord = PipelineManager.queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_RUN);
        Key rootJobKey = jobRecord.getRootJobKey();
        logger.info("Running pipeline job " + jobKey.getName() + "; UI at " + PipelineServlet.makeViewerUrl(rootJobKey, jobKey));
        JobRecord rootJobRecord = jobRecord;
        if (!rootJobKey.equals((Object)jobKey)) {
            rootJobRecord = PipelineManager.queryJobOrAbandonTask(rootJobKey, JobRecord.InflationType.NONE);
        }
        if (rootJobRecord.getState() == JobRecord.State.STOPPED) {
            logger.warning("The pipeline has been stopped: " + rootJobRecord);
            throw new AbandonTaskException();
        }
        JobRecord.State jobState = jobRecord.getState();
        Barrier runBarrier = jobRecord.getRunBarrierInflated();
        if (null == runBarrier) {
            throw new RuntimeException("Internal logic error: " + jobRecord + " has not been inflated.");
        }
        Barrier finalizeBarrier = jobRecord.getFinalizeBarrierInflated();
        if (null == finalizeBarrier) {
            throw new RuntimeException("Internal logic error: finalize barrier not inflated in " + jobRecord);
        }
        runBarrier.setReleased();
        UpdateSpec updateSpec = new UpdateSpec(rootJobKey);
        updateSpec.getTransaction("releaseRunBarrier").includeBarrier(runBarrier);
        backEnd.save(updateSpec);
        updateSpec = new UpdateSpec(rootJobKey);
        switch (jobState) {
            case WAITING_TO_RUN: 
            case RETRY: {
                break;
            }
            case WAITING_TO_FINALIZE: {
                logger.info("This job has already been run " + jobRecord);
                return;
            }
            case STOPPED: {
                logger.info("This job has been stoped. " + jobRecord);
                return;
            }
        }
        JobInstanceRecord record = jobRecord.getJobInstanceInflated();
        if (null == record) {
            throw new RuntimeException("Internal logic error:" + jobRecord + " does not have jobInstanceInflated.");
        }
        Job<?> jobObject = record.getJobInstanceDeserialized();
        PipelineManager.setJobRecord(jobObject, jobRecord);
        String currentRunGUID = GUIDGenerator.nextGUID();
        PipelineManager.setCurrentRunGuid(jobObject, currentRunGUID);
        PipelineManager.setUpdateSpec(jobObject, updateSpec);
        Object[] params = runBarrier.buildArgumentArray();
        Method runMethod = PipelineManager.findAppropriateRunMethod(jobObject.getClass(), params);
        if (logger.isLoggable(Level.FINEST)) {
            StringBuilder builder = new StringBuilder(1024);
            builder.append("Running " + jobRecord + " with params: ");
            builder.append(StringUtils.toString(params));
            logger.finest(builder.toString());
        }
        jobRecord.incrementAttemptNumber();
        jobRecord.setStartTime(new Date());
        UpdateSpec tempSpec = new UpdateSpec(jobRecord.getRootJobKey());
        tempSpec.getNonTransactionalGroup().includeJob(jobRecord);
        backEnd.save(tempSpec);
        Value returnValue = null;
        Exception caughtException = null;
        try {
            runMethod.setAccessible(true);
            returnValue = (Value)runMethod.invoke(jobObject, params);
        }
        catch (Exception e) {
            caughtException = e;
        }
        if (null != caughtException) {
            PipelineManager.handleExceptionDuringRun(jobRecord, rootJobRecord, caughtException);
            return;
        }
        logger.finest("Job returned: " + returnValue);
        PipelineManager.registerSlotsWithBarrier(updateSpec, returnValue, rootJobKey, jobRecord.getKey(), currentRunGUID, finalizeBarrier);
        jobRecord.setState(JobRecord.State.WAITING_TO_FINALIZE);
        jobRecord.setChildGraphGuid(currentRunGUID);
        updateSpec.getFinalTransaction().includeJob(jobRecord);
        updateSpec.getFinalTransaction().includeBarrier(finalizeBarrier);
        backEnd.saveWithJobStateCheck(updateSpec, jobKey, JobRecord.State.WAITING_TO_RUN, JobRecord.State.RETRY);
    }

    private static void handleExceptionDuringRun(JobRecord jobRecord, JobRecord rootJobRecord, Exception e) {
        int attemptNumber = jobRecord.getAttemptNumber();
        int maxAttempts = jobRecord.getMaxAttempts();
        String message = StringUtils.printStackTraceToString(e);
        jobRecord.setErrorMessage(message);
        Key thisJobKey = jobRecord.getKey();
        UpdateSpec updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
        if (attemptNumber >= maxAttempts) {
            jobRecord.setState(JobRecord.State.STOPPED);
            rootJobRecord.setState(JobRecord.State.STOPPED);
            rootJobRecord.setErrorMessage(message);
            updateSpec.getNonTransactionalGroup().includeJob(jobRecord);
            updateSpec.getNonTransactionalGroup().includeJob(rootJobRecord);
            backEnd.save(updateSpec);
        } else {
            jobRecord.setState(JobRecord.State.RETRY);
            updateSpec.getNonTransactionalGroup().includeJob(jobRecord);
            updateSpec.getNonTransactionalGroup().includeJob(rootJobRecord);
            backEnd.save(updateSpec);
            int backoffFactor = jobRecord.getBackoffFactor();
            int backoffSeconds = jobRecord.getBackoffSeconds();
            RunJobTask task = new RunJobTask(jobRecord.getKey(), attemptNumber);
            task.setDelaySeconds((long)backoffSeconds * (long)Math.pow(backoffFactor, attemptNumber));
            task.setOnBackend(jobRecord.getOnBackend());
            backEnd.enqueue(task);
        }
        logger.log(Level.SEVERE, "An exception occurred while attempting to run " + jobRecord + ". " + "This was attempt number " + attemptNumber + " of " + maxAttempts + ".", e);
    }

    private static void finalizeJob(Key jobKey) {
        JobRecord jobRecord = null;
        jobRecord = PipelineManager.queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_FINALIZE);
        Barrier finalizeBarrier = jobRecord.getFinalizeBarrierInflated();
        if (null == finalizeBarrier) {
            throw new RuntimeException("" + jobRecord + " has not been inflated");
        }
        Slot outputSlot = jobRecord.getOutputSlotInflated();
        if (null == outputSlot) {
            throw new RuntimeException("" + jobRecord + " has not been inflated.");
        }
        finalizeBarrier.setReleased();
        UpdateSpec updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
        updateSpec.getTransaction("releaseFinalizeBarrier").includeBarrier(finalizeBarrier);
        backEnd.save(updateSpec);
        updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
        List<Object> finalizeArguments = finalizeBarrier.buildArgumentList();
        int numFinalizeArguments = finalizeArguments.size();
        if (1 != numFinalizeArguments) {
            throw new RuntimeException("Internal logic error: numFinalizeArguments=" + numFinalizeArguments);
        }
        Object finalizeValue = finalizeArguments.get(0);
        logger.finest("Finalizing " + jobRecord + " with value=" + finalizeValue);
        outputSlot.fill(finalizeValue);
        jobRecord.setState(JobRecord.State.FINALIZED);
        jobRecord.setEndTime(new Date());
        Key fillerJobKey = PipelineManager.getFinalizeSlotFiller(finalizeBarrier);
        if (null == fillerJobKey) {
            fillerJobKey = jobKey;
        }
        outputSlot.setSourceJobKey(fillerJobKey);
        updateSpec.getNonTransactionalGroup().includeJob(jobRecord);
        updateSpec.getNonTransactionalGroup().includeSlot(outputSlot);
        backEnd.save(updateSpec);
        backEnd.enqueue(new HandleSlotFilledTask(outputSlot));
    }

    private static Key getFinalizeSlotFiller(Barrier finalizeBarrier) {
        Key fillerJobKey = null;
        for (SlotDescriptor slotDescriptor : finalizeBarrier.getWaitingOnInflated()) {
            Key key = slotDescriptor.slot.getSourceJobKey();
            if (null == key) continue;
            if (null == fillerJobKey) {
                fillerJobKey = key;
                continue;
            }
            if (fillerJobKey.toString().equals(key.toString())) continue;
            return null;
        }
        return fillerJobKey;
    }

    private static void handleSlotFilled(Key slotKey) {
        Slot slot = null;
        slot = PipelineManager.querySlotOrAbandonTask(slotKey, true);
        List<Barrier> waitingList = slot.getWaitingOnMeInflated();
        if (null == waitingList) {
            throw new RuntimeException("Internal logic error: " + slot + " is not inflated");
        }
        for (Barrier barrier : waitingList) {
            ObjRefTask task;
            logger.finest("Checking " + barrier);
            if (barrier.isReleased()) continue;
            boolean shouldBeReleased = true;
            if (null == barrier.getWaitingOnInflated()) {
                throw new RuntimeException("Internal logic error: " + barrier + " is not inflated.");
            }
            for (SlotDescriptor sd : barrier.getWaitingOnInflated()) {
                if (sd.slot.isFilled()) continue;
                logger.finest("Not filled: " + sd.slot);
                shouldBeReleased = false;
                break;
            }
            if (!shouldBeReleased) continue;
            Key jobKey = barrier.getJobKey();
            switch (barrier.getType()) {
                case RUN: {
                    task = new RunJobTask(jobKey);
                    break;
                }
                case FINALIZE: {
                    task = new FinalizeJobTask(jobKey);
                    break;
                }
                default: {
                    throw new RuntimeException("Unknown barrier type " + (Object)((Object)barrier.getType()));
                }
            }
            try {
                backEnd.enqueue(task);
            }
            catch (TaskAlreadyExistsException e) {}
        }
    }

    private static JobRecord queryJobOrAbandonTask(Key key, JobRecord.InflationType inflationType) {
        try {
            return backEnd.queryJob(key, inflationType);
        }
        catch (NoSuchObjectException e) {
            logger.log(Level.SEVERE, "Cannot find some part of the job: " + key + ". Aborting the pipeline.", e);
            throw new AbandonTaskException();
        }
    }

    private static Slot querySlotOrAbandonTask(Key key, boolean inflate) {
        try {
            return backEnd.querySlot(key, inflate);
        }
        catch (NoSuchObjectException e) {
            logger.log(Level.SEVERE, "Cannot find the slot: " + key + ". Aborting the pipeline.", e);
            throw new AbandonTaskException();
        }
    }

    private static void handleFanoutTaskOrAbandonTask(FanoutTask fanoutTask) {
        try {
            backEnd.handleFanoutTask(fanoutTask);
        }
        catch (NoSuchObjectException e) {
            logger.log(Level.SEVERE, "Pipeline is fatally corrupted. Fanout task record not found", e);
            throw new AbandonTaskException();
        }
    }

    private static class AbandonTaskException
    extends RuntimeException {
        private static final long serialVersionUID = 358437646006972459L;

        private AbandonTaskException() {
        }
    }
}

