package com.twitter.heron.streamlet.impl.operators;

import com.twitter.heron.api.Pair;
import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;
import com.twitter.heron.api.windowing.TupleWindow;
import com.twitter.heron.streamlet.JoinType;
import com.twitter.heron.streamlet.KeyValue;
import com.twitter.heron.streamlet.KeyedWindow;
import com.twitter.heron.streamlet.SerializableBiFunction;
import com.twitter.heron.streamlet.SerializableFunction;
import com.twitter.heron.streamlet.Window;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:com/twitter/heron/streamlet/impl/operators/JoinOperator.class */
public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator {
    private static final long serialVersionUID = 4875450390444745407L;
    private static final String LEFT_COMPONENT_NAME = "_streamlet_joinbolt_left_component_name_";
    private static final String RIGHT_COMPONENT_NAME = "_streamlet_joinbolt_right_component_name_";
    private JoinType joinType;
    private String leftComponent;
    private String rightComponent;
    private SerializableFunction<V1, K> leftKeyExtractor;
    private SerializableFunction<V2, K> rightKeyExtractor;
    private SerializableBiFunction<V1, V2, ? extends VR> joinFn;
    private OutputCollector collector;

    public JoinOperator(JoinType joinType, String str, String str2, SerializableFunction<V1, K> serializableFunction, SerializableFunction<V2, K> serializableFunction2, SerializableBiFunction<V1, V2, ? extends VR> serializableBiFunction) {
        this.joinType = joinType;
        this.leftComponent = str;
        this.rightComponent = str2;
        this.leftKeyExtractor = serializableFunction;
        this.rightKeyExtractor = serializableFunction2;
        this.joinFn = serializableBiFunction;
    }

    @Override // com.twitter.heron.api.bolt.BaseWindowedBolt, com.twitter.heron.api.bolt.IWindowedBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override // com.twitter.heron.api.bolt.BaseWindowedBolt, com.twitter.heron.api.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> componentConfiguration = super.getComponentConfiguration();
        componentConfiguration.put(LEFT_COMPONENT_NAME, this.leftComponent);
        componentConfiguration.put(RIGHT_COMPONENT_NAME, this.rightComponent);
        return componentConfiguration;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.twitter.heron.api.bolt.IWindowedBolt
    public void execute(TupleWindow tupleWindow) {
        HashMap hashMap = new HashMap();
        for (Tuple tuple : tupleWindow.get()) {
            if (tuple.getSourceComponent().equals(this.leftComponent)) {
                Object value = tuple.getValue(0);
                if (value != null) {
                    addMapLeft(hashMap, value);
                }
            } else {
                Object value2 = tuple.getValue(0);
                if (value2 != null) {
                    addMapRight(hashMap, value2);
                }
            }
        }
        evaluateJoinMap(hashMap, tupleWindow);
    }

    private void evaluateJoinMap(Map<K, Pair<List<V1>, List<V2>>> map, TupleWindow tupleWindow) {
        for (K k : map.keySet()) {
            Pair<List<V1>, List<V2>> pair = map.get(k);
            switch (this.joinType) {
                case INNER:
                    if (!pair.getFirst().isEmpty() && !pair.getSecond().isEmpty()) {
                        innerJoinAndEmit(k, tupleWindow, pair);
                        break;
                    }
                    break;
                case OUTER_LEFT:
                    if (!pair.getFirst().isEmpty() && !pair.getSecond().isEmpty()) {
                        innerJoinAndEmit(k, tupleWindow, pair);
                        break;
                    } else if (pair.getFirst().isEmpty()) {
                        break;
                    } else {
                        outerLeftJoinAndEmit(k, tupleWindow, pair);
                        break;
                    }
                case OUTER_RIGHT:
                    if (!pair.getFirst().isEmpty() && !pair.getSecond().isEmpty()) {
                        innerJoinAndEmit(k, tupleWindow, pair);
                        break;
                    } else if (pair.getSecond().isEmpty()) {
                        break;
                    } else {
                        outerRightJoinAndEmit(k, tupleWindow, pair);
                        break;
                    }
                case OUTER:
                    if (!pair.getFirst().isEmpty() && !pair.getSecond().isEmpty()) {
                        innerJoinAndEmit(k, tupleWindow, pair);
                        break;
                    } else if (pair.getSecond().isEmpty()) {
                        if (pair.getFirst().isEmpty()) {
                            break;
                        } else {
                            outerLeftJoinAndEmit(k, tupleWindow, pair);
                            break;
                        }
                    } else {
                        outerRightJoinAndEmit(k, tupleWindow, pair);
                        break;
                    }
                default:
                    throw new RuntimeException("Unknown join type " + this.joinType.name());
            }
        }
    }

    private void addMapLeft(Map<K, Pair<List<V1>, List<V2>>> map, V1 v1) {
        K apply = this.leftKeyExtractor.apply(v1);
        if (!map.containsKey(apply)) {
            map.put(apply, Pair.of(new LinkedList(), new LinkedList()));
        }
        map.get(apply).getFirst().add(v1);
    }

    private void addMapRight(Map<K, Pair<List<V1>, List<V2>>> map, V2 v2) {
        K apply = this.rightKeyExtractor.apply(v2);
        if (!map.containsKey(apply)) {
            map.put(apply, Pair.of(new LinkedList(), new LinkedList()));
        }
        map.get(apply).getSecond().add(v2);
    }

    private KeyedWindow<K> getKeyedWindow(K k, TupleWindow tupleWindow) {
        return new KeyedWindow<>(k, new Window(tupleWindow.getStartTimestamp() == null ? 0L : tupleWindow.getStartTimestamp().longValue(), tupleWindow.getEndTimestamp() == null ? 0L : tupleWindow.getEndTimestamp().longValue(), tupleWindow.get().size()));
    }

    private void innerJoinAndEmit(K k, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> pair) {
        KeyedWindow<K> keyedWindow = getKeyedWindow(k, tupleWindow);
        for (V1 v1 : pair.getFirst()) {
            Iterator<V2> it = pair.getSecond().iterator();
            while (it.hasNext()) {
                this.collector.emit(new Values(new KeyValue(keyedWindow, this.joinFn.apply(v1, it.next()))));
            }
        }
    }

    private void outerLeftJoinAndEmit(K k, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> pair) {
        KeyedWindow<K> keyedWindow = getKeyedWindow(k, tupleWindow);
        Iterator<V1> it = pair.getFirst().iterator();
        while (it.hasNext()) {
            this.collector.emit(new Values(new KeyValue(keyedWindow, this.joinFn.apply(it.next(), null))));
        }
    }

    private void outerRightJoinAndEmit(K k, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> pair) {
        KeyedWindow<K> keyedWindow = getKeyedWindow(k, tupleWindow);
        Iterator<V2> it = pair.getSecond().iterator();
        while (it.hasNext()) {
            this.collector.emit(new Values(new KeyValue(keyedWindow, this.joinFn.apply(null, it.next()))));
        }
    }
}
