1.异步和消息
1.简介
- 1.之前我们使用的方式是同步方式。
- 2.异步优点:a.客户端请求不会阻塞进程、服务端的相应可以是非及时的。
- 3.http常见是支持同步、但是http也是支持异步的。
2.异步的常见形态
- 1.通知
- 2.请求/异步响应:客户端发送请求到服务端,服务端异步响应,客户端不会阻塞,服务端默认不会立马响应。
3.消息
3.MQ应用场景
MQ是分布式应用系统最常用的组件。典型应用场景有如下:
1.异步处理:用户注册之后,通过短信服务、积分服务去做他们相应的操作,这样就能提高用户体验。
- 2.流量消峰:一般用于秒杀场景,秒杀过程中一般由于流量较大,会导致应用会挂掉、要解决这个问题。把请求放在消息队列中,如果超过消息队列的长度将抛弃此请求,返回错误信息。
- 3.日志处理:这个典型的组件就是kafka,kafka最初的设计就是用于日志处理,大数据里面用的特别多。通过日志采集、定时写入kafka队列,然后kafka队列定时接收 储存 和转发。
- 应用解耦:比如用户下单后,订单服务需要通知商品服务、之前是订单服务通过调用商品服务的接口,这样订单服务和商品服务是耦合的。使用mq,用户下单后,订单服务完成持续化处理、将消息写入消息队列、返回用户订单下单成功、商品服务来订阅这个消息采用拉或者推形式获取下单信息、商品服务获取到下单信息后进行商品的扣库存等操作。这样达到应解耦。
2.RabbitMQ的基本使用
我们Order服务使用RabbitMQ实现订单和商品服务解耦。
- 1.添加热rabbitmq依赖-在order服务中server子模块添加以下依赖:
|
|
2.添加配置到github远程仓库
3.创建MQ消息接收方
4.创建mq发送方进行测试
我们在测试test文件目录下创建测试类:
测试出现了上面问题,原因是因为没有myQueue队列
此时正常启动。
这个时候一切正常。
- 5.以上是我们自己在RabbitMQ中创建的队列。我们是否可以自动创建队列,然后再调用队列接口。
|
|
- 6.自动创建并且和队列绑定
|
|
2.2 什么情况下需要用到exchange
我们现在是一个小小点餐系统,但是如果后面,我们什么都卖,又卖水果,又卖数码,同时对这两种商品下单,商品变多了,人也多了,订单服务是单独的人来维护,数码供应商、水果供应商都是由单独的人来维护。订单服务要根据不同的商品类型发出不同的MQ消息。相对应的、数码供应商只关注数码订单、水果及其他订单不关注。此时就牵涉到消息的分组。 演示如下:
1.我们接收方模拟两个接收服务(a.数码服务 b.水果服务)
exchange因为都是订单,我们我们叫做myOrder
想要分组归类,我们用key
2.我们发送方 (类似订单服务)如下:
3.SpringCloudStream的使用
可以参数博客:https://www.cnblogs.com/zhixiang-org-cn/p/10093367.html
3.1.基本介绍
- 1.应用模型:应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中Binder 交互,通过我们配置来绑定,而 Spring Cloud Stream 的 Binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
2.抽象绑定器(The Binder Abstraction):Spring Cloud Stream实现Kafkat和RabbitMQ的Binder实现,也包括了一个TestSupportBinder,用于测试。你也可以写根据API去写自己的Binder.Spring Cloud Stream 同样使用了Spring boot的自动配置,并且抽象的Binder使Spring Cloud Stream的应用获得更好的灵活性,比如:我们可以在application.yml或application.properties中指定参数进行配置使用Kafka或者RabbitMQ,而无需修改我们的代码。
3.Binder(SpringCloudStream)是应用程序(Application)和消息中间件(Middleware)之间的粘合剂,使用 SpringCloudStream最大的好处莫过于对消息中间件的进一步封装。可以做到代码层面对消息中间件的无感知,设置于动态的切换中间件,但是也有局限:目前SpringCloudStream仅支持2种Binder:一种是RabbitMQ、另一种是Kafka
3.2.使用SpringCloudStream
为了详细了解SpringCloudStream的话,可以详细了解下:https://www.cnblogs.com/leeSmall/p/8900518.html这篇博文。
1.引入依赖
在order-server.xml中添加如下依赖:
|
|
2.添加mq的配置
之前我们已经配置过了,这里可以省略
3.使用stream发送和接收消息
- 1.定义接口:提供输入输出接口
|
|
- 2.添加stream接收端
|
|
- 3.添加stream发送端
|
|
- 4.测试
有时候我们后端启用2个order服务,然后重启后 客户端发送消息,发现2个都接收到了,那么我们如何多集群情况下,只有一个实例接收消息呢?
stream里面有个分组,我们配置一下就可以了 那就是使用分组。
3.使用stream传递对象
之前我们在用例中stream传递的是String、实际工作中stream更重要的是传递对象。
生产者:发送OrderDetail对象
消息接受者:
下面我们看一下:mq里面接收的是什么格式数据呢?
我们在上面看不到消息了,原因是消息已经被消费完了,所以我们需要把接收端,消费消息的停掉。让mq端有消息的累计。
上面我们可以看到消息是:OrderDTO对象,如果我们需要在MQ中获取的消息是json格式:则在stream的配置中添加:content-type: application/json
有时候我们的消费者:StreamReceiver消费完消息之后,我们需要给发送者一个通知,传统的做法是在业务逻辑里面后面做处理。
现在我们加上注解即可:
总结:使用Stream可以降低对消息中间件的复杂度,让开发者更多的关注业务开发。
4.商品和订单服务中使用MQ
4.1 引言
我们结合点餐业务来使用mq
- 1.之前我们按照上面图提示到:一旦有库存的变化(商品)、都会发布一个消息、订单拿到这个消息之后,会把库存的消息记录到自己的服务里面(这里我们把他记录到redis里面)
- 2.导致库存的变化有很多种:a.第一次商品上线的时候,会填写库存,这个时候库存就变化了。b.还有就是货物快卖完的时候需要补货。相当于加库存。我们以扣库存为例,使用消息队列进行通信。
4.2 product服务接入到配置中心
- 1.添加配置中心的依赖
2.修改appliaction.yml文件为:bootstrap.yml文件,并修改其中的配置
3.在github上建立对应的product-dev.yml文件,并将共有的信息拷贝到里面去
- 4.在config服务中看是否可以访问到此配置文件。
- 5.启动时候正常启动:说明我们已经把product服务接入到配置中心了。
4.3 product服务在扣库存时候发送队列消息
1.添加springBoot里面的amqp依赖
2.在统一配置中心中添加mq的连接信息
4.4 product服务在扣库存地方操作消息队列
我们在扣库存的service中添加扣库存(decreaseStock)后消息通知:
然后我们使用postman测试:
数据库里面的库存从33变成32,数据库扣除成功了
但是我们登录mq的管理后端却发现没有对应的amqpTemplate的productInfo队列,
为什么没有这个队列呢?原因是因为我们压根就没有创建队列,只有在接收方添加注解:@RabbitListener注解才会自动创建,我们 上面只有发送方,所以我们先手动创建下:然后点几次发送,后面发现我们在消息队列中有了相应的消息:
以上就完成了消息的发送,现在消息已经从商品服务发送到消息队列里面了,接下来我们要在订单服务接收消息。
在Order服务的message包下面添加接收消息:
我们先删除之前的productInfo队列,然后重启服务会发现会自动创建队列
4.5 Order服务获取到扣库存信息存储到redis
我们order服务获取到了product服务发送的信息,现在需要把这些信息存储到redis中
redis可以使用docker安装、也可以使用自己免安装的版本
- 1.项目中引入依赖(order-server中pom.xml)
|
|
- 2.配置redis里面的参数,在统一配置中心添加
|
|
3.书写存储到redis(使用StringRedisTemplate)
4.postman测试
4.6 残留的问题
- 1.从上面我们知道:扣库存是遍历了商品,然后操作数据库,再然后就是发送mq消息,假如decreaseStockInputList长度大于1,我们第一件商品扣库存之后、发送消息到mq,但是第二件商品报了异常,由于有事务,数据库会回滚,但是mq里面的消息不会回滚,所有有脏数据。
- 2.我们修改下,我们在数据库扣完库存之后再发mq消息
Product服务修改:
|
|
Order服务修改:
JsonUtil添加如下方法:
5.异步和库存分析
项目改造成异步后,数据一致性等问题是经常遇到的,很多时候在单体服务中,依靠本地事务,我们很容易保证数据的一致性。但是一旦切换到分布式异步情况下就很可能出现数据不一致的情况。比如这里的发消息,数据库回滚数据是自动的。消息多发了要怎么办?这个就需要重新仔细考虑,稍不留神机会出错。下面我们以下面通路进一步看下更多考虑的点。
参考一下链接:https://cloud.tencent.com/developer/article/1344252