package backtype.storm.messaging.netty;

import backtype.storm.Config;
import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.metric.api.IStatefulObject;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.guava.util.concurrent.FutureCallback;
import org.apache.storm.guava.util.concurrent.Futures;
import org.apache.storm.guava.util.concurrent.ListeningScheduledExecutorService;
import org.apache.storm.guava.util.concurrent.MoreExecutors;
import org.apache.storm.netty.bootstrap.ClientBootstrap;
import org.apache.storm.netty.channel.Channel;
import org.apache.storm.netty.channel.ChannelFactory;
import org.apache.storm.netty.channel.ChannelFuture;
import org.apache.storm.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:backtype/storm/messaging/netty/Client.class */
public class Client extends ConnectionWithStatus implements IStatefulObject {
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    private static final String PREFIX = "Netty-Client-";
    private static final long NO_DELAY_MS = 0;
    private static final long MINIMUM_INITIAL_DELAY_MS = 30000;
    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000;
    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000;
    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
    private final StormBoundedExponentialBackoffRetry retryPolicy;
    private final ClientBootstrap bootstrap;
    private final InetSocketAddress dstAddress;
    protected final String dstAddressPrefixedName;
    private final int maxReconnectionAttempts;
    private volatile boolean closing;
    private final int flushCheckIntervalMs;
    private final int messageBatchSize;
    private final ListeningScheduledExecutorService scheduler;
    protected final Map stormConf;
    private final AtomicReference<Channel> channelRef = new AtomicReference<>(null);
    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
    private final AtomicInteger messagesSent = new AtomicInteger(0);
    private final AtomicInteger messagesLost = new AtomicInteger(0);
    private final AtomicLong pendingMessages = new AtomicLong(NO_DELAY_MS);
    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false);
    private final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(DISTANT_FUTURE_TIME_MS);
    private MessageBatch messageBatch = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:backtype/storm/messaging/netty/Client$Connector.class */
    public class Connector implements Callable<Channel> {
        private final InetSocketAddress address;
        private final int connectionAttempt;

        public Connector(InetSocketAddress inetSocketAddress, int i) {
            this.address = inetSocketAddress;
            if (i < 1) {
                throw new IllegalArgumentException("connection attempt must be >= 1 (you provided " + i + ")");
            }
            this.connectionAttempt = i;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Channel call() throws Exception {
            Client.LOG.debug("connecting to {} [attempt {}]", this.address.toString(), Integer.valueOf(this.connectionAttempt));
            Channel channel = null;
            ChannelFuture connect = Client.this.bootstrap.connect(this.address);
            connect.awaitUninterruptibly();
            Channel channel2 = connect.getChannel();
            if (connect.isSuccess() && Client.this.connectionEstablished(channel2)) {
                channel = channel2;
                Client.LOG.debug("successfully connected to {}, {} [attempt {}]", new Object[]{this.address.toString(), channel.toString(), Integer.valueOf(this.connectionAttempt)});
            } else {
                Client.LOG.debug("failed to connect to {} [attempt {}]", this.address.toString(), Integer.valueOf(this.connectionAttempt));
                if (channel2 != null) {
                    channel2.close();
                }
            }
            return channel;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Client(Map map, ChannelFactory channelFactory, ScheduledExecutorService scheduledExecutorService, String str, int i) {
        this.closing = false;
        this.closing = false;
        this.stormConf = map;
        this.scheduler = MoreExecutors.listeningDecorator(scheduledExecutorService);
        int intValue = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)).intValue();
        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", new Object[]{str, Integer.valueOf(i), Integer.valueOf(intValue)});
        this.messageBatchSize = Utils.getInt(map.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144).intValue();
        this.flushCheckIntervalMs = Utils.getInt(map.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10).intValue();
        this.maxReconnectionAttempts = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)).intValue();
        this.retryPolicy = new StormBoundedExponentialBackoffRetry(Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)).intValue(), Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)).intValue(), this.maxReconnectionAttempts);
        this.bootstrap = createClientBootstrap(channelFactory, intValue);
        this.dstAddress = new InetSocketAddress(str, i);
        this.dstAddressPrefixedName = prefixedName(this.dstAddress);
        connect(NO_DELAY_MS);
        pauseBackgroundFlushing();
        scheduledExecutorService.scheduleWithFixedDelay(createBackgroundFlusher(), Math.min(MINIMUM_INITIAL_DELAY_MS, r0 * this.maxReconnectionAttempts), this.flushCheckIntervalMs, TimeUnit.MILLISECONDS);
    }

    private ClientBootstrap createClientBootstrap(ChannelFactory channelFactory, int i) {
        ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
        clientBootstrap.setOption("tcpNoDelay", true);
        clientBootstrap.setOption("sendBufferSize", Integer.valueOf(i));
        clientBootstrap.setOption("keepAlive", true);
        clientBootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
        return clientBootstrap;
    }

    private String prefixedName(InetSocketAddress inetSocketAddress) {
        return null != inetSocketAddress ? PREFIX + inetSocketAddress.toString() : "";
    }

    private Runnable createBackgroundFlusher() {
        return new Runnable() { // from class: backtype.storm.messaging.netty.Client.1
            @Override // java.lang.Runnable
            public void run() {
                if (Client.this.closing || !Client.this.backgroundFlushingEnabled.get() || Client.this.nowMillis() <= Client.this.nextBackgroundFlushTimeMs.get()) {
                    return;
                }
                Client.LOG.debug("flushing {} pending messages to {} in background", Integer.valueOf(Client.this.messageBatch.size()), Client.this.dstAddressPrefixedName);
                Client.this.flushPendingMessages();
            }
        };
    }

    private void pauseBackgroundFlushing() {
        this.backgroundFlushingEnabled.set(false);
    }

    private void resumeBackgroundFlushing() {
        this.backgroundFlushingEnabled.set(true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void flushPendingMessages() {
        Channel channel = this.channelRef.get();
        if (containsMessages(this.messageBatch)) {
            if (!connectionEstablished(channel)) {
                closeChannelAndReconnect(channel);
                return;
            }
            if (channel.isWritable()) {
                pauseBackgroundFlushing();
                flushMessages(channel, this.messageBatch);
                this.messageBatch = null;
            } else if (this.closing) {
                resumeBackgroundFlushing();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long nowMillis() {
        return System.currentTimeMillis();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void connect(long j) {
        try {
            if (this.closing || connectionEstablished(this.channelRef.get())) {
                return;
            }
            this.connectionAttempts.getAndIncrement();
            if (!reconnectingAllowed()) {
                close();
                throw new RuntimeException("Giving up to connect to " + this.dstAddressPrefixedName + " after " + this.connectionAttempts + " failed attempts");
            }
            this.totalConnectionAttempts.getAndIncrement();
            LOG.info("connection attempt {} to {} scheduled to run in {} ms", new Object[]{Integer.valueOf(this.connectionAttempts.get()), this.dstAddressPrefixedName, Long.valueOf(j)});
            Futures.addCallback(this.scheduler.schedule((Callable) new Connector(this.dstAddress, this.connectionAttempts.get()), j, TimeUnit.MILLISECONDS), new FutureCallback<Channel>() { // from class: backtype.storm.messaging.netty.Client.2
                @Override // org.apache.storm.guava.util.concurrent.FutureCallback
                public void onSuccess(Channel channel) {
                    if (!Client.this.connectionEstablished(channel)) {
                        reconnectAgain(new RuntimeException("Returned channel was actually not established"));
                        return;
                    }
                    Client.this.setChannel(channel);
                    Client.LOG.info("connection established to {}", Client.this.dstAddressPrefixedName);
                    Client.this.connectionAttempts.set(0);
                }

                @Override // org.apache.storm.guava.util.concurrent.FutureCallback
                public void onFailure(Throwable th) {
                    reconnectAgain(th);
                }

                private void reconnectAgain(Throwable th) {
                    String format = String.format("connection attempt %s to %s failed", Client.this.connectionAttempts, Client.this.dstAddressPrefixedName);
                    Client.LOG.error(th == null ? format : format + ": " + th.toString());
                    Client.this.connect(Client.this.retryPolicy.getSleepTimeMs(Client.this.connectionAttempts.get(), Client.NO_DELAY_MS));
                }
            });
        } catch (Exception e) {
            throw new RuntimeException("Failed to connect to " + this.dstAddressPrefixedName, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setChannel(Channel channel) {
        this.channelRef.set(channel);
    }

    private boolean reconnectingAllowed() {
        return !this.closing && this.connectionAttempts.get() <= this.maxReconnectionAttempts + 1;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean connectionEstablished(Channel channel) {
        return channel != null && channel.isConnected();
    }

    @Override // backtype.storm.messaging.ConnectionWithStatus
    public ConnectionWithStatus.Status status() {
        return this.closing ? ConnectionWithStatus.Status.Closed : !connectionEstablished(this.channelRef.get()) ? ConnectionWithStatus.Status.Connecting : ConnectionWithStatus.Status.Ready;
    }

    @Override // backtype.storm.messaging.IConnection
    public Iterator<TaskMessage> recv(int i, int i2) {
        throw new UnsupportedOperationException("Client connection should not receive any messages");
    }

    @Override // backtype.storm.messaging.IConnection
    public void send(int i, byte[] bArr) {
        TaskMessage taskMessage = new TaskMessage(i, bArr);
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(taskMessage);
        send(arrayList.iterator());
    }

    @Override // backtype.storm.messaging.IConnection
    public synchronized void send(Iterator<TaskMessage> it) {
        if (this.closing) {
            LOG.error("discarding {} messages because the Netty client to {} is being closed", Integer.valueOf(iteratorSize(it)), this.dstAddressPrefixedName);
            return;
        }
        if (hasMessages(it)) {
            Channel channel = this.channelRef.get();
            if (!connectionEstablished(channel)) {
                closeChannelAndReconnect(channel);
                handleMessagesWhenConnectionIsUnavailable(it);
                return;
            }
            while (it.hasNext()) {
                TaskMessage next = it.next();
                if (this.messageBatch == null) {
                    this.messageBatch = new MessageBatch(this.messageBatchSize);
                }
                this.messageBatch.add(next);
                if (this.messageBatch.isFull()) {
                    flushMessages(channel, this.messageBatch);
                    this.messageBatch = null;
                }
            }
            if (containsMessages(this.messageBatch)) {
                if (!connectionEstablished(channel) || !channel.isWritable()) {
                    resumeBackgroundFlushing();
                    this.nextBackgroundFlushTimeMs.set(nowMillis() + this.flushCheckIntervalMs);
                } else {
                    pauseBackgroundFlushing();
                    MessageBatch messageBatch = this.messageBatch;
                    this.messageBatch = null;
                    flushMessages(channel, messageBatch);
                }
            }
        }
    }

    private boolean hasMessages(Iterator<TaskMessage> it) {
        return it != null && it.hasNext();
    }

    private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> it) {
        LOG.error("connection to {} is unavailable", this.dstAddressPrefixedName);
        dropMessages(it);
    }

    private void dropMessages(Iterator<TaskMessage> it) {
        int iteratorSize = iteratorSize(it);
        this.messagesLost.getAndAdd(iteratorSize);
        LOG.error("dropping {} message(s) destined for {}", Integer.valueOf(iteratorSize), this.dstAddressPrefixedName);
    }

    private int iteratorSize(Iterator<TaskMessage> it) {
        int i = 0;
        if (it != null) {
            while (it.hasNext()) {
                i++;
                it.next();
            }
        }
        return i;
    }

    private synchronized void flushMessages(Channel channel, final MessageBatch messageBatch) {
        if (containsMessages(messageBatch)) {
            final int size = messageBatch.size();
            this.pendingMessages.getAndAdd(size);
            LOG.debug("writing {} messages to channel {}", Integer.valueOf(messageBatch.size()), channel.toString());
            channel.write(messageBatch).addListener(new ChannelFutureListener() { // from class: backtype.storm.messaging.netty.Client.3
                @Override // org.apache.storm.netty.channel.ChannelFutureListener
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    Client.this.pendingMessages.getAndAdd(0 - size);
                    if (channelFuture.isSuccess()) {
                        Client.LOG.debug("sent {} messages to {}", Integer.valueOf(size), Client.this.dstAddressPrefixedName);
                        Client.this.messagesSent.getAndAdd(messageBatch.size());
                    } else {
                        Client.LOG.error("failed to send {} messages to {}: {}", new Object[]{Integer.valueOf(size), Client.this.dstAddressPrefixedName, channelFuture.getCause()});
                        Client.this.closeChannelAndReconnect(channelFuture.getChannel());
                        Client.this.messagesLost.getAndAdd(size);
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void closeChannelAndReconnect(Channel channel) {
        if (channel != null) {
            channel.close();
            if (this.channelRef.compareAndSet(channel, null)) {
                connect(NO_DELAY_MS);
            }
        }
    }

    private boolean containsMessages(MessageBatch messageBatch) {
        return (messageBatch == null || messageBatch.isEmpty()) ? false : true;
    }

    @Override // backtype.storm.messaging.IConnection
    public void close() {
        if (this.closing) {
            return;
        }
        LOG.info("closing Netty Client {}", this.dstAddressPrefixedName);
        this.closing = true;
        flushPendingMessages();
        waitForPendingMessagesToBeSent();
        closeChannel();
    }

    private synchronized void waitForPendingMessagesToBeSent() {
        LOG.info("waiting up to {} ms to send {} pending messages to {}", new Object[]{Long.valueOf(PENDING_MESSAGES_FLUSH_TIMEOUT_MS), Long.valueOf(this.pendingMessages.get()), this.dstAddressPrefixedName});
        long j = this.pendingMessages.get();
        long nowMillis = nowMillis();
        while (this.pendingMessages.get() != NO_DELAY_MS) {
            try {
                if (nowMillis() - nowMillis > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                    LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not sent", new Object[]{this.dstAddressPrefixedName, Long.valueOf(this.pendingMessages.get()), Long.valueOf(j)});
                    return;
                }
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    private synchronized void closeChannel() {
        if (this.channelRef.get() != null) {
            this.channelRef.get().close();
            LOG.debug("channel to {} closed", this.dstAddressPrefixedName);
        }
    }

    @Override // backtype.storm.metric.api.IStatefulObject
    public Object getState() {
        LOG.info("Getting metrics for client connection to {}", this.dstAddressPrefixedName);
        HashMap hashMap = new HashMap();
        hashMap.put("reconnects", Integer.valueOf(this.totalConnectionAttempts.getAndSet(0)));
        hashMap.put("sent", Integer.valueOf(this.messagesSent.getAndSet(0)));
        hashMap.put("pending", Long.valueOf(this.pendingMessages.get()));
        hashMap.put("lostOnSend", Integer.valueOf(this.messagesLost.getAndSet(0)));
        hashMap.put("dest", this.dstAddress.toString());
        String srcAddressName = srcAddressName();
        if (srcAddressName != null) {
            hashMap.put("src", srcAddressName);
        }
        return hashMap;
    }

    private String srcAddressName() {
        SocketAddress localAddress;
        String str = null;
        Channel channel = this.channelRef.get();
        if (channel != null && (localAddress = channel.getLocalAddress()) != null) {
            str = localAddress.toString();
        }
        return str;
    }

    public String toString() {
        return String.format("Netty client for connecting to %s", this.dstAddressPrefixedName);
    }
}
