package org.apache.drill.exec.physical.impl.partitionsender;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import javax.inject.Named;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.planner.sql.parser.impl.DrillParserImplConstants;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.class */
public abstract class PartitionerTemplate implements Partitioner {
    static final Logger logger = LoggerFactory.getLogger(PartitionerTemplate.class);
    private static final int DEFAULT_RECORD_BATCH_SIZE = 1023;
    private SelectionVector2 sv2;
    private SelectionVector4 sv4;
    private RecordBatch incoming;
    private OperatorStats stats;
    private int start;
    private int end;
    private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
    private int outgoingRecordBatchSize = DEFAULT_RECORD_BATCH_SIZE;

    /* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate$OutgoingRecordBatch.class */
    public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible {
        private final AccountingDataTunnel tunnel;
        private final HashPartitionSender operator;
        private final FragmentContext context;
        private final BufferAllocator allocator;
        private final int oppositeMinorFragmentId;
        private final OperatorStats stats;
        private int recordCount;
        private int totalRecords;
        private final VectorContainer vectorContainer = new VectorContainer();
        private boolean isLast = false;
        private boolean dropAll = false;

        public OutgoingRecordBatch(OperatorStats operatorStats, HashPartitionSender hashPartitionSender, AccountingDataTunnel accountingDataTunnel, FragmentContext fragmentContext, BufferAllocator bufferAllocator, int i) {
            this.context = fragmentContext;
            this.allocator = bufferAllocator;
            this.operator = hashPartitionSender;
            this.tunnel = accountingDataTunnel;
            this.stats = operatorStats;
            this.oppositeMinorFragmentId = i;
        }

        protected void copy(int i) throws IOException {
            doEval(i, this.recordCount);
            this.recordCount++;
            this.totalRecords++;
            if (this.recordCount == PartitionerTemplate.this.outgoingRecordBatchSize) {
                flush(false);
            }
        }

        @Override // org.apache.drill.exec.physical.impl.partitionsender.PartitionOutgoingBatch
        public void terminate() {
            this.dropAll = true;
        }

        @RuntimeOverridden
        protected void doSetup(@Named("incoming") RecordBatch recordBatch, @Named("outgoing") VectorAccessible vectorAccessible) {
        }

        @RuntimeOverridden
        protected void doEval(@Named("inIndex") int i, @Named("outIndex") int i2) {
        }

        /* JADX WARN: Type inference failed for: r0v39, types: [org.apache.drill.exec.vector.ValueVector] */
        public void flush(boolean z) throws IOException {
            if (this.dropAll) {
                this.recordCount = 0;
                return;
            }
            ExecProtos.FragmentHandle handle = this.context.getHandle();
            boolean z2 = this.isLast || Thread.currentThread().isInterrupted();
            if (z2 || this.recordCount != 0) {
                if (this.recordCount != 0) {
                    Iterator<VectorWrapper<?>> it = this.vectorContainer.iterator();
                    while (it.hasNext()) {
                        it.next().getValueVector().getMutator().setValueCount(this.recordCount);
                    }
                }
                FragmentWritableBatch fragmentWritableBatch = new FragmentWritableBatch(z2, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), this.operator.getOppositeMajorFragmentId(), this.oppositeMinorFragmentId, getWritableBatch());
                updateStats(fragmentWritableBatch);
                this.stats.startWait();
                try {
                    this.tunnel.sendRecordBatch(fragmentWritableBatch);
                    this.stats.stopWait();
                    if (z2) {
                        this.dropAll = true;
                    }
                    if (z) {
                        return;
                    }
                    this.recordCount = 0;
                    this.vectorContainer.zeroVectors();
                    allocateOutgoingRecordBatch();
                } catch (Throwable th) {
                    this.stats.stopWait();
                    throw th;
                }
            }
        }

