为了解决数据的一致性,下面将从单机模式和集

作者:美狮美高梅官方网站

在Zookeeper实现分布式锁的前两篇文章

理解 zookeeper

Zookeeper 主要用来解决分布式应用中经常遇到的一些数据管理,如统一命名服务器、状态同步服务、集群管理、分布式应用配置项的管理。

之前整理过一篇文章《zookeeper 分布式锁服务》,本文介绍的 Zookeeper 是以 3.4.5 这个稳定版本为基础,最新的版本可以通过官网 来获取,Zookeeper 的安装非常简单,下面将从单机模式和集群模式两个方面介绍 Zookeeper 的Windows安装和配置.

Zookeeper实现分布式锁While版Zookeeper实现分布式锁Watcher版

节点特性

  1. 同一时刻多台机器创建同一个节点,只有一个会争抢成功。利用这个特性可以做分布式锁。
  2. 临时节点的生命周期与会话一致,会话关闭则临时节点删除。利用这个特性经常来做心跳,动态监控,负载等动作。
  3. 顺序节点保证节点名全局唯一。这个特性可以用来生成分布式环境下的全局自增长id。

首先需要安装JdK,从Oracle的Java网站下载,安装很简单,就不再详述。

我们实现了两种类型的锁,第一种是while循环,由于无限循环带来的cpu和zookeeper服务器的消耗,我们使用了watcher的方式,watcher方式采用了客户端监听锁节点的方式,完美解决了while出现的问题,但是这种方式又会存在“惊群”效应,本篇文章我们来实现一种排队锁也就是公平锁。

zookeeper 提供的服务

  1. 创建节点
  2. 删除节点
  3. 更新节点
  4. 获取节点信息
  5. 权限控制
  6. 事件监听

Zookeeper的集群对server进行了归类:

  • Leader
  • Follower
  • Observer

单机模式

zookeeper实现公平锁依赖于zookeeper的顺序节点,先来了解一下zookeeper的节点都有什么类型的节点。

Zookeeper 作用(使用场景)

  1. 配置中心 -- Zookeeper 的目录结构比较特殊,可以这个特性作为分布式的配置中心,当配置内容发生更新可以及时通知各服务器进行更新
  2. 集群选举 -- 当某一个服务宕机或者整个服务重启,可根据Zookeeper节点的顺序一致性来选择最大节点或者最小节点作为leader
  3. 分布式锁 -- 原理同集群选举,根据节点的顺序一致性来选择最小节点对应的那个服务获得锁,当服务执行完成删除节点就会释放锁,再由其他服务去争取锁。
  4. 注册中心 -- Zookeeper的目录以及子节点。主要通过对节点的管理做到发布以及事件监听做到订阅。
  5. 队列管理
    1. 同步队列 当一个队列的队员都聚齐时,队列才可用,否则一直等待所有的成员到达,这种是同步队列
      创建一个父目录 /synchronizing ,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 、synchronizing 目录的所有目录节点,也就是 member_i,判断 i 的值是否已经是成员的目录,如果小于成员个数就等待 synchronizing/start 的出现,如果相等就创建synchronizing/start
    2. 异步队列 队列按照FIFO方式进行入队和出队操作,例如实现生产者和消费者模型。
      保证所有成员加入队列时都是有编号的,出队是通过getChildren() 方法可以返回当前所有的队列元素,然后消费其中最小的元素。

单机安装非常简单,只要获取到 Zookeeper 的压缩包并解压到某个目录如:C:zookeeper-3.4.5下,Zookeeper 的启动脚本在 bin 目录下,Windows 下的启动脚本是 zkServer.cmd。

