/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import java.util.List;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.Preconditions;

/**
 * Stream exec node for a sort whose leading sort key is a time attribute.
 *
 * <p>The first field of the {@link SortSpec} must be sorted ascending and must be either a rowtime
 * or a proctime attribute of the input row; any remaining sort fields are used as secondary sort
 * keys through a generated record comparator.
 */
@ExecNodeMetadata(
        name = "stream-exec-temporal-sort",
        version = 1,
        producedTransformations = {StreamExecTemporalSort.TEMPORAL_SORT_TRANSFORMATION},
        minPlanVersion = FlinkVersion.v1_15,
        minStateVersion = FlinkVersion.v1_15)
public class StreamExecTemporalSort extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {

    /** Name of the transformation produced by this node. */
    public static final String TEMPORAL_SORT_TRANSFORMATION = "temporal-sort";

    /** JSON property name under which the sort specification is (de)serialized. */
    public static final String FIELD_NAME_SORT_SPEC = "orderBy";

    @JsonProperty(FIELD_NAME_SORT_SPEC)
    private final SortSpec sortSpec;

    /**
     * Creates a new node with a freshly allocated node id, context and persisted configuration.
     *
     * @param tableConfig configuration from which the persisted node configuration is derived
     * @param sortSpec sort fields; must not be {@code null}
     * @param inputProperty property of the single input of this node
     * @param outputType row type produced by this node
     * @param description human readable description of this node
     */
    public StreamExecTemporalSort(
            ReadableConfig tableConfig,
            SortSpec sortSpec,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecTemporalSort.class),
                ExecNodeContext.newPersistedConfig(StreamExecTemporalSort.class, tableConfig),
                sortSpec,
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Creates a node from its individual properties; this is the constructor Jackson uses
     * ({@link JsonCreator}).
     *
     * @param id node id
     * @param context node context
     * @param persistedConfig configuration persisted with the node
     * @param sortSpec sort fields; must not be {@code null}
     * @param inputProperties input properties; must contain exactly one element
     * @param outputType row type produced by this node
     * @param description human readable description of this node
     * @throws IllegalArgumentException if {@code inputProperties} does not have exactly one element
     * @throws NullPointerException if {@code sortSpec} is {@code null}
     */
    @JsonCreator
    public StreamExecTemporalSort(
            @JsonProperty("id") int id,
            @JsonProperty("type") ExecNodeContext context,
            @JsonProperty("configuration") ReadableConfig persistedConfig,
            @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
            @JsonProperty("inputProperties") List<InputProperty> inputProperties,
            @JsonProperty("outputType") RowType outputType,
            @JsonProperty("description") String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        Preconditions.checkArgument(inputProperties.size() == 1);
        this.sortSpec = Preconditions.checkNotNull(sortSpec);
    }

    /**
     * Translates this node into a transformation, dispatching on the kind of time attribute used
     * as the primary sort key.
     *
     * @throws TableException if there is no sort field, the first sort field is not ascending, or
     *     the first sort field is neither a rowtime nor a proctime attribute
     */
    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        // The leading (time) sort key must exist and must be ascending.
        if (sortSpec.getFieldSize() == 0 || !sortSpec.getFieldSpec(0).getIsAscendingOrder()) {
            throw new TableException(
                    "Sort: Primary sort order of a streaming table must be ascending on time.\n"
                            + "please re-check sort statement according to the description above");
        }

