1.前言
本文章是笔主在声哥的手写RPC框架的学习下,对注册中心的一个拓展。因为声哥某些部分没有保留拓展性,所以本文章的项目与声哥的工程有部分区别,核心内容在Curator的注册发现与注销,思想看准即可。
本文章Git仓库:zko0/zko0-rpc
声哥的RPC项目写的确实很详细,跟学一遍受益匪浅:
何人听我楚狂声的博客
在声哥的项目里使用Nacos作为了服务注册中心。本人拓展添加了ZooKeeper实现服务注册。
Nacos的服务注册和发现,设计的不是非常好,每次服务的发现都需要去注册中心拉取。本人实现ZooKeeper注册中心时,参考了Dubbo的设计原理,结合本人自身想法,添加了本地缓存:
- Client发现服务后缓存在本地,维护一个服务——实例列表
- 当监听到注册中心的服务列表发生了变化,Client更新本地列表
- 当注册中心宕机,Client能够依靠本地的服务列表继续提供服务
问题:
- 实现服务注册的本地缓存,还需要实现注册中心的监听,当注册中心的服务发生更改时能够实现动态更新。或者用轮训的方式,定时更新,不过这种方式的服务实时性较差
- 当Server宕机,非临时节点注册容易出现服务残留无法清除的问题。所以我建议全部使用临时节点去注册。
2.内容
zookeeper需要简单学一下,知识内容非常简单,搭建也很简单,在此跳过。
如果你感兴趣,可以参考我的ZooKeeper的文章:Zookeeper学习笔记 - zko0
①添加依赖
Curator:(简化ZooKeeper客户端使用)(Netfix研发,捐给Apache,是Apache顶级项目)
这里排除slf4j依赖,因为笔主使用的slf4j存在冲突
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
②代码编写
1.首先创建一个连接类:
@Slf4j
public class ZookeeperUtil {
//内部化构造方法
private ZookeeperUtil(){
}
private static final String SERVER_HOSTNAME= RegisterCenterConfig.getHostName();
private static final Integer SERVER_PORT=RegisterCenterConfig.getServerPort();
private static CuratorFramework zookeeperClient;
public static CuratorFramework getZookeeperClient(){
if (zookeeperClient==null){
synchronized (ZookeeperUtil.class){
if (zookeeperClient==null){
RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
zookeeperClient = CuratorFrameworkFactory.builder()
.connectString(SERVER_HOSTNAME+\":\"+SERVER_PORT)
.retryPolicy(retryPolic)
// zookeeper根目录为/serviceRegister,不为/
.namespace(\"serviceRegister\")
.build();
zookeeperClient.start();
}
}
}
return zookeeperClient;
}
public static String getServerHostname(){
return SERVER_HOSTNAME;
}
public static Integer getServerPort(){
return SERVER_PORT;
}
}
其中HOST,PORT信息我保存在regiserCenter.properties配置文件夹中,使用类读取:
public class RpcConfig {
//注册中心类型
private static String registerCenterType;
//序列化类型
private static String serializerType;
//负载均衡类型
private static String loadBalanceType;
//配置Nacos地址
private static String registerCenterHost;
private static Integer registerCenterPort;
private static boolean zookeeperDestoryIsEphemeral;
private static String serverHostName;
private static Integer serverPort;
static {
ResourceBundle bundle = ResourceBundle.getBundle(\"rpc\");
registerCenterType=bundle.getString(\"registerCenter.type\");
loadBalanceType=bundle.getString(\"loadBalance.type\");
registerCenterHost=bundle.getString(\"registerCenter.host\");
registerCenterPort = Integer.parseInt(bundle.getString(\"registerCenter.port\"));
try {
zookeeperDestoryIsEphemeral=\"true\".equals(bundle.getString(\"registerCenter.destory.isEphemeral\"));
} catch (Exception e) {
zookeeperDestoryIsEphemeral=false;
}
serializerType=bundle.getString(\"serializer.type\");
serverHostName=bundle.getString(\"server.hostName\");
serverPort=Integer.parseInt(bundle.getString(\"server.port\"));
}
public static String getRegisterCenterType() {
return registerCenterType;
}
public static String getSerializerType() {
return serializerType;
}
public static String getLoadBalanceType() {
return loadBalanceType;
}
public static String getRegisterCenterHost() {
return registerCenterHost;
}
public static Integer getRegisterCenterPort() {
return registerCenterPort;
}
public static String getServerHostName() {
return serverHostName;
}
public static Integer getServerPort() {
return serverPort;
}
public static boolean isZookeeperDestoryIsEphemeral() {
return zookeeperDestoryIsEphemeral;
}
}
下面的代码我和声哥有些不同,我将服务注册,注销方法放在ServerUtils中,服务发现方法放在ClientUtils中:
服务的高一致性存在两种做法:
- 因为ZooKeeper存在临时节点,注册中心可以实现Client(RPC的Server)断开,注册服务信息的自动丢失
- 不设置为临时节点,手动的服务注册清除
我这里两种都实现了,虽然做两种方式不同但是功能相同的代码放在一起看起来很奇怪,这里只是做演示。选择其中一种即可。(我建议使用临时节点,当Server宕机,残留的服务信息也能及时清除)
注册实现原理图:
接口:
public interface ServiceDiscovery {
InetSocketAddress searchService(String serviceName);
void cleanLoaclCache(String serviceName);
}
public interface ServiceRegistry {
//服务注册
void register(String serviceName, InetSocketAddress inetAddress);
void cleanRegistry();
}
ZooKeeper接口实现:
public class ZookeeperServiceDiscovery implements ServiceDiscovery{
private final LoadBalancer loadBalancer;
public ZookeeperServiceDiscovery(LoadBalancer loadBalancer) {
this.loadBalancer = loadBalancer;
}
@Override
public InetSocketAddress searchService(String serviceName) {
return ZookeeperClientUtils.searchService(serviceName,loadBalancer);
}
@Override
public void cleanLoaclCache(String serviceName) {
ZookeeperClientUtils.cleanLocalCache(serviceName);
}
}
public class ZookeeperServiceRegistry implements ServiceRegistry{
@Override
public void register(String serviceName, InetSocketAddress inetAddress) {
ZookeeperServerUitls.register(serviceName,inetAddress);
}
@Override
public void cleanRegistry() {
ZookeeperServerUitls.cleanRegistry();
}
}
Factory工厂:
public class ServiceFactory {
private static String center = RpcConfig.getRegisterCenterType();
private static String lb= RpcConfig.getLoadBalanceType();
private static ServiceRegistry registry;
private static ServiceDiscovery discovery;
private static Object registerLock=new Object();
private static Object discoveryLock=new Object();
public static ServiceDiscovery getServiceDiscovery(){
if (discovery==null){
synchronized (discoveryLock){
if (discovery==null){
if (\"nacos\".equalsIgnoreCase(center)){
discovery= new NacosServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
}else if (\"zookeeper\".equalsIgnoreCase(center)){
discovery= new ZookeeperServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
}
}
}
}
return discovery;
}
public static ServiceRegistry getServiceRegistry(){
if (registry==null){
synchronized (registerLock){
if (registry==null){
if (\"nacos\".equalsIgnoreCase(center)){
registry= new NacosServiceRegistry();
}else if (\"zookeeper\".equalsIgnoreCase(center)){
registry= new ZookeeperServiceRegistry();
}
}
}
}
return registry;
}
}
使用Gson序列化InetSocketAddress存在问题,编写Util:
public class InetSocketAddressSerializerUtil {
public static String getJsonByInetSockerAddress(InetSocketAddress address){
HashMap<String, String> map = new HashMap<>();
map.put(\"host\",address.getHostName());
map.put(\"port\",address.getPort()+\"\");
return new Gson().toJson(map);
}
public static InetSocketAddress getInetSocketAddressByJson(String json){
HashMap<String,String> hashMap = new Gson().fromJson(json, HashMap.class);
String host = hashMap.get(\"host\");
Integer port=Integer.parseInt(hashMap.get(\"port\"));
return new InetSocketAddress(host,port);
}
}
上面主要是注册,发现的逻辑,我把主要方法写在了Utils中:
@Slf4j
public class ZookeeperServerUitls {
private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();
private static final Set<String> instances=new ConcurrentHashSet<>();
public static void register(String serviceName, InetSocketAddress inetSocketAddress){
serviceName=ZookeeperUtil.serviceName2Path(serviceName);;
String uuid = UUID.randomUUID().toString();
serviceName=serviceName+\"/\"+uuid;
String json = InetSocketAddressSerializerUtil.getJsonByInetSockerAddress(inetSocketAddress);
try {
if (RpcConfig.isZookeeperDestoryIsEphemeral()){
//会话结束节点,创建消失
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(serviceName,json.getBytes());
} else {
client.create()
.creatingParentsIfNeeded()
.forPath(serviceName,json.getBytes());
}
}
catch (Exception e) {
log.error(\"服务注册失败\");
throw new RpcException(RpcError.REGISTER_SERVICE_FAILED);
}
//放入map
instances.add(serviceName);
}
public static void cleanRegistry(){
log.info(\"注销所有注册的服务\");
//如果自动销毁,不需要清除
if (RpcConfig.isZookeeperDestoryIsEphemeral()) return;
if (ZookeeperUtil.getServerHostname()!=null&&ZookeeperUtil.getServerPort()!=null&&!instances.isEmpty()){
for (String path:instances) {
try {
client.delete().forPath(path);
} catch (Exception e) {
log.error(\"服务注销失败\");
throw new RpcException(RpcError.DESTORY_REGISTER_FALL);
}
}
}
}
}
@Slf4j
public class ZookeeperClientUtils {
private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();
private static final Map<String, List<InetSocketAddress>> instances=new ConcurrentHashMap<>();
public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
InetSocketAddress address;
//本地缓存查询
if (instances.containsKey(serviceName)){
List<InetSocketAddress> addressList = instances.get(serviceName);
if (!addressList.isEmpty()){
//使用lb进行负载均衡
return loadBalancer.select(addressList);
}
}
try {
String path = ZookeeperUtil.serviceName2Path(serviceName);
//获取路径下所有的实现
List<String> instancePaths = client.getChildren().forPath(path);
List<InetSocketAddress> addressList = new ArrayList<>();
for (String instancePath : instancePaths) {
byte[] bytes = client.getData().forPath(path+\"/\"+instancePath);
String json = new String(bytes);
InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
addressList.add(instance);
}
addLocalCache(serviceName,addressList);
return loadBalancer.select(addressList);
} catch (Exception e) {
log.error(\"服务获取失败====>{}\",e);
throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
}
}
public static void cleanLocalCache(String serviceName){
log.info(\"服务调用失败,清除本地缓存,重新获取实例===>{}\",serviceName);
instances.remove(serviceName);
}
public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){
//直接替换原本的缓存
instances.put(serviceName,addressList);
}
}
③配置文件
rpc.properties放在resources下
#nacos zookeeper
#registerCenter.type=nacos
registerCenter.type=zookeeper
#registerCenter.host=127.0.0.1
registerCenter.host=101.43.244.40
#zookeeper port default 2181
#registerCenter.port=9000
registerCenter.port=2181
registerCenter.destory.isEphemeral=false
#??random?roundRobin
loadBalance.type=random
#kryo json jdk
serializer.type=kryo
server.hostName=127.0.0.1
server.port=9999
④更多
声哥的代码我做了很多修改,如果上述代码和你参考的项目代码出入比较大,可以查看本文章的工程阅读。
来源:https://www.cnblogs.com/zko0/p/17130553.html
本站部分图文来源于网络,如有侵权请联系删除。