package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;

import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.SelfSpillBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.GroupingSpillable;
import org.apache.pig.impl.util.Spillable;
import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.scripting.ScriptEngine;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.class */
public class POPartialAgg extends PhysicalOperator implements Spillable, GroupingSpillable {
    private static final long serialVersionUID = 1;

    // Initial value of numRecordsToSample: how many raw records are buffered before the
    // reduction factor and the memory thresholds are evaluated. Also used to pre-size rawInputMap.
    private static final int NUM_RECS_TO_SAMPLE = 10000;

    // Upper bound applied to listSizeThreshold when isGroupAll is set.
    private static final int MAX_LIST_SIZE = 25000;

    // Size of a single key's value list beyond which addKeyValToMap() aggregates eagerly.
    // Assigned 9367 in the constructor and re-derived in init()/estimateMemThresholds() for group-all.
    private int listSizeThreshold;

    // Default minimum reduction factor (same value as the fallback used in init()).
    private static final int DEFAULT_MIN_REDUCTION = 7;

    // Initial tier thresholds, used until estimateMemThresholds() replaces them.
    // SECOND_TIER_THRESHOLD equals FIRST_TIER_THRESHOLD / DEFAULT_MIN_REDUCTION (integer division).
    private static final int FIRST_TIER_THRESHOLD = 20000;
    private static final int SECOND_TIER_THRESHOLD = 2857;

    // Plan evaluated against each input tuple to obtain the grouping key, and its first leaf.
    private PhysicalPlan keyPlan;
    private ExpressionOperator keyLeaf;

    // One plan per aggregated value, with the first leaf of each plan cached alongside.
    private List<PhysicalPlan> valuePlans;
    private List<ExpressionOperator> valueLeaves;

    // Constructor flag; when set, value lists are pre-sized and listSizeThreshold is raised.
    // NOTE(review): presumably marks a GROUP ALL (single key) aggregation -- confirm with callers.
    private boolean isGroupAll;

    // Record counters for the two buffering tiers.
    private transient int numRecsInRawMap;
    private transient int numRecsInProcessedMap;

    // First tier: raw input tuples per key. Second tier: already aggregated tuples per key.
    private transient Map<Object, List<Tuple>> rawInputMap;
    private transient Map<Object, List<Tuple>> processedInputMap;

    // Lifecycle flags maintained by init(), checkSizeReduction(), estimateMemThresholds()
    // and getNextTuple().
    private transient boolean initialized;
    private transient boolean disableMapAgg;
    private transient boolean sizeReductionChecked;
    private transient boolean inputsExhausted;
    private transient boolean estimatedMemThresholds;

    // Spill coordination flags. spill() sets doContingentSpill and polls it;
    // getNextTuple() acknowledges through startedContingentSpill and clears the request.
    private volatile transient boolean doSpill;
    private volatile transient boolean doContingentSpill;
    private volatile transient boolean startedContingentSpill;

    // Monitor held for the duration of spill(); created in init().
    private volatile transient Object spillLock;

    // Tunables and estimates populated by init(), checkSizeReduction() and estimateMemThresholds().
    private transient int minOutputReduction;
    private transient float percentUsage;
    private transient int numRecordsToSample;
    private transient int firstTierThreshold;
    private transient int secondTierThreshold;
    private transient int sizeReduction;
    private transient int avgTupleSize;

    // Iterator over processedInputMap while a spill is in progress; null otherwise.
    private transient Iterator<Map.Entry<Object, List<Tuple>>> spillingIterator;

    private static final Log LOG = LogFactory.getLog(POPartialAgg.class);

    // Shared result with status byte 3 and no payload, returned when there is nothing (more) to emit.
    private static final Result EOP_RESULT = new Result((byte) 3, null);

    // Weakly-referenced set of operators with partial aggregation enabled; its size is passed to
    // SelfSpillBag.MemoryLimits in estimateMemThresholds().
    private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<>();

    /**
     * Creates the operator with the group-all flag turned off.
     *
     * @param operatorKey key identifying this operator, forwarded to the two-argument constructor
     */
    public POPartialAgg(OperatorKey operatorKey) {
        this(operatorKey, false);
    }

    /**
     * Creates the operator.
     *
     * @param operatorKey key identifying this operator, handed to the superclass
     * @param z           value stored in {@code isGroupAll}
     */
    public POPartialAgg(OperatorKey operatorKey, boolean z) {
        super(operatorKey);
        // Remember the grouping mode first, then set the initial per-key list size limit.
        this.isGroupAll = z;
        this.listSizeThreshold = 9367;
    }

