管道传输
# 场景介绍
云数据库Redis提供管道传输(pipeline)机制。管道(pipeline)将客户端client与服务器端的交互明确划分为单向的发送请求(Send Request)和接收响应(Receive Response),可以一次性发送多条命令并在执行完后一次性将结果返回。管道机制通过减少客户端与服务器端的通信次数来实现降低往返延时时间。
对于一些可批量操作,无需即时获取响应结果,且允许偶尔操作失败的场景来说,使用管道机制可以大幅度提升性能,性能提升的原因主要是减少了TCP连接中交互往返的开销。针对偶尔操作失败的情况可以通过补偿机制完善。管道传输机制适用于短信群发等类似的业务场景。
注意:
使用管道传输时客户端将独占与服务器端的连接,此期间将不能进行其他“非管道”类型操作,直至pipeline被关闭;如果要同时执行其他操作,可以为pipeline操作单独建立一个连接,将其与常规操作分离开来。
# 代码示例
- 响应数据
在Jedis中使用管道(pipeline)时,对于响应数据(response)的处理有两种方式,代码示例如下:
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
public class PipelineClientTest {
static final String host = "10.XX.XX.13";
static final int port = 9736;
static final String password = "password";
public static void main(String[] args) {
Jedis jedis = new Jedis(host, port);
// Redis的实例密码
String authString = jedis.auth(password);// password
if (!authString.equals("OK")) {
System.err.println("AUTH Failed: " + authString);
jedis.close();
return;
}
String key = "Redis-Test1";
jedis.del(key);//初始化
// -------- 方法1
Pipeline p1 = jedis.pipelined();
System.out.println("-----方法1-----");
for (int i = 0; i < 5; i++) {
p1.incr(key);
System.out.println("Pipeline发送请求");
}
// 发送请求完成,开始接收响应
System.out.println("发送请求完成,开始接收响应");
List<Object> responses = p1.syncAndReturnAll();
if (responses == null || responses.isEmpty()) {
jedis.close();
throw new RuntimeException("Pipeline error: 没有接收到响应");
}
for (Object resp : responses) {
System.out.println("Pipeline接收响应Response: " + resp.toString());
}
System.out.println();
//-------- 方法2
System.out.println("-----方法2-----");
jedis.del(key);//初始化
Pipeline p2 = jedis.pipelined();
//需要先声明Response
Response<Long> r1 = p2.incr(key);
System.out.println("Pipeline发送请求");
Response<Long> r2 = p2.incr(key);
System.out.println("Pipeline发送请求");
Response<Long> r3 = p2.incr(key);
System.out.println("Pipeline发送请求");
Response<Long> r4 = p2.incr(key);
System.out.println("Pipeline发送请求");
Response<Long> r5 = p2.incr(key);
System.out.println("Pipeline发送请求");
try{
r1.get(); //此时还未开始接收响应,所以此操作会出错
}catch(Exception e){
System.out.println(" <<< Pipeline error:还未开始接收响应 >>> ");
}
// 发送请求完成,开始接收响应
System.out.println("发送请求完成,开始接收响应");
p2.sync();
System.out.println("Pipeline接收响应Response: " + r1.get());
System.out.println("Pipeline接收响应Response: " + r2.get());
System.out.println("Pipeline接收响应Response: " + r3.get());
System.out.println("Pipeline接收响应Response: " + r4.get());
System.out.println("Pipeline接收响应Response: " + r5.get());
jedis.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
在输入了正确的云数据库Redis版实例访问地址和密码之后,运行以上Java程序,输出结果如下:
-----方法1-----
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
发送请求完成,开始接收响应
Pipeline接收响应Response: 1
Pipeline接收响应Response: 2
Pipeline接收响应Response: 3
Pipeline接收响应Response: 4
Pipeline接收响应Response: 5
-----方法2-----
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
Pipeline发送请求
<<< Pipeline error:还未开始接收响应 >>>
发送请求完成,开始接收响应
Pipeline接收响应Response: 1
Pipeline接收响应Response: 2
Pipeline接收响应Response: 3
Pipeline接收响应Response: 4
Pipeline接收响应Response: 5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
- 性能对比
import java.util.Date;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class RedisPipelinePerformanceTest {
static final String host = "10.XX.XX.13";
static final int port = 9736;
static final String password = "password";
public static void main(String[] args) {
Jedis jedis = new Jedis(host, port);
//Redis的实例密码
String authString = jedis.auth(password);// password
if (!authString.equals("OK")) {
System.err.println("AUTH Failed: " + authString);
jedis.close();
return;
}
//连续执行多次命令操作
final int COUNT=5000;
String key = "Redis-Tanghan";
// 1 ---不使用pipeline操作---
jedis.del(key);//初始化key
Date ts1 = new Date();
for (int i = 0; i < COUNT; i++) {
//发送一个请求,并接收一个响应(Send Request and Receive Response)
jedis.incr(key);
}
Date ts2 = new Date();
System.out.println("不用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts2.getTime() - ts1.getTime())+ "ms");
//2 ----对比使用pipeline操作---
jedis.del(key);//初始化key
Pipeline p1 = jedis.pipelined();
Date ts3 = new Date();
for (int i = 0; i < COUNT; i++) {
//发出请求 Send Request
p1.incr(key);
}
//接收响应 Receive Response
p1.sync();
Date ts4 = new Date();
System.out.println("使用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts4.getTime() - ts3.getTime())+ "ms");
jedis.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
在输入了正确的云数据库Redis版实例访问地址和密码之后,运行以上Java程序,输出结果如下:
不用Pipeline > value为:5000 > 操作用时:4734ms
使用Pipeline > value为:5000 > 操作用时:64ms
1
2
2
您可以看到使用pipeline的性能要快的多。