package org.apache.geode.internal.cache.tier.sockets;

import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.ha.HAContainerWrapper;
import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.internal.cache.ha.HARegionQueueAttributes;
import org.apache.geode.internal.cache.ha.HARegionQueueStats;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.ByteArrayDataInput;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.UncheckedUtils;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/geode/internal/cache/tier/sockets/MessageDispatcher.class */
public class MessageDispatcher extends LoggingThread {
    private static final Logger logger = LogService.getLogger();
    // Default slow-start delay in milliseconds; run() reads the same value (5000) as its fallback.
    private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
    // Name of the system property read by run() to override the slow-start delay.
    private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
    // Queue of messages waiting to be sent; created in the constructor.
    protected final HARegionQueue _messageQueue;
    // Proxy supplied at construction; provides the socket, buffer, statistics and paused state.
    private final CacheClientProxy _proxy;
    // Stop flag: set to true in the constructor, false at the start of runDispatcher(),
    // and true again by basicStopDispatching() / pauseOrUnregisterProxy().
    private volatile boolean _isStopped;
    // Monitor used by waitForResumption() (wait) and resumeDispatching() (notifyAll).
    protected final Object _pausedLock;
    // Monitor held while an IOException from a send is being handled.
    private final Object _stopDispatchingLock;
    // Only used to obtain socketWriteLock in the visible code.
    private final ReadWriteLock socketLock;
    // Held by sendMessage() for the duration of each socket write.
    private final Lock socketWriteLock;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a dispatcher thread for the given proxy and builds its message queue.
     *
     * <p>The dispatcher starts in the stopped state; {@link #runDispatcher()} clears the flag.
     *
     * @param cacheClientProxy proxy whose settings (queue capacity, time-to-live, conflation,
     *        durability) configure the queue
     * @param str thread name passed to the superclass
     * @param statisticsClock clock handed to the queue factory
     * @throws CacheException if the queue cannot be created; a {@link RegionExistsException} is
     *         rethrown unchanged, any other failure is wrapped
     */
    public MessageDispatcher(CacheClientProxy cacheClientProxy, String str, StatisticsClock statisticsClock) throws CacheException {
        super(str);
        this._isStopped = true;
        this._pausedLock = new Object();
        this._stopDispatchingLock = new Object();
        this.socketLock = new ReentrantReadWriteLock();
        this.socketWriteLock = this.socketLock.writeLock();
        this._proxy = cacheClientProxy;
        try {
            final HARegionQueueAttributes queueAttributes = new HARegionQueueAttributes();
            queueAttributes.setBlockingQueueCapacity(cacheClientProxy._maximumMessageCount);
            queueAttributes.setExpiryTime(cacheClientProxy._messageTimeToLive);

            // Register the proxy with the HA container under the queue's region name.
            final HAContainerWrapper haContainer =
                    (HAContainerWrapper) cacheClientProxy._cacheClientNotifier.getHaContainer();
            haContainer.putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), getProxy());

            final boolean durable = cacheClientProxy.proxyID.isDurable();
            // NOTE(review): presumably "client can handle deltas"; the literal 1 looks like a
            // conflation-on constant - confirm against the handshake constants.
            final boolean deltaFlag = cacheClientProxy.getClientVersion().compareTo(Version.GFE_61) >= 0
                    && cacheClientProxy.getCache().getInternalDistributedSystem().getConfig().getDeltaPropagation()
                    && cacheClientProxy.clientConflation != 1;
            if (durable || deltaFlag) {
                if (logger.isDebugEnabled()) {
                    final String queueKind = durable ? "durable" : "non-durable";
                    logger.debug("Creating a {} subscription queue for {}", queueKind, cacheClientProxy.getProxyID());
                }
            }

            this._messageQueue = HARegionQueue.getHARegionQueueInstance(
                    getProxy().getHARegionName(),
                    getCache(),
                    queueAttributes,
                    1,
                    durable,
                    cacheClientProxy._cacheClientNotifier.getHaContainer(),
                    cacheClientProxy.getProxyID(),
                    this._proxy.clientConflation,
                    this._proxy.isPrimary(),
                    deltaFlag,
                    statisticsClock);
            if (this._proxy.hasRegisteredInterested()) {
                this._messageQueue.setHasRegisteredInterest(true);
            }
        } catch (CancelException cancelled) {
            throw cancelled;
        } catch (RegionExistsException alreadyExists) {
            throw alreadyExists;
        } catch (Exception failure) {
            // Give the cancel criterion the chance to surface a shutdown before wrapping.
            getCache().getCancelCriterion().checkCancelInProgress(failure);
            throw new CacheException("Exception occurred while trying to create a message queue.", failure) {
                private static final long serialVersionUID = 0;
            };
        }
    }

    /** Returns the {@link CacheClientProxy} supplied at construction. */
    private CacheClientProxy getProxy() {
        return _proxy;
    }

    /** Returns the cache obtained from the proxy. */
    private InternalCache getCache() {
        final CacheClientProxy proxy = getProxy();
        return proxy.getCache();
    }

    /** Returns the socket obtained from the proxy; used as the transport for outgoing messages. */
    private Socket getSocket() {
        final CacheClientProxy proxy = getProxy();
        return proxy.getSocket();
    }

    /** Returns the communication buffer obtained from the proxy. */
    private ByteBuffer getCommBuffer() {
        final CacheClientProxy proxy = getProxy();
        return proxy.getCommBuffer();
    }

    /** Returns the statistics object obtained from the proxy. */
    private CacheClientProxyStats getStatistics() {
        final CacheClientProxy proxy = getProxy();
        return proxy.getStatistics();
    }

    /** Sets the stop flag, after an optional debug log entry; does no queue draining. */
    private void basicStopDispatching() {
        final boolean debugEnabled = logger.isDebugEnabled();
        if (debugEnabled) {
            logger.debug("{}: notified dispatcher to stop", this);
        }
        _isStopped = true;
    }

    /** Returns the proxy's string form, so log lines identify the dispatcher by its proxy. */
    @Override
    public String toString() {
        final CacheClientProxy proxy = getProxy();
        return proxy.toString();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Finally extract failed */
    /**
     * Stops this dispatcher, optionally polling the queue first.
     *
     * <p>Does nothing if the dispatcher is already stopped. When {@code z} is {@code false} the
     * stop flag is set immediately. Otherwise the queue is peeked up to
     * {@code CacheClientProxy.MAXIMUM_SHUTDOWN_PEEKS} times, sleeping 500 ms between peeks,
     * until a peek returns nothing or a {@link CancelException} is thrown. The stop flag is then
     * set exactly once, whatever the outcome of the polling.
     *
     * <p>An interrupt seen while polling does not abort the wait; the thread's interrupt status
     * is restored at the end of each iteration so the caller can still observe it.
     *
     * @param z {@code true} to poll the queue before stopping, {@code false} to stop immediately
     */
    public synchronized void stopDispatching(boolean z) {
        if (isStopped()) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Stopping dispatching", this);
        }
        if (!z) {
            basicStopDispatching();
            return;
        }
        try {
            for (int peeks = 0; peeks < CacheClientProxy.MAXIMUM_SHUTDOWN_PEEKS; peeks++) {
                // Clear (and remember) a pending interrupt so peek/sleep are not aborted at once.
                boolean interrupted = Thread.interrupted();
                try {
                    List<?> pending = this._messageQueue.peek(1, -1);
                    if (pending == null || pending.isEmpty()) {
                        // Nothing came back from the peek: no reason to keep waiting.
                        break;
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Waiting for client to drain queue: {}", this._proxy.proxyID);
                    }
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    interrupted = true;
                } catch (CancelException e) {
                    // Cancellation in progress: stop polling.
                    break;
                } catch (CacheException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("{}: Exception occurred while trying to stop dispatching", this, e);
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        } finally {
            // Flag the dispatcher as stopped only after polling has finished.
            basicStopDispatching();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Returns the current value of the volatile stop flag. */
    public boolean isStopped() {
        return _isStopped;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the size reported by the message queue.
     *
     * @return the queue's {@code size()}, or 0 when the queue reference is {@code null}
     */
    public int getQueueSize() {
        return this._messageQueue == null ? 0 : this._messageQueue.size();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Derives a queue size from the queue's statistics counters: events enqueued minus every
     * counter that records an event leaving the queue (removed, conflated, marker-conflated,
     * expired, removed by QRM, taken, void removals).
     *
     * @return the derived count narrowed to {@code int}, or 0 when the queue reference is
     *         {@code null}
     */
    public int getQueueSizeStat() {
        if (this._messageQueue == null) {
            return 0;
        }
        final HARegionQueueStats stats = this._messageQueue.getStatistics();
        long outstanding = stats.getEventsEnqued();
        outstanding -= stats.getEventsRemoved();
        outstanding -= stats.getEventsConflated();
        outstanding -= stats.getMarkerEventsConflated();
        outstanding -= stats.getEventsExpired();
        outstanding -= stats.getEventsRemovedByQrm();
        outstanding -= stats.getEventsTaken();
        outstanding -= stats.getNumVoidRemovals();
        return (int) outstanding;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delegates to the message queue's {@code closeClientCq} for the given client and query.
     *
     * @param clientProxyMembershipID identity of the client
     * @param internalCqQuery the continuous query being closed
     */
    public void drainClientCqEvents(ClientProxyMembershipID clientProxyMembershipID, InternalCqQuery internalCqQuery) {
        _messageQueue.closeClientCq(clientProxyMembershipID, internalCqQuery);
    }

    /**
     * Thread entry point: optionally delays start-up for tests, then runs the dispatch loop.
     *
     * <p>While {@code CacheClientProxy.isSlowStartForTesting} is set, the thread sleeps in 500 ms
     * steps until the delay configured by the {@code slowStartTimeForTesting} system property
     * (default 5000 ms) has elapsed or the flag is cleared. If the delay elapsed, the flag is
     * reset to {@code false}.
     */
    @Override
    public void run() {
        if (CacheClientProxy.isSlowStartForTesting) {
            final long configuredDelay = Long.getLong(KEY_SLOW_START_TIME_FOR_TESTING, DEFAULT_SLOW_STARTING_TIME);
            long elapsed = 0;
            final long startedAt = System.currentTimeMillis();
            while (configuredDelay > elapsed && CacheClientProxy.isSlowStartForTesting) {
                try {
                    Thread.sleep(500L);
                    elapsed = System.currentTimeMillis() - startedAt;
                } catch (InterruptedException interrupted) {
                    // Elapsed time is not refreshed on interrupt; the loop simply tries again.
                    if (logger.isDebugEnabled()) {
                        logger.debug("Slow start for testing interrupted");
                    }
                }
            }
            if (elapsed > configuredDelay) {
                CacheClientProxy.isSlowStartForTesting = false;
            }
        }
        runDispatcher();
    }

    /**
     * Main dispatch loop: peeks messages from the queue and sends them to the client until the
     * dispatcher is stopped or cache cancellation is in progress.
     *
     * <p>Per iteration:
     * <ol>
     * <li>if the proxy is paused, entries are removed while the ack list is non-empty, then the
     * thread blocks in {@link #waitForResumption()};</li>
     * <li>the head message is peeked and the queue-size statistic updated;</li>
     * <li>the message is dispatched and, if it was actually sent, removed from the queue.</li>
     * </ol>
     *
     * <p>Failure handling: a too-large message is logged and the loop continues; an
     * {@link IOException} pauses or unregisters the proxy; cancellation or loss of the queue
     * region ends the loop; any other exception is logged and the loop continues. Unless the
     * queue became unusable, messages still in the queue after the loop are dispatched once
     * more on a best-effort basis.
     */
    @VisibleForTesting
    protected void runDispatcher() {
        // True once the queue or socket is known to be unusable; suppresses the final flush.
        boolean exceptionOccurred = false;
        this._isStopped = false;
        if (logger.isDebugEnabled()) {
            logger.debug("{}: Beginning to process events", this);
        }
        ClientMessage clientMessage = null;
        while (!isStopped() && !this._proxy._cache.getCancelCriterion().isCancelInProgress()) {
            try {
                if (getProxy().isPaused()) {
                    synchronized (this._pausedLock) {
                        try {
                            logger.info("available ids = " + this._messageQueue.size() + " , isEmptyAckList =" + this._messageQueue.isEmptyAckList() + ", peekInitialized = " + this._messageQueue.isPeekInitialized());
                            while (!this._messageQueue.isEmptyAckList() && this._messageQueue.isPeekInitialized()) {
                                this._messageQueue.remove();
                            }
                        } catch (InterruptedException e) {
                            logger.warn("{}: sleep interrupted.", this);
                        }
                    }
                    waitForResumption();
                }
                try {
                    clientMessage = (ClientMessage) this._messageQueue.peek();
                    getStatistics().setQueueSize(this._messageQueue.size());
                } catch (RegionDestroyedException ignored) {
                    // The queue's region is gone; nothing more can be read from it.
                    break;
                }
                if (isStopped()) {
                    break;
                }
                if (clientMessage != null) {
                    long startTime = getStatistics().startTime();
                    boolean dispatched = dispatchMessage(clientMessage);
                    getStatistics().endMessage(startTime);
                    if (dispatched) {
                        this._messageQueue.remove();
                        if (clientMessage instanceof ClientMarkerMessageImpl) {
                            getProxy().setMarkerEnqueued(false);
                        }
                    }
                } else {
                    this._messageQueue.remove();
                }
                clientMessage = null;
            } catch (MessageTooLargeException e) {
                // Handled before IOException, matching sendMessageDirectly().
                logger.warn("Message too large to send to client: {}, {}", clientMessage, e.getMessage());
            } catch (IOException e) {
                handleDispatchIOException(e);
                exceptionOccurred = true;
            } catch (InterruptedException e) {
                if (getProxy().isPaused()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("{}: interrupted because it is being paused. It will continue and wait for resumption.", this);
                    }
                    // Clear the flag so the next wait is not interrupted immediately.
                    Thread.interrupted();
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("{}: interrupted", this);
                    }
                    // Not paused: the interrupt is treated as a request to leave the loop.
                    break;
                }
            } catch (CancelException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: shutting down due to cancellation", this);
                }
                exceptionOccurred = true;
                break;
            } catch (RegionDestroyedException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: shutting down due to loss of message queue", this);
                }
                exceptionOccurred = true;
                break;
            } catch (Exception e) {
                // Not an I/O failure, so the client may still be reachable: keep processing.
                if (!isStopped()) {
                    logger.fatal(String.format("%s : An unexpected Exception occurred", this), e);
                }
            }
        }
        if (!exceptionOccurred) {
            dispatchResidualMessages();
        }
        if (logger.isTraceEnabled()) {
            logger.trace("{}: Dispatcher thread is ending", this);
        }
    }

    /**
     * Logs an I/O failure raised while sending and pauses or unregisters the proxy, unless the
     * dispatcher is already stopped or the proxy is paused. Runs under
     * {@code _stopDispatchingLock}.
     */
    private void handleDispatchIOException(IOException e) {
        synchronized (this._stopDispatchingLock) {
            if (isStopped() || getProxy().isPaused()) {
                return;
            }
            String reason = e.getMessage();
            if ("Broken pipe".equals(reason)) {
                logger.warn("{}: Proxy closing due to unexpected broken pipe on socket connection.", this);
            } else if ("Connection reset".equals(reason)) {
                logger.warn("{}: Proxy closing due to unexpected reset on socket connection.", this);
            } else if ("Connection reset by peer".equals(reason)) {
                logger.warn("{}: Proxy closing due to unexpected reset by peer on socket connection.", this);
            } else if ("Socket is closed".equals(reason) || "Socket Closed".equals(reason)) {
                logger.info("{}: Proxy closing due to socket being closed locally.", this);
            } else {
                logger.warn(String.format("%s: An unexpected IOException occurred so the proxy will be closed.", this), e);
            }
            pauseOrUnregisterProxy(e);
        }
    }

    /**
     * Best-effort dispatch of whatever is still in the queue once the main loop has ended.
     * Failures are logged at info level and never propagated.
     */
    private void dispatchResidualMessages() {
        List<ClientMessage> residual = new ArrayList<ClientMessage>();
        try {
            // Clear any pending interrupt so the peek below is not aborted by it.
            Thread.interrupted();
            Collection<ClientMessage> peeked = UncheckedUtils.uncheckedCast(this._messageQueue.peek(this._messageQueue.size()));
            residual.addAll(peeked);
            if (logger.isDebugEnabled()) {
                logger.debug("{}: After flagging the dispatcher to stop , the residual List of messages to be dispatched={} size={}", this, residual, residual.size());
            }
            if (!residual.isEmpty()) {
                long startTime = getStatistics().startTime();
                Iterator<ClientMessage> it = residual.iterator();
                while (it.hasNext()) {
                    dispatchMessage(it.next());
                    getStatistics().endMessage(startTime);
                    // Drop sent messages so the list only holds what is still undelivered.
                    it.remove();
                }
                this._messageQueue.remove();
            }
        } catch (CancelException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("CacheClientNotifier stopped due to cancellation");
            }
        } catch (Exception e) {
            String cause = null;
            if ("Broken pipe".equals(e.getMessage())) {
                cause = "Problem caused by broken pipe on socket.";
            } else if (e instanceof RegionDestroyedException) {
                cause = "Problem caused by message queue being closed.";
            }
            if (cause == null) {
                cause = "Problem caused by: " + e.getMessage();
            }
            String prefix = !isStopped() ? toString() + ": " : "";
            logger.info(String.format("%s Possibility of not being able to send some or all of the messages to clients. Total messages currently present in the list %s.", prefix, residual.size()));
            logger.info(cause);
        }
        if (!residual.isEmpty() && logger.isTraceEnabled()) {
            logger.trace("Messages remaining in the list are: {}", residual);
        }
    }

    /**
     * Reacts to a send failure: pauses dispatching when the proxy is durable, otherwise sets the
     * stop flag; then, if a {@link ClientHealthMonitor} instance exists, removes the client's
     * connections there and (non-durable only) unregisters the client from the notifier.
     *
     * @param th the failure that triggered this call; passed on to the health monitor
     */
    private void pauseOrUnregisterProxy(Throwable th) {
        if (!getProxy().isDurable()) {
            this._isStopped = true;
        } else {
            try {
                getProxy().pauseDispatching();
            } catch (Exception pauseFailure) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}: {}", this, pauseFailure);
                }
            }
        }

        final ClientHealthMonitor healthMonitor = ClientHealthMonitor.getInstance();
        if (healthMonitor == null) {
            return;
        }
        final ClientProxyMembershipID clientId = getProxy().proxyID;
        healthMonitor.removeAllConnectionsAndUnregisterClient(clientId, th);
        if (!getProxy().isDurable()) {
            getProxy().getCacheClientNotifier().unregisterClient(clientId, false);
        }
    }

    /**
     * Builds the wire message for {@code clientMessage} and sends it unless the proxy is paused.
     *
     * @param clientMessage the queued message to deliver
     * @return {@code true} if the message was handed to {@link #sendMessage}, {@code false} if
     *         the proxy was paused and nothing was sent
     * @throws IOException if sending fails
     */
    protected boolean dispatchMessage(ClientMessage clientMessage) throws IOException {
        if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
            logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "Dispatching {}", clientMessage);
        }

        final Message wireMessage;
        if (clientMessage instanceof ClientUpdateMessage) {
            final ClientUpdateMessage update = (ClientUpdateMessage) clientMessage;
            final byte[] latestValue = (byte[]) update.getValue();
            if (logger.isTraceEnabled()) {
                final StringBuilder text = new StringBuilder(100);
                text.append(this);
                text.append(": Using latest value: ");
                text.append(Arrays.toString(latestValue));
                if (update.valueIsObject()) {
                    if (latestValue != null) {
                        text.append(" (").append(deserialize(latestValue)).append(")");
                    }
                    text.append(" for ").append(clientMessage);
                }
                logger.trace(text.toString());
            }
            wireMessage = ((ClientUpdateMessageImpl) clientMessage).getMessage(getProxy(), latestValue);
            if (CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG) {
                ClientServerObserverHolder.getInstance().afterMessageCreation(wireMessage);
            }
        } else {
            wireMessage = clientMessage.getMessage(getProxy(), true);
        }

        if (this._proxy.isPaused()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Message Dispatcher of a Paused CCProxy is trying to dispatch message");
            }
            return false;
        }

        sendMessage(wireMessage);
        if (logger.isTraceEnabled()) {
            logger.trace("{}: Dispatched {}", this, clientMessage);
        }
        this._messageQueue.getStatistics().incEventsDispatched();
        return true;
    }

    /**
     * Writes {@code message} to the client socket while holding the socket write lock, then
     * resets the proxy's ping counter. A {@code null} message is ignored.
     *
     * @param message the wire message to send, may be {@code null}
     * @throws IOException if the send fails
     */
    private void sendMessage(Message message) throws IOException {
        if (message != null) {
            final Lock writeLock = this.socketWriteLock;
            writeLock.lock();
            try {
                message.setComms(getSocket(), getCommBuffer(), getStatistics());
                message.send();
                getProxy().resetPingCounter();
                if (logger.isTraceEnabled()) {
                    logger.trace("{}: Sent {}", this, message);
                }
            } finally {
                writeLock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Puts {@code conflatable} on the message queue. When the proxy is both paused and durable,
     * the notifier's "enqueued while client away" counter is incremented as well.
     *
     * <p>A {@link CancelException} is rethrown. Any other failure is swallowed if the dispatcher
     * is stopped; otherwise it is counted as a failed enqueue and logged at fatal level.
     *
     * @param conflatable the message to enqueue
     */
    public void enqueueMessage(Conflatable conflatable) {
        try {
            this._messageQueue.put(conflatable);
            final boolean durableClientAway = this._proxy.isPaused() && this._proxy.isDurable();
            if (!durableClientAway) {
                return;
            }
            this._proxy._cacheClientNotifier.statistics.incEventEnqueuedWhileClientAwayCount();
            if (logger.isDebugEnabled()) {
                logger.debug("{}: Queued message while Durable Client is away {}", this, conflatable);
            }
        } catch (CancelException cancelled) {
            throw cancelled;
        } catch (Exception failure) {
            if (!isStopped()) {
                this._proxy._statistics.incMessagesFailedQueued();
                logger.fatal(String.format("%s: Exception occurred while attempting to add message to queue", this), failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Puts a marker message on the queue, with debug logging of the queue size before and after.
     *
     * <p>A {@link CancelException} is rethrown. Any other failure is logged at fatal level
     * unless the dispatcher is already stopped.
     *
     * @param clientMessage the marker message to enqueue
     */
    public void enqueueMarker(ClientMessage clientMessage) {
        try {
            if (logger.isDebugEnabled()) {
                final int sizeBefore = getQueueSize();
                logger.debug("{}: Queueing marker message. <{}>. The queue contains {} entries.", this, clientMessage, sizeBefore);
            }
            this._messageQueue.put(clientMessage);
            if (logger.isDebugEnabled()) {
                final int sizeAfter = getQueueSize();
                logger.debug("{}: Queued marker message. The queue contains {} entries.", this, sizeAfter);
            }
        } catch (CancelException cancelled) {
            throw cancelled;
        } catch (Exception failure) {
            if (!isStopped()) {
                logger.fatal(String.format("%s : Exception occurred while attempting to add message to queue", this), failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends {@code clientMessage} straight to the client, bypassing the queue.
     *
     * <p>No exception escapes: a too-large message is logged as a warning; an
     * {@link IOException} is logged and the proxy paused or unregistered (unless already
     * stopped or paused); anything else is logged at fatal level unless the dispatcher is
     * stopped.
     *
     * @param clientMessage the message to send
     */
    public void sendMessageDirectly(ClientMessage clientMessage) {
        final boolean debugEnabled = logger.isDebugEnabled();
        try {
            if (debugEnabled) {
                logger.debug("{}: Dispatching directly: {}", this, clientMessage);
            }
            final Message wireMessage = clientMessage.getMessage(getProxy(), true);
            sendMessage(wireMessage);
            if (logger.isDebugEnabled()) {
                logger.debug("{}: Dispatched directly: {}", this, clientMessage);
            }
        } catch (MessageTooLargeException tooLarge) {
            logger.warn("Message too large to send to client: {}, {}", clientMessage, tooLarge.getMessage());
        } catch (IOException ioFailure) {
            synchronized (this._stopDispatchingLock) {
                final boolean active = !isStopped() && !getProxy().isPaused();
                if (active) {
                    logger.fatal(String.format("%s : An unexpected Exception occurred", this), ioFailure);
                    pauseOrUnregisterProxy(ioFailure);
                }
            }
        } catch (Exception failure) {
            if (!isStopped()) {
                logger.fatal(String.format("%s : An unexpected Exception occurred", this), failure);
            }
        }
    }

    /**
     * Blocks on {@code _pausedLock} for as long as the proxy reports itself paused. If any
     * waiting was needed, the queue's peeked IDs are cleared before returning.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    protected void waitForResumption() throws InterruptedException {
        synchronized (this._pausedLock) {
            logger.info("{} : Pausing processing", this);
            if (!getProxy().isPaused()) {
                return;
            }
            // Re-check in a loop: wait() may return before the proxy has been resumed.
            while (getProxy().isPaused()) {
                this._pausedLock.wait();
            }
            this._messageQueue.clearPeekedIDs();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Wakes every thread waiting on {@code _pausedLock} (see {@link #waitForResumption()}).
     *
     * <p>The monitor is acquired here so the call is valid whether or not the caller already
     * holds {@code _pausedLock}; without ownership {@code notifyAll()} throws
     * {@link IllegalMonitorStateException}. Java monitors are re-entrant, so callers that do
     * hold the lock are unaffected.
     *
     * <p>NOTE(review): to avoid a missed wake-up, the caller should still clear the proxy's
     * paused state before or while holding {@code _pausedLock} - confirm at the call site.
     */
    public void resumeDispatching() {
        logger.info("{} : Resuming processing", this);
        synchronized (this._pausedLock) {
            this._pausedLock.notifyAll();
        }
    }

    /**
     * Best-effort deserialization of {@code bArr}.
     *
     * @param bArr serialized bytes
     * @return the object read by {@code DataSerializer.readObject}, or {@code bArr} itself if
     *         reading throws any exception
     */
    protected Object deserialize(byte[] bArr) {
        try {
            return DataSerializer.readObject(new ByteArrayDataInput(bArr));
        } catch (Exception ignored) {
            // Deliberately swallowed: fall back to the raw bytes.
            return bArr;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Calls {@code remove()} on the queue while its ack list is non-empty and peeking is
     * initialized, then delegates to the queue's own {@code initializeTransients()}.
     *
     * <p>An interrupt during removal does not abort the loop. It is logged, and the thread's
     * interrupt status is restored once the method is done so callers can still observe it.
     * Restoring the flag inside the loop would make the next blocking call fail immediately.
     */
    public void initializeTransients() {
        boolean interrupted = false;
        try {
            while (!this._messageQueue.isEmptyAckList() && this._messageQueue.isPeekInitialized()) {
                try {
                    this._messageQueue.remove();
                } catch (InterruptedException e) {
                    interrupted = true;
                    logger.warn("{}: Interrupted while removing acknowledged events from the queue", this, e);
                }
            }
            this._messageQueue.initializeTransients();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
