package com.hazelcast.jet.pipeline;

import com.hazelcast.client.test.CustomCredentials;
import com.hazelcast.client.test.executor.tasks.SelectAllMembers;
import com.hazelcast.client.test.executor.tasks.SelectNoMembers;
import com.hazelcast.client.test.executor.tasks.SerializedCounterCallable;
import com.hazelcast.client.test.ifunction.AppendString;
import com.hazelcast.client.test.ifunction.Multiplication;
import com.hazelcast.collection.IList;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.Functions;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.accumulator.LongAccumulator;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.aggregate.CoAggregateOperationBuilder;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.test.JetAssert;
import com.hazelcast.jet.datamodel.ItemsByTag;
import com.hazelcast.jet.datamodel.Tag;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.pipeline.test.AssertionSinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.topic.TopicStressTest;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/pipeline/RebalanceBatchStageTest.class */
public class RebalanceBatchStageTest extends PipelineTestSupport {
    private static final AggregateOperation1<Integer, LongAccumulator, Long> SUMMING = AggregateOperations.summingLong(num -> {
        return num.intValue();
    });

    @Test
    public void when_rebalanceAndMap_then_dagEdgeDistributed() {
        List<Integer> sequence = sequence(this.itemCount);
        BatchStage<Integer> batchStageFromList = batchStageFromList(sequence);
        FunctionEx functionEx = num -> {
            return String.format("%04d-string", num);
        };
        batchStageFromList.rebalance().map(functionEx).writeTo(this.sink);
        Edge edge = (Edge) this.p.toDag().getInboundEdges("map").get(0);
        Assert.assertTrue("Rebalancing should make the edge distributed", edge.isDistributed());
        Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned", edge.getPartitioner());
        execute();
        Assert.assertEquals(streamToString(sequence.stream(), functionEx), streamToString(sinkStreamOf(String.class), Function.identity()));
    }

    @Test
    public void when_rebalanceByKeyAndMap_then_dagEdgePartitionedDistributed() {
        List<Integer> sequence = sequence(this.itemCount);
        BatchStage<Integer> batchStageFromList = batchStageFromList(sequence);
        FunctionEx functionEx = num -> {
            return String.format("%04d-string", num);
        };
        batchStageFromList.rebalance(num2 -> {
            return num2;
        }).map(functionEx).writeTo(this.sink);
        Edge edge = (Edge) this.p.toDag().getInboundEdges("map").get(0);
        Assert.assertTrue("Rebalancing should make the edge distributed", edge.isDistributed());
        Assert.assertNotNull("Rebalancing by key, the edge must be partitioned", edge.getPartitioner());
        execute();
        Assert.assertEquals(streamToString(sequence.stream(), functionEx), streamToString(sinkStreamOf(String.class), Function.identity()));
    }

    @Test
    public void when_peekAndRebalanceAndMap_then_dagEdgeDistributed() {
        List<Integer> sequence = sequence(this.itemCount);
        BatchStage<Integer> batchStageFromList = batchStageFromList(sequence);
        FunctionEx functionEx = num -> {
            return String.format("%04d-string", num);
        };
        batchStageFromList.peek().rebalance().map(functionEx).writeTo(this.sink);
        Edge edge = (Edge) this.p.toDag().getInboundEdges("map").get(0);
        Assert.assertTrue("Rebalancing should make the edge distributed", edge.isDistributed());
        Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned", edge.getPartitioner());
        execute();
        Assert.assertEquals(streamToString(sequence.stream(), functionEx), streamToString(sinkStreamOf(String.class), Function.identity()));
    }

    @Test
    public void when_peekAndRebalanceByKeyAndMap_then_dagEdgePartitionedDistributed() {
        List<Integer> sequence = sequence(this.itemCount);
        BatchStage<Integer> batchStageFromList = batchStageFromList(sequence);
        FunctionEx functionEx = num -> {
            return String.format("%04d-string", num);
        };
        batchStageFromList.peek().rebalance(num2 -> {
            return num2;
        }).map(functionEx).writeTo(this.sink);
        Edge edge = (Edge) this.p.toDag().getInboundEdges("map").get(0);
        Assert.assertTrue("Rebalancing should make the edge distributed", edge.isDistributed());
        Assert.assertNotNull("Rebalancing by key, the edge must be partitioned", edge.getPartitioner());
        execute();
        Assert.assertEquals(streamToString(sequence.stream(), functionEx), streamToString(sinkStreamOf(String.class), Function.identity()));
    }

