SpringCloud-6-消息和异步

1.异步和消息

1.简介

  1. 1.之前我们使用的方式是同步方式。
  2. 2.异步优点:a.客户端请求不会阻塞进程、服务端的相应可以是非及时的。
  3. 3.http常见是支持同步、但是http也是支持异步的。

2.异步的常见形态

  1. 1.通知
  2. 2.请求/异步响应:客户端发送请求到服务端,服务端异步响应,客户端不会阻塞,服务端默认不会立马响应。
  3. 3.消息

    3.MQ应用场景

    MQ是分布式应用系统最常用的组件。典型应用场景有如下:

  4. 1.异步处理:用户注册之后,通过短信服务、积分服务去做他们相应的操作,这样就能提高用户体验。

  5. 2.流量消峰:一般用于秒杀场景,秒杀过程中一般由于流量较大,会导致应用会挂掉、要解决这个问题。把请求放在消息队列中,如果超过消息队列的长度将抛弃此请求,返回错误信息。
  6. 3.日志处理:这个典型的组件就是kafka,kafka最初的设计就是用于日志处理,大数据里面用的特别多。通过日志采集、定时写入kafka队列,然后kafka队列定时接收 储存 和转发。
  7. 应用解耦:比如用户下单后,订单服务需要通知商品服务、之前是订单服务通过调用商品服务的接口,这样订单服务和商品服务是耦合的。使用mq,用户下单后,订单服务完成持续化处理、将消息写入消息队列、返回用户订单下单成功、商品服务来订阅这个消息采用拉或者推形式获取下单信息、商品服务获取到下单信息后进行商品的扣库存等操作。这样达到应解耦。

2.RabbitMQ的基本使用

我们Order服务使用RabbitMQ实现订单和商品服务解耦。

  1. 1.添加热rabbitmq依赖-在order服务中server子模块添加以下依赖:
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 2.添加配置到github远程仓库

  2. 3.创建MQ消息接收方

  3. 4.创建mq发送方进行测试
    我们在测试test文件目录下创建测试类:

测试出现了上面问题,原因是因为没有myQueue队列

此时正常启动。

这个时候一切正常。

  1. 5.以上是我们自己在RabbitMQ中创建的队列。我们是否可以自动创建队列,然后再调用队列接口。
1
@RabbitListener(queuesToDeclare = @Queue("myQueue1")) //自动创建队列myQueue1
  1. 6.自动创建并且和队列绑定
1
@RabbitListener(bindings =@QueueBinding(value =@Queue("myQueue"),exchange =@Exchange("myExchange")))

2.2 什么情况下需要用到exchange

我们现在是一个小小点餐系统,但是如果后面,我们什么都卖,又卖水果,又卖数码,同时对这两种商品下单,商品变多了,人也多了,订单服务是单独的人来维护,数码供应商、水果供应商都是由单独的人来维护。订单服务要根据不同的商品类型发出不同的MQ消息。相对应的、数码供应商只关注数码订单、水果及其他订单不关注。此时就牵涉到消息的分组。 演示如下:
1.我们接收方模拟两个接收服务(a.数码服务 b.水果服务)
exchange因为都是订单,我们我们叫做myOrder
想要分组归类,我们用key

2.我们发送方 (类似订单服务)如下:

3.SpringCloudStream的使用

可以参数博客:https://www.cnblogs.com/zhixiang-org-cn/p/10093367.html

3.1.基本介绍

  1. 1.应用模型:应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中Binder 交互,通过我们配置来绑定,而 Spring Cloud Stream 的 Binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
  2. 2.抽象绑定器(The Binder Abstraction):Spring Cloud Stream实现Kafkat和RabbitMQ的Binder实现,也包括了一个TestSupportBinder,用于测试。你也可以写根据API去写自己的Binder.Spring Cloud Stream 同样使用了Spring boot的自动配置,并且抽象的Binder使Spring Cloud Stream的应用获得更好的灵活性,比如:我们可以在application.yml或application.properties中指定参数进行配置使用Kafka或者RabbitMQ,而无需修改我们的代码。

  3. 3.Binder(SpringCloudStream)是应用程序(Application)和消息中间件(Middleware)之间的粘合剂,使用 SpringCloudStream最大的好处莫过于对消息中间件的进一步封装。可以做到代码层面对消息中间件的无感知,设置于动态的切换中间件,但是也有局限:目前SpringCloudStream仅支持2种Binder:一种是RabbitMQ、另一种是Kafka

