/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.heron.streamlet.impl;

import com.twitter.heron.api.topology.TopologyBuilder;
import com.twitter.heron.streamlet.JoinType;
import com.twitter.heron.streamlet.KeyValue;
import com.twitter.heron.streamlet.KeyedWindow;
import com.twitter.heron.streamlet.SerializableBiFunction;
import com.twitter.heron.streamlet.SerializableBinaryOperator;
import com.twitter.heron.streamlet.SerializableConsumer;
import com.twitter.heron.streamlet.SerializableFunction;
import com.twitter.heron.streamlet.SerializablePredicate;
import com.twitter.heron.streamlet.SerializableSupplier;
import com.twitter.heron.streamlet.SerializableTransformer;
import com.twitter.heron.streamlet.Sink;
import com.twitter.heron.streamlet.Source;
import com.twitter.heron.streamlet.Streamlet;
import com.twitter.heron.streamlet.WindowConfig;
import com.twitter.heron.streamlet.impl.streamlets.ConsumerStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.FilterStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.JoinStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.LogStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.MapStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.RemapStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.SinkStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.SourceStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.SupplierStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.TransformStreamlet;
import com.twitter.heron.streamlet.impl.streamlets.UnionStreamlet;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

/**
 * Base implementation of {@link Streamlet}.
 *
 * <p>Each instance keeps a name, a partition count and a list of child streamlets. The
 * transformation methods ({@code map}, {@code filter}, {@code join}, ...) create the matching
 * concrete streamlet, register it as a child of this one, and return it. {@link #build} then
 * walks that parent/child graph, delegating the per-stage work to {@link #doBuild}.
 *
 * @param <R> the element type carried by this streamlet
 */
public abstract class StreamletImpl<R> implements Streamlet<R> {
    private static final Logger LOG = Logger.getLogger(StreamletImpl.class.getName());

    /** Message used whenever a non-positive partition count is rejected. */
    private static final String INVALID_PARTITIONS_MESSAGE = "Streamlet's partitions number should be > 0";

    /** Stage name; {@code null} until set explicitly or by {@link #setDefaultNameIfNone}. */
    protected String name;
    /** Partition count; {@code -1} until {@link #setNumPartitions(int)} is called. */
    protected int nPartitions = -1;
    /** Streamlets that consume this streamlet's output, in registration order. */
    private final List<StreamletImpl<?>> children = new LinkedList<>();
    /** Set to {@code true} once {@link #doBuild} has reported success for this streamlet. */
    private boolean built = false;

    /**
     * @return {@code true} if this streamlet has been built
     */
    public boolean isBuilt() {
        return this.built;
    }

