0%

ZooKeeper-入门

Zookeeper的介绍,helloword,Curator,分布式锁,集群

ZooKeeper-基础

官网:

版本:ZooKeeper3.6

支持JDK8、11、12

image-20200825064140355

介绍

Hadoop的一个子项目,用于管理服务。

  • 服务注册中心
  • 服务配置
  • 分布式锁

安装

Zookeeper-安装

zookeeper命令行操作

数据模型

zookeeper的结构类似Linux树形目录结构。

由一个个节点+节点之间的联系构成。

zookeeper的节点可以有子节点,并且可以存储少量信息(1M,如:地址信息,172.168.0.1)

image-20200903180546203

节点可以分为四大类:

  • PERSISTENT 持久化节点
  • EPHEMERAL 临时节点 :-e
  • PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
  • EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es

server端

1
cd /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/
  • 启动

    1
    ./zkServer.sh start
    1
    2
    3
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
  • 停止

    1
    ./zkServer.sh stop
    1
    2
    3
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
    Stopping zookeeper ... STOPPED
  • 重启

    1
    ./zkServer.sh restart
    1
    2
    3
    4
    5
    6
    7
    8
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
    Stopping zookeeper ... STOPPED
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
  • 查看状态

    1
    ./zkServer.sh status
    1
    2
    3
    4
    5
    #未启动
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost.
    Error contacting service. It is probably not running.

client端

连接

  • 克隆会话

  • 进入zookeeper的bin目录

  • 连接

    1
    2
    3
    4
    #./zkCli.sh –server ip:port
    ./zkCli.sh –server localhost:2181
    #或者
    ./zkCli.sh

    image-20200913123531063

退出

1
quit

节点操作

  • 查看

    1
    2
    ls /
    #结果:[zookeeper]
  • 查看详细信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    [zk: localhost:2181(CONNECTED) 4] ls -s /
    [zookeeper]cZxid = 0x0
    ctime = Thu Jan 01 08:00:00 CST 1970
    mZxid = 0x0
    mtime = Thu Jan 01 08:00:00 CST 1970
    pZxid = 0x0
    cversion = -1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 0
    numChildren = 1

    解释

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    czxid:节点被创建的事务ID 
    ctime: 创建时间
    mzxid: 最后一次被更新的事务ID
    mtime: 修改时间
    pzxid:子节点列表最后一次被更新的事务ID
    cversion:子节点的版本号
    dataversion:数据版本号
    aclversion:权限版本号
    ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
    dataLength:节点存储的数据的长度
    numChildren:当前节点的子节点个数
  • 创建持久节点 默认

    1
    2
    3
    4
    [zk: localhost:2181(CONNECTED) 6] create /xiaorui
    Created /xiaorui
    [zk: localhost:2181(CONNECTED) 7] ls /
    [xiaorui, zookeeper]
  • 获取节点

    1
    2
    [zk: localhost:2181(CONNECTED) 0] get /xiaorui
    null
  • 设置节点值

    1
    2
    #set path value
    set /xiaorui han
  • 删除单个节点

    1
    delete /节点path
  • 删除带有子节点的节点

    1
    deleteall /节点path
  • 临时节点 -e 会话

    1
    create -e /节点path value
  • 顺序节点 -s

    1
    create -s /节点path value

zookeeper JavaAPI操作

curator

官网:http://curator.apache.org/

简化原生JavaAPI对zookeeper的操作。

Apache的顶级项目

版本:与zookeeper版本对应

创建项目

导入curator+junit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.xiaoruiit</groupId>
<artifactId>zookeeper-curator</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>

<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!--日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>

</dependencies>


<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

</project>

操作curator

建立连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private CuratorFramework client;