    @Test(expected = JetException.class)
    public void when_rebalanceAndPeekAndMap_then_dagEdgeDistributed() {
        BatchStage<Integer> batchStageFromList = batchStageFromList(sequence(this.itemCount));
        batchStageFromList.rebalance().peek().map(num -> {
            return String.format("%04d-string", num);
        });
    }

    @Test(expected = JetException.class)
    public void when_rebalanceByKeyAndPeekAndMap_then_dagEdgePartitionedDistributed() {
        BatchStage<Integer> batchStageFromList = batchStageFromList(sequence(this.itemCount));
        batchStageFromList.rebalance(num -> {
            return num;
        }).peek().map(num2 -> {
            return String.format("%04d-string", num2);
        });
    }

    @Test
    public void when_mergeWithRebalanced_thenOnlyRebalancedEdgeDistributed() {
        Iterator<Integer> it = IntStream.iterate(0, i -> {
            return i + 1;
        }).boxed().iterator();
        batchStageFromList((List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList())).merge(batchStageFromList((List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList())).rebalance()).writeTo(AssertionSinks.assertAnyOrder((Collection) IntStream.range(0, 2 * this.itemCount).boxed().collect(Collectors.toList())));
        List inboundEdges = this.p.toDag().getInboundEdges("merge");
        JetAssert.assertFalse("Didn't rebalance this stage, why is its edge distributed?", ((Edge) inboundEdges.get(0)).isDistributed());
        Assert.assertTrue("Rebalancing should make the edge distributed", ((Edge) inboundEdges.get(1)).isDistributed());
        Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned", ((Edge) inboundEdges.get(1)).getPartitioner());
        execute();
    }

    @Test
    public void when_mergeWithRebalancedByKey_thenOnlyRebalancedEdgePartitionedDistributed() {
        Iterator<Integer> it = IntStream.iterate(0, i -> {
            return i + 1;
        }).boxed().iterator();
        batchStageFromList((List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList())).merge(batchStageFromList((List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList())).rebalance(num -> {
            return num;
        })).writeTo(AssertionSinks.assertAnyOrder((Collection) IntStream.range(0, 2 * this.itemCount).boxed().collect(Collectors.toList())));
        List inboundEdges = this.p.toDag().getInboundEdges("merge");
        JetAssert.assertFalse("Didn't rebalance this stage, why is its edge distributed?", ((Edge) inboundEdges.get(0)).isDistributed());
        Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned", ((Edge) inboundEdges.get(0)).getPartitioner());
        Assert.assertTrue("Rebalancing should make the edge distributed", ((Edge) inboundEdges.get(1)).isDistributed());
        Assert.assertNotNull("Rebalancing by key, the edge must be partitioned", ((Edge) inboundEdges.get(1)).getPartitioner());
        execute();
    }

    @Test
    public void when_mergeRebalancedWithNonRebalanced_thenOnlyRebalancedEdgeDistributed() {
        Iterator<Integer> it = IntStream.iterate(0, i -> {
            return i + 1;
        }).boxed().iterator();
        batchStageFromList((List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList())).rebalance().merge(batchStageFromList((List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList()))).writeTo(AssertionSinks.assertAnyOrder((Collection) IntStream.range(0, 2 * this.itemCount).boxed().collect(Collectors.toList())));
        List inboundEdges = this.p.toDag().getInboundEdges("merge");
        Assert.assertTrue("Rebalancing should make the edge distributed", ((Edge) inboundEdges.get(0)).isDistributed());
        Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned", ((Edge) inboundEdges.get(0)).getPartitioner());
        JetAssert.assertFalse("Didn't rebalance this stage, why is its edge distributed?", ((Edge) inboundEdges.get(1)).isDistributed());
        execute();
    }

    @Test
    public void when_hashJoinRebalanceMainStage_then_distributedEdge() {
        List<Integer> sequence = sequence(this.itemCount);
        String str = "value-";
        batchStageFromList(sequence).rebalance().hashJoin(batchStageFromList(sequence).map(num -> {
            return Util.entry(num, str + num);
        }), JoinClause.joinMapEntries(Functions.wholeItem()), (num2, str2) -> {
            return Util.entry(num2, str2);
        }).writeTo(this.sink);
        Edge edge = (Edge) this.p.toDag().getInboundEdges("2-way hash-join-joiner").get(0);
        Assert.assertTrue("Rebalancing should make the edge distributed", edge.isDistributed());
        Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned", edge.getPartitioner());
        execute();
        Function function = entry -> {
            return String.format("(%04d, %s)", entry.getKey(), entry.getValue());
        };
        Assert.assertEquals(streamToString(sequence.stream().map(num3 -> {
            return Tuple2.tuple2(num3, str + num3);
        }), function), streamToString(sinkStreamOfEntry(), function));
    }

