004 死信(限制队列最大长度)

文章目录

    • 消息ttl过期成为死信
    • 队列达到最大长度成为死信
      • MyOrder.java
      • RabbitMQDirectConfig.java
      • OrderProducer.java
      • PayConsumer.java
      • DeadOrderConsumer.java
    • application.yaml

死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。 还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的原因:

消息 TTL (Time To Live ) : x-message-ttl
队列达到最大长度(队列满了无法再添加数据到 mq 中) : x-max-length
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

消息ttl过期成为死信

map.put("x-message-ttl",2000); // 消息存活时间1s

队列达到最大长度成为死信

MyOrder.java


package com.example.direct;

import java.io.Serializable;

public class MyOrder implements Serializable {
    private String orderId;
    private String orderNumber;
    private String customerName;
    private Integer productId;
    private String productName;
    private Float productPrice;
    private Integer productCount;
    private Float orderPrice;

    public MyOrder(){}


    public MyOrder(String orderId, String orderNumber, String customerName, Integer productId, String productName, Float productPrice, Integer productCount, Float orderPrice) {
        this.orderId = orderId;
        this.orderNumber = orderNumber;
        this.customerName = customerName;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.productCount = productCount;
        this.orderPrice = orderPrice;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getOrderNumber() {
        return orderNumber;
    }

    public Integer getProductId() {
        return productId;
    }

    public void setProductId(Integer productId) {
        this.productId = productId;
    }

    public void setOrderNumber(String orderNumber) {
        this.orderNumber = orderNumber;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public Float getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(Float productPrice) {
        this.productPrice = productPrice;
    }

    public Integer getProductCount() {
        return productCount;
    }

    public void setProductCount(Integer productCount) {
        this.productCount = productCount;
    }

    public Float getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(Float orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return "MyOrder{" +
                "orderId=" + orderId +
                ", orderNumber='" + orderNumber + '\'' +
                ", customerName='" + customerName + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", productCount=" + productCount +
                ", orderPrice=" + orderPrice +
                '}';
    }
}


RabbitMQDirectConfig.java


package com.example.direct;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQDirectConfig  {

//    1. 创建交换机
//    @Bean
//    public DirectExchange newDirectExchange(){
//        return new DirectExchange("myDirectExchangeAAA",true,false);
//    }

    //2. 创建队列
//    @Bean
//    public Queue newQueueA(){
//        return new Queue("queueAAA",true);
//    }

//3. 绑定队列到交换机中
//    @Bean
//    public Binding bindingA(){
//        return BindingBuilder.bind(newQueueA()).to(newDirectExchange()).with("keyAAA");
//    }








    //==================死信


    //1. 创建交换机
    @Bean
    public DirectExchange newExchange(){
        return new DirectExchange("normalExchange",true,false);
    }
    //2. 创建队列
    @Bean
    public Queue newQueue(){
        Map<String ,Object> map = new HashMap<>();
        //map.put("x-message-ttl",2000); // 消息存活时间1s
        map.put("x-max-length",6); // 队列达到最大长度 为6
        map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称
        map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字
        return new Queue("normalQueueA",true,false,false,map);

    }
    //3. 绑定
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(newQueue()).to(newExchange()).with("key1");
    }


    //4. 创建死信交换机
    @Bean
    public DirectExchange newDeadExchange(){
        return new DirectExchange("deadExchange",true,false);
    }
    //5. 创建死信队列
    @Bean
    public Queue newDeadQueue(){

        return new Queue("deadQueueA",true,false,false);
    }
    //6. 绑定
    @Bean
    public Binding bindingDead(){
        return BindingBuilder.bind(newDeadQueue()).to(newDeadExchange()).with("key2");
    }








}



OrderProducer.java


package com.example.direct;


import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

@RestController
@RequestMapping("a")
public class OrderProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @GetMapping("/submitOrder")
    public String submitOrder(){

        Map<String,Object> map = new HashMap<>();
        map.put("orderNumber","2222");//String
        map.put("productId",1111);//Integer


        for(int i=0;i<=130;i++){

            String orderId = UUID.randomUUID().toString().replace("-","");


            map.put("orderId",orderId);


            rabbitTemplate.convertAndSend("normalExchange", "key1", map);
        }


        return "生产者下单成功";

    }


}



PayConsumer.java



