package com.netflix.logging.messaging;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.blitz4j.BlitzConfig;
import com.netflix.blitz4j.LoggingConfiguration;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.monitor.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:WEB-INF/lib/blitz4j-1.18.jar:com/netflix/logging/messaging/MessageBatcher.class */
/**
 * Collects individual messages into batches and hands each batch to a {@link MessageProcessor}
 * running on a thread pool.
 *
 * <p>Producers enqueue messages with {@link #process(Object)} (never blocks; the message is
 * dropped and counted when the queue is full) or {@link #processSync(Object)} (blocks until the
 * queue has room). A single daemon {@code Collector} thread drains the queue into a batch of at
 * most {@code maxMessages} entries, waiting up to {@code maxDelay} nanoseconds for a batch to
 * fill, and then submits the batch to the {@code processor} pool.
 *
 * <p>The protected fields keep their original (partly raw) types so that existing subclasses
 * continue to compile.
 *
 * @param <T> type of the messages accepted by this batcher
 */
public class MessageBatcher<T> {
    private static final BlitzConfig CONFIGURATION = LoggingConfiguration.getInstance().getConfiguration();
    private static final String DOT = ".";
    private static final String BATCHER_PREFIX = "batcher.";
    private static final String COLLECTOR_SUFFIX = ".collector";
    /** Longest single sleep, in milliseconds, while {@link #stop()} waits for pending messages. */
    private static final long SHUTDOWN_POLL_INTERVAL_MS = 1000L;
    // Written by stop() and polled by the collector thread, hence volatile.
    private volatile boolean shouldCollectorShutdown;
    // Batch currently being filled; replaced by the collector after every hand-off and read by stop().
    volatile List<Object> batch;
    protected String name;
    protected BlockingQueue queue;
    protected int maxMessages;
    // NOTE: static, so the delay (in nanoseconds) is shared by every batcher instance in the JVM.
    protected static long maxDelay;
    protected MessageBatcher<T>.Collector collector;
    protected ThreadPoolExecutor processor;
    protected MessageProcessor target;
    protected Timer queueSizeTracer;
    protected Timer batchSyncPutTracer;
    protected Timer threadSubmitTracer;
    protected Timer processTimeTracer;
    protected Timer avgBatchSizeTracer;
    protected Counter queueOverflowCounter;
    // Read by producer threads, written by stop(), hence volatile.
    private volatile boolean isShutDown;
    private boolean blockingProperty;
    // Written by pause()/resume() and polled by the collector thread, hence volatile.
    private volatile boolean isCollectorPaused;
    private Counter processCount;
    public static final String POOL_MAX_THREADS = "maxThreads";
    public static final String POOL_MIN_THREADS = "minThreads";
    public static final String POOL_KEEP_ALIVE_TIME = "keepAliveTime";
    protected final AtomicInteger concurrentBatches = new AtomicInteger(0);
    private AtomicLong numberAdded = new AtomicLong();
    private AtomicLong numberDropped = new AtomicLong();

    /* loaded from: input_file:WEB-INF/lib/blitz4j-1.18.jar:com/netflix/logging/messaging/MessageBatcher$Collector.class */
    /**
     * Background thread that moves messages from the queue into the current batch and submits
     * full (or timed-out) batches to the processor pool.
     */
    private class Collector extends Thread {
        private static final int SLEEP_TIME_MS = 1;
        private Timer processTimeTracer;
        private Counter rejectedCounter;
        private static final int RETRY_EXECUTION_TIMEOUT_MS = 1;
        private final MessageBatcher stream;
        private final Timer queueSizeTracer;

        /**
         * @param messageBatcher the batcher whose queue this thread drains
         * @param str            thread name, also used as the prefix of this thread's metrics
         */
        public Collector(MessageBatcher messageBatcher, String str) {
            super(str);
            // Named after the collector: the batcher's processCount is not yet initialised when
            // the collector is constructed, so using it here produced the name "null.rejected".
            this.rejectedCounter = Monitors.newCounter(str + ".rejected");
            this.processTimeTracer = Monitors.newTimer(str + ".processTime");
            this.stream = messageBatcher;
            this.queueSizeTracer = Monitors.newTimer(str + ".queue_size_at_drain");
        }