    @Test
    public void when_hashJoinRebalanceEnrichingStage_then_noEffect() {
        List<Integer> sequence = sequence(this.itemCount);
        String str = "value-";
        batchStageFromList(sequence).hashJoin(batchStageFromList(sequence).map(num -> {
            return Util.entry(num, str + num);
        }).rebalance(), JoinClause.joinMapEntries(Functions.wholeItem()), (v0, v1) -> {
            return Util.entry(v0, v1);
        }).writeTo(this.sink);
        DAG dag = this.p.toDag();
        Edge edge = (Edge) dag.getInboundEdges("2-way hash-join-collector1").get(0);
        Assert.assertTrue("Edge into a hash-join collector vertex must be distributed", edge.isDistributed());
        Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned", edge.getPartitioner());
        Edge edge2 = (Edge) dag.getInboundEdges("2-way hash-join-joiner").get(0);
        JetAssert.assertFalse("Didn't rebalance this stage, why is its edge distributed?", edge2.isDistributed());
        Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned", edge2.getPartitioner());
        execute();
        Function function = entry -> {
            return String.format("(%04d, %s)", entry.getKey(), entry.getValue());
        };
        Assert.assertEquals(streamToString(sequence.stream().map(num2 -> {
            return Tuple2.tuple2(num2, str + num2);
        }), function), streamToString(sinkStreamOfEntry(), function));
    }

    @Test
    public void when_hashJoinBuilderRebalanceMainStage_then_distributedEdge() {
        List<Integer> sequence = sequence(this.itemCount);
        String str = "value-";
        BatchStage rebalance = batchStageFromList(sequence).rebalance();
        BatchStage map = batchStageFromList(sequence).map(num -> {
            return Util.entry(num, str + num);
        });
        HashJoinBuilder hashJoinBuilder = rebalance.hashJoinBuilder();
        Tag add = hashJoinBuilder.add(map, JoinClause.joinMapEntries(Functions.wholeItem()));
        hashJoinBuilder.build((num2, itemsByTag) -> {
            return Util.entry(num2, itemsByTag);
        }).writeTo(this.sink);
        Edge edge = (Edge) this.p.toDag().getInboundEdges("2-way hash-join-joiner").get(0);
        Assert.assertTrue("Rebalancing should make the edge distributed", edge.isDistributed());
        Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned", edge.getPartitioner());
        execute();
        Function function = entry -> {
            return String.format("(%04d, %s)", entry.getKey(), entry.getValue());
        };
        Assert.assertEquals(streamToString(sequence.stream().map(num3 -> {
            return Tuple2.tuple2(num3, str + num3);
        }), function), streamToString(sinkStreamOfEntry(), entry2 -> {
            return (String) function.apply(Util.entry((Integer) entry2.getKey(), (String) ((ItemsByTag) entry2.getValue()).get(add)));
        }));
    }

    @Test
    public void when_rebalanceAndAggregate_then_unicastDistributedEdgeAndTwoStageAggregation() {
        List<Integer> sequence = sequence(this.itemCount);
        batchStageFromList(sequence).rebalance().aggregate(SUMMING).writeTo(this.sink);
        assertTwoStageGlobalAggregation(1);
        execute();
        Assert.assertEquals(Collections.singletonList(Long.valueOf(sequence.stream().mapToLong(num -> {
            return num.intValue();
        }).sum())), new ArrayList((Collection) this.sinkList));
    }

    @Test
    public void when_rebalanceByKeyAndAggregate_then_distributedPartitionedEdgeAndTwoStageAggregation() {
        List<Integer> sequence = sequence(this.itemCount);
        batchStageFromList(sequence).rebalance(num -> {
            return Integer.valueOf(2 * num.intValue());
        }).aggregate(SUMMING).writeTo(this.sink);
        DAG dag = this.p.toDag();
        try {
            Edge edge = (Edge) dag.getOutboundEdges("items").get(0);
            Assert.assertNotNull("Rebalanced edge to grouped aggregation must be partitioned", edge.getPartitioner());
            Assert.assertTrue("Outbound edge after rebalancing must be distributed", edge.isDistributed());
            String destName = edge.getDestName();
            Assert.assertEquals("Aggregation must be two-stage", "aggregate-prepare", destName);
            Edge edge2 = (Edge) dag.getOutboundEdges(destName).get(0);
            Assert.assertTrue("Internal aggregation edge must be distributed", edge2.isDistributed());
            Assert.assertNotNull("Internal aggregation edge must be partitioned", edge2.getPartitioner());
            execute();
            Assert.assertEquals(Collections.singletonList(Long.valueOf(sequence.stream().mapToLong(num2 -> {
                return num2.intValue();
            }).sum())), new ArrayList((Collection) this.sinkList));
        } catch (AssertionError e) {
            System.err.println(dag.toDotString());
            throw e;
        }
    }

