Zookeeper集群管理详解


一、基本介绍

1. 集群介绍

随着现在服务需求的日渐增长,传统的单节点服务方式已然捉襟见肘,分布式的集群方案因此映入大众视野。

首先粗略介绍一下分布式集群的方式,即将计算服务部署于不同的服务器组成集群,有分发中心统一管理,外部服务请求到达分发中心,再由其根据一定策略转发到集群的节点,集群中各节点间的通讯也由分发中心实现。

通过分布式集群方式从而分摊服务器压力,从而达到更好的性能,而实现集群管理最常用的即 Zookeeper 服务,后续将详细进行介绍。

2. 框架介绍

Curator 是一个 Apache 的开源框架,它封装了 ZooKeeper 客户端的 API ,使开发者更容易地实现一些常用的 ZooKeeper 操作,例如分布式锁、选举、缓存、监控等。 Curator 提供了比 ZooKeeper API 更高级的抽象,它具有更好的可重用性和扩展性,以及更方便的错误处理和重试机制。

Curator 包含多个模块,每个模块都专注于不同的领域。

  • Curator Client: 提供与 ZooKeeper 服务进行连接、会话管理、节点创建、节点更新、节点删除等基础操作的封装。
  • Curator Recipes: 提供一些常见的基于 ZooKeeper 的分布式应用场景解决方案,例如分布式锁、分布式计数器、分布式队列、分布式缓存、分布式读写锁等。
  • Curator Framework: 将 ZooKeeper 的一些核心概念进行了封装,提供了一些高级的功能和抽象,例如 Cache、Watcher、Leader 选举等。

Curator 框架不仅提供了方便的 API ,还提供了很好的文档和示例,可以帮助开发者快速上手, Curator 的使用可以有效地减少 ZooKeeper 客户端的代码量,同时提高代码的可读性和可维护性。

二、集群节点

1. 工程依赖

新建 Spring Boot 工程 cluster-node 并添加 Curator 依赖,这里同时添加了 Log4j 用于日志打印。

需要注意 Curator 的版本需要根据的你的 Zookeeper 版本而定,否则程序将无法正常运行,CuratorZookeeper 版本对应如下:

  • Curator 5.0 支持 zookeeper 3.6.X,不再支持 zookeeper3.4.X
  • Curator 4.X 支持 zookeeper 3.5.X,软兼容 3.4.X
  • Curator 2.X 支持 zookeeper 3.4.X

我的 Zookeeper 版本是 3.4.12,所以 Maven 中选择的 Curator 版本是 2.X,根据你的情况自行调整。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
<!-- Log -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

2. 项目配置

在项目的 application.yml 文件中配置应用端口与 Zookeeper 相关连接信息。

其中 node-pathnode-name 分别是后续注册到 Zookeeper 的路径与节点名称,建议 node-path 一旦确定即不要随意变动。

server:
  port: 9091

cluster:
  retry: 3
  zookeeper:
    ip: 192.168.0.20:2181
    timeout: 60000
  node-path: /ibudai/cluster
  node-name: node1

3. 依赖注入

在工程中新建 ZKConfig 用于配置相应的 zookeeper 连接对象。

@Configuration
public class ZKConfig {

    @Value("${cluster.zookeeper.ip}")
    private String zkHost;
    @Value("${cluster.zookeeper.timeout}")
    private int zkTimeout;

    @Bean
    public CuratorFramework zkClient() {
        return CuratorFrameworkFactory.builder()
                // Zookeeper 地址
                .connectString(zkHost)
                // 会话超时
                .sessionTimeoutMs(zkTimeout)
                // 重连策略
                .retryPolicy(new RetryForever(10000))
                .build();
    }
}

4. 服务节点

新建 CuratorService 类并实现 ApplicationRunner 接口,在工程启动时即将自身注册到 Zookeeper 中。

@Component
public class CuratorService implements ApplicationRunner {

    private final Logger logger = LoggerFactory.getLogger(CuratorService.class);

    @Value("${server.port}")
    private String port;

    /**
     * 服务节点路径
     */
    @Value("${cluster.node-path}")
    private String nodePath;

    /**
     * 服务节点名称
     */
    @Value("${cluster.node-name}")
    private String nodeName;

    /**
     * 服务节点的全路径
     */
    private String fullPath;

    private final AtomicBoolean isAvailable = new AtomicBoolean(false);

    @Autowired
    private CuratorFramework zkClient;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 连接 Zookeeper 服务
        zkClient.start();
        // 注册本机地址到 Zookeeper
        registered();
    }
}

下面重点解析一下节点注册和节点下线的核心代码和逻辑。

(1) 节点注册

