一、基本介绍
1. 集群介绍
随着现在服务需求的日渐增长,传统的单节点服务方式已然捉襟见肘,分布式的集群方案因此映入大众视野。
首先粗略介绍一下分布式集群的方式,即将计算服务部署于不同的服务器组成集群,有分发中心统一管理,外部服务请求到达分发中心,再由其根据一定策略转发到集群的节点,集群中各节点间的通讯也由分发中心实现。
通过分布式集群方式从而分摊服务器压力,从而达到更好的性能,而实现集群管理最常用的即 Zookeeper
服务,后续将详细进行介绍。
2. 框架介绍
Curator
是一个 Apache
的开源框架,它封装了 ZooKeeper
客户端的 API ,使开发者更容易地实现一些常用的 ZooKeeper
操作,例如分布式锁、选举、缓存、监控等。 Curator
提供了比 ZooKeeper API
更高级的抽象,它具有更好的可重用性和扩展性,以及更方便的错误处理和重试机制。
Curator
包含多个模块,每个模块都专注于不同的领域。
- Curator Client: 提供与
ZooKeeper
服务进行连接、会话管理、节点创建、节点更新、节点删除等基础操作的封装。- Curator Recipes: 提供一些常见的基于
ZooKeeper
的分布式应用场景解决方案,例如分布式锁、分布式计数器、分布式队列、分布式缓存、分布式读写锁等。- Curator Framework: 将
ZooKeeper
的一些核心概念进行了封装,提供了一些高级的功能和抽象,例如Cache、Watcher、Leader
选举等。
Curator
框架不仅提供了方便的 API
,还提供了很好的文档和示例,可以帮助开发者快速上手, Curator
的使用可以有效地减少 ZooKeeper
客户端的代码量,同时提高代码的可读性和可维护性。
二、集群节点
1. 工程依赖
新建 Spring Boot
工程 cluster-node
并添加 Curator
依赖,这里同时添加了 Log4j
用于日志打印。
需要注意 Curator
的版本需要根据的你的 Zookeeper
版本而定,否则程序将无法正常运行,Curator
与 Zookeeper
版本对应如下:
Curator 5.0
支持zookeeper 3.6.X
,不再支持zookeeper3.4.X
。Curator 4.X
支持zookeeper 3.5.X
,软兼容3.4.X
。Curator 2.X
支持zookeeper 3.4.X
。
我的 Zookeeper
版本是 3.4.12
,所以 Maven
中选择的 Curator
版本是 2.X
,根据你的情况自行调整。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<!-- Log -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
2. 项目配置
在项目的 application.yml
文件中配置应用端口与 Zookeeper
相关连接信息。
其中 node-path
与 node-name
分别是后续注册到 Zookeeper
的路径与节点名称,建议 node-path
一旦确定即不要随意变动。
server:
port: 9091
cluster:
retry: 3
zookeeper:
ip: 192.168.0.20:2181
timeout: 60000
node-path: /ibudai/cluster
node-name: node1
3. 依赖注入
在工程中新建 ZKConfig
用于配置相应的 zookeeper
连接对象。
@Configuration
public class ZKConfig {
@Value("${cluster.zookeeper.ip}")
private String zkHost;
@Value("${cluster.zookeeper.timeout}")
private int zkTimeout;
@Bean
public CuratorFramework zkClient() {
return CuratorFrameworkFactory.builder()
// Zookeeper 地址
.connectString(zkHost)
// 会话超时
.sessionTimeoutMs(zkTimeout)
// 重连策略
.retryPolicy(new RetryForever(10000))
.build();
}
}
4. 服务节点
新建 CuratorService
类并实现 ApplicationRunner
接口,在工程启动时即将自身注册到 Zookeeper
中。
@Component
public class CuratorService implements ApplicationRunner {
private final Logger logger = LoggerFactory.getLogger(CuratorService.class);
@Value("${server.port}")
private String port;
/**
* 服务节点路径
*/
@Value("${cluster.node-path}")
private String nodePath;
/**
* 服务节点名称
*/
@Value("${cluster.node-name}")
private String nodeName;
/**
* 服务节点的全路径
*/
private String fullPath;
private final AtomicBoolean isAvailable = new AtomicBoolean(false);
@Autowired
private CuratorFramework zkClient;
@Override
public void run(ApplicationArguments args) throws Exception {
// 连接 Zookeeper 服务
zkClient.start();
// 注册本机地址到 Zookeeper
registered();
}
}
下面重点解析一下节点注册和节点下线的核心代码和逻辑。
(1) 节点注册
在注册业务模块中需要根据 yml
配置文件中的路径和节点名称在 Zookeeper
中注册自身,同时监控节点的状态变化,节点监控在下一点介绍。
通过 create().forPath(var1, var1)
在 Zookeeper
中新建节点并存入数据,创建每个节点都可以存储一定的数据,若无需存储数据则使用 .forPath(var1)
即可。
其中 var1
为节点的完整路径,即上述的 nodePath + nodeName
;var2
为创建节点时所存入的数据,这里通过 InetAddress
获取当前服务的 IP
与端口然后存入。
public void registered() {
try {
InetAddress localHost = InetAddress.getLocalHost();
String host = localHost.getHostAddress() + ":" + port;
// 注册节点
fullPath = zkClient.create()
// 父级目录不存在时创建
.creatingParentsIfNeeded()
// 设置策略, EPHEMERAL: 临时节点,当会话结束,节点会被自动删除
.withMode(CreateMode.EPHEMERAL)
// 注册节点并存入数据, 通过 zkClient.getData().forPath(node) 获取
.forPath(nodePath + "/" + nodeName, host.getBytes());
logger.info("服务节点 [{}] 已上线.", nodeName);
// 监控节点
monitorNode(fullPath);
} catch (Exception e) {
}
}
(2) 节点下线
节点下线非必须,如果你的集群节点不涉及到下线功能跳过即可。
节点下线功能相对简单,先判断节点是否存在,若存在删除即可。
public void cancellation() {
try {
Stat stat = zkClient.checkExists().forPath(fullPath);
if (stat != null) {
zkClient.delete().forPath(fullPath);
logger.info("节点 [{}] 已下线.", nodeName);
}
} catch (Exception e) {
}
}
5. 节点监控
在上述节点下线的示例中通过 checkExists()
检查节点是否存在,而通过 checkExists().usingWatcher()
即可为节点设置一个监听器。
这里使用监听器主要是为了监控节点状态,节点状态发生变化时则会触发,当节点异常下线时我们需要重新注册上线,以保证集群节点的稳定。
需要注意 Watcher
只会在节点发生变化时触发一次,触发后会自动被删除,需要重新注册 Watcher
才能再次观察节点变化,因此在上述的 registered()
的节点注册完成后都执行 monitorNode()
方法。
private void monitorNode(String nodePath) {
try {
Stat stat = zkClient.checkExists().usingWatcher((CuratorWatcher) event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
// node is deleted, check its state
Stat st = zkClient.checkExists().forPath(nodePath);
if (st != null && st.getEphemeralOwner() == 0) {
logger.info("node is unavailable, it may be a failure.");
isAvailable.set(false);
// restart node
registeredAddress();
} else {
logger.info("node is still available, it is a normal down.");
isAvailable.set(true);
}
}
}).forPath(nodePath);
isAvailable.set(stat != null && stat.getEphemeralOwner() != 0);
} catch (Exception e) {
logger.error("Node monitor error.", e);
}
}
6. 测试服务
完成上述配置之后所有节点注册的工作的都已经完成了,下面添加一个测试接口用于后续模拟请求。
在工程中新建 ProviderController
类并添加一个测试接口,接口返回当前节点的 IP
和端口。
@RestController
@RequestMapping("api/server")
public class ProviderController {
@Value("${server.port}")
private String port;
/**
* 调用方法
*
* @return String
*/
@GetMapping("/callMethod")
public String callMethod() {
String msg = null;
try {
msg = "调用了服务提供者 " + InetAddress.getLocalHost().getHostAddress() + ":" + port + " 的方法";
} catch (UnknownHostException e) {
e.printStackTrace();
}
return msg;
}
}
三、分发中心
1. 工程创建
在上面我们创建了一个工程 cluster-node
用于模拟集群中的节点,下面再新建工程 cluster-center
用于模拟分发中心,后续对外服务也是由分发中心提供,其 Maven
依赖同 cluster-node
。
工程中的同样在 YML
中配置节点信息,具体配置与上述 cluster-node
的配置相同这里不再重复介绍。
不同的是 cluster-center
在配置注入时多了 RestTemplate
用于后续请求发送,它是 Spring
中用于网络请求的类,类似于 Okhttp
等工具。
@Configuration
public class ZKConfig {
@Value("${cluster.zookeeper.ip}")
private String zkHost;
@Value("${cluster.zookeeper.timeout}")
private int zkTimeout;
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public CuratorFramework zkClient() {
return CuratorFrameworkFactory.builder()
// Zookeeper 地址
.connectString(zkHost)
// 会话超时
.sessionTimeoutMs(zkTimeout)
// 重连策略
.retryPolicy(new RetryForever(10000))
.build();
}
}
2. 集群监控
同样在工程中新建 CuratorService
类并实现 ApplicationRunner
,当程序启动时对配置的 Zookeeper
路径下的集群节点进行监控。
集群监控主要的功能即监控当前处于活动状态的节点,及时将以下线的节点从服务列表中移除,从而达到故障转移的效果。
@Component
public class CuratorService implements ApplicationRunner {
private final Logger logger = LoggerFactory.getLogger(CuratorService.class);
/**
* 服务节点路径
*/
@Value("${cluster.node-path}")
private String nodePath;
/**
* 当前在线的服务节点
*/
private List<String> nodeList;
private static int requestNum = 0;
@Autowired
private CuratorFramework zkClient;
@Override
public void run(ApplicationArguments args) throws Exception {
// 构建 CuratorFramework 客户端,并开启会话
zkClient.start();
// 获取服务列表
Stat stat = zkClient.checkExists().forPath(nodePath);
if (stat == null) {
throw new RuntimeException("服务地址未注册到 Zookeeper.");
} else {
nodeList = zkClient.getChildren().forPath(nodePath);
}
// 开启对 PROVIDER_NODE 子节点变化事件的监听
monitorPath();
}
}
(1) 节点监控
PathChildrenCache
是 Curator
提供的一种缓存节点的方式,可以监控一个节点的所有子节点变化,包括子节点的新增、删除和修改操作。而之前 usingWatcher()
只能同时监控单个节点且是一次性的,触发之后需要重新创建。
在 PathChildrenCache
中我们创建了一个监听器实时监听 Zookeeper
指定路径下的所有节点状态,当发生变化时则获取当前在线的节点列表并更新。
注意 PathChildrenCache
在高版本的 Curator
已标为弃用,而使用 CuratorCacheListener
创建监听器,但其目的是类似的这里不再重复介绍。
public void monitorPath() throws Exception {
PathChildrenCache cache = new PathChildrenCache(zkClient, nodePath, true);
cache.getListenable().addListener((client, event) -> {
nodeList = client.getChildren().forPath(nodePath);
logger.info("节点变更,当前在线节点: [{}]", nodeList);
});
cache.start();
}
3. 轮询策略
这里通过静态变量 requestNum
简单的实现一个类 Nginx
的基础轮询,即按节点列表顺序获取服务地址。
在 cluster-node
工程中注册节点时我们存入的节点的服务 IP
和端口,这里通过 getData()
方法进行获取。
public String roundRobin() {
if (nodeList.isEmpty()) {
throw new RuntimeException(">>> 服务提供者地址列表为空");
}
byte[] data;
try {
int i = requestNum % nodeList.size();
requestNum++;
String node = nodeList.get(i);
// 获取节点服务信息
node = nodePath + "/" + node;
Stat stat = zkClient.checkExists().forPath(node);
if (stat != null) {
data = zkClient.getData().forPath(node);
} else {
logger.error("[{}] does not exist.", node);
throw new RuntimeException("does not exist");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return new String(data);
}
4. 测试接口
同样为了测试这里添加了一个接口用于服务调用,当外部服务请求分心中心时,分发中心会根据轮询策略从节点列表中获取节点实现分布式请求,这里使用了 RestTemplate
实现网络通讯,你也可以根据需要替换为 Okhttp
等网络框架。
其中参数 apiPath
为 cluster-node
中的接口地址,如 /api/provider/callMethod
。
@RestController
@RequestMapping("/api/center")
public class ConsumerController {
private final Logger logger = LoggerFactory.getLogger(ConsumerController.class);
@Autowired
private RestTemplate restTemplate;
@Autowired
private CuratorService curatorService;
/**
* 调用方法
*
* @return String
*/
@GetMapping("/callMethod")
public String callMethod(String apiPath) {
// 轮询策略获取服务地址
String path = curatorService.roundRobin() + apiPath;
// 使用 RestTemplate 远程调用服务
String msg = restTemplate.getForObject("http://" + path, String.class);
logger.info(msg);
return msg;
}
}
四、分发测试
1. 工程复制
在 IDEA
中启动 cluster-node
工程,并选择 Edit Configuration
进行工程服务,模拟集群中存在多个节点。
在复制的启动项添加 VM Option
,具体内容如下:
-Dserver.port=9092
-Dcluster.node-name=node2
2. 工程启动
完成后启动上述两个工程与 cluster-center
工程,控制台输出日志如下启动成功。
3. 请求测试
通过多次请求 cluster-center
接口可以看到控制台打印按照我们预期将请求在节点列表中实现了轮询。