/**
*分布式锁工厂类
*/
public class RedisLockUtil {
private static final Logger logger = Logger.getLogger(RedisLockUtil.class);
private static Object schemeLock = new Object();
private static Map instances = new ConcurrentHashMap();
public static RedisLockUtil getInstance(String schema){
RedisLockUtil u = instances.get(schema);
if(u==null){
synchronized(schemeLock){
u = instances.get(schema);
if(u == null){
LockObserver lo = new LockObserver(schema);
u = new RedisLockUtil(schema,lo);
instances.put(schema, u);
}
}
}
return u;
}
private Object mutexLock = new Object();
private Map mutexLockMap = new ConcurrentHashMap();
private Map cache = new ConcurrentHashMap();
private DelayQueue dq = new DelayQueue();
private AbstractLockObserver lo;
public RedisLockUtil(String schema, AbstractLockObserver lo){
Thread th = new Thread(lo);
th.setDaemon(false);
th.setName("Lock Observer:" schema);
th.start();
clearUselessLocks(schema);
this.lo = lo;
}
public void clearUselessLocks(String schema){
Thread th = new Thread(new Runnable(){
@Override
public void run() {
while(!SystemExitListener.isOver()){
try {
RedisReentrantLock t = dq.take();
if(t.clear()){
String key = t.getKey();
synchronized(getMutex(key)){
cache.remove(key);
}
}
t.resetCleartime();
} catch (InterruptedException e) {
}
}
}
});
th.setDaemon(true);
th.setName("Lock cleaner:" schema);
th.start();
}
private Object getMutex(String key){
Object mx = mutexLockMap.get(key);
if(mx == null){
synchronized(mutexLock){
mx = mutexLockMap.get(key);
if(mx==null){
mx = new Object();
mutexLockMap.put(key,mx);
}
}
}
return mx;
}
private RedisReentrantLock getLock(String key,boolean addref){
RedisReentrantLock lock = cache.get(key);
if(lock == null){
synchronized(getMutex(key)){
lock = cache.get(key);
if(lock == null){
lock = new RedisReentrantLock(key,lo);
cache.put(key, lock);
}
}
}
if(addref){
if(!lock.incRef()){
synchronized(getMutex(key)){
lock = cache.get(key);
if(!lock.incRef()){
lock = new RedisReentrantLock(key,lo);
cache.put(key, lock);
}
}
}
}
return lock;
}
public void reset(){
for(String s : cache.keySet()){
getLock(s,false).unlock();
}
}
/**
* 尝试加锁
* 如果当前线程已经拥有该锁的话,直接返回,表示不用再次加锁,此时不应该再调用unlock进行解锁
*
* @param key
* @return
* @throws Exception
* @throws InterruptedException
* @throws KeeperException
*/
public LockStat lock(String key) {
return lock(key,-1);
}
public LockStat lock(String key,int timeout) {
RedisReentrantLock ll = getLock(key,true);
ll.incRef();
try{
if(ll.isOwner(false)){
ll.descrRef();
return LockStat.NONEED;
}
if(ll.lock(timeout)){
return LockStat.SUCCESS;
}else{
ll.descrRef();
if(ll.setCleartime()){
dq.put(ll);
}
return null;
}
}catch(LockNotExistsException e){
ll.descrRef();
return lock(key,timeout);
}catch(RuntimeException e){
ll.descrRef();
throw e;
}
}
public void unlock(String key,LockStat stat) {
unlock(key,stat,false);
}
public void unlock(String key,LockStat stat,boolean keepalive){
if(stat == null) return;
if(LockStat.SUCCESS.equals(stat)){
RedisReentrantLock lock = getLock(key,false);
boolean candestroy = lock.unlock();
if(candestroy && !keepalive){
if(lock.setCleartime()){
dq.put(lock);
}
}
}
}
public static enum LockStat{
NONEED,
SUCCESS
}
}
/**
*分布式锁本地代理类
*/
public class RedisReentrantLock implements Delayed{
private static final Logger logger = Logger.getLogger(RedisReentrantLock.class);
private ReentrantLock reentrantLock = new ReentrantLock();
private RedisLock redisLock;
private long timeout = 3*60;
private CountDownLatch lockcount = new CountDownLatch(1);
private String key;
private AbstractLockObserver observer;
private int ref = 0;
private Object refLock = new Object();
private boolean destroyed = false;
private long cleartime = -1;
/**
 * Creates a local proxy for the distributed lock identified by {@code key}.
 *
 * @param key      lock key
 * @param observer observer handed to the underlying {@code RedisLock}
 */
public RedisReentrantLock(String key, AbstractLockObserver observer) {
    this.observer = observer;
    this.key = key;
    // Both fields must be set before this call: it builds the RedisLock from them.
    initWriteLock();
}
/**
 * @return the current value of the {@code destroyed} flag (set by {@code clear()})
 */
public boolean isDestroyed() {
    return this.destroyed;
}
/**
 * Builds the underlying {@code RedisLock} for this proxy's key.
 * The listener installed here counts down {@code lockcount} both when the lock is
 * acquired and when an error is reported, and reports an expire value of 0.
 */
private synchronized void initWriteLock() {
    LockListener latchReleaser = new LockListener() {
        @Override
        public void lockAcquired() {
            lockcount.countDown();
        }

        @Override
        public long getExpire() {
            return 0;
        }

        @Override
        public void lockError() {
            lockcount.countDown();
        }
    };
    redisLock = new RedisLock(key, latchReleaser, observer);
}
public boolean incRef(){
synchronized(refLock){
if(destroyed) return false;
ref ;
}
return true;
}
/**
 * Drops one reference to this proxy. The count is not checked against zero.
 */
public void descrRef() {
    synchronized (refLock) {
        ref -= 1;
    }
}
/**
 * Destroys this proxy if nobody references it any more.
 *
 * @return {@code true} if the proxy is destroyed (now or previously),
 *         {@code false} if it is still referenced and was left intact
 */
public boolean clear() {
    if (destroyed) {
        return true;
    }
    synchronized (refLock) {
        boolean unreferenced = ref <= 0;
        if (unreferenced) {
            destroyed = true;
            // Let the underlying lock deregister itself, then drop it.
            redisLock.clear();
            redisLock = null;
        }
        return unreferenced;
    }
}
public boolean lock(long timeout) throws LockNotExistsException{
if(timeout nbso online casino reviews public String getKey() {
return key;
}
/**
 * Replaces the key stored in this proxy.
 * NOTE(review): the RedisLock built in initWriteLock() received the key at
 * construction time and is not rebuilt here - confirm this is intended.
 *
 * @param key new key value
 */
public void setKey(String key) {
    this.key = key;
}
public boolean isOwner(boolean check) {
synchronized(refLock){
if(redisLock == null) {
logger.error("reidsLock is null:key=" key);
return false;
}
boolean a = reentrantLock.isHeldByCurrentThread();
boolean b = redisLock.isOwner();
if(check){
if(!a || !b){
logger.error(key ";a:" a ";b:" b);
}
}
return a && b;
}
}
public boolean setCleartime() {
synchronized(this){
if(cleartime>0) return false;
this.cleartime = System.currentTimeMillis() 10*1000;
return true;
}
}
/**
 * Clears any pending clear time by setting it back to -1, so that a later
 * {@code setCleartime()} call succeeds again.
 */
public void resetCleartime() {
    synchronized (this) {
        cleartime = -1L;
    }
}
@Override
public int compareTo(Delayed object) {
if(object instanceof RedisReentrantLock){
RedisReentrantLock t = (RedisReentrantLock)object;
long l = this.cleartime - t.cleartime;
if(l > 0) return 1 ; //比当前的小则返回1,比当前的大则返回-1,否则为0
else if(l
/**
 * Distributed lock implemented on top of Redis.
 *
 * <p>Working principle, as described by the original author:
 * <ol>
 *   <li>Call SETNX(key, timestamp + timeout); if it succeeds the lock is acquired.</li>
 *   <li>Otherwise read the key's value v1 (its expiry timestamp) and compare it with
 *       the current time to see whether it has timed out.</li>
 *   <li>If it has timed out (the node holding the lock has presumably died), call
 *       v2 = GETSET(key, timestamp + timeout + 1); if v2 equals v1 the lock is
 *       acquired, otherwise it is not and the attempt is retried later (200 ms).</li>
 * </ol>
 *
 * <p>This class itself only tracks local state and delegates to the
 * {@code AbstractLockObserver}; the Redis commands are not issued in this class.
 */
public class RedisLock implements LockListener {
    private final String key;

    /**
     * Whether this instance currently holds the lock. Volatile because
     * {@link #isOwner()} reads it without taking this object's monitor.
     */
    private volatile boolean owner = false;

    private final AbstractLockObserver observer;
    private final LockListener lockListener;

    /** True while this instance is registered with the observer as a listener. */
    private boolean waiting = false;

    /**
     * Lock timeout, in seconds. Volatile because {@link #getExpire()} reads it without
     * taking this object's monitor.
     */
    private volatile long expire;

    /** Set by {@link #doExpire()}; makes a late {@link #lockAcquired()} release at once. */
    private boolean expired = false;

    /**
     * @param key          lock key
     * @param lockListener notified when the lock is acquired or an error occurs
     * @param observer     performs the actual lock/unlock and listener registration
     */
    public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) {
        this.key = key;
        this.lockListener = lockListener;
        this.observer = observer;
    }

    /**
     * Tries to take the lock without blocking.
     *
     * <p>If the lock is not obtained and this instance is not already waiting, it
     * registers itself with the observer so that {@link #lockAcquired()} can be
     * called later.
     *
     * @param expire lock timeout to request, in seconds
     * @return {@code true} if this instance owns the lock on return
     */
    public boolean trylock(long expire) {
        synchronized (this) {
            if (owner) {
                return true;
            }
            this.expire = expire;
            this.expired = false;
            if (!waiting) {
                owner = observer.tryLock(key, expire);
                if (!owner) {
                    waiting = true;
                    observer.addLockListener(key, this);
                }
            }
            return owner;
        }
    }

    /** @return whether this instance currently owns the lock */
    public boolean isOwner() {
        return owner;
    }

    /** Asks the observer to release the key and marks this instance as not owning it. */
    public void unlock() {
        synchronized (this) {
            observer.unLock(key);
            owner = false;
        }
    }

    /** Deregisters this instance from the observer if it is currently waiting. */
    public void clear() {
        synchronized (this) {
            if (waiting) {
                observer.removeLockListener(key);
                waiting = false;
            }
        }
    }

    /**
     * Marks a pending acquisition attempt as expired and stops waiting.
     *
     * @return {@code true} if the lock is already owned (nothing is expired),
     *         {@code false} otherwise
     */
    public boolean doExpire() {
        synchronized (this) {
            if (owner) {
                return true;
            }
            if (expired) {
                return false;
            }
            expired = true;
            clear();
        }
        return false;
    }

    /**
     * Callback: the lock has been obtained for this instance. If the attempt was
     * already expired the lock is released again immediately and the wrapped listener
     * is not notified.
     */
    @Override
    public void lockAcquired() {
        synchronized (this) {
            if (expired) {
                unlock();
                return;
            }
            owner = true;
            waiting = false;
        }
        // Notify outside the monitor.
        lockListener.lockAcquired();
    }

    /** @return the timeout passed to the most recent {@link #trylock(long)} call */
    @Override
    public long getExpire() {
        return this.expire;
    }

    /** Callback: acquisition failed. Resets local state and forwards the error. */
    @Override
    public void lockError() {
        synchronized (this) {
            owner = false;
            waiting = false;
            lockListener.lockError();
        }
    }
}
/*
 * Benchmark result: 500 threads, tps = 35000
 * [root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
 * Total thread time: 6553466; average: 13.106932
 * Actual total time: 13609; average: 0.027218
 */
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.e1idc.net