package org.apache.flink.table.runtime.operators.join.stream;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback;
import org.apache.flink.table.runtime.operators.bundle.trigger.CoBundleTrigger;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.stream.bundle.BufferBundle;
import org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasNoUniqueKeyBundle;
import org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasUniqueKeyBundle;
import org.apache.flink.table.runtime.operators.join.stream.bundle.JoinKeyContainsUniqueKeyBundle;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
import org.apache.flink.table.runtime.operators.metrics.SimpleGauge;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator.class */
public abstract class MiniBatchStreamingJoinOperator extends StreamingJoinOperator implements BundleTriggerCallback {
    // Serialization version identifier of this operator class.
    private static final long serialVersionUID = -1106342589994963997L;
    // Trigger notified of every buffered element (onElement1/onElement2) and reset after each flush;
    // this operator registers itself as the trigger's callback in open().
    private final CoBundleTrigger<RowData, RowData> coBundleTrigger;
    // Buffer of pending records from processElement1; created in open(), drained by finishBundle().
    private transient BufferBundle<?> leftBuffer;
    // Buffer of pending records from processElement2; created in open(), drained by finishBundle().
    private transient BufferBundle<?> rightBuffer;
    // Gauge registered as "leftBundleReducedSize"; set to leftBuffer.reducedSize() on each non-empty flush.
    private transient SimpleGauge<Integer> leftBundleReducedSizeGauge;
    // Gauge registered as "rightBundleReducedSize"; set to rightBuffer.reducedSize() on each non-empty flush.
    private transient SimpleGauge<Integer> rightBundleReducedSizeGauge;
    // Serializer created from leftType in open(); used to copy records before they enter leftBuffer.
    private transient TypeSerializer<RowData> leftSerializer;
    // Serializer created from rightType in open(); used to copy records before they enter rightBuffer.
    private transient TypeSerializer<RowData> rightSerializer;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator$MiniBatchFullOuterJoinStreamOperator.class */
    /**
     * Mini-batch join operator that the factory method instantiates for {@code FULL} joins.
     *
     * <p>When a bundle is flushed, the right-side buffer is processed before the left-side buffer.
     */
    private static final class MiniBatchFullOuterJoinStreamOperator extends MiniBatchStreamingJoinOperator {
        public MiniBatchFullOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
            super(parameter);
        }

