/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.streamlet.impl.operators;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.heron.api.Pair;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.TupleWindow;
import org.apache.heron.streamlet.JoinType;
import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.Window;
import org.apache.heron.streamlet.impl.operators.StreamletWindowOperator;

public class JoinOperator<K, V1, V2, VR>
extends StreamletWindowOperator {
    private static final long serialVersionUID = 4875450390444745407L;
    private static final String LEFT_COMPONENT_NAME = "_streamlet_joinbolt_left_component_name_";
    private static final String RIGHT_COMPONENT_NAME = "_streamlet_joinbolt_right_component_name_";
    private JoinType joinType;
    private String leftComponent;
    private String rightComponent;
    private SerializableFunction<V1, K> leftKeyExtractor;
    private SerializableFunction<V2, K> rightKeyExtractor;
    private SerializableBiFunction<V1, V2, ? extends VR> joinFn;
    private OutputCollector collector;

    public JoinOperator(JoinType joinType, String leftComponent, String rightComponent, SerializableFunction<V1, K> leftKeyExtractor, SerializableFunction<V2, K> rightKeyExtractor, SerializableBiFunction<V1, V2, ? extends VR> joinFn) {
        this.joinType = joinType;
        this.leftComponent = leftComponent;
        this.rightComponent = rightComponent;
        this.leftKeyExtractor = leftKeyExtractor;
        this.rightKeyExtractor = rightKeyExtractor;
        this.joinFn = joinFn;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> cfg = super.getComponentConfiguration();
        cfg.put(LEFT_COMPONENT_NAME, this.leftComponent);
        cfg.put(RIGHT_COMPONENT_NAME, this.rightComponent);
        return cfg;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        HashMap joinMap = new HashMap();
        for (Tuple tuple : inputWindow.get()) {
            Object tup;
            if (tuple.getSourceComponent().equals(this.leftComponent)) {
                tup = tuple.getValue(0);
                if (tup == null) continue;
                this.addMapLeft(joinMap, tup);
                continue;
            }
            tup = tuple.getValue(0);
            if (tup == null) continue;
            this.addMapRight(joinMap, tup);
        }
        this.evaluateJoinMap(joinMap, inputWindow);
    }

    private void evaluateJoinMap(Map<K, Pair<List<V1>, List<V2>>> joinMap, TupleWindow tupleWindow) {
        block6: for (K key : joinMap.keySet()) {
            Pair<List<V1>, List<V2>> val = joinMap.get(key);
            switch (this.joinType) {
                case INNER: {
                    if (val.getFirst().isEmpty() || val.getSecond().isEmpty()) continue block6;
                    this.innerJoinAndEmit(key, tupleWindow, val);
                    continue block6;
                }
                case OUTER_LEFT: {
                    if (!val.getFirst().isEmpty() && !val.getSecond().isEmpty()) {
                        this.innerJoinAndEmit(key, tupleWindow, val);
                        continue block6;
                    }
                    if (val.getFirst().isEmpty()) continue block6;
                    this.outerLeftJoinAndEmit(key, tupleWindow, val);
                    continue block6;
                }
                case OUTER_RIGHT: {
                    if (!val.getFirst().isEmpty() && !val.getSecond().isEmpty()) {
                        this.innerJoinAndEmit(key, tupleWindow, val);
                        continue block6;
                    }
                    if (val.getSecond().isEmpty()) continue block6;
                    this.outerRightJoinAndEmit(key, tupleWindow, val);
                    continue block6;
                }
                case OUTER: {
                    if (!val.getFirst().isEmpty() && !val.getSecond().isEmpty()) {
                        this.innerJoinAndEmit(key, tupleWindow, val);
                        continue block6;
                    }
                    if (!val.getSecond().isEmpty()) {
                        this.outerRightJoinAndEmit(key, tupleWindow, val);
                        continue block6;
                    }
                    if (val.getFirst().isEmpty()) continue block6;
                    this.outerLeftJoinAndEmit(key, tupleWindow, val);
                    continue block6;
                }
            }
            throw new RuntimeException("Unknown join type: " + this.joinType.name());
        }
    }

    private void addMapLeft(Map<K, Pair<List<V1>, List<V2>>> joinMap, V1 tup) {
        Object key = this.leftKeyExtractor.apply(tup);
        if (!joinMap.containsKey(key)) {
            joinMap.put(key, Pair.of(new LinkedList(), new LinkedList()));
        }
        joinMap.get(key).getFirst().add(tup);
    }

    private void addMapRight(Map<K, Pair<List<V1>, List<V2>>> joinMap, V2 tup) {
        Object key = this.rightKeyExtractor.apply(tup);
        if (!joinMap.containsKey(key)) {
            joinMap.put(key, Pair.of(new LinkedList(), new LinkedList()));
        }
        joinMap.get(key).getSecond().add(tup);
    }

    private KeyedWindow<K> getKeyedWindow(K key, TupleWindow tupleWindow) {
        long startWindow = tupleWindow.getStartTimestamp() == null ? 0L : tupleWindow.getStartTimestamp();
        long endWindow = tupleWindow.getEndTimestamp() == null ? 0L : tupleWindow.getEndTimestamp();
        Window window = new Window(startWindow, endWindow, tupleWindow.get().size());
        return new KeyedWindow<K>(key, window);
    }

    private void innerJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> val) {
        KeyedWindow<K> keyedWindow = this.getKeyedWindow(key, tupleWindow);
        for (V1 val1 : val.getFirst()) {
            for (V2 val2 : val.getSecond()) {
                this.collector.emit(new Values(new KeyValue(keyedWindow, this.joinFn.apply(val1, val2))));
            }
        }
    }

    private void outerLeftJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> val) {
        KeyedWindow<K> keyedWindow = this.getKeyedWindow(key, tupleWindow);
        for (V1 val1 : val.getFirst()) {
            this.collector.emit(new Values(new KeyValue(keyedWindow, this.joinFn.apply(val1, null))));
        }
    }

    private void outerRightJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> val) {
        KeyedWindow<K> keyedWindow = this.getKeyedWindow(key, tupleWindow);
        for (V2 val2 : val.getSecond()) {
            this.collector.emit(new Values(new KeyValue(keyedWindow, this.joinFn.apply(null, val2))));
        }
    }
}

