阻塞与非阻塞式客户端

阻塞与非阻塞

阻塞是指程序会一直等待该进程或线程完成当前任务期间不做其它事情。而非阻塞,是指当前线程在处理一些事情的同时,还可以处理其它的事情,并不需要等待当前事件完成才执行其它事件。

阻塞与非阻塞客户端

对于请求当中,我们有需要借助一些请求封装的客户端,这里可以分为两大类:阻塞式、非阻塞式。

阻塞式客户端以常见的 RestTemplate为例,这是一种常见的客户端请求封装,要创建负载平衡RestTemplate,下面看看其Bean:

1
2
3
4
5
@LoadBalanced
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}

在底层,RestTemplate 使用了基于每个请求对应一个线程模型(thread-per-request)的 Java Servlet API。在阻塞客户端中,这意味着,直到 Web 客户端收到响应之前,线程都将一直被阻塞下去。而阻塞带来的问题是:每个线程都消耗了一定的内存和 CPU 周期。

如果在并发下,等待结果的请求迟早都会堆积起来。这样,程序将创建很多线程,这些线程将耗尽线程池或占用所有可用内存。由于频繁的 CPU 线程切换,我们还会遇到性能下降的问题。

这在 Spring5 中,提出了一种新的客户端抽象:反应式客户端 WebClient,而 WebClient 使用了 Spring Reactive Framework 所提供的异步非阻塞解决方案。所以,当 RestTemplate创建一个个新的线程时,Webclient是为其创建类似task的线程,并且在底层,
Reactive 框架将对这些 task 进行排队,并且仅在适当的响应可用时再执行它们。WebClient 是 Spring WebFlux 库的一部分。所以,我们还可以使用了流畅的函数式 API 编程,并将响应类型作为声明来进行组合。如果需要使用 WebClient,同样可以创建:

1
2
3
4
5
@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}

案例

假设这里有一个响应非常慢的服务rest-service,我们分别用阻塞式、非阻塞式客户端来测试一下。

阻塞式

我们利用 RestTemplate实现阻塞式请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}


@Autowired
RestTemplate restTemplate;

@GetMapping("/getClientRes")
public Response<Object> getClientRes() throws Exception {
System.out.println("block api enter");
HttpHeaders headers = new HttpHeaders();
MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
headers.setContentType(type);
headers.add("Accept", MediaType.APPLICATION_JSON.toString());
HttpEntity<String> formEntity = new HttpEntity<String>(null, headers);
String body = "";
try {
ResponseEntity<String> responseEntity = restTemplate.exchange("http://diff-ns-service-service/getservicedetail?servicename=cas-server-service",
HttpMethod.GET, formEntity, String.class);
System.out.println(JSON.toJSONString(responseEntity));
if (responseEntity.getStatusCodeValue() == 200) {
System.out.println("block api exit");
return Response.ok(responseEntity.getBody());
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
System.out.println("block api failed, exit");
return Response.error("failed");
}

在启动服务请求后,发现其打印:

1
2
3
4
5
block api enter

[{"host":"10.244.0.55","instanceId":"71f96128-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"name\":\"cas-server-service\",\"namespace\":\"system-server\"},\"spec\":{\"ports\":[{\"name\":\"cas-server01\",\"port\":2000,\"targetPort\":\"cas-server01\"}],\"selector\":{\"app\":\"cas-server\"}}}\n","port.cas-server01":"2000","k8s_namespace":"system-server"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.55:2000"},{"host":"10.244.0.56","instanceId":"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"$ref":"$[0].metadata"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.56:2000"}]

block api exit

非阻塞式

上面的打印符合我们的逾期,接下来我们来看看非阻塞、反应式客户端请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}


@GetMapping(value = "/getClientResByWebClient", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<String> getClientResByWebClient() throws Exception {
System.out.println("no block api enter");
Mono<String> resp = webClientBuilder.build().get()
.uri("http://diff-ns-service-service/getservicedetail?servicename=cas-server-service").retrieve()
.bodyToMono(String.class);
resp.subscribe(body -> System.out.println(body.toString()));
System.out.println("no block api exit");
return resp;
}

执行完代码后,看打印:

1
2
3
4
5
no block api enter

no block api exit

[{"host":"10.244.0.55","instanceId":"71f96128-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"name\":\"cas-server-service\",\"namespace\":\"system-server\"},\"spec\":{\"ports\":[{\"name\":\"cas-server01\",\"port\":2000,\"targetPort\":\"cas-server01\"}],\"selector\":{\"app\":\"cas-server\"}}}\n","port.cas-server01":"2000","k8s_namespace":"system-server"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.55:2000"},{"host":"10.244.0.56","instanceId":"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"$ref":"$[0].metadata"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.56:2000"}]

在本例中,WebClient 返回一个 Mono 生产者后完成方法的执行。如果一旦结果可用,发布者将开始向其订阅者发送数据。调用这个API的客户端(浏览器)也将订阅返回的 Mono 对象。

阻塞式转非阻塞式

可以将前面的阻塞式请求,直接转为非阻塞请求,前提是你使用的是 Spring5,此时,可以直接这样来写,贴代码:

1
2
3
4
5
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.fromCallable(() -> restTemplate.getForObject("http://diff-ns-service-service/all/getService", String.class))
.subscribeOn(Schedulers.elastic());
}

这样后,在请求访问时,直接返回了提供者服务返回的信息体:

1
{"result":{"status":200,"code":0,"msg":"success"},"data":[{"host":"10.244.0.55","instanceId":"71f96128-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"name\":\"cas-server-service\",\"namespace\":\"system-server\"},\"spec\":{\"ports\":[{\"name\":\"cas-server01\",\"port\":2000,\"targetPort\":\"cas-server01\"}],\"selector\":{\"app\":\"cas-server\"}}}\n","port.cas-server01":"2000","k8s_namespace":"system-server"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.55:2000"},{"host":"10.244.0.56","instanceId":"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"$ref":"$[0].metadata"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.56:2000"}]}

这里需要注意的是,请求时,需要直接返回服务提供者的标准信息体,不能再作二次封装返回,否则,只能拿到信息:

1
{"result":{"status":200,"code":0,"msg":"success"},"data":{"scanAvailable":true}}

表示本次 callable 为true,但这不是我们需要的信息,我们还是需要其本身返回的业务数据。所以需要提供者的返回标准化,因为直接将信息返回给可接收的浏览器等前端。

结论

在大部分场景下, RestTemplate 还是继续被使用的,但有些场景下,反应式非阻塞请求还是必须的,系统资源要少得多。WebClient不失为是一个更好的选择。



结束福利

开源实战利用 k8s 作微服务的架构设计代码:

https://gitee.com/damon_one/spring-cloud-k8s

欢迎大家 star,多多指教。


关于作者

  笔名:Damon,技术爱好者,长期从事 Java 开发、Spring Cloud 的微服务架构设计,以及结合 Docker、K8s 做微服务容器化,自动化部署等一站式项目部署、落地。目前主要从事基于 K8s 云原生架构研发的工作。Golang 语言开发,长期研究边缘计算框架 KubeEdge、调度框架 Volcano 等。公众号 交个朋友之猿天地 发起人。个人微信 DamonStatham,星球:《交个朋友之猿田地》,个人网站:交个朋友之猿天地 | 微服务 | 容器化 | 自动化,欢迎來撩。


欢迎关注:InfoQ

欢迎关注:腾讯自媒体专栏


欢迎关注

公号:交个朋友之猿天地

公号:damon8

公号:天山六路折梅手


打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2020-2023 交个朋友之猿天地
  • Powered By Hexo | Title - Nothing
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信