/*
 * Decompiled with CFR 0.152.
 */
package flex.messaging.endpoints;

import flex.messaging.FlexContext;
import flex.messaging.FlexSession;
import flex.messaging.HttpFlexSession;
import flex.messaging.MessageException;
import flex.messaging.client.EndpointPushNotifier;
import flex.messaging.client.FlexClient;
import flex.messaging.client.FlushResult;
import flex.messaging.client.UserAgentSettings;
import flex.messaging.config.ConfigMap;
import flex.messaging.endpoints.BaseHTTPEndpoint;
import flex.messaging.log.Log;
import flex.messaging.messages.AcknowledgeMessage;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.Message;
import flex.messaging.messages.MessagePerformanceInfo;
import flex.messaging.messages.MessagePerformanceUtils;
import flex.messaging.util.TimeoutManager;
import flex.messaging.util.UserAgentManager;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public abstract class BaseStreamingHTTPEndpoint
extends BaseHTTPEndpoint {
    // ---- Byte constants --------------------------------------------------------------
    // None of these three is referenced in the visible part of the file; they are
    // presumably used by the chunk-writing code further down -- TODO confirm.

    /** ASCII carriage return (13) followed by line feed (10). */
    private static final byte[] CRLF_BYTES = new byte[]{13, 10};
    /** ASCII code of the character '0'. */
    private static final byte ZERO_BYTE = 48;
    /** The zero-valued byte. */
    private static final byte NULL_BYTE = 0;

    // ---- Request parameters and commands ---------------------------------------------

    /** Request parameter whose presence routes a request to the streaming handler (see service()). */
    private static final String COMMAND_PARAM_NAME = "command";
    /** Accepted command value; also used as the correlation id of the connect acknowledgement. */
    private static final String OPEN_COMMAND = "open";
    /** Accepted command value for closing a stream. */
    private static final String CLOSE_COMMAND = "close";
    /** Request parameter name for a stream id; not read in the visible part of the file -- TODO confirm usage. */
    private static final String STREAM_ID_PARAM_NAME = "streamId";
    /** Protocol string for which streaming requests are rejected with a 400 response. */
    private static final String HTTP_1_0 = "HTTP/1.0";
    /** Suffix appended to the request thread's name while it services a streaming connection. */
    private static final String STREAMING_THREAD_NAME_EXTENSION = "-in-streaming-mode";

    // ---- Configuration property names (read in initialize()) ---------------------------

    /** Preferred property name for the idle timeout, in minutes. */
    private static final String PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES = "connection-idle-timeout-minutes";
    /** Older property name for the idle timeout; consulted only when the preferred one yields 0. */
    private static final String PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES = "idle-timeout-minutes";
    /** Property name for the endpoint-wide cap on concurrent streaming connections. */
    private static final String MAX_STREAMING_CLIENTS = "max-streaming-clients";
    /** Property name for the maximum wait, in milliseconds, before a heartbeat byte is written. */
    private static final String SERVER_TO_CLIENT_HEARTBEAT_MILLIS = "server-to-client-heartbeat-millis";
    /** Property name for the invalidate-on-close flag. */
    private static final String PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = "invalidate-messageclient-on-streaming-close";

    // ---- Defaults (initialize() uses the same values as fallbacks) ---------------------

    private static final boolean DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = false;
    private static final int DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS = 5000;
    private static final int DEFAULT_MAX_STREAMING_CLIENTS = 10;

    /** Error code set on the exception thrown by handleFlexClientPoll(). */
    public static final String POLL_NOT_SUPPORTED_CODE = "Server.PollNotSupported";
    /** Numeric message/details id set on the exception thrown by handleFlexClientPoll(). */
    public static final int POLL_NOT_SUPPORTED_MESSAGE = 10034;

    // ---- Instance state ----------------------------------------------------------------

    /** Monitor held while streamingClientsCount and canStream are updated by the open-request handler. */
    protected final Object lock = new Object();
    /** Matches the request's User-Agent header to per-agent settings; configured in initialize(). */
    protected UserAgentManager userAgentManager = new UserAgentManager();
    /** False while the endpoint is at (or configured with) zero remaining streaming capacity. */
    private volatile boolean canStream = true;
    /** Created in start() only when connectionIdleTimeoutMinutes is positive; shut down and nulled in stop(). */
    private volatile TimeoutManager pushNotifierTimeoutManager;
    /** Open streaming connections keyed by notifier id; created in start(), nulled in stop(). */
    private ConcurrentHashMap<String, EndpointPushNotifier> currentStreamingRequests;
    /** Configuration flag; it is only stored and returned in the visible part of the file. */
    private volatile boolean invalidateMessageClientOnStreamingClose = false;
    /** Maximum time the streaming loop waits for messages; never negative (see the setter). */
    private long serverToClientHeartbeatMillis = 5000L;
    /** Idle timeout applied to new notifiers when positive; never negative (see the setter). */
    private int connectionIdleTimeoutMinutes = 0;
    /** Endpoint-wide cap on concurrent streaming connections. */
    private int maxStreamingClients = 10;
    /** Number of streaming connections currently admitted by this endpoint. */
    protected int streamingClientsCount;

    /**
     * Creates an endpoint by delegating to {@link #BaseStreamingHTTPEndpoint(boolean)}
     * with {@code enableManagement} set to {@code false}.
     */
    public BaseStreamingHTTPEndpoint() {
        this(false);
    }

    /**
     * Creates an endpoint, forwarding the management flag to the superclass constructor.
     *
     * @param enableManagement value passed through unchanged to {@code BaseHTTPEndpoint}
     */
    public BaseStreamingHTTPEndpoint(boolean enableManagement) {
        super(enableManagement);
    }

    /**
     * Initializes the endpoint from its configuration properties.
     *
     * <p>After the superclass is initialized, a null or empty property map only sets up
     * the user-agent manager with defaults. Otherwise the heartbeat interval, the
     * invalidate-on-close flag, the idle timeout, the user-agent settings and the
     * streaming-client cap are read from {@code properties}.
     *
     * @param id         endpoint id, forwarded to the superclass
     * @param properties endpoint configuration; may be null or empty
     */
    @Override
    public void initialize(String id, ConfigMap properties) {
        super.initialize(id, properties);

        boolean nothingConfigured = properties == null || properties.size() == 0;
        if (nothingConfigured) {
            UserAgentManager.setupUserAgentManager(null, userAgentManager);
            return;
        }

        // The raw value is stored first and then normalized through the setter.
        serverToClientHeartbeatMillis = properties.getPropertyAsLong(
                SERVER_TO_CLIENT_HEARTBEAT_MILLIS, (long) DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS);
        setServerToClientHeartbeatMillis(serverToClientHeartbeatMillis);

        boolean invalidateOnClose = properties.getPropertyAsBoolean(
                PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE,
                DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE);
        setInvalidateMessageClientOnStreamingClose(invalidateOnClose);

        // Preferred property name first; the legacy name is consulted only when that yields 0.
        int idleMinutes = properties.getPropertyAsInt(
                PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes());
        if (idleMinutes == 0) {
            idleMinutes = properties.getPropertyAsInt(
                    PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes());
        }
        if (idleMinutes != 0) {
            setConnectionIdleTimeoutMinutes(idleMinutes);
        }

        UserAgentManager.setupUserAgentManager(properties, userAgentManager);

        maxStreamingClients = properties.getPropertyAsInt(MAX_STREAMING_CLIENTS, DEFAULT_MAX_STREAMING_CLIENTS);
        // A non-positive cap disables streaming from the outset.
        canStream = maxStreamingClients > 0;
    }

    /**
     * Starts the endpoint; does nothing if it is already started.
     *
     * <p>After the superclass has started, a timeout manager is created when a positive
     * idle timeout is configured, and the map that tracks open streaming connections is
     * (re)created.
     */
    @Override
    public void start() {
        if (this.isStarted()) {
            return;
        }
        super.start();
        if (this.connectionIdleTimeoutMinutes > 0) {
            this.pushNotifierTimeoutManager = new TimeoutManager(new ThreadFactory() {
                // Guarded by the synchronized newThread below.
                int counter = 1;

                @Override
                public synchronized Thread newThread(Runnable runnable) {
                    Thread t = new Thread(runnable);
                    t.setName(BaseStreamingHTTPEndpoint.this.getId() + "-StreamingConnectionTimeoutThread-" + this.counter++);
                    return t;
                }
            });
        }
        // Explicit type arguments instead of the raw type, so the assignment is checked.
        this.currentStreamingRequests = new ConcurrentHashMap<String, EndpointPushNotifier>();
    }

    /**
     * Stops the endpoint; does nothing if it is not started.
     *
     * <p>Shuts down the timeout manager if one exists, closes every notifier still
     * registered as an open streaming connection, drops the tracking map, and finally
     * stops the superclass.
     */
    @Override
    public void stop() {
        if (!isStarted()) {
            return;
        }

        if (pushNotifierTimeoutManager != null) {
            pushNotifierTimeoutManager.shutdown();
            pushNotifierTimeoutManager = null;
        }

        // Close whatever streaming connections are still open.
        for (EndpointPushNotifier openNotifier : currentStreamingRequests.values()) {
            openNotifier.close();
        }
        currentStreamingRequests = null;

        super.stop();
    }

    /**
     * @return the current value of the invalidate-messageclient-on-streaming-close flag
     */
    public boolean isInvalidateMessageClientOnStreamingClose() {
        return invalidateMessageClientOnStreamingClose;
    }

    /**
     * Sets the invalidate-messageclient-on-streaming-close flag.
     *
     * @param value new flag value
     */
    public void setInvalidateMessageClientOnStreamingClose(boolean value) {
        invalidateMessageClientOnStreamingClose = value;
    }

    /**
     * @return the heartbeat interval in milliseconds
     */
    public long getServerToClientHeartbeatMillis() {
        return serverToClientHeartbeatMillis;
    }

    /**
     * Sets the heartbeat interval; negative values are stored as 0.
     *
     * @param serverToClientHeartbeatMillis interval in milliseconds
     */
    public void setServerToClientHeartbeatMillis(long serverToClientHeartbeatMillis) {
        this.serverToClientHeartbeatMillis = Math.max(0L, serverToClientHeartbeatMillis);
    }

    /**
     * @return the connection idle timeout in minutes
     */
    public int getConnectionIdleTimeoutMinutes() {
        return connectionIdleTimeoutMinutes;
    }

    /**
     * Sets the connection idle timeout; negative values are stored as 0.
     *
     * @param value timeout in minutes
     */
    public void setConnectionIdleTimeoutMinutes(int value) {
        connectionIdleTimeoutMinutes = Math.max(0, value);
    }

    /**
     * Alias for {@link #getConnectionIdleTimeoutMinutes()}.
     *
     * @return the connection idle timeout in minutes
     */
    public int getIdleTimeoutMinutes() {
        int minutes = getConnectionIdleTimeoutMinutes();
        return minutes;
    }

    /**
     * Alias for {@link #setConnectionIdleTimeoutMinutes(int)}.
     *
     * @param value timeout in minutes
     */
    public void setIdleTimeoutMinutes(int value) {
        setConnectionIdleTimeoutMinutes(value);
    }

    /**
     * @return the endpoint-wide cap on concurrent streaming connections
     */
    public int getMaxStreamingClients() {
        return maxStreamingClients;
    }

    /**
     * Sets the endpoint-wide cap on concurrent streaming connections and recomputes
     * whether further streaming connections can be admitted.
     *
     * <p>The update runs under {@code lock}, the same monitor the streaming open-request
     * handler holds while it changes {@code streamingClientsCount} and {@code canStream}.
     * The new cap and the derived flag are therefore published together and cannot
     * interleave with an admission or release in progress.
     *
     * @param maxStreamingClients new cap; streaming is refused while the current count
     *                            is not below it
     */
    public void setMaxStreamingClients(int maxStreamingClients) {
        synchronized (this.lock) {
            this.maxStreamingClients = maxStreamingClients;
            this.canStream = this.streamingClientsCount < maxStreamingClients;
        }
    }

    /**
     * @return the number of streaming connections currently admitted by this endpoint
     */
    public int getStreamingClientsCount() {
        return streamingClientsCount;
    }

    /**
     * Dispatches a request: one carrying a {@code command} parameter goes to the
     * streaming handler, any other request goes to the superclass.
     *
     * @param req incoming request
     * @param res response to write to
     */
    @Override
    public void service(HttpServletRequest req, HttpServletResponse res) {
        boolean isStreamingRequest = req.getParameter(COMMAND_PARAM_NAME) != null;
        if (!isStreamingRequest) {
            super.service(req, res);
            return;
        }
        serviceStreamingRequest(req, res);
    }

    /**
     * Attaches performance information to a message that is about to be streamed.
     *
     * <p>Does nothing when the message carries no performance info. Otherwise a clone
     * of that info is set as the message's MPIP, its MPII is cleared, and a fresh
     * outbound info marked as pushed is set as its MPIO. Send time and message size are
     * filled in only when the cloned info requests them.
     *
     * @param message message being pushed to the client
     */
    protected void addPerformanceInfo(Message message) {
        MessagePerformanceInfo originalInfo = this.getMPI(message);
        if (originalInfo == null) {
            return;
        }

        MessagePerformanceInfo pushedInfo = (MessagePerformanceInfo) originalInfo.clone();
        try {
            MessagePerformanceUtils.setMPIP(message, pushedInfo);
            MessagePerformanceUtils.setMPII(message, null);
        } catch (Exception e) {
            // Failure here is not fatal; processing continues with the outbound info.
            if (Log.isDebug()) {
                this.log.debug("MPI exception while streaming the message: " + e.toString());
            }
        }

        MessagePerformanceInfo outboundInfo = new MessagePerformanceInfo();
        if (pushedInfo.recordMessageTimes) {
            outboundInfo.sendTime = System.currentTimeMillis();
            outboundInfo.infoType = "OUT";
        }
        outboundInfo.pushedFlag = true;
        MessagePerformanceUtils.setMPIO(message, outboundInfo);

        if (!pushedInfo.recordMessageSizes) {
            return;
        }
        try {
            long sizingStarted = System.currentTimeMillis();
            outboundInfo.messageSize = this.getMessageSizeForPerformanceInfo(message);
            if (pushedInfo.recordMessageTimes) {
                // The time spent measuring is charged as overhead to both infos, and the
                // send time is refreshed so that it excludes the measurement.
                long sizingOverhead = System.currentTimeMillis() - sizingStarted;
                pushedInfo.addToOverhead(sizingOverhead);
                originalInfo.addToOverhead(sizingOverhead);
                outboundInfo.sendTime = System.currentTimeMillis();
            }
        } catch (Exception e) {
            this.log.debug("MPI exception while streaming the message: " + e.toString());
        }
    }

    /**
     * Returns the small-message form of {@code message} when the current FlexSession
     * exists and reports that it uses small messages; otherwise returns {@code message}
     * unchanged.
     *
     * @param message message about to be pushed
     * @return the converted message, or the argument itself
     */
    protected Message convertPushMessageToSmall(Message message) {
        FlexSession currentSession = FlexContext.getFlexSession();
        boolean wantsSmallMessages = currentSession != null && currentSession.useSmallMessages();
        return wantsSmallMessages ? this.convertToSmallMessage(message) : message;
    }

    /**
     * Reports the size recorded as {@code messageSize} by {@code addPerformanceInfo}.
     * This implementation always reports zero; subclasses presumably override it with a
     * real measurement.
     *
     * @param message message to measure (ignored here)
     * @return always {@code 0}
     */
    protected long getMessageSizeForPerformanceInfo(Message message) {
        final long noSize = 0L;
        return noSize;
    }

    /**
     * Rejects polling: this method never returns normally.
     *
     * @param flexClient  the polling client (unused)
     * @param pollCommand the poll command (unused)
     * @return never returns
     * @throws MessageException always, with message and details id
     *         {@link #POLL_NOT_SUPPORTED_MESSAGE} and code {@link #POLL_NOT_SUPPORTED_CODE}
     */
    @Override
    protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand) {
        MessageException pollNotSupported = new MessageException();
        pollNotSupported.setMessage(POLL_NOT_SUPPORTED_MESSAGE);
        pollNotSupported.setDetails(POLL_NOT_SUPPORTED_MESSAGE);
        pollNotSupported.setCode(POLL_NOT_SUPPORTED_CODE);
        throw pollNotSupported;
    }

    /**
     * Services a streaming "open" request: admits the connection against the endpoint
     * and session limits, then holds the response open and pushes messages to the
     * FlexClient until its push notifier is closed, the write fails, or the thread is
     * interrupted.
     *
     * <p>Flow:
     * <ol>
     *   <li>Reject with 503 if the endpoint or the session is already flagged as unable
     *       to stream.</li>
     *   <li>Reserve an endpoint slot under {@code lock}; reject with 503 if the cap is
     *       reached.</li>
     *   <li>Apply user-agent settings (per-session connection cap, kick-start bytes).</li>
     *   <li>Reserve a session slot; on failure give the endpoint slot back and reject
     *       with 503.</li>
     *   <li>Commit chunked response headers, register an {@link EndpointPushNotifier},
     *       acknowledge the open, and loop: drain and stream messages, or write a single
     *       zero byte as heartbeat when the wait produced no messages.</li>
     *   <li>Always, in {@code finally}: restore the thread name and release both slots
     *       exactly once.</li>
     * </ol>
     *
     * <p>Lock order is always {@code lock} then the session monitor; the two are never
     * acquired in the opposite order.
     *
     * @param req        the streaming open request
     * @param res        the response that is kept open for the life of the stream
     * @param flexClient the client the stream is opened for
     */
    protected void handleFlexClientStreamingOpenRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient) {
        final FlexSession session = FlexContext.getFlexSession();

        // Read both gates once so the reported reason always matches the decision and
        // the error text is never null.
        final boolean endpointCanStream = this.canStream;
        final boolean sessionCanStream = session.canStream;
        if (!endpointCanStream || !sessionCanStream) {
            String denied = this.streamingDeniedPrefix(flexClient);
            String limitName = endpointCanStream ? "max-streaming-connections-per-session" : MAX_STREAMING_CLIENTS;
            if (Log.isError()) {
                String limitValue = endpointCanStream
                        ? String.valueOf(session.maxConnectionsPerSession)
                        : String.valueOf(this.maxStreamingClients);
                this.log.error(denied + limitName + " limit of '" + limitValue + "' has been reached.");
            }
            this.sendStreamingError(res, HttpServletResponse.SC_SERVICE_UNAVAILABLE, denied + limitName + " limit has been reached.");
            return;
        }

        if (!this.acquireEndpointStreamingSlot()) {
            String denied = this.streamingDeniedPrefix(flexClient);
            if (Log.isError()) {
                this.log.error(denied + MAX_STREAMING_CLIENTS + " limit of '" + this.maxStreamingClients + "' has been reached.");
            }
            this.sendStreamingError(res, HttpServletResponse.SC_SERVICE_UNAVAILABLE, denied + MAX_STREAMING_CLIENTS + " limit has been reached.");
            return;
        }

        // Per-user-agent settings: session connection cap and optional kick-start payload.
        byte[] kickStartBytesToStream = null;
        UserAgentSettings agentSettings = this.userAgentManager.match(req.getHeader("User-Agent"));
        if (agentSettings != null) {
            synchronized (session) {
                session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
            }
            int kickStartBytes = agentSettings.getKickstartBytes();
            if (kickStartBytes > 0) {
                kickStartBytesToStream = BaseStreamingHTTPEndpoint.createKickStartBytes(kickStartBytes);
            }
        }

        boolean sessionSlotAcquired = true;
        synchronized (session) {
            ++session.streamingConnectionsCount;
            if (session.maxConnectionsPerSession != -1) { // -1 disables the per-session cap
                if (session.streamingConnectionsCount == session.maxConnectionsPerSession) {
                    session.canStream = false; // this connection takes the last slot
                } else if (session.streamingConnectionsCount > session.maxConnectionsPerSession) {
                    --session.streamingConnectionsCount;
                    sessionSlotAcquired = false;
                }
            }
        }
        if (!sessionSlotAcquired) {
            // Give the endpoint slot back only after leaving the session monitor, so that
            // the lock order (lock, then session) is never inverted.
            this.releaseEndpointStreamingSlot();
            if (Log.isError()) {
                this.log.error(this.streamingDeniedPrefix(flexClient) + "max-persistent-connections-per-session" + " limit of '" + session.maxConnectionsPerSession
                        + (agentSettings != null ? "' for user-agent '" + agentSettings.getMatchOn() + "'" : "'")
                        + " has been reached.");
            }
            this.sendStreamingError(res, HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                    "The server cannot grant streaming connection to this client because limit has been reached.");
            return;
        }

        // Both slots are held from here on; the finally block below releases them once.
        final Thread currentThread = Thread.currentThread();
        final String threadName = currentThread.getName();
        EndpointPushNotifier notifier = null;
        boolean suppressIOExceptionLogging = false;
        try {
            currentThread.setName(threadName + STREAMING_THREAD_NAME_EXTENSION);

            // Commit the response headers and obtain the stream that is kept open.
            if (this.addNoCacheHeaders) {
                BaseStreamingHTTPEndpoint.addNoCacheHeaders(req, res);
            }
            res.setContentType(this.getResponseContentType());
            res.setHeader("Connection", "close");
            res.setHeader("Transfer-Encoding", "chunked");
            ServletOutputStream os = res.getOutputStream();
            res.flushBuffer();

            if (kickStartBytesToStream != null) {
                if (Log.isDebug()) {
                    this.log.debug("Endpoint with id '" + this.getId() + "' is streaming " + kickStartBytesToStream.length + " bytes (not counting chunk encoding overhead) to kick-start the streaming connection for FlexClient with id '" + flexClient.getId() + "'.");
                }
                this.streamChunk(kickStartBytesToStream, os, res);
            }

            this.setThreadLocals();

            try {
                notifier = new EndpointPushNotifier(this, flexClient);
            } catch (MessageException me) {
                if (me.getNumber() != 10033) {
                    // Not the duplicate-stream error handled below: propagate it instead
                    // of continuing with a null notifier.
                    throw me;
                }
                if (Log.isWarn()) {
                    this.log.warn("Endpoint with id '" + this.getId() + "' received a duplicate streaming connection request from, FlexClient with id '" + flexClient.getId() + "'. Faulting request.");
                }
                // NOTE(review): the response was already flushed above; per the Servlet
                // API sendError on a committed response throws IllegalStateException,
                // which is not caught here -- confirm container behavior.
                this.sendStreamingError(res, HttpServletResponse.SC_BAD_REQUEST, null);
                return; // counters are rolled back once, by the finally block
            }

            if (this.connectionIdleTimeoutMinutes > 0) {
                notifier.setIdleTimeoutMinutes(this.connectionIdleTimeoutMinutes);
            }
            notifier.setLogCategory(this.getLogCategory());
            this.monitorTimeout(notifier);
            this.currentStreamingRequests.put(notifier.getNotifierId(), notifier);

            // Tell the client the stream is open and which notifier id identifies it.
            AcknowledgeMessage connectAck = new AcknowledgeMessage();
            connectAck.setBody(notifier.getNotifierId());
            connectAck.setCorrelationId(OPEN_COMMAND);
            List<AcknowledgeMessage> toPush = new ArrayList<AcknowledgeMessage>(1);
            toPush.add(connectAck);
            this.streamMessages(toPush, os, res);

            this.logStreamingCounts(session);

            while (!notifier.isClosed()) {
                try {
                    List<AsyncMessage> messages;
                    synchronized (notifier.pushNeeded) {
                        messages = notifier.drainMessages();
                    }
                    this.streamMessages(messages, os, res);

                    synchronized (notifier.pushNeeded) {
                        notifier.pushNeeded.wait(this.serverToClientHeartbeatMillis);
                        messages = notifier.drainMessages();
                    }

                    if (messages == null && this.serverToClientHeartbeatMillis > 0L) {
                        // Nothing to send: write a heartbeat byte, which also detects a
                        // dead connection.
                        try {
                            os.write(NULL_BYTE);
                            res.flushBuffer();
                        } catch (IOException e) {
                            if (Log.isWarn()) {
                                this.log.warn("Endpoint with id '" + this.getId() + "' is closing the streaming connection to FlexClient with id '" + flexClient.getId() + "' because endpoint encountered a socket write error" + ", possibly due to an unresponsive FlexClient.", e);
                            }
                            break;
                        }
                    } else {
                        notifier.updateLastUse();
                        this.streamMessages(messages, os, res);
                    }
                } catch (InterruptedException e) {
                    if (Log.isWarn()) {
                        this.log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + this.getId() + "' has been interrupted and the streaming connection will be closed.");
                    }
                    os.close();
                    break;
                }
                flexClient.updateLastUse();
            }

            if (Log.isDebug()) {
                this.log.debug("Streaming thread '" + threadName + "' for endpoint with id '" + this.getId() + "' is releasing connection and returning to the request handler pool.");
            }
            // The final chunk is best effort; a failure here is expected if the client
            // has already gone away.
            suppressIOExceptionLogging = true;
            this.streamChunk(null, os, res);
        } catch (IOException e) {
            if (Log.isWarn() && !suppressIOExceptionLogging) {
                this.log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + this.getId() + "' is closing connection due to an IO error.", e);
            }
        } finally {
            currentThread.setName(threadName);
            this.releaseStreamingConnection(session, notifier);
        }
    }

    /** Builds the common "Endpoint ... cannot grant streaming connection ... because " message prefix. */
    private String streamingDeniedPrefix(FlexClient flexClient) {
        return "Endpoint with id '" + this.getId() + "' cannot grant streaming connection to FlexClient with id '" + flexClient.getId() + "' because ";
    }

    /** Sends an error status (with message when non-null), ignoring I/O failures. */
    private void sendStreamingError(HttpServletResponse res, int status, String message) {
        try {
            if (message != null) {
                res.sendError(status, message);
            } else {
                res.sendError(status);
            }
        } catch (IOException ignored) {
            // Best effort: the request is being refused anyway and there is no further
            // channel through which to report a failed write to the client.
        }
    }

    /** Reserves one endpoint-wide streaming slot under {@code lock}; returns false when the cap is reached. */
    private boolean acquireEndpointStreamingSlot() {
        synchronized (this.lock) {
            if (this.streamingClientsCount >= this.maxStreamingClients) {
                return false;
            }
            ++this.streamingClientsCount;
            if (this.streamingClientsCount == this.maxStreamingClients) {
                this.canStream = false; // the last slot has just been taken
            }
            return true;
        }
    }

    /** Returns one endpoint-wide streaming slot under {@code lock} and recomputes {@code canStream}. */
    private void releaseEndpointStreamingSlot() {
        synchronized (this.lock) {
            --this.streamingClientsCount;
            this.canStream = this.streamingClientsCount < this.maxStreamingClients;
        }
    }

    /**
     * Allocates the zero-filled kick-start payload: the configured size minus the chunk
     * framing overhead (hex length header plus 4), or the configured size itself when
     * the difference would not be positive.
     */
    private static byte[] createKickStartBytes(int kickStartBytes) {
        // Hex digits are single-byte characters, so the string length is the header size.
        int chunkOverhead = Integer.toHexString(kickStartBytes).length() + 4;
        int payloadSize = kickStartBytes - chunkOverhead;
        return new byte[payloadSize > 0 ? payloadSize : kickStartBytes];
    }

    /** Logs the session-level and endpoint-level streaming counts when debug logging is on. */
    private void logStreamingCounts(FlexSession session) {
        if (Log.isDebug()) {
            Log.getLogger("Endpoint.FlexSession").info("Number of streaming clients for FlexSession with id '" + session.getId() + "' is " + session.streamingConnectionsCount + ".");
            this.log.debug("Number of streaming clients for endpoint with id '" + this.getId() + "' is " + this.streamingClientsCount + ".");
        }
    }

    /** Releases the endpoint and session slots held by a stream and unregisters and closes its notifier, if any. */
    private void releaseStreamingConnection(FlexSession session, EndpointPushNotifier notifier) {
        synchronized (this.lock) {
            --this.streamingClientsCount;
            this.canStream = this.streamingClientsCount < this.maxStreamingClients;
            synchronized (session) {
                --session.streamingConnectionsCount;
                session.canStream = session.maxConnectionsPerSession == -1 || session.streamingConnectionsCount < session.maxConnectionsPerSession;
            }
        }
        // Read the field once: stop() sets it to null and may run concurrently.
        ConcurrentHashMap<String, EndpointPushNotifier> openRequests = this.currentStreamingRequests;
        if (notifier != null && openRequests != null) {
            openRequests.remove(notifier.getNotifierId());
            notifier.close();
        }
        this.logStreamingCounts(session);
    }

    /**
     * Services a streaming "close" request by closing the client's push notifier for
     * this endpoint, but only when {@code streamId} is non-null and equals that
     * notifier's id. Otherwise nothing happens.
     *
     * @param req        the close request (not used here)
     * @param res        the response (not used here)
     * @param flexClient client whose stream should be closed
     * @param streamId   id of the stream to close; may be null
     */
    protected void handleFlexClientStreamingCloseRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient, String streamId) {
        if (streamId == null) {
            return;
        }
        EndpointPushNotifier pushNotifier = (EndpointPushNotifier) flexClient.getEndpointPushHandler(this.getId());
        if (pushNotifier == null) {
            return;
        }
        if (pushNotifier.getNotifierId().equals(streamId)) {
            pushNotifier.close();
        }
    }

    /**
     * Validates an incoming streaming request and dispatches it to the open or
     * close handler.
     *
     * <p>The request is rejected with HTTP status 400 (and an error log entry, when
     * error logging is enabled) if any of the following holds:
     * <ul>
     *   <li>the request protocol is HTTP/1.0;</li>
     *   <li>the "command" parameter is missing or is neither "open" nor "close";</li>
     *   <li>the "DSId" (FlexClient id) parameter is missing;</li>
     *   <li>the command is "open" and the current FlexSession has no valid
     *       FlexClient with the supplied id.</li>
     * </ul>
     * A "close" command received while the current FlexSession has no FlexClients
     * invalidates that session (when it is an {@link HttpFlexSession}) and returns.
     *
     * @param req the HTTP request carrying the "command", "DSId" and, for close
     *            requests, "streamId" parameters
     * @param res the HTTP response used to report validation errors and passed on
     *            to the open/close handlers
     */
    protected void serviceStreamingRequest(HttpServletRequest req, HttpServletResponse res) {
        String command = req.getParameter(COMMAND_PARAM_NAME);
        if (req.getProtocol().equals(HTTP_1_0)) {
            this.rejectStreamingRequest(res, "made with " + " HTTP 1.0. Only HTTP 1.1 is supported.");
            return;
        }
        // Constant-first comparisons: a request without a "command" parameter yields
        // null here and must be rejected as invalid instead of throwing.
        boolean openRequested = OPEN_COMMAND.equals(command);
        boolean closeRequested = CLOSE_COMMAND.equals(command);
        if (!openRequested && !closeRequested) {
            this.rejectStreamingRequest(res, "as the supplied command '" + command + "' is invalid.");
            return;
        }
        String flexClientId = req.getParameter("DSId");
        if (flexClientId == null) {
            this.rejectStreamingRequest(res, "as no FlexClient id" + " has been supplied in the request.");
            return;
        }
        // A single variable tracks the client under inspection so that it is still
        // available for dispatch after the loop. It ends up as the matching valid
        // client, or, when nothing matched, the last client iterated (null for an
        // empty list).
        // NOTE(review): for a close request without a valid match this means the
        // close handler receives the last client of the session; that handler only
        // acts when the stream id matches the client's notifier id. Confirm this
        // fallback is intended.
        FlexClient flexClient = null;
        boolean validFlexClientId = false;
        List<FlexClient> flexClients = FlexContext.getFlexSession().getFlexClients();
        for (FlexClient candidate : flexClients) {
            flexClient = candidate;
            if (candidate.getId().equals(flexClientId) && candidate.isValid()) {
                validFlexClientId = true;
                break;
            }
        }
        // The FlexClient id is only enforced for open requests; close requests are
        // let through even when no valid client matched.
        if (!closeRequested && !validFlexClientId) {
            this.rejectStreamingRequest(res, "as either the supplied" + " FlexClient id '" + flexClientId + " is not valid, or the FlexClient with that id is not valid.");
            return;
        }
        if (closeRequested && flexClients.isEmpty()) {
            // Nothing to close on this session: invalidate it and stop.
            FlexSession flexSession = FlexContext.getFlexSession();
            if (flexSession instanceof HttpFlexSession) {
                ((HttpFlexSession)flexSession).invalidate(false);
            }
            return;
        }
        if (flexClient != null) {
            if (openRequested) {
                this.handleFlexClientStreamingOpenRequest(req, res, flexClient);
            } else {
                this.handleFlexClientStreamingCloseRequest(req, res, flexClient, req.getParameter(STREAM_ID_PARAM_NAME));
            }
        }
    }

    /**
     * Logs why a streaming request cannot be serviced (when error logging is
     * enabled) and answers it with HTTP status 400.
     *
     * @param res the HTTP response to send the error on
     * @param reason the tail of the log message, appended after
     *               "... cannot service the streaming request "
     */
    private void rejectStreamingRequest(HttpServletResponse res, String reason) {
        if (Log.isError()) {
            this.log.error("Endpoint with id '" + this.getId() + "' cannot service the streaming request " + reason);
        }
        try {
            // 400 = Bad Request.
            res.sendError(400);
        }
        catch (IOException ignored) {
            // Best effort: if the error status cannot be written there is nothing
            // more this endpoint can do for the request.
        }
    }

    /**
     * Writes one chunk to the output stream and flushes the response.
     *
     * <p>For a non-empty payload the chunk is written as the payload length in
     * hexadecimal (ASCII), CRLF, the payload bytes, CRLF. For a {@code null} or
     * empty payload only the character '0' followed by CRLF is written.
     *
     * @param bytes the chunk payload; may be {@code null} or empty
     * @param os the servlet output stream to write the chunk to
     * @param response the HTTP response whose buffer is flushed after writing
     * @throws IOException if writing to the stream or flushing the response fails
     */
    protected void streamChunk(byte[] bytes, ServletOutputStream os, HttpServletResponse response) throws IOException {
        boolean emptyPayload = bytes == null || bytes.length == 0;
        if (emptyPayload) {
            // 48 is the ASCII code of '0': a zero-length chunk.
            os.write(48);
            os.write(CRLF_BYTES);
        } else {
            String hexLength = Integer.toHexString(bytes.length);
            byte[] lengthLine = hexLength.getBytes("ASCII");
            os.write(lengthLine);
            os.write(CRLF_BYTES);
            os.write(bytes);
            os.write(CRLF_BYTES);
        }
        // Both kinds of chunk are pushed to the client immediately.
        response.flushBuffer();
    }

    /**
     * Abstract hook that concrete streaming endpoints must implement.
     *
     * <p>NOTE(review): the parameter names were lost in decompilation. Judging by
     * the method name and the parameter types, {@code var1} is presumably the list
     * of messages to stream, {@code var2} the servlet output stream to write them
     * to and {@code var3} the HTTP response that owns that stream; confirm against
     * a concrete subclass.
     *
     * @throws IOException declared so that implementations may propagate I/O failures
     */
    protected abstract void streamMessages(List var1, ServletOutputStream var2, HttpServletResponse var3) throws IOException;

    /**
     * Returns the performance info of a message when performance recording is on.
     *
     * @param message the message to read the performance info from
     * @return the result of {@code MessagePerformanceUtils.getMPII(message)} if
     *         this endpoint records message sizes or message times, otherwise
     *         {@code null}
     */
    protected MessagePerformanceInfo getMPI(Message message) {
        if (this.isRecordMessageSizes() || this.isRecordMessageTimes()) {
            return MessagePerformanceUtils.getMPII(message);
        }
        return null;
    }

    /**
     * Schedules a timeout for the given push notifier with the push-notifier
     * timeout manager. Does nothing when no timeout manager is set.
     *
     * @param notifier the push notifier to schedule a timeout for
     */
    private void monitorTimeout(EndpointPushNotifier notifier) {
        if (this.pushNotifierTimeoutManager == null) {
            return;
        }
        this.pushNotifierTimeoutManager.scheduleTimeout(notifier);
    }
}

