package io.siddhi.core.aggregation.persistedaggregation;

import io.siddhi.core.event.ComplexEventChunk;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-5.1.24.jar:io/siddhi/core/aggregation/persistedaggregation/CudStreamProcessorQueueManager.class
 */
/* loaded from: input_file:io/siddhi/core/aggregation/persistedaggregation/CudStreamProcessorQueueManager.class */
public class CudStreamProcessorQueueManager implements Runnable {
    private static final Logger log = LogManager.getLogger((Class<?>) CudStreamProcessorQueueManager.class);
    private volatile boolean run = true;
    private LinkedBlockingQueue<QueuedCudStreamProcessor> cudStreamProcessorQueue;

    public LinkedBlockingQueue<QueuedCudStreamProcessor> initializeAndGetCudStreamProcessorQueue() {
        this.cudStreamProcessorQueue = new LinkedBlockingQueue<>();
        return this.cudStreamProcessorQueue;
    }

    public LinkedBlockingQueue<QueuedCudStreamProcessor> getCudStreamProcessorQueue() {
        return this.cudStreamProcessorQueue;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.run) {
            QueuedCudStreamProcessor queuedCudStreamProcessor = null;
            try {
                queuedCudStreamProcessor = this.cudStreamProcessorQueue.take();
            } catch (InterruptedException e) {
                log.warn("Thread interrupted. Error when trying to retrieve queued values." + e.getMessage());
            }
            if (null != queuedCudStreamProcessor) {
                if (log.isDebugEnabled()) {
                    log.debug("Current queue size is = " + this.cudStreamProcessorQueue.size());
                }
                int i = 0;
                while (true) {
                    i++;
                    try {
                        ComplexEventChunk complexEventChunk = new ComplexEventChunk();
                        complexEventChunk.add(queuedCudStreamProcessor.getStreamEvent());
                        if (log.isDebugEnabled()) {
                            log.debug("Starting processing for duration " + queuedCudStreamProcessor.getDuration());
                        }
                        queuedCudStreamProcessor.getCudStreamProcessor().process(complexEventChunk);
                        complexEventChunk.clear();
                        if (log.isDebugEnabled()) {
                            log.debug("End processing for duration " + queuedCudStreamProcessor.getDuration());
                        }
                    } catch (Exception e2) {
                        if (!(e2.getCause() instanceof SQLException)) {
                            log.error("Error occurred while executing the aggregation for data between " + queuedCudStreamProcessor.getStartTimeOfNewAggregates() + " - " + queuedCudStreamProcessor.getEmittedTime() + " for duration \n" + queuedCudStreamProcessor.getDuration(), (Throwable) e2);
                            break;
                        } else if (!e2.getCause().getLocalizedMessage().contains("try restarting transaction") || i >= 3) {
                            log.error("Error occurred while executing the aggregation for data between " + queuedCudStreamProcessor.getStartTimeOfNewAggregates() + " - " + queuedCudStreamProcessor.getEmittedTime() + " for duration " + queuedCudStreamProcessor.getDuration() + ". Attempted re-executing the query for 9 seconds. This Should be investigated since this will lead to a data mismatch\n", (Throwable) e2);
                        } else {
                            log.error("Error occurred while executing the aggregation for data between " + queuedCudStreamProcessor.getStartTimeOfNewAggregates() + " - " + queuedCudStreamProcessor.getEmittedTime() + " for duration " + queuedCudStreamProcessor.getDuration() + " Retrying the transaction attempt " + (i - 1), (Throwable) e2);
                            try {
                                Thread.sleep(3000L);
                            } catch (InterruptedException e3) {
                                log.error("Thread sleep interrupted while waiting to re-execute the aggregation query for duration " + queuedCudStreamProcessor.getDuration(), (Throwable) e3);
                            }
                        }
                    }
                }
                log.error("Error occurred while executing the aggregation for data between " + queuedCudStreamProcessor.getStartTimeOfNewAggregates() + " - " + queuedCudStreamProcessor.getEmittedTime() + " for duration " + queuedCudStreamProcessor.getDuration() + ". Attempted re-executing the query for 9 seconds. This Should be investigated since this will lead to a data mismatch\n", (Throwable) e2);
            }
        }
    }
}