        /* JADX WARN: Type inference failed for: r0v9, types: [org.apache.drill.exec.vector.ValueVector] */
        private void allocateOutgoingRecordBatch() {
            Iterator<VectorWrapper<?>> it = this.vectorContainer.iterator();
            while (it.hasNext()) {
                it.next().getValueVector().allocateNew();
            }
        }

        public void updateStats(FragmentWritableBatch fragmentWritableBatch) {
            this.stats.addLongStat(PartitionSenderRootExec.Metric.BYTES_SENT, fragmentWritableBatch.getByteCount());
            this.stats.addLongStat(PartitionSenderRootExec.Metric.BATCHES_SENT, 1L);
            this.stats.addLongStat(PartitionSenderRootExec.Metric.RECORDS_SENT, fragmentWritableBatch.getHeader().getDef().getRecordCount());
        }

        public void initializeBatch() {
            Iterator it = PartitionerTemplate.this.incoming.iterator();
            while (it.hasNext()) {
                ValueVector newVector = TypeHelper.getNewVector(((VectorWrapper) it.next()).getField(), this.allocator);
                newVector.setInitialCapacity(PartitionerTemplate.this.outgoingRecordBatchSize);
                this.vectorContainer.add(newVector);
            }
            allocateOutgoingRecordBatch();
            doSetup(PartitionerTemplate.this.incoming, this.vectorContainer);
        }

        public void resetBatch() {
            this.isLast = false;
            this.recordCount = 0;
            this.vectorContainer.clear();
        }

        public void setIsLast() {
            this.isLast = true;
        }

        @Override // org.apache.drill.exec.record.VectorAccessible
        public BatchSchema getSchema() {
            return PartitionerTemplate.this.incoming.getSchema();
        }

        @Override // org.apache.drill.exec.record.VectorAccessible
        public int getRecordCount() {
            return this.recordCount;
        }

        @Override // org.apache.drill.exec.physical.impl.partitionsender.PartitionOutgoingBatch
        public long getTotalRecords() {
            return this.totalRecords;
        }

        @Override // org.apache.drill.exec.record.VectorAccessible
        public TypedFieldId getValueVectorId(SchemaPath schemaPath) {
            return this.vectorContainer.getValueVectorId(schemaPath);
        }

        @Override // org.apache.drill.exec.record.VectorAccessible
        public VectorWrapper<?> getValueAccessorById(Class<?> cls, int... iArr) {
            return this.vectorContainer.getValueAccessorById(cls, iArr);
        }

        @Override // java.lang.Iterable
        public Iterator<VectorWrapper<?>> iterator() {
            return this.vectorContainer.iterator();
        }

        @Override // org.apache.drill.exec.record.VectorAccessible
        public SelectionVector2 getSelectionVector2() {
            throw new UnsupportedOperationException();
        }

        @Override // org.apache.drill.exec.record.VectorAccessible
        public SelectionVector4 getSelectionVector4() {
            throw new UnsupportedOperationException();
        }

        public WritableBatch getWritableBatch() {
            return WritableBatch.getBatchNoHVWrap(this.recordCount, this, false);
        }