在注册业务模块中需要根据 yml 配置文件中的路径和节点名称在 Zookeeper 中注册自身,同时监控节点的状态变化,节点监控在下一点介绍。

通过 create().forPath(var1, var1)Zookeeper 中新建节点并存入数据,创建每个节点都可以存储一定的数据,若无需存储数据则使用 .forPath(var1) 即可。

其中 var1 为节点的完整路径,即上述的 nodePath + nodeNamevar2 为创建节点时所存入的数据,这里通过 InetAddress 获取当前服务的 IP 与端口然后存入。

public void registered() {
    try {
        InetAddress localHost = InetAddress.getLocalHost();
        String host = localHost.getHostAddress() + ":" + port;
        // 注册节点
        fullPath = zkClient.create()
                // 父级目录不存在时创建
                .creatingParentsIfNeeded()
                // 设置策略, EPHEMERAL: 临时节点,当会话结束,节点会被自动删除
                .withMode(CreateMode.EPHEMERAL)
                // 注册节点并存入数据, 通过 zkClient.getData().forPath(node) 获取
                .forPath(nodePath + "/" + nodeName, host.getBytes());
        logger.info("服务节点 [{}] 已上线.", nodeName);
        // 监控节点
        monitorNode(fullPath);
    } catch (Exception e) {
    }
}
(2) 节点下线

节点下线非必须,如果你的集群节点不涉及到下线功能跳过即可。

节点下线功能相对简单,先判断节点是否存在,若存在删除即可。

public void cancellation() {
    try {
        Stat stat = zkClient.checkExists().forPath(fullPath);
        if (stat != null) {
            zkClient.delete().forPath(fullPath);
            logger.info("节点 [{}] 已下线.", nodeName);
        }
    } catch (Exception e) {
    }
}

5. 节点监控

在上述节点下线的示例中通过 checkExists() 检查节点是否存在,而通过 checkExists().usingWatcher() 即可为节点设置一个监听器。

这里使用监听器主要是为了监控节点状态,节点状态发生变化时则会触发,当节点异常下线时我们需要重新注册上线,以保证集群节点的稳定。

需要注意 Watcher 只会在节点发生变化时触发一次,触发后会自动被删除,需要重新注册 Watcher 才能再次观察节点变化,因此在上述的 registered() 的节点注册完成后都执行 monitorNode() 方法。

private void monitorNode(String nodePath) {
    try {
        Stat stat = zkClient.checkExists().usingWatcher((CuratorWatcher) event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                // node is deleted, check its state
                Stat st = zkClient.checkExists().forPath(nodePath);
                if (st != null && st.getEphemeralOwner() == 0) {
                    logger.info("node is unavailable, it may be a failure.");
                    isAvailable.set(false);
                    // restart node
                    registeredAddress();
                } else {
                    logger.info("node is still available, it is a normal down.");
                    isAvailable.set(true);
                }
            }
        }).forPath(nodePath);
        isAvailable.set(stat != null && stat.getEphemeralOwner() != 0);
    } catch (Exception e) {
        logger.error("Node monitor error.", e);
    }
}

6. 测试服务

完成上述配置之后所有节点注册的工作的都已经完成了,下面添加一个测试接口用于后续模拟请求。

在工程中新建 ProviderController 类并添加一个测试接口,接口返回当前节点的 IP 和端口。

@RestController
@RequestMapping("api/server")
public class ProviderController {

    @Value("${server.port}")
    private String port;

    /**
     * 调用方法
     *
     * @return String
     */
    @GetMapping("/callMethod")
    public String callMethod() {
        String msg = null;
        try {
            msg = "调用了服务提供者 " + InetAddress.getLocalHost().getHostAddress() + ":" + port + " 的方法";
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return msg;
    }
}

三、分发中心

1. 工程创建

在上面我们创建了一个工程 cluster-node 用于模拟集群中的节点,下面再新建工程 cluster-center 用于模拟分发中心,后续对外服务也是由分发中心提供,其 Maven 依赖同 cluster-node

工程中的同样在 YML 中配置节点信息,具体配置与上述 cluster-node 的配置相同这里不再重复介绍。

不同的是 cluster-center 在配置注入时多了 RestTemplate 用于后续请求发送,它是 Spring 中用于网络请求的类,类似于 Okhttp 等工具。

@Configuration
public class ZKConfig {