        /**
         * Processes the buffered right-side records first, then the buffered left-side records.
         *
         * @param leftBundle records buffered from the left input
         * @param rightBundle records buffered from the right input
         */
        @Override // org.apache.flink.table.runtime.operators.join.stream.MiniBatchStreamingJoinOperator
        protected void processBundles(BufferBundle<?> leftBundle, BufferBundle<?> rightBundle) throws Exception {
            // Right side: its own state view is the "input side", the left view is the "other side".
            processSingleSideBundles(rightBundle, rightRecordStateView, leftRecordStateView, false);
            // Left side: roles of the two state views are swapped and the input-is-left flag is set.
            processSingleSideBundles(leftBundle, leftRecordStateView, rightRecordStateView, true);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator$MiniBatchInnerJoinStreamOperator.class */
    /**
     * Mini-batch join operator that the factory method instantiates for {@code INNER} joins.
     *
     * <p>When a bundle is flushed, the right-side buffer is processed before the left-side buffer.
     */
    private static final class MiniBatchInnerJoinStreamOperator extends MiniBatchStreamingJoinOperator {
        public MiniBatchInnerJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
            super(parameter);
        }

        /**
         * Processes the buffered right-side records first, then the buffered left-side records.
         *
         * @param leftBundle records buffered from the left input
         * @param rightBundle records buffered from the right input
         */
        @Override // org.apache.flink.table.runtime.operators.join.stream.MiniBatchStreamingJoinOperator
        protected void processBundles(BufferBundle<?> leftBundle, BufferBundle<?> rightBundle) throws Exception {
            // Right side: its own state view is the "input side", the left view is the "other side".
            processSingleSideBundles(rightBundle, rightRecordStateView, leftRecordStateView, false);
            // Left side: roles of the two state views are swapped and the input-is-left flag is set.
            processSingleSideBundles(leftBundle, leftRecordStateView, rightRecordStateView, true);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator$MiniBatchLeftOuterJoinStreamOperator.class */
    /**
     * Mini-batch join operator that the factory method instantiates for {@code LEFT} joins.
     *
     * <p>When a bundle is flushed, the right-side buffer is processed before the left-side buffer.
     */
    private static final class MiniBatchLeftOuterJoinStreamOperator extends MiniBatchStreamingJoinOperator {
        public MiniBatchLeftOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
            super(parameter);
        }

        /**
         * Processes the buffered right-side records first, then the buffered left-side records.
         *
         * @param leftBundle records buffered from the left input
         * @param rightBundle records buffered from the right input
         */
        @Override // org.apache.flink.table.runtime.operators.join.stream.MiniBatchStreamingJoinOperator
        protected void processBundles(BufferBundle<?> leftBundle, BufferBundle<?> rightBundle) throws Exception {
            // Right side: its own state view is the "input side", the left view is the "other side".
            processSingleSideBundles(rightBundle, rightRecordStateView, leftRecordStateView, false);
            // Left side: roles of the two state views are swapped and the input-is-left flag is set.
            processSingleSideBundles(leftBundle, leftRecordStateView, rightRecordStateView, true);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator$MiniBatchRightOuterJoinStreamOperator.class */
    /**
     * Mini-batch join operator that the factory method instantiates for {@code RIGHT} joins.
     *
     * <p>Unlike the inner, left and full variants in this file, this one processes the left-side
     * buffer before the right-side buffer when a bundle is flushed.
     */
    private static final class MiniBatchRightOuterJoinStreamOperator extends MiniBatchStreamingJoinOperator {
        public MiniBatchRightOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
            super(parameter);
        }

        /**
         * Processes the buffered left-side records first, then the buffered right-side records.
         *
         * @param leftBundle records buffered from the left input
         * @param rightBundle records buffered from the right input
         */
        @Override // org.apache.flink.table.runtime.operators.join.stream.MiniBatchStreamingJoinOperator
        protected void processBundles(BufferBundle<?> leftBundle, BufferBundle<?> rightBundle) throws Exception {
            // Left side: its own state view is the "input side", the right view is the "other side".
            processSingleSideBundles(leftBundle, leftRecordStateView, rightRecordStateView, true);
            // Right side: roles of the two state views are swapped and the input-is-left flag is cleared.
            processSingleSideBundles(rightBundle, rightRecordStateView, leftRecordStateView, false);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator$MiniBatchStreamingJoinParameter.class */
    /**
     * Plain holder that bundles every constructor argument of {@link MiniBatchStreamingJoinOperator}
     * so the operator subclasses can take a single parameter.
     *
     * <p>The first ten fields are passed on to the superclass constructor; {@code coBundleTrigger}
     * is kept by the mini-batch operator itself.
     */
    static class MiniBatchStreamingJoinParameter implements Serializable {
        InternalTypeInfo<RowData> leftType;
        InternalTypeInfo<RowData> rightType;
        GeneratedJoinCondition generatedJoinCondition;
        JoinInputSideSpec leftInputSideSpec;
        JoinInputSideSpec rightInputSideSpec;
        boolean leftIsOuter;
        boolean rightIsOuter;
        boolean[] filterNullKeys;
        long leftStateRetentionTime;
        long rightStateRetentionTime;
        CoBundleTrigger<RowData, RowData> coBundleTrigger;

        /** Stores each argument in the field of the same name; no validation or copying is performed. */
        MiniBatchStreamingJoinParameter(
                InternalTypeInfo<RowData> leftType,
                InternalTypeInfo<RowData> rightType,
                GeneratedJoinCondition generatedJoinCondition,
                JoinInputSideSpec leftInputSideSpec,
                JoinInputSideSpec rightInputSideSpec,
                boolean leftIsOuter,
                boolean rightIsOuter,
                boolean[] filterNullKeys,
                long leftStateRetentionTime,
                long rightStateRetentionTime,
                CoBundleTrigger<RowData, RowData> coBundleTrigger) {
            this.leftType = leftType;
            this.rightType = rightType;
            this.generatedJoinCondition = generatedJoinCondition;
            this.leftInputSideSpec = leftInputSideSpec;
            this.rightInputSideSpec = rightInputSideSpec;
            this.leftIsOuter = leftIsOuter;
            this.rightIsOuter = rightIsOuter;
            this.filterNullKeys = filterNullKeys;
            this.leftStateRetentionTime = leftStateRetentionTime;
            this.rightStateRetentionTime = rightStateRetentionTime;
            this.coBundleTrigger = coBundleTrigger;
        }
    }

    /**
     * Creates the operator from a parameter holder.
     *
     * <p>All fields of the holder except the bundle trigger are forwarded, in declaration order, to
     * the superclass constructor; the bundle trigger is retained by this class.
     *
     * @param miniBatchStreamingJoinParameter holder with the join configuration and the bundle trigger
     */
    public MiniBatchStreamingJoinOperator(MiniBatchStreamingJoinParameter miniBatchStreamingJoinParameter) {
        super(
                miniBatchStreamingJoinParameter.leftType,
                miniBatchStreamingJoinParameter.rightType,
                miniBatchStreamingJoinParameter.generatedJoinCondition,
                miniBatchStreamingJoinParameter.leftInputSideSpec,
                miniBatchStreamingJoinParameter.rightInputSideSpec,
                miniBatchStreamingJoinParameter.leftIsOuter,
                miniBatchStreamingJoinParameter.rightIsOuter,
                miniBatchStreamingJoinParameter.filterNullKeys,
                miniBatchStreamingJoinParameter.leftStateRetentionTime,
                miniBatchStreamingJoinParameter.rightStateRetentionTime);
        coBundleTrigger = miniBatchStreamingJoinParameter.coBundleTrigger;
    }

    /**
     * Opens the superclass and then sets up everything the mini-batch layer needs: the two record
     * buffers, the bundle trigger callback, the record serializers and the two size gauges.
     *
     * @throws Exception if the superclass or any initialization step fails
     */
    @Override // org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator, org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator
    public void open() throws Exception {
        super.open();

        // One buffer per input; the concrete bundle type depends on that input's side spec.
        leftBuffer = initialBuffer(leftInputSideSpec);
        rightBuffer = initialBuffer(rightInputSideSpec);

        // This operator implements BundleTriggerCallback, so it registers itself with the trigger.
        coBundleTrigger.registerCallback(this);
        coBundleTrigger.reset();
        LOG.info("Initialize MiniBatchStreamingJoinOperator successfully.");

        // Serializers are used by processElement1/processElement2 to copy records before buffering.
        leftSerializer = leftType.createSerializer(getExecutionConfig());
        rightSerializer = rightType.createSerializer(getExecutionConfig());

        // Gauges start at 0 and are refreshed on every non-empty flush in finishBundle().
        leftBundleReducedSizeGauge = new SimpleGauge<>(0);
        rightBundleReducedSizeGauge = new SimpleGauge<>(0);
        getRuntimeContext().getMetricGroup().gauge("leftBundleReducedSize", leftBundleReducedSizeGauge);
        getRuntimeContext().getMetricGroup().gauge("rightBundleReducedSize", rightBundleReducedSizeGauge);
    }

    /**
     * Buffers one left-input record instead of joining it immediately.
     *
     * <p>The record is copied with the left serializer, stored in the left buffer under the current
     * key and (if the left side spec has a unique-key selector) its unique key, and then reported
     * to the bundle trigger.
     *
     * @param streamRecord the incoming left-side record
     * @throws Exception if key extraction, buffering or the trigger fails
     */
    @Override // org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator
    public void processElement1(StreamRecord<RowData> streamRecord) throws Exception {
        // NOTE(review): the copy is presumably needed because the buffered row outlives this call — confirm.
        final RowData bufferedRow = leftSerializer.copy(streamRecord.getValue());
        final RowData joinKey = (RowData) getCurrentKey();
        // The unique key stays null when the left side defines no unique-key selector.
        final RowData uniqueKey =
                leftInputSideSpec.getUniqueKeySelector() == null
                        ? null
                        : (RowData) leftInputSideSpec.getUniqueKeySelector().getKey(bufferedRow);
        leftBuffer.addRecord(joinKey, uniqueKey, bufferedRow);
        coBundleTrigger.onElement1(bufferedRow);
    }

    /**
     * Buffers one right-input record instead of joining it immediately.
     *
     * <p>The record is copied with the right serializer, stored in the right buffer under the
     * current key and (if the right side spec has a unique-key selector) its unique key, and then
     * reported to the bundle trigger.
     *
     * @param streamRecord the incoming right-side record
     * @throws Exception if key extraction, buffering or the trigger fails
     */
    @Override // org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator
    public void processElement2(StreamRecord<RowData> streamRecord) throws Exception {
        // NOTE(review): the copy is presumably needed because the buffered row outlives this call — confirm.
        final RowData bufferedRow = rightSerializer.copy(streamRecord.getValue());
        final RowData joinKey = (RowData) getCurrentKey();
        // The unique key stays null when the right side defines no unique-key selector.
        final RowData uniqueKey =
                rightInputSideSpec.getUniqueKeySelector() == null
                        ? null
                        : (RowData) rightInputSideSpec.getUniqueKeySelector().getKey(bufferedRow);
        rightBuffer.addRecord(joinKey, uniqueKey, bufferedRow);
        coBundleTrigger.onElement2(bufferedRow);
    }

    /**
     * Flushes both buffers through {@link #finishBundle()} and only then passes the watermark on to
     * the superclass, so buffered records are processed before the watermark is handled further.
     *
     * @param watermark the watermark to forward to the superclass
     * @throws Exception if flushing the bundle or the superclass handling fails
     */
    public void processWatermark1(Watermark watermark) throws Exception {
        finishBundle();
        super.processWatermark1(watermark);
    }

    /**
     * Flushes both buffers through {@link #finishBundle()} and only then passes the watermark on to
     * the superclass, so buffered records are processed before the watermark is handled further.
     *
     * @param watermark the watermark to forward to the superclass
     * @throws Exception if flushing the bundle or the superclass handling fails
     */
    public void processWatermark2(Watermark watermark) throws Exception {
        finishBundle();
        super.processWatermark2(watermark);
    }

    /**
     * Flushes both buffers through {@link #finishBundle()}; the argument is not used.
     *
     * <p>NOTE(review): the buffers are transient fields that this class never writes to state, so
     * draining them here is presumably what keeps buffered records from being lost across a
     * checkpoint — confirm against the operator lifecycle contract.
     *
     * @param j unused by this implementation (presumably the checkpoint id — confirm)
     * @throws Exception if flushing the bundle fails
     */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        finishBundle();
    }

    /**
     * Flushes any still-buffered records through {@link #finishBundle()} and then delegates to the
     * superclass {@code finish()}.
     *
     * @throws Exception if flushing the bundle or the superclass call fails
     */
    public void finish() throws Exception {
        finishBundle();
        super.finish();
    }

    /**
     * Closes the superclass and discards whatever is still held in the two record buffers.
     *
     * <p>The buffers are cleared in a {@code finally} block so they are released even when the
     * superclass {@code close()} throws. Each buffer is null-checked because both are created only
     * in {@link #open()} after {@code super.open()} has returned; if {@code open()} failed or was
     * never called, they are still {@code null} and must not be dereferenced here.
     *
     * @throws Exception if the superclass {@code close()} fails
     */
    @Override // org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (leftBuffer != null) {
                leftBuffer.clear();
            }
            if (rightBuffer != null) {
                rightBuffer.clear();
            }
        }
    }

