云数据库 Redis
  • 产品发布记录
  • 新手引导
  • 产品简介

  • 购买指南

  • 快速入门

  • 操作指南

  • 性能白皮书
  • API文档

  • 最佳实践

    • 消息发布订阅
      • 场景介绍
      • 代码示例
    • 管道传输
    • 事务处理
  • 常见问题

  • 故障处理

  • 服务条款
  • 相关概念
  • 联系我们
  • 云数据库Redis
  • 最佳实践
云数据库 Redis

云数据库Redis是首云提供的兼容开源Redis协议标准、基于键值对形式存储的内存数据库服务,具有高可用、高可靠、弹性扩展等特点。支持主从和集群两种架构,可实现亚毫秒级响应时间,每秒处理数十万个请求,可满足高吞吐、低延迟及弹性变配等业务需求。

  • 产品简介
    • 产品概述

    • 产品优势

    • 应用场景

    • 产品系列

    • 命令支持

    • 地域与可用区

  • 购买指南
    • 计费概述

    • 购买方式

    • 欠费说明

    • 调整实例规格费用说明

  • 快速入门
    • 创建Redis实例

    • 设置白名单

    • 连接Redis实例

  • 操作指南
    • 使用限制

    • 操作总览

    • 管理实例

    • 连接实例

    • 网络与安全

    • 备份与恢复

    • 数据迁移

    • 监控告警

    • 账号与密码

    • 参数配置

    • 禁用命令

    • 标签管理

  • API文档
    • 认证方式

    • API概览

    • 实例相关接口

    • 备份相关接口

    • 错误码

  • 最佳实践
    • 消息发布订阅

    • 管道传输

    • 事务处理

  • 常见问题
    • 购买计费

    • 连接登录

    • 使用数据库

    • 监控报警

    • 持久化

    • 缓存策略

  • 故障处理
    • Redis无法连接

    • 使用Csredis客户端时出现Unexpected end of stream异常

    • 使用redis-py客户端时连接集群时报错

    • 使用Jedis客户端时出现Unexpected end of stream异常

    • 使用Jedis客户端时出现OOM异常

    • 如何处理大key问题

消息发布订阅

最后更新时间:2022-01-12 生成PDF文件 | 前往GitHub编辑

# 场景介绍

云数据库Redis提供消息发布(publish)订阅(subscribe)功能,可以用于消息的传输。在Redis中设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。

云数据库Redis的发布订阅机制包括三部分,分别是消息发布者(即publish客户端)、消息订阅者(即subscribe客户端)、频道(即Channel服务器端)。Redis发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”。消息订阅者也只能得到订阅之后的消息,频道中在此之前的消息将无从获得。可以使用消息发布订阅的常见业务场景有构建实时消息系统(如普通的即时聊天、群聊功能),门户网站的内容更新等。

在使用消息发布订阅功能时,消息发布者无需独占与服务器端的连接,您可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如List操作等)。但是,消息订阅者需要独占与服务器端的连接,即进行 subscribe 期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道中的消息,因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用。

# 代码示例

  • 消息发布者(publish client)