        /**
         * Loops until shutdown is requested: idles while paused, otherwise fills the current
         * batch and dispatches it when it is non-empty. Any failure in one cycle is optionally
         * printed and the loop continues.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!MessageBatcher.this.shouldCollectorShutdown) {
                if (MessageBatcher.this.isCollectorPaused) {
                    try {
                        Thread.sleep(SLEEP_TIME_MS);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop re-checks the shutdown and pause flags.
                    }
                    continue;
                }
                try {
                    List<Object> pending = MessageBatcher.this.batch;
                    fillBatch(pending);
                    int size = pending.size();
                    if (size > 0) {
                        dispatch(pending, size);
                    }
                } catch (Throwable th) {
                    if (MessageBatcher.CONFIGURATION.shouldPrintLoggingErrors()) {
                        th.printStackTrace();
                    }
                }
            }
        }

        /**
         * Drains queued messages into {@code pending} until it holds {@code maxMessages} entries,
         * the queue stays empty until the deadline, or the deadline (start time plus
         * {@code maxDelay}) has passed.
         */
        private void fillBatch(List<Object> pending) {
            if (pending.size() >= this.stream.maxMessages) {
                return;
            }
            // The deadline is fixed once per fill cycle so that a slow trickle of messages
            // cannot keep extending the wait.
            final long deadline = System.nanoTime() + MessageBatcher.maxDelay;
            do {
                if (this.stream.queue.drainTo(pending, this.stream.maxMessages - pending.size()) <= 0) {
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0) {
                        break;
                    }
                    Object next = null;
                    try {
                        next = this.stream.queue.poll(remaining, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException ignored) {
                        // Treated like a timeout: next stays null and the fill cycle ends.
                    }
                    if (next == null) {
                        break;
                    }
                    pending.add(next);
                }
            } while (pending.size() < this.stream.maxMessages);
        }

        /**
         * Records batch metrics, submits {@code ready} to the processor pool and installs a fresh
         * empty batch.
         *
         * @throws InterruptedException if interrupted while waiting to retry a rejected
         *         submission; the batch is then left in place and retried on the next cycle
         */
        private void dispatch(List<Object> ready, int size) throws InterruptedException {
            try {
                this.queueSizeTracer.record(this.stream.queue.size());
            } catch (Exception ignored) {
                // Metrics are best-effort and must not prevent the batch from being processed.
            }
            MessageBatcher.this.avgBatchSizeTracer.record(size);
            Stopwatch watch = this.processTimeTracer.start();
            if (submit(ready)) {
                MessageBatcher.this.processCount.increment(size);
            }
            watch.stop();
            MessageBatcher.this.batch = new ArrayList<Object>(this.stream.maxMessages);
        }

