sofa-rpc源码解析-服务发布过程

SOFA 中间件是蚂蚁金服自主研发的金融级分布式中间件,包含了构建金融级云原生架构所需的各个组件,包括微服务研发框架,RPC 框架,服务注册中心,分布式定时任务,限流/熔断框架,动态配置推送,分布式链路追踪,Metrics监控度量,分布式高可用消息队列,分布式事务框架,分布式数据库代理层等组件,是一套分布式架构的完整的解决方案,也是在金融场景里锤炼出来的最佳实践。

功能特性

  • 透明化、高性能的远程服务调用
  • 支持多种服务路由及负载均衡策略
  • 支持多种注册中心的集成
  • 支持多种协议,包括 Bolt、Rest、Dubbo 等
  • 支持同步、单向、回调、泛化等多种调用方式
  • 支持集群容错、服务预热、自动故障隔离
  • 强大的扩展功能,可以按需扩展各个功能组件
  • 本源码解析系列将围绕这些特性展开.

编写服务端实现

第一步:创建接口

1
2
3
4
5
6
/**
* Quick Start demo interface
*/

public interface HelloService {
String sayHello(String string);
}

第二步:创建接口实现

1
2
3
4
5
6
7
8
9
10
/**
* Quick Start demo implement
*/

public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String string) {
System.out.println("Server receive: " + string);
return "hello " + string + " !";
}
}

第三步:编写服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Quick Start Server
*/

public class QuickStartServer {

public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 设置一个协议,默认bolt
.setPort(12200) // 设置一个端口,默认12200
.setDaemon(false); // 非守护线程

ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定实现
.setServer(serverConfig); // 指定服务端

providerConfig.export(); // 发布服务
}
}

编写客户端实现

第一步:拿到服务端接口

一般服务端会通过jar的形式将接口类提供给客户端。而在本例中,由于服务端和客户端在一个工程所以跳过。

第二步:编程客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Quick Start client
*/

public class QuickStartClient {
public static void main(String[] args) {
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setProtocol("bolt") // 指定协议
.setDirectUrl("bolt://127.0.0.1:12200"); // 指定直连地址
// 生成代理类
HelloService helloService = consumerConfig.refer();
while (true) {
System.out.println(helloService.sayHello("world"));
try {
Thread.sleep(2000);
} catch (Exception e) {
}
}
}
}

服务发布

服务发布过程涉及到三个类 RegistryConfig ,ServerConfig ,ProviderConfig 。

1.RegistryConfig

1
2
3
RegistryConfig registryConfig = new RegistryConfig()
.setProtocol("zookeeper")
.setAddress("127.0.0.1:2181")

RegistryConfig 表示注册中心。如上声明了服务注册中心的地址和端口是127.0.0.1:2181,协议是 Zookeeper 。

2.ServerConfig

1
2
3
ServerConfig serverConfig = new ServerConfig()
.setPort(8803)
.setProtocol("bolt");

ServerConfig 表示服务运行容器。如上声明了一个使用8803端口和 bolt 协议的 server 。
3.ProviderConfig

1
2
3
4
5
6
ProviderConfig<HelloWorldService> providerConfig = new ProviderConfig<HelloWorldService>()
.setInterfaceId(HelloWorldService.class.getName())
.setRef(new HelloWorldServiceImpl())
.setServer(serverConfig)
.setRegistry(registryConfig);
providerConfig.export();

ProviderConfig 表示服务发布。如上声明了服务的接口,实现和该服务运行的 server 。 最终通过 export 方法将这个服务发布出去了。

启动底层Netty服务端

1
2
3
4
5
6
7
8
9
/**
* 发布服务
*/

public synchronized void export() {
if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
providerBootstrap.export();
}

发布服务过程首先会初始化一个发布服务的包装类 ProviderBootstrap(DefaultProviderBootstrap),从该实例持有服务提供者配置信息。还提供了延迟加载的特性(单位毫秒)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void export() {
if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD
}
doExport();
}
});
thread.start();
} else {
doExport();
}
}

doExport会做一些必要的参数检查,例如服务发布次数限制、服务下方法的黑白名单。最重要的是构建请求调用器也就是请求处理器,并将其与Server绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 构造请求调用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 初始化注册中心
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
}
}
}
// 将处理器注册到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
try {
Server server = serverConfig.buildIfAbsent();
// 注册序列化接口
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
}

providerProxyInvoker是服务端调用链的入口,rpc请求将会沿着这个入口执行。