package org.shoal.ha.cache.impl.interceptor;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.shoal.adapter.store.commands.NoOpCommand;
import org.shoal.ha.cache.api.DataStoreContext;
import org.shoal.ha.cache.api.ShoalCacheLoggerConstants;
import org.shoal.ha.cache.impl.command.Command;
import org.shoal.ha.cache.impl.util.ASyncReplicationManager;

/* loaded from: input_file:org/shoal/ha/cache/impl/interceptor/ReplicationCommandTransmitterWithList.class */
/**
 * Collects commands for a single target and ships them in batches.
 *
 * <p>Commands are appended to the current {@link BatchedCommandListDataFrame}. When a frame
 * holds the configured maximum number of commands it is submitted to the executor service
 * obtained from {@link ASyncReplicationManager}. In addition, this object is scheduled at a
 * fixed rate; each tick pads a non-empty frame that has not been replaced since the previous
 * tick so that it fills up and is sent.
 *
 * <p>Two system properties tune the behaviour (both must be positive decimal integers,
 * otherwise the default is used):
 * <ul>
 *   <li>{@code org.shoal.cache.transmitter.frequency.in.millis} - tick period, default 100</li>
 *   <li>{@code org.shoal.cache.transmitter.max.batch.size} - commands per frame, default 20</li>
 * </ul>
 *
 * @param <K> key type of the data store
 * @param <V> value type of the data store
 */
public class ReplicationCommandTransmitterWithList<K, V> implements Runnable, CommandCollector<K, V> {
    private static final Logger _logger = Logger.getLogger(ShoalCacheLoggerConstants.CACHE_TRANSMIT_INTERCEPTOR);

    private static final String TRANSMITTER_FREQUENCY_PROP_NAME = "org.shoal.cache.transmitter.frequency.in.millis";
    private static final String MAX_BATCH_SIZE_PROP_NAME = "org.shoal.cache.transmitter.max.batch.size";
    private static final int DEFAULT_TRANSMITTER_FREQUENCY_IN_MILLIS = 100;
    private static final int DEFAULT_MAX_BATCH_SIZE = 20;

    private DataStoreContext<K, V> dsc;
    private volatile String targetName;
    private ScheduledFuture<?> future;
    private AtomicReference<BatchedCommandListDataFrame> mapRef;
    ThreadPoolExecutor executor;
    private int maxBatchSize = DEFAULT_MAX_BATCH_SIZE;
    ASyncReplicationManager asyncReplicationManager = ASyncReplicationManager._getInstance();

    /** Creation time of the frame observed by the previous periodic tick. */
    private volatile long timeStamp = System.currentTimeMillis();

    /**
     * One batch of commands. The frame is also the task that sends the batch: once it holds
     * the configured maximum number of commands it submits itself to the executor service.
     */
    public class BatchedCommandListDataFrame implements Runnable {
        /** Index handed to the most recent caller of {@link #addCommand}; starts at -1. */
        private final AtomicInteger current = new AtomicInteger(-1);
        private final ConcurrentLinkedQueue<Command> list = new ConcurrentLinkedQueue<>();
        /** Ensures the frame is handed to the executor at most once. */
        private final AtomicBoolean submitted = new AtomicBoolean(false);
        private final long batchCreationTime = System.currentTimeMillis();

        private BatchedCommandListDataFrame() {
        }

        /**
         * Tries to append a command to this frame.
         *
         * @param command the command to batch
         * @return {@code true} if the command was stored in this frame, {@code false} if the
         *         frame has already handed out all of its slots
         */
        public boolean addCommand(Command command) {
            int slot = this.current.incrementAndGet();
            if (slot >= maxBatchSize) {
                return false;
            }
            this.list.add(command);
            // Several threads can observe the full size at the same time (each after adding
            // its own command); the flag makes sure only one of them submits the frame.
            if (this.list.size() == maxBatchSize && this.submitted.compareAndSet(false, true)) {
                asyncReplicationManager.getExecutorService().submit(this);
            }
            return true;
        }

        /**
         * @param j creation time of the frame seen by the previous periodic tick
         * @return {@code true} if this frame has that creation time and still holds commands
         */
        boolean isTimeToFlush(long j) {
            return this.batchCreationTime == j && !this.list.isEmpty();
        }

        /** @return the value of {@code System.currentTimeMillis()} when this frame was created */
        long getBatchCreationTime() {
            return this.batchCreationTime;
        }

        /**
         * Drains the queued commands into a {@link ReplicationFramePayloadCommand} addressed to
         * the current target and executes it through the data store's command manager.
         * An {@link IOException} is logged at WARNING level and not rethrown.
         */
        @Override
        public void run() {
            ReplicationFramePayloadCommand payload = new ReplicationFramePayloadCommand();
            payload.setTargetInstance(targetName);
            try {
                Command queued;
                // Drain until empty instead of trusting a size snapshot, so a null from
                // poll() can never be dereferenced.
                while ((queued = this.list.poll()) != null) {
                    // NOTE(review): 102 is presumably the opcode of the NoOpCommand padding
                    // added by the periodic tick - confirm against the opcode table.
                    if (queued.getOpcode() != 102) {
                        payload.addComamnd(queued);
                    }
                }
                dsc.getCommandManager().execute(payload);
            } catch (IOException e) {
                _logger.log(Level.WARNING, "Batch operation (ASyncCommandList failed...", e);
            }
        }
    }