3.2.使用SpringCloudStream

为了详细了解SpringCloudStream的话,可以详细了解下:https://www.cnblogs.com/leeSmall/p/8900518.html这篇博文。

1.引入依赖

在order-server.xml中添加如下依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2.添加mq的配置

之前我们已经配置过了,这里可以省略

3.使用stream发送和接收消息
  1. 1.定义接口:提供输入输出接口
1
2
3
4
5
6
public interface StreamClient {
@Input("myMessage")
SubscribableChannel input();
@Input("myMessage")
MessageChannel output();
}
  1. 2.添加stream接收端
1
2
3
4
5
6
7
8
9
@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {
@StreamListener("myMessage")
public void process(Object message){
log.info("StreamReceiver:{}",message);
}
}
  1. 3.添加stream发送端
1
2
3
4
5
6
7
8
9
10
@RestController
public class SendMessageController {
@Autowired
private StreamClient streamClient;
@GetMapping("/sendMessage")
public void process(){
String message = "now " + new Date();
streamClient.output().send(MessageBuilder.withPayload(message).build());
}
}
  1. 4.测试

    有时候我们后端启用2个order服务,然后重启后 客户端发送消息,发现2个都接收到了,那么我们如何多集群情况下,只有一个实例接收消息呢?
    stream里面有个分组,我们配置一下就可以了 那就是使用分组。
3.使用stream传递对象

之前我们在用例中stream传递的是String、实际工作中stream更重要的是传递对象。
生产者:发送OrderDetail对象

消息接受者:

下面我们看一下:mq里面接收的是什么格式数据呢?
我们在上面看不到消息了,原因是消息已经被消费完了,所以我们需要把接收端,消费消息的停掉。让mq端有消息的累计。

上面我们可以看到消息是:OrderDTO对象,如果我们需要在MQ中获取的消息是json格式:则在stream的配置中添加:content-type: application/json

有时候我们的消费者:StreamReceiver消费完消息之后,我们需要给发送者一个通知,传统的做法是在业务逻辑里面后面做处理。
现在我们加上注解即可:

总结:使用Stream可以降低对消息中间件的复杂度,让开发者更多的关注业务开发。

4.商品和订单服务中使用MQ

4.1 引言

我们结合点餐业务来使用mq

  1. 1.之前我们按照上面图提示到:一旦有库存的变化(商品)、都会发布一个消息、订单拿到这个消息之后,会把库存的消息记录到自己的服务里面(这里我们把他记录到redis里面)
  2. 2.导致库存的变化有很多种:a.第一次商品上线的时候,会填写库存,这个时候库存就变化了。b.还有就是货物快卖完的时候需要补货。相当于加库存。我们以扣库存为例,使用消息队列进行通信。

4.2 product服务接入到配置中心

  1. 1.添加配置中心的依赖
  2. 2.修改appliaction.yml文件为:bootstrap.yml文件,并修改其中的配置

  3. 3.在github上建立对应的product-dev.yml文件,并将共有的信息拷贝到里面去

  4. 4.在config服务中看是否可以访问到此配置文件。
  5. 5.启动时候正常启动:说明我们已经把product服务接入到配置中心了。

4.3 product服务在扣库存时候发送队列消息

  1. 1.添加springBoot里面的amqp依赖

  2. 2.在统一配置中心中添加mq的连接信息

4.4 product服务在扣库存地方操作消息队列

我们在扣库存的service中添加扣库存(decreaseStock)后消息通知:

