package org.apache.sysds.runtime.controlprogram.parfor;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
import org.apache.sysds.parser.dml.DmlSyntacticValidator;
import org.apache.sysds.runtime.codegen.CodegenUtils;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.util.CollectionUtils;
import org.apache.sysds.runtime.util.ProgramConverter;
import scala.Tuple2;

/* loaded from: input_file:org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.class */
public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFunction<Task, Long, String> {
    private static final long serialVersionUID = -3254950138084272296L;
    private static final CachedReuseVariables reuseVars = new CachedReuseVariables();
    private final long _jobid;
    private final String _prog;
    private final boolean _isLocal;
    private final HashMap<String, byte[]> _clsMap;
    private boolean _initialized;
    private boolean _caching;
    private final boolean _cleanCache;
    private final Map<String, String> _lineage;
    private final LongAccumulator _aTasks;
    private final LongAccumulator _aIters;
    private final Map<String, Broadcast<CacheBlock>> _brInputs;

    public RemoteParForSparkWorker(long j, String str, boolean z, HashMap<String, byte[]> hashMap, boolean z2, LongAccumulator longAccumulator, LongAccumulator longAccumulator2, Map<String, Broadcast<CacheBlock>> map, boolean z3, Map<String, String> map2) {
        this._initialized = false;
        this._caching = true;
        this._jobid = j;
        this._prog = str;
        this._isLocal = z;
        this._clsMap = hashMap;
        this._initialized = false;
        this._caching = z2;
        this._aTasks = longAccumulator;
        this._aIters = longAccumulator2;
        this._brInputs = map;
        this._cleanCache = z3;
        this._lineage = map2;
    }

    public Iterator<Tuple2<Long, String>> call(Task task) throws Exception {
        if (!this._initialized) {
            configureWorker(TaskContext.get().taskAttemptId());
        }
        HashSet hashSet = new HashSet(this._ec.getVariables().keySet());
        long executedIterations = getExecutedIterations();
        super.executeTask(task);
        this._aTasks.add(1L);
        this._aIters.add((int) (getExecutedIterations() - executedIterations));
        this._ec.getVariables().keySet().stream().filter(str -> {
            return !hashSet.contains(str);
        }).map(str2 -> {
            return this._ec.getVariable(str2);
        }).filter(data -> {
            return data instanceof CacheableData;
        }).forEach(data2 -> {
            ((CacheableData) data2).freeEvictedBlob();
        });
        if (DMLScript.LINEAGE) {
            RemoteParForUtils.exportLineageItems(this._workerID, this._ec.getVariables(), this._resultVars, this._ec.getLineage());
        }
        return RemoteParForUtils.exportResultVariables(this._workerID, this._ec.getVariables(), this._resultVars).stream().map(str3 -> {
            return new Tuple2(Long.valueOf(this._workerID), str3);
        }).iterator();
    }

    private void configureWorker(long j) throws IOException {
        this._workerID = j;
        for (Map.Entry<String, byte[]> entry : this._clsMap.entrySet()) {
            CodegenUtils.getClassSync(entry.getKey(), entry.getValue());
        }
        ParForBody parseParForBody = ProgramConverter.parseParForBody(this._prog, (int) this._workerID, true);
        this._childBlocks = parseParForBody.getChildBlocks();
        this._ec = parseParForBody.getEc();
        this._resultVars = parseParForBody.getResultVariables();
        this._numTasks = 0L;
        this._numIters = 0L;
        reuseVars.reuseVariables(this._jobid, this._ec.getVariables(), CollectionUtils.asSet((List) this._resultVars.stream().map(resultVar -> {
            return resultVar._name;
        }).collect(Collectors.toList()), this._ec.getVarListPartitioned()), this._brInputs, this._cleanCache);
        RemoteParForUtils.setupBufferPool(this._workerID, this._isLocal);
        super.pinResultVariables();
        if (!this._caching && !this._isLocal) {
            CacheableData.disableCaching();
        }
        if (!this._isLocal) {
            DMLScript.setGlobalExecMode(Types.ExecMode.SINGLE_NODE);
        }
        DmlSyntacticValidator.init();
        if (this._lineage != null) {
            DMLScript.LINEAGE = true;
            this._ec.setLineage(Lineage.deserialize(this._lineage));
        }
        this._initialized = true;
    }

    public static void cleanupCachedVariables(long j) {
        reuseVars.clearVariables(j);
    }
}