package com.example.direct;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class PayConsumer {
    @RabbitHandler
    @RabbitListener(queues = "normalQueueA")
    public void process(Map map, Channel channel, Message message) throws IOException {

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("支付服务接收到的消息:" + map);
        String orderId = (String)map.get("orderId");//String
        Integer productId = (Integer)map.get("productId");//Integer
        String orderNum = (String)map.get("orderNumber");//String

        System.out.println("支付服务接收到的orderId:" + orderId);
        System.out.println("支付服务接收到的productId:" + productId);
        System.out.println("支付服务接收到的orderNum:" + orderNum);

        //告诉broker,消息已经被确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}


DeadOrderConsumer.java



package com.example.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class DeadOrderConsumer {

    // 获得死信队列中的消息
    @RabbitHandler
    @RabbitListener(queues = "deadQueueA")
    public void process(Map map){
        System.out.println("订单取消支付后,从死信队列中接收到的消息:" + map);
        String orderId = (String)map.get("orderId");//String
        Integer productId = (Integer)map.get("productId");//Integer
        String orderNum = (String)map.get("orderNumber");//String


        System.out.println("取消支付后,从死信队列中接收到的orderId:" + orderId);
        System.out.println("取消支付后,从死信队列中接收到的productId:" + productId);
        System.out.println("取消支付后,从死信队列中接收到的orderNum:" + orderNum);


    }
}


application.yaml


server:
  servlet:
    context-path: /app
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated  # 确认交换机已经接收到生产者的消息了
    publisher-returns: true   #  消息已经到了队列(交换机与队列绑定成功的)
    listener:
      simple:
        acknowledge-mode: manual # 手动消息确认
        concurrency: 1 #消费者数量
        max-concurrency: 1  #消费者最大数量
        prefetch: 1  #消费者每次从队列中取几个消息


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/574224.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

xgp加速器免费 微软商店xgp用什么加速器

2001年11月14日深夜&#xff0c;比尔盖茨亲自来到时代广场&#xff0c;在午夜时分将第一台Xbox交给了来自新泽西的20岁年轻人爱德华格拉克曼&#xff0c;后者在回忆中说&#xff1a;“比尔盖茨就是上帝。”性能超越顶级PC的Xbox让他们趋之若鹜。2000年3月10日&#xff0c;微软宣…

25-代码随想录第454题.四数相加II

25-代码随想录第454题.四数相加II 给定四个包含整数的数组列表 A , B , C , D ,计算有多少个元组 (i, j, k, l) &#xff0c;使得 A[i] B[j] C[k] D[l] 0。 为了使问题简单化&#xff0c;所有的 A, B, C, D 具有相同的长度 N&#xff0c;且 0 ≤ N ≤ 500 。所有整数的范…

python 笔记ast.literal_eval

1 介绍 ast.literal_eval 是 Python 标准库 ast 模块中的一个函数&#xff0c;用于安全地评估表示 Python 字面量或容器&#xff08;如列表、字典、元组、集合&#xff09;的字符串 import ast # 解析并执行一个数字表达式 num ast.literal_eval("3.14") prin…

OpenFeign微服务调用组件!!!

1.Feign是什么 GitHub - OpenFeign/feign: Feign makes writing java http clients easierFeign makes writing java http clients easier. Contribute to OpenFeign/feign development by creating an account on GitHub.https://github.com/OpenFeign/feignFeign是Netflix开…

项目十一:爬取热搜榜(小白实战级)

首先&#xff0c;恭喜各位也恭喜自已学习爬虫基础到达圆满级&#xff0c;今后的自已python爬虫之旅会随着网络发展而不断进步。回想起来&#xff0c;我学过请求库requests模块、解析库re模块、lmxl模块到数据保存的基本应用方法&#xff0c;这一次的学习python爬虫之旅收获很多…

Vu3+QuaggaJs实现web页面识别条形码

一、什么是QuaggaJs QuaggaJS是一个基于JavaScript的开源图像识别库&#xff0c;可用于识别条形码。 QuaggaJs的作用主要体现在以下几个方面&#xff1a; 实时图像处理与识别&#xff1a;QuaggaJs是一款基于JavaScript的开源库&#xff0c;它允许在Web浏览器中实现实时的图像…

ASP.NET Core 3 高级编程(第8版) 学习笔记 03

本篇介绍原书的第 18 章&#xff0c;为 19 章 Restful Service 编写基础代码。本章实现了如下内容&#xff1a; 1&#xff09;使用 Entity Framework Core 操作 Sql Server 数据库 2&#xff09;Entity Framework Core 数据库迁移和使用种子数据的方法 3&#xff09;使用中间件…

Qt Quick centerIn和fill 的用法

1&#xff09;Qt Quick centerIn和fill 的用法&#xff1a; import QtQuick 2.5 Rectangle { width:300; height:200; Rectangle { color: "blue"; anchors.fill: parent; border.width: 6; border.co…

详解工业网关在线探测功能及用途

工业网关专为工业物联网应用设计&#xff0c;可实现包括不同通讯协议之间的兼容和转换&#xff0c;提供软硬件加密保障工业数据安全传输&#xff0c;发挥强大算力实现数据边缘预处理&#xff0c;联动联调工业网络设备实现高效协同等。在线探测功能是佰马工业网关的一项重要功能…

unity学习(89)——unity塞满c盘!--删除editor下的log文件

卸了一个视频后强制续命打开详细信息&#xff1a; 这个再往下找也是没用的&#xff01; 显示隐藏文件夹后&#xff01;执行如下操作&#xff01; 30个g&#xff01; 其中unity占23g editer占了21g 删除C:\Users\王栋林\AppData\Local\Unity\Editor下的log文件 恢复到之前的水…

使用 Flask 和 WTForms 构建一个用户注册表单

在这篇技术博客中&#xff0c;我们将使用 Flask 和 WTForms 库来构建一个用户注册表单。我们将创建一个简单的 Flask 应用&#xff0c;并使用 WTForms 定义一个注册表单&#xff0c;包括用户名、密码、确认密码、邮箱、性别、城市和爱好等字段。我们还将为表单添加验证规则&…

【C 数据结构】图

文章目录 【 1. 基本原理 】1.1 无向图1.2 有向图1.3 基本知识 【 2. 图的存储结构 】2.1 完全图2.2 稀疏图和稠密图2.3 连通图2.3.1 (普通)连通图连通图 - 无向图非连通图 的 连通分量 2.3.2 强连通图强连通图 - 有向图非强连通有向图 的 强连通分量 2.3.3 生成树 - 连通图2.3…

美区视频带货“一哥”,一周销量狂干三十万美金!

“超店有数显示&#xff0c;Tybuggy上周带货狂揽34.3万美金&#xff0c;超出第二名近30倍。” TikTok风波再起&#xff0c;4月17日&#xff0c;美众议院推出援乌援以军事议案&#xff0c;值得注意的是&#xff0c;TikTok剥离法案被“打包”夹带其中&#xff0c;以此加大再参议…

LLM应用实战:当KBQA集成LLM(二)

1. 背景 又两周过去了&#xff0c;本qiang~依然奋斗在上周提到的项目KBQA集成LLM&#xff0c;感兴趣的可通过传送门查阅先前的文章《LLM应用实战&#xff1a;当KBQA集成LLM》。 本次又有什么更新呢&#xff1f;主要是针对上次提到的缺点进行优化改进。主要包含如下方面&#…

【Linux笔记】基本指令(一)

一道残阳铺水中 半江瑟瑟半江红 目录 Linux基本指令 罗列目录内容&#xff1a;ls 指令 显示当前目录位置信息&#xff1a;pwd 指令 切换工作目录&#xff1a;cd 指令 创建文件修改时间戳&#xff1a;touch指令 创建空目录&#xff1a;mkdir指令 删除空目录&#xff1a;rmdir指…

1.3K Star我上位机项目中用了这个开源项目

软件介绍 ClientServerProject的软件是一款基于C-S&#xff08;客户端-服务器&#xff09;架构的通用开发框架&#xff0c;为中小型系统的快速开发提供强大的支持。该框架由服务端、客户端以及公共组件三部分组成&#xff0c;不仅提供了基础的账户管理、版本控制、软件升级、公…

输入法重大漏洞曝光,仅华为幸免,近10亿用户受影响

近日&#xff0c;Citizenlab研究人员调查了多家厂商的输入法应用安全漏洞并报告称&#xff1a;除华为以外&#xff0c;百度、荣耀、科大讯飞、OPPO、三星、腾讯、Vivo和小米等供应商的九款应用程序中有八款均存在安全漏洞。 随着用户规模的不断增长&#xff0c;云输入法应用的…

kubernetes中DaemonSet控制器

一、概念 使用DaemonSet控制器&#xff0c;相当于在节点上启动了一个守护进程。通过DaemonSet控制器可以确保在每个节点上运行Pod的一个副本。如果有心的node节点加入集群&#xff0c;则DaemonSet控制器会自动给新加入的节点增加一个Pod的副本&#xff1b;反之&#xff0c;当有…

GPT的全面历史和演变:从GPT-1到GPT-4

人工智能新篇章&#xff1a;GPT-4与人类互动的未来&#xff01; 本文探讨了生成式预训练 Transformer (GPT) 的显着演变&#xff0c;提供了从开创性的 GPT-1 到复杂的 GPT-4 的旅程。 每次迭代都标志着重大的技术飞跃&#xff0c;深刻影响人工智能领域以及我们与技术的互动。 我…

vmware虚拟机网络“桥接模式”与“NAT模式”的联网原理及linux环境下IP配置指引

一、vmware虚拟机网络“桥接模式”与“NAT模式”的区别 选中虚拟机》设置》网络适配器&#xff0c;打开虚拟机设置面板 我们看到网络连接处有多个选项&#xff0c;今天良哥通过试验告诉你“桥接模式”和“NAT模式”的联网原理、区别及两种模式下IP地址配置的详细方法。 桥接模…
最新文章