    @Test
    public void when_aggregate2WithRebalancedStage_then_twoStageAggregation() {
        List<Integer> sequence = sequence(this.itemCount);
        batchStageFromList(sequence).aggregate2(SUMMING, batchStageFromList(sequence).rebalance(), SUMMING).writeTo(this.sink);
        assertTwoStageGlobalAggregation(2);
        execute();
        long sum = sequence.stream().mapToLong(num -> {
            return num.intValue();
        }).sum();
        Assert.assertEquals(Collections.singletonList(Tuple2.tuple2(Long.valueOf(sum), Long.valueOf(sum))), new ArrayList((Collection) this.sinkList));
    }

    @Test
    public void when_aggregate3WithRebalancedStage_then_twoStageAggregation() {
        List<Integer> sequence = sequence(this.itemCount);
        batchStageFromList(sequence).aggregate3(SUMMING, batchStageFromList(sequence), SUMMING, batchStageFromList(sequence).rebalance(), SUMMING).writeTo(this.sink);
        assertTwoStageGlobalAggregation(3);
        execute();
        long sum = sequence.stream().mapToLong(num -> {
            return num.intValue();
        }).sum();
        Assert.assertEquals(Collections.singletonList(Tuple3.tuple3(Long.valueOf(sum), Long.valueOf(sum), Long.valueOf(sum))), new ArrayList((Collection) this.sinkList));
    }

    @Test
    public void when_rebalanceAndGroupAggregate_then_singleStageAggregation() {
        List<Integer> sequence = sequence(this.itemCount);
        FunctionEx functionEx = num -> {
            return Integer.valueOf(num.intValue() % 5);
        };
        BatchStage aggregate = batchStageFromList(sequence).rebalance().groupingKey(functionEx).aggregate(SUMMING);
        Map map = (Map) sequence.stream().collect(Collectors.groupingBy(functionEx, Collectors.summingLong(num2 -> {
            return num2.intValue();
        })));
        aggregate.writeTo(this.sink);
        assertSingleStageAggregation();
        execute();
        Assert.assertEquals(streamToString(map.entrySet().stream(), BatchAggregateTest.FORMAT_FN), streamToString(sinkStreamOfEntry(), BatchAggregateTest.FORMAT_FN));
    }

    @Test
    public void when_rebalanceAndGroupAggregate2_then_singleStageAggregation() {
        FunctionEx functionEx = num -> {
            return Integer.valueOf(num.intValue() % 10);
        };
        Iterator<Integer> it = IntStream.iterate(0, i -> {
            return i + 1;
        }).boxed().iterator();
        List<Integer> list = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        List<Integer> list2 = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        batchStageFromList(list).groupingKey(functionEx).aggregate2(SUMMING, batchStageFromList(list2).rebalance().groupingKey(functionEx), SUMMING).writeTo(this.sink);
        assertSingleStageAggregation();
        execute();
        Map map = (Map) list.stream().collect(Collectors.groupingBy(functionEx, Collectors.summingLong(num2 -> {
            return num2.intValue();
        })));
        Map map2 = (Map) list2.stream().collect(Collectors.groupingBy(functionEx, Collectors.summingLong(num3 -> {
            return num3.intValue();
        })));
        Assert.assertEquals(streamToString(map.entrySet().stream(), entry -> {
            return (String) BatchAggregateTest.FORMAT_FN_2.apply((Integer) entry.getKey(), Tuple2.tuple2((Long) entry.getValue(), (Long) map2.get(entry.getKey())));
        }), streamToString(sinkStreamOfEntry(), entry2 -> {
            return (String) BatchAggregateTest.FORMAT_FN_2.apply((Integer) entry2.getKey(), (Tuple2) entry2.getValue());
        }));
    }

