package org.apache.activemq.artemis.shaded.org.jgroups.protocols;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.shaded.org.jgroups.Address;
import org.apache.activemq.artemis.shaded.org.jgroups.Message;
import org.apache.activemq.artemis.shaded.org.jgroups.NullAddress;
import org.apache.activemq.artemis.shaded.org.jgroups.View;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.Experimental;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.ManagedAttribute;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.Property;
import org.apache.activemq.artemis.shaded.org.jgroups.conf.AttributeType;
import org.apache.activemq.artemis.shaded.org.jgroups.util.ByteArrayDataOutputStream;
import org.apache.activemq.artemis.shaded.org.jgroups.util.TimeScheduler;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Util;

@Experimental
/* loaded from: input_file:org/apache/activemq/artemis/shaded/org/jgroups/protocols/BatchBundler.class */
public class BatchBundler extends NoBundler {

    @ManagedAttribute(description = "Local address")
    protected volatile Address local_addr;

    @ManagedAttribute(description = "Number of messages sent in BatchMessages", type = AttributeType.SCALAR)
    protected long num_msgs_sent;

    @ManagedAttribute(description = "Number of BatchMessages sent", type = AttributeType.SCALAR)
    protected long num_ebs_sent;

    @ManagedAttribute(description = "Number of BatchMessages sent because the queue was full", type = AttributeType.SCALAR)
    protected long num_ebs_sent_due_to_full_queue;

    @ManagedAttribute(description = "Number of BatchMessages sent because the max number of messages has been reached (max_batch_size)", type = AttributeType.SCALAR)
    protected long num_ebs_sent_due_to_max_number_of_msgs;

    @ManagedAttribute(description = "Number of BatchMessages sent because the timeout kicked in", type = AttributeType.SCALAR)
    protected long num_ebs_sent_due_to_timeout;
    protected TimeScheduler timer;
    protected volatile boolean running;
    protected Future<?> flush_task;

    @Property(name = "max_size", type = AttributeType.BYTES, description = "Maximum number of bytes for messages to be queued until they are sent")
    protected int max_size = 64000;

    @Property(description = "Max interval (millis) at which the queued messages are sent")
    protected long flush_interval = 100;

    @Property(description = "The maximum number of messages per batch")
    public int max_batch_size = 1000;
    protected ConcurrentMap<Address, Buffer> msgMap = Util.createConcurrentMap();
    protected final NullAddress nullAddress = new NullAddress();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/activemq/artemis/shaded/org/jgroups/protocols/BatchBundler$Buffer.class */
    public class Buffer {
        private final Address dest;
        private final Message[] msgs;
        private int index = 0;
        private boolean closed;
        private long total_bytes;
        private final ByteArrayDataOutputStream output;

        protected Buffer(Address address) {
            this.dest = address;
            this.msgs = new Message[BatchBundler.this.max_batch_size];
            this.output = new ByteArrayDataOutputStream(BatchBundler.this.max_size + 5);
        }

