package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.beam_sdks_java_core.net.bytebuddy.asm.Advice;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.class */
public class FlinkKeyGroupStateInternals<K> implements StateInternals {
    private final Coder<K> keyCoder;
    private final KeyGroupsList localKeyGroupRange;
    private KeyedStateBackend keyedStateBackend;
    private final int localKeyGroupRangeStartIdx;
    private final Map<String, Tuple2<Coder<?>, Map<String, ?>>>[] stateTables;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals$AbstractKeyGroupState.class */
    /**
     * Base class for state that is stored per key group instead of per key.
     *
     * <p>Each instance addresses one (state name, namespace) slot. {@link #addInput} writes to the
     * table of the key group currently reported by the keyed state backend, while
     * {@link #extractOutput}, {@link #isEmptyInternal} and {@link #clearInternal} visit the tables
     * of every local key group.
     *
     * @param <InputT> type of the values added to the state
     * @param <AccumT> type of the accumulator kept in a state table slot
     * @param <OutputT> type produced when the accumulators are combined
     */
    public abstract class AbstractKeyGroupState<InputT, AccumT, OutputT> {
        private final String stateName;
        private final String namespace;
        private final Coder<AccumT> coder;
        private final KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner;

        /**
         * @param stateName key of this state inside a key group's state table
         * @param namespace string key of the namespace the state lives in
         * @param coder coder registered next to the accumulators when the state is first written
         * @param keyGroupCombiner strategy that creates, extends and merges accumulators
         */
        AbstractKeyGroupState(String stateName, String namespace, Coder<AccumT> coder, KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner) {
            this.stateName = stateName;
            this.namespace = namespace;
            this.coder = coder;
            this.keyGroupCombiner = keyGroupCombiner;
        }

        /**
         * Adds {@code inputt} to the accumulator of the current key group, creating the state entry
         * and the accumulator on first use.
         *
         * @param inputt value to fold into the accumulator
         */
        void addInput(InputT inputt) {
            int keyGroup = FlinkKeyGroupStateInternals.this.keyedStateBackend.getCurrentKeyGroupIndex();
            Map<String, Tuple2<Coder<?>, Map<String, ?>>> table =
                    FlinkKeyGroupStateInternals.this.stateTables[FlinkKeyGroupStateInternals.this.getIndexForKeyGroup(keyGroup)];
            Tuple2<Coder<?>, Map<String, ?>> entry = table.get(this.stateName);
            if (entry == null) {
                entry = new Tuple2<>();
                entry.f0 = this.coder;
                entry.f1 = new HashMap<String, AccumT>();
                table.put(this.stateName, entry);
            }
            // The table is heterogeneous; entries stored under this state name are only ever
            // written here with AccumT values (or restored with the coder stored alongside).
            @SuppressWarnings("unchecked")
            Map<String, AccumT> accumulators = (Map<String, AccumT>) entry.f1;
            AccumT accumulator = accumulators.get(this.namespace);
            if (accumulator == null) {
                accumulator = this.keyGroupCombiner.createAccumulator();
            }
            accumulators.put(this.namespace, this.keyGroupCombiner.addInput(accumulator, inputt));
        }

        /**
         * Collects the accumulators stored for this state and namespace in all local key groups and
         * lets the combiner turn them into the output.
         *
         * @return whatever the combiner produces for the collected (possibly empty) accumulators
         */
        OutputT extractOutput() {
            List<AccumT> accumulators = new ArrayList<>(FlinkKeyGroupStateInternals.this.stateTables.length);
            for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> table : FlinkKeyGroupStateInternals.this.stateTables) {
                Tuple2<Coder<?>, Map<String, ?>> entry = table.get(this.stateName);
                if (entry == null) {
                    continue;
                }
                @SuppressWarnings("unchecked")
                AccumT accumulator = (AccumT) entry.f1.get(this.namespace);
                if (accumulator != null) {
                    accumulators.add(accumulator);
                }
            }
            return this.keyGroupCombiner.extractOutput(accumulators);
        }