        /**
         * Submits the batch, retrying after a short sleep whenever the pool rejects it.
         *
         * @return {@code true} once the pool accepted the batch; {@code false} if shutdown was
         *         requested while the pool kept rejecting it (otherwise this would spin forever
         *         against a terminated pool)
         */
        private boolean submit(List<Object> ready) throws InterruptedException {
            do {
                try {
                    this.stream.processor.execute(new ProcessMessages(this.stream, ready));
                    return true;
                } catch (RejectedExecutionException e) {
                    this.rejectedCounter.increment();
                    Thread.sleep(RETRY_EXECUTION_TIMEOUT_MS);
                }
            } while (!MessageBatcher.this.shouldCollectorShutdown);
            return false;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/blitz4j-1.18.jar:com/netflix/logging/messaging/MessageBatcher$ProcessMessages.class */
    /** Pool task that passes one batch to the batcher's target processor. */
    private static class ProcessMessages implements Runnable {
        private final MessageBatcher stream;
        private List batch;
        private Timer processMessagesTracer;
        private Timer avgConcurrentBatches;

        public ProcessMessages(MessageBatcher messageBatcher, List list) {
            this.stream = messageBatcher;
            this.batch = list;
            this.processMessagesTracer = messageBatcher.processTimeTracer;
            this.avgConcurrentBatches = Monitors.newTimer(messageBatcher.name + ".concurrentBatches");
        }

        /**
         * Processes the batch while tracking how many batches run concurrently. Failures are
         * printed and never propagate to the pool thread.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (this.batch == null) {
                return;
            }
            try {
                try {
                    this.avgConcurrentBatches.record(this.stream.concurrentBatches.incrementAndGet());
                    Stopwatch watch = this.processMessagesTracer.start();
                    this.stream.target.process(this.batch);
                    watch.stop();
                } finally {
                    // Always undo the increment, whether processing succeeded or threw.
                    this.stream.concurrentBatches.decrementAndGet();
                }
            } catch (Throwable th) {
                th.printStackTrace();
            }
        }
    }

    /**
     * Creates the batcher, reads its settings from the logging configuration under the name
     * {@code "batcher." + str}, starts the daemon collector thread and registers this object
     * with the monitoring registry (registration failures are optionally printed and ignored).
     *
     * @param str              batcher name, without the {@code "batcher."} prefix
     * @param messageProcessor target that receives every batch
     */
    public MessageBatcher(String str, MessageProcessor messageProcessor) {
        this.name = BATCHER_PREFIX + str;
        this.target = messageProcessor;
        this.queue = new ArrayBlockingQueue(CONFIGURATION.getBatcherQueueMaxMessages(this.name));
        setBatchMaxMessages(CONFIGURATION.getBatchSize(this.name));
        this.batch = new ArrayList<Object>(this.maxMessages);
        setBatchMaxDelay(CONFIGURATION.getBatcherMaxDelay(this.name));
        this.collector = new Collector(this, this.name + COLLECTOR_SUFFIX);
        createProcessor(this.name);
        this.queueSizeTracer = Monitors.newTimer("queue_size");
        this.batchSyncPutTracer = Monitors.newTimer("waitTimeforBuffer");
        this.avgBatchSizeTracer = Monitors.newTimer("batch_size");
        this.processCount = Monitors.newCounter("messages_processed");
        this.threadSubmitTracer = Monitors.newTimer("thread_invocation_time");
        this.processTimeTracer = Monitors.newTimer("message_processTime");
        this.queueOverflowCounter = Monitors.newCounter("queue_overflow");
        this.blockingProperty = CONFIGURATION.shouldWaitWhenBatcherQueueNotEmpty(this.name);
        // The collector is started only after every metric it touches has been created.
        this.collector.setDaemon(true);
        this.collector.start();
        try {
            Monitors.registerObject(this.name, this);
        } catch (Throwable th) {
            if (CONFIGURATION.shouldPrintLoggingErrors()) {
                th.printStackTrace();
            }
        }
    }

    /** Replaces the processor that receives batches. */
    public synchronized void setTarget(MessageProcessor messageProcessor) {
        this.target = messageProcessor;
    }

    /** Sets the maximum number of messages collected into one batch. */
    public synchronized void setBatchMaxMessages(int i) {
        this.maxMessages = i;
    }

    /**
     * Sets the maximum time the collector waits for a batch to fill.
     *
     * <p>The value is stored in a static field, so it applies to every batcher instance.
     *
     * @param d delay in seconds; converted to nanoseconds
     */
    public synchronized void setBatchMaxDelay(double d) {
        maxDelay = (long) (d * 1.0E9d);
    }

    /**
     * Sets the maximum size of the processor pool, lowering the core size first when needed
     * because the pool rejects a maximum smaller than its core size.
     */
    void setProcessorMaxThreads(int i) {
        if (this.processor.getCorePoolSize() > i) {
            this.processor.setCorePoolSize(i);
        }
        this.processor.setMaximumPoolSize(i);
    }

    /** @return {@code true} if the queue can currently accept at least one more message */
    public boolean isSpaceAvailable() {
        return this.queue.remainingCapacity() > 0;
    }

    /**
     * Enqueues a message without blocking.
     *
     * @param t the message to enqueue
     * @return {@code true} if the message was queued; {@code false} if the batcher is shut down
     *         or the queue is full (the latter is counted as a drop and an overflow)
     */
    public boolean process(T t) {
        if (this.isShutDown) {
            return false;
        }
        try {
            this.queueSizeTracer.record(this.queue.size());
        } catch (Throwable ignored) {
            // Metrics are best-effort and must not prevent the message from being queued.
        }
        if (this.queue.offer(t)) {
            this.numberAdded.incrementAndGet();
            return true;
        }
        this.numberDropped.incrementAndGet();
        this.queueOverflowCounter.increment();
        return false;
    }

    /**
     * Enqueues a message, blocking until the queue has room. Does nothing when the batcher is
     * shut down. If the calling thread is interrupted while waiting, the message is not queued
     * and the thread's interrupt flag is restored.
     *
     * @param t the message to enqueue
     */
    public void processSync(T t) {
        if (this.isShutDown) {
            return;
        }
        try {
            this.queueSizeTracer.record(this.queue.size());
        } catch (Throwable ignored) {
            // Metrics are best-effort and must not prevent the message from being queued.
        }
        Stopwatch watch = this.batchSyncPutTracer.start();
        try {
            this.queue.put(t);
            this.numberAdded.incrementAndGet();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of silently swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            watch.stop();
        }
    }

    /**
     * Enqueues every message of the list without blocking, stopping early once the batcher is
     * shut down.
     *
     * @param list the messages to enqueue
     */
    public void process(List<T> list) {
        process(list, false);
    }

    /**
     * Enqueues every message of the list, stopping early once the batcher is shut down.
     *
     * @param list the messages to enqueue
     * @param z    {@code true} to block for queue space via {@link #processSync(Object)};
     *             {@code false} to use the non-blocking {@link #process(Object)}
     */
    public void process(List<T> list, boolean z) {
        for (T t : list) {
            if (this.isShutDown) {
                return;
            }
            if (z) {
                processSync(t);
            } else {
                process(t);
            }
        }
    }

    /** Pauses the collector thread; has no effect after shutdown. */
    public void pause() {
        if (this.isShutDown) {
            return;
        }
        this.isCollectorPaused = true;
    }

    /** @return {@code true} if the collector is currently paused */
    public boolean isPaused() {
        return this.isCollectorPaused;
    }

    /** Resumes a paused collector thread; has no effect after shutdown. */
    public void resume() {
        if (this.isShutDown) {
            return;
        }
        this.isCollectorPaused = false;
    }

    /**
     * Shuts the batcher down. Waits, for at most the configured shutdown wait time, until both
     * the queue and the current batch are empty, then stops the collector and the processor
     * pool and marks the batcher as shut down. An interrupt cuts the wait short and is
     * re-asserted on the calling thread.
     */
    public void stop() {
        long deadline = CONFIGURATION.getBatcherWaitTimeBeforeShutdown(this.name) + System.currentTimeMillis();
        while (this.queue.size() > 0 || this.batch.size() > 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
            try {
                // Never sleep past the deadline.
                Thread.sleep(Math.min(remaining, SHUTDOWN_POLL_INTERVAL_MS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        try {
            this.shouldCollectorShutdown = true;
            this.processor.shutdownNow();
        } catch (Throwable th) {
            th.printStackTrace();
        }
        this.isShutDown = true;
    }

    /** @return the number of messages currently waiting in the queue, or 0 if there is no queue */
    @Monitor(name = "batcherQueueSize", type = DataSourceType.GAUGE)
    public int getSize() {
        if (this.queue != null) {
            return this.queue.size();
        }
        return 0;
    }

    /** Resets the count of successfully queued messages to zero. */
    public void resetNumberAdded() {
        this.numberAdded.set(0L);
    }

    /** Resets the count of dropped messages to zero. */
    public void resetNumberDropped() {
        this.numberDropped.set(0L);
    }

    /** @return the number of messages queued since creation or the last reset */
    @Monitor(name = "numberAdded", type = DataSourceType.GAUGE)
    public long getNumberAdded() {
        return this.numberAdded.get();
    }

    /** @return the number of messages dropped on queue overflow since creation or the last reset */
    @Monitor(name = "numberDropped", type = DataSourceType.GAUGE)
    public long getNumberDropped() {
        return this.numberDropped.get();
    }

    /** @return the "wait when queue not empty" setting read from the configuration at construction */
    @Monitor(name = "blocking", type = DataSourceType.INFORMATIONAL)
    public boolean isBlocking() {
        return this.blockingProperty;
    }

    /**
     * Builds the processor pool from the configured minimum and maximum thread counts and
     * keep-alive time (seconds), using a direct hand-off queue. Unless the configuration asks
     * for rejection when all threads are busy, rejected batches run on the submitting thread.
     */
    private void createProcessor(String str) {
        this.processor = new ThreadPoolExecutor(CONFIGURATION.getBatcherMinThreads(this.name), CONFIGURATION.getBatcherMaxThreads(this.name), CONFIGURATION.getBatcherThreadKeepAliveTime(this.name), TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactoryBuilder().setDaemon(false).setNameFormat(this.name + "-process").build());
        if (CONFIGURATION.shouldRejectWhenAllBatcherThreadsUsed(this.name)) {
            return;
        }
        this.processor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
