/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

public class ConsumerNetworkClientTest {
    private String topicName = "test";
    private MockTime time = new MockTime(1L);
    private MockClient client = new MockClient(this.time);
    private Cluster cluster = TestUtils.singletonCluster(this.topicName, 1);
    private Node node = (Node)this.cluster.nodes().get(0);
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true);
    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)this.client, this.metadata, (Time)this.time, 100L, 1000L, Integer.MAX_VALUE);

    @Test
    public void send() {
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse((boolean)future.isDone());
        this.consumerClient.poll(future);
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.succeeded());
        ClientResponse clientResponse = (ClientResponse)future.value();
        HeartbeatResponse response = (HeartbeatResponse)clientResponse.responseBody();
        Assert.assertEquals((Object)Errors.NONE, (Object)response.error());
    }

    @Test
    public void multiSend() {
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        RequestFuture future1 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        RequestFuture future2 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount(this.node));
        this.consumerClient.awaitPendingRequests(this.node, Long.MAX_VALUE);
        Assert.assertTrue((boolean)future1.succeeded());
        Assert.assertTrue((boolean)future2.succeeded());
    }

    @Test
    public void testDisconnectWithUnsentRequests() {
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertTrue((boolean)this.consumerClient.hasPendingRequests(this.node));
        Assert.assertFalse((boolean)this.client.hasInFlightRequests(this.node.idString()));
        this.consumerClient.disconnectAsync(this.node);
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((boolean)(future.exception() instanceof DisconnectException));
    }

    @Test
    public void testDisconnectWithInFlightRequests() {
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.consumerClient.hasPendingRequests(this.node));
        Assert.assertTrue((boolean)this.client.hasInFlightRequests(this.node.idString()));
        this.consumerClient.disconnectAsync(this.node);
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((boolean)(future.exception() instanceof DisconnectException));
    }

    @Test
    public void doNotBlockIfPollConditionIsSatisfied() {
        NetworkClient mockNetworkClient = (NetworkClient)EasyMock.mock(NetworkClient.class);
        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)mockNetworkClient, this.metadata, (Time)this.time, 100L, 1000L, Integer.MAX_VALUE);
        EasyMock.expect((Object)mockNetworkClient.poll(EasyMock.eq((long)0L), EasyMock.anyLong())).andReturn(Collections.emptyList());
        EasyMock.replay((Object[])new Object[]{mockNetworkClient});
        consumerClient.poll(Long.MAX_VALUE, this.time.milliseconds(), new ConsumerNetworkClient.PollCondition(){

            public boolean shouldBlock() {
                return false;
            }
        });
        EasyMock.verify((Object[])new Object[]{mockNetworkClient});
    }

    @Test
    public void blockWhenPollConditionNotSatisfied() {
        long timeout = 4000L;
        NetworkClient mockNetworkClient = (NetworkClient)EasyMock.mock(NetworkClient.class);
        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)mockNetworkClient, this.metadata, (Time)this.time, 100L, 1000L, Integer.MAX_VALUE);
        EasyMock.expect((Object)mockNetworkClient.inFlightRequestCount()).andReturn((Object)1);
        EasyMock.expect((Object)mockNetworkClient.poll(EasyMock.eq((long)timeout), EasyMock.anyLong())).andReturn(Collections.emptyList());
        EasyMock.replay((Object[])new Object[]{mockNetworkClient});
        consumerClient.poll(timeout, this.time.milliseconds(), new ConsumerNetworkClient.PollCondition(){

            public boolean shouldBlock() {
                return true;
            }
        });
        EasyMock.verify((Object[])new Object[]{mockNetworkClient});
    }

    @Test
    public void blockOnlyForRetryBackoffIfNoInflightRequests() {
        long retryBackoffMs = 100L;
        NetworkClient mockNetworkClient = (NetworkClient)EasyMock.mock(NetworkClient.class);
        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)mockNetworkClient, this.metadata, (Time)this.time, retryBackoffMs, 1000L, Integer.MAX_VALUE);
        EasyMock.expect((Object)mockNetworkClient.inFlightRequestCount()).andReturn((Object)0);
        EasyMock.expect((Object)mockNetworkClient.poll(EasyMock.eq((long)retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.emptyList());
        EasyMock.replay((Object[])new Object[]{mockNetworkClient});
        consumerClient.poll(Long.MAX_VALUE, this.time.milliseconds(), new ConsumerNetworkClient.PollCondition(){

            public boolean shouldBlock() {
                return true;
            }
        });
        EasyMock.verify((Object[])new Object[]{mockNetworkClient});
    }

    @Test
    public void wakeup() {
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.consumerClient.wakeup();
        try {
            this.consumerClient.poll(0L);
            Assert.fail();
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        this.client.respond((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.consumerClient.poll(future);
        Assert.assertTrue((boolean)future.isDone());
    }

    @Test
    public void testDisconnectWakesUpPoll() throws Exception {
        final RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.client.enableBlockingUntilWakeup(1);
        Thread t = new Thread(){

            @Override
            public void run() {
                ConsumerNetworkClientTest.this.consumerClient.poll(future);
            }
        };
        t.start();
        this.consumerClient.disconnectAsync(this.node);
        t.join();
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((boolean)(future.exception() instanceof DisconnectException));
    }

    @Test
    public void testFutureCompletionOutsidePoll() throws Exception {
        final RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.consumerClient.pollNoWakeup();
        this.client.enableBlockingUntilWakeup(2);
        Thread t1 = new Thread(){

            @Override
            public void run() {
                ConsumerNetworkClientTest.this.consumerClient.pollNoWakeup();
            }
        };
        t1.start();
        Thread.sleep(50L);
        Thread t2 = new Thread(){

            @Override
            public void run() {
                ConsumerNetworkClientTest.this.consumerClient.poll(future);
            }
        };
        t2.start();
        Thread.sleep(50L);
        this.client.respond((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.client.wakeup();
        t1.join();
        t2.join();
        Assert.assertTrue((boolean)future.succeeded());
    }

    @Test
    public void testAwaitForMetadataUpdateWithTimeout() {
        Assert.assertFalse((boolean)this.consumerClient.awaitMetadataUpdate(10L));
    }

    @Test
    public void sendExpiry() throws InterruptedException {
        long unsentExpiryMs = 10L;
        final AtomicBoolean isReady = new AtomicBoolean();
        final AtomicBoolean disconnected = new AtomicBoolean();
        this.client = new MockClient(this.time){

            @Override
            public boolean ready(Node node, long now) {
                if (isReady.get()) {
                    return super.ready(node, now);
                }
                return false;
            }

            @Override
            public boolean connectionFailed(Node node) {
                return disconnected.get();
            }
        };
        this.consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)this.client, this.metadata, (Time)this.time, 100L, unsentExpiryMs, Integer.MAX_VALUE);
        RequestFuture future1 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse((boolean)future1.isDone());
        this.time.sleep(unsentExpiryMs + 1L);
        RequestFuture future2 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse((boolean)future2.isDone());
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)future1.isDone());
        Assert.assertFalse((boolean)future1.succeeded());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse((boolean)future2.isDone());
        isReady.set(true);
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.consumerClient.poll(future2);
        ClientResponse clientResponse = (ClientResponse)future2.value();
        HeartbeatResponse response = (HeartbeatResponse)clientResponse.responseBody();
        Assert.assertEquals((Object)Errors.NONE, (Object)response.error());
        isReady.set(false);
        RequestFuture future3 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount(this.node));
        disconnected.set(true);
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)future3.isDone());
        Assert.assertFalse((boolean)future3.succeeded());
        Assert.assertEquals((long)0L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)0L, (long)this.consumerClient.pendingRequestCount(this.node));
    }

    private HeartbeatRequest.Builder heartbeat() {
        return new HeartbeatRequest.Builder("group", 1, "memberId");
    }

    private HeartbeatResponse heartbeatResponse(Errors error) {
        return new HeartbeatResponse(error);
    }
}