/**
* 建立连接
*/
@Before
public void testConnect() {

/*
*
* @param connectString 连接字符串。zk server 地址和端口 "192.168.3.3:2181"
* @param sessionTimeoutMs 会话超时时间 单位ms
* @param connectionTimeoutMs 连接超时时间 单位ms
* @param retryPolicy 重试策略
*/
/* //重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
//1.第一种方式
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.3.3:2181",
60 * 1000, 15 * 1000, retryPolicy);*/
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//2.第二种方式
//CuratorFrameworkFactory.builder();
client = CuratorFrameworkFactory.builder()
.connectString("192.168.3.3:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("xiaoruiit")
.build();

//开启连接
client.start();

}

@After
public void close() {
if (client != null) {
client.close();
}
}
创建节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 创建节点:create 持久 临时 顺序 数据
* 1. 基本创建 :create().forPath("")
* 2. 创建节点 带有数据:create().forPath("",data)
* 3. 设置节点的类型:create().withMode().forPath("",data)
* 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
*/
@Test
public void testCreate() throws Exception {
//2. 创建节点 带有数据
//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
String path = client.create().forPath("/app2", "haha".getBytes());
System.out.println(path);

}

@Test
public void testCreate2() throws Exception {
//1. 基本创建
//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
String path = client.create().forPath("/app1");
System.out.println(path);

}

@Test
public void testCreate3() throws Exception {
//3. 设置节点的类型
//默认类型:持久化
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(path);


}

@Test
public void testCreate4() throws Exception {
//4. 创建多级节点 /app1/p1
//creatingParentsIfNeeded():如果父节点不存在,则创建父节点
String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
System.out.println(path);
}

image-20200913185407017

查询节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 查询节点:
* 1. 查询数据:get: getData().forPath()
* 2. 查询子节点: ls: getChildren().forPath()
* 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
*/

@Test
public void testGet1() throws Exception {
//1. 查询数据:get
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));
}

@Test
public void testGet2() throws Exception {
// 2. 查询子节点: ls
List<String> path = client.getChildren().forPath("/");

System.out.println(path);
}

@Test
public void testGet3() throws Exception {


Stat status = new Stat();
System.out.println(status);
//3. 查询节点状态信息:ls -s
client.getData().storingStatIn(status).forPath("/app1");

System.out.println(status);

}
修改节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 修改数据
* 1. 基本修改数据:setData().forPath()
* 2. 根据版本修改: setData().withVersion().forPath()
* * version 是查询出来的。目的是为了让其他客户端或者线程不干扰我。
*
* @throws Exception
*/
@Test
public void testSet() throws Exception {
client.setData().forPath("/app1", "itcast".getBytes());
}


@Test
public void testSetForVersion() throws Exception {

Stat status = new Stat();
//3. 查询节点状态信息:ls -s
client.getData().storingStatIn(status).forPath("/app1");


int version = status.getVersion();//查询出来的 3
System.out.println(version);
client.setData().withVersion(version).forPath("/app1", "haha".getBytes());
}
删除节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 删除节点: delete deleteall
* 1. 删除单个节点:delete().forPath("/app1");
* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
* 4. 回调:inBackground
* @throws Exception
*/


@Test
public void testDelete() throws Exception {
// 1. 删除单个节点
client.delete().forPath("/app1");
}

@Test
public void testDelete2() throws Exception {
//2. 删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDelete3() throws Exception {
//3. 必须成功的删除
client.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete4() throws Exception {
//4. 回调
client.delete().guaranteed().inBackground(new BackgroundCallback(){

@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("删除了~");
System.out.println(event);
}
}).forPath("/app1");
}
Watch监听

ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

ZooKeeper提供了三种Watcher:

  • NodeCache : 只是监听某一个特定的节点
  • PathChildrenCache : 监控一个ZNode的子节点.
  • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

implementation

  • Watch监听-NodeCache

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    /**
    * 演示 NodeCache:给指定一个节点注册监听器
    */
    @Test
    public void testNodeCache() throws Exception {
    //1. 创建NodeCache对象
    final NodeCache nodeCache = new NodeCache(client,"/app1");
    //2. 注册监听
    nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
    System.out.println("节点变了~");
    //获取修改节点后的数据
    byte[] data = nodeCache.getCurrentData().getData();
    System.out.println(new String(data));
    }
    });
    //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
    nodeCache.start(true);
    while (true){
    }
    }
  • Watch监听-PathChildrenCache

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    @Test
    public void testPathChildrenCache() throws Exception {
    //1.创建监听对象
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
    //2. 绑定监听器
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    System.out.println("子节点变化了~");
    System.out.println(event);
    //监听子节点的数据变更,并且拿到变更后的数据
    //1.获取类型
    PathChildrenCacheEvent.Type type = event.getType();
    //2.判断类型是否是update
    if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
    System.out.println("数据变了");
    byte[] data = event.getData().getData();
    System.out.println(new String(data));
    }
    }
    });
    //3. 开启
    pathChildrenCache.start();
    while (true){
    }
    }
  • Watch监听-TreeCache

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * 演示 TreeCache:监听某个节点自己和所有子节点们
    */
    @Test
    public void testTreeCache() throws Exception {
    //1. 创建监听器
    TreeCache treeCache = new TreeCache(client,"/app2");
    //2. 注册监听
    treeCache.getListenable().addListener(new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
    System.out.println("节点变了");
    System.out.println(event);
    }
    });
    //3. 开启
    treeCache.start();
    while (true){
    }
    }

