curator简介
Netflix curator 是Netflix公司开源的一个Zookeeper client library,用于简化zookeeper客户端编程,包含一下几个模块:
- curator-client – zookeeper client封装,用于取代原生的zookeeper客户端,提供一些非常有用的客户端特性
- curator-framework – zookeeper api的高层封装,大大简化zookeeper客户端编程,添加了例如zookeeper连接管理、重试机制等
- curator-recipes – zookeeper recipes 基于curator-framework的实现(除2PC以外)
maven dependency:
1.
2. com.netflix.curator
3. curator-recipes
4. 0.6.4
5.
注意:在www.mvnrepository.com中认为0.32为最新版本,其实迄今为止最新版本为0.64,github trunk中的版本现在是0.65-SNAPSHOT
curator framework 使用
示例代码:
1. String path = "/test_path";
2. CuratorFramework client = CuratorFrameworkFactory.builder()
3. "test:2181").namespace("/test1")
4. new RetryNTimes(Integer.MAX_VALUE, 1000))
5. 5000).build();
6. //create a node
7. client.create().forPath("/head", new byte[0]);
8.
9. //delete a node in background
10. client.delete().inBackground().forPath("/head");
11.
12. // create a EPHEMERAL_SEQUENTIAL
13. client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
14.
15. // get the data
16. client.getData().watched().inBackground().forPath("/test");
17.
18. // check the path exits
19. client.checkExists().forPath(path);
curator framework使用builder模式和类似nio的chain api,代码非常简洁
curator recipes 使用
InterProcessMutex
用途:进程间互斥锁
示例代码:
1. String lockName = "/lock1";
2. InterProcessLock lock1 = new InterProcessMutex(this.curator, lockName);
3. InterProcessLock lock2 = new InterProcessMutex(this.curator, lockName);
4. lock1.acquire();
5. boolean result = lock2.acquire(1, TimeUnit.SECONDS);
6. assertFalse(result);
7. lock1.release();
8. result = lock2.acquire(1, TimeUnit.SECONDS);
9. assertTrue(result);
原理:每次调用acquire在/lock1节点节点下使用CreateMode.EPHEMERAL_SEQUENTIAL 创建新的ephemeral节点,然后getChildren获取所有的children,判断刚刚创建的临时节点是否为第一个,如果是,则获取锁成功;如果不是,则删除刚刚创建的临时节点。
注意: 每次accquire操作,成功,则请求zk server 2次(一次写,一次getChildren);如果失败,则请求zk server 3次(一次写,一次getChildren,一次delete)
InterProcessReadWriteLock
示例代码:
1. @Test
2. public void testReadWriteLock() throws Exception{
3. "/RWLock";
4. new InterProcessReadWriteLock(this.curator, readWriteLockPath);
5. InterProcessMutex writeLock1 = readWriteLock1.writeLock();
6. InterProcessMutex readLock1 = readWriteLock1.readLock();
7.
8. new InterProcessReadWriteLock(this.curator, readWriteLockPath);
9. InterProcessMutex writeLock2 = readWriteLock2.writeLock();
10. InterProcessMutex readLock2 = readWriteLock2.readLock();
11. writeLock1.acquire();
12.
13. // same with WriteLock, can read
14. 1, TimeUnit.SECONDS));
15.
16. // different lock, can't read while writting
17. 1, TimeUnit.SECONDS));
18.
19. // different write lock, can't write
20. 1, TimeUnit.SECONDS));
21.
22. // release the write lock
23. writeLock1.release();
24.
25. //both read lock can read
26. 1, TimeUnit.SECONDS));
27. 1, TimeUnit.SECONDS));
28. }
原理: 同InterProcessMutext,在ephemeral node的排序算法上做trick,write lock的排序在前。
注意: 同一个InterProcessReadWriteLock如果已经获取了write lock,则获取read lock也会成功
LeaderSelector
示例代码:
1. @Test
2. public void testLeader() throws Exception{
3. new LeaderSelectorListener(){
4.
5.
6. @Override
7. public void takeLeadership(CuratorFramework client)
8. throws Exception {
9. "i'm leader");
10. }
11.
12. @Override
13. public void handleException(CuratorFramework client,
14. Exception exception) {
15.
16. }
17.
18. @Override
19. public void notifyClientClosing(CuratorFramework client) {
20.
21. }};
22. "/leader";
23. new LeaderSelector(this.curator, leaderPath, listener);
24. selector1.start();
25. new LeaderSelector(this.curator, leaderPath, listener);
26. selector2.start();
27. assertFalse(selector2.hasLeadership());
28. }
原理:内部基于InterProcessMutex实现,具体细节参见shared lock一节
总结
curator还提供了很多其他的实现,具体参见https://github.com/Netflix/curator/wiki/Recipes
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.e1idc.net