    /**
     * Stores the target and context, reads the tuning properties, creates the first frame and
     * schedules this object at a fixed rate on the scheduled executor of
     * {@link ASyncReplicationManager}.
     *
     * @param str              name of the target instance
     * @param dataStoreContext context of the data store this transmitter serves
     */
    @Override // org.shoal.ha.cache.impl.interceptor.CommandCollector
    public void initialize(String str, DataStoreContext<K, V> dataStoreContext) {
        this.executor = ASyncReplicationManager._getInstance().getExecutorService();
        this.targetName = str;
        this.dsc = dataStoreContext;

        int frequencyInMillis = readPositiveIntProperty(
                TRANSMITTER_FREQUENCY_PROP_NAME, DEFAULT_TRANSMITTER_FREQUENCY_IN_MILLIS);
        // NOTE(review): with a batch size of 1 a frame is submitted as soon as it receives its
        // first command, which in addCommand(Command) happens before the frame is published;
        // a lost compare-and-set there would then send the command twice - confirm whether
        // that matters for receivers.
        this.maxBatchSize = readPositiveIntProperty(MAX_BATCH_SIZE_PROP_NAME, this.maxBatchSize);

        this.mapRef = new AtomicReference<>(new BatchedCommandListDataFrame());
        this.future = this.asyncReplicationManager.getScheduledThreadPoolExecutor()
                .scheduleAtFixedRate(this, frequencyInMillis, frequencyInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Reads a system property as a strictly positive decimal integer.
     *
     * @param name         name of the system property
     * @param defaultValue value returned when the property is absent, unreadable, not a
     *                     number, or not positive
     * @return the parsed property value, or {@code defaultValue}
     */
    private static int readPositiveIntProperty(String name, int defaultValue) {
        String raw;
        try {
            raw = System.getProperty(name);
        } catch (SecurityException e) {
            _logger.log(Level.FINE, "Cannot read system property " + name + "; using " + defaultValue, e);
            return defaultValue;
        }
        if (raw == null) {
            return defaultValue;
        }
        try {
            int parsed = Integer.parseInt(raw.trim());
            if (parsed > 0) {
                return parsed;
            }
        } catch (NumberFormatException ignored) {
            // Reported by the warning below together with non-positive values.
        }
        _logger.log(Level.WARNING, "Ignoring invalid value \"" + raw + "\" for system property "
                + name + "; using " + defaultValue);
        return defaultValue;
    }

    /**
     * Cancels the periodic task without interrupting a tick that is already running.
     * Does nothing if {@link #initialize} has not scheduled the task yet.
     */
    @Override // org.shoal.ha.cache.impl.interceptor.CommandCollector
    public void close() {
        ScheduledFuture<?> scheduled = this.future;
        if (scheduled != null) {
            scheduled.cancel(false);
        }
    }

    /**
     * Adds a command to the current frame. If that frame is full, a new frame containing the
     * command is created and installed with a compare-and-set; when another thread installs
     * its frame first, the attempt is repeated against the newly installed frame.
     *
     * @param command the command to batch
     */
    @Override // org.shoal.ha.cache.impl.interceptor.CommandCollector
    public void addCommand(Command<K, V> command) {
        boolean accepted = false;
        while (!accepted) {
            BatchedCommandListDataFrame currentFrame = this.mapRef.get();
            accepted = currentFrame.addCommand(command);
            if (!accepted) {
                BatchedCommandListDataFrame replacement = new BatchedCommandListDataFrame();
                replacement.addCommand(command);
                accepted = this.mapRef.compareAndSet(currentFrame, replacement);
            }
        }
    }

    /**
     * Batches the command exactly like {@link #addCommand(Command)}.
     *
     * @param command the command to batch
     */
    @Override // org.shoal.ha.cache.impl.interceptor.CommandCollector
    public void removeCommand(Command<K, V> command) {
        addCommand(command);
    }

    /**
     * Sends raw bytes to the target through the group service and logs the size at FINE level.
     * Not called from within this class.
     *
     * @param bArr the message payload
     */
    private void sendMessage(byte[] bArr) {
        this.dsc.getGroupService().sendMessage(this.targetName, this.dsc.getServiceName(), bArr);
        if (_logger.isLoggable(Level.FINE)) {
            _logger.log(Level.FINE, this.dsc.getServiceName() + ": ReplicationCommandTransmitterWithList.onTransmit() Sent " + (this.targetName == null ? " ALL MEMBERS " : this.targetName) + "; size: " + bArr.length);
        }
    }

    /**
     * Periodic tick. If the current frame is the one seen by the previous tick (same creation
     * time) and still holds commands, it is padded with {@link NoOpCommand}s until it rejects
     * further additions, which fills it and triggers its submission.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            BatchedCommandListDataFrame frame = this.mapRef.get();
            if (frame.isTimeToFlush(this.timeStamp)) {
                boolean padded;
                do {
                    padded = frame.addCommand(new NoOpCommand());
                } while (padded);
            }
            this.timeStamp = frame.getBatchCreationTime();
        } catch (RuntimeException e) {
            // An exception escaping a fixed-rate task suppresses all later executions, which
            // would silently stop time-based flushing; log it and keep the schedule alive.
            _logger.log(Level.WARNING, "Periodic flush of batched replication commands failed", e);
        }
    }
}
