DelayedQueueController.java 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package com.ruoyi.demo.controller.queue;
  2. import com.ruoyi.common.core.domain.R;
  3. import com.ruoyi.common.utils.redis.QueueUtils;
  4. import io.swagger.v3.oas.annotations.Parameter;
  5. import io.swagger.v3.oas.annotations.tags.Tag;
  6. import lombok.RequiredArgsConstructor;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. import java.util.concurrent.TimeUnit;
  12. /**
  13. * 延迟队列 演示案例
  14. * <p>
  15. * 轻量级队列 重量级数据量 请使用 MQ
  16. * 例如: 创建订单30分钟后过期处理
  17. * <p>
  18. * 集群测试通过 同一个数据只会被消费一次 做好事务补偿
  19. * 集群测试流程 两台集群分别开启订阅 在其中一台发送数据 观察接收消息的规律
  20. *
  21. * @author Lion Li
  22. * @version 3.6.0
  23. */
  24. @Slf4j
  25. @Tag(name ="延迟队列 演示案例", description = "延迟队列")
  26. @RequiredArgsConstructor
  27. @RestController
  28. @RequestMapping("/demo/queue/delayed")
  29. public class DelayedQueueController {
  30. @GetMapping("/subscribe")
  31. public R<Void> subscribe(@Parameter(name = "队列名") String queueName) {
  32. log.info("通道: {} 监听中......", queueName);
  33. // 项目初始化设置一次即可
  34. QueueUtils.subscribeBlockingQueue(queueName, (String orderNum) -> {
  35. // 观察接收时间
  36. log.info("通道: {}, 收到数据: {}", queueName, orderNum);
  37. });
  38. return R.ok("操作成功");
  39. }
  40. @GetMapping("/add")
  41. public R<Void> add(@Parameter(name = "队列名") String queueName,
  42. @Parameter(name = "订单号") String orderNum,
  43. @Parameter(name = "延迟时间(秒)") Long time) {
  44. QueueUtils.addDelayedQueueObject(queueName, orderNum, time, TimeUnit.SECONDS);
  45. // 观察发送时间
  46. log.info("通道: {} , 发送数据: {}", queueName, orderNum);
  47. return R.ok("操作成功");
  48. }
  49. @GetMapping("/remove")
  50. public R<Void> remove(@Parameter(name = "队列名") String queueName,
  51. @Parameter(name = "订单号") String orderNum) {
  52. if (QueueUtils.removeDelayedQueueObject(queueName, orderNum)) {
  53. log.info("通道: {} , 删除数据: {}", queueName, orderNum);
  54. } else {
  55. return R.fail("操作失败");
  56. }
  57. return R.ok("操作成功");
  58. }
  59. @GetMapping("/destroy")
  60. public R<Void> destroy(@Parameter(name = "队列名") String queueName) {
  61. // 用完了一定要销毁 否则会一直存在
  62. QueueUtils.destroyDelayedQueue(queueName);
  63. return R.ok("操作成功");
  64. }
  65. }