/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.DirectGroupByKey;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.UncommittedBundle;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.repackaged.runners.core.KeyedWorkItem;
import org.apache.beam.runners.direct.repackaged.runners.core.KeyedWorkItems;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * A {@link TransformEvaluatorFactory} that creates evaluators for
 * {@link DirectGroupByKey.DirectGroupByKeyOnly} applications.
 *
 * <p>Each evaluator buffers the elements of one input bundle grouped by the encoded form of
 * their key and, when the bundle is finished, emits one keyed output bundle per distinct key.
 */
class GroupByKeyOnlyEvaluatorFactory
implements TransformEvaluatorFactory {
    private final EvaluationContext evaluationContext;

    /**
     * @param evaluationContext context used by the created evaluators to allocate keyed output bundles
     */
    GroupByKeyOnlyEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates a new grouping evaluator for the given application.
     *
     * <p>The wildcard-typed arguments cannot be passed to {@link #createEvaluator} directly, so
     * they are narrowed through raw types. Nothing in this method verifies that
     * {@code application} really is a {@code DirectGroupByKeyOnly} application.
     * NOTE(review): presumably the caller only dispatches matching applications here - confirm.
     *
     * @param application the transform application to evaluate
     * @param inputBundle the bundle about to be processed
     * @return a fresh evaluator bound to {@code application}
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
        // Explicit raw casts: capture conversion of '?' can never match the concrete
        // parameterization createEvaluator requires, so the narrowing must be unchecked.
        TransformEvaluator<InputT> evaluator = this.createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
        return evaluator;
    }

    /** No-op: this factory itself holds nothing that needs releasing. */
    @Override
    public void cleanup() {
    }

    /**
     * Builds the typed evaluator. {@code inputBundle} is accepted but not used by this method.
     */
    private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(AppliedPTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, DirectGroupByKey.DirectGroupByKeyOnly<K, V>> application, CommittedBundle<KV<K, V>> inputBundle) {
        return new GroupByKeyOnlyEvaluator<K, V>(this.evaluationContext, application);
    }

    /**
     * Evaluator that collects the values of a bundle per key and outputs one
     * {@link KeyedWorkItem} per key in its own keyed bundle.
     *
     * @param <K> key type of the input elements
     * @param <V> value type of the input elements
     */
    private static class GroupByKeyOnlyEvaluator<K, V>
    implements TransformEvaluator<KV<K, V>> {
        private final EvaluationContext evaluationContext;
        private final AppliedPTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, DirectGroupByKey.DirectGroupByKeyOnly<K, V>> application;
        private final Coder<K> keyCoder;
        /** Values seen so far, grouped by encoded key. Assigned once; only its contents change. */
        private final Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;

        /**
         * @param evaluationContext context used to create the keyed output bundles
         * @param application the application being evaluated; its single input must carry a {@link KvCoder}
         * @throws IllegalStateException if the input coder is not a {@link KvCoder}
         */
        @SuppressWarnings("unchecked")
        public GroupByKeyOnlyEvaluator(EvaluationContext evaluationContext, AppliedPTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, DirectGroupByKey.DirectGroupByKeyOnly<K, V>> application) {
            this.evaluationContext = evaluationContext;
            this.application = application;
            // getOnlyElement enforces that the application has exactly one input.
            PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs().values());
            this.keyCoder = this.getKeyCoder(input.getCoder());
            this.groupingMap = new HashMap<>();
        }

        /**
         * Extracts the key coder from the input's {@link KvCoder}.
         *
         * @throws IllegalStateException if {@code coder} is not a {@link KvCoder}
         */
        private Coder<K> getKeyCoder(Coder<KV<K, V>> coder) {
            Preconditions.checkState(coder instanceof KvCoder, "%s requires a coder of class %s. This is an internal error; this is checked during pipeline construction but became corrupted.", this.getClass().getSimpleName(), KvCoder.class.getSimpleName());
            return ((KvCoder<K, V>) coder).getKeyCoder();
        }

        /**
         * Buffers the element's value under its key. Keys are compared by their encoded bytes,
         * not by {@code equals}, so two keys group together exactly when they encode identically.
         *
         * @throws IllegalArgumentException if the key cannot be encoded with the key coder
         */
        @Override
        public void processElement(WindowedValue<KV<K, V>> element) {
            KV<K, V> kv = element.getValue();
            K key = kv.getKey();
            byte[] encodedKey;
            try {
                encodedKey = CoderUtils.encodeToByteArray(this.keyCoder, key);
            }
            catch (CoderException exn) {
                throw new IllegalArgumentException(String.format("unable to encode key %s of input to %s using %s", key, this, this.keyCoder), exn);
            }
            GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
            List<WindowedValue<V>> values = this.groupingMap.computeIfAbsent(groupingKey, k -> new ArrayList<>());
            // Keep the element's window/timestamp metadata, replacing the KV with just the value.
            values.add(element.withValue(kv.getValue()));
        }

        /**
         * Emits one keyed bundle per buffered key, each holding a single {@link KeyedWorkItem}
         * (placed in the global window) that carries all values collected for that key.
         *
         * @return the result listing every output bundle produced
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public TransformResult<KV<K, V>> finishBundle() {
            // The builder is left raw: its type parameters are not visible from this file.
            StepTransformResult.Builder resultBuilder = StepTransformResult.withoutHold(this.application);
            for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry : this.groupingMap.entrySet()) {
                K key = groupedEntry.getKey().key;
                KeyedWorkItem<K, V> groupedKv = KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
                // Looked up per entry (not hoisted) so an empty bundle never touches the outputs.
                PCollection<KeyedWorkItem<K, V>> output = (PCollection<KeyedWorkItem<K, V>>) Iterables.getOnlyElement(this.application.getOutputs().values());
                UncommittedBundle<KeyedWorkItem<K, V>> bundle = this.evaluationContext.createKeyedBundle(StructuralKey.of(key, this.keyCoder), output);
                bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
                resultBuilder.addOutput(bundle, new UncommittedBundle<?>[0]);
            }
            return resultBuilder.build();
        }

        /**
         * Map key pairing the original key object with its encoded bytes. Equality and hashing
         * use only the encoded bytes; the fields are final because instances serve as hash keys.
         */
        private static final class GroupingKey<K> {
            private final K key;
            private final byte[] encodedKey;

            public GroupingKey(K key, byte[] encodedKey) {
                this.key = key;
                this.encodedKey = encodedKey;
            }

            @Override
            public boolean equals(Object o) {
                if (o instanceof GroupingKey) {
                    GroupingKey<?> that = (GroupingKey<?>) o;
                    return Arrays.equals(this.encodedKey, that.encodedKey);
                }
                return false;
            }

            @Override
            public int hashCode() {
                return Arrays.hashCode(this.encodedKey);
            }
        }
    }
}

