Zookeeper的介绍,helloword,Curator,分布式锁,集群
ZooKeeper-基础
官网:
版本:ZooKeeper3.6
支持JDK8、11、12

介绍
Hadoop的一个子项目,用于管理服务。
- 服务注册中心
- 服务配置
- 分布式锁
安装
Zookeeper-安装
zookeeper命令行操作
数据模型
zookeeper的结构类似Linux树形目录结构。
由一个个节点+节点之间的联系构成。
zookeeper的节点可以有子节点,并且可以存储少量信息(1M,如:地址信息,172.168.0.1)

节点可以分为四大类:
- PERSISTENT 持久化节点
- EPHEMERAL 临时节点 :-e
- PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
- EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es
server端
1 | cd /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/ |
启动
1
./zkServer.sh start
1
2
3ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED停止
1
./zkServer.sh stop
1
2
3ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED重启
1
./zkServer.sh restart
1
2
3
4
5
6
7
8ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED查看状态
1
./zkServer.sh status
1
2
3
4
5#未启动
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Error contacting service. It is probably not running.
client端
连接
克隆会话
进入zookeeper的bin目录
连接
1
2
3
4#./zkCli.sh –server ip:port
./zkCli.sh –server localhost:2181
#或者
./zkCli.sh
退出
1 | quit |
节点操作
查看
1
2ls /
#结果:[zookeeper]查看详细信息
1
2
3
4
5
6
7
8
9
10
11
12[zk: localhost:2181(CONNECTED) 4] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1解释
1
2
3
4
5
6
7
8
9
10
11czxid:节点被创建的事务ID
ctime: 创建时间
mzxid: 最后一次被更新的事务ID
mtime: 修改时间
pzxid:子节点列表最后一次被更新的事务ID
cversion:子节点的版本号
dataversion:数据版本号
aclversion:权限版本号
ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
dataLength:节点存储的数据的长度
numChildren:当前节点的子节点个数创建持久节点 默认
1
2
3
4[zk: localhost:2181(CONNECTED) 6] create /xiaorui
Created /xiaorui
[zk: localhost:2181(CONNECTED) 7] ls /
[xiaorui, zookeeper]获取节点
1
2[zk: localhost:2181(CONNECTED) 0] get /xiaorui
null设置节点值
1
2#set path value
set /xiaorui han删除单个节点
1
delete /节点path
删除带有子节点的节点
1
deleteall /节点path
临时节点 -e 会话
1
create -e /节点path value
顺序节点 -s
1
create -s /节点path value
zookeeper JavaAPI操作
curator
简化原生JavaAPI对zookeeper的操作。
Apache的顶级项目
版本:与zookeeper版本对应
创建项目
导入curator+junit
1 | <?xml version="1.0" encoding="UTF-8"?> |
操作curator
建立连接
1 | private CuratorFramework client; |
创建节点
1 | /** |

查询节点
1 | /** |
修改节点
1 | /** |
删除节点
1 | /** |
Watch监听
ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
ZooKeeper提供了三种Watcher:
- NodeCache : 只是监听某一个特定的节点
- PathChildrenCache : 监控一个ZNode的子节点.
- TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
implementation
Watch监听-NodeCache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22/**
* 演示 NodeCache:给指定一个节点注册监听器
*/
@Test
public void testNodeCache() throws Exception {
//1. 创建NodeCache对象
final NodeCache nodeCache = new NodeCache(client,"/app1");
//2. 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变了~");
//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
nodeCache.start(true);
while (true){
}
}Watch监听-PathChildrenCache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26@Test
public void testPathChildrenCache() throws Exception {
//1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//2. 绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("子节点变化了~");
System.out.println(event);
//监听子节点的数据变更,并且拿到变更后的数据
//1.获取类型
PathChildrenCacheEvent.Type type = event.getType();
//2.判断类型是否是update
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("数据变了");
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}
});
//3. 开启
pathChildrenCache.start();
while (true){
}
}Watch监听-TreeCache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/**
* 演示 TreeCache:监听某个节点自己和所有子节点们
*/
@Test
public void testTreeCache() throws Exception {
//1. 创建监听器
TreeCache treeCache = new TreeCache(client,"/app2");
//2. 注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("节点变了");
System.out.println(event);
}
});
//3. 开启
treeCache.start();
while (true){
}
}
zookeeper分布式锁
介绍:后端项目使用分布式集群时,这时是多jvm环境,锁无法解决多机同步问题。出现了分布式锁。
zookeeper分布式锁原理:客户端要获取锁,1.在请求下创建临时顺序子节点,2.若节点最小,则获得了锁,执行业务3.使用完锁,则删除该节点,释放锁。
原理详解:
1.客户端在lock节点下创建临时顺序节点。
2.获取lock下面的所有子节点,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁,执行业务代码。使用完锁后,将该节点删除。
3.如果自己创建的节点并非lock所有子节点中最小的,找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
4.如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤(监听比自己小一个节点的删除事件,判断自己是否是最小节点)。
Curator实现分布式锁:
五种锁方案:
InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
InterProcessMutex:分布式可重入排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器
InterProcessSemaphoreV2:共享信号量
implementation:
InterProcessMutex 方式
1.创建线程,加锁设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class Ticket12306 implements Runnable{
private int tickets = 20; //数据库的票数
private InterProcessMutex lock ;
@Override
public void run() {
while(true){
//获取锁
try {
lock.acquire(2, TimeUnit.SECONDS);
if(tickets > 0){
System.out.println(Thread.currentThread()+":"+tickets);
Thread.sleep(100);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}2.创建连接,初始化锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public Ticket12306(){
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//2.第二种方式
//CuratorFrameworkFactory.builder();
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.3.3:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
//开启连接
client.start();
lock = new InterProcessMutex(client,"/lock");
}3.多个线程
1
2
3
4
5
6
7
8
9
10public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//创建客户端
Thread t1 = new Thread(ticket12306,"骆驼旅行");
Thread t2 = new Thread(ticket12306,"分流抢票");
t1.start();
t2.start();
}
}
集群
集群角色
- leader 领导
- 处理事务请求
- 集群内部各服务器的调度
- follower 跟随者
- 处理客户端非事务请求,转发事务给leader服务器
- 参与leader选举投票
- observer 观察者
- 处理客户端非事务请求,转发事务给leader服务器
领导算法
介绍:服务器投票选举主服务器,名称为领导(leader)。
逻辑:
服务器id越大,权重越大。
半数投票前,每加入一个服务器都重新投票和选举。
某个服务器得到半数投票后,角色变为leader。集群变为可用状态。
有leader角色后,新服务器加入集群,不会重新投票和选举。
示例:
服务器id=1,2,3三个服务器,1,2,3依次启动。1启动,给自己投一票;2启动,1重新投票,投给2一票,2投给自己一票,2一共的了2票,超过半票,2的角色变为leader,1的角色为follower,集群启动;3启动,已经有leader角色,不投票,3的角色为follower.