    /**
     * Flushes the current bundle.
     *
     * <p>If at least one buffer holds records, both gauges are updated with the buffers' reduced
     * sizes, the buffers are handed to {@link #processBundles}, and both buffers are cleared. The
     * bundle trigger is reset in every case, including when both buffers were empty.
     *
     * @throws Exception if processing the bundles fails
     */
    @Override // org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback
    public void finishBundle() throws Exception {
        final boolean nothingBuffered = leftBuffer.isEmpty() && rightBuffer.isEmpty();
        if (!nothingBuffered) {
            // Publish the sizes before the buffers are consumed and cleared.
            leftBundleReducedSizeGauge.update(leftBuffer.reducedSize());
            rightBundleReducedSizeGauge.update(rightBuffer.reducedSize());

            processBundles(leftBuffer, rightBuffer);

            leftBuffer.clear();
            rightBuffer.clear();
        }
        coBundleTrigger.reset();
    }

    /**
     * Processes the records buffered for both inputs; called by {@link #finishBundle()} with the
     * left buffer as the first argument and the right buffer as the second, only when at least one
     * of them is non-empty. Subclasses decide in which order the two sides are processed.
     *
     * @param bufferBundle the left-side buffer
     * @param bufferBundle2 the right-side buffer
     * @throws Exception if processing fails
     */
    protected abstract void processBundles(BufferBundle<?> bufferBundle, BufferBundle<?> bufferBundle2) throws Exception;