    @Test
    public void when_rebalanceAndGroupAggregate3_then_singleStageAggregation() {
        FunctionEx functionEx = num -> {
            return Integer.valueOf(num.intValue() % 10);
        };
        Iterator<Integer> it = IntStream.iterate(0, i -> {
            return i + 1;
        }).boxed().iterator();
        List<Integer> list = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        List<Integer> list2 = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        List<Integer> list3 = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        batchStageFromList(list).groupingKey(functionEx).aggregate3(SUMMING, batchStageFromList(list2).groupingKey(functionEx), SUMMING, batchStageFromList(list3).rebalance().groupingKey(functionEx), SUMMING).writeTo(this.sink);
        assertSingleStageAggregation();
        execute();
        Collector<? super Integer, A, R> groupingBy = Collectors.groupingBy(functionEx, Collectors.summingLong(num2 -> {
            return num2.intValue();
        }));
        Map map = (Map) list.stream().collect(groupingBy);
        Map map2 = (Map) list2.stream().collect(groupingBy);
        Map map3 = (Map) list3.stream().collect(groupingBy);
        Assert.assertEquals(streamToString(map.entrySet().stream(), entry -> {
            return (String) BatchAggregateTest.FORMAT_FN_3.apply((Integer) entry.getKey(), Tuple3.tuple3((Long) entry.getValue(), (Long) map2.get(entry.getKey()), (Long) map3.get(entry.getKey())));
        }), streamToString(sinkStreamOfEntry(), entry2 -> {
            return (String) BatchAggregateTest.FORMAT_FN_3.apply((Integer) entry2.getKey(), (Tuple3) entry2.getValue());
        }));
    }

    @Test
    public void when_rebalanceAndGroupAggregateBuilder_then_singleStageAggregation() {
        FunctionEx functionEx = num -> {
            return Integer.valueOf(num.intValue() % 10);
        };
        Iterator<Integer> it = IntStream.iterate(0, i -> {
            return i + 1;
        }).boxed().iterator();
        List<Integer> list = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        List<Integer> list2 = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        List<Integer> list3 = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        BatchStageWithKey groupingKey = batchStageFromList(list).groupingKey(functionEx);
        BatchStageWithKey groupingKey2 = batchStageFromList(list2).groupingKey(functionEx);
        BatchStageWithKey groupingKey3 = batchStageFromList(list3).rebalance().groupingKey(functionEx);
        GroupAggregateBuilder aggregateBuilder = groupingKey.aggregateBuilder(SUMMING);
        Tag tag0 = aggregateBuilder.tag0();
        Tag add = aggregateBuilder.add(groupingKey2, SUMMING);
        Tag add2 = aggregateBuilder.add(groupingKey3, SUMMING);
        aggregateBuilder.build().writeTo(this.sink);
        assertSingleStageAggregation();
        execute();
        Collector<? super Integer, A, R> groupingBy = Collectors.groupingBy(functionEx, Collectors.summingLong(num2 -> {
            return num2.intValue();
        }));
        Map map = (Map) list.stream().collect(groupingBy);
        Map map2 = (Map) list2.stream().collect(groupingBy);
        Map map3 = (Map) list3.stream().collect(groupingBy);
        Assert.assertEquals(streamToString(map.entrySet().stream(), entry -> {
            return (String) BatchAggregateTest.FORMAT_FN_3.apply((Integer) entry.getKey(), Tuple3.tuple3((Long) entry.getValue(), (Long) map2.get(entry.getKey()), (Long) map3.get(entry.getKey())));
        }), streamToString(sinkStreamOfEntry(), entry2 -> {
            return (String) BatchAggregateTest.FORMAT_FN_3.apply((Integer) entry2.getKey(), Tuple3.tuple3((Long) ((ItemsByTag) entry2.getValue()).get(tag0), (Long) ((ItemsByTag) entry2.getValue()).get(add), (Long) ((ItemsByTag) entry2.getValue()).get(add2)));
        }));
    }

