SharedParamManager.java
package org.apache.synapse.commons.throttle.core;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.commons.throttle.core.internal.ThrottleServiceDataHolder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SharedParamManager {
private static Map<String, Long> counters= new ConcurrentHashMap<String, Long>();//Locally managed counters map for non clustered environment
private static Map<String, Long> timestamps = new ConcurrentHashMap<String, Long>();//Locally managed time stamps map for non clustered environment
private static Log log = LogFactory.getLog(SharedParamManager.class.getName());
private SharedParamManager() {
}
/**
* Return distributed shared counter for this caller context with given id. If it's not distributed will get from the
* local counter
*
* @param id of the shared counter
* @return shared hazelcast current shared counter
*/
public static long getDistributedCounter(String id) {
if(log.isDebugEnabled()) {
log.debug("GET TIMESTAMP WITH ID " + id);
}
id = ThrottleConstants.THROTTLE_SHARED_COUNTER_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
return distributedCounterManager.getCounter(id);
} else {
Long counter = counters.get(id);
if (counter != null) {
return counter;
} else {
counters.put(id, 0L);
return 0;
}
}
}
/**
* Set distribute counter of caller context of given id to the provided value. If it's not distributed do the same for
* local counter
*
* @param id of the caller context
* @param value to set to the global counter
*/
public static void setDistributedCounter(String id, long value) {
if(log.isDebugEnabled()) {
log.debug("SETTING COUNTER WITH ID " + id);
}
id = ThrottleConstants.THROTTLE_SHARED_COUNTER_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
distributedCounterManager.setCounter(id,value);
} else {
counters.put(id, value);
}
}
/**
* Set the distributed counter with the given id key with an expiry time
*
* @param id key id
* @param value value to set
* @param expiryTime expiry time in milliseconds
*/
public static void setDistributedCounterWithExpiry(String id, long value, long expiryTime) {
if (log.isDebugEnabled()) {
log.debug("SETTING COUNTER WITH ID " + id);
}
id = ThrottleConstants.THROTTLE_SHARED_COUNTER_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
//distributedCounterManager.setCounter(id, value);
distributedCounterManager.setCounterWithExpiry(id, value, expiryTime);
} else {
counters.put(id, value);
}
}
/**
* Set the shared timestamp with the given id key with an expiry time
*
* @param id key id
* @param timestamp timestamp value to set
* @param expiryTime expiry time in milliseconds
*/
public static void setSharedTimestampWithExpiry(String id, long timestamp, long expiryTime) {
if (log.isDebugEnabled()) {
log.debug("Setting the shared timestamp of key " + id + " with value " + timestamp + " with an expiry "
+ "time of " + expiryTime);
}
String key = ThrottleConstants.THROTTLE_TIMESTAMP_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
distributedCounterManager.setTimestampWithExpiry(key, timestamp, expiryTime);
} else {
timestamps.put(id, timestamp);
}
}
/**
* Add given value to the distribute counter of caller context of given id. If it's not
* distributed return local counter
*
* @param id of the caller context
* @param value to set to the global counter
*/
public static long addAndGetDistributedCounter(String id, long value) {
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
id = ThrottleConstants.THROTTLE_SHARED_COUNTER_KEY + id;
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
return distributedCounterManager.addAndGetCounter(id, value);
} else {
long currentCount = counters.get(id);
long updatedCount = currentCount + value;
counters.put(id, updatedCount);
return updatedCount;
}
}
/**
* Asynchronously add given value to the distribute counter of caller context of given id. If it's not
* distributed return local counter. This will return global value before add the provided counter
*
* @param id of the caller context
* @param value to set to the global counter
*/
public static long asyncGetAndAddDistributedCounter(String id, long value) {
if(log.isDebugEnabled()) {
log.debug("ASYNC CREATING AND SETTING COUNTER WITH ID " + id);
}
id = ThrottleConstants.THROTTLE_SHARED_COUNTER_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
return distributedCounterManager.asyncGetAndAddCounter(id, value);
} else {
Long currentCount = counters.get(id);
if(currentCount == null) {
currentCount = 0L;
}
long updatedCount = currentCount + value;
counters.put(id, updatedCount);
return currentCount;
}
}
/**
* Asynchronously add given value to the distribute counter of caller context of given id. If it's not
* distributed return local counter. This will return global value before add the provided counter
*
* @param id of the caller context
* @param value to set to the global counter
*/
public static long asyncGetAndAlterDistributedCounter(String id, long value) {
id = ThrottleConstants.THROTTLE_SHARED_COUNTER_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
return distributedCounterManager.asyncGetAndAlterCounter(id,value);
} else {
Long currentCount = counters.get(id);
if(currentCount == null) {
currentCount = 0L;
}
long updatedCount = currentCount + value;
counters.put(id, updatedCount);
return currentCount;
}
}
/**
* Destroy hazelcast global counter, if it's local then remove the map entry
*
* @param id of the caller context
*/
public static void removeCounter(String id) {
if(log.isDebugEnabled()) {
log.debug("REMOVING COUNTER WITH ID " + id);
}
id = ThrottleConstants.THROTTLE_SHARED_COUNTER_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
distributedCounterManager.removeCounter(id);
} else {
counters.remove(id);
}
}
/**
* Return hazelcast shared timestamp for this caller context with given id. If it's not distributed will get from the
* local counter
*
* @param id of the shared counter
* @return shared hazelcast current shared counter
*/
public static long getSharedTimestamp(String id) {
if(log.isDebugEnabled()) {
log.debug("GET TIMESTAMP WITH ID " + id);
}
String key = ThrottleConstants.THROTTLE_TIMESTAMP_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
return distributedCounterManager.getTimestamp(key);
} else {
Long timestamp = timestamps.get(key);
if(timestamp != null) {
return timestamp;
} else {
timestamps.put(key, 0L);
return 0;
}
}
}
/**
* Set distribute timestamp of caller context of given id to the provided value. If it's not distributed do the same for
* local counter
*
* @param id of the caller context
* @param timestamp to set to the global counter
*/
public static void setSharedTimestamp(String id, long timestamp) {
if(log.isDebugEnabled()) {
log.debug("SETTING TIMESTAMP WITH ID" + id);
}
String key = ThrottleConstants.THROTTLE_TIMESTAMP_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
distributedCounterManager.setTimestamp(key, timestamp);
} else {
timestamps.put(id, timestamp);
}
}
/**
* Destroy hazelcast shared timggestamp counter, if it's local then remove the map entry
*
* @param id of the caller context
*/
public static void removeTimestamp(String id) {
if(log.isDebugEnabled()) {
log.debug("REMOVING TIMESTAMP WITH ID " + id);
}
String key = ThrottleConstants.THROTTLE_TIMESTAMP_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
distributedCounterManager.removeTimestamp(key);
} else {
timestamps.remove(key);
}
}
public static void setExpiryTime(String id, long expiryTimeStamp) {
if(log.isDebugEnabled()) {
log.debug("SETTING Expiry WITH ID " + id);
}
String sharedCounterKey = ThrottleConstants.THROTTLE_SHARED_COUNTER_KEY + id;
String sharedTimeStampKey = ThrottleConstants.THROTTLE_TIMESTAMP_KEY + id;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
if (log.isTraceEnabled()) {
log.trace("Setting expiry time for key:" + sharedCounterKey + " value: " + expiryTimeStamp);
}
distributedCounterManager.setExpiry(sharedCounterKey, expiryTimeStamp);
if (log.isTraceEnabled()) {
log.trace("Setting expiry time for key:" + sharedTimeStampKey + " value: " + expiryTimeStamp);
}
distributedCounterManager.setExpiry(sharedTimeStampKey, expiryTimeStamp);
}
}
/**
* Get the time-to-live value for the given key
*
* @param key name key of the key
* @return time-to-live value
*/
public static long getTtl(String key) {
long ttl = 0;
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
ttl = distributedCounterManager.getTtl(key);
}
return ttl;
}
/**
* Acquire lock for the given callerContext (with the given value), so that another process cannot acquire the same
* lock
*
* @return true if lock acquired, false if lock is not acquired within the configured timeout period
*/
public static boolean lockSharedKeys(String callerContextId, String lockValue) {
DistributedCounterManager distributedCounterManager =
ThrottleServiceDataHolder.getInstance().getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
boolean lockAcquired;
// key of the lock tried to acquire. i.e. "lock-/pizzashack/1.0.0:1.0.0:PRODUCTION"
String lockKey = ThrottleConstants.THROTTLE_LOCK_KEY_PREFIX + callerContextId;
long startTime = System.currentTimeMillis();
do {
lockAcquired = distributedCounterManager.setLockWithExpiry(lockKey, lockValue, System.currentTimeMillis() +
distributedCounterManager.getKeyLockRetrievalTimeout() * 2);
if (lockAcquired) {
// lock acquired
if (log.isTraceEnabled()) {
long timeNow = System.currentTimeMillis();
log.trace(
"current time:" + timeNow + "Lock acquired for key: " + lockKey + " within " + (timeNow
- startTime) + " ms");
}
return true;
} else {
long time = System.currentTimeMillis();
long timeElapsed = time - startTime;
if (timeElapsed > distributedCounterManager.getKeyLockRetrievalTimeout()) {
log.warn("current time:" + time + " Unable to" + " acquire lock for key: " + lockKey
+ " within the configured " + "timeout period. Elapsed time: " + timeElapsed + " ms");
return false;
}
try {
Thread.sleep(5);
if (log.isTraceEnabled()) {
log.trace("current time:" + time + "Retrying to get lock for key: " + lockKey);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
} while (true);
}
return true;
}
/**
* Release the lock of the given callerContext
*/
public static void releaseSharedKeys(String callerContextId) {
DistributedCounterManager distributedCounterManager = ThrottleServiceDataHolder.getInstance()
.getDistributedCounterManager();
if (distributedCounterManager != null && distributedCounterManager.isEnable()) {
distributedCounterManager.removeLock(callerContextId);
if (log.isTraceEnabled()) {
log.trace("Current time:" + System.currentTimeMillis() + "Lock released for key: " + callerContextId);
}
}
}
}