    /**
     * Checks, recursively through the children, whether the whole sub-graph has been built.
     *
     * @return {@code true} only if this streamlet and every descendant is built
     */
    public boolean allBuilt() {
        if (!this.built) {
            return false;
        }
        for (StreamletImpl<?> child : this.children) {
            if (!child.allBuilt()) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return the internal list of children (the live list, not a copy)
     */
    public List<StreamletImpl<?>> getChildren() {
        return this.children;
    }

    /**
     * Sets the stage name.
     *
     * @param sName the name; must be non-null and contain at least one non-whitespace character
     * @return this streamlet
     * @throws IllegalArgumentException if {@code sName} is null or blank
     */
    @Override
    public Streamlet<R> setName(String sName) {
        this.require(sName != null && !sName.trim().isEmpty(), "Streamlet name cannot be null/blank");
        this.name = sName;
        return this;
    }

    @Override
    public String getName() {
        return this.name;
    }

    /**
     * Assigns a generated name if none has been set, then reserves the name in
     * {@code stageNames}.
     *
     * @param prefix     prefix used when a name has to be generated
     * @param stageNames names already used in the topology; the final name is added to it
     * @throws RuntimeException if the (explicit or generated) name is already in {@code stageNames}
     */
    protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stageNames) {
        if (this.getName() == null) {
            this.setName(this.defaultNameCalculator(prefix, stageNames));
        }
        if (stageNames.contains(this.getName())) {
            throw new RuntimeException(String.format("The stage name %s is used multiple times in the same topology", this.getName()));
        }
        stageNames.add(this.getName());
    }

    /**
     * Sets the partition count.
     *
     * @param numPartitions the partition count; must be greater than zero
     * @return this streamlet
     * @throws IllegalArgumentException if {@code numPartitions} is not positive
     */
    @Override
    public Streamlet<R> setNumPartitions(int numPartitions) {
        this.require(numPartitions > 0, INVALID_PARTITIONS_MESSAGE);
        this.nPartitions = numPartitions;
        return this;
    }

    @Override
    public int getNumPartitions() {
        return this.nPartitions;
    }

    /** Only subclasses may instantiate a streamlet. */
    protected StreamletImpl() {
    }

    /**
     * Builds this streamlet and, if that succeeds, each of its children.
     *
     * <p>When {@link #doBuild} returns {@code false} the streamlet stays unbuilt and its children
     * are not visited, so {@code build} may be invoked on it again later.
     *
     * @param bldr       the topology builder passed through to {@link #doBuild}
     * @param stageNames the stage names used so far, passed through to {@link #doBuild}
     * @throws RuntimeException if this streamlet has already been built
     */
    public void build(TopologyBuilder bldr, Set<String> stageNames) {
        if (this.built) {
            throw new RuntimeException("Logic Error While building " + this.getName());
        }
        if (!this.doBuild(bldr, stageNames)) {
            // NOTE(review): presumably "not ready yet" (e.g. waiting for another parent) - confirm
            // against the concrete subclasses.
            return;
        }
        this.built = true;
        for (StreamletImpl<?> child : this.children) {
            child.build(bldr, stageNames);
        }
    }

    /**
     * Performs the stage-specific build work.
     *
     * @param var1 the topology builder
     * @param var2 the stage names used so far
     * @return {@code true} if the streamlet was built; {@code false} leaves it unbuilt
     */
    protected abstract boolean doBuild(TopologyBuilder var1, Set<String> var2);

    /**
     * Registers {@code child} as a consumer of this streamlet.
     *
     * @param child the child streamlet to append
     * @param <T>   the child's element type
     */
    public <T> void addChild(StreamletImpl<T> child) {
        this.children.add(child);
    }

    /**
     * Generates {@code prefix + index} using the smallest index, starting at 1, that is not
     * already present in {@code stageNames}.
     */
    private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> stageNames) {
        int index = 1;
        String calculatedName = prefix.toString() + index;
        while (stageNames.contains(calculatedName)) {
            ++index;
            calculatedName = prefix.toString() + index;
        }
        LOG.info("Calculated stage Name as " + calculatedName);
        return calculatedName;
    }

    /** Creates a streamlet backed by the given supplier. */
    static <T> StreamletImpl<T> createSupplierStreamlet(SerializableSupplier<T> supplier) {
        return new SupplierStreamlet<T>(supplier);
    }

    /** Creates a streamlet backed by the given source. */
    static <T> StreamletImpl<T> createGeneratorStreamlet(Source<T> generator) {
        return new SourceStreamlet<T>(generator);
    }

    /**
     * Creates a {@link MapStreamlet} child applying {@code mapFn}.
     *
     * @return the new child streamlet
     */
    @Override
    public <T> Streamlet<T> map(SerializableFunction<R, ? extends T> mapFn) {
        MapStreamlet<R, T> retval = new MapStreamlet<R, T>(this, mapFn);
        this.addChild(retval);
        return retval;
    }

    /**
     * Creates a {@link FlatMapStreamlet} child applying {@code flatMapFn}.
     *
     * @return the new child streamlet
     */
    @Override
    public <T> Streamlet<T> flatMap(SerializableFunction<R, ? extends Iterable<? extends T>> flatMapFn) {
        // NOTE(review): FlatMapStreamlet's type parameters are not visible from this file, so the
        // raw type is kept; parameterize once confirmed.
        FlatMapStreamlet retval = new FlatMapStreamlet(this, flatMapFn);
        this.addChild(retval);
        return retval;
    }

    /**
     * Creates a {@link FilterStreamlet} child using {@code filterFn}.
     *
     * @return the new child streamlet
     */
    @Override
    public Streamlet<R> filter(SerializablePredicate<R> filterFn) {
        FilterStreamlet<R> retval = new FilterStreamlet<R>(this, filterFn);
        this.addChild(retval);
        return retval;
    }