        public void clear() {
            this.vectorContainer.clear();
        }
    }

    @Override // org.apache.drill.exec.physical.impl.partitionsender.Partitioner
    public List<? extends PartitionOutgoingBatch> getOutgoingBatches() {
        return this.outgoingBatches;
    }

    @Override // org.apache.drill.exec.physical.impl.partitionsender.Partitioner
    public PartitionOutgoingBatch getOutgoingBatch(int i) {
        if (i < this.start || i >= this.end) {
            return null;
        }
        return this.outgoingBatches.get(i - this.start);
    }

    @Override // org.apache.drill.exec.physical.impl.partitionsender.Partitioner
    public final void setup(FragmentContext fragmentContext, RecordBatch recordBatch, HashPartitionSender hashPartitionSender, OperatorStats operatorStats, OperatorContext operatorContext, int i, int i2) throws SchemaChangeException {
        this.incoming = recordBatch;
        this.stats = operatorStats;
        this.start = i;
        this.end = i2;
        doSetup(fragmentContext, recordBatch, null);
        if (hashPartitionSender.getDestinations().size() > 1000) {
            this.outgoingRecordBatchSize = DrillParserImplConstants.VIEW;
        }
        int i3 = 0;
        for (MinorFragmentEndpoint minorFragmentEndpoint : hashPartitionSender.getDestinations()) {
            if (i3 >= i && i3 < i2) {
                logger.debug("start: {}, count: {}, fieldId: {}", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3)});
                this.outgoingBatches.add(new OutgoingRecordBatch(operatorStats, hashPartitionSender, fragmentContext.getDataTunnel(minorFragmentEndpoint.getEndpoint()), fragmentContext, operatorContext.getAllocator(), minorFragmentEndpoint.getId()));
            }
            i3++;
        }
        Iterator<OutgoingRecordBatch> it = this.outgoingBatches.iterator();
        while (it.hasNext()) {
            it.next().initializeBatch();
        }
        BatchSchema.SelectionVectorMode selectionVectorMode = recordBatch.getSchema().getSelectionVectorMode();
        switch (selectionVectorMode) {
            case FOUR_BYTE:
                this.sv4 = recordBatch.getSelectionVector4();
                return;
            case TWO_BYTE:
                this.sv2 = recordBatch.getSelectionVector2();
                return;
            case NONE:
                return;
            default:
                throw new UnsupportedOperationException("Unknown selection vector mode: " + selectionVectorMode.toString());
        }
    }

    @Override // org.apache.drill.exec.physical.impl.partitionsender.Partitioner
    public OperatorStats getStats() {
        return this.stats;
    }

    @Override // org.apache.drill.exec.physical.impl.partitionsender.Partitioner
    public void flushOutgoingBatches(boolean z, boolean z2) throws IOException {
        for (OutgoingRecordBatch outgoingRecordBatch : this.outgoingBatches) {
            logger.debug("Attempting to flush all outgoing batches");
            if (z) {
                outgoingRecordBatch.setIsLast();
            }
            outgoingRecordBatch.flush(z2);
            if (z2) {
                outgoingRecordBatch.resetBatch();
                outgoingRecordBatch.initializeBatch();
            }
        }
    }

    @Override // org.apache.drill.exec.physical.impl.partitionsender.Partitioner
    public void partitionBatch(RecordBatch recordBatch) throws IOException {
        BatchSchema.SelectionVectorMode selectionVectorMode = recordBatch.getSchema().getSelectionVectorMode();
        switch (selectionVectorMode) {
            case FOUR_BYTE:
                for (int i = 0; i < recordBatch.getRecordCount(); i++) {
                    doCopy(this.sv4.get(i));
                }
                return;
            case TWO_BYTE:
                for (int i2 = 0; i2 < recordBatch.getRecordCount(); i2++) {
                    doCopy(this.sv2.getIndex(i2));
                }
                return;
            case NONE:
                for (int i3 = 0; i3 < recordBatch.getRecordCount(); i3++) {
                    doCopy(i3);
                }
                return;
            default:
                throw new UnsupportedOperationException("Unknown selection vector mode: " + selectionVectorMode.toString());
        }
    }

    private void doCopy(int i) throws IOException {
        int doEval = doEval(i);
        if (doEval < this.start || doEval >= this.end) {
            return;
        }
        this.outgoingBatches.get(doEval - this.start).copy(i);
    }

    @Override // org.apache.drill.exec.physical.impl.partitionsender.Partitioner
    public void clear() {
        Iterator<OutgoingRecordBatch> it = this.outgoingBatches.iterator();
        while (it.hasNext()) {
            it.next().clear();
        }
    }

    public abstract void doSetup(@Named("context") FragmentContext fragmentContext, @Named("incoming") RecordBatch recordBatch, @Named("outgoing") OutgoingRecordBatch[] outgoingRecordBatchArr) throws SchemaChangeException;

    public abstract int doEval(@Named("inIndex") int i);
}