zookeeper分布式锁

介绍:后端项目使用分布式集群时,这时是多jvm环境,锁无法解决多机同步问题。出现了分布式锁。

zookeeper分布式锁原理:客户端要获取锁,1.在请求下创建临时顺序子节点,2.若节点最小,则获得了锁,执行业务3.使用完锁,则删除该节点,释放锁。

原理详解:

1.客户端在lock节点下创建临时顺序节点。

2.获取lock下面的所有子节点,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁,执行业务代码。使用完锁后,将该节点删除。

3.如果自己创建的节点并非lock所有子节点中最小的,找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。

4.如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤(监听比自己小一个节点的删除事件,判断自己是否是最小节点)。

Curator实现分布式锁:

  • 五种锁方案:

    • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)

    • InterProcessMutex:分布式可重入排它锁

    • InterProcessReadWriteLock:分布式读写锁

    • InterProcessMultiLock:将多个锁作为单个实体管理的容器

    • InterProcessSemaphoreV2:共享信号量

  • implementation:

    InterProcessMutex 方式

    1.创建线程,加锁设置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class Ticket12306 implements Runnable{
    private int tickets = 20; //数据库的票数
    private InterProcessMutex lock ;
    @Override
    public void run() {
    while(true){
    //获取锁
    try {
    lock.acquire(2, TimeUnit.SECONDS);
    if(tickets > 0){
    System.out.println(Thread.currentThread()+":"+tickets);
    Thread.sleep(100);
    tickets--;
    }
    } catch (Exception e) {
    e.printStackTrace();
    }finally {
    //释放锁
    try {
    lock.release();
    } catch (Exception e) {
    e.printStackTrace();
    }

    }
    }
    }
    }

    2.创建连接,初始化锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public Ticket12306(){
    //重试策略
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
    //2.第二种方式
    //CuratorFrameworkFactory.builder();
    CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("192.168.3.3:2181")
    .sessionTimeoutMs(60 * 1000)
    .connectionTimeoutMs(15 * 1000)
    .retryPolicy(retryPolicy)
    .build();
    //开启连接
    client.start();
    lock = new InterProcessMutex(client,"/lock");
    }

    3.多个线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class LockTest {
    public static void main(String[] args) {
    Ticket12306 ticket12306 = new Ticket12306();
    //创建客户端
    Thread t1 = new Thread(ticket12306,"骆驼旅行");
    Thread t2 = new Thread(ticket12306,"分流抢票");
    t1.start();
    t2.start();
    }
    }

集群

集群角色

  • leader 领导
    • 处理事务请求
    • 集群内部各服务器的调度
  • follower 跟随者
    • 处理客户端非事务请求,转发事务给leader服务器
    • 参与leader选举投票
  • observer 观察者
    • 处理客户端非事务请求,转发事务给leader服务器

领导算法

介绍:服务器投票选举主服务器,名称为领导(leader)。

逻辑:

  • 服务器id越大,权重越大。

  • 半数投票前,每加入一个服务器都重新投票和选举。

  • 某个服务器得到半数投票后,角色变为leader。集群变为可用状态。

  • 有leader角色后,新服务器加入集群,不会重新投票和选举。

示例:

服务器id=1,2,3三个服务器,1,2,3依次启动。1启动,给自己投一票;2启动,1重新投票,投给2一票,2投给自己一票,2一共的了2票,超过半票,2的角色变为leader,1的角色为follower,集群启动;3启动,已经有leader角色,不投票,3的角色为follower.