然后我们使用postman测试:

数据库里面的库存从33变成32,数据库扣除成功了

但是我们登录mq的管理后端却发现没有对应的amqpTemplate的productInfo队列,

为什么没有这个队列呢?原因是因为我们压根就没有创建队列,只有在接收方添加注解:@RabbitListener注解才会自动创建,我们 上面只有发送方,所以我们先手动创建下:然后点几次发送,后面发现我们在消息队列中有了相应的消息:

以上就完成了消息的发送,现在消息已经从商品服务发送到消息队列里面了,接下来我们要在订单服务接收消息。
在Order服务的message包下面添加接收消息:

我们先删除之前的productInfo队列,然后重启服务会发现会自动创建队列

4.5 Order服务获取到扣库存信息存储到redis

我们order服务获取到了product服务发送的信息,现在需要把这些信息存储到redis中
redis可以使用docker安装、也可以使用自己免安装的版本

  1. 1.项目中引入依赖(order-server中pom.xml)
1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  1. 2.配置redis里面的参数,在统一配置中心添加
1
2
3
4
spring:
redis:
host: localhost
port: 6379
  1. 3.书写存储到redis(使用StringRedisTemplate)

  2. 4.postman测试

4.6 残留的问题

  1. 1.从上面我们知道:扣库存是遍历了商品,然后操作数据库,再然后就是发送mq消息,假如decreaseStockInputList长度大于1,我们第一件商品扣库存之后、发送消息到mq,但是第二件商品报了异常,由于有事务,数据库会回滚,但是mq里面的消息不会回滚,所有有脏数据。
  2. 2.我们修改下,我们在数据库扣完库存之后再发mq消息
    Product服务修改:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public void decreaseStock(List<DecreaseStockInput> decreaseStockInputList) {
//1.获取到扣库存列表
List<ProductInfo> productInfoList = decreaseStockProcess(decreaseStockInputList);
//2.转换成ProductInfoOutput列表
List<ProductInfoOutput> productInfoOutputList = productInfoList.stream().map(e -> {
ProductInfoOutput output = new ProductInfoOutput();
BeanUtils.copyProperties(e, output);
return output;
}).collect(Collectors.toList());
//3.发送mq消息
amqpTemplate.convertAndSend("productInfo", JsonUtil.toJson(productInfoOutputList));
}
@Transactional(rollbackFor = Exception.class)
public List<ProductInfo> decreaseStockProcess(List<DecreaseStockInput> decreaseStockInputList) {
/**
* 遍历:查看是否存在
*/
List<ProductInfo> productInfoList = new ArrayList<>();
for (DecreaseStockInput cartDTO:decreaseStockInputList){
Optional<ProductInfo> productInfoOptional = repository.findById(cartDTO.getProductId());
//商品不存在
if(!productInfoOptional.isPresent()){
throw new ProductException(ResultEnum.PRODUCT_NOT_EXIST);
}
//商品存在-库存错误
ProductInfo productInfo = productInfoOptional.get();
int result = productInfo.getProductStock() - cartDTO.getProductQuantity();
if(result<0){
throw new ProductException(ResultEnum.PRODUCT_STOCK_ERROE);
}
//保存
productInfo.setProductStock(result);
repository.save(productInfo);
productInfoList.add(productInfo);
}
return productInfoList;
}

Order服务修改:
JsonUtil添加如下方法:

5.异步和库存分析

项目改造成异步后,数据一致性等问题是经常遇到的,很多时候在单体服务中,依靠本地事务,我们很容易保证数据的一致性。但是一旦切换到分布式异步情况下就很可能出现数据不一致的情况。比如这里的发消息,数据库回滚数据是自动的。消息多发了要怎么办?这个就需要重新仔细考虑,稍不留神机会出错。下面我们以下面通路进一步看下更多考虑的点。
参考一下链接:https://cloud.tencent.com/developer/article/1344252

毕业于<br>相信技术可以改变人与人之间的生活<br>码农一枚