图片 197Z0@F$BP%S)Y1W07WZK`41.png一共有四种

Zookeeper 在分布式中的作用:

使用Zookeeper提供分布式锁机制,从而实现分布式的一致性处理。

  • Barrier
  • Queue
  • Lock
  • 2PC

在你执行启动脚本之前,还有几个基本的配置项需要配置一下,Zookeeper 的配置文件在 conf 目录下,这个目录下有 zoo_sample.cfg 和 log4j.properties,你需要做的就是将 zoo_sample.cfg 改名为 zoo.cfg,因为 Zookeeper 在启动时会找这个文件作为默认配置文件。下面详细介绍一下,这个配置文件中各个配置项的意义。

EPHEMERAL:临时节点,当客户端与ZooKeeper集合断开连接时,临时节点会自动删除。PERSISTENT:持久节点,即使在创建该节点的客户端断开连接后,持久节点仍然存在。EPHEMERAL_SEQUENTIAL:临时顺序节点PERSISTENT_SEQUENTIAL:持久顺序节点

Java Api 接口

String create(String path, byte data[], List<ACL> acl, CreateMode createMode)
void   create(String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)

void delete(String path, int version)
void delete(String path, int version, VoidCallback cb, Object ctx)

Stat setData(String path, byte data[], int version)
void setData(String path, byte data[], int version, StatCallback cb, Object ctx)

Stat setACL(String path, List<ACL> acl, int version)
void setACL(String path, List<ACL> acl, int version, StatCallback cb, Object ctx)

Stat exists(String path, Watcher watcher)
Stat exists(String path, boolean watch)
void exists(String path, Watcher watcher, StatCallback cb, Object ctx)
void exists(String path, boolean watch  , StatCallback cb, Object ctx)

byte[] getData(String path, Watcher watcher, Stat stat)
byte[] getData(String path, boolean watch  , Stat stat)
void   getData(String path, Watcher watcher, DataCallback cb, Object ctx)
void   getData(String path, boolean watch  , DataCallback cb, Object ctx)

List<String> getChildren(String path, Watcher watcher)
List<String> getChildren(String path, boolean watch  )
void  getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx)
void  getChildren(String path, boolean watch  , ChildrenCallback cb, Object ctx)

List<String> getChildren(String path, Watcher watcher, Stat stat)
List<String> getChildren(String path, boolean watch  , Stat stat)
void getChildren(String path, Watcher watcher, Children2Callback cb, Object ctx)
void getChildren(String path, boolean watch  , Children2Callback cb, Object ctx)
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=C:\zookeeper-3.4.5\data
dataLogDir=C:\zookeeper-3.4.5\log
# the port at which the clients will connect
clientPort=2181
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

重点说一下顺序节点的含义:顺序节点可以是持久的或临时的。当一个新的znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径。例如,如果将具有路径 /myapp 的znode创建为顺序节点,则ZooKeeper会将路径更改为 /myapp0000000001 ,并将下一个序列号设置为0000000002。如果两个顺序节点是同时创建的,那么ZooKeeper不会对每个znode使用相同的数字。

接口说明

  • 每一种安同步还是异步
  • 添加指定watcher还是默认watcher分为4中。默认watcher在Zookeeper 初始化中进行指定。
  • 如果包含boolean watch 的读方法传入true,则将默认为watcher注册为所关注事件的watch。如果传入false则不注册watch。
  • tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
  • dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
  • dataLogDir:顾名思义就是 Zookeeper 保存日志文件的目录
  • clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

根据同时创建的同名顺序节点zookeeper会自动在命名上排序的特性,可以实现我们的公平锁。

CreateMode

  • PERSISTENT 持续的。相比与EPHEMERAL,不会随着client session的close或者expire而消失
  • PERSISTENT_SEQUENTIAL
  • EPHEMERAL 短暂的,生命周期依赖于client session、对应session close/expire 后其znode也会消失。
  • EPHEMERAL_SEQUENTIAL SEQUENTIAL意为顺序的。

Zookeeper 为了解决数据的一致性,使用Watcher的异步回调接口,将服务器znode的变化以事件的形式通知给客户端,主要是一种方向推送的机制,让客户端可以做出及时响应。比如及时更新后端的可用集群服务列表。

当这些配置项配置好后,你现在就可以启动 Zookeeper 了,启动后要检查 Zookeeper 是否已经在服务,可以通过 netstat – ano 命令查看是否有你配置的 clientPort 端口号在监听服务。

1.获取锁

多个客户端在lockName目录下创建同名的临时顺序节点,因为zookeeper会为我们保证节点的命名不重复性和顺序性,所以可以利用节点的顺序进行锁的判断。

public void lock(){ String path = null; try { path = zk.create(lockName+"/mylock_", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); lockZnode = path; List<String> minPath = zk.getChildren(lockName,false); System.out.println; Collections.sort; System.out.println("最小的节点是:"+minPath.get; if (path!=null&&!path.isEmpty() &&minPath.get!=null&&!minPath.get.isEmpty() &&path.equals(lockName+"/"+minPath.get { System.out.println(Thread.currentThread().getName() + " 获取锁..."); return; } String watchNode = null; for (int i=minPath.size()-1;i>=0;i--){ if(minPath.get.compareTo(path.substring(path.lastIndexOf)<0){ watchNode = minPath.get; break; } } if (watchNode!=null){ final String watchNodeTmp = watchNode; final Thread thread = Thread.currentThread(); Stat stat = zk.exists(lockName + "/" + watchNodeTmp,new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getType() == Event.EventType.NodeDeleted){ System.out.println("delete事件来了"); thread.interrupt(); System.out.println; } } }); if(stat != null){ System.out.println(Thread.currentThread().getName() + " waiting for " + lockName + "/" + watchNode); } } try { Thread.sleep; }catch (InterruptedException ex){ System.out.println(Thread.currentThread().getName() + " 被唤醒"); System.out.println(Thread.currentThread().getName() + " 获取锁..."); return; } } catch (Exception e) { e.printStackTrace(); }}

获取锁逻辑:

1.首先创建顺序节点,然后获取当前目录下最小的节点,判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。2.获取锁失败的节点获取当前节点上一个顺序节点,对此节点注册watcher监听,并使当前线程进入sleep状态。3.当监听的节点unlock删除节点之后会捕获到delete事件,这说明前面的线程都执行完了,当前线程interrupt,打断sleep状态,获取锁。

参考网站

Zookeeper Api(java)入门与应用(转)

保证分布式系统数据一致性的6种方案

解决分布式系统的一致性问题,我们需要了解哪些理论

分布式系统的事务处理

ZooKeeper典型应用场景一览

zookeeper中的基本概念

图片 2

2.释放锁
public void unlock(){ try { System.out.println(Thread.currentThread().getName() + "释放 Lock..."); zk.delete(lockZnode,-1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } }

Watcher/Callback 参考网站

集群模式

3.输出结果
Receive event WatchedEvent state:SyncConnected type:None path:nullconnection is ok[mylock_0000000112]最小的节点是:mylock_0000000112pool-1-thread-2 获取锁...Receive event WatchedEvent state:SyncConnected type:None path:nullconnection is okReceive event WatchedEvent state:SyncConnected type:None path:nullconnection is okReceive event WatchedEvent state:SyncConnected type:None path:nullconnection is ok[mylock_0000000113, mylock_0000000112, mylock_0000000115, mylock_0000000114]最小的节点是:mylock_0000000112[mylock_0000000113, mylock_0000000112, mylock_0000000115, mylock_0000000114]最小的节点是:mylock_0000000112[mylock_0000000113, mylock_0000000112, mylock_0000000115, mylock_0000000114]最小的节点是:mylock_0000000112pool-1-thread-2释放 Lock...pool-1-thread-1 waiting for /mylock/mylock_0000000113pool-1-thread-3 waiting for /mylock/mylock_0000000114pool-1-thread-4 waiting for /mylock/mylock_0000000112delete事件来了打断当前线程pool-1-thread-4 被唤醒pool-1-thread-4 获取锁...pool-1-thread-4释放 Lock...delete事件来了打断当前线程pool-1-thread-1 被唤醒pool-1-thread-1 获取锁...pool-1-thread-1释放 Lock...delete事件来了打断当前线程pool-1-thread-3 被唤醒pool-1-thread-3 获取锁...pool-1-thread-3释放 Lock...

根据输出结果,可以看出来这次的客户端获取锁是公平的,排着队一个一个来的,因为每个节点都有自己的一个数字id,根据数字id来监听比自己小的节点,这样释放锁只唤醒一个客户端,而不会产生惊群效应。

while版、watcher版、FairLock版这三种锁while:性能好,但是资源消耗大,适合业务逻辑时间短的场景watcher:性能中上,但是容易惊群,适合客户端比较少的场景FairLock:性能低,但是安全性高,适合速度要求一般,按顺序来的场景

githup代码:

ACL

Zookeeper 不仅可以单机提供服务,同时也支持多机组成集群来提供服务。实际上 Zookeeper 还支持另外一种伪集群的方式,也就是可以在一台物理机上运行多个 Zookeeper 实例,下面将介绍集群模式的安装和配置。

集群管理

paxos 实现 paxos算法介绍续 zookeeper代码解析

Zookeeper 的集群模式的安装和配置也不是很复杂,所要做的就是增加几个配置项。集群模式除了上面的三个配置项还要增加下面几个配置项:

Zookeeper 官方文档

initLimit=5 
syncLimit=2 
server.1=192.168.211.1:2888:3888 
server.2=192.168.211.2:2888:3888

TODO

Zookeeper 一致性

  • initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
  • syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒
  • server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。
  • 除了修改 zoo.cfg 配置文件,集群模式下还要配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面就有一个数据就是 A 的值,Zookeeper 启动时会读取这个文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是那个 server。

数据模型

Zookeeper 会维护一个具有层次关系的数据结构,它非常类似于一个标准的文件系统,如图 1 所示:

  • 图片 3

Zookeeper 这种数据结构有如下这些特点:

  1. 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1
  2. znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录
  3. znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
  4. znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
  5. znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
  6. znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的,后面在典型的应用场景中会有实例介绍

如何使用

Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理.

通过C#代码使用zookeeper

Zookeeper的使用主要是通过创建其Nuget ZooKeeperNet包下的Zookeeper实例,并且调用其接口方法进行的,主要的操作就是对znode的增删改操作,监听znode的变化以及处理。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using ZooKeeperNet;

namespace ZookeeperDemo
{
    class Watcher : IWatcher
    {
        public void Process(WatchedEvent @event)
        {
            if (@event.Type == EventType.NodeDataChanged)
            {
                Console.WriteLine(@event.Path);
            }
        }
    }
}

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using ZooKeeperNet;

namespace ZookeeperDemo
{
    class Program
    {
        static void Main(string[] args)
        {

            //创建一个Zookeeper实例,第一个参数为目标服务器地址和端口,第二个参数为Session超时时间,第三个为节点变化时的回调方法 
            using (ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", new TimeSpan(0, 0, 0, 50000), new Watcher()))
            {
                var stat = zk.Exists("/root",true);

                ////创建一个节点root,数据是mydata,不进行ACL权限控制,节点为永久性的(即客户端shutdown了也不会消失) 
                //zk.Create("/root", "mydata".GetBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);

                //在root下面创建一个childone znode,数据为childone,不进行ACL权限控制,节点为永久性的 
                zk.Create("/root/childone", "childone".GetBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);
                //取得/root节点下的子节点名称,返回List<String> 
                zk.GetChildren("/root", true);
                //取得/root/childone节点下的数据,返回byte[] 
                zk.GetData("/root/childone", true, null);

                //修改节点/root/childone下的数据,第三个参数为版本,如果是-1,那会无视被修改的数据版本,直接改掉
                zk.SetData("/root/childone", "childonemodify".GetBytes(), -1);
                //删除/root/childone这个节点,第二个参数为版本,-1的话直接删除,无视版本 
                zk.Delete("/root/childone", -1);
            }

        }
    }
}

 

本文由美狮美高梅官方网站发布,转载请注明来源

关键词: