/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;

public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend, W extends ValueState<T> & KvState<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend>>
implements ReducingState<T>,
KvState<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> {
    private final W wrappedState;
    private final ReduceFunction<T> reduceFunction;

    public GenericReducingState(ValueState<T> wrappedState, ReduceFunction<T> reduceFunction) {
        if (!(wrappedState instanceof KvState)) {
            throw new IllegalArgumentException("Wrapped state must be a KvState.");
        }
        this.wrappedState = wrappedState;
        this.reduceFunction = reduceFunction;
    }

    @Override
    public void setCurrentKey(K key) {
        ((KvState)this.wrappedState).setCurrentKey(key);
    }

    @Override
    public void setCurrentNamespace(N namespace) {
        ((KvState)this.wrappedState).setCurrentNamespace(namespace);
    }

    @Override
    public KvStateSnapshot<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> snapshot(long checkpointId, long timestamp) throws Exception {
        KvStateSnapshot wrappedSnapshot = ((KvState)this.wrappedState).snapshot(checkpointId, timestamp);
        return new Snapshot(wrappedSnapshot, this.reduceFunction);
    }

    @Override
    public void dispose() {
        ((KvState)this.wrappedState).dispose();
    }

    public T get() throws Exception {
        return (T)this.wrappedState.value();
    }

    public void add(T value) throws Exception {
        Object currentValue = this.wrappedState.value();
        if (currentValue == null) {
            this.wrappedState.update(value);
        } else {
            this.wrappedState.update(this.reduceFunction.reduce(currentValue, value));
        }
    }

    public void clear() {
        this.wrappedState.clear();
    }

    private static class Snapshot<K, N, T, Backend extends AbstractStateBackend>
    implements KvStateSnapshot<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> {
        private static final long serialVersionUID = 1L;
        private final KvStateSnapshot<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend> wrappedSnapshot;
        private final ReduceFunction<T> reduceFunction;

        public Snapshot(KvStateSnapshot<K, N, ValueState<T>, ValueStateDescriptor<T>, Backend> wrappedSnapshot, ReduceFunction<T> reduceFunction) {
            this.wrappedSnapshot = wrappedSnapshot;
            this.reduceFunction = reduceFunction;
        }

        @Override
        public KvState<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> restoreState(Backend stateBackend, TypeSerializer<K> keySerializer, ClassLoader classLoader, long recoveryTimestamp) throws Exception {
            return new GenericReducingState((ValueState)this.wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp), this.reduceFunction);
        }

        @Override
        public void discardState() throws Exception {
            this.wrappedSnapshot.discardState();
        }

        @Override
        public long getStateSize() throws Exception {
            return this.wrappedSnapshot.getStateSize();
        }
    }
}