        protected synchronized boolean addMessage(Message message) {
            if (this.closed) {
                return false;
            }
            int length = message.getLength();
            if (this.total_bytes + length > BatchBundler.this.max_size) {
                BatchBundler.this.num_ebs_sent_due_to_full_queue++;
                sendBatch(false);
            }
            Message[] messageArr = this.msgs;
            int i = this.index;
            this.index = i + 1;
            messageArr[i] = message;
            this.total_bytes += length;
            if (this.index != this.msgs.length) {
                return true;
            }
            BatchBundler.this.num_ebs_sent_due_to_max_number_of_msgs++;
            sendBatch(false);
            return true;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public synchronized void sendBatch(boolean z) {
            if (this.index == 0) {
                return;
            }
            if (this.index == 1) {
                BatchBundler.this._send(this.msgs[0], this.output);
                this.msgs[0] = null;
                this.index = 0;
                this.total_bytes = 0L;
                BatchBundler.this.num_msgs_sent++;
                return;
            }
            sendMessageList(this.dest instanceof NullAddress ? null : this.dest, BatchBundler.this.local_addr, this.msgs, this.index);
            BatchBundler.this.num_msgs_sent += this.index;
            BatchBundler.this.num_ebs_sent++;
            if (z) {
                BatchBundler.this.num_ebs_sent_due_to_timeout++;
            }
            this.index = 0;
            this.total_bytes = 0L;
        }

        protected void sendMessageList(Address address, Address address2, Message[] messageArr, int i) {
            try {
                this.output.position(0);
                Util.writeMessageList(address, address2, BatchBundler.this.transport.cluster_name.chars(), messageArr, 0, i, this.output, address == null);
                BatchBundler.this.transport.doSend(this.output.buffer(), 0, this.output.position(), address);
            } catch (Throwable th) {
                BatchBundler.this.log.trace(Util.getMessage("FailureSendingMsgBundle"), BatchBundler.this.transport.getAddress(), th);
            }
        }

        protected synchronized void close() {
            this.closed = true;
            sendBatch(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/activemq/artemis/shaded/org/jgroups/protocols/BatchBundler$FlushTask.class */
    public class FlushTask implements Runnable {
        protected FlushTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            BatchBundler.this.flush();
        }

        public String toString() {
            return BatchBundler.class.getSimpleName() + ": FlushTask (interval=" + BatchBundler.this.flush_interval + " ms)";
        }
    }

    @ManagedAttribute(description = "Average number of messages in an BatchMessage")
    public double avgBatchSize() {
        if (this.num_ebs_sent == 0 || this.num_msgs_sent == 0) {
            return 0.0d;
        }
        return this.num_msgs_sent / this.num_ebs_sent;
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void resetStats() {
        this.num_msgs_sent = 0L;
        this.num_ebs_sent = 0L;
        this.num_ebs_sent_due_to_full_queue = 0L;
        this.num_ebs_sent_due_to_timeout = 0L;
        this.num_ebs_sent_due_to_max_number_of_msgs = 0L;
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void viewChange(View view) {
        List<Address> members = view.getMembers();
        if (members == null) {
            return;
        }
        members.stream().filter(address -> {
            return !this.msgMap.containsKey(address);
        }).forEach(address2 -> {
            this.msgMap.putIfAbsent(address2, new Buffer(address2));
        });
        this.msgMap.keySet().stream().filter(address3 -> {
            return (members.contains(address3) || (address3 instanceof NullAddress)) ? false : true;
        }).forEach(address4 -> {
            this.msgMap.remove(address4).close();
        });
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.NoBundler, org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void init(TP tp) {
        super.init(tp);
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.NoBundler, org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void start() {
        this.timer = this.transport.getTimer();
        if (this.timer == null) {
            throw new RuntimeException("timer is null");
        }
        this.local_addr = (Address) Objects.requireNonNull(this.transport.getAddress());
        this.running = true;
        startFlushTask();
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.NoBundler, org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void stop() {
        this.running = false;
        stopFlushTask();
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.NoBundler, org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void send(Message message) throws Exception {
        if (message.isFlagSet(Message.Flag.OOB)) {
            super.send(message);
            return;
        }
        if (message.getSrc() == null) {
            message.setSrc(this.local_addr);
        }
        if (!Objects.equals(message.getSrc(), this.local_addr)) {
            super.send(message);
            return;
        }
        Address dest = message.dest() == null ? this.nullAddress : message.dest();
        Buffer buffer = this.msgMap.get(dest);
        if (buffer == null) {
            buffer = this.msgMap.computeIfAbsent(dest, address -> {
                return new Buffer(dest);
            });
        }
        if (buffer.addMessage(message)) {
            return;
        }
        super.send(message);
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.NoBundler, org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public int size() {
        return 0;
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.NoBundler, org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public int getQueueSize() {
        return 0;
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public int getCapacity() {
        return this.max_batch_size;
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.NoBundler, org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public int getMaxSize() {
        return this.max_size;
    }

    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public Bundler setMaxSize(int i) {
        this.max_size = i;
        return this;
    }

    protected void startFlushTask() {
        if (this.flush_task == null || this.flush_task.isDone()) {
            this.flush_task = this.timer.scheduleWithFixedDelay(new FlushTask(), 0L, this.flush_interval, TimeUnit.MILLISECONDS, true);
        }
    }

    protected void stopFlushTask() {
        if (this.flush_task != null) {
            this.flush_task.cancel(true);
            this.flush_task = null;
        }
    }

    protected void _send(Message message, ByteArrayDataOutputStream byteArrayDataOutputStream) {
        try {
            sendSingleMessage(message, byteArrayDataOutputStream);
        } catch (Exception e) {
            this.log.error("%s: failed sending message: %s", this.local_addr, e);
        }
    }

    public void flush() {
        this.msgMap.forEach((address, buffer) -> {
            buffer.sendBatch(true);
        });
    }
}
