/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.schemas.transforms;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.transforms.AutoValue_CoGroup_By;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;

public class CoGroup {
    private static final List NULL_LIST = Lists.newArrayList();

    public static Inner join(By clause) {
        return new Inner(new JoinArguments(clause));
    }

    public static Inner join(String tag, By clause) {
        return new Inner(new JoinArguments(ImmutableMap.of(tag, clause)));
    }

    static void verify(PCollectionTuple input, JoinArguments joinArgs) {
        Set joinTags;
        Set inputTags;
        if (joinArgs.allInputsJoinArgs == null && !(inputTags = input.getAll().keySet().stream().map(TupleTag::getId).collect(Collectors.toSet())).equals(joinTags = joinArgs.joinArgsMap.keySet())) {
            throw new IllegalArgumentException("The input PCollectionTuple has tags: " + inputTags + " and the join was specified for tags " + joinTags + ". These do not match.");
        }
    }

    static {
        NULL_LIST.add(null);
    }

    /**
     * Joins the inputs of a {@link PCollectionTuple} and emits one {@link Row} per combination
     * of co-grouped input rows (the cross product of the per-input groups for each key).
     *
     * <p>The output schema has one row-typed field per input tag. The field is nullable when
     * that input was declared with outer-join participation.
     */
    public static class ExpandCrossProduct
    extends PTransform<PCollectionTuple, PCollection<Row>> {
        private final JoinArguments joinArgs;

        ExpandCrossProduct(JoinArguments joinArgs) {
            this.joinArgs = joinArgs;
        }

        /**
         * Returns a new transform with an additional per-tag join clause.
         *
         * @param tag the id of the input the clause applies to
         * @param clause the key selection for that input
         * @throws IllegalStateException if a clause for all inputs has already been set
         */
        public ExpandCrossProduct join(String tag, By clause) {
            if (this.joinArgs.allInputsJoinArgs != null) {
                throw new IllegalStateException("Cannot set both a global and per-tag fields.");
            }
            return new ExpandCrossProduct(this.joinArgs.with(tag, clause));
        }

        /**
         * Builds the output schema: one row field per input tag, in the iteration order of
         * {@code componentSchemas}, marked nullable for outer-join participants.
         */
        private Schema getOutputSchema(JoinInformation joinInformation) {
            Schema.Builder joinedSchemaBuilder = Schema.builder();
            for (Map.Entry<String, Schema> entry : joinInformation.componentSchemas.entrySet()) {
                Schema.FieldType fieldType = Schema.FieldType.row(entry.getValue());
                if (this.joinArgs.getOuterJoinParticipation(entry.getKey())) {
                    fieldType = fieldType.withNullable(true);
                }
                joinedSchemaBuilder.addField(entry.getKey(), fieldType);
            }
            return joinedSchemaBuilder.build();
        }

        /**
         * Validates the join arguments, keys every input, co-groups them by key and expands
         * each group into cross-product rows carrying the joined schema.
         */
        @Override
        public PCollection<Row> expand(PCollectionTuple input) {
            CoGroup.verify(input, this.joinArgs);
            JoinInformation joinInformation = JoinInformation.from(input, tag -> this.joinArgs.getFieldAccessDescriptor(tag));
            Schema joinedSchema = this.getOutputSchema(joinInformation);
            ExpandToRows expandFn = new ExpandToRows(this.joinArgs, joinInformation.sortedTags, joinInformation.toRows, joinedSchema, joinInformation.tagToKeyedTag);
            return joinInformation.keyedPCollectionTuple
                .apply("CoGroupByKey", CoGroupByKey.create())
                .apply("Values", Values.create())
                .apply("ExpandToRow", ParDo.of(expandFn))
                .setRowSchema(joinedSchema);
        }

        /**
         * Turns one {@link CoGbkResult} into the cross product of its per-input groups.
         *
         * <p>Declared {@code static} so that the function holds only the state it needs (the
         * join arguments are passed in explicitly) instead of a hidden reference to the
         * enclosing transform.
         */
        private static class ExpandToRows
        extends DoFn<CoGbkResult, Row> {
            private final JoinArguments joinArgs;
            private final List<String> sortedTags;
            private final Map<Integer, SerializableFunction<Object, Row>> toRows;
            private final Schema outputSchema;
            private final Map<Integer, String> tagToKeyedTag;

            ExpandToRows(JoinArguments joinArgs, List<String> sortedTags, Map<Integer, SerializableFunction<Object, Row>> toRows, Schema outputSchema, Map<Integer, String> tagToKeyedTag) {
                this.joinArgs = joinArgs;
                this.sortedTags = sortedTags;
                this.toRows = toRows;
                this.outputSchema = outputSchema;
                this.tagToKeyedTag = tagToKeyedTag;
            }

            @DoFn.ProcessElement
            public void process(@DoFn.Element CoGbkResult gbkResult, DoFn.OutputReceiver<Row> o) {
                List<Iterable<Object>> allIterables = this.extractIterables(gbkResult);
                List<Row> accumulatedRows = Lists.newArrayListWithCapacity(this.sortedTags.size());
                this.crossProduct(0, accumulatedRows, allIterables, o);
            }

            /**
             * Collects the grouped values of every input, indexed like {@code sortedTags}. An
             * empty group of an outer-join participant is replaced by a single {@code null}
             * placeholder so the other inputs' rows are still emitted.
             */
            private List<Iterable<Object>> extractIterables(CoGbkResult gbkResult) {
                List<Iterable<Object>> iterables = Lists.newArrayListWithCapacity(this.sortedTags.size());
                for (int i = 0; i < this.sortedTags.size(); ++i) {
                    String tag = this.sortedTags.get(i);
                    Iterable<Object> items = gbkResult.getAll(this.tagToKeyedTag.get(i));
                    if (!items.iterator().hasNext() && this.joinArgs.getOuterJoinParticipation(tag)) {
                        items = () -> NULL_LIST.iterator();
                    }
                    iterables.add(items);
                }
                return iterables;
            }

            /**
             * Recursively walks the inputs starting at {@code tagIndex}, extending
             * {@code accumulatedRows} with each item of the current input in turn.
             */
            private void crossProduct(int tagIndex, List<Row> accumulatedRows, List<Iterable<Object>> iterables, DoFn.OutputReceiver<Row> o) {
                if (tagIndex >= this.sortedTags.size()) {
                    return;
                }
                SerializableFunction<Object, Row> toRow = this.toRows.get(tagIndex);
                for (Object item : iterables.get(tagIndex)) {
                    // A null item is the outer-join placeholder: it maps to a null row and
                    // must not be handed to the conversion function.
                    Row row = item == null ? null : toRow.apply(item);
                    this.crossProductHelper(tagIndex, accumulatedRows, row, iterables, o);
                }
            }

            /**
             * Pushes {@code newRow}, then either emits a complete output row (last input) or
             * recurses into the next input, and finally pops the row again (backtracking).
             */
            private void crossProductHelper(int tagIndex, List<Row> accumulatedRows, Row newRow, List<Iterable<Object>> iterables, DoFn.OutputReceiver<Row> o) {
                boolean atBottom = tagIndex == this.sortedTags.size() - 1;
                accumulatedRows.add(newRow);
                if (atBottom) {
                    o.output(this.buildOutputRow(accumulatedRows));
                } else {
                    this.crossProduct(tagIndex + 1, accumulatedRows, iterables, o);
                }
                accumulatedRows.remove(accumulatedRows.size() - 1);
            }

            /** Builds an output row whose field values are the given per-input rows, in order. */
            private Row buildOutputRow(List rows) {
                return Row.withSchema(this.outputSchema).addValues(rows).build();
            }
        }
    }

