/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.fn.harness.state.BagUserState;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.MultimapSideInput;
import org.apache.beam.fn.harness.state.SideInputSpec;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ReadableStates;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateBinder;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;

public class FnApiStateAccessor
implements SideInputReader,
StateBinder {
    /** Pipeline options handed to context-aware combine functions through their {@code StateContext}. */
    private final PipelineOptions pipelineOptions;
    /** Cache of already-materialized side input views and user state cells, keyed by their state key. */
    private final Map<BeamFnApi.StateKey, Object> stateKeyObjectCache;
    /** Specifications of the side inputs this accessor can serve, keyed by the view's tag. */
    private final Map<TupleTag<?>, SideInputSpec> sideInputSpecMap;
    /** Client passed to every {@code MultimapSideInput} and {@code BagUserState} created here. */
    private final BeamFnStateClient beamFnStateClient;
    /** Id of the PTransform; it is written into every state key built by this class. */
    private final String ptransformId;
    /** Supplies the process-bundle instruction id; read each time a side input or bag state is created. */
    private final Supplier<String> processBundleInstructionId;
    /** Callbacks executed by {@link #finalizeState()}; one is registered per created {@code BagUserState}. */
    private final Collection<ThrowingRunnable> stateFinalizers;
    /** Supplies the window currently being processed. */
    private final Supplier<BoundedWindow> currentWindowSupplier;
    /** Supplies the encoded key of the current element; memoized per element instance. */
    private final Supplier<ByteString> encodedCurrentKeySupplier;
    /** Supplies the encoded form of the current window; memoized per window instance. */
    private final Supplier<ByteString> encodedCurrentWindowSupplier;

    /**
     * Creates an accessor that serves side inputs and user state for a single PTransform.
     *
     * <p>The encoded key and encoded window are computed lazily, the first time state is accessed
     * for a given element / window instance, and are then reused until the supplier hands out a
     * different instance.
     *
     * @param pipelineOptions options exposed to context-aware combine functions
     * @param ptransformId id of the PTransform, embedded in every state key
     * @param processBundleInstructionId supplier of the current process-bundle instruction id
     * @param sideInputSpecMap side input specifications keyed by view tag
     * @param beamFnStateClient client used by the created state objects
     * @param keyCoder coder for the key of the current element; may be {@code null}, in which case
     *     any attempt to access user state fails with an {@link IllegalStateException}
     * @param windowCoder coder used to encode the current window
     * @param currentElementSupplier supplier of the element currently being processed
     * @param currentWindowSupplier supplier of the window currently being processed
     */
    public FnApiStateAccessor(PipelineOptions pipelineOptions, String ptransformId, Supplier<String> processBundleInstructionId, Map<TupleTag<?>, SideInputSpec> sideInputSpecMap, BeamFnStateClient beamFnStateClient, Coder<?> keyCoder, Coder<BoundedWindow> windowCoder, Supplier<WindowedValue<?>> currentElementSupplier, Supplier<BoundedWindow> currentWindowSupplier) {
        this.pipelineOptions = pipelineOptions;
        this.stateKeyObjectCache = Maps.newHashMap();
        this.sideInputSpecMap = sideInputSpecMap;
        this.beamFnStateClient = beamFnStateClient;
        this.ptransformId = ptransformId;
        this.processBundleInstructionId = processBundleInstructionId;
        this.stateFinalizers = new ArrayList<ThrowingRunnable>();
        this.currentWindowSupplier = currentWindowSupplier;
        this.encodedCurrentKeySupplier = FnApiStateAccessor.memoizeFunction(currentElementSupplier, element -> {
            Preconditions.checkState(element.getValue() instanceof KV, "Accessing state in unkeyed context. Current element is not a KV: %s.", element);
            Preconditions.checkState(keyCoder != null, "Accessing state in unkeyed context, no key coder available");
            // The key's static type is unknown here (Coder<?>), so the coder has to be viewed as a
            // Coder<Object> before it can be handed the key extracted from the KV.
            @SuppressWarnings("unchecked")
            Coder<Object> untypedKeyCoder = (Coder<Object>)keyCoder;
            ByteString.Output encodedKeyOut = ByteString.newOutput();
            try {
                untypedKeyCoder.encode(((KV<?, ?>)element.getValue()).getKey(), encodedKeyOut);
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
            return encodedKeyOut.toByteString();
        });
        this.encodedCurrentWindowSupplier = FnApiStateAccessor.memoizeFunction(currentWindowSupplier, window -> {
            ByteString.Output encodedWindowOut = ByteString.newOutput();
            try {
                windowCoder.encode(window, encodedWindowOut);
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
            return encodedWindowOut.toByteString();
        });
    }

    /**
     * Returns a supplier of {@code f(arg.get())} that re-applies {@code f} only when {@code arg}
     * hands out a different object (compared by reference) than on the previous successful call.
     *
     * <p>The very first call always applies {@code f}, even when the argument is {@code null}. If
     * {@code f} throws, nothing is cached, so the next call applies {@code f} again instead of
     * returning a result that belongs to an earlier argument.
     *
     * @param arg supplier of the argument; queried on every call of the returned supplier
     * @param f function whose most recent result is cached
     * @return a supplier that caches the last result of {@code f}
     */
    private static <ArgT, ResultT> Supplier<ResultT> memoizeFunction(final Supplier<ArgT> arg, final Function<ArgT, ResultT> f) {
        return new Supplier<ResultT>(){
            private ArgT memoizedArg;
            private ResultT memoizedResult;
            // Distinguishes "nothing cached yet" from "a result for a null argument is cached".
            private boolean initialized;

            @Override
            public ResultT get() {
                ArgT currentArg = arg.get();
                if (!this.initialized || currentArg != this.memoizedArg) {
                    // Compute first and record the argument afterwards, so a failure inside f
                    // cannot leave a stale result paired with the new argument.
                    this.memoizedResult = f.apply(currentArg);
                    this.memoizedArg = currentArg;
                    this.initialized = true;
                }
                return this.memoizedResult;
            }
        };
    }

    /**
     * Returns the materialized value of the given side input for the side-input window that
     * {@code window} maps to.
     *
     * <p>Views are cached per (PTransform, side input id, encoded window), so the view function is
     * applied at most once per distinct cache key.
     *
     * @param view the side input to read
     * @param window the main-input window; mapped through the side input's window mapping function
     * @return the value produced by the side input's view function
     * @throws IllegalArgumentException if {@code view} is not a known side input
     * @throws IllegalStateException if the side-input window cannot be encoded
     */
    @Override
    @Nullable
    public <T> T get(PCollectionView<T> view, BoundedWindow window) {
        TupleTag<?> viewTag = view.getTagInternal();
        SideInputSpec spec = this.sideInputSpecMap.get(viewTag);
        Preconditions.checkArgument(spec != null, "Attempting to access unknown side input %s.", view);
        KvCoder sideInputCoder = (KvCoder)spec.getCoder();

        // Encode the side-input window; it is part of both the cache key and the state request.
        ByteString.Output windowBytes = ByteString.newOutput();
        try {
            spec.getWindowCoder().encode(spec.getWindowMappingFn().getSideInputWindow(window), windowBytes);
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
        ByteString sideInputWindow = windowBytes.toByteString();

        BeamFnApi.StateKey.Builder keyBuilder = BeamFnApi.StateKey.newBuilder();
        keyBuilder.getMultimapSideInputBuilder()
            .setPtransformId(this.ptransformId)
            .setSideInputId(viewTag.getId())
            .setWindow(sideInputWindow);
        BeamFnApi.StateKey cacheKey = keyBuilder.build();

        Object materialized = this.stateKeyObjectCache.computeIfAbsent(cacheKey, unused -> spec.getViewFn().apply(new MultimapSideInput(this.beamFnStateClient, this.processBundleInstructionId.get(), this.ptransformId, viewTag.getId(), sideInputWindow, sideInputCoder.getKeyCoder(), sideInputCoder.getValueCoder())));
        return (T)materialized;
    }

    /**
     * Reports whether a specification is registered for the given side input.
     *
     * @param view the side input to look up, by its internal tag
     * @return {@code true} if the side input spec map has an entry for the view's tag
     */
    @Override
    public <T> boolean contains(PCollectionView<T> view) {
        TupleTag<?> viewTag = view.getTagInternal();
        return this.sideInputSpecMap.containsKey(viewTag);
    }

    /**
     * Reports whether this reader has no side inputs at all.
     *
     * @return {@code true} if the side input spec map is empty
     */
    @Override
    public boolean isEmpty() {
        final boolean noSideInputs = this.sideInputSpecMap.isEmpty();
        return noSideInputs;
    }

    /**
     * Binds a {@link ValueState} cell that is stored as a bag user state holding at most one
     * element.
     *
     * <p>The cell is cached under the bag user state key for {@code id} (current window, current
     * key, PTransform), so repeated binds for the same key return the same object.
     *
     * @param id the user state id
     * @param spec the state specification (not consulted by this implementation)
     * @param coder coder for the stored value
     * @return the cached or newly created value state
     */
    @Override
    @SuppressWarnings("unchecked") // The cache is heterogeneous; entries under this key are ValueState<T>.
    public <T> ValueState<T> bindValue(final String id, StateSpec<ValueState<T>> spec, final Coder<T> coder) {
        return (ValueState<T>)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), key -> new ValueState<T>(){
            private final BagUserState<T> impl = FnApiStateAccessor.this.createBagUserState(id, coder);

            @Override
            public void clear() {
                this.impl.clear();
            }

            @Override
            public void write(T input) {
                // A value is a bag of size one: drop the old element, then store the new one.
                this.impl.clear();
                this.impl.append(input);
            }

            @Override
            public T read() {
                Iterator<T> value = this.impl.get().iterator();
                // An empty bag means no value has been written.
                return value.hasNext() ? value.next() : null;
            }

            @Override
            public ValueState<T> readLater() {
                return this;
            }
        });
    }

    /**
     * Binds a {@link BagState} cell that delegates to a bag user state.
     *
     * <p>The cell is cached under the bag user state key for {@code id}, so repeated binds for the
     * same key return the same object.
     *
     * @param id the user state id
     * @param spec the state specification (not consulted by this implementation)
     * @param elemCoder coder for the bag's elements
     * @return the cached or newly created bag state
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> BagState<T> bindBag(final String id, StateSpec<BagState<T>> spec, final Coder<T> elemCoder) {
        BeamFnApi.StateKey stateKey = this.createBagUserStateKey(id);
        return (BagState<T>)this.stateKeyObjectCache.computeIfAbsent(stateKey, unused -> new BagState<T>(){
            private final BagUserState<T> impl = FnApiStateAccessor.this.createBagUserState(id, elemCoder);

            @Override
            public void add(T value) {
                this.impl.append(value);
            }

            @Override
            public ReadableState<Boolean> isEmpty() {
                // Emptiness is evaluated when the returned state is read, not when it is created.
                return new ReadableState<Boolean>(){

                    @Override
                    @Nullable
                    public Boolean read() {
                        boolean hasElements = impl.get().iterator().hasNext();
                        return !hasElements;
                    }

                    @Override
                    public ReadableState<Boolean> readLater() {
                        return this;
                    }
                };
            }

            @Override
            public Iterable<T> read() {
                return this.impl.get();
            }

            @Override
            public BagState<T> readLater() {
                return this;
            }

            @Override
            public void clear() {
                this.impl.clear();
            }
        });
    }

    /**
     * Set state is not supported by this accessor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
        throw new UnsupportedOperationException("TODO: Add support for a set state to the Fn API.");
    }

    /**
     * Map state is not supported by this accessor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(String id, StateSpec<MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
        final String reason = "TODO: Add support for a map state to the Fn API.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Binds a {@link CombiningState} cell that keeps a single accumulator in a bag user state.
     *
     * <p>An empty bag stands for "no accumulator yet": reads then fall back to
     * {@code combineFn.createAccumulator()} / {@code combineFn.defaultValue()}. The cell is cached
     * under the bag user state key for {@code id}, so repeated binds for the same key return the
     * same object.
     *
     * @param id the user state id
     * @param spec the state specification (not consulted by this implementation)
     * @param accumCoder coder for the stored accumulator
     * @param combineFn combine function used to create, update, merge and extract accumulators
     * @return the cached or newly created combining state
     */
    @Override
    @SuppressWarnings("unchecked") // The cache is heterogeneous; entries under this key are CombiningState.
    public <ElementT, AccumT, ResultT> CombiningState<ElementT, AccumT, ResultT> bindCombining(final String id, StateSpec<CombiningState<ElementT, AccumT, ResultT>> spec, final Coder<AccumT> accumCoder, final Combine.CombineFn<ElementT, AccumT, ResultT> combineFn) {
        return (CombiningState<ElementT, AccumT, ResultT>)this.stateKeyObjectCache.computeIfAbsent(this.createBagUserStateKey(id), key -> new CombiningState<ElementT, AccumT, ResultT>(){
            private final BagUserState<AccumT> impl = FnApiStateAccessor.this.createBagUserState(id, accumCoder);

            @Override
            public AccumT getAccum() {
                Iterator<AccumT> iterator = this.impl.get().iterator();
                if (iterator.hasNext()) {
                    return iterator.next();
                }
                return combineFn.createAccumulator();
            }

            @Override
            public void addAccum(AccumT accum) {
                Iterator<AccumT> iterator = this.impl.get().iterator();
                if (iterator.hasNext()) {
                    // Fold the stored accumulator into the new one, then replace what is stored.
                    accum = combineFn.mergeAccumulators(ImmutableList.of(iterator.next(), accum));
                    this.impl.clear();
                }
                this.impl.append(accum);
            }

            @Override
            public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
                return combineFn.mergeAccumulators(accumulators);
            }

            @Override
            public CombiningState<ElementT, AccumT, ResultT> readLater() {
                return this;
            }

            @Override
            public ResultT read() {
                Iterator<AccumT> iterator = this.impl.get().iterator();
                if (iterator.hasNext()) {
                    return combineFn.extractOutput(iterator.next());
                }
                return combineFn.defaultValue();
            }

            @Override
            public void add(ElementT value) {
                AccumT newAccumulator = combineFn.addInput(this.getAccum(), value);
                // Replace the stored accumulator with the updated one.
                this.impl.clear();
                this.impl.append(newAccumulator);
            }

            @Override
            public ReadableState<Boolean> isEmpty() {
                // Unlike the bag state above, emptiness is evaluated immediately here.
                return ReadableStates.immediate(!this.impl.get().iterator().hasNext());
            }

            @Override
            public void clear() {
                this.impl.clear();
            }
        });
    }

    /**
     * Binds a {@link CombiningState} cell for a context-aware combine function.
     *
     * <p>The combine function is bound to a {@link StateContext} that exposes this accessor's
     * pipeline options, side inputs and current window, and the result is handed to
     * {@link #bindCombining}. Caching is left entirely to {@code bindCombining}: wrapping that call
     * in another {@code computeIfAbsent} on the same map and key would modify the map from inside
     * its own mapping function, which {@code HashMap} does not permit.
     *
     * @param id the user state id
     * @param spec the state specification, forwarded to {@code bindCombining}
     * @param accumCoder coder for the stored accumulator
     * @param combineFn context-aware combine function
     * @return the cached or newly created combining state
     */
    @Override
    public <ElementT, AccumT, ResultT> CombiningState<ElementT, AccumT, ResultT> bindCombiningWithContext(String id, StateSpec<CombiningState<ElementT, AccumT, ResultT>> spec, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<ElementT, AccumT, ResultT> combineFn) {
        StateContext<BoundedWindow> context = new StateContext<BoundedWindow>(){

            @Override
            public PipelineOptions getPipelineOptions() {
                return FnApiStateAccessor.this.pipelineOptions;
            }

            @Override
            public <T> T sideInput(PCollectionView<T> view) {
                return FnApiStateAccessor.this.get(view, FnApiStateAccessor.this.currentWindowSupplier.get());
            }

            @Override
            public BoundedWindow window() {
                return FnApiStateAccessor.this.currentWindowSupplier.get();
            }
        };
        // bindCombining does the cache lookup and insertion for this state id itself.
        return this.bindCombining(id, spec, accumCoder, CombineFnUtil.bindContext(combineFn, context));
    }

    /**
     * Watermark hold state is not supported by this accessor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    @Deprecated
    public WatermarkHoldState bindWatermark(String id, StateSpec<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) {
        final String reason = "WatermarkHoldState is unsupported by the Fn API.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Creates a bag user state for the current instruction, window and key, and registers its
     * {@code asyncClose} as a finalizer to be run by {@link #finalizeState()}.
     *
     * @param stateId the user state id
     * @param valueCoder coder for the bag's elements
     * @return the newly created bag user state
     */
    private <T> BagUserState<T> createBagUserState(String stateId, Coder<T> valueCoder) {
        String instructionId = this.processBundleInstructionId.get();
        ByteString encodedWindow = this.encodedCurrentWindowSupplier.get();
        ByteString encodedKey = this.encodedCurrentKeySupplier.get();
        BagUserState<T> bagState = new BagUserState<T>(this.beamFnStateClient, instructionId, this.ptransformId, stateId, encodedWindow, encodedKey, valueCoder);
        this.stateFinalizers.add(bagState::asyncClose);
        return bagState;
    }

    /**
     * Builds the state key that identifies a bag user state cell: the encoded current window, the
     * encoded current key, this PTransform's id and the given user state id.
     *
     * @param stateId the user state id
     * @return the state key used for cache lookups of user state cells
     */
    private BeamFnApi.StateKey createBagUserStateKey(String stateId) {
        ByteString encodedWindow = this.encodedCurrentWindowSupplier.get();
        ByteString encodedKey = this.encodedCurrentKeySupplier.get();
        BeamFnApi.StateKey.Builder keyBuilder = BeamFnApi.StateKey.newBuilder();
        keyBuilder.getBagUserStateBuilder()
            .setWindow(encodedWindow)
            .setKey(encodedKey)
            .setPtransformId(this.ptransformId)
            .setUserStateId(stateId);
        return keyBuilder.build();
    }

    /**
     * Runs every registered state finalizer, in registration order, stopping at the first failure.
     *
     * <p>The finalizer list is left untouched afterwards.
     *
     * @throws IllegalStateException wrapping whatever a finalizer threw; if that was an
     *     {@link InterruptedException}, the thread's interrupt flag is restored first
     */
    public void finalizeState() {
        try {
            Iterator<ThrowingRunnable> pending = this.stateFinalizers.iterator();
            while (pending.hasNext()) {
                pending.next().run();
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException(e);
        }
    }
}

