这是一个基于 Java
的分布式远程过程调用工具。
服务提供者注册到注册中心后,服务消费者查询服务,挑选合适的服务提供者并调用服务(支持同步调用和异步调用)。
当前阶段均为 SNAPSHOT
版本,暂时不提供依赖配置。
你可以使用命令 git clone git@github.com:CongLinDev/clrpc.git
克隆到本地自行打包。
// define a service interface
interface EchoService {
String hello(String arg);
String hi(String arg);
}
// implements interface EchoService
class EchoServiceImpl implements EchoService {
@Override
public String hello(String arg) {
return "Hello " + arg;
}
@Override
public String hi(String arg) {
return "Hi " + arg;
}
}
// 创建简单的服务对象
ServiceObject<EchoService> serviceObject = new SimpleServiceObject.Builder<>(EchoService.class)
.name("EchoService")
.object(new EchoServiceImpl())
.build();
// 创建服务提供者
ProviderBootstrap bootstrap = new ProviderBootstrap();
// 发布服务并开启服务
bootstrap.registry(ZooKeeperServiceRegistry::getInstance) // 设置注册中心
.publish(serviceObject) // 发布服务对象
.hookStop() // 注册关闭钩子,用于优雅关闭服务提供者
.start(new CommonOption());
// 创建简单的服务接口对象
ServiceInterface<EchoService> serviceInterface = new SimpleServiceInterface.Builder<>(EchoService.class)
.name("EchoService")
.build();
// 创建服务消费者
ConsumerBootstrap bootstrap = new ConsumerBootstrap();
// 开启服务消费者
bootstrap.registry(ZooKeeperServiceRegistry::getInstance)
.start(new CommonOption());
// 提前刷新需要订阅的服务
bootstrap.subscribe(serviceInterface);
//使用同步服务
EchoService syncService = bootstrap.syncService(serviceInterface);
String result = syncService.hello("I am consumer!"); // 一直阻塞,直到返回结果
// 使用异步服务
EchoService asyncService = bootstrap.asyncService(serviceInterface);
String fakeResult = asyncService.hello("I am consumer!"); // 直接返回默认值
assert fakeResult == null;
InvocationFuture future = InvocationContext.lastContext().getFuture(); // 获取该线程最新一次操作的产生的future对象
future.callback(new Callback() { // 使用回调处理结果
@Override
public void success(Object res) {}
@Override
public void fail(Exception e) {}
});
// 关闭服务消费者
bootstrap.stop();
// 创建服务接口对象
ServiceInterface<EchoService> serviceInterface = new SimpleServiceInterface.Builder<>(EchoService.class)
.name("EchoService")
.build();
// 创建服务消费者
ConsumerBootstrap bootstrap = new ConsumerBootstrap();
// 开启服务消费者
bootstrap.registry(ZooKeeperServiceRegistry::getInstance)
.start(new CommonOption());
// 提前刷新需要订阅的服务
bootstrap.subscribe(serviceInterface);
TransactionManager manager = (TransactionManager)bootstrap.object(ZooKeeperTransactionManager.class);
EchoService service = manager.asyncService(serviceInterface); // get proxy from TransactionManager instead of bootstrap
manager.begin(); // 事务开启
service.hello("first request"); // 异步发送第一条请求
InvocationFuture f1 = InvocationContext.lastContext().getFuture(); // 获取第一条请求产生的future对象
service.hi("second request"); // 异步发送第二条请求
InvocationFuture f2 = InvocationContext.lastContext().getFuture(); // 获取第二条请求产生的future对象
TransactionInvocationContext context = manager.commit(); // 事务提交 返回事务 上下文
// 销毁 TransactionManager
ObjectLifecycleUtils.destroy(manager);
// 关闭服务消费者
bootstrap.stop();
@ServiceInterface(name = "EchoService")
interface EchoService {
String hello(String arg);
String hi(String arg);
}
@ServiceObject(interfaceClass = EchoService.class, name = "EchoService")
class EchoServiceImpl implements EchoService {
@Override
public String hello(String arg) {
return "Hello " + arg;
}
@Override
public String hi(String arg) {
return "Hi " + arg;
}
}
// build ServiceInterface
ServiceInterface<EchoService> serviceInterface = new AnnotationServiceInterface<>(EchoService.class);
// build ServiceObject
ServiceObject<EchoService> serviceObject = new AnnotationServiceObject<>(EchoServiceImpl.class);
默认配置文件名为 config.properties
。
配置文件位置默认在项目 resources
目录下,默认格式为 properties
,默认文件为 config.properties
。
Field | Type | Required | Default | Remark |
---|---|---|---|---|
instance.id | String | True | 服务实例id | |
instance.address | String | True | 服务实例地址 | |
registry.zookeeper.connection | String | TRUE | 注册中心ZooKeeper地址 | |
registry.zookeeper.session-timeout | Integer | FALSE | 5000 | 注册中心ZooKeeper超时 单位是毫秒 |
registry.zookeeper.path | String | FALSE | /clrpc | 注册中心ZooKeeper基础路径 |
netty.io-thread.number | Integer | False | 4 | netty IO线程数 |
invocation.retry.check-period | Integer | False | 3000 | 重试机制执行周期 单位是毫秒 |
invocation.retry.initial-threshold | Integer | False | 3000 | 初始重试时间门槛 单位是毫秒 |
Field | Type | Required | Default | Remark |
---|---|---|---|---|
extension.transaction.zookeeper.connection | String | TRUE | 事务ZooKeeper地址 | |
extension.transaction.zookeeper.session-timeout | Integer | FALSE | 5000 | 事务ZooKeeper超时 单位是毫秒 |
extension.transaction.zookeeper.path | String | FALSE | /clrpc | 事务ZooKeeper基础路径 |
clrpc 使用 ZooKeeper 实现了类似于两段式提交(2PC)的分布式事务协调服务。
注意:该服务仅支持 返回值为 conglin.clrpc.extension.transaction.TransactionResult
及其子类的服务方法。不满足条件的服务方法执行方式与普通调用相同。
- 阶段一 执行服务方法,并返回
conglin.clrpc.extension.transaction.TransactionResult
对象。并通过conglin.clrpc.extension.transaction.TransactionResult#result()
方法获取一阶段提交结果,返回给调用者。 - 阶段二 从
conglin.clrpc.extension.transaction.TransactionResult#callback()
获取conglin.clrpc.common.Callback
对象。根据实际需求执行conglin.clrpc.common.Callback#success(Object)
或conglin.clrpc.common.Callback#fail(Exception)
方法。
流程图如下:
使用者通过实现 conglin.clrpc.service.registry.ServiceRegistry
接口来实现注册中心。
在启动前通过 conglin.clrpc.bootstrap.Bootstrap#registry(ServiceRegistryFactory)
传入即可完成对注册中心的扩展。
clrpc 提供一个基于 ZooKeeper 的实现:conglin.clrpc.thirdparty.zookeeper.registry.ZooKeeperServiceRegistry
。
使用者通过实现 conglin.clrpc.service.loadbalance.LoadBalancer
接口来实现负载均衡器。
在订阅服务时通过 conglin.clrpc.bootstrap.ConsumerBootstrap#subscribe(ServiceInterface<?>, Class<? extends LoadBalancer<?, ?>>)
传入即可完成对注册中心的扩展。
对于不同的服务,允许使用不同的负载均衡器即可以使用不同的负载均衡策略。
clrpc 提供三种策略实现:
- 基于树的一致性哈希。(默认策略)
- 基于数组的随机。
- 基于数组的轮询。
使用者通过实现 conglin.clrpc.invocation.identifier.IdentifierGenerator
接口来实现ID生成器。
在启动时通过 conglin.clrpc.bootstrap.option.BootOption#identifierGenerator(IdentifierGenerator)
传入即可完成对ID生成器的扩展。
clrpc 提供四种策略实现:
- 雪花算法。(默认策略)
- 随机数生成。
- 基于ZooKeeper的全局顺序id生成。
- 基于ZooKeeper的服务顺序id生成。
使用者通过实现 conglin.clrpc.invocation.serialization.SerializationHandler
接口来实现序列化处理器。
在启动时通过 conglin.clrpc.bootstrap.option.BootOption#serializationHandler(SerializationHandler)
传入即可完成对序列化处理器的扩展。
clrpc 提供一种策略实现:
- Protostuff。(默认策略)
使用者通过实现 conglin.clrpc.service.instance.codec.ServiceInstanceCodec
接口来实现ServiceInstance序列化处理器。
在启动时通过 conglin.clrpc.bootstrap.option.BootOption#serviceInstanceCodec(SerializationHandler)
传入即可完成对ServiceInstance序列化处理器的扩展。
使用者通过实现 conglin.clrpc.invocation.protocol.ProtocolDefinition
接口来实现消息类型协议。
在启动时通过 conglin.clrpc.bootstrap.option.BootOption#protocolDefinition(ProtocolDefinition)
传入即可完成对消息类型协议的扩展。
clrpc 提供一个处理器链作为处理消息的责任链,并提供消息处理扩展点。
使用者实现 conglin.clrpc.bootstrap.ProviderBootstrap#registerExecutor(ChainExecutor)
接口来注册处理器。
使用者实现 conglin.clrpc.bootstrap.ConsumerBootstrap#registerExecutor(ChainExecutor)
接口来注册处理器。
使用者通过实现 conglin.clrpc.invocation.strategy.FailStrategy
接口来提供请求失败策略。
clrpc 提供三种策略实现:
- FailFast: 失败时抛出异常。(默认策略)
- FailOver: 失败时重试。
- FailSafe: 失败时不抛出异常,返回默认值。
使用者可以根据需求,任意组合或自定义失败策略。
请求超时时间,单位是毫秒。若小于0则永不超时。
使用者通过实现 conglin.clrpc.service.instance.condition.InstanceCondition
接口来选择服务实例。
使用者可以通过提供对象并实现以下接口来保证自己的扩展对象拥有 clrpc 对象生命周期。
- 使用前
conglin.clrpc.lifecycle.ComponentContextAware
conglin.clrpc.lifecycle.Initializable
- 使用后
conglin.clrpc.lifecycle.Destroyable
(没有经过托管的对象不具备该周期,需要开发者手动处理)
clrpc Provider 处同时支持以下两种方式的方法查询。
- 第一种为普通方式:该方式使用
java.lang.Class#getMethod(String, Class<?>[])
在执行时查找对应的Method
。 - 第二种为 clrpc 自定义方式(不支持用户自定义):该方式使用缓存的方式保存服务
Method
。- 当用户发布对应的服务时,会对该服务下的所有
Method
进行分类缓存。 - 分类的规则为重载的
Method
且其参数个数,例如#hello(String, Object)
的缓存key为$hello$2
。 - 对于缓存key相同的
Method
,根据其Method
参数类型继承顺序进行排序,例如#hello(String, Integer)
将排在#hello(String, Object)
前面。 - 执行时根据请求参数类型按顺序匹配
Method
参数类型。
- 当用户发布对应的服务时,会对该服务下的所有
Customer 处需要配合代理来选择使用哪种方式。
对于 conglin.clrpc.invocation.proxy.ServiceInterfaceObjectProxy
及其子类默认使用第二种方式。
若需要更换方法查询方式,对方法 conglin.clrpc.invocation.proxy.AbstractObjectProxy#getMethodName(Method)
重写即可。