博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ入门-高效的Work模式
阅读量:7249 次
发布时间:2019-06-29

本文共 5247 字,大约阅读时间需要 17 分钟。

扛不住的Hello World模式

上篇《RabbitMQ入门-从HelloWorld开始》介绍了RabbitMQ中最基本的Hello World模型。正如其名,Hello World模型组成简单,也很好理解,我们也看到了一条消息时如何从一个生产者最终流向队列并最终被消费者消费的过程。

但是,过于简单、单调的模型设计也存在一些缺陷。假使现在队列Queue中挤压了很多的消息没有被消费,Hello World模型中只有一个消费者,在消费消息时会显得力不从心。如果遇上网络状况异常等情况,则消费速率就更加不同乐观,从而影响了消息的处理效率,影响网站应用的性能。

很直观的思路,我们能想到的是,一个人不行,那就多来几个人,这时候就有了我们的Work模型。

多管齐下的Work模式

619240-20170804223807084-123621979.png

该模型具有以下特征

  • 一个消息生产者P,一个消息存储队列Q,多个消息消费者C

  • Work模型能够较好的解决资源密集型场景的问题,不需要像Hello World那样孤注一掷的等唯一的消费者消费完

  • 多个消费者,多管齐下,更加高效的并行处理消息

实例

如何构造一个资源密集型的场景

相较于Hello World,Work模式主要是在资源密集型的场景更能发挥威力,那么没有工作环境或者很难遇到这样的情况,我们怎么办?
其实,这个场景的本质是为了体现一个消费者处理要很长时间的时候,这个模式是如何发挥作用的。那么,我们可以让每个消费者处理的时间长点不就行了,要让Consumer处理的时间长很简单,只要调用Thread.sleep()即可。
发送端

对于发送端相对Hello World类型来说,没有什么不同。这是我们队这里发送的消息采用指定的格式比如“hello......”,在后面的发送端接收消息后,当遇到"."则停顿1秒或者2秒,所以程序如下

package com.ximalaya.openapi.rabbitmq.work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;/** * Created by jackie on 17/8/4. */public class NewTask {    private static final String TASK_QUEUE_NAME = "task_queue";    public static void main(String[] argv) throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.3.161");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);        String message = getMessage(argv);        channel.basicPublish("", TASK_QUEUE_NAME,                MessageProperties.PERSISTENT_TEXT_PLAIN,                message.getBytes("UTF-8"));        channel.basicPublish("", TASK_QUEUE_NAME,                MessageProperties.PERSISTENT_TEXT_PLAIN,                message.getBytes("UTF-8"));        channel.basicPublish("", TASK_QUEUE_NAME,                MessageProperties.PERSISTENT_TEXT_PLAIN,                message.getBytes("UTF-8"));        channel.basicPublish("", TASK_QUEUE_NAME,                MessageProperties.PERSISTENT_TEXT_PLAIN,                message.getBytes("UTF-8"));        System.out.println(" [x] Sent '" + message + "'");        channel.close();        connection.close();    }    private static String getMessage(String[] strings) {        if (strings.length < 1)            return "Hello World!";        return joinStrings(strings, " ");    }    private static String joinStrings(String[] strings, String delimiter) {        int length = strings.length;        if (length == 0) return "";        StringBuilder words = new StringBuilder(strings[0]);        for (int i = 1; i < length; i++) {            words.append(delimiter).append(strings[i]);        }        return words.toString();    }}

注意:这里getMessage方法,如果在运行的配置参数中添加了输入参数,则使用输入参数,如果没有填写,则使用默认值"Hello World"。

填写输入参数的方法是在如下图位置写上输入参数
619240-20170804223845990-938387076.png

我们执行发送端代码,向队列"task_queue"中塞入4条消息

619240-20170804223919115-1996792921.gif

从这个动态图片可以发现,通过发送端一次性发送了4条消息。

接收端

package com.ximalaya.openapi.rabbitmq.work;import com.rabbitmq.client.*;import java.io.IOException;/** * Created by jackie on 17/8/4. */public class Worker {    private static final String TASK_QUEUE_NAME = "task_queue";    public static void main(String[] argv) throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.3.161");        final Connection connection = factory.newConnection();        final Channel channel = connection.createChannel();        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        channel.basicQos(1);        final Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println(" [x] Received '" + message + "'");                try {                    doWork(message);                } finally {                    System.out.println(" [x] Done");                    channel.basicAck(envelope.getDeliveryTag(), false);                }            }        };        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);    }    private static void doWork(String task) {        for (char ch : task.toCharArray()) {            if (ch == '.') {                try {                    Thread.sleep(2000);                } catch (InterruptedException _ignored) {                    Thread.currentThread().interrupt();                }            }        }    }}

注意:这里的doWork方法,该方法当遇到"."是就会睡眠2秒钟,所以像"hello..."这样的消息就会睡眠6秒。

下面分两种情况来看接收端的处理信息的情况
一个消费者
如果此时只运行一个接收端的代码,说明只启动了一个Consumer,我们看看消息的消费过程

619240-20170804223945365-722199913.gif

  • 图中Ready的消息依次从4->3->2->1->0,表示消息依次被派出消费

  • Uncknowledged表示没有确认的,这里始终是1,因为消息时一个个发送的,等一个个发完了,最终变为0

  • Total表示总共剩余的消息个数,最终消费完变为0

两个消费者

如果这时候启动两个客户端,我们看下消息是如何被消费的
619240-20170804224008944-777300870.gif

  • 图中的Ready从4->2->0,这是因为有两个消费者,消息分别分发到两个消费者上,一次派发两个,分两次派发完

  • Unacknowledged从0->2->0,过程为在一次发送两条消息时,说明有两条消息等待确认是否被消费掉

  • Total则与Ready变化趋势一致

对比“一个消费者”和“两个消费者”的消费情况,我们确实发现Work的消费处理效率要比Hello World高。

细心的你可能发现了,为什么在“两个消费者”的情况下能够做到如此公平的每个消费者分配两个,有关这块,限于篇幅,将在下篇详细介绍。

如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。

619240-20170115225309400-1706498548.jpg

转载于:https://www.cnblogs.com/bigdataZJ/p/rabbitmq3.html

你可能感兴趣的文章
实战案例:创建支持SSH服务的镜像
查看>>
Fiddler Web Debugger简单调试头部参数
查看>>
Linux环境下发布项目(Tomcat重新启动)
查看>>
centos7配置svn服务器
查看>>
亮剑:PHP,我的未来不是梦(13)
查看>>
MYSQL主从数据同步
查看>>
javascript数组操作
查看>>
linux中父进程退出时如何通知子进程
查看>>
linux 缩减文件系统大小 LVM
查看>>
对比文件md5值实现去重文件
查看>>
C#设计模式之二十三解释器模式(Interpreter Pattern)【行为型】
查看>>
js处理中文乱码记录/nodejs+express error 413
查看>>
基于Keepalived实现LVS双主高可用集群
查看>>
SqlServer 使用脚本创建分发服务及事务复制的可更新订阅
查看>>
什么是Floating (浮动)规则?
查看>>
分布式文件系统-FastDFS
查看>>
HTML5 rotate 做仪表盘
查看>>
为什么说荆州松滋刘氏采穴堂是刘开七、刘广传的后裔
查看>>
React中使用Ant Table组件
查看>>
第四篇 快速、轻量、可扩展、易于使用的EmEditor
查看>>