package org.apache.rocketmq.client.java.example;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.class */
/**
 * Example showing how to receive messages with a {@link SimpleConsumer} asynchronously and
 * acknowledge each received message asynchronously.
 *
 * <p>The endpoint, credentials, consumer group, topic and tag used below are placeholders and
 * must be replaced with real values before running the example.
 */
public class AsyncSimpleConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(AsyncSimpleConsumerExample.class);

    /** Not instantiable: this class only hosts the {@link #main(String[])} entry point. */
    private AsyncSimpleConsumerExample() {
    }

    /**
     * Builds a simple consumer, issues one asynchronous receive request and acknowledges every
     * message that comes back, logging the outcome of each step.
     *
     * @param strArr command-line arguments (unused)
     * @throws ClientException if the consumer cannot be built
     * @throws IOException if closing the consumer fails
     * @throws InterruptedException if the main thread is interrupted while it is parked
     */
    public static void main(String[] strArr) throws ClientException, IOException, InterruptedException {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints("foobar.com:8080")
            .setCredentialProvider(new StaticSessionCredentialsProvider("yourAccessKey", "yourSecretKey"))
            .build();
        FilterExpression filterExpression = new FilterExpression("yourMessageTagA", FilterExpressionType.TAG);

        // try-with-resources guarantees the consumer is closed on every exit path, including
        // when the sleep below is interrupted.
        try (SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup("yourConsumerGroup")
            .setAwaitDuration(Duration.ofSeconds(30L))
            .setSubscriptionExpressions(Collections.singletonMap("yourTopic", filterExpression))
            .build()) {
            consumer.receiveAsync(16, Duration.ofSeconds(15L)).thenAccept(messages -> {
                log.info("Received {} message(s)", messages.size());
                for (MessageView message : messages) {
                    MessageId messageId = message.getMessageId();
                    consumer.ackAsync(message).thenAccept(ignored -> {
                        log.info("Message is acknowledged successfully, messageId={}", messageId);
                    }).exceptionally(th -> {
                        // Pass the throwable as the last argument so the failure cause is logged.
                        log.error("Message is failed to be acknowledged, messageId={}", messageId, th);
                        return null;
                    });
                }
            }).exceptionally(th -> {
                log.error("Failed to receive message from remote", th);
                return null;
            });
            // Park the main thread so the asynchronous callbacks above have time to run.
            Thread.sleep(Long.MAX_VALUE);
        }
    }
}
