>>分享流行的Java框架以及开源软件,对孙卫琴的《精通Spring:Java Web开发技术详解》提供技术支持 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 18920 个阅读者 刷新本主题
 * 贴子主题:  Redis用作消息队列 回复文章 点赞(0)  收藏  
作者:mary    发表时间:2020-06-16 03:13:25     消息  查看  搜索  好友  邮件  复制  引用

      Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

                点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

image

     由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。

     所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。简单示例如下:

     存放消息端(消息生产者):    
package org.yamikaze.redis.messsage.queue;
import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;

import java.util.concurrent.TimeUnit;

/**
* 消息生产者
* @author yamikaze
*/

public class Producer extends Thread {

    public static final String MESSAGE_KEY = "message:queue";
    private Jedis jedis;
    private String producerName;
    private volatile int count;

    public Producer(String name) {
        this.producerName = name;
        init();
    }

    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    }

    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 当前未被处理消息条数为:" + size);
        count++;
    }

    public int getCount() {
        return count;
    }

    @Override
    public void run() {
        try {
            while (true) {
                putMessage(StringUtils.generate32Str());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException{
        Producer producer = new Producer("myProducer");
        producer.start();

        for(; ;) {
            System.out.println("main : 已存储消息条数:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

  消息处理端(消息消费者):    

package org.yamikaze.redis.messsage.queue;

import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;

/**
* 消息消费者
* @author yamikaze
*/

public class Customer extends Thread{

    private String customerName;
    private volatile int count;
    private Jedis jedis;

    public Customer(String name) {
        this.customerName = name;
        init();
    }

    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    }

    public void processMessage() {
        String message = jedis.rpop(Producer.MESSAGE_KEY);
        if(message != null) {
            count++;
            handle(message);
        }
    }

    public void handle(String message) {
        System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
    }

    @Override
    public void run() {
        while (true) {
            processMessage();
        }
    }

    public static void main(String[] args) {
        Customer customer = new Customer("yamikaze");
        customer.start();
    }
}

  但上述例子中消息消费者有一个问题存在,即需要不停的调用rpop方法查看List中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。也许你会使用Thread.sleep()等方法让消费者线程隔一段时间再消费,但这样做有两个问题:

     1、如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

     2、如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。

     所以可以使用brpop指令,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端可以将processMessage可以改为这样:    

public void processMessage() {
    /**
     * brpop支持多个列表(队列)
     * brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。
     * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY
     * 0表示不限制等待,会一直阻塞在这儿
     */

    List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");
    if(messages.size() != 0) {
        //由于该指令可以监听多个Key,所以返回的是一个列表
        //列表由2项组成,1) 列表名,2)数据
        String keyName = messages.get(0);
        //如果返回的是MESSAGE_KEY的消息
        if(Producer.MESSAGE_KEY.equals(keyName)) {
            String message = messages.get(1);
            handle(message);
        }

    }
    System.out.println("=======================");
}

  然后可以运行Customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这儿。然后在打开Redis的客户端,输入指令client list,可以查看当前有两个连接。

     发布/订阅模式

     Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。

     1、发布

     PUBLISH指令可用于发布一条消息,格式 PUBLISH channel message

                点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

image

     返回值表示订阅了该消息的数量。

     2、订阅

     SUBSCRIBE指令用于接收一条消息,格式 SUBSCRIBE channel

                点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

image

     可以看到使用SUBSCRIBE指令后进入了订阅模式,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。回复分为三种类型:

     1、如果为subscribe,第二个值表示订阅的频道,第三个值表示是第几个订阅的频道?(理解成序号?)

     2、如果为message(消息),第二个值为产生该消息的频道,第三个值为消息

     3、如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。

                点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

image

     可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。

     Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:

     再试试推送消息会得到以下结果:

                点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

image

     可以看到publish指令返回的是2,而订阅端这边接收了两次消息。这是因为PSUBSCRIBE指令可以重复订阅频道。而使用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。同时PUNSUBSCRIBE指令通配符不会展开。

     例如:PUNSUBSCRIBE * 不会匹配到 channel., 所以要取消订阅channel.就要这样写PUBSUBSCRIBE channel.*。

     代码示范如下:    

package org.yamikaze.redis.messsage.subscribe;

import org.yamikaze.redis.messsage.queue.StringUtils;
import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;

/**
* 消息发布方
* @author yamikaze
*/

public class Publisher {

    public static final String CHANNEL_KEY = "channel:message";
    private Jedis jedis;

    public Publisher() {
        jedis = MyJedisFactory.getLocalJedis();
    }

    public void publishMessage(String message) {
        if(StringUtils.isBlank(message)) {
            return;
        }
        jedis.publish(CHANNEL_KEY, message);
    }

    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.publishMessage("Hello Redis!");
    }
}

  简单的发送一个消息。

     消息订阅方:    

package org.yamikaze.redis.messsage.subscribe;

import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import java.util.concurrent.TimeUnit;

/**
* 消息订阅方客户端
* @author yamikaze
*/

public class SubscribeClient {

    private Jedis jedis;
    private static final String EXIT_COMMAND = "exit";

    public SubscribeClient() {
        jedis = MyJedisFactory.getLocalJedis();
    }

    public void subscribe(String ...channel) {
        if(channel == null || channel.length <= 0) {
            return;
        }
        //消息处理,接收到消息时如何处理
        JedisPubSub jps = new JedisPubSub() {
            /**
             * JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
             * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
             * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
             * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
             */

            @Override
            public void onMessage(String channel, String message) {
                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("接收到消息: channel : " + message);
                    //接收到exit消息后退出
                    if(EXIT_COMMAND.equals(message)) {
                        System.exit(0);
                    }

                }
            }

            /**
             * 订阅时
             */

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("订阅了频道:" + channel);
                }
            }
        };
        //可以订阅多个频道 当前线程会阻塞在这儿
        jedis.subscribe(jps, channel);
    }

    public static void main(String[] args) {
        SubscribeClient client = new SubscribeClient();
        client.subscribe(Publisher.CHANNEL_KEY);
        //并没有 unsubscribe方法
        //相应的也没有punsubscribe方法
    }
}

  先运行client,再运行Publisher进行消息发送,输出结果:

                点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

image

     总结:

     使用Redis的List数据结构可以简单迅速地做一个消息队列,同时Redis提供的BRPOP和BLPOP等指令解决了频繁调用Jedis的rpop和lpop方法造成的资源浪费问题。除此之外,Redis提供对发布/订阅模式的指令,可以实现消息传递、进程间通信。

本文章强烈推荐:

Java电子书高清PDF集合免费下载

https://www.jianshu.com/p/2698107fc68a
----------------------------
原文链接:https://www.jianshu.com/p/27325793910b

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-11-25 11:27:39 重新编辑]
  Java面向对象编程-->流程控制
  JavaWeb开发-->Servlet技术详解(Ⅲ)
  JSP与Hibernate开发-->Java应用分层架构及软件模型
  Java网络编程-->创建非阻塞的HTTP服务器
  精通Spring-->绑定CSS样式
  Vue3开发-->Vue Router路由管理器
  从零开始手写 spring ioc 框架,深入学习 spring 源码
  git 常用指令总结
  几种常见的MAVEN仓库地址
  在Spring MVC中配置线程池,进行异步请求处理
  springmvc处理异步请求的示例
  Spring MVC的拦截器的详细用法
  【项目实践】使用Vue.js和ElementUI快速实现后台管理系统的界...
  Spring MVC控制器类的方法的所支持的方法参数类型
  支付结算系统如何应对高并发、热点账户等问题
  分布式消息队列RocketMQ部署与监控
  酒店评论数据分析和挖掘-展现数据分析全流程:报告展示篇
  利用Spring Boot如何开发REST服务详解
  一份Spring Boot核心知识清单
  Spring MVC Controller单例陷阱
  Java核心库实现AOP过程
  更多...
 IPIP: 已设置保密
树形列表:   
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。