/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.kafka.client.consumer.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaReadStreamImpl<K, V>
implements KafkaReadStream<K, V> {
    // Shared counter that gives every worker thread created by start() a distinct name suffix.
    private static final AtomicInteger threadCount = new AtomicInteger(0);
    // Vert.x context used to dispatch record, rebalance and close callbacks.
    private final Context context;
    // Starts as true; subscribe/assign flip it to false on first use and close() flips it back.
    private final AtomicBoolean closed = new AtomicBoolean(true);
    // Underlying Kafka consumer that every operation in this class delegates to.
    private final Consumer<K, V> consumer;
    // Stream-level flow control flag: starts as true, cleared by resume(), set again by pause().
    private final AtomicBoolean paused = new AtomicBoolean(true);
    // Receives each record; subscribe/assign throw IllegalStateException while this is null.
    private Handler<ConsumerRecord<K, V>> recordHandler;
    // Iterator over the most recently polled non-empty batch; null until such a batch arrives.
    private Iterator<ConsumerRecord<K, V>> current;
    // Optional user callbacks notified by rebalanceListener; null when not registered.
    private Handler<Set<TopicPartition>> partitionsRevokedHandler;
    private Handler<Set<TopicPartition>> partitionsAssignedHandler;
    // Single-thread executor created by start(); null before the first subscribe/assign.
    private ExecutorService worker;
    /**
     * Rebalance listener passed to the Kafka consumer by {@code subscribe}. Each callback takes a
     * snapshot of the matching user handler and, if one is registered, invokes it on the Vert.x
     * context with the partitions converted through {@code Helper.toSet}.
     */
    private final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            Handler<Set<TopicPartition>> revoked = KafkaReadStreamImpl.this.partitionsRevokedHandler;
            if (revoked == null) {
                return;
            }
            KafkaReadStreamImpl.this.context.runOnContext(ignored -> revoked.handle(Helper.toSet(partitions)));
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            Handler<Set<TopicPartition>> assigned = KafkaReadStreamImpl.this.partitionsAssignedHandler;
            if (assigned == null) {
                return;
            }
            KafkaReadStreamImpl.this.context.runOnContext(ignored -> assigned.handle(Helper.toSet(partitions)));
        }
    };

    /**
     * Creates a stream around an existing Kafka consumer. No worker thread is created here;
     * that happens on the first {@code subscribe} or {@code assign}.
     *
     * @param context  Vert.x context used to dispatch callbacks
     * @param consumer Kafka consumer to delegate to
     */
    public KafkaReadStreamImpl(Context context, Consumer<K, V> consumer) {
        this.consumer = consumer;
        this.context = context;
    }

    /**
     * Creates the single-thread worker executor and queues {@code task} as its first job.
     *
     * @param task    work to run against the consumer on the worker
     * @param handler optional completion callback, forwarded to {@code submitTask}
     */
    private <T> void start(BiConsumer<Consumer<K, V>, Future<T>> task, Handler<AsyncResult<T>> handler) {
        this.worker = Executors.newSingleThreadExecutor(runnable -> {
            // Each worker thread gets the next value of the shared counter as its name suffix.
            String threadName = "vert.x-kafka-consumer-thread-" + threadCount.getAndIncrement();
            return new Thread(runnable, threadName);
        });
        submitTask(task, handler);
    }

    /**
     * Queues {@code task} on the worker executor so that it runs against the Kafka consumer
     * on the worker thread.
     *
     * <p>When {@code handler} is non-null the task receives a fresh {@link Future}; completing or
     * failing that future notifies {@code handler} on the Vert.x context, in line with how the
     * record, rebalance and close callbacks of this class are dispatched. When {@code handler}
     * is null the task receives a null future and must check for it.
     *
     * <p>An exception thrown by the task fails the future unless it is already complete. With
     * no handler there is nobody to notify, so the exception is dropped.
     *
     * @param task    work to run with the consumer and the (possibly null) result future
     * @param handler optional callback for the task outcome
     * @throws IllegalStateException if the worker has not been created yet, i.e. neither
     *                               {@code subscribe} nor {@code assign} has been called
     */
    private <T> void submitTask(BiConsumer<Consumer<K, V>, Future<T>> task, Handler<AsyncResult<T>> handler) {
        ExecutorService executor = this.worker;
        if (executor == null) {
            throw new IllegalStateException("Consumer worker not started; call subscribe() or assign() first");
        }
        executor.submit(() -> {
            Future<T> future = null;
            if (handler != null) {
                future = Future.future();
                // The task completes the future on the worker thread; hop back to the context
                // before running user code.
                future.setHandler(result -> this.context.runOnContext(v -> handler.handle(result)));
            }
            try {
                task.accept(this.consumer, future);
            } catch (Exception e) {
                if (future != null && !future.isComplete()) {
                    future.fail(e);
                }
            }
        });
    }

    /**
     * Queues a single poll (1000 ms timeout) on the worker. A non-empty batch is handed to
     * {@code handler} on the Vert.x context; an empty one causes another poll to be queued.
     * Nothing happens if the stream is closed by the time the job runs.
     *
     * <p>Only {@link WakeupException} is caught here. Any other exception thrown by the poll
     * ends up in the {@code Future} returned by {@code submit}, which is discarded.
     *
     * @param handler receives each non-empty batch of records
     */
    private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
        this.worker.submit(() -> {
            if (this.closed.get()) {
                return;
            }
            ConsumerRecords<K, V> batch;
            try {
                batch = this.consumer.poll(1000L);
            } catch (WakeupException ignored) {
                // close() calls consumer.wakeup() to abort a blocking poll; just stop polling.
                return;
            }
            boolean empty = batch == null || batch.count() <= 0;
            if (empty) {
                pollRecords(handler);
            } else {
                this.context.runOnContext(v -> handler.handle(batch));
            }
        });
    }

    /**
     * Arranges the next {@code run} with the record handler captured at this moment, unless
     * the stream is paused.
     *
     * @param delay timer delay passed to {@code setTimer}; zero or negative means dispatch
     *              straight onto the context instead of using a timer
     */
    private void schedule(long delay) {
        if (this.paused.get()) {
            return;
        }
        Handler<ConsumerRecord<K, V>> snapshot = this.recordHandler;
        if (delay <= 0L) {
            this.context.runOnContext(v -> run(snapshot));
            return;
        }
        this.context.owner().setTimer(delay, timerId -> run(snapshot));
    }

    /**
     * One step of the delivery loop. When the current batch is missing or exhausted, a poll is
     * requested and the loop is rescheduled once it answers. Otherwise up to 10 records from
     * the current batch are passed to {@code handler} before rescheduling.
     *
     * @param handler record handler captured by {@code schedule}; records are consumed from
     *                the batch but not delivered when it is null
     */
    private void run(Handler<ConsumerRecord<K, V>> handler) {
        if (this.closed.get()) {
            return;
        }
        boolean drained = this.current == null || !this.current.hasNext();
        if (drained) {
            pollRecords(records -> {
                boolean gotData = records != null && records.count() > 0;
                if (gotData) {
                    this.current = records.iterator();
                }
                schedule(gotData ? 0L : 1L);
            });
            return;
        }
        // Bounded batch so a large poll result does not monopolise the context.
        for (int handled = 0; this.current.hasNext() && handled < 10; handled++) {
            ConsumerRecord<K, V> record = this.current.next();
            if (handler != null) {
                handler.handle(record);
            }
        }
        schedule(0L);
    }

    /**
     * Pauses the given partitions on the underlying consumer without a completion callback.
     *
     * @param topicPartitions partitions to pause
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> pause(Set<TopicPartition> topicPartitions) {
        Handler<AsyncResult<Void>> noCallback = null;
        return pause(topicPartitions, noCallback);
    }

    /**
     * Queues a task that calls {@code pause} on the underlying consumer for the given partitions.
     *
     * @param topicPartitions   partitions to pause
     * @param completionHandler optional callback for the outcome; may be null
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> pause(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
        BiConsumer<Consumer<K, V>, Future<Void>> pauseTask = (kafka, done) -> {
            kafka.pause(topicPartitions);
            if (done != null) {
                done.complete();
            }
        };
        submitTask(pauseTask, completionHandler);
        return this;
    }

    /**
     * Queues a task that reads the set of paused partitions from the underlying consumer.
     *
     * @param handler receives the paused partitions or the failure
     */
    @Override
    public void paused(Handler<AsyncResult<Set<TopicPartition>>> handler) {
        submitTask((kafka, done) -> {
            Set<TopicPartition> pausedNow = kafka.paused();
            if (done == null) {
                return;
            }
            done.complete(pausedNow);
        }, handler);
    }

    /**
     * Resumes the given partitions on the underlying consumer without a completion callback.
     *
     * @param topicPartitions partitions to resume
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> resume(Set<TopicPartition> topicPartitions) {
        Handler<AsyncResult<Void>> noCallback = null;
        return resume(topicPartitions, noCallback);
    }

    /**
     * Queues a task that calls {@code resume} on the underlying consumer for the given partitions.
     *
     * @param topicPartitions   partitions to resume
     * @param completionHandler optional callback for the outcome; may be null
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> resume(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
        BiConsumer<Consumer<K, V>, Future<Void>> resumeTask = (kafka, done) -> {
            kafka.resume(topicPartitions);
            if (done != null) {
                done.complete();
            }
        };
        submitTask(resumeTask, completionHandler);
        return this;
    }

    /**
     * Queues a task that asks the underlying consumer for the committed offset of a partition.
     *
     * @param topicPartition partition to look up
     * @param handler        receives the value returned by {@code Consumer.committed} or the failure
     */
    @Override
    public void committed(TopicPartition topicPartition, Handler<AsyncResult<OffsetAndMetadata>> handler) {
        submitTask((kafka, done) -> {
            OffsetAndMetadata lastCommitted = kafka.committed(topicPartition);
            if (done == null) {
                return;
            }
            done.complete(lastCommitted);
        }, handler);
    }

    /**
     * Calls {@code seekToEnd} for the given partitions without a completion callback.
     *
     * @param topicPartitions partitions to seek
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> seekToEnd(Set<TopicPartition> topicPartitions) {
        Handler<AsyncResult<Void>> noCallback = null;
        return seekToEnd(topicPartitions, noCallback);
    }

    /**
     * Queues a task that calls {@code seekToEnd} on the underlying consumer.
     *
     * @param topicPartitions   partitions to seek
     * @param completionHandler optional callback for the outcome; may be null
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> seekToEnd(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
        BiConsumer<Consumer<K, V>, Future<Void>> seekTask = (kafka, done) -> {
            kafka.seekToEnd(topicPartitions);
            if (done != null) {
                done.complete();
            }
        };
        submitTask(seekTask, completionHandler);
        return this;
    }

    /**
     * Calls {@code seekToBeginning} for the given partitions without a completion callback.
     *
     * @param topicPartitions partitions to seek
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> seekToBeginning(Set<TopicPartition> topicPartitions) {
        Handler<AsyncResult<Void>> noCallback = null;
        return seekToBeginning(topicPartitions, noCallback);
    }

    /**
     * Queues a task that calls {@code seekToBeginning} on the underlying consumer.
     *
     * @param topicPartitions   partitions to seek
     * @param completionHandler optional callback for the outcome; may be null
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> seekToBeginning(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
        BiConsumer<Consumer<K, V>, Future<Void>> seekTask = (kafka, done) -> {
            kafka.seekToBeginning(topicPartitions);
            if (done != null) {
                done.complete();
            }
        };
        submitTask(seekTask, completionHandler);
        return this;
    }

    /**
     * Seeks a partition to the given offset without a completion callback.
     *
     * @param topicPartition partition to seek
     * @param offset         offset passed to {@code Consumer.seek}
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> seek(TopicPartition topicPartition, long offset) {
        Handler<AsyncResult<Void>> noCallback = null;
        return seek(topicPartition, offset, noCallback);
    }

    /**
     * Queues a task that calls {@code seek} on the underlying consumer.
     *
     * @param topicPartition    partition to seek
     * @param offset            offset passed to {@code Consumer.seek}
     * @param completionHandler optional callback for the outcome; may be null
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> seek(TopicPartition topicPartition, long offset, Handler<AsyncResult<Void>> completionHandler) {
        BiConsumer<Consumer<K, V>, Future<Void>> seekTask = (kafka, done) -> {
            kafka.seek(topicPartition, offset);
            if (done != null) {
                done.complete();
            }
        };
        submitTask(seekTask, completionHandler);
        return this;
    }

    /**
     * Registers the callback that the rebalance listener invokes when partitions are revoked.
     *
     * @param handler callback to store; null removes any previous one
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> partitionsRevokedHandler(Handler<Set<TopicPartition>> handler) {
        partitionsRevokedHandler = handler;
        return this;
    }

    /**
     * Registers the callback that the rebalance listener invokes when partitions are assigned.
     *
     * @param handler callback to store; null removes any previous one
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> partitionsAssignedHandler(Handler<Set<TopicPartition>> handler) {
        partitionsAssignedHandler = handler;
        return this;
    }

    /**
     * Subscribes to the given topics without a completion callback.
     *
     * @param topics topic names to subscribe to
     * @return this stream
     * @throws IllegalStateException if no record handler has been set
     */
    @Override
    public KafkaReadStream<K, V> subscribe(Set<String> topics) {
        Handler<AsyncResult<Void>> noCallback = null;
        return subscribe(topics, noCallback);
    }

    /**
     * Subscribes the underlying consumer to the given topics, registering the internal
     * rebalance listener. The first call on a not-yet-started stream also creates the worker
     * and, once subscribed, resumes record delivery; later calls only change the subscription.
     *
     * @param topics            topic names to subscribe to
     * @param completionHandler optional callback for the outcome; may be null
     * @return this stream
     * @throws IllegalStateException if no record handler has been set
     */
    @Override
    public KafkaReadStream<K, V> subscribe(Set<String> topics, Handler<AsyncResult<Void>> completionHandler) {
        if (this.recordHandler == null) {
            throw new IllegalStateException();
        }
        // True only for the call that moves the stream from closed to open.
        boolean firstUse = this.closed.compareAndSet(true, false);
        BiConsumer<Consumer<K, V>, Future<Void>> subscribeTask = (kafka, done) -> {
            kafka.subscribe(topics, this.rebalanceListener);
            if (firstUse) {
                resume();
            }
            if (done != null) {
                done.complete();
            }
        };
        if (firstUse) {
            start(subscribeTask, completionHandler);
        } else {
            submitTask(subscribeTask, completionHandler);
        }
        return this;
    }

    /**
     * Unsubscribes the underlying consumer without a completion callback.
     *
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> unsubscribe() {
        Handler<AsyncResult<Void>> noCallback = null;
        return unsubscribe(noCallback);
    }

    /**
     * Queues a task that calls {@code unsubscribe} on the underlying consumer.
     *
     * @param completionHandler optional callback for the outcome; may be null
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> unsubscribe(Handler<AsyncResult<Void>> completionHandler) {
        BiConsumer<Consumer<K, V>, Future<Void>> unsubscribeTask = (kafka, done) -> {
            kafka.unsubscribe();
            if (done != null) {
                done.complete();
            }
        };
        submitTask(unsubscribeTask, completionHandler);
        return this;
    }

    /**
     * Queues a task that reads the current topic subscription from the underlying consumer.
     *
     * @param handler receives the subscribed topic names or the failure
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> subscription(Handler<AsyncResult<Set<String>>> handler) {
        submitTask((kafka, done) -> {
            Set<String> topics = kafka.subscription();
            if (done == null) {
                return;
            }
            done.complete(topics);
        }, handler);
        return this;
    }

    /**
     * Assigns the given partitions without a completion callback.
     *
     * @param partitions partitions to assign
     * @return this stream
     * @throws IllegalStateException if no record handler has been set
     */
    @Override
    public KafkaReadStream<K, V> assign(Set<TopicPartition> partitions) {
        Handler<AsyncResult<Void>> noCallback = null;
        return assign(partitions, noCallback);
    }

    /**
     * Manually assigns partitions to the underlying consumer. The first call on a
     * not-yet-started stream also creates the worker and, once assigned, resumes record
     * delivery; later calls only change the assignment.
     *
     * @param partitions        partitions to assign
     * @param completionHandler optional callback for the outcome; may be null
     * @return this stream
     * @throws IllegalStateException if no record handler has been set
     */
    @Override
    public KafkaReadStream<K, V> assign(Set<TopicPartition> partitions, Handler<AsyncResult<Void>> completionHandler) {
        if (this.recordHandler == null) {
            throw new IllegalStateException();
        }
        // True only for the call that moves the stream from closed to open.
        boolean firstUse = this.closed.compareAndSet(true, false);
        BiConsumer<Consumer<K, V>, Future<Void>> assignTask = (kafka, done) -> {
            kafka.assign(partitions);
            if (firstUse) {
                resume();
            }
            if (done != null) {
                done.complete();
            }
        };
        if (firstUse) {
            start(assignTask, completionHandler);
        } else {
            submitTask(assignTask, completionHandler);
        }
        return this;
    }

    /**
     * Queues a task that reads the current partition assignment from the underlying consumer.
     *
     * @param handler receives the assigned partitions or the failure
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> assignment(Handler<AsyncResult<Set<TopicPartition>>> handler) {
        submitTask((kafka, done) -> {
            Set<TopicPartition> assigned = kafka.assignment();
            if (done == null) {
                return;
            }
            done.complete(assigned);
        }, handler);
        return this;
    }

    /**
     * Queues a task that calls {@code listTopics} on the underlying consumer.
     *
     * @param handler receives the topic-to-partition-info map or the failure
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> listTopics(Handler<AsyncResult<Map<String, List<PartitionInfo>>>> handler) {
        submitTask((kafka, done) -> {
            Map<String, List<PartitionInfo>> byTopic = kafka.listTopics();
            if (done == null) {
                return;
            }
            done.complete(byTopic);
        }, handler);
        return this;
    }

    /**
     * Commits asynchronously without explicit offsets and without a completion callback.
     */
    @Override
    public void commit() {
        // A typed null selects the Handler overload rather than the Map one.
        Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> noCallback = null;
        commit(noCallback);
    }

    /**
     * Commits asynchronously without explicit offsets.
     *
     * @param completionHandler optional callback receiving the committed offsets or the failure
     */
    @Override
    public void commit(Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> completionHandler) {
        Map<TopicPartition, OffsetAndMetadata> unspecified = null;
        commit(unspecified, completionHandler);
    }

    /**
     * Commits the given offsets asynchronously without a completion callback.
     *
     * @param offsets offsets to commit; null means the no-offsets form of {@code commitAsync}
     */
    @Override
    public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> noCallback = null;
        commit(offsets, noCallback);
    }

    /**
     * Queues a task that calls {@code commitAsync} on the underlying consumer. The Kafka commit
     * callback completes the result future with the committed offsets or fails it with the
     * reported exception.
     *
     * @param offsets           offsets to commit; null selects {@code commitAsync(callback)}
     * @param completionHandler optional callback for the outcome; may be null
     */
    @Override
    public void commit(Map<TopicPartition, OffsetAndMetadata> offsets, Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> completionHandler) {
        submitTask((kafka, done) -> {
            OffsetCommitCallback onCommitted = (committedOffsets, error) -> {
                if (done == null) {
                    return;
                }
                if (error == null) {
                    done.complete(committedOffsets);
                } else {
                    done.fail(error);
                }
            };
            if (offsets != null) {
                kafka.commitAsync(offsets, onCommitted);
            } else {
                kafka.commitAsync(onCommitted);
            }
        }, completionHandler);
    }

    /**
     * Queues a task that calls {@code partitionsFor} on the underlying consumer.
     *
     * @param topic   topic to look up
     * @param handler receives the partition info list or the failure
     * @return this stream
     */
    @Override
    public KafkaReadStreamImpl<K, V> partitionsFor(String topic, Handler<AsyncResult<List<PartitionInfo>>> handler) {
        submitTask((kafka, done) -> {
            List<PartitionInfo> infos = kafka.partitionsFor(topic);
            if (done == null) {
                return;
            }
            done.complete(infos);
        }, handler);
        return this;
    }

    /**
     * Accepts an exception handler but does not store it, so it is never invoked by this class.
     *
     * @param handler ignored
     * @return this stream
     */
    public KafkaReadStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {
        // Intentionally a no-op in this implementation.
        return this;
    }

    /**
     * Sets the handler that receives consumed records. It must be set before calling
     * {@code subscribe} or {@code assign}, which throw {@link IllegalStateException} otherwise.
     *
     * @param handler record handler to store
     * @return this stream
     */
    public KafkaReadStreamImpl<K, V> handler(Handler<ConsumerRecord<K, V>> handler) {
        // Picked up by schedule() the next time it captures the handler.
        this.recordHandler = handler;
        return this;
    }

    /**
     * Sets the stream-level paused flag. {@code schedule} checks this flag and stops queuing
     * further delivery steps while it is set.
     *
     * @return this stream
     */
    public KafkaReadStreamImpl<K, V> pause() {
        paused.set(true);
        return this;
    }

    /**
     * Clears the stream-level paused flag and, only if it was actually set, schedules the next
     * delivery step immediately.
     *
     * @return this stream
     */
    public KafkaReadStreamImpl<K, V> resume() {
        boolean wasPaused = paused.compareAndSet(true, false);
        if (wasPaused) {
            schedule(0L);
        }
        return this;
    }

    /**
     * Accepts an end handler but does not store it, so it is never invoked by this class.
     *
     * @param endHandler ignored
     * @return this stream
     */
    public KafkaReadStreamImpl<K, V> endHandler(Handler<Void> endHandler) {
        // Intentionally a no-op in this implementation.
        return this;
    }

    /**
     * Closes the stream. If it is running, a job is queued on the worker that closes the Kafka
     * consumer; the executor is then shut down and {@code completionHandler} is notified on the
     * Vert.x context. The consumer is woken up so that a poll currently blocking the worker
     * returns promptly.
     *
     * <p>If the stream is not running (never started or already closed) there is no worker job
     * to wait for, and {@code completionHandler} is simply notified of success. A failure of
     * {@code Consumer.close()} is reported as a failed result, and the executor is still shut down.
     *
     * @param completionHandler optional callback for the outcome; may be null
     */
    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        if (!this.closed.compareAndSet(false, true)) {
            // Not running: answer the caller instead of silently dropping the callback.
            if (completionHandler != null) {
                this.context.runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
            }
            return;
        }
        this.worker.submit(() -> {
            Throwable failure = null;
            try {
                this.consumer.close();
            } catch (Exception e) {
                // Keep going: the worker thread must be released even if the consumer fails to close.
                failure = e;
            }
            Future<Void> outcome = failure == null
                    ? Future.<Void>succeededFuture()
                    : Future.<Void>failedFuture(failure);
            this.context.runOnContext(v -> {
                this.worker.shutdownNow();
                if (completionHandler != null) {
                    completionHandler.handle(outcome);
                }
            });
        });
        // Abort any poll currently blocking the worker so the close job can run.
        this.consumer.wakeup();
    }

    /**
     * Queues a task that reads the position of a partition from the underlying consumer.
     *
     * @param partition partition to look up
     * @param handler   receives the value returned by {@code Consumer.position} or the failure
     */
    @Override
    public void position(TopicPartition partition, Handler<AsyncResult<Long>> handler) {
        submitTask((kafka, done) -> {
            long offset = this.consumer.position(partition);
            if (done == null) {
                return;
            }
            done.complete(offset);
        }, handler);
    }

    /**
     * Queues a task that calls {@code offsetsForTimes} on the underlying consumer.
     *
     * @param topicPartitionTimestamps timestamp to search for, per partition
     * @param handler                  receives the map returned by the consumer or the failure
     */
    @Override
    public void offsetsForTimes(Map<TopicPartition, Long> topicPartitionTimestamps, Handler<AsyncResult<Map<TopicPartition, OffsetAndTimestamp>>> handler) {
        submitTask((kafka, done) -> {
            Map<TopicPartition, OffsetAndTimestamp> found = this.consumer.offsetsForTimes(topicPartitionTimestamps);
            if (done == null) {
                return;
            }
            done.complete(found);
        }, handler);
    }

    /**
     * Single-partition form of {@code offsetsForTimes}: wraps the partition and timestamp in a
     * one-entry map, queries the consumer and passes on the entry for that partition.
     *
     * @param topicPartition partition to look up
     * @param timestamp      timestamp to search for
     * @param handler        receives the map entry for {@code topicPartition} or the failure
     */
    @Override
    public void offsetsForTimes(TopicPartition topicPartition, long timestamp, Handler<AsyncResult<OffsetAndTimestamp>> handler) {
        submitTask((kafka, done) -> {
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(topicPartition, timestamp);
            Map<TopicPartition, OffsetAndTimestamp> found = this.consumer.offsetsForTimes(query);
            if (done == null) {
                return;
            }
            done.complete(found.get(topicPartition));
        }, handler);
    }

    /**
     * Queues a task that calls {@code beginningOffsets} on the underlying consumer.
     *
     * @param topicPartitions partitions to look up
     * @param handler         receives the map returned by the consumer or the failure
     */
    @Override
    public void beginningOffsets(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Map<TopicPartition, Long>>> handler) {
        submitTask((kafka, done) -> {
            Map<TopicPartition, Long> firstOffsets = this.consumer.beginningOffsets(topicPartitions);
            if (done == null) {
                return;
            }
            done.complete(firstOffsets);
        }, handler);
    }

    /**
     * Single-partition form of {@code beginningOffsets}: queries the consumer with a
     * one-element set and passes on the entry for that partition.
     *
     * @param topicPartition partition to look up
     * @param handler        receives the map entry for {@code topicPartition} or the failure
     */
    @Override
    public void beginningOffsets(TopicPartition topicPartition, Handler<AsyncResult<Long>> handler) {
        submitTask((kafka, done) -> {
            Set<TopicPartition> single = new HashSet<>();
            single.add(topicPartition);
            Map<TopicPartition, Long> firstOffsets = this.consumer.beginningOffsets(single);
            if (done == null) {
                return;
            }
            done.complete(firstOffsets.get(topicPartition));
        }, handler);
    }

    /**
     * Queues a task that calls {@code endOffsets} on the underlying consumer.
     *
     * @param topicPartitions partitions to look up
     * @param handler         receives the map returned by the consumer or the failure
     */
    @Override
    public void endOffsets(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Map<TopicPartition, Long>>> handler) {
        submitTask((kafka, done) -> {
            Map<TopicPartition, Long> lastOffsets = this.consumer.endOffsets(topicPartitions);
            if (done == null) {
                return;
            }
            done.complete(lastOffsets);
        }, handler);
    }

    /**
     * Single-partition form of {@code endOffsets}: queries the consumer with a one-element set
     * and passes on the entry for that partition.
     *
     * @param topicPartition partition to look up
     * @param handler        receives the map entry for {@code topicPartition} or the failure
     */
    @Override
    public void endOffsets(TopicPartition topicPartition, Handler<AsyncResult<Long>> handler) {
        submitTask((kafka, done) -> {
            Set<TopicPartition> single = new HashSet<>();
            single.add(topicPartition);
            Map<TopicPartition, Long> lastOffsets = this.consumer.endOffsets(single);
            if (done == null) {
                return;
            }
            done.complete(lastOffsets.get(topicPartition));
        }, handler);
    }

    /**
     * Exposes the Kafka consumer this stream was constructed with.
     *
     * @return the underlying consumer instance
     */
    @Override
    public Consumer<K, V> consumer() {
        return consumer;
    }
}