    /**
     * Creates the empty buffer matching an input's key characteristics.
     *
     * <p>The checks are ordered: a join key that contains the unique key wins over merely having a
     * unique key, and an input with neither gets the no-unique-key bundle.
     *
     * @param joinInputSideSpec side spec of the input the buffer is created for
     * @return a new, empty bundle of the matching type
     */
    private BufferBundle<?> initialBuffer(JoinInputSideSpec joinInputSideSpec) {
        if (joinInputSideSpec.joinKeyContainsUniqueKey()) {
            return new JoinKeyContainsUniqueKeyBundle();
        }
        if (joinInputSideSpec.hasUniqueKey()) {
            return new InputSideHasUniqueKeyBundle();
        }
        return new InputSideHasNoUniqueKeyBundle();
    }

    /**
     * Passes every record of the iterator to {@code processElement} in iteration order and decides
     * the value of its last (suppress) argument for each record.
     *
     * <p>A retract message that is immediately followed by an accumulate message forms a pair, and
     * both records of the pair are processed with the suppress flag set to {@code true}. Every other
     * record is processed with the flag set to {@code false}.
     *
     * <p>To make that decision a retract message is held back until its successor is known. The
     * held record is released as soon as the successor arrives, or after the loop when there is no
     * successor, so a sequence that ends in, or contains runs of, consecutive retract messages never
     * loses a record.
     *
     * @param records buffered records to process, in arrival order
     * @param inputSideStateView state view passed through as the input-side view
     * @param otherSideStateView state view passed through as the other-side view
     * @param inputIsLeft passed through unchanged; {@code true} when the records come from the left input
     * @throws Exception if {@code processElement} fails
     */
    private void processElementWithSuppress(Iterator<RowData> records, JoinRecordStateView inputSideStateView, JoinRecordStateView otherSideStateView, boolean inputIsLeft) throws Exception {
        // Retract message whose suppress flag depends on the record that follows it; null if none is pending.
        RowData heldRetract = null;
        while (records.hasNext()) {
            final RowData current = records.next();
            if (heldRetract != null) {
                // The successor is now known: an accumulate completes a suppressed pair.
                final boolean completesPair = RowDataUtil.isAccumulateMsg(current);
                processElement(heldRetract, inputSideStateView, otherSideStateView, inputIsLeft, completesPair);
                heldRetract = null;
                if (completesPair) {
                    processElement(current, inputSideStateView, otherSideStateView, inputIsLeft, true);
                    continue;
                }
            }
            if (RowDataUtil.isRetractMsg(current)) {
                // Defer until the next record (or the end of the iterator) decides the flag.
                heldRetract = current;
            } else {
                processElement(current, inputSideStateView, otherSideStateView, inputIsLeft, false);
            }
        }
        // A retract with no successor cannot be part of a pair; process it unsuppressed rather than dropping it.
        if (heldRetract != null) {
            processElement(heldRetract, inputSideStateView, otherSideStateView, inputIsLeft, false);
        }
    }

