package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.PropertyConfigurator;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.tools.pigstats.PigStatusReporter;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.class */
/**
 * Base class for Pig map tasks.
 *
 * <p>{@link #setup} deserializes the map-side {@link PhysicalPlan} and related
 * settings from the job configuration, {@link #map} pushes each input tuple
 * through that plan, and {@link #cleanup} optionally flushes the plan one last
 * time and tears down the plan's stores. Subclasses decide how a produced tuple
 * is emitted by implementing {@link #collect}.
 */
public abstract class PigMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
    /** Placeholder argument handed to {@code PhysicalOperator.getNext}; always {@code null}. */
    private static final Tuple DUMMYTUPLE = null;

    // Return-status codes as interpreted by runPipeline().
    // NOTE(review): these presumably mirror the constants in Pig's POStatus — confirm.
    /** The result carries a tuple that must be collected. */
    private static final int STATUS_OK = 0;
    /** The result carries nothing to collect; the pipeline is simply polled again. */
    private static final int STATUS_NULL = 1;
    /** The operator failed; the map task is aborted. */
    private static final int STATUS_ERR = 2;
    /** The operator has no more output for the currently attached input. */
    private static final int STATUS_EOP = 3;

    /** Key type, read from the {@code pig.map.keytype} configuration entry in {@link #setup}. */
    protected byte keyType;
    /** Map plan, deserialized from {@code pig.mapPlan}; set to {@code null} at the end of {@link #cleanup}. */
    protected PhysicalPlan mp;
    /** Store operators found in {@link #mp}. */
    protected List<POStore> stores;
    /** Context captured on the first {@link #map} call; {@link #runPipeline} collects into it. */
    Mapper<Text, Tuple, PigNullableWritable, Writable>.Context outputCollector;
    ProgressableReporter pigReporter;
    /** Operators of {@link #mp} that receive each input tuple (the split's target operators). */
    PhysicalOperator[] roots;
    /** First leaf of {@link #mp}; the operator the pipeline is pulled from. */
    private PhysicalOperator leaf;
    private final Log log = LogFactory.getLog(getClass());
    protected TupleFactory tf = TupleFactory.getInstance();
    /** Set when the pipeline reports an error; makes {@link #cleanup} return early. */
    protected boolean errorInMap = false;
    PigContext pigContext = null;
    /** {@code true} once the first {@link #map} call has performed its one-time initialization. */
    private volatile boolean initialized = false;

    /**
     * Finishes the map task.
     *
     * <p>Does nothing beyond {@code super.cleanup} if an error occurred in the map.
     * Otherwise, when the job configuration flag
     * {@code JobControlCompiler.END_OF_INP_IN_MAP} is {@code "true"}, marks the plan
     * as having seen all input and runs the pipeline once more. Then tears down every
     * store (setting it up first if {@link #map} was never called), invokes the UDF
     * finish visitor on the plan, and resets this instance's state.
     *
     * @param context the task context
     * @throws IOException if running the pipeline fails, or (as a {@link VisitorException})
     *         if visiting the plan with the UDF finish visitor fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    public void cleanup(Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        if (this.errorInMap) {
            return;
        }
        if (PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
            this.mp.endOfAllInput = true;
            runPipeline(this.leaf);
        }
        for (POStore store : this.stores) {
            if (!this.initialized) {
                // map() never ran, so the store was never set up; do it now so that
                // tearDown() operates on a set-up store.
                store.setStoreImpl(new MapReducePOStoreImpl(context));
                store.setUp();
            }
            store.tearDown();
        }
        try {
            new UDFFinishVisitor(this.mp, new DependencyOrderWalker(this.mp)).visit();
        } catch (VisitorException e) {
            // NOTE(review): (byte) 4 is presumably PigException.BUG — confirm.
            throw new VisitorException("Error while calling finish method on UDFs.", 2121, (byte) 4, e);
        }
        this.mp = null;
        PhysicalOperator.setReporter(null);
        this.initialized = false;
    }

    /**
     * Prepares the map task from the job configuration.
     *
     * <p>Publishes the context/configuration through the {@code PigMapReduce} statics,
     * deserializes the UDF import list, the {@link PigContext}, the map plan and the key
     * type, configures log4j when the Pig context supplies properties, and resolves the
     * plan's root operators (from the input split's target operator keys) and its leaf.
     *
     * @param context the task context; its input split must be a {@code PigSplit} when
     *        the map plan is not empty
     * @throws IOException if deserializing any configuration entry fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    public void setup(Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration configuration = context.getConfiguration();
        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(configuration));
        PigMapReduce.sJobContext = context;
        PigMapReduce.sJobConfInternal.set(context.getConfiguration());
        PigMapReduce.sJobConf = context.getConfiguration();

        // The serialized form carries no generic type information, so the cast is
        // inherently unchecked.
        @SuppressWarnings("unchecked")
        ArrayList<String> importList = (ArrayList<String>) ObjectSerializer.deserialize(configuration.get("udf.import.list"));
        PigContext.setPackageImportList(importList);

        this.pigContext = (PigContext) ObjectSerializer.deserialize(configuration.get("pig.pigContext"));
        if (this.pigContext.getLog4jProperties() != null) {
            PropertyConfigurator.configure(this.pigContext.getLog4jProperties());
        }
        this.mp = (PhysicalPlan) ObjectSerializer.deserialize(configuration.get("pig.mapPlan"));
        this.stores = PlanHelper.getStores(this.mp);
        if (this.mp.isEmpty()) {
            this.log.debug("Map Plan empty!");
        } else {
            ByteArrayOutputStream planDump = new ByteArrayOutputStream();
            this.mp.explain(planDump);
            this.log.debug(planDump.toString());
        }
        this.keyType = ((byte[]) ObjectSerializer.deserialize(configuration.get("pig.map.keytype")))[0];
        this.pigReporter = new ProgressableReporter();
        MapRedUtil.setupUDFContext(configuration);
        if (!this.mp.isEmpty()) {
            List<OperatorKey> targetOps = ((PigSplit) context.getInputSplit()).getTargetOps();
            List<PhysicalOperator> targetRoots = new ArrayList<PhysicalOperator>();
            for (OperatorKey key : targetOps) {
                targetRoots.add(this.mp.getOperator(key));
            }
            // A zero-length seed array yields an exactly-sized result; a length-1 seed
            // would produce a single null root when there are no target operators.
            this.roots = targetRoots.toArray(new PhysicalOperator[0]);
            this.leaf = this.mp.getLeaves().get(0);
        }
        PigStatusReporter.setContext(context);
    }

    /**
     * Processes one input record.
     *
     * <p>On the first call, captures the context for later use by {@link #runPipeline},
     * installs the progress reporter and the Pig logger, and sets up every store. If the
     * map plan is empty the tuple is collected directly; otherwise a copy-free wrapper of
     * the tuple's fields is attached to every root operator and the pipeline is run.
     *
     * @param text the input key (not used by this method)
     * @param tuple the input tuple
     * @param context the task context
     * @throws IOException if collecting or running the pipeline fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void map(Text text, Tuple tuple, Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context) throws IOException, InterruptedException {
        if (!this.initialized) {
            this.initialized = true;
            // Cached because runPipeline() is also called from cleanup().
            this.outputCollector = context;
            this.pigReporter.setRep(context);
            PhysicalOperator.setReporter(this.pigReporter);
            for (POStore store : this.stores) {
                store.setStoreImpl(new MapReducePOStoreImpl(context));
                store.setUp();
            }
            boolean aggregateWarnings = "true".equalsIgnoreCase(this.pigContext.getProperties().getProperty("aggregate.warning"));
            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
            pigHadoopLogger.setAggregate(aggregateWarnings);
            pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
            PhysicalOperator.setPigLogger(pigHadoopLogger);
        }
        if (this.mp.isEmpty()) {
            collect(context, tuple);
            return;
        }
        for (PhysicalOperator root : this.roots) {
            root.attachInput(this.tf.newTupleNoCopy(tuple.getAll()));
        }
        runPipeline(this.leaf);
    }

    /**
     * Pulls results from the given operator until it signals the end of its output,
     * collecting every produced tuple into {@link #outputCollector}.
     *
     * @param physicalOperator the operator to pull from (the plan's leaf in this class)
     * @throws ExecException if the operator reports an error status; {@link #errorInMap}
     *         is set before the exception is thrown
     * @throws IOException if collecting a tuple fails
     * @throws InterruptedException if the task is interrupted
     */
    protected void runPipeline(PhysicalOperator physicalOperator) throws IOException, InterruptedException {
        while (true) {
            Result next = physicalOperator.getNext(DUMMYTUPLE);
            if (next.returnStatus == STATUS_OK) {
                collect(this.outputCollector, (Tuple) next.result);
            } else if (next.returnStatus == STATUS_EOP) {
                return;
            } else if (next.returnStatus == STATUS_ERR) {
                // Remember the failure so that cleanup() does not touch the plan again.
                this.errorInMap = true;
                String message = next.result != null
                        ? "Received Error while processing the map plan: " + next.result
                        : "Received Error while processing the map plan.";
                // NOTE(review): (byte) 4 is presumably PigException.BUG — confirm.
                throw new ExecException(message, 2055, (byte) 4);
            }
            // STATUS_NULL and any other status: nothing to collect, poll again.
        }
    }

    /**
     * Emits one tuple produced by the map plan (or the raw input tuple when the plan
     * is empty).
     *
     * @param context the context to emit into
     * @param tuple the tuple to emit
     * @throws InterruptedException if the task is interrupted
     * @throws IOException if emitting fails
     */
    public abstract void collect(Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context, Tuple tuple) throws InterruptedException, IOException;

    /**
     * @return the key type currently held by this mapper
     */
    public byte getKeyType() {
        return this.keyType;
    }

    /**
     * @param b the key type to use
     */
    public void setKeyType(byte b) {
        this.keyType = b;
    }
}