    /**
     * Sets up the transient runtime state on first use.
     *
     * <p>Resets the counters and thresholds to their defaults and reads
     * {@code pig.cachedbag.memusage} and {@code pig.exec.mapPartAgg.minReduction} from the job
     * configuration when one is available. If no memory is granted, partial aggregation is turned
     * off. Otherwise the operator is registered with the {@link SpillableMemoryManager} and both
     * buffering maps are allocated.
     *
     * @throws ExecException declared for callers; not thrown directly by this method
     */
    private void init() throws ExecException {
        this.numRecsInRawMap = 0;
        this.numRecsInProcessedMap = 0;
        this.minOutputReduction = DEFAULT_MIN_REDUCTION;
        this.numRecordsToSample = NUM_RECS_TO_SAMPLE;
        this.firstTierThreshold = FIRST_TIER_THRESHOLD;
        this.secondTierThreshold = SECOND_TIER_THRESHOLD;
        this.sizeReduction = 1;
        this.avgTupleSize = 0;
        this.percentUsage = 0.2f;
        this.spillLock = new Object();
        if (PigMapReduce.sJobConfInternal.get() != null) {
            String str = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
            if (str != null) {
                this.percentUsage = Float.parseFloat(str);
            }
            this.minOutputReduction = PigMapReduce.sJobConfInternal.get()
                    .getInt("pig.exec.mapPartAgg.minReduction", DEFAULT_MIN_REDUCTION);
            if (this.minOutputReduction <= 0) {
                // Zero is rejected as well, so the message says "<= 0" to match the check.
                LOG.info("Specified reduction is <= 0 (" + this.minOutputReduction + "). Using default "
                        + DEFAULT_MIN_REDUCTION);
                this.minOutputReduction = DEFAULT_MIN_REDUCTION;
            }
        }
        if (this.percentUsage <= 0.0f) {
            LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
            this.disableMapAgg = true;
            // Nothing will be sampled, so mark both one-time checks as done.
            this.sizeReductionChecked = true;
            this.estimatedMemThresholds = true;
        } else {
            ALL_POPARTS.put(this, null);
            SpillableMemoryManager.getInstance().registerSpillable(this);
        }
        if (!this.disableMapAgg) {
            this.rawInputMap = Maps.newHashMapWithExpectedSize(NUM_RECS_TO_SAMPLE);
            this.processedInputMap = Maps.newHashMapWithExpectedSize(SECOND_TIER_THRESHOLD);
        }
        if (this.isGroupAll) {
            this.listSizeThreshold = Math.min(this.numRecordsToSample, MAX_LIST_SIZE);
        }
        this.initialized = true;
    }

    /**
     * Main pull loop of the operator.
     *
     * <p>Each iteration (1) runs the one-time reduction check and memory-threshold estimation once
     * enough raw records are buffered, (2) services a spill requested through {@link #spill()},
     * (3) emits one buffered entry if a spill is in progress, and otherwise (4) reads one input,
     * evaluates its key and buffers it in the raw map. When partial aggregation is disabled the
     * input is passed through unchanged.
     *
     * <p>Status bytes are compared as literals here. 3 is the status carried by
     * {@code EOP_RESULT} and 0 is the status {@code getOutput()} uses for a normal result.
     * NOTE(review): 1 and 2 presumably correspond to POStatus.STATUS_NULL and
     * POStatus.STATUS_ERR -- confirm against POStatus.
     *
     * @return the next output tuple, an input result passed through, or {@code EOP_RESULT}
     * @throws ExecException if evaluating a plan or freeing the buffers fails
     */
    @Override // org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator
    public Result getNextTuple() throws ExecException {
        if (!this.initialized) {
            init();
        }
        while (true) {
            // One-time check: is aggregation reducing the data enough to be worth it?
            if (!this.sizeReductionChecked && this.numRecsInRawMap >= this.numRecordsToSample) {
                checkSizeReduction();
                // The check already aggregated both levels; if that did not start a spill,
                // treat the pending spill() request as satisfied.
                if (this.doContingentSpill && !this.doSpill) {
                    LOG.info("Avoided emitting records during spill memory call.");
                    this.doContingentSpill = false;
                }
            }
            // One-time estimation of the tier thresholds from the sampled tuple sizes.
            if (!this.estimatedMemThresholds && this.numRecsInRawMap >= this.numRecordsToSample) {
                estimateMemThresholds();
            }
            if (this.doContingentSpill) {
                // Tell the thread waiting in spill() that its request has been picked up.
                this.startedContingentSpill = true;
                // Skip aggregation while a spill is already iterating over the processed map.
                if (!this.doSpill) {
                    // Evaluated BEFORE aggregating: true when processed records outnumber raw
                    // records by more than 3 to 1, in which case the spill is forced.
                    boolean z = 3 * this.numRecsInRawMap < this.numRecsInProcessedMap;
                    aggregateBothLevels(false, false);
                    if (z || shouldSpill()) {
                        startSpill(false);
                    } else {
                        LOG.info("Avoided emitting records during spill memory call.");
                        this.doContingentSpill = false;
                    }
                }
            }
            if (this.doSpill) {
                // No-op when a spill iterator already exists.
                startSpill(true);
                Result spillResult = spillResult();
                if (spillResult.returnStatus == 3) {
                    // Processed map drained: the spill (and any contingent request) is finished.
                    this.doSpill = false;
                    this.doContingentSpill = false;
                }
                if (spillResult.returnStatus != 3) {
                    return spillResult;
                }
                if (this.inputsExhausted) {
                    // Final spill completed; release the maps and report end of processing.
                    freeMemory();
                    return spillResult;
                }
            }
            if (mapAggDisabled()) {
                // Pass-through mode: nothing is buffered any more.
                freeMemory();
                return processInput();
            }
            Result processInput = processInput();
            if (processInput.returnStatus == 2) {
                return processInput;
            }
            if (processInput.returnStatus == 3) {
                if (!this.parentPlan.endOfAllInput) {
                    return EOP_RESULT;
                }
                // All input consumed: flush whatever is still buffered on the next iterations.
                this.inputsExhausted = true;
                LOG.info("Spilling last bits.");
                startSpill(true);
            } else if (processInput.returnStatus == 1) {
                continue;
            } else {
                Tuple tuple = (Tuple) processInput.result;
                this.keyPlan.attachInput(tuple);
                Result result = getResult(this.keyLeaf);
                // NOTE(review): this early return leaves the key plan input attached.
                if (result.returnStatus != 0) {
                    return result;
                }
                Object obj = result.result;
                this.keyPlan.detachInput();
                this.numRecsInRawMap++;
                addKeyValToMap(this.rawInputMap, obj, tuple);
                // Aggregate only the tiers that exceed their thresholds.
                aggregateBothLevels(true, true);
                if (shouldSpill()) {
                    startSpill(false);
                }
            }
        }
    }