    @Test
    public void when_rebalanceAndGroupAggregateBuilderWithComplexAggrOp_then_singleStageAggregation() {
        FunctionEx functionEx = num -> {
            return Integer.valueOf(num.intValue() % 10);
        };
        Iterator<Integer> it = IntStream.iterate(0, i -> {
            return i + 1;
        }).boxed().iterator();
        List<Integer> list = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        List<Integer> list2 = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        List<Integer> list3 = (List) streamFromIterator(it).limit(this.itemCount).collect(Collectors.toList());
        BatchStageWithKey groupingKey = batchStageFromList(list).groupingKey(functionEx);
        BatchStageWithKey groupingKey2 = batchStageFromList(list2).groupingKey(functionEx);
        BatchStageWithKey groupingKey3 = batchStageFromList(list3).rebalance().groupingKey(functionEx);
        GroupAggregateBuilder1 aggregateBuilder = groupingKey.aggregateBuilder();
        Tag tag0 = aggregateBuilder.tag0();
        Tag add = aggregateBuilder.add(groupingKey2);
        Tag add2 = aggregateBuilder.add(groupingKey3);
        CoAggregateOperationBuilder coAggregateOperationBuilder = AggregateOperations.coAggregateOperationBuilder();
        Tag add3 = coAggregateOperationBuilder.add(tag0, SUMMING);
        Tag add4 = coAggregateOperationBuilder.add(add, SUMMING);
        Tag add5 = coAggregateOperationBuilder.add(add2, SUMMING);
        aggregateBuilder.build(coAggregateOperationBuilder.build()).writeTo(this.sink);
        assertSingleStageAggregation();
        execute();
        Collector<? super Integer, A, R> groupingBy = Collectors.groupingBy(functionEx, Collectors.summingLong(num2 -> {
            return num2.intValue();
        }));
        Map map = (Map) list.stream().collect(groupingBy);
        Map map2 = (Map) list2.stream().collect(groupingBy);
        Map map3 = (Map) list3.stream().collect(groupingBy);
        Assert.assertEquals(streamToString(map.entrySet().stream(), entry -> {
            return (String) BatchAggregateTest.FORMAT_FN_3.apply((Integer) entry.getKey(), Tuple3.tuple3((Long) entry.getValue(), (Long) map2.get(entry.getKey()), (Long) map3.get(entry.getKey())));
        }), streamToString(sinkStreamOfEntry(), entry2 -> {
            return (String) BatchAggregateTest.FORMAT_FN_3.apply((Integer) entry2.getKey(), Tuple3.tuple3((Long) ((ItemsByTag) entry2.getValue()).get(add3), (Long) ((ItemsByTag) entry2.getValue()).get(add4), (Long) ((ItemsByTag) entry2.getValue()).get(add5)));
        }));
    }