    @Value("${cluster.zookeeper.ip}")
    private String zkHost;
    @Value("${cluster.zookeeper.timeout}")
    private int zkTimeout;

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public CuratorFramework zkClient() {
        return CuratorFrameworkFactory.builder()
                // Zookeeper 地址
                .connectString(zkHost)
                // 会话超时
                .sessionTimeoutMs(zkTimeout)
                // 重连策略
                .retryPolicy(new RetryForever(10000))
                .build();
    }
}

2. 集群监控

同样在工程中新建 CuratorService 类并实现 ApplicationRunner ,当程序启动时对配置的 Zookeeper 路径下的集群节点进行监控。

集群监控主要的功能即监控当前处于活动状态的节点,及时将以下线的节点从服务列表中移除,从而达到故障转移的效果。

@Component
public class CuratorService implements ApplicationRunner {

    private final Logger logger = LoggerFactory.getLogger(CuratorService.class);

    /**
     * 服务节点路径
     */
    @Value("${cluster.node-path}")
    private String nodePath;

    /**
     * 当前在线的服务节点
     */
    private List<String> nodeList;

    private static int requestNum = 0;

    @Autowired
    private CuratorFramework zkClient;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 构建 CuratorFramework 客户端,并开启会话
        zkClient.start();
        // 获取服务列表
        Stat stat = zkClient.checkExists().forPath(nodePath);
        if (stat == null) {
            throw new RuntimeException("服务地址未注册到 Zookeeper.");
        } else {
            nodeList = zkClient.getChildren().forPath(nodePath);
        }
        // 开启对 PROVIDER_NODE 子节点变化事件的监听
        monitorPath();
    }
}
(1) 节点监控

PathChildrenCacheCurator 提供的一种缓存节点的方式,可以监控一个节点的所有子节点变化,包括子节点的新增、删除和修改操作。而之前 usingWatcher() 只能同时监控单个节点且是一次性的,触发之后需要重新创建。

PathChildrenCache 中我们创建了一个监听器实时监听 Zookeeper 指定路径下的所有节点状态,当发生变化时则获取当前在线的节点列表并更新。

注意 PathChildrenCache 在高版本的 Curator 已标为弃用,而使用 CuratorCacheListener 创建监听器,但其目的是类似的这里不再重复介绍。

public void monitorPath() throws Exception {
    PathChildrenCache cache = new PathChildrenCache(zkClient, nodePath, true);
    cache.getListenable().addListener((client, event) -> {
        nodeList = client.getChildren().forPath(nodePath);
        logger.info("节点变更,当前在线节点: [{}]", nodeList);
    });
    cache.start();
}

3. 轮询策略

这里通过静态变量 requestNum 简单的实现一个类 Nginx 的基础轮询,即按节点列表顺序获取服务地址。

cluster-node 工程中注册节点时我们存入的节点的服务 IP 和端口,这里通过 getData() 方法进行获取。

public String roundRobin() {
    if (nodeList.isEmpty()) {
        throw new RuntimeException(">>> 服务提供者地址列表为空");
    }

    byte[] data;
    try {
        int i = requestNum % nodeList.size();
        requestNum++;
        String node = nodeList.get(i);
        // 获取节点服务信息
        node = nodePath + "/" + node;
        Stat stat = zkClient.checkExists().forPath(node);
        if (stat != null) {
            data = zkClient.getData().forPath(node);
        } else {
            logger.error("[{}] does not exist.", node);
            throw new RuntimeException("does not exist");
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    return new String(data);
}

4. 测试接口

同样为了测试这里添加了一个接口用于服务调用,当外部服务请求分心中心时,分发中心会根据轮询策略从节点列表中获取节点实现分布式请求,这里使用了 RestTemplate 实现网络通讯,你也可以根据需要替换为 Okhttp 等网络框架。

其中参数 apiPathcluster-node 中的接口地址,如 /api/provider/callMethod

@RestController
@RequestMapping("/api/center")
public class ConsumerController {

    private final Logger logger = LoggerFactory.getLogger(ConsumerController.class);

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private CuratorService curatorService;

    /**
     * 调用方法
     *
     * @return String
     */
    @GetMapping("/callMethod")
    public String callMethod(String apiPath) {
        // 轮询策略获取服务地址
        String path = curatorService.roundRobin() + apiPath;
        // 使用 RestTemplate 远程调用服务
        String msg = restTemplate.getForObject("http://" + path, String.class);
        logger.info(msg);
        return msg;
    }
}

四、分发测试

1. 工程复制

IDEA 中启动 cluster-node 工程,并选择 Edit Configuration 进行工程服务,模拟集群中存在多个节点。

在复制的启动项添加 VM Option,具体内容如下:

-Dserver.port=9092
-Dcluster.node-name=node2

2. 工程启动

完成后启动上述两个工程与 cluster-center 工程,控制台输出日志如下启动成功。


3. 请求测试

通过多次请求 cluster-center 接口可以看到控制台打印按照我们预期将请求在节点列表中实现了轮询。


文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录