package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;

/**
 * Unit tests for {@link TaskManager}.
 *
 * <p>All collaborators of the task manager are EasyMock mocks injected by {@link EasyMockRunner}.
 * Each test records the expectations it cares about, switches the mocks to replay mode, drives the
 * task manager, and finally verifies the mocks it is interested in.
 */
@RunWith(EasyMockRunner.class)
public class TaskManagerTest {

    @Mock(type = MockType.STRICT)
    private InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates;

    @Mock(type = MockType.STRICT)
    private InternalTopologyBuilder topologyBuilder;

    @Mock(type = MockType.STRICT)
    private StateDirectory stateDirectory;

    @Mock(type = MockType.NICE)
    private ChangelogReader changeLogReader;

    @Mock(type = MockType.NICE)
    private StreamsMetadataState streamsMetadataState;

    @Mock(type = MockType.NICE)
    private Consumer<byte[], byte[]> restoreConsumer;

    @Mock(type = MockType.NICE)
    private Consumer<byte[], byte[]> consumer;

    @Mock(type = MockType.NICE)
    private StreamThread.AbstractTaskCreator<StreamTask> activeTaskCreator;

    @Mock(type = MockType.NICE)
    private StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;

    @Mock(type = MockType.NICE)
    private AdminClient adminClient;

    @Mock(type = MockType.NICE)
    private StreamTask streamTask;

    @Mock(type = MockType.NICE)
    private StandbyTask standbyTask;

    @Mock(type = MockType.NICE)
    private AssignedStreamsTasks active;

    @Mock(type = MockType.NICE)
    private AssignedStandbyTasks standby;

    /** Object under test; rebuilt for every test in {@link #setUp()}. */
    private TaskManager taskManager;

    // Single-task fixture shared by the task-creation tests.
    private final TaskId taskId0 = new TaskId(0, 0);
    private final TopicPartition t1p0 = new TopicPartition("t1", 0);
    private final Set<TopicPartition> taskId0Partitions = Utils.mkSet(t1p0);
    private final Map<TaskId, Set<TopicPartition>> taskId0Assignment =
        Collections.singletonMap(taskId0, taskId0Partitions);

    // Multi-topic fixture used by the subscription, purge and assignment tests.
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
    private final TopicPartition t1p2 = new TopicPartition(topic1, 2);
    private final TopicPartition t1p3 = new TopicPartition(topic1, 3);
    private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
    private final TopicPartition t2p2 = new TopicPartition(topic2, 2);
    private final TopicPartition t2p3 = new TopicPartition(topic2, 3);
    private final TaskId task01 = new TaskId(0, 1);
    private final TaskId task02 = new TaskId(0, 2);
    private final TaskId task03 = new TaskId(0, 3);
    private final TaskId task11 = new TaskId(1, 1);

    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder();

    /** Builds a fresh {@link TaskManager} wired to the injected mocks and hands it the main consumer. */
    @Before
    public void setUp() {
        taskManager = new TaskManager(
            changeLogReader,
            UUID.randomUUID(),
            "",
            restoreConsumer,
            streamsMetadataState,
            activeTaskCreator,
            standbyTaskCreator,
            adminClient,
            active,
            standby);
        taskManager.setConsumer(consumer);
    }

    /** Switches the commonly used mocks from record mode to replay mode. */
    private void replay() {
        EasyMock.replay(
            changeLogReader,
            restoreConsumer,
            consumer,
            activeTaskCreator,
            standbyTaskCreator,
            active,
            standby,
            adminClient);
    }

    /** Returns an explicitly typed empty assignment, for the side (active/standby) a test leaves empty. */
    private static Map<TaskId, Set<TopicPartition>> noTasks() {
        return Collections.<TaskId, Set<TopicPartition>>emptyMap();
    }

    /** Registers {@code taskId0Assignment} as the active assignment and asks the manager to create tasks. */
    private void createTasksWithActiveAssignment() {
        taskManager.setAssignmentMetadata(taskId0Assignment, noTasks());
        taskManager.createTasks(taskId0Partitions);
    }

    /** Registers {@code taskId0Assignment} as the standby assignment and asks the manager to create tasks. */
    private void createTasksWithStandbyAssignment() {
        taskManager.setAssignmentMetadata(noTasks(), taskId0Assignment);
        taskManager.createTasks(taskId0Partitions);
    }

    /**
     * Known updates contain only topic1 while the assignment spans topic1 and topic2, so the
     * topology builder is expected to be told about both topics exactly once.
     */
    @Test
    public void shouldUpdateSubscriptionFromAssignment() {
        mockTopologyBuilder();
        EasyMock.expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1));
        topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, topic2)), EasyMock.anyString());
        EasyMock.expectLastCall().once();
        EasyMock.replay(activeTaskCreator, topologyBuilder, subscriptionUpdates);

        taskManager.updateSubscriptionsFromAssignment(Utils.mkList(t1p1, t2p1));

        EasyMock.verify(activeTaskCreator, topologyBuilder, subscriptionUpdates);
    }

    /**
     * Known updates already contain both topics; the strict topology builder mock has no
     * {@code updateSubscribedTopics} expectation, so any such call fails the test.
     */
    @Test
    public void shouldNotUpdateSubscriptionFromAssignment() {
        mockTopologyBuilder();
        EasyMock.expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1, topic2));
        EasyMock.replay(activeTaskCreator, topologyBuilder, subscriptionUpdates);

        taskManager.updateSubscriptionsFromAssignment(Utils.mkList(t1p1));

        EasyMock.verify(activeTaskCreator, topologyBuilder, subscriptionUpdates);
    }

    /**
     * Known updates contain only topic1 while the metadata lists topic1 and topic2, so the
     * topology builder is expected to be told about both topics exactly once.
     */
    @Test
    public void shouldUpdateSubscriptionFromMetadata() {
        mockTopologyBuilder();
        EasyMock.expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1));
        topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, topic2)), EasyMock.anyString());
        EasyMock.expectLastCall().once();
        EasyMock.replay(activeTaskCreator, topologyBuilder, subscriptionUpdates);

        taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1, topic2));

        EasyMock.verify(activeTaskCreator, topologyBuilder, subscriptionUpdates);
    }

    /**
     * Metadata and known updates both contain just topic1; the strict topology builder mock has no
     * {@code updateSubscribedTopics} expectation, so any such call fails the test.
     */
    @Test
    public void shouldNotUpdateSubscriptionFromMetadata() {
        mockTopologyBuilder();
        EasyMock.expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1));
        EasyMock.replay(activeTaskCreator, topologyBuilder, subscriptionUpdates);

        taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1));

        EasyMock.verify(activeTaskCreator, topologyBuilder, subscriptionUpdates);
    }

    /**
     * Creates five directories, of which three ("0_1", "0_2", "1_1") get a ".checkpoint" file, and
     * expects exactly those three task ids back from {@code cachedTasksIds()}.
     *
     * @throws IOException if a temporary folder or checkpoint file cannot be created
     */
    @Test
    public void shouldReturnCachedTaskIdsFromDirectory() throws IOException {
        final File[] taskFolders = {
            testFolder.newFolder("0_1"),
            testFolder.newFolder("0_2"),
            testFolder.newFolder("0_3"),
            testFolder.newFolder("1_1"),
            testFolder.newFolder("dummy")
        };
        // "0_3" (index 2) and "dummy" (index 4) deliberately get no checkpoint file.
        Assert.assertTrue(new File(taskFolders[0], ".checkpoint").createNewFile());
        Assert.assertTrue(new File(taskFolders[1], ".checkpoint").createNewFile());
        Assert.assertTrue(new File(taskFolders[3], ".checkpoint").createNewFile());

        EasyMock.expect(activeTaskCreator.stateDirectory()).andReturn(stateDirectory).once();
        EasyMock.expect(stateDirectory.listTaskDirectories()).andReturn(taskFolders).once();
        EasyMock.replay(activeTaskCreator, stateDirectory);

        final Set<TaskId> cachedIds = taskManager.cachedTasksIds();

        EasyMock.verify(activeTaskCreator, stateDirectory);
        MatcherAssert.assertThat(cachedIds, IsEqual.equalTo(Utils.mkSet(task01, task02, task11)));
    }

    /** Creating tasks must close suspended active tasks that are not in the active assignment. */
    @Test
    public void shouldCloseActiveUnAssignedSuspendedTasksWhenCreatingNewTasks() {
        mockSingleActiveTask();
        active.closeNonAssignedSuspendedTasks(taskId0Assignment);
        EasyMock.expectLastCall();
        replay();

        createTasksWithActiveAssignment();

        EasyMock.verify(active);
    }

    /** Exercises task creation with an expectation recorded on the standby task set. */
    @Test
    public void shouldCloseStandbyUnAssignedSuspendedTasksWhenCreatingNewTasks() {
        mockSingleActiveTask();
        standby.closeNonAssignedSuspendedTasks(taskId0Assignment);
        EasyMock.expectLastCall();
        replay();

        createTasksWithActiveAssignment();

        // NOTE(review): the expectation above is recorded on `standby`, yet only `active` is verified
        // and the standby assignment passed in is empty. As written this test cannot fail on the
        // standby call; confirm the intended argument against TaskManager before verifying `standby`.
        EasyMock.verify(active);
    }

    /** Creating tasks must reset the changelog reader. */
    @Test
    public void shouldResetChangeLogReaderOnCreateTasks() {
        mockSingleActiveTask();
        changeLogReader.reset();
        EasyMock.expectLastCall();
        replay();

        createTasksWithActiveAssignment();

        EasyMock.verify(changeLogReader);
    }

    /** An active task that could not be resumed must be created and added as a new task. */
    @Test
    public void shouldAddNonResumedActiveTasks() {
        mockSingleActiveTask();
        EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
        active.addNewTask(EasyMock.same(streamTask));
        replay();

        createTasksWithActiveAssignment();

        EasyMock.verify(activeTaskCreator, active);
    }

    /** An active task that was resumed must not trigger any task creation. */
    @Test
    public void shouldNotAddResumedActiveTasks() {
        EasyMock.checkOrder(active, true);
        EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
        replay();

        createTasksWithActiveAssignment();

        // activeTaskCreator has no recorded expectations here.
        EasyMock.verify(active, activeTaskCreator);
    }

    /** A standby task that could not be resumed must be created and added as a new task. */
    @Test
    public void shouldAddNonResumedStandbyTasks() {
        mockStandbyTaskExpectations();
        EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
        standby.addNewTask(EasyMock.same(standbyTask));
        replay();

        createTasksWithStandbyAssignment();

        // NOTE(review): the expectations above are on `standby`, but `active` is what gets verified
        // alongside the creator; verifying `standby` looks like the intent - confirm before changing.
        EasyMock.verify(standbyTaskCreator, active);
    }

    /** A standby task that was resumed must not trigger any task creation. */
    @Test
    public void shouldNotAddResumedStandbyTasks() {
        // NOTE(review): order checking is enabled on `active` although this test targets `standby`;
        // presumably `standby` was meant - confirm before changing.
        EasyMock.checkOrder(active, true);
        EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true);
        replay();

        createTasksWithStandbyAssignment();

        // standbyTaskCreator has no recorded expectations here.
        EasyMock.verify(standby, standbyTaskCreator);
    }

    /** Creating tasks must pause the assigned partitions on the main consumer. */
    @Test
    public void shouldPauseActivePartitions() {
        mockSingleActiveTask();
        consumer.pause(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        createTasksWithActiveAssignment();

        EasyMock.verify(consumer);
    }

    /** Suspending must suspend the active task set. */
    @Test
    public void shouldSuspendActiveTasks() {
        EasyMock.expect(active.suspend()).andReturn(null);
        replay();

        taskManager.suspendTasksAndState();

        EasyMock.verify(active);
    }

    /** Suspending must suspend the standby task set. */
    @Test
    public void shouldSuspendStandbyTasks() {
        EasyMock.expect(standby.suspend()).andReturn(null);
        replay();

        taskManager.suspendTasksAndState();

        EasyMock.verify(standby);
    }

    /** Suspending must unsubscribe the restore consumer. */
    @Test
    public void shouldUnassignChangelogPartitionsOnSuspend() {
        restoreConsumer.unsubscribe();
        EasyMock.expectLastCall();
        replay();

        taskManager.suspendTasksAndState();

        EasyMock.verify(restoreConsumer);
    }

    /**
     * Both task sets report an exception from {@code suspend()}; the restore consumer must still be
     * unsubscribed and a {@link StreamsException} must be thrown afterwards.
     */
    @Test
    public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() {
        EasyMock.expect(active.suspend()).andReturn(new RuntimeException(""));
        EasyMock.expect(standby.suspend()).andReturn(new RuntimeException(""));
        restoreConsumer.unsubscribe();
        EasyMock.expectLastCall();
        replay();

        try {
            taskManager.suspendTasksAndState();
            Assert.fail("Should have thrown streams exception");
        } catch (final StreamsException expected) {
            // expected: the suspend failures are surfaced as a StreamsException
        }

        EasyMock.verify(restoreConsumer, active, standby);
    }

    /** Shutdown must close the active task set, forwarding the clean flag. */
    @Test
    public void shouldCloseActiveTasksOnShutdown() {
        active.close(true);
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(true);

        EasyMock.verify(active);
    }

    /** Shutdown must close the standby task set, forwarding the clean flag. */
    @Test
    public void shouldCloseStandbyTasksOnShutdown() {
        standby.close(false);
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(false);

        EasyMock.verify(standby);
    }

    /** Shutdown must unsubscribe the restore consumer. */
    @Test
    public void shouldUnassignChangelogPartitionsOnShutdown() {
        restoreConsumer.unsubscribe();
        EasyMock.expectLastCall();
        replay();

        taskManager.shutdown(true);

        EasyMock.verify(restoreConsumer);
    }

    /** Updating new and restoring tasks must call {@code updateRestored} on the active task set. */
    @Test
    public void shouldInitializeNewActiveTasks() {
        active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(active);
    }

    /** Exercises {@code updateNewAndRestoringTasks()} and verifies the standby task set. */
    @Test
    public void shouldInitializeNewStandbyTasks() {
        // NOTE(review): the expectation is recorded on `active` but `standby` (which has no recorded
        // expectations) is verified, so this test asserts nothing about standby initialization.
        // Confirm which standby call TaskManager is supposed to make and expect that instead.
        active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(standby);
    }

    /** Partitions reported as restored by the changelog reader must be passed to the active task set. */
    @Test
    public void shouldRestoreStateFromChangeLogReader() {
        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
        active.updateRestored(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(changeLogReader, active);
    }

    /** Once all active tasks are running, the consumer's assigned partitions must be resumed. */
    @Test
    public void shouldResumeRestoredPartitions() {
        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
        EasyMock.expect(active.allTasksRunning()).andReturn(true);
        EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
        EasyMock.expect(standby.running()).andReturn(Collections.emptySet());
        consumer.resume(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(consumer);
    }

    /** With all active tasks running, standby partitions are assigned and the update reports true. */
    @Test
    public void shouldAssignStandbyPartitionsWhenAllActiveTasksAreRunning() {
        mockAssignStandbyPartitions(1L);
        replay();

        Assert.assertTrue(taskManager.updateNewAndRestoringTasks());

        EasyMock.verify(restoreConsumer);
    }

    /** While the active task set reports non-running tasks, the update must report false. */
    @Test
    public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
        EasyMock.expect(active.allTasksRunning()).andReturn(false);
        replay();

        Assert.assertFalse(taskManager.updateNewAndRestoringTasks());
    }

    /** A checkpointed standby offset of 1 must make the restore consumer seek to that offset. */
    @Test
    public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() {
        mockAssignStandbyPartitions(1L);
        restoreConsumer.seek(t1p0, 1L);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(restoreConsumer);
    }

    /** A checkpointed standby offset of -1 must make the restore consumer seek to the beginning. */
    @Test
    public void shouldSeekToBeginningIfOffsetIsLessThan0() {
        mockAssignStandbyPartitions(-1L);
        restoreConsumer.seekToBeginning(taskId0Partitions);
        EasyMock.expectLastCall();
        replay();

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(restoreConsumer);
    }

    /** {@code commitAll()} must commit both task sets and return the sum of their results (1 + 2). */
    @Test
    public void shouldCommitActiveAndStandbyTasks() {
        EasyMock.expect(active.commit()).andReturn(1);
        EasyMock.expect(standby.commit()).andReturn(2);
        replay();

        MatcherAssert.assertThat(taskManager.commitAll(), IsEqual.equalTo(3));

        EasyMock.verify(active, standby);
    }

    /** An exception thrown while committing active tasks must escape {@code commitAll()}. */
    @Test
    public void shouldPropagateExceptionFromActiveCommit() {
        EasyMock.checkOrder(standby, true);
        active.commit();
        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
        replay();

        try {
            taskManager.commitAll();
            Assert.fail("should have thrown first exception");
        } catch (final Exception expected) {
            // expected: the failure from the active commit is propagated
        }

        EasyMock.verify(active, standby);
    }

    /** An exception thrown while committing standby tasks must escape {@code commitAll()}. */
    @Test
    public void shouldPropagateExceptionFromStandbyCommit() {
        EasyMock.expect(standby.commit()).andThrow(new RuntimeException(""));
        replay();

        try {
            taskManager.commitAll();
            Assert.fail("should have thrown exception");
        } catch (final Exception expected) {
            // expected: the failure from the standby commit is propagated
        }

        EasyMock.verify(standby);
    }

    /**
     * With the previous delete-records future already completed, two purge attempts must each
     * query the records to delete and each issue a delete request.
     */
    @Test
    public void shouldSendPurgeData() {
        // Left as a raw type: the future's value type is not visible from this file.
        final KafkaFutureImpl deleteFuture = new KafkaFutureImpl();
        final Map<TopicPartition, RecordsToDelete> deleteRequest =
            Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
        final DeleteRecordsResult deleteResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, deleteFuture));
        deleteFuture.complete(null);

        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
        EasyMock.expect(adminClient.deleteRecords(deleteRequest)).andReturn(deleteResult).times(2);
        replay();

        taskManager.maybePurgeCommitedRecords();
        taskManager.maybePurgeCommitedRecords();

        EasyMock.verify(active, adminClient);
    }

    /**
     * The delete-records future is never completed, so only the first of two purge attempts is
     * expected to query the records to delete and issue a delete request.
     */
    @Test
    public void shouldNotSendPurgeDataIfPreviousNotDone() {
        // Left as a raw type: the future's value type is not visible from this file.
        final KafkaFutureImpl deleteFuture = new KafkaFutureImpl();
        final Map<TopicPartition, RecordsToDelete> deleteRequest =
            Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
        final DeleteRecordsResult deleteResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, deleteFuture));

        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
        EasyMock.expect(adminClient.deleteRecords(deleteRequest)).andReturn(deleteResult).once();
        replay();

        taskManager.maybePurgeCommitedRecords();
        taskManager.maybePurgeCommitedRecords();

        EasyMock.verify(active, adminClient);
    }

    /**
     * The delete-records future is completed exceptionally; both purge attempts must still go
     * through without the error escaping.
     */
    @Test
    public void shouldIgnorePurgeDataErrors() {
        // Left as a raw type: the future's value type is not visible from this file.
        final KafkaFutureImpl deleteFuture = new KafkaFutureImpl();
        final Map<TopicPartition, RecordsToDelete> deleteRequest =
            Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
        final DeleteRecordsResult deleteResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, deleteFuture));
        deleteFuture.completeExceptionally(new Exception("KABOOM!"));

        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
        EasyMock.expect(adminClient.deleteRecords(deleteRequest)).andReturn(deleteResult).times(2);
        replay();

        taskManager.maybePurgeCommitedRecords();
        taskManager.maybePurgeCommitedRecords();

        EasyMock.verify(active, adminClient);
    }

    /** The user-requested commit count reported by the active task set must be returned unchanged. */
    @Test
    public void shouldMaybeCommitActiveTasks() {
        EasyMock.expect(active.maybeCommitPerUserRequested()).andReturn(5);
        replay();

        MatcherAssert.assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), IsEqual.equalTo(5));

        EasyMock.verify(active);
    }

    /** The processed count reported by the active task set must be returned unchanged. */
    @Test
    public void shouldProcessActiveTasks() {
        EasyMock.expect(active.process(0L)).andReturn(10);
        replay();

        MatcherAssert.assertThat(taskManager.process(0L), IsEqual.equalTo(10));

        EasyMock.verify(active);
    }

    /** The punctuate count reported by the active task set must be returned unchanged. */
    @Test
    public void shouldPunctuateActiveTasks() {
        EasyMock.expect(active.punctuate()).andReturn(20);
        replay();

        MatcherAssert.assertThat(taskManager.punctuate(), IsEqual.equalTo(20));

        EasyMock.verify(active);
    }

    /**
     * While the active task set reports non-running tasks, the consumer must not be touched: a
     * strict mock with no expectations replaces the main consumer, so any call on it fails.
     */
    @Test
    public void shouldNotResumeConsumptionUntilAllStoresRestored() {
        EasyMock.expect(active.allTasksRunning()).andReturn(false);
        final Consumer<byte[], byte[]> strictConsumer = EasyMock.createStrictMock(Consumer.class);
        taskManager.setConsumer(strictConsumer);
        EasyMock.replay(active, strictConsumer);

        taskManager.updateNewAndRestoringTasks();

        EasyMock.verify(strictConsumer);
    }

    /** Assignment metadata handed to the task manager must be readable back for both task kinds. */
    @Test
    public void shouldUpdateTasksFromPartitionAssignment() {
        final Map<TaskId, Set<TopicPartition>> activeAssignment = new HashMap<TaskId, Set<TopicPartition>>();
        final Map<TaskId, Set<TopicPartition>> standbyAssignment = new HashMap<TaskId, Set<TopicPartition>>();

        taskManager.setAssignmentMetadata(activeAssignment, standbyAssignment);
        Assert.assertTrue(taskManager.assignedActiveTasks().isEmpty());

        activeAssignment.put(task01, new HashSet<TopicPartition>(Arrays.asList(t1p1, t2p1)));
        activeAssignment.put(task02, new HashSet<TopicPartition>(Arrays.asList(t1p2, t2p2)));
        standbyAssignment.put(task03, new HashSet<TopicPartition>(Arrays.asList(t1p3, t2p3)));

        taskManager.setAssignmentMetadata(activeAssignment, standbyAssignment);
        MatcherAssert.assertThat(taskManager.assignedActiveTasks(), IsEqual.equalTo(activeAssignment));
        MatcherAssert.assertThat(taskManager.assignedStandbyTasks(), IsEqual.equalTo(standbyAssignment));
    }

    /**
     * Records the expectations for assigning standby partitions: all active tasks running, one
     * running standby task whose checkpoint for {@code t1p0} is {@code offset}, and the restore
     * consumer being assigned {@code taskId0Partitions}.
     *
     * @param offset checkpointed offset the running standby task reports for {@code t1p0}
     */
    private void mockAssignStandbyPartitions(final long offset) {
        final StandbyTask runningStandby = EasyMock.createNiceMock(StandbyTask.class);
        EasyMock.expect(active.allTasksRunning()).andReturn(true);
        EasyMock.expect(standby.running()).andReturn(Collections.singletonList(runningStandby));
        EasyMock.expect(runningStandby.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
        restoreConsumer.assign(taskId0Partitions);
        EasyMock.expectLastCall();
        // This mock is local, so it is not covered by replay() and must be replayed here.
        EasyMock.replay(runningStandby);
    }

    /** Makes the standby task creator return {@code standbyTask} when asked to create {@code taskId0Assignment}. */
    private void mockStandbyTaskExpectations() {
        EasyMock.expect(standbyTaskCreator.createTasks(
                EasyMock.<Consumer<byte[], byte[]>>anyObject(),
                EasyMock.eq(taskId0Assignment)))
            .andReturn(Collections.singletonList(standbyTask));
    }

    /** Makes the active task creator return {@code streamTask} when asked to create {@code taskId0Assignment}. */
    private void mockSingleActiveTask() {
        EasyMock.expect(activeTaskCreator.createTasks(
                EasyMock.<Consumer<byte[], byte[]>>anyObject(),
                EasyMock.eq(taskId0Assignment)))
            .andReturn(Collections.singletonList(streamTask));
    }

    /**
     * Wires the active task creator to the topology builder mock and records one
     * {@code sourceTopicPattern()} call and one {@code subscriptionUpdates()} call on the builder.
     */
    private void mockTopologyBuilder() {
        EasyMock.expect(activeTaskCreator.builder()).andReturn(topologyBuilder).anyTimes();
        EasyMock.expect(topologyBuilder.sourceTopicPattern()).andReturn(Pattern.compile("abc"));
        EasyMock.expect(topologyBuilder.subscriptionUpdates()).andReturn(subscriptionUpdates);
    }
}