        /**
         * @return {@code true} when no local key group holds a value for this state and namespace
         */
        boolean isEmptyInternal() {
            for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> table : FlinkKeyGroupStateInternals.this.stateTables) {
                Tuple2<Coder<?>, Map<String, ?>> entry = table.get(this.stateName);
                if (entry != null && entry.f1.get(this.namespace) != null) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Removes this namespace's value from every local key group and drops the state entry of a
         * key group once it has no namespaces left.
         */
        void clearInternal() {
            for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> table : FlinkKeyGroupStateInternals.this.stateTables) {
                Tuple2<Coder<?>, Map<String, ?>> entry = table.get(this.stateName);
                if (entry == null) {
                    continue;
                }
                entry.f1.remove(this.namespace);
                if (entry.f1.isEmpty()) {
                    table.remove(this.stateName);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals$FlinkKeyGroupBagState.class */
    /**
     * {@link BagState} whose elements are kept in one list per key group.
     *
     * <p>Additions go to the current key group; reads concatenate the lists of all local key
     * groups. Two instances are equal when their namespace and state tag are equal.
     *
     * @param <T> element type of the bag
     */
    public class FlinkKeyGroupBagState<T> extends FlinkKeyGroupStateInternals<K>.AbstractKeyGroupState<T, List<T>, Iterable<T>> implements BagState<T> {
        private final StateNamespace namespace;
        private final StateTag<BagState<T>> address;

        /**
         * @param stateTag tag identifying the bag; its id is used as the state name
         * @param stateNamespace namespace of the bag; its string key addresses the stored list
         * @param coder element coder, wrapped in a {@link ListCoder} for the stored lists
         */
        FlinkKeyGroupBagState(StateTag<BagState<T>> stateTag, StateNamespace stateNamespace, Coder<T> coder) {
            super(stateTag.getId(), stateNamespace.stringKey(), ListCoder.of(coder), new KeyGroupBagCombiner<T>());
            this.namespace = stateNamespace;
            this.address = stateTag;
        }

        /** Appends {@code t} to the list of the current key group. */
        @Override
        public void add(T t) {
            addInput(t);
        }

        /** No prefetching is performed; returns this state. */
        @Override
        public BagState<T> readLater() {
            return this;
        }

        /**
         * @return the elements of all local key groups, or an empty list if the combiner yields
         *     {@code null}
         */
        @Override
        public Iterable<T> read() {
            Iterable<T> elements = extractOutput();
            return elements != null ? elements : Collections.<T>emptyList();
        }

        /**
         * @return a view that evaluates emptiness each time it is read; any exception raised while
         *     checking is rethrown wrapped in a {@link RuntimeException}
         */
        @Override
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() {
                @Override
                public Boolean read() {
                    try {
                        return Boolean.valueOf(FlinkKeyGroupBagState.this.isEmptyInternal());
                    } catch (Exception e) {
                        throw new RuntimeException("Error reading state.", e);
                    }
                }

                @Override
                public ReadableState<Boolean> readLater() {
                    return this;
                }
            };
        }

        /** Removes the bag's elements from every local key group. */
        @Override
        public void clear() {
            clearInternal();
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            FlinkKeyGroupStateInternals<?>.FlinkKeyGroupBagState<?> other =
                    (FlinkKeyGroupStateInternals<?>.FlinkKeyGroupBagState<?>) obj;
            return this.namespace.equals(other.namespace) && this.address.equals(other.address);
        }

        @Override
        public int hashCode() {
            return (31 * this.namespace.hashCode()) + this.address.hashCode();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals$KeyGroupBagCombiner.class */
    /**
     * {@link KeyGroupCombiner} for bags: each accumulator is a list of elements, and the output is
     * the concatenation of all accumulators in iteration order.
     *
     * @param <T> element type of the bag
     */
    private static class KeyGroupBagCombiner<T> implements KeyGroupCombiner<T, List<T>, Iterable<T>> {
        private KeyGroupBagCombiner() {
        }

        /** @return a new, empty, mutable list */
        @Override
        public List<T> createAccumulator() {
            return new ArrayList<>();
        }

        /**
         * Appends {@code t} to {@code list} in place.
         *
         * @return the same list instance that was passed in
         */
        @Override
        public List<T> addInput(List<T> list, T t) {
            list.add(t);
            return list;
        }

        /**
         * @param iterable the per-key-group lists to merge
         * @return a new list holding the elements of every given list
         */
        @Override
        public Iterable<T> extractOutput(Iterable<List<T>> iterable) {
            List<T> merged = new ArrayList<>();
            for (List<T> part : iterable) {
                merged.addAll(part);
            }
            return merged;
        }

        // The bridge method addInput(Object, Object) is generated by the compiler and must not be
        // declared by hand.
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals$KeyGroupCombiner.class */
    /**
     * Strategy used by {@code AbstractKeyGroupState} to maintain one accumulator per key group and
     * to merge the accumulators of several key groups into a single result.
     *
     * @param <InputT> type of the values added to an accumulator
     * @param <AccumT> type of the accumulator stored in a key group's state table
     * @param <OutputT> type of the merged result
     */
    public interface KeyGroupCombiner<InputT, AccumT, OutputT> {
        /** Creates the accumulator used when a key group has none stored yet. */
        AccumT createAccumulator();

        /**
         * Adds {@code inputt} to {@code accumt}.
         *
         * @return the accumulator to store afterwards
         */
        AccumT addInput(AccumT accumt, InputT inputt);

        /**
         * Produces the output from the accumulators collected across key groups.
         *
         * @param iterable the collected accumulators; callers in this file pass only non-null ones
         */
        OutputT extractOutput(Iterable<AccumT> iterable);
    }

    /**
     * Creates state internals backed by one in-memory state table per local key group.
     *
     * @param coder coder used by {@link #getKey()} to decode the backend's current key
     * @param keyedStateBackend backend that supplies the local key-group range, the current key and
     *     the current key-group index
     */
    public FlinkKeyGroupStateInternals(Coder<K> coder, KeyedStateBackend keyedStateBackend) {
        this.keyCoder = coder;
        this.keyedStateBackend = keyedStateBackend;
        this.localKeyGroupRange = keyedStateBackend.getKeyGroupRange();

        // The smallest key-group id in the range becomes the offset of table index 0.
        int smallestKeyGroup = Integer.MAX_VALUE;
        for (Integer keyGroup : this.localKeyGroupRange) {
            smallestKeyGroup = Math.min(keyGroup.intValue(), smallestKeyGroup);
        }
        this.localKeyGroupRangeStartIdx = smallestKeyGroup;

        // One initially empty table per key group in the local range.
        int tableCount = this.localKeyGroupRange.getNumberOfKeyGroups();
        this.stateTables = new Map[tableCount];
        for (int slot = 0; slot < tableCount; slot++) {
            this.stateTables[slot] = new HashMap();
        }
    }

    /**
     * Decodes the backend's current key, which is read as a {@link ByteBuffer}, with the key coder.
     * The buffer's position is put back to where it was before the bytes were copied out.
     *
     * @return the decoded key
     * @throws RuntimeException if the key coder fails to decode the bytes
     */
    @Override // org.apache.beam.runners.core.StateInternals
    public K getKey() {
        ByteBuffer encodedKey = (ByteBuffer) this.keyedStateBackend.getCurrentKey();
        int start = encodedKey.position();
        byte[] keyBytes = new byte[encodedKey.remaining()];
        encodedKey.get(keyBytes);
        // Rewind so the backend's buffer is left as it was found.
        encodedKey.position(start);
        try {
            return CoderUtils.decodeFromByteArray(this.keyCoder, keyBytes);
        } catch (CoderException e) {
            throw new RuntimeException("Error decoding key.", e);
        }
    }

    /**
     * Binds {@code stateTag} in {@code stateNamespace}. Only {@link BagState} is supported; binding
     * any other kind of state throws {@link UnsupportedOperationException}.
     *
     * @param stateNamespace namespace the state is bound in
     * @param stateTag tag describing the requested state
     * @param stateContext not used by this implementation
     * @return the bound state
     * @throws UnsupportedOperationException for every state kind other than bag state
     */
    @Override // org.apache.beam.runners.core.StateInternals
    public <T extends State> T state(final StateNamespace stateNamespace, StateTag<T> stateTag, StateContext<?> stateContext) {
        return stateTag.bind(new StateTag.StateBinder() {
            @Override
            public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> stateTag2, Coder<T2> coder) {
                throw new UnsupportedOperationException(String.format("%s is not supported", ValueState.class.getSimpleName()));
            }

            @Override
            public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> stateTag2, Coder<T2> coder) {
                return new FlinkKeyGroupBagState<T2>(stateTag2, stateNamespace, coder);
            }

            @Override
            public <T2> SetState<T2> bindSet(StateTag<SetState<T2>> stateTag2, Coder<T2> coder) {
                throw new UnsupportedOperationException(String.format("%s is not supported", SetState.class.getSimpleName()));
            }

            @Override
            public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> stateTag2, Coder<KeyT> coder, Coder<ValueT> coder2) {
                throw new UnsupportedOperationException(String.format("%s is not supported", MapState.class.getSimpleName()));
            }

            @Override
            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag2, Coder<AccumT> coder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                throw new UnsupportedOperationException("bindCombiningValue is not supported.");
            }

            @Override
            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag2, Coder<AccumT> coder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
                throw new UnsupportedOperationException("bindCombiningValueWithContext is not supported.");
            }

            @Override
            public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> stateTag2, TimestampCombiner timestampCombiner) {
                // Name the state kind that was actually requested.
                throw new UnsupportedOperationException(String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Translates a key-group id into the index of its table in {@code stateTables}.
     *
     * @param i key-group id
     * @return {@code i} minus the smallest key-group id of the local range
     * @throws IllegalArgumentException if {@code i} is not contained in the local key-group range
     */
    public int getIndexForKeyGroup(int i) {
        // Guard clause instead of Preconditions.checkArgument(cond, "..." + i + "..."): this runs
        // for every added element, and the message should only be built when the check fails.
        if (!this.localKeyGroupRange.contains(i)) {
            throw new IllegalArgumentException("Key Group " + i + " does not belong to the local range.");
        }
        return i - this.localKeyGroupRangeStartIdx;
    }

    /**
     * Writes the state table of one key group to {@code dataOutputStream}.
     *
     * <p>Layout: a short with the number of states; then per state its name (modified UTF-8), its
     * Java-serialized coder, an int with the number of namespaces, and per namespace the namespace
     * string (encoded with {@link StringUtf8Coder}) followed by the value encoded with the state's
     * coder. {@link #restoreKeyGroupState} reads the same layout.
     *
     * @param i id of the key group to snapshot
     * @param dataOutputStream stream the snapshot is written to; it is not closed here
     * @throws IllegalArgumentException if {@code i} is outside the local key-group range
     * @throws IllegalStateException if the key group holds more than {@link Short#MAX_VALUE} states
     * @throws Exception if writing or encoding fails
     */
    public void snapshotKeyGroupState(int i, DataOutputStream dataOutputStream) throws Exception {
        Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = this.stateTables[getIndexForKeyGroup(i)];
        // The state count is written as a short, so it must fit into one.
        Preconditions.checkState(stateTable.size() <= Short.MAX_VALUE, "Too many States: " + stateTable.size() + ". Currently at most " + Short.MAX_VALUE + " states are supported");
        dataOutputStream.writeShort(stateTable.size());
        for (Map.Entry<String, Tuple2<Coder<?>, Map<String, ?>>> stateEntry : stateTable.entrySet()) {
            dataOutputStream.writeUTF(stateEntry.getKey());
            // The table is heterogeneous; each entry's coder is the one stored with its values.
            @SuppressWarnings("unchecked")
            Coder<Object> valueCoder = (Coder<Object>) stateEntry.getValue().f0;
            InstantiationUtil.serializeObject(dataOutputStream, valueCoder);
            Map<String, ?> valuesByNamespace = stateEntry.getValue().f1;
            dataOutputStream.writeInt(valuesByNamespace.size());
            for (Map.Entry<String, ?> valueEntry : valuesByNamespace.entrySet()) {
                StringUtf8Coder.of().encode(valueEntry.getKey(), dataOutputStream);
                valueCoder.encode(valueEntry.getValue(), dataOutputStream);
            }
        }
    }

    /* JADX WARN: Type inference failed for: r0v13, types: [org.apache.beam.sdk.coders.Coder, T0] */
    /* JADX WARN: Type inference failed for: r1v11, types: [java.util.HashMap, T1] */
    public void restoreKeyGroupState(int i, DataInputStream dataInputStream, ClassLoader classLoader) throws Exception {
        Map<String, Tuple2<Coder<?>, Map<String, ?>>> map = this.stateTables[getIndexForKeyGroup(i)];
        int readShort = dataInputStream.readShort();
        for (int i2 = 0; i2 < readShort; i2++) {
            String readUTF = dataInputStream.readUTF();
            ?? r0 = (Coder) InstantiationUtil.deserializeObject(dataInputStream, classLoader);
            Tuple2<Coder<?>, Map<String, ?>> tuple2 = map.get(readUTF);
            if (tuple2 == null) {
                tuple2 = new Tuple2<>();
                tuple2.f0 = r0;
                tuple2.f1 = new HashMap();
                map.put(readUTF, tuple2);
            }
            Map<String, ?> map2 = tuple2.f1;
            int readInt = dataInputStream.readInt();
            for (int i3 = 0; i3 < readInt; i3++) {
                map2.put(StringUtf8Coder.of().decode((InputStream) dataInputStream), r0.decode(dataInputStream));
            }
        }
    }
}