        final ExecEdge inputEdge = getInputEdges().get(0);
        // translateToPlan is not typed to RowData here, while the helpers below require
        // Transformation<RowData>; the cast is unchecked, so the suppression is kept on this
        // single declaration.
        @SuppressWarnings("unchecked")
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);
        final RowType inputType = (RowType) inputEdge.getOutputType();

        final LogicalType timeType =
                inputType.getTypeAt(sortSpec.getFieldSpec(0).getFieldIndex());
        if (LogicalTypeChecks.isRowtimeAttribute(timeType)) {
            return createSortRowTime(inputType, inputTransform, config);
        }
        if (LogicalTypeChecks.isProctimeAttribute(timeType)) {
            return createSortProcTime(inputType, inputTransform, config);
        }
        throw new TableException(
                String.format(
                        "Sort: Internal Error\n"
                                + "First field in temporal sort is not a time attribute, %s is given.",
                        timeType));
    }

    /**
     * Creates the transformation for a sort whose primary key is a proctime attribute.
     *
     * <p>When the proctime attribute is the only sort field, no operator is added and the input
     * transformation is returned unchanged.
     */
    private Transformation<RowData> createSortProcTime(
            RowType inputType, Transformation<RowData> inputTransform, ExecNodeConfig config) {
        if (sortSpec.getFieldSize() <= 1) {
            // Only the time attribute is sorted on: pass the input through as is.
            return inputTransform;
        }

        // The comparator covers the secondary sort keys only (everything after the time field).
        final SortSpec specExcludeTime = sortSpec.createSubSortSpec(1);
        final GeneratedRecordComparator rowComparator =
                ComparatorCodeGenerator.gen(
                        config.getTableConfig(),
                        "ProcTimeSortComparator",
                        inputType,
                        specExcludeTime);
        final ProcTimeSortOperator sortOperator =
                new ProcTimeSortOperator(InternalTypeInfo.of(inputType), rowComparator);

        final OneInputTransformation<RowData, RowData> transform =
                ExecNodeUtil.createOneInputTransformation(
                        inputTransform,
                        createTransformationMeta(TEMPORAL_SORT_TRANSFORMATION, config),
                        sortOperator,
                        InternalTypeInfo.of(inputType),
                        inputTransform.getParallelism());
        return applySingletonAndEmptyKey(transform);
    }

    /**
     * Creates the transformation for a sort whose primary key is a rowtime attribute.
     *
     * <p>The operator is always created; the secondary comparator passed to it is {@code null}
     * when the rowtime attribute is the only sort field.
     */
    private Transformation<RowData> createSortRowTime(
            RowType inputType, Transformation<RowData> inputTransform, ExecNodeConfig config) {
        GeneratedRecordComparator rowComparator = null;
        if (sortSpec.getFieldSize() > 1) {
            // The comparator covers the secondary sort keys only (everything after the time field).
            final SortSpec specExcludeTime = sortSpec.createSubSortSpec(1);
            rowComparator =
                    ComparatorCodeGenerator.gen(
                            config.getTableConfig(),
                            "RowTimeSortComparator",
                            inputType,
                            specExcludeTime);
        }
        final RowTimeSortOperator sortOperator =
                new RowTimeSortOperator(
                        InternalTypeInfo.of(inputType),
                        sortSpec.getFieldSpec(0).getFieldIndex(),
                        rowComparator);

        final OneInputTransformation<RowData, RowData> transform =
                ExecNodeUtil.createOneInputTransformation(
                        inputTransform,
                        createTransformationMeta(TEMPORAL_SORT_TRANSFORMATION, config),
                        sortOperator,
                        InternalTypeInfo.of(inputType),
                        inputTransform.getParallelism());
        return applySingletonAndEmptyKey(transform);
    }

    /**
     * Applies the settings shared by both sort variants: parallelism and max parallelism are set
     * to 1 when {@code inputsContainSingleton()} is true, and the state key selector and key type
     * are taken from {@link EmptyRowDataKeySelector#INSTANCE}.
     */
    private Transformation<RowData> applySingletonAndEmptyKey(
            OneInputTransformation<RowData, RowData> transform) {
        if (inputsContainSingleton()) {
            transform.setParallelism(1);
            transform.setMaxParallelism(1);
        }
        final EmptyRowDataKeySelector selector = EmptyRowDataKeySelector.INSTANCE;
        transform.setStateKeySelector(selector);
        transform.setStateKeyType(selector.getProducedType());
        return transform;
    }
}

