package com.netflix.conductor.core.events.queue.dyno;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.service.WorkflowBulkService;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;

@Singleton
/* loaded from: input_file:com/netflix/conductor/core/events/queue/dyno/DynoObservableQueue.class */
public class DynoObservableQueue implements ObservableQueue {
    private static final Logger logger = LoggerFactory.getLogger(DynoObservableQueue.class);
    private static final String QUEUE_TYPE = "conductor";
    private final String queueName;
    private final QueueDAO queueDAO;
    private final int pollTimeInMS;
    private final int longPollTimeout;
    private final int pollCount;

    /* JADX INFO: Access modifiers changed from: package-private */
    @Inject
    public DynoObservableQueue(String str, QueueDAO queueDAO, Configuration configuration) {
        this.queueName = str;
        this.queueDAO = queueDAO;
        this.pollTimeInMS = configuration.getIntProperty("workflow.dyno.queues.pollingInterval", 100);
        this.pollCount = configuration.getIntProperty("workflow.dyno.queues.pollCount", 10);
        this.longPollTimeout = configuration.getIntProperty("workflow.dyno.queues.longPollTimeout", WorkflowBulkService.MAX_REQUEST_ITEMS);
    }

    @Override // com.netflix.conductor.core.events.queue.ObservableQueue
    public Observable<Message> observe() {
        return Observable.create(getOnSubscribe());
    }

    @Override // com.netflix.conductor.core.events.queue.ObservableQueue
    public List<String> ack(List<Message> list) {
        Iterator<Message> it = list.iterator();
        while (it.hasNext()) {
            this.queueDAO.remove(this.queueName, it.next().getId());
        }
        return (List) list.stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
    }

    @Override // com.netflix.conductor.core.events.queue.ObservableQueue
    public void setUnackTimeout(Message message, long j) {
        this.queueDAO.setUnackTimeout(this.queueName, message.getId(), j);
    }

    @Override // com.netflix.conductor.core.events.queue.ObservableQueue
    public void publish(List<Message> list) {
        this.queueDAO.push(this.queueName, list);
    }

    @Override // com.netflix.conductor.core.events.queue.ObservableQueue
    public long size() {
        return this.queueDAO.getSize(this.queueName);
    }

    @Override // com.netflix.conductor.core.events.queue.ObservableQueue
    public String getType() {
        return "conductor";
    }

    @Override // com.netflix.conductor.core.events.queue.ObservableQueue
    public String getName() {
        return this.queueName;
    }

    @Override // com.netflix.conductor.core.events.queue.ObservableQueue
    public String getURI() {
        return this.queueName;
    }

    @VisibleForTesting
    private List<Message> receiveMessages() {
        try {
            List<Message> pollMessages = this.queueDAO.pollMessages(this.queueName, this.pollCount, this.longPollTimeout);
            Monitors.recordEventQueueMessagesProcessed("conductor", this.queueName, pollMessages.size());
            return pollMessages;
        } catch (Exception e) {
            logger.error("Exception while getting messages from  queueDAO", e);
            Monitors.recordObservableQMessageReceivedErrors("conductor");
            return new ArrayList();
        }
    }

    @VisibleForTesting
    private Observable.OnSubscribe<Message> getOnSubscribe() {
        return subscriber -> {
            Observable flatMap = Observable.interval(this.pollTimeInMS, TimeUnit.MILLISECONDS).flatMap(l -> {
                return Observable.from(receiveMessages());
            });
            subscriber.getClass();
            Action1 action1 = (v1) -> {
                r1.onNext(v1);
            };
            subscriber.getClass();
            flatMap.subscribe(action1, subscriber::onError);
        };
    }
}