    /**
     * Creates an identity-map child with the given partition count.
     *
     * @param numPartitions the partition count of the new child; must be greater than zero
     * @return the new child streamlet
     * @throws IllegalArgumentException if {@code numPartitions} is not positive
     */
    @Override
    public Streamlet<R> repartition(int numPartitions) {
        // Validate before map() registers the child, so that an invalid argument cannot leave a
        // stray identity-map child attached to this streamlet.
        this.require(numPartitions > 0, INVALID_PARTITIONS_MESSAGE);
        return this.<R>map(a -> a).setNumPartitions(numPartitions);
    }

    /**
     * Creates a {@link RemapStreamlet} child with the given partition count and partition function.
     *
     * @param numPartitions the partition count of the new child; must be greater than zero
     * @param partitionFn   the function handed to the {@link RemapStreamlet}
     * @return the new child streamlet
     * @throws IllegalArgumentException if {@code numPartitions} is not positive
     */
    @Override
    public Streamlet<R> repartition(int numPartitions, SerializableBiFunction<R, Integer, List<Integer>> partitionFn) {
        RemapStreamlet<R> retval = new RemapStreamlet<R>(this, partitionFn);
        // Configured before registration: a rejected partition count leaves no child behind.
        retval.setNumPartitions(numPartitions);
        this.addChild(retval);
        return retval;
    }

    /**
     * Creates {@code numClones} children, each produced by
     * {@code repartition(getNumPartitions())}.
     *
     * @param numClones number of clones; a non-positive value yields an empty list
     * @return the newly created child streamlets
     * @throws IllegalArgumentException if clones are requested while this streamlet's partition
     *                                  count is not positive
     */
    @Override
    public List<Streamlet<R>> clone(int numClones) {
        List<Streamlet<R>> retval = new ArrayList<>();
        for (int i = 0; i < numClones; ++i) {
            retval.add(this.repartition(this.getNumPartitions()));
        }
        return retval;
    }