    /**
     * Processes every record buffered for one input, dispatching on the concrete bundle type.
     *
     * <ul>
     *   <li>{@link InputSideHasNoUniqueKeyBundle}: each record is passed to {@code processElement}
     *       with the suppress flag {@code false}.
     *   <li>{@link JoinKeyContainsUniqueKeyBundle}: the records of each join key are passed to
     *       {@code processElementWithSuppress}.
     *   <li>{@link InputSideHasUniqueKeyBundle}: for each join key, the record list of every entry
     *       returned by {@code getRecordsWithJoinKey} is passed to {@code processElementWithSuppress}.
     * </ul>
     *
     * <p>The current key is set to the join key before that key's records are processed. A bundle
     * of any other type is ignored.
     *
     * @param bufferBundle the buffer to drain (it is read, not cleared, here)
     * @param joinRecordStateView state view passed through as the first view argument
     * @param joinRecordStateView2 state view passed through as the second view argument
     * @param z passed through unchanged; callers in this file pass {@code true} for the left buffer
     * @throws Exception if processing a record fails
     */
    protected void processSingleSideBundles(BufferBundle<?> bufferBundle, JoinRecordStateView joinRecordStateView, JoinRecordStateView joinRecordStateView2, boolean z) throws Exception {
        if (bufferBundle instanceof InputSideHasNoUniqueKeyBundle) {
            for (Map.Entry<RowData, List<RowData>> rowsOfJoinKey : bufferBundle.getRecords().entrySet()) {
                setCurrentKey(rowsOfJoinKey.getKey());
                for (RowData row : rowsOfJoinKey.getValue()) {
                    processElement(row, joinRecordStateView, joinRecordStateView2, z, false);
                }
            }
        } else if (bufferBundle instanceof JoinKeyContainsUniqueKeyBundle) {
            for (Map.Entry<RowData, List<RowData>> rowsOfJoinKey : bufferBundle.getRecords().entrySet()) {
                setCurrentKey(rowsOfJoinKey.getKey());
                processElementWithSuppress(rowsOfJoinKey.getValue().iterator(), joinRecordStateView, joinRecordStateView2, z);
            }
        } else if (bufferBundle instanceof InputSideHasUniqueKeyBundle) {
            for (RowData joinKey : bufferBundle.getJoinKeys()) {
                setCurrentKey(joinKey);
                for (Map.Entry<RowData, List<RowData>> rowsOfEntry : bufferBundle.getRecordsWithJoinKey(joinKey).entrySet()) {
                    processElementWithSuppress(rowsOfEntry.getValue().iterator(), joinRecordStateView, joinRecordStateView2, z);
                }
            }
        }
    }

