SpringBoot
是为了简化 Spring
应用的创建、运行、调试、部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可以轻易的搭建出一个 WEB 工程
初探RabbitMQ消息队列 中介绍了RabbitMQ
的简单用法,顺带提及了下延迟队列的作用。所谓延时消息
就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
延迟队列
延迟队列能做什么?
订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。
短信通知: 下单成功后 60s 之后给用户发送短信通知。
失败重试: 业务操作失败后,间隔一定的时间进行失败重试。
这类业务的特点就是:非实时的,需要延迟处理,需要进行失败重试。一种比较笨的方式是采用定时任务,轮训数据库,方法简单好用,但性能底下,在高并发情况下容易弄死数据库,间隔时间不好设置,时间过大,影响精度,过小影响性能,而且做不到按超时的时间顺序处理。另一种就是用Java中的DelayQueue 位于java.util.concurrent包下,本质是由PriorityQueue和BlockingQueue实现的阻塞优先级队列。
,这玩意最大的问题就是不支持分布式与持久化
RabbitMQ 实现思路 RabbitMQ队列
本身是没有直接实现支持延迟队列的功能,但可以通过它的Time-To-Live Extensions 与 Dead Letter Exchange 的特性模拟出延迟队列的功能。
Time-To-Live Extensions
RabbitMQ
支持为队列或者消息设置TTL(time to live 存活时间) 。TTL表明了一条消息可在队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在TTL时间后死亡
成为Dead Letter
。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。
Dead Letter Exchange
死信交换机
,上文中提到设置了 TTL 的消息或队列最终会成为Dead Letter
。如果为队列设置了Dead Letter Exchange(DLX)
,那么这些Dead Letter
就会被重新发送到Dead Letter Exchange
中,然后通过Dead Letter Exchange
路由到其他队列,即可实现延迟队列的功能。
导入依赖 在 pom.xml
中添加 spring-boot-starter-amqp
的依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.46</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > </dependencies >
属性配置 在 application.properties
文件中配置rabbitmq
相关内容,值得注意的是这里配置了手动ACK的开关
1 2 3 4 5 6 7 spring.rabbitmq.username=battcn spring.rabbitmq.password=battcn spring.rabbitmq.host=192.168.0.133 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/ spring.rabbitmq.listener.simple.acknowledge-mode=manual
具体编码 定义队列 如果手动创建过或者RabbitMQ
中已经存在该队列那么也可以省略下述代码…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 package com.battcn.config;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class RabbitConfig { private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class); @Bean public RabbitTemplate rabbitTemplate (CachingConnectionFactory connectionFactory) { connectionFactory.setPublisherConfirms(true ); connectionFactory.setPublisherReturns(true ); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true ); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})" , correlationData, ack, cause)); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}" , exchange, routingKey, replyCode, replyText, message)); return rabbitTemplate; } private static final String REGISTER_DELAY_QUEUE = "dev.book.register.delay.queue" ; public static final String REGISTER_DELAY_EXCHANGE = "dev.book.register.delay.exchange" ; public static final String DELAY_ROUTING_KEY = "" ; public static final String REGISTER_QUEUE_NAME = "dev.book.register.queue" ; public static final String REGISTER_EXCHANGE_NAME = "dev.book.register.exchange" ; public static final String ROUTING_KEY = "all" ; @Bean public Queue delayProcessQueue () { Map<String, Object> params = new HashMap<>(); params.put("x-dead-letter-exchange" , REGISTER_EXCHANGE_NAME); params.put("x-dead-letter-routing-key" , ROUTING_KEY); return new Queue(REGISTER_DELAY_QUEUE, true , false , false , params); } @Bean public DirectExchange delayExchange () { return new DirectExchange(REGISTER_DELAY_EXCHANGE); } @Bean public Binding dlxBinding () { return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY); } @Bean public Queue registerBookQueue () { return new Queue(REGISTER_QUEUE_NAME, true ); } @Bean public TopicExchange registerBookTopicExchange () { return new TopicExchange(REGISTER_EXCHANGE_NAME); } @Bean public Binding registerBookBinding () { return BindingBuilder.bind(registerBookQueue()).to(registerBookTopicExchange()).with(ROUTING_KEY); } }
实体类 创建一个Book
类
1 2 3 4 5 6 7 8 public class Book implements java .io .Serializable { private static final long serialVersionUID = -2164058270260403154L ; private String id; private String name; }
控制器 编写一个Controller
类,用于消息发送工作,同时为了看到测试效果,添加日志输出,将发送消息的时间记录下来..
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package com.battcn.controller;import com.battcn.config.RabbitConfig;import com.battcn.entity.Book;import com.battcn.handler.BookHandler;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;import java.time.LocalDateTime;@RestController @RequestMapping (value = "/books" )public class BookController { private static final Logger log = LoggerFactory.getLogger(BookController.class); private final RabbitTemplate rabbitTemplate; @Autowired public BookController (RabbitTemplate rabbitTemplate) { this .rabbitTemplate = rabbitTemplate; } @GetMapping public void defaultMessage () { Book book = new Book(); book.setId("1" ); book.setName("一起来学Spring Boot" ); this .rabbitTemplate.convertAndSend(RabbitConfig.REGISTER_DELAY_EXCHANGE, RabbitConfig.DELAY_ROUTING_KEY, book, message -> { message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Book.class.getName()); message.getMessageProperties().setExpiration(5 * 1000 + "" ); return message; }); log.info("[发送时间] - [{}]" , LocalDateTime.now()); } }
消息消费者 默认情况下 spring-boot-data-amqp
是自动ACK
机制,就意味着 MQ 会在消息消费完毕后自动帮我们去ACK,这样依赖就存在这样一个问题:如果报错了,消息不会丢失,会无限循环消费,很容易就吧磁盘空间耗完,虽然可以配置消费的次数但这种做法也有失优雅。目前比较推荐的就是我们手动ACK
然后将消费错误的消息转移到其它的消息队列中,做补偿处理。 由于我们需要手动控制ACK
,因此下面监听完消息后需要调用basicAck
通知rabbitmq
消息已被正确消费,可以将远程队列中的消息删除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.battcn.handler;import com.battcn.config.RabbitConfig;import com.battcn.entity.Book;import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;import java.time.LocalDateTime;@Component public class BookHandler { private static final Logger log = LoggerFactory.getLogger(BookHandler.class); @RabbitListener (queues = {RabbitConfig.REGISTER_QUEUE_NAME}) public void listenerDelayQueue (Book book, Message message, Channel channel) { log.info("[listenerDelayQueue 监听的消息] - [消费时间] - [{}] - [{}]" , LocalDateTime.now(), book.toString()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } catch (IOException e) { } } }
主函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.battcn;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class Chapter12Application { public static void main (String[] args) { SpringApplication.run(Chapter12Application.class, args); } }
测试 完成准备事项后,启动Chapter12Application
访问 http://localhost:8080/books 将会看到如下内容,就代表一切正常….
1 2 2018 -05 -23 19 :56 :36.248 INFO 29048 --- [nio-8080 -exec-1 ] com.battcn.controller.BookController : [发送时间] - [2018 -05 -23 T19:56 :36.248 ]2018 -05 -23 19 :56 :41.256 INFO 29048 --- [cTaskExecutor-1 ] com.battcn.handler.BookHandler : [listenerDelayQueue 监听的消息] - [消费时间] - [2018 -05 -23 T19:56 :41.256 ] - [Book{id='1' , name='一起来学Spring Boot' }]
总结 目前很多大佬都写过关于 SpringBoot
的教程了,如有雷同,请多多包涵,本教程基于最新的 spring-boot-starter-parent:2.0.2.RELEASE
编写,包括新版本的特性都会一起介绍…
说点什么
个人QQ:1837307557
battcn开源群(适合新手):391619659
微信公众号(欢迎调戏):battcn
个人博客:http://blog.battcn.com/
全文代码:https://github.com/battcn/spring-boot2-learning/tree/master/chapter12