/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.aggregation.persistedaggregation;

import io.siddhi.core.aggregation.persistedaggregation.QueuedCudStreamProcessor;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.StreamEvent;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class CudStreamProcessorQueueManager
implements Runnable {
    private static final Logger log = LogManager.getLogger(CudStreamProcessorQueueManager.class);
    private volatile boolean run = true;
    private LinkedBlockingQueue<QueuedCudStreamProcessor> cudStreamProcessorQueue;

    public LinkedBlockingQueue<QueuedCudStreamProcessor> initializeAndGetCudStreamProcessorQueue() {
        this.cudStreamProcessorQueue = new LinkedBlockingQueue();
        return this.cudStreamProcessorQueue;
    }

    public LinkedBlockingQueue<QueuedCudStreamProcessor> getCudStreamProcessorQueue() {
        return this.cudStreamProcessorQueue;
    }

    @Override
    public void run() {
        block6: while (this.run) {
            QueuedCudStreamProcessor queuedCudStreamProcessor = null;
            try {
                queuedCudStreamProcessor = this.cudStreamProcessorQueue.take();
            }
            catch (InterruptedException e) {
                log.warn("Thread interrupted. Error when trying to retrieve queued values." + e.getMessage());
            }
            if (null == queuedCudStreamProcessor) continue;
            if (log.isDebugEnabled()) {
                log.debug("Current queue size is = " + this.cudStreamProcessorQueue.size());
            }
            int i = 0;
            while (true) {
                ++i;
                try {
                    ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<StreamEvent>();
                    complexEventChunk.add(queuedCudStreamProcessor.getStreamEvent());
                    if (log.isDebugEnabled()) {
                        log.debug("Starting processing for duration " + queuedCudStreamProcessor.getDuration());
                    }
                    queuedCudStreamProcessor.getCudStreamProcessor().process(complexEventChunk);
                    complexEventChunk.clear();
                    if (!log.isDebugEnabled()) continue block6;
                    log.debug("End processing for duration " + queuedCudStreamProcessor.getDuration());
                }
                catch (Exception e) {
                    if (e.getCause() instanceof SQLException) {
                        if (e.getCause().getLocalizedMessage().contains("try restarting transaction") && i < 3) {
                            log.error("Error occurred while executing the aggregation for data between " + queuedCudStreamProcessor.getStartTimeOfNewAggregates() + " - " + queuedCudStreamProcessor.getEmittedTime() + " for duration " + queuedCudStreamProcessor.getDuration() + " Retrying the transaction attempt " + (i - 1), (Throwable)e);
                            try {
                                Thread.sleep(3000L);
                            }
                            catch (InterruptedException interruptedException) {
                                log.error("Thread sleep interrupted while waiting to re-execute the aggregation query for duration " + queuedCudStreamProcessor.getDuration(), (Throwable)interruptedException);
                            }
                            continue;
                        }
                        log.error("Error occurred while executing the aggregation for data between " + queuedCudStreamProcessor.getStartTimeOfNewAggregates() + " - " + queuedCudStreamProcessor.getEmittedTime() + " for duration " + queuedCudStreamProcessor.getDuration() + ". Attempted re-executing the query for 9 seconds. This Should be investigated since this will lead to a data mismatch\n", (Throwable)e);
                        continue block6;
                    }
                    log.error("Error occurred while executing the aggregation for data between " + queuedCudStreamProcessor.getStartTimeOfNewAggregates() + " - " + queuedCudStreamProcessor.getEmittedTime() + " for duration \n" + queuedCudStreamProcessor.getDuration(), (Throwable)e);
                }
                break;
            }
        }
    }
}