import redis.clients.jedis.Jedis;
public class RedisPubClient {
    private Jedis jedis;
    public RedisPubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
        //Redis实例密码
        String authString = jedis.auth(password);
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void pub(String channel,String message){
        System.out.println("  >>> 发布(PUBLISH) > Channel:"+channel+" > 发送出的Message:"+message);
        jedis.publish(channel, message);
    }
    public void close(String channel){
        System.out.println("  >>> 发布(PUBLISH)结束 > Channel:"+channel+" > Message:quit");
        //消息发布者结束发送,发送一个“quit”消息;
        jedis.publish(channel, "quit");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  • 消息订阅者(subscribe client)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class RedisSubClient extends Thread{
    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;
    public RedisSubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
                //Redis实例密码
                String authString = jedis.auth(password);//password
                if (!authString.equals("OK"))
                {
                    System.err.println("AUTH Failed: " + authString);
                    return;
                }
    }
    public void setChannelAndListener(JedisPubSub listener,String channel){
        this.listener=listener;
        this.channel=channel;
    }
    private void subscribe(){
        if(listener==null || channel==null){
            System.err.println("Error:SubClient> listener or channel is null");
        }
        System.out.println("  >>> 订阅(SUBSCRIBE) > Channel:"+channel);
        System.out.println();
        //接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅
        jedis.subscribe(listener, channel);
    }
    public void unsubscribe(String channel){
        System.out.println("  >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel);
        System.out.println();
        listener.unsubscribe(channel);
    }
    @Override
    public void run() {
        try{
            System.out.println();
            System.out.println("----------订阅消息SUBSCRIBE 开始-------");
            subscribe();
            System.out.println("----------订阅消息SUBSCRIBE 结束-------");
            System.out.println();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
  • 消息监听者
import redis.clients.jedis.JedisPubSub;
public class RedisMessageListener extends JedisPubSub{
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("  <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
        System.out.println();
        //当接收到的message为quit时,取消订阅(被动方式)
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  • 主程序示例
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class RedisPubSubTest {
    //Redis的连接信息
    static final String host = "10.XX.XX.13";
    static final int port = 9736;
    static final String password="password";//密码
    public static void main(String[] args) throws Exception{
            RedisPubClient pubClient = new RedisPubClient(host, port,password);
            final String channel = "订阅频道测试-A";
            //消息发送者开始发消息,此时还无人订阅,所以此消息不会被接收
            pubClient.pub(channel, "首云消息1:(此时还无人订阅,所以此消息不会被接收)");
            //消息接收者
            RedisSubClient subClient = new RedisSubClient(host, port,password);
            JedisPubSub listener = new RedisMessageListener();
            subClient.setChannelAndListener(listener, channel);
            //消息接收者开始订阅
            subClient.start();
            //消息发送者继续发消息
            for (int i = 0; i < 5; i++) {
                String message=UUID.randomUUID().toString();
                pubClient.pub(channel, message);
                Thread.sleep(1000);
            }
            //消息接收者主动取消订阅
            subClient.unsubscribe(channel);
            Thread.sleep(1000);
            pubClient.pub(channel, "首云消息2:(此时订阅取消,所以此消息不会被接收)");
            //消息发布者结束发送,即发送一个“quit”消息;
            //此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。
            pubClient.close(channel);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  • 运行结果

在输入了正确的云数据库Redis版实例访问地址和密码之后,运行以上Java程序,输出结果如下:

  >>> 发布(PUBLISH) > Channel:订阅频道测试-A > 发送出的Message:首云消息1:(此时还无人订阅,所以此消息不会被接收)
----------订阅消息SUBSCRIBE 开始-------
  >>> 订阅(SUBSCRIBE) > Channel:订阅频道测试-A
  >>> 发布(PUBLISH) > Channel:订阅频道测试-A > 发送出的Message:382163fe-99dc-4f1b-8f18-ce55dd38140e
  <<< 订阅(SUBSCRIBE)< Channel:订阅频道测试-A >接收到的Message:382163fe-99dc-4f1b-8f18-ce55dd38140e
  >>> 发布(PUBLISH) > Channel:订阅频道测试-A > 发送出的Message:f8ec92fe-83e6-41d8-a676-ecab7fa202f3
  <<< 订阅(SUBSCRIBE)< Channel:订阅频道测试-A >接收到的Message:f8ec92fe-83e6-41d8-a676-ecab7fa202f3
  >>> 发布(PUBLISH) > Channel:订阅频道测试-A > 发送出的Message:4281e8f6-9377-4376-bc09-fff5995a89b0
  <<< 订阅(SUBSCRIBE)< Channel:订阅频道测试-A >接收到的Message:4281e8f6-9377-4376-bc09-fff5995a89b0
  >>> 发布(PUBLISH) > Channel:订阅频道测试-A > 发送出的Message:73bae6b8-3753-4047-b8f9-7a6919d643c4
  <<< 订阅(SUBSCRIBE)< Channel:订阅频道测试-A >接收到的Message:73bae6b8-3753-4047-b8f9-7a6919d643c4
  >>> 发布(PUBLISH) > Channel:订阅频道测试-A > 发送出的Message:c4221aea-26de-4aee-be1f-9d832a81ec13
  <<< 订阅(SUBSCRIBE)< Channel:订阅频道测试-A >接收到的Message:c4221aea-26de-4aee-be1f-9d832a81ec13
  >>> 取消订阅(UNSUBSCRIBE) > Channel:订阅频道测试-A
----------订阅消息SUBSCRIBE 结束-------
  >>> 发布(PUBLISH) > Channel:订阅频道测试-A > 发送出的Message:首云消息2:(此时订阅取消,所以此消息不会被接收)
  >>> 发布(PUBLISH)结束 > Channel:订阅频道测试-A > Message:quit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
错误码
管道传输

← 错误码 管道传输→

最近更新
01
_index
08-18
02
将备份数据迁移至首云Redis
07-11
03
监控概览
04-08
更多文章>

版权所有 ©2005 - 2024 Capitalonline Data Service Co., Ltd 备案序号:京ICP备06033943号 京公网安备:11010502020343号

北京首都在线科技股份有限公司(总部) 经营许可证:B1.B2-20140358 上海红之盟网络科技有限公司(首都在线全资子公司) 经营许可证:B1-20194861