    /**
     * Drops the references to both buffering maps so they can be garbage collected.
     *
     * @throws ExecException if the raw map still holds entries
     */
    private void freeMemory() throws ExecException {
        final Map<Object, List<Tuple>> pendingRaw = this.rawInputMap;
        final boolean stillBuffered = pendingRaw != null && !pendingRaw.isEmpty();
        if (stillBuffered) {
            throw new ExecException("Illegal state. Trying to free up partial aggregation maps when they are not empty");
        }
        this.rawInputMap = null;
        this.processedInputMap = null;
    }

    /**
     * Derives the first- and second-tier thresholds from the memory footprint of the tuples
     * currently buffered in the raw map.
     *
     * <p>Each raw tuple's size is fed into a {@link SelfSpillBag.MemoryLimits} instance, which is
     * created with the number of registered operators and {@code percentUsage}. Its cache limit is
     * then split between the two tiers according to {@code sizeReduction}.
     *
     * <p>If the raw map holds no tuples there is nothing to measure: the method returns without
     * marking the estimation as done, so it is retried once records are available. Without this
     * guard the average-size computation would divide by zero.
     */
    private void estimateMemThresholds() {
        if (mapAggDisabled()) {
            // Nothing is buffered in pass-through mode; just record that the step is over.
            this.estimatedMemThresholds = true;
            return;
        }
        if (this.rawInputMap == null || this.rawInputMap.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No raw tuples buffered; deferring memory threshold estimation.");
            }
            return;
        }
        LOG.info("Getting mem limits; considering " + ALL_POPARTS.size() + " POPArtialAgg objects. with memory percentage " + this.percentUsage);
        SelfSpillBag.MemoryLimits memoryLimits = new SelfSpillBag.MemoryLimits(ALL_POPARTS.size(), this.percentUsage);
        // Accumulate in a long so a large sample cannot overflow the byte total.
        long sampledBytes = 0L;
        int sampledTuples = 0;
        for (List<Tuple> values : this.rawInputMap.values()) {
            for (Tuple sampled : values) {
                sampledTuples++;
                int tupleBytes = (int) sampled.getMemorySize();
                sampledBytes += tupleBytes;
                memoryLimits.addNewObjSize(tupleBytes);
            }
        }
        if (sampledTuples == 0) {
            // Keys were present but all value lists were empty; same outcome as an empty map.
            return;
        }
        this.avgTupleSize = (int) (sampledBytes / sampledTuples);
        long cacheLimit = memoryLimits.getCacheLimit();
        LOG.info("Estimated total tuples to buffer, based on " + sampledTuples + " tuples that took up " + sampledBytes + " bytes: " + cacheLimit);
        // Split the tuple budget: 1/sizeReduction for the processed tier, the rest for the raw tier.
        this.firstTierThreshold = (int) (0.5d + (((float) cacheLimit) * (1.0f - (1.0f / this.sizeReduction))));
        this.secondTierThreshold = (int) (0.5d + (((float) cacheLimit) * (1.0f / this.sizeReduction)));
        LOG.info("Setting thresholds. Primary: " + this.firstTierThreshold + ". Secondary: " + this.secondTierThreshold);
        if (this.secondTierThreshold == 0) {
            // Move one slot from the first tier so the second tier can hold at least one tuple.
            this.secondTierThreshold++;
            this.firstTierThreshold--;
        }
        if (this.isGroupAll) {
            this.listSizeThreshold = Math.min(this.firstTierThreshold, MAX_LIST_SIZE);
        }
        this.estimatedMemThresholds = true;
    }

    /**
     * Measures how much a full aggregation pass shrinks the buffered data and disables partial
     * aggregation when the (integer) reduction factor is below {@code minOutputReduction}.
     *
     * <p>If nothing is buffered, the ratio is undefined. The check is then left pending and is
     * retried on a later call, instead of failing with a division by zero.
     *
     * @throws ExecException if aggregating the buffered tuples fails
     */
    private void checkSizeReduction() throws ExecException {
        if (mapAggDisabled()) {
            return;
        }
        int recordsBefore = this.numRecsInProcessedMap + this.numRecsInRawMap;
        aggregateBothLevels(false, false);
        int recordsAfter = this.numRecsInProcessedMap + this.numRecsInRawMap;
        LOG.info("After reduction, processed map: " + this.numRecsInProcessedMap + "; raw map: " + this.numRecsInRawMap);
        if (recordsAfter == 0) {
            // Can happen when spill() lowers numRecordsToSample to 0 before any record arrived.
            LOG.info("No buffered records to measure reduction on; deferring the size reduction check.");
            return;
        }
        int observedReduction = recordsBefore / recordsAfter;
        LOG.info("Observed reduction factor: from " + recordsBefore + " to " + recordsAfter + " => " + observedReduction + ScriptEngine.NAMESPACE_SEPARATOR);
        if (observedReduction < this.minOutputReduction) {
            LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + this.minOutputReduction);
            disableMapAgg();
        }
        this.sizeReduction = observedReduction;
        this.sizeReductionChecked = true;
    }

    /**
     * Turns partial aggregation off for the rest of this operator's life.
     *
     * <p>A spill of the processed map is started first so that already buffered data is still
     * emitted. The operator is then removed from the shared registry.
     *
     * @throws ExecException if starting the spill fails
     */
    private void disableMapAgg() throws ExecException {
        startSpill(false);
        this.disableMapAgg = true;
        ALL_POPARTS.remove(this);
    }

    /** @return {@code true} once partial aggregation has been disabled for this operator */
    private boolean mapAggDisabled() {
        return this.disableMapAgg;
    }

    /** @return {@code true} when the raw map holds more records than the first-tier threshold */
    private boolean shouldAggregateFirstLevel() {
        final int limit = this.firstTierThreshold;
        return limit < this.numRecsInRawMap;
    }

    /** @return {@code true} when the processed map holds more records than the second-tier threshold */
    private boolean shouldAggregateSecondLevel() {
        final int limit = this.secondTierThreshold;
        return limit < this.numRecsInProcessedMap;
    }

    /**
     * A spill is due under the same condition as a second-level aggregation: the processed map
     * holds more records than the second-tier threshold.
     *
     * @return {@code true} when buffered output should start being emitted
     */
    private boolean shouldSpill() {
        return this.numRecsInProcessedMap > this.secondTierThreshold;
    }

    /**
     * Moves a single-element value list into {@code map}.
     *
     * <p>If the key is new, the given list instance itself becomes the map value. Otherwise only
     * its first element is appended to the existing list. No list-size check is performed.
     *
     * @param map  destination map
     * @param obj  grouping key
     * @param list list whose first element is to be stored
     * @throws ExecException declared for symmetry with {@code addKeyValToMap}; not thrown here
     */
    private void addKeySingleValToMap(Map<Object, List<Tuple>> map, Object obj, List<Tuple> list) throws ExecException {
        final List<Tuple> existing = map.get(obj);
        if (existing != null) {
            existing.add(list.get(0));
            return;
        }
        map.put(obj, list);
    }

    /**
     * Appends {@code tuple} to the value list of {@code obj} in {@code map}, creating the list on
     * first use.
     *
     * <p>If the list grows beyond {@code listSizeThreshold}, aggregation runs immediately: for the
     * raw map only this key is aggregated, for any other map the whole second level is aggregated.
     *
     * @param map   map to add to (the raw or the processed map)
     * @param obj   grouping key
     * @param tuple tuple to buffer
     * @throws ExecException if the triggered aggregation fails
     */
    private void addKeyValToMap(Map<Object, List<Tuple>> map, Object obj, Tuple tuple) throws ExecException {
        List<Tuple> bucket = map.get(obj);
        if (bucket == null) {
            bucket = createNewValueList(map);
            map.put(obj, bucket);
        }
        bucket.add(tuple);

        if (bucket.size() <= this.listSizeThreshold) {
            return;
        }

        final boolean firstLevel = (map == this.rawInputMap);
        if (LOG.isDebugEnabled()) {
            final String levelName = firstLevel ? "first level." : "second level.";
            LOG.debug("The cache for key " + obj + " has grown too large. Aggregating " + levelName);
        }
        if (!firstLevel) {
            aggregateSecondLevel();
        } else {
            aggregateRawRow(obj, bucket);
        }
    }

    /**
     * Allocates a value list for a new key of {@code map}.
     *
     * <p>Outside group-all mode a default-capacity list is returned. In group-all mode the list
     * is pre-sized: {@code listSizeThreshold} for the raw map, otherwise
     * {@code min(secondTierThreshold + 1, MAX_LIST_SIZE)}.
     *
     * @param map the map the list is created for
     * @return a new, empty, mutable list
     */
    private List<Tuple> createNewValueList(Map<Object, List<Tuple>> map) {
        if (!this.isGroupAll) {
            return new ArrayList<Tuple>();
        }
        final int capacity;
        if (map == this.rawInputMap) {
            capacity = this.listSizeThreshold;
        } else {
            capacity = Math.min(this.secondTierThreshold + 1, MAX_LIST_SIZE);
        }
        return new ArrayList<Tuple>(capacity);
    }

    /**
     * Begins emitting the contents of the processed map, unless a spill is already running.
     *
     * @param z when {@code true}, both levels are fully aggregated before the spill iterator is
     *          created
     * @throws ExecException if the aggregation fails
     */
    private void startSpill(boolean z) throws ExecException {
        if (this.spillingIterator == null) {
            LOG.info("Starting spill.");
            if (z) {
                aggregateBothLevels(false, false);
            }
            this.doSpill = true;
            this.spillingIterator = this.processedInputMap.entrySet().iterator();
        }
    }

    /**
     * Emits the next entry of the processed map during a spill.
     *
     * <p>When the map is empty the spill iterator is cleared and {@code EOP_RESULT} is returned.
     * Otherwise one entry is taken from the iterator, removed from the map and run through the
     * value plans.
     *
     * @return the output for one key, or {@code EOP_RESULT} when the spill is complete
     * @throws ExecException if building or evaluating the value tuple fails
     */
    private Result spillResult() throws ExecException {
        if (!this.processedInputMap.isEmpty()) {
            final Map.Entry<Object, List<Tuple>> entry = this.spillingIterator.next();
            final Object key = entry.getKey();
            final List<Tuple> buffered = entry.getValue();
            final Tuple valueTuple = createValueTuple(key, buffered);
            this.numRecsInProcessedMap -= buffered.size();
            this.spillingIterator.remove();
            return getOutput(key, valueTuple);
        }
        this.spillingIterator = null;
        LOG.info("In spillResults(), processed map is empty -- done spilling.");
        return EOP_RESULT;
    }

    /**
     * Aggregates the raw tuples of a single key and moves the result into the processed map.
     *
     * @param obj  grouping key
     * @param list raw tuples currently buffered for that key
     * @throws ExecException if evaluating the value plans fails or the result is not a tuple
     */
    private void aggregateRawRow(Object obj, List<Tuple> list) throws ExecException {
        // These tuples leave the raw tier.
        this.numRecsInRawMap -= list.size();

        final Tuple valueTuple = createValueTuple(obj, list);
        final Result aggregated = getOutput(obj, valueTuple);
        this.rawInputMap.remove(obj);

        final Tuple aggregatedTuple = getAggResultTuple(aggregated.result);
        addKeyValToMap(this.processedInputMap, obj, aggregatedTuple);
        this.numRecsInProcessedMap++;
    }

    /**
     * Runs the value plans over every multi-element value list of {@code map} and stores the
     * single aggregated tuple in {@code map2}.
     *
     * <p>When the two maps differ, entries are moved: each one is removed from {@code map} through
     * the iterator and re-added to {@code map2}. When they are the same map, entries are collapsed
     * in place. Single-element lists are not re-evaluated; they are moved as-is (different maps)
     * or left untouched (same map).
     *
     * @param map  source map being iterated
     * @param map2 destination map (may be the same instance as {@code map})
     * @param i    starting value of the counter
     * @return {@code i} incremented once per entry visited in {@code map}
     * @throws ExecException if evaluating the value plans fails or a result is not a tuple
     */
    private int aggregate(Map<Object, List<Tuple>> map, Map<Object, List<Tuple>> map2, int i) throws ExecException {
        // true when entries have to be moved from one map to the other
        boolean z = map != map2;
        Iterator<Map.Entry<Object, List<Tuple>>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Object, List<Tuple>> next = it.next();
            if (next.getValue().size() != 1) {
                Result output = getOutput(next.getKey(), createValueTuple(next.getKey(), next.getValue()));
                // NOTE(review): output.returnStatus is not inspected before output.result is used.
                if (z) {
                    // Remove via the iterator before inserting into the other map.
                    it.remove();
                    addKeyValToMap(map2, next.getKey(), getAggResultTuple(output.result));
                } else {
                    List<Tuple> value = next.getValue();
                    if (this.isGroupAll) {
                        // Reuse the existing list instance.
                        value.clear();
                    } else {
                        // Replace the list; put() on an existing key while iterating the same map.
                        value = createNewValueList(map2);
                        map2.put(next.getKey(), value);
                    }
                    value.add(getAggResultTuple(output.result));
                }
            } else if (z) {
                it.remove();
                addKeySingleValToMap(map2, next.getKey(), next.getValue());
            }
            i++;
        }
        return i;
    }

    /**
     * Aggregates the raw tier and then, if the processed map already had content before this
     * call, the processed tier.
     *
     * @param z  when {@code true}, the first level is aggregated only if its threshold is exceeded
     * @param z2 when {@code true}, the second level is aggregated only if its threshold is exceeded
     * @throws ExecException if an aggregation pass fails
     */
    private void aggregateBothLevels(boolean z, boolean z2) throws ExecException {
        // Captured up front: first-level aggregation may populate the processed map.
        final boolean processedHadEntries = !this.processedInputMap.isEmpty();

        final boolean runFirstLevel = !z || shouldAggregateFirstLevel();
        if (runFirstLevel) {
            aggregateFirstLevel();
        }

        if (processedHadEntries && (!z2 || shouldAggregateSecondLevel())) {
            aggregateSecondLevel();
        }
    }

    /**
     * Moves everything from the raw map into the processed map, aggregating multi-tuple keys on
     * the way, and resets the raw record counter.
     *
     * @throws ExecException if the aggregation fails
     */
    private void aggregateFirstLevel() throws ExecException {
        if (!this.rawInputMap.isEmpty()) {
            final int rawBefore = this.numRecsInRawMap;
            final int processedBefore = this.numRecsInProcessedMap;
            this.numRecsInProcessedMap = aggregate(this.rawInputMap, this.processedInputMap, processedBefore);
            this.numRecsInRawMap = 0;
            LOG.info("Aggregated " + rawBefore + " raw tuples. Processed tuples before aggregation = " + processedBefore + ", after aggregation = " + this.numRecsInProcessedMap);
        }
    }

    /**
     * Collapses every multi-tuple key of the processed map in place and recounts its records.
     *
     * @throws ExecException if the aggregation fails
     */
    private void aggregateSecondLevel() throws ExecException {
        if (!this.processedInputMap.isEmpty()) {
            final int processedBefore = this.numRecsInProcessedMap;
            // Counting restarts from zero: one record per key remains after the pass.
            this.numRecsInProcessedMap = aggregate(this.processedInputMap, this.processedInputMap, 0);
            LOG.info("Aggregated " + processedBefore + " processed tuples to " + this.numRecsInProcessedMap + " tuples");
        }
    }

    /**
     * Builds the input tuple for the value plans: field 0 is the key and each following field is
     * a bag collecting the corresponding field of every buffered tuple.
     *
     * @param obj  grouping key
     * @param list tuples buffered for that key
     * @return a tuple with {@code valuePlans.size() + 1} fields
     * @throws ExecException if a tuple field cannot be read or set
     */
    private Tuple createValueTuple(Object obj, List<Tuple> list) throws ExecException {
        final int planCount = this.valuePlans.size();
        final Tuple combined = mTupleFactory.newTuple(planCount + 1);
        combined.set(0, obj);

        for (int slot = 1; slot <= planCount; slot++) {
            // The flag is re-read for every bag, as it may be changed by another thread.
            final InternalCachedBag bag;
            if (this.doContingentSpill) {
                bag = new InternalCachedBag();
            } else {
                bag = new InternalCachedBag(1, 0.1f);
            }
            combined.set(slot, bag);
        }

        final Iterator<Tuple> sources = list.iterator();
        while (sources.hasNext()) {
            final Tuple source = sources.next();
            final int width = source.size();
            // Field 0 of each buffered tuple is skipped; the remaining fields go into the bags.
            for (int column = 1; column < width; column++) {
                final DataBag target = (DataBag) combined.get(column);
                target.add((Tuple) source.get(column));
            }
        }
        return combined;
    }

    /**
     * Narrows an aggregation result to {@link Tuple}.
     *
     * @param obj result produced by the value plans; {@code null} is passed through
     * @return {@code obj} as a tuple
     * @throws ExecException if {@code obj} is non-null and not a {@link Tuple}
     */
    private Tuple getAggResultTuple(Object obj) throws ExecException {
        if (obj == null || obj instanceof Tuple) {
            return (Tuple) obj;
        }
        throw new ExecException("Intermediate Algebraic functions must implement EvalFunc<Tuple>");
    }

    /**
     * Not supported by this operator.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.pig.pen.Illustrable
    public Tuple illustratorMarkup(Object obj, Object obj2, int i) {
        throw new UnsupportedOperationException();
    }

    /**
     * Dispatches to {@code visitPartialAgg} on the given visitor.
     *
     * @param phyPlanVisitor visitor to call back
     * @throws VisitorException if the visitor fails
     */
    @Override // org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator, org.apache.pig.impl.plan.Operator
    public void visit(PhyPlanVisitor phyPlanVisitor) throws VisitorException {
        phyPlanVisitor.visitPartialAgg(this);
    }

    /**
     * Evaluates an expression leaf after checking that its declared result type is one of the
     * accepted type codes.
     *
     * @param expressionOperator leaf to evaluate
     * @return the result of {@code getNext} for the leaf's own result type
     * @throws ExecException with error code 2270 if the result type is not accepted, or if the
     *                       evaluation itself fails
     */
    private Result getResult(ExpressionOperator expressionOperator) throws ExecException {
        // Validate first; every accepted type code is evaluated the same way below.
        switch (expressionOperator.getResultType()) {
            case 5:
            case 10:
            case 15:
            case 20:
            case 25:
            case 30:
            case 50:
            case 55:
            case 65:
            case 70:
            case 100:
            case 110:
            case 120:
                break;
            default:
                throw new ExecException("Invalid result type: " + ((int) DataType.findType(Byte.valueOf(expressionOperator.getResultType()))), 2270, (byte) 4);
        }
        return expressionOperator.getNext(expressionOperator.getResultType());
    }

    /**
     * Runs every value plan against {@code tuple} and assembles the output tuple: field 0 is the
     * key and field {@code i + 1} is the result of value plan {@code i}.
     *
     * @param obj   grouping key
     * @param tuple input attached to each value plan
     * @return a status-0 result wrapping the output tuple, or the first plan result whose status
     *         is 2, returned unchanged
     * @throws ExecException if a plan cannot be evaluated
     */
    private Result getOutput(Object obj, Tuple tuple) throws ExecException {
        final int planCount = this.valuePlans.size();
        final Tuple assembled = mTupleFactory.newTuple(planCount + 1);
        assembled.set(0, obj);

        int planIndex = 0;
        while (planIndex < planCount) {
            this.valuePlans.get(planIndex).attachInput(tuple);
            final Result planResult = getResult(this.valueLeaves.get(planIndex));
            if (planResult.returnStatus == 2) {
                return planResult;
            }
            planIndex++;
            assembled.set(planIndex, planResult.result);
        }
        return new Result((byte) 0, assembled);
    }

    /** @return {@code false}; this operator reports that it does not support multiple inputs */
    @Override // org.apache.pig.impl.plan.Operator
    public boolean supportsMultipleInputs() {
        return false;
    }

    /** @return {@code false}; this operator reports that it does not support multiple outputs */
    @Override // org.apache.pig.impl.plan.Operator
    public boolean supportsMultipleOutputs() {
        return false;
    }

    /**
     * @return the display name: alias string, the literal {@code "Partial Agg["}, the name of the
     *         result type, {@code "]"} and the operator key
     */
    @Override // org.apache.pig.impl.plan.Operator
    public String name() {
        final StringBuilder label = new StringBuilder();
        label.append(getAliasString());
        label.append("Partial Agg[");
        label.append(DataType.findTypeName(this.resultType));
        label.append("]");
        label.append(this.mKey.toString());
        return label.toString();
    }

    /** @return the plan used to compute the grouping key (the stored reference, not a copy) */
    public PhysicalPlan getKeyPlan() {
        return this.keyPlan;
    }

    /**
     * Sets the plan that computes the grouping key and caches its first leaf.
     *
     * <p>The leaf is resolved before either field is written. If the plan is {@code null} or has
     * no leaves, the call fails and the previous plan and leaf stay in place.
     *
     * @param physicalPlan key plan; must have at least one leaf, which must be an
     *                     {@link ExpressionOperator}
     */
    public void setKeyPlan(PhysicalPlan physicalPlan) {
        ExpressionOperator leaf = (ExpressionOperator) physicalPlan.getLeaves().get(0);
        this.keyPlan = physicalPlan;
        this.keyLeaf = leaf;
    }

    /** @return the value plans (the stored list reference, not a copy) */
    public List<PhysicalPlan> getValuePlans() {
        return this.valuePlans;
    }

    /**
     * Sets the value plans and caches the first leaf of each.
     *
     * <p>All leaves are collected before any field is written. If one plan has no leaves, the
     * call fails and the previous plans and leaves stay in place.
     *
     * @param list value plans; each must have at least one leaf, which must be an
     *             {@link ExpressionOperator}
     */
    public void setValuePlans(List<PhysicalPlan> list) {
        List<ExpressionOperator> leaves = new ArrayList<>(list.size());
        for (PhysicalPlan plan : list) {
            leaves.add((ExpressionOperator) plan.getLeaves().get(0));
        }
        this.valuePlans = list;
        this.valueLeaves = leaves;
    }

    /**
     * Asks the processing thread to reduce this operator's memory footprint and waits for it to
     * act.
     *
     * <p>No data is touched here. The method raises {@code doContingentSpill} and polls until
     * {@code getNextTuple()} clears it. If the processing thread has not picked the request up
     * within five seconds, the method gives up waiting and leaves the request pending.
     *
     * @return {@code 1} when the request was serviced; {@code 0} when aggregation is disabled, an
     *         earlier request is still pending, or the wait ended early
     */
    @Override // org.apache.pig.impl.util.Spillable
    public long spill() {
        if (mapAggDisabled()) {
            return 0L;
        }
        if (this.doContingentSpill && !this.startedContingentSpill) {
            LOG.info("Spill triggered by SpillableMemoryManager, but previous spill call is still not processed. Skipping");
            return 0L;
        }
        final long pollIntervalMillis = 25L;
        final long maxWaitMillis = 5000L;
        synchronized (this.spillLock) {
            // Read each field once: the processing thread may null them out concurrently.
            final Map<Object, List<Tuple>> raw = this.rawInputMap;
            final Map<Object, List<Tuple>> processed = this.processedInputMap;
            if (raw != null && processed != null) {
                LOG.info("Spill triggered. Memory usage: " + getMemorySize() + ". Raw map: keys = " + raw.size() + ", tuples = " + this.numRecsInRawMap + ", Processed map: keys = " + processed.size() + ", tuples = " + this.numRecsInProcessedMap);
            }
            this.startedContingentSpill = false;
            this.doContingentSpill = true;
            if (!this.sizeReductionChecked || !this.estimatedMemThresholds) {
                // Make the pending one-time checks run on whatever has been buffered so far.
                this.numRecordsToSample = this.numRecsInRawMap;
            }
            try {
                long waitStart = System.currentTimeMillis();
                while (this.doContingentSpill) {
                    Thread.sleep(pollIntervalMillis);
                    // Time out only while the request has not been picked up yet.
                    if (!this.startedContingentSpill && System.currentTimeMillis() - waitStart >= maxWaitMillis) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted exception while waiting for spill to finish", e);
                // Restore the interrupt status so the caller can still observe it.
                Thread.currentThread().interrupt();
            }
            if (this.doContingentSpill) {
                LOG.info("Not blocking for spill and letting SpillableMemoryManager process other spillable objects as main thread has not reached here for 5 secs");
                return 0L;
            }
            LOG.info("Finished spill for SpillableMemoryManager call");
            return 1L;
        }
    }

    /**
     * Estimates the memory held by this operator as the average sampled tuple size times the
     * number of buffered records in both tiers.
     *
     * <p>The arithmetic is done in {@code long}. Done in {@code int}, the product wraps once the
     * estimate passes 2 GiB and a wrong (possibly negative) size is reported.
     *
     * @return estimated size in bytes; {@code 0} until the average tuple size has been sampled
     */
    @Override // org.apache.pig.impl.util.Spillable
    public long getMemorySize() {
        long bufferedRecords = (long) this.numRecsInProcessedMap + this.numRecsInRawMap;
        return this.avgTupleSize * bufferedRecords;
    }

    /**
     * Clones this operator and gives the copy its own key plan and value plans.
     *
     * <p>The key plan is copied with {@code PhysicalPlan.clone()}. The decompiled call to
     * {@code m85clone()} was a synthetic name introduced by the decompiler and does not exist on
     * the real class.
     *
     * @return the cloned operator
     * @throws CloneNotSupportedException if the operator or one of its plans cannot be cloned
     */
    @Override // org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator, org.apache.pig.impl.plan.Operator
    public PhysicalOperator clone() throws CloneNotSupportedException {
        POPartialAgg copy = (POPartialAgg) super.clone();
        copy.setKeyPlan(this.keyPlan.clone());
        copy.setValuePlans(clonePlans(this.valuePlans));
        return copy;
    }
}