    /**
     * Factory for the mini-batch join operator matching a join type.
     *
     * <p>All arguments after the join type are packed, in the same order, into a
     * {@link MiniBatchStreamingJoinParameter} that is handed to the selected operator subclass.
     *
     * @param flinkJoinType join type; {@code INNER}, {@code LEFT}, {@code RIGHT} and {@code FULL} are supported
     * @return a new operator instance for the given join type
     * @throws UnsupportedOperationException if the join type is none of the four supported ones
     */
    public static MiniBatchStreamingJoinOperator newMiniBatchStreamJoinOperator(
            FlinkJoinType flinkJoinType,
            InternalTypeInfo<RowData> internalTypeInfo,
            InternalTypeInfo<RowData> internalTypeInfo2,
            GeneratedJoinCondition generatedJoinCondition,
            JoinInputSideSpec joinInputSideSpec,
            JoinInputSideSpec joinInputSideSpec2,
            boolean z,
            boolean z2,
            boolean[] zArr,
            long j,
            long j2,
            CoBundleTrigger<RowData, RowData> coBundleTrigger) {
        final MiniBatchStreamingJoinParameter parameter =
                new MiniBatchStreamingJoinParameter(
                        internalTypeInfo,
                        internalTypeInfo2,
                        generatedJoinCondition,
                        joinInputSideSpec,
                        joinInputSideSpec2,
                        z,
                        z2,
                        zArr,
                        j,
                        j2,
                        coBundleTrigger);
        switch (flinkJoinType) {
            case INNER:
                return new MiniBatchInnerJoinStreamOperator(parameter);
            case LEFT:
                return new MiniBatchLeftOuterJoinStreamOperator(parameter);
            case RIGHT:
                return new MiniBatchRightOuterJoinStreamOperator(parameter);
            case FULL:
                return new MiniBatchFullOuterJoinStreamOperator(parameter);
            default:
                throw new UnsupportedOperationException("Unsupported join type: " + flinkJoinType);
        }
    }
}