    /**
     * Joins this streamlet with {@code other} using {@link JoinType#INNER}.
     *
     * @see #join(Streamlet, SerializableFunction, SerializableFunction, WindowConfig, JoinType,
     *      SerializableBiFunction)
     */
    @Override
    public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>> join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor, SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg, SerializableBiFunction<R, S, ? extends T> joinFunction) {
        return this.join(other, thisKeyExtractor, otherKeyExtractor, windowCfg, JoinType.INNER, joinFunction);
    }

    /**
     * Creates a {@link JoinStreamlet} and registers it as a child of both this streamlet and
     * {@code other}.
     *
     * @param other must be a {@link StreamletImpl}
     * @return the new join streamlet
     * @throws ClassCastException if {@code other} is not a {@link StreamletImpl}
     */
    @Override
    public <K, S, T> Streamlet<KeyValue<KeyedWindow<K>, T>> join(Streamlet<S> other, SerializableFunction<R, K> thisKeyExtractor, SerializableFunction<S, K> otherKeyExtractor, WindowConfig windowCfg, JoinType joinType, SerializableBiFunction<R, S, ? extends T> joinFunction) {
        StreamletImpl<S> joinee = (StreamletImpl<S>) other;
        JoinStreamlet<K, R, S, T> retval = JoinStreamlet.createJoinStreamlet(this, joinee, thisKeyExtractor, otherKeyExtractor, windowCfg, joinType, joinFunction);
        this.addChild(retval);
        joinee.addChild(retval);
        return retval;
    }

    /**
     * Creates a {@link ReduceByKeyAndWindowStreamlet} child.
     *
     * @return the new child streamlet
     */
    @Override
    public <K, V> Streamlet<KeyValue<KeyedWindow<K>, V>> reduceByKeyAndWindow(SerializableFunction<R, K> keyExtractor, SerializableFunction<R, V> valueExtractor, WindowConfig windowCfg, SerializableBinaryOperator<V> reduceFn) {
        ReduceByKeyAndWindowStreamlet<K, V, R> retval = new ReduceByKeyAndWindowStreamlet<K, V, R>(this, keyExtractor, valueExtractor, windowCfg, reduceFn);
        this.addChild(retval);
        return retval;
    }

    /**
     * Creates a {@link GeneralReduceByKeyAndWindowStreamlet} child that takes an explicit
     * {@code identity} value.
     *
     * @return the new child streamlet
     */
    @Override
    public <K, T> Streamlet<KeyValue<KeyedWindow<K>, T>> reduceByKeyAndWindow(SerializableFunction<R, K> keyExtractor, WindowConfig windowCfg, T identity, SerializableBiFunction<T, R, ? extends T> reduceFn) {
        GeneralReduceByKeyAndWindowStreamlet<K, R, T> retval = new GeneralReduceByKeyAndWindowStreamlet<K, R, T>(this, keyExtractor, windowCfg, identity, reduceFn);
        this.addChild(retval);
        return retval;
    }

    /**
     * Creates a {@link UnionStreamlet} and registers it as a child of both this streamlet and
     * {@code other}.
     *
     * @param other must be a {@link StreamletImpl}
     * @return the new union streamlet
     * @throws ClassCastException if {@code other} is not a {@link StreamletImpl}
     */
    @Override
    public Streamlet<R> union(Streamlet<? extends R> other) {
        StreamletImpl<? extends R> joinee = (StreamletImpl<? extends R>) other;
        // NOTE(review): UnionStreamlet's type parameters are not visible from this file, so the
        // raw type is kept; parameterize once confirmed.
        UnionStreamlet retval = new UnionStreamlet(this, joinee);
        this.addChild(retval);
        joinee.addChild(retval);
        return retval;
    }

    /** Attaches a {@link LogStreamlet} child to this streamlet. */
    @Override
    public void log() {
        // NOTE(review): LogStreamlet's type parameters are not visible from this file, so the raw
        // type is kept; parameterize once confirmed.
        LogStreamlet logger = new LogStreamlet(this);
        this.addChild(logger);
    }

    /** Attaches a {@link ConsumerStreamlet} child wrapping {@code consumer}. */
    @Override
    public void consume(SerializableConsumer<R> consumer) {
        ConsumerStreamlet<R> consumerStreamlet = new ConsumerStreamlet<R>(this, consumer);
        this.addChild(consumerStreamlet);
    }

    /** Attaches a {@link SinkStreamlet} child wrapping {@code sink}. */
    @Override
    public void toSink(Sink<R> sink) {
        SinkStreamlet<R> sinkStreamlet = new SinkStreamlet<R>(this, sink);
        this.addChild(sinkStreamlet);
    }

    /**
     * Creates a {@link TransformStreamlet} child applying {@code serializableTransformer}.
     *
     * @return the new child streamlet
     */
    @Override
    public <T> Streamlet<T> transform(SerializableTransformer<R, ? extends T> serializableTransformer) {
        TransformStreamlet<R, T> transformStreamlet = new TransformStreamlet<R, T>(this, serializableTransformer);
        this.addChild(transformStreamlet);
        return transformStreamlet;
    }

    /**
     * Argument precondition check.
     *
     * @throws IllegalArgumentException carrying {@code errorMessage} when {@code requirement}
     *                                  is {@code false}
     */
    private void require(boolean requirement, String errorMessage) {
        if (!requirement) {
            throw new IllegalArgumentException(errorMessage);
        }
    }

    /** Prefixes used by {@link #setDefaultNameIfNone} when generating default stage names. */
    protected enum StreamletNamePrefix {
        CONSUMER("consumer"),
        FILTER("filter"),
        FLATMAP("flatmap"),
        REDUCE("reduceByKeyAndWindow"),
        JOIN("join"),
        LOGGER("logger"),
        MAP("map"),
        REMAP("remap"),
        SINK("sink"),
        SOURCE("generator"),
        SUPPLIER("supplier"),
        TRANSFORM("transform"),
        UNION("union");

        private final String prefix;

        StreamletNamePrefix(String prefix) {
            this.prefix = prefix;
        }

        /** @return the prefix text, not the enum constant name */
        @Override
        public String toString() {
            return this.prefix;
        }
    }
}

