/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.HashSet;
import java.util.Map;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Optional;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.direct.repackaged.runners.core.StateInternals;
import org.apache.beam.runners.direct.repackaged.runners.core.StateNamespace;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTable;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTag;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.StateContexts;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

class CopyOnAccessInMemoryStateInternals<K>
implements StateInternals {
    private final CopyOnAccessInMemoryStateTable table;
    private K key;

    public static <K> CopyOnAccessInMemoryStateInternals withUnderlying(K key, @Nullable CopyOnAccessInMemoryStateInternals underlying) {
        return new CopyOnAccessInMemoryStateInternals<K>(key, underlying);
    }

    private CopyOnAccessInMemoryStateInternals(K key, CopyOnAccessInMemoryStateInternals underlying) {
        this.key = key;
        this.table = new CopyOnAccessInMemoryStateTable(underlying == null ? null : underlying.table);
    }

    public CopyOnAccessInMemoryStateInternals commit() {
        this.table.commit();
        return this;
    }

    public Instant getEarliestWatermarkHold() {
        Preconditions.checkState(this.table.earliestWatermarkHold.isPresent(), "Can't get the earliest watermark hold in a %s before it is committed", (Object)this.getClass().getSimpleName());
        return (Instant)this.table.earliestWatermarkHold.get();
    }

    @Override
    public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
        return this.state(namespace, address, StateContexts.nullContext());
    }

    @Override
    public <T extends State> T state(StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
        return this.table.get(namespace, address, c);
    }

    @Override
    public Object getKey() {
        return this.key;
    }

    public boolean isEmpty() {
        return Iterables.isEmpty(this.table.values());
    }

    private static class CopyOnAccessInMemoryStateTable
    extends StateTable {
        private Optional<StateTable> underlying;
        private StateBinderFactory binderFactory;
        private Optional<Instant> earliestWatermarkHold;

        public CopyOnAccessInMemoryStateTable(StateTable underlying) {
            this.underlying = Optional.fromNullable(underlying);
            this.binderFactory = new CopyOnBindBinderFactory(this.underlying);
            this.earliestWatermarkHold = Optional.absent();
        }

        private void commit() {
            Instant earliestHold = this.getEarliestWatermarkHold();
            if (this.underlying.isPresent()) {
                ReadThroughBinderFactory readThroughBinder;
                this.binderFactory = readThroughBinder = new ReadThroughBinderFactory(this.underlying.get());
                Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this);
                if (earliestUnderlyingHold.isBefore((ReadableInstant)earliestHold)) {
                    earliestHold = earliestUnderlyingHold;
                }
            }
            this.earliestWatermarkHold = Optional.of(earliestHold);
            this.clearEmpty();
            this.binderFactory = new InMemoryStateBinderFactory();
            this.underlying = Optional.absent();
        }

        private Instant getEarliestWatermarkHold() {
            Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE;
            for (State existingState : this.values()) {
                Instant hold;
                if (!(existingState instanceof WatermarkHoldState) || (hold = (Instant)((WatermarkHoldState)existingState).read()) == null || !hold.isBefore((ReadableInstant)earliest)) continue;
                earliest = hold;
            }
            return earliest;
        }

        private void clearEmpty() {
            HashSet<StateNamespace> emptyNamespaces = new HashSet<StateNamespace>(this.getNamespacesInUse());
            block0: for (StateNamespace namespace : this.getNamespacesInUse()) {
                for (State existingState : this.getTagsInUse(namespace).values()) {
                    if (((InMemoryStateInternals.InMemoryState)existingState).isCleared()) continue;
                    emptyNamespaces.remove(namespace);
                    continue block0;
                }
            }
            for (StateNamespace empty : emptyNamespaces) {
                this.clearNamespace(empty);
            }
        }

        @Override
        protected StateTag.StateBinder binderForNamespace(StateNamespace namespace, StateContext<?> c) {
            return this.binderFactory.forNamespace(namespace, c);
        }

        private static class InMemoryStateBinderFactory
        implements StateBinderFactory {
            @Override
            public StateTag.StateBinder forNamespace(StateNamespace namespace, StateContext<?> c) {
                return new InMemoryStateInternals.InMemoryStateBinder(c);
            }
        }

        private static class ReadThroughBinderFactory<K>
        implements StateBinderFactory {
            private final StateTable underlying;

            public ReadThroughBinderFactory(StateTable underlying) {
                this.underlying = underlying;
            }

            public Instant readThroughAndGetEarliestHold(StateTable readTo) {
                Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
                for (StateNamespace namespace : this.underlying.getNamespacesInUse()) {
                    for (Map.Entry<StateTag, State> existingState : this.underlying.getTagsInUse(namespace).entrySet()) {
                        Instant hold;
                        Object state;
                        if (((InMemoryStateInternals.InMemoryState)existingState.getValue()).isCleared() || !((state = readTo.get(namespace, existingState.getKey(), StateContexts.nullContext())) instanceof WatermarkHoldState) || (hold = (Instant)((WatermarkHoldState)state).read()) == null || !hold.isBefore((ReadableInstant)earliestHold)) continue;
                        earliestHold = hold;
                    }
                }
                return earliestHold;
            }

            @Override
            public StateTag.StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
                return new StateTag.StateBinder(){

                    @Override
                    public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
                        return underlying.get(namespace, address, c);
                    }

                    @Override
                    public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
                        return underlying.get(namespace, address, c);
                    }

                    @Override
                    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                        return underlying.get(namespace, address, c);
                    }

                    @Override
                    public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
                        return underlying.get(namespace, address, c);
                    }

                    @Override
                    public <T> SetState<T> bindSet(StateTag<SetState<T>> address, Coder<T> elemCoder) {
                        return underlying.get(namespace, address, c);
                    }

                    @Override
                    public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> address, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
                        return underlying.get(namespace, address, c);
                    }

                    @Override
                    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
                        return this.bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, (StateContext)c));
                    }
                };
            }
        }

        private static class CopyOnBindBinderFactory
        implements StateBinderFactory {
            private final Optional<StateTable> underlying;

            public CopyOnBindBinderFactory(Optional<StateTable> underlying) {
                this.underlying = underlying;
            }

            private boolean containedInUnderlying(StateNamespace namespace, StateTag<?> tag) {
                return this.underlying.isPresent() && this.underlying.get().isNamespaceInUse(namespace) && this.underlying.get().getTagsInUse(namespace).containsKey(tag);
            }

            @Override
            public StateTag.StateBinder forNamespace(final StateNamespace namespace, final StateContext<?> c) {
                return new StateTag.StateBinder(){

                    @Override
                    public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
                        if (this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)underlying.get()).get(namespace, address, c);
                            return (WatermarkHoldState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryWatermarkHold(timestampCombiner);
                    }

                    @Override
                    public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
                        if (this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)underlying.get()).get(namespace, address, c);
                            return (ValueState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryValue<T>(coder);
                    }

                    @Override
                    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                        if (this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)underlying.get()).get(namespace, address, c);
                            return (CombiningState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryCombiningState<InputT, AccumT, OutputT>(combineFn, accumCoder);
                    }

                    @Override
                    public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
                        if (this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)underlying.get()).get(namespace, address, c);
                            return (BagState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryBag<T>(elemCoder);
                    }

                    @Override
                    public <T> SetState<T> bindSet(StateTag<SetState<T>> address, Coder<T> elemCoder) {
                        if (this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)underlying.get()).get(namespace, address, c);
                            return (SetState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemorySet<T>(elemCoder);
                    }

                    @Override
                    public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> address, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
                        if (this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)underlying.get()).get(namespace, address, c);
                            return (MapState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryMap<KeyT, ValueT>(mapKeyCoder, mapValueCoder);
                    }

                    @Override
                    public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
                        return this.bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, (StateContext)c));
                    }
                };
            }
        }

        private static interface StateBinderFactory {
            public StateTag.StateBinder forNamespace(StateNamespace var1, StateContext<?> var2);
        }
    }
}