    @Test
    public void twoConsecutiveRebalance() {
        Pipeline create = Pipeline.create();
        create.readFrom(TestSources.items(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8})).rebalance().filter(num -> {
            return true;
        }).setName("filter trues 1").filter(num2 -> {
            return true;
        }).setName("filter trues 2").writeTo(SinkBuilder.sinkBuilder("sink", context -> {
            return context.hazelcastInstance().getList("result" + context.globalProcessorIndex());
        }).receiveFn((iList, obj) -> {
            iList.add(obj);
        }).build());
        member.getJet().newJob(create).join();
        IList list = member.getList("result0");
        IList list2 = member.getList("result1");
        Assertions.assertThat(list).hasSize(4);
        Assertions.assertThat(list2).hasSize(4);
    }

    @Nonnull
    private static Stream<Integer> streamFromIterator(Iterator<Integer> it) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
    }

    private void assertSingleStageAggregation() {
        DAG dag = this.p.toDag();
        try {
            Assert.assertTrue("Outbound edge after rebalancing must be distributed", ((Edge) dag.getOutboundEdges("items").get(0)).isDistributed());
            Assert.assertEquals("Aggregation after rebalancing must be single-stage", 0L, dag.getOutboundEdges(((Edge) dag.getOutboundEdges(r0.getDestName()).get(0)).getDestName()).size());
        } catch (AssertionError e) {
            System.err.println(dag.toDotString());
            throw e;
        }
    }

    private void assertTwoStageGlobalAggregation(int i) {
        String str;
        DAG dag = this.p.toDag();
        if (i == 1) {
            str = "";
        } else {
            try {
                str = "-" + i;
            } catch (AssertionError e) {
                System.err.println(dag.toDotString());
                throw e;
            }
        }
        Edge edge = (Edge) dag.getOutboundEdges("items" + str).get(0);
        Assert.assertNull("Rebalanced edge to global aggregation must be unicast", edge.getPartitioner());
        Assert.assertTrue("Outbound edge after rebalancing must be distributed", edge.isDistributed());
        String destName = edge.getDestName();
        Assert.assertEquals("Aggregation must be two-stage", "aggregate-prepare", destName.substring(destName.length() - "aggregate-prepare".length()));
        Edge edge2 = (Edge) dag.getOutboundEdges(destName).get(0);
        Assert.assertTrue("Internal aggregation edge must be distributed", edge2.isDistributed());
        Assert.assertNotNull("Internal aggregation edge must be partitioned", edge2.getPartitioner());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1917087343:
                if (implMethodName.equals("lambda$when_rebalanceAndGroupAggregateBuilder_then_singleStageAggregation$52c622ae$1")) {
                    z = 14;
                    break;
                }
                break;
            case -1718146781:
                if (implMethodName.equals("lambda$when_peekAndRebalanceAndMap_then_dagEdgeDistributed$785790e6$1")) {
                    z = 18;
                    break;
                }
                break;
            case -1695191337:
                if (implMethodName.equals("lambda$when_mergeWithRebalancedByKey_thenOnlyRebalancedEdgePartitionedDistributed$c5856792$1")) {
                    z = 8;
                    break;
                }
                break;
            case -1448961447:
                if (implMethodName.equals("lambda$when_rebalanceAndGroupAggregate3_then_singleStageAggregation$52c622ae$1")) {
                    z = 22;
                    break;
                }
                break;
            case -1201483492:
                if (implMethodName.equals("lambda$when_rebalanceByKeyAndAggregate_then_distributedPartitionedEdgeAndTwoStageAggregation$9d52c840$1")) {
                    z = 24;
                    break;
                }
                break;
            case -1101359183:
                if (implMethodName.equals("lambda$when_hashJoinBuilderRebalanceMainStage_then_distributedEdge$2fcc963$1")) {
                    z = 15;
                    break;
                }
                break;
            case -828911042:
                if (implMethodName.equals("lambda$when_hashJoinRebalanceEnrichingStage_then_noEffect$3a5497cd$1")) {
                    z = 13;
                    break;
                }
                break;
            case -752085965:
                if (implMethodName.equals("lambda$twoConsecutiveRebalance$a45a00b3$1")) {
                    z = 7;
                    break;
                }
                break;
            case -752085964:
                if (implMethodName.equals("lambda$twoConsecutiveRebalance$a45a00b3$2")) {
                    z = 4;
                    break;
                }
                break;
            case -719462433:
                if (implMethodName.equals("lambda$when_rebalanceAndGroupAggregateBuilderWithComplexAggrOp_then_singleStageAggregation$52c622ae$1")) {
                    z = false;
                    break;
                }
                break;
            case -694431604:
                if (implMethodName.equals("lambda$when_hashJoinRebalanceMainStage_then_distributedEdge$2fcc963$1")) {
                    z = 3;
                    break;
                }
                break;
            case -579639845:
                if (implMethodName.equals("lambda$twoConsecutiveRebalance$a441ef18$1")) {
                    z = 11;
                    break;
                }
                break;
            case -210641270:
                if (implMethodName.equals("lambda$when_rebalanceByKeyAndMap_then_dagEdgePartitionedDistributed$785790e6$1")) {
                    z = 20;
                    break;
                }
                break;
            case 96667762:
                if (implMethodName.equals("entry")) {
                    z = 16;
                    break;
                }
                break;
            case 142383239:
                if (implMethodName.equals("lambda$when_rebalanceAndPeekAndMap_then_dagEdgeDistributed$785790e6$1")) {
                    z = 21;
                    break;
                }
                break;
            case 388913911:
                if (implMethodName.equals("lambda$when_rebalanceByKeyAndMap_then_dagEdgePartitionedDistributed$688b6e52$1")) {
                    z = 5;
                    break;
                }
                break;
            case 652153280:
                if (implMethodName.equals("lambda$when_rebalanceAndGroupAggregate_then_singleStageAggregation$52c622ae$1")) {
                    z = 6;
                    break;
                }
                break;
            case 760853096:
                if (implMethodName.equals("lambda$twoConsecutiveRebalance$c8cc76b3$1")) {
                    z = 17;
                    break;
                }
                break;
            case 803702170:
                if (implMethodName.equals("lambda$when_hashJoinRebalanceMainStage_then_distributedEdge$5fb0c9c8$1")) {
                    z = 23;
                    break;
                }
                break;
            case 1073849109:
                if (implMethodName.equals("lambda$when_hashJoinBuilderRebalanceMainStage_then_distributedEdge$5fb0c9c8$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1115624086:
                if (implMethodName.equals("lambda$when_peekAndRebalanceByKeyAndMap_then_dagEdgePartitionedDistributed$785790e6$1")) {
                    z = 12;
                    break;
                }
                break;
            case 1122856526:
                if (implMethodName.equals("lambda$when_rebalanceByKeyAndPeekAndMap_then_dagEdgePartitionedDistributed$785790e6$1")) {
                    z = 9;
                    break;
                }
                break;
            case 1345143113:
                if (implMethodName.equals("lambda$static$3498aefe$1")) {
                    z = 10;
                    break;
                }
                break;
            case 1384425850:
                if (implMethodName.equals("lambda$when_rebalanceAndGroupAggregate2_then_singleStageAggregation$52c622ae$1")) {
                    z = 25;
                    break;
                }
                break;
            case 1399348503:
                if (implMethodName.equals("lambda$when_rebalanceAndMap_then_dagEdgeDistributed$785790e6$1")) {
                    z = 26;
                    break;
                }
                break;
            case 1715179267:
                if (implMethodName.equals("lambda$when_peekAndRebalanceByKeyAndMap_then_dagEdgePartitionedDistributed$688b6e52$1")) {
                    z = 19;
                    break;
                }
                break;
            case 1722411707:
                if (implMethodName.equals("lambda$when_rebalanceByKeyAndPeekAndMap_then_dagEdgePartitionedDistributed$688b6e52$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return Integer.valueOf(num.intValue() % 10);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num2 -> {
                        return num2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Integer;)Ljava/util/Map$Entry;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return num3 -> {
                        return Util.entry(num3, str + num3);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;Ljava/lang/String;)Ljava/util/Map$Entry;")) {
                    return (num22, str2) -> {
                        return Util.entry(num22, str2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/PredicateEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("testEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Z")) {
                    return num23 -> {
                        return true;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num24 -> {
                        return num24;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num4 -> {
                        return Integer.valueOf(num4.intValue() % 5);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/PredicateEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("testEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Z")) {
                    return num5 -> {
                        return true;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num6 -> {
                        return num6;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num25 -> {
                        return String.format("%04d-string", num25);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)J")) {
                    return num7 -> {
                        return num7.intValue();
                    };
                }
                break;
            case SelectAllMembers.CLASS_ID /* 11 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/Processor$Context;)Lcom/hazelcast/collection/IList;")) {
                    return context -> {
                        return context.hazelcastInstance().getList("result" + context.globalProcessorIndex());
                    };
                }
                break;
            case SelectNoMembers.CLASS_ID /* 12 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num8 -> {
                        return String.format("%04d-string", num8);
                    };
                }
                break;
            case SerializedCounterCallable.CLASS_ID /* 13 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Integer;)Ljava/util/Map$Entry;")) {
                    String str3 = (String) serializedLambda.getCapturedArg(0);
                    return num9 -> {
                        return Util.entry(num9, str3 + num9);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num10 -> {
                        return Integer.valueOf(num10.intValue() % 10);
                    };
                }
                break;
            case CustomCredentials.CLASS_ID /* 15 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;Lcom/hazelcast/jet/datamodel/ItemsByTag;)Ljava/util/Map$Entry;")) {
                    return (num26, itemsByTag) -> {
                        return Util.entry(num26, itemsByTag);
                    };
                }
                break;
            case Multiplication.CLASS_ID /* 16 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/Util") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/util/Map$Entry;")) {
                    return (v0, v1) -> {
                        return Util.entry(v0, v1);
                    };
                }
                break;
            case AppendString.CLASS_ID /* 17 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/collection/IList;Ljava/lang/Object;)V")) {
                    return (iList, obj) -> {
                        iList.add(obj);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num11 -> {
                        return String.format("%04d-string", num11);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num27 -> {
                        return num27;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num12 -> {
                        return String.format("%04d-string", num12);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num13 -> {
                        return String.format("%04d-string", num13);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num14 -> {
                        return Integer.valueOf(num14.intValue() % 10);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Integer;)Ljava/util/Map$Entry;")) {
                    String str4 = (String) serializedLambda.getCapturedArg(0);
                    return num15 -> {
                        return Util.entry(num15, str4 + num15);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num16 -> {
                        return Integer.valueOf(2 * num16.intValue());
                    };
                }
                break;
            case TopicStressTest.MAX_PUBLISH_DELAY_MILLIS /* 25 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num17 -> {
                        return Integer.valueOf(num17.intValue() % 10);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/RebalanceBatchStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/String;")) {
                    return num18 -> {
                        return String.format("%04d-string", num18);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
