ConcurrencyThrottlingUtils.java
/*
* Copyright (c) 2018 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.synapse.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.commons.throttle.core.ConcurrentAccessController;
import org.apache.synapse.commons.throttle.core.ConcurrentAccessReplicator;
/**
* Utility class with related Synapse concurrency throttling.
*/
public class ConcurrencyThrottlingUtils {
private static final Log log = LogFactory.getLog(ConcurrencyThrottlingUtils.class);
/**
* Decrement the internal counter for concurrency throttling, in case of normal response retrieval,
* delayed response at synapse timeout, exceptional cases while mediation end up trigger fault.
* In clustered environment, replicate the decremented value to other members.
*
* @param synCtx Synapse Message Context of which mediation occurs.
*/
public static void decrementConcurrencyThrottleAccessController(MessageContext synCtx) {
Boolean isConcurrencyThrottleEnabled = (Boolean) synCtx
.getProperty(SynapseConstants.SYNAPSE_CONCURRENCY_THROTTLE);
if (isConcurrencyThrottleEnabled != null && isConcurrencyThrottleEnabled) {
ConcurrentAccessController concurrentAccessController = (ConcurrentAccessController) synCtx
.getProperty(SynapseConstants.SYNAPSE_CONCURRENT_ACCESS_CONTROLLER);
int available = concurrentAccessController.incrementAndGet();
int concurrentLimit = concurrentAccessController.getLimit();
if (log.isDebugEnabled()) {
log.debug("Concurrency Throttle : Connection returned" + " :: " + available + " of available of"
+ concurrentLimit + " connections");
}
ConcurrentAccessReplicator concurrentAccessReplicator = (ConcurrentAccessReplicator) synCtx
.getProperty(SynapseConstants.SYNAPSE_CONCURRENT_ACCESS_REPLICATOR);
String throttleKey = (String) synCtx.getProperty(SynapseConstants.SYNAPSE_CONCURRENCY_THROTTLE_KEY);
if (concurrentAccessReplicator != null) {
concurrentAccessReplicator.replicate(throttleKey, true);
}
//once decremented, clear the flag since we no longer required to decrement the value
//concurrency throttle access controller
synCtx.setProperty(SynapseConstants.SYNAPSE_CONCURRENCY_THROTTLE, false);
}
}
}