博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ与.net core(二)Producer与Exchange
阅读量:6978 次
发布时间:2019-06-27

本文共 6500 字,大约阅读时间需要 21 分钟。

原文:

Producer:消息的生产者,也就是创建消息的对象

Exchange:消息的接受者,也就是用来接收消息的对象,Exchange接收到消息后将消息按照规则发送到与他绑定的Queue中。下面我们来定义一个Producer与Exchange。

1.新建.netcore console项目,并引入RabbitMQ.Client的Nuget包

2.创建Exchange

using RabbitMQ.Client;namespace RabbitMQConsole{    class Program    {        static void Main(string[] args)        {            ConnectionFactory factory = new ConnectionFactory();            factory.HostName = "39.**.**.**";            factory.Port = 5672;            factory.VirtualHost = "/";            factory.UserName = "root";            factory.Password = "root";            var exchange = "change2";            var route = "route2";            var queue = "queue2";            using (var connection = factory.CreateConnection())            {                using (var channel = connection.CreateModel())                {                    channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false);   //创建Exchange                                    }            }        }    }}

可以看到Echange的参数有:

type:可选项为,fanout,direct,topic,headers。区别如下:

    fanout:发送到所有与当前Exchange绑定的Queue中

    direct:发送到与消息的routeKey相同的Rueue中

    topic:fanout的模糊版本

    headers:发送到与消息的header属性相同的Queue中

durable:持久化

autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

 运行程序,可以在可视化界面看到change2

接下来我们可以创建与change2绑定的queue

3.创建Queue

using (var channel = connection.CreateModel())                {                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);  #创建queue2                    channel.QueueBind(queue, exchange, route);  #将queue2绑定到exchange2                }

可以看到Echange的参数有:

durable:持久化

exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失

autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。

去可视化界面看Queue

4.发送消息

using (var channel = connection.CreateModel())                {                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);                    channel.QueueBind(queue, exchange, route);                    var props = channel.CreateBasicProperties();                    props.Persistent = true; #持久化                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));                }

5.消费消息

using RabbitMQ.Client;using System;using System.Text;namespace RabbitMQClient{    class Program    {        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()        {            HostName = "39.**.**.**",            Port = 5672,            UserName = "root",            Password = "root",            VirtualHost = "/"        };        static void Main(string[] args)        {            var exchange = "change2";            var route = "route2";            var queue = "queue2";            using (IConnection conn = rabbitMqFactory.CreateConnection())            using (IModel channel = conn.CreateModel())            {                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);                channel.QueueBind(queue, exchange, route);                while (true)                {                    var message = channel.BasicGet(queue, true);  #第二个参数说明自动释放消息,如为false需手动释放消息                    if(message!=null)                    {                        var msgBody = Encoding.UTF8.GetString(message.Body);                        Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));                    }                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));                }            }        }    }}

运行查看结果

查看可视化界面

6.手动释放消息

while (true)                {                    var message = channel.BasicGet(queue, false);#设置为手动释放                    if(message!=null)                    {                        var msgBody = Encoding.UTF8.GetString(message.Body);                        Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));                    }                    channel.BasicAck(message.DeliveryTag, false); #手动释放                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));                }

我们再发一条消息,然后开始消费,加个断点调试一下

查看一下Queue中消息状态

然后直接取消调试,不让程序走到释放的那一步,再查看一下消息状态

这么说来只要不走到 channel.BasicAck(message.DeliveryTag, false);这一行,消息就不会被释放掉,我们让程序直接走到这一行代码,查看一下消息的状态

如图已经被释放了

7.让失败的消息回到队列中

while (true)                {                    var message = channel.BasicGet(queue, false);                    if(message!=null)                    {                        var msgBody = Encoding.UTF8.GetString(message.Body);                        Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));                        Console.WriteLine(message.DeliveryTag);    #当前消息被处理的次序数                        if (1==1)                            channel.BasicReject(message.DeliveryTag, true);                    }                                        System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));                }

重新发送4条消息

开始消费

我们可以看到消息一直没有没消费,因为消息被处理之后又放到了队尾

8.监听消息

using (IConnection conn = rabbitMqFactory.CreateConnection())            using (IModel channel = conn.CreateModel())            {                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);                channel.QueueBind(queue, exchange, route);                channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);  #一次接受10条消息,否则rabbit会把所有的消息一次性推到client,会增大client的负荷                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);                consumer.Received += (model, ea) =>                {                    Byte[] body = ea.Body;                    String message = Encoding.UTF8.GetString(body);                    Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId);                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);                };                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);                Console.ReadLine();            }

 

转载地址:http://qmupl.baihongyu.com/

你可能感兴趣的文章
思科交换机各类型中字母的意思?
查看>>
linux基础命令
查看>>
我的友情链接
查看>>
Nutanix CE on Lenovo W520 初探
查看>>
make执行过程
查看>>
Ansible源码解析 Inventory组概念
查看>>
数据备份学习
查看>>
替换空格
查看>>
Linux中源码包的管理
查看>>
ASCII、Unicode、GBK和UTF-8字符编码的区别联系
查看>>
设计模式(行为型模式)——备忘录模式(Memento)
查看>>
[雪峰磁针石博客]kotlin书籍汇总
查看>>
Azure自动化部署运维浅谈
查看>>
浏览器是怎样工作的:渲染引擎,HTML解析
查看>>
centos下LAMP之源码编译安装httpd
查看>>
EBS form日历可选范围设置(calendar.setup )介绍
查看>>
myeclipse莫名其妙的问题
查看>>
iOS-UIWebView添加头部和尾部
查看>>
你最需要了解的H3C交换机端口安全模式
查看>>
常用Linux路由命令(route、ip、ifconfig等等)
查看>>