    /**
     * Joins the inputs of a {@link PCollectionTuple} by key and emits, per key, a
     * {@code KV} of the key row and a row holding one array field per input tag; each array
     * contains that input's rows for the key.
     */
    public static class Inner
    extends PTransform<PCollectionTuple, PCollection<KV<Row, Row>>> {
        private final JoinArguments joinArgs;

        private Inner() {
            this(new JoinArguments(Collections.emptyMap()));
        }

        private Inner(JoinArguments joinArgs) {
            this.joinArgs = joinArgs;
        }

        /**
         * Returns a new transform with an additional per-tag join clause.
         *
         * @param tag the id of the input the clause applies to
         * @param clause the key selection for that input
         * @throws IllegalStateException if a clause for all inputs has already been set
         */
        public Inner join(String tag, By clause) {
            if (joinArgs.allInputsJoinArgs == null) {
                return new Inner(joinArgs.with(tag, clause));
            }
            throw new IllegalStateException("Cannot set both a global and per-tag fields.");
        }

        /** Returns a transform with the same join arguments that emits cross-product rows. */
        public ExpandCrossProduct crossProductJoin() {
            return new ExpandCrossProduct(joinArgs);
        }

        /** Builds the value schema: one array-of-row field per input tag. */
        private Schema getOutputSchema(JoinInformation joinInformation) {
            Schema.Builder builder = Schema.builder();
            joinInformation.componentSchemas.forEach(
                (tag, schema) -> builder.addArrayField(tag, Schema.FieldType.row(schema)));
            return builder.build();
        }

        /**
         * Validates the join arguments, keys every input, co-groups them by key and converts
         * each group into a key/value pair of rows.
         */
        @Override
        public PCollection<KV<Row, Row>> expand(PCollectionTuple input) {
            CoGroup.verify(input, joinArgs);
            JoinInformation info = JoinInformation.from(input, tag -> joinArgs.getFieldAccessDescriptor(tag));
            Schema valueSchema = getOutputSchema(info);
            ConvertToRow convertFn = new ConvertToRow(info.sortedTags, info.toRows, valueSchema, info.tagToKeyedTag);
            return info.keyedPCollectionTuple
                .apply("CoGroupByKey", CoGroupByKey.create())
                .apply("ConvertToRow", ParDo.of(convertFn))
                .setCoder(KvCoder.of(SchemaCoder.of(info.keySchema), SchemaCoder.of(valueSchema)));
        }

        /** Converts a co-grouped result into a row with one list of rows per input. */
        private static class ConvertToRow
        extends DoFn<KV<Row, CoGbkResult>, KV<Row, Row>> {
            private final List<String> sortedTags;
            private final Map<Integer, SerializableFunction<Object, Row>> toRows;
            private final Map<Integer, String> tagToKeyedTag;
            private final Schema joinedSchema;

            ConvertToRow(List<String> sortedTags, Map<Integer, SerializableFunction<Object, Row>> toRows, Schema joinedSchema, Map<Integer, String> tagToKeyedTag) {
                this.sortedTags = sortedTags;
                this.toRows = toRows;
                this.joinedSchema = joinedSchema;
                this.tagToKeyedTag = tagToKeyedTag;
            }

            @DoFn.ProcessElement
            public void process(@DoFn.Element KV<Row, CoGbkResult> kv, DoFn.OutputReceiver<KV<Row, Row>> o) {
                CoGbkResult grouped = kv.getValue();
                int inputCount = sortedTags.size();
                List<Object> fieldValues = new ArrayList<>(inputCount);
                for (int index = 0; index < inputCount; ++index) {
                    fieldValues.add(rowsForInput(grouped, index));
                }
                Row value = Row.withSchema(joinedSchema).addValues(fieldValues).build();
                o.output(KV.of(kv.getKey(), value));
            }

            /** Converts every grouped value of the input at {@code index} to a row. */
            private List<Row> rowsForInput(CoGbkResult grouped, int index) {
                SerializableFunction<Object, Row> toRow = toRows.get(index);
                List<Row> rows = new ArrayList<>();
                for (Object item : grouped.getAll(tagToKeyedTag.get(index))) {
                    rows.add(toRow.apply(item));
                }
                return rows;
            }
        }
    }

    /**
     * Everything derived from the input tuple that the join transforms need: the keyed inputs
     * ready for {@link CoGroupByKey}, the shared key schema, each input's schema and to-row
     * function, and the mapping from sorted tag position to the tag used in the keyed tuple.
     */
    private static class JoinInformation {
        private final KeyedPCollectionTuple<Row> keyedPCollectionTuple;
        private final Schema keySchema;
        private final Map<String, Schema> componentSchemas;
        private final Map<Integer, SerializableFunction<Object, Row>> toRows;
        private final List<String> sortedTags;
        private final Map<Integer, String> tagToKeyedTag;

        private JoinInformation(KeyedPCollectionTuple<Row> keyedPCollectionTuple, Schema keySchema, Map<String, Schema> componentSchemas, Map<Integer, SerializableFunction<Object, Row>> toRows, List<String> sortedTags, Map<Integer, String> tagToKeyedTag) {
            this.keyedPCollectionTuple = keyedPCollectionTuple;
            this.keySchema = keySchema;
            this.componentSchemas = componentSchemas;
            this.toRows = toRows;
            this.sortedTags = sortedTags;
            this.tagToKeyedTag = tagToKeyedTag;
        }

        /**
         * Keys every input of {@code input} by its selected fields and gathers the metadata
         * needed to join them.
         *
         * @param input the tagged inputs to join
         * @param getFieldAccessDescriptor returns the key field selection for a tag id, or
         *     {@code null} if none was specified
         * @return the collected join information
         * @throws IllegalStateException if an input has no key fields specified, or if the key
         *     schemas of the inputs do not have equal types
         */
        private static JoinInformation from(PCollectionTuple input, Function<String, FieldAccessDescriptor> getFieldAccessDescriptor) {
            KeyedPCollectionTuple<Row> keyedPCollectionTuple = KeyedPCollectionTuple.empty(input.getPipeline());
            // Positions in this sorted list are the integer indices used by toRows and
            // tagToKeyedTag.
            List<String> sortedTags = input.getAll().keySet().stream().map(TupleTag::getId).sorted().collect(Collectors.toList());
            Map<String, Schema> componentSchemas = Maps.newTreeMap();
            Map<Integer, SerializableFunction<Object, Row>> toRows = Maps.newHashMap();
            Map<Integer, String> tagToKeyedTag = Maps.newHashMap();
            Schema keySchema = null;
            for (Map.Entry<TupleTag<?>, PCollection<?>> entry : input.getAll().entrySet()) {
                String tag = entry.getKey().getId();
                int tagIndex = sortedTags.indexOf(tag);
                PCollection<?> pc = entry.getValue();
                Schema schema = pc.getSchema();
                componentSchemas.put(tag, schema);
                // The element type is unknown here (wildcard), so the function has to be viewed
                // as accepting Object before it can be stored alongside the other inputs'.
                @SuppressWarnings("unchecked")
                SerializableFunction<Object, Row> toRow = (SerializableFunction<Object, Row>) pc.getToRowFunction();
                toRows.put(tagIndex, toRow);
                FieldAccessDescriptor fieldAccessDescriptor = getFieldAccessDescriptor.apply(tag);
                if (fieldAccessDescriptor == null) {
                    throw new IllegalStateException("No fields were set for input " + tag);
                }
                FieldAccessDescriptor resolved = fieldAccessDescriptor.withOrderByFieldInsertionOrder().resolve(schema);
                Schema currentKeySchema = SelectHelpers.getOutputSchema(schema, resolved);
                if (keySchema == null) {
                    // The first input processed fixes the key schema used for all inputs.
                    keySchema = currentKeySchema;
                } else if (!currentKeySchema.typesEqual(keySchema)) {
                    throw new IllegalStateException("All keys must have the same schema");
                }
                // Suffix the tag with a freshly created TupleTag's string form to name the
                // keyed input in the tuple.
                TupleTag<Object> randomTag = new TupleTag<>();
                String keyedTag = tag + "_" + randomTag;
                tagToKeyedTag.put(tagIndex, keyedTag);
                PCollection<KV<Row, Row>> keyedPCollection = JoinInformation.extractKey(pc, schema, keySchema, resolved, tag);
                keyedPCollectionTuple = keyedPCollectionTuple.and(keyedTag, keyedPCollection);
            }
            return new JoinInformation(keyedPCollectionTuple, keySchema, componentSchemas, toRows, sortedTags, tagToKeyedTag);
        }

        /**
         * Pairs every element of {@code pCollection}, read as a {@link Row}, with the key row
         * selected from it by {@code keyFields}.
         *
         * @param pCollection the input to key
         * @param schema the schema of the input rows
         * @param keySchema the schema the selected key row is built with
         * @param keyFields the resolved selection of key fields
         * @param tag the input's tag id, used to name the applied step
         * @return the keyed collection with a row/row {@link KvCoder} set
         */
        private static <T> PCollection<KV<Row, Row>> extractKey(PCollection<T> pCollection, final Schema schema, final Schema keySchema, final FieldAccessDescriptor keyFields, String tag) {
            return pCollection
                .apply("extractKey" + tag, ParDo.of(new DoFn<T, KV<Row, Row>>(){

                    @DoFn.ProcessElement
                    public void process(@DoFn.Element Row row, DoFn.OutputReceiver<KV<Row, Row>> o) {
                        o.output(KV.of(SelectHelpers.selectRow(row, keyFields, schema, keySchema), row));
                    }
                }))
                .setCoder(KvCoder.of(SchemaCoder.of(keySchema), SchemaCoder.of(schema)));
        }
    }

    /**
     * The join configuration: either one {@link By} clause applied to every input, or a map
     * from input tag id to that input's clause. Exactly one of the two forms is populated by
     * each constructor.
     */
    private static class JoinArguments
    implements Serializable {
        @Nullable
        private final By allInputsJoinArgs;
        private final Map<String, By> joinArgsMap;

        /** Creates arguments that apply {@code allInputsJoinArgs} to every input. */
        JoinArguments(@Nullable By allInputsJoinArgs) {
            this.allInputsJoinArgs = allInputsJoinArgs;
            this.joinArgsMap = Collections.emptyMap();
        }

        /** Creates arguments from per-tag clauses; no global clause is set. */
        JoinArguments(Map<String, By> joinArgsMap) {
            this.allInputsJoinArgs = null;
            this.joinArgsMap = joinArgsMap;
        }

        /**
         * Returns a copy of the per-tag arguments with {@code clause} added for {@code tag}.
         * The copy is built with {@link ImmutableMap.Builder}.
         */
        JoinArguments with(String tag, By clause) {
            return new JoinArguments(new ImmutableMap.Builder<String, By>().putAll(this.joinArgsMap).put(tag, clause).build());
        }

        /**
         * Returns the key field selection that applies to {@code tag}.
         *
         * @return the global clause's descriptor if one is set, otherwise the descriptor of the
         *     clause registered for {@code tag}, or {@code null} if no clause is registered
         */
        @Nullable
        private FieldAccessDescriptor getFieldAccessDescriptor(String tag) {
            By clause = this.clauseFor(tag);
            return clause == null ? null : clause.getFieldAccessDescriptor();
        }

        /**
         * Returns whether the input {@code tag} participates as an outer side of the join.
         * An input with no registered clause does not participate.
         */
        private boolean getOuterJoinParticipation(String tag) {
            By clause = this.clauseFor(tag);
            return clause != null && clause.getOuterJoinParticipation();
        }

        /**
         * Resolves the clause in effect for {@code tag}: the global clause when present,
         * otherwise the per-tag entry, which may be absent.
         */
        @Nullable
        private By clauseFor(String tag) {
            return this.allInputsJoinArgs != null ? this.allInputsJoinArgs : this.joinArgsMap.get(tag);
        }
    }

    /**
     * Describes how one input takes part in a join: which fields form its key and whether it
     * participates as an outer side. In {@link ExpandCrossProduct}, an outer-participating
     * input with no values for a key contributes a single null entry instead of suppressing
     * the output, and its field in the output schema is nullable.
     */
    @AutoValue
    public static abstract class By
    implements Serializable {
        /** The selection of fields that make up the join key. */
        abstract FieldAccessDescriptor getFieldAccessDescriptor();

        /** Whether this input participates as an outer side of the join. */
        abstract boolean getOuterJoinParticipation();

        abstract Builder toBuilder();

        /** Creates a clause keyed by the given field names, without outer-join participation. */
        public static By fieldNames(String ... fieldNames) {
            FieldAccessDescriptor descriptor = FieldAccessDescriptor.withFieldNames(fieldNames);
            return fieldAccessDescriptor(descriptor);
        }

        /** Creates a clause keyed by the given field ids, without outer-join participation. */
        public static By fieldIds(Integer ... fieldIds) {
            FieldAccessDescriptor descriptor = FieldAccessDescriptor.withFieldIds(fieldIds);
            return fieldAccessDescriptor(descriptor);
        }

        /** Creates a clause keyed by the given descriptor, without outer-join participation. */
        public static By fieldAccessDescriptor(FieldAccessDescriptor fieldAccessDescriptor) {
            Builder builder = new AutoValue_CoGroup_By.Builder();
            return builder
                .setFieldAccessDescriptor(fieldAccessDescriptor)
                .setOuterJoinParticipation(false)
                .build();
        }

        /** Returns a copy of this clause with outer-join participation switched on. */
        public By withOuterJoinParticipation() {
            Builder builder = toBuilder();
            return builder.setOuterJoinParticipation(true).build();
        }

        /** Builder for {@link By}; the implementation is generated by AutoValue. */
        @AutoValue.Builder
        static abstract class Builder {
            abstract Builder setFieldAccessDescriptor(FieldAccessDescriptor fieldAccessDescriptor);

            abstract Builder setOuterJoinParticipation(boolean outerJoinParticipation);

            abstract By build();
        }
    }
}

