并发异步hytrix和dubbo服务治理适配

在web开发过程中,用户体验的最大忌讳恐怕就是漫长的等待了,慢的原因可能是因为逻辑复杂,有的是阻塞等待,放到系统中就是各种延迟了,系统优化的点方方面面,其中编码方式上我们针对不同场景分别使用并发提高响应速度,提升并发率,加强并发治理3个层面进行优化。

线上环境基本上使用dubbo进行rpc调用,系统中最基础的莫过于用户模块、SNS模块的调用最为频繁,这里就模拟这两个系统的调用演示优化内容。

提高响应速度

相互独立的请求使用并发方式调用

Service1 5s
Service2 5s

单线程调用,耗时10s
多线程调用,耗时5s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* Package: me.j360.jdk.jdk8.juc
* User: min_xu
* Date: 2017/4/13 上午10:35
* 说明:
*/
public class Server {
private ThreadPoolExecutor pool = new ThreadPoolExecutor(4,4,Long.MAX_VALUE, TimeUnit.SECONDS,new LinkedBlockingDeque<>());
private SnsService snsService;
private UserService userService;
public static void main(String[] args) throws InterruptedException {
Server server = new Server();
server.setSnsService(new SnsService());
server.setUserService(new UserService());
int score = server.getScore(1L);
System.out.println("score:"+score);
server.pool.shutdownNow();
server.pool.awaitTermination(1,TimeUnit.SECONDS);
}
public int getScore(Long uid) {
long start = System.currentTimeMillis();
Future<Integer> ageFuture = pool.submit(new Callable<Integer>() {
public Integer call() {
return userService.getUserAge(uid);
}
});
Future<Integer> snsFuture = pool.submit(new Callable<Integer>() {
public Integer call() {
return snsService.getFollowCount(uid);
}
});
int a = 0,c = 0;
try {
a = ageFuture.get(6,TimeUnit.SECONDS);
c = snsFuture.get(6,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println((end - start)/1000+" s " + a + " " + c );
return a + c;
}
public void setSnsService(SnsService snsService) {
this.snsService = snsService;
}
public void setUserService(UserService userService) {
this.userService = userService;
}
}

在线程池参数设计上,按照实际业务情况,可以参考公式去指定线程数。

N = N(cpu核心数) * U(cpu执行率)/ (1+W等待时间/C执行时间)

Nthreads=NcpuUcpu(1+W/C)

提高并发率

在高并发场景中,吞吐量可能是一个权重较高的考虑因素,在线程池的设计中,线程使用完并且有请求在排队时,系统的吞吐量会受到很大影响,如果能缩短线程的使用时间,或者线程出让系统资源占用则能够提升较大吞吐量,java1.8则提供了异步编程利器,ComplatableFuture,在这之前通常使用guava的ComplatableListener来实现,基于现在大部分新系统都1.8了,这里演示下在常用的几种情况下如何使用异步非阻塞方式提高吞吐量。

提升吞吐量,并不意味着会提升响应速度,相反,还会增加响应时间,所以中间的取舍需要根据实际情况考虑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
private ThreadPoolExecutor pool = new ThreadPoolExecutor(4,4,Long.MAX_VALUE, TimeUnit.SECONDS,new LinkedBlockingDeque<>());
private ThreadPoolExecutor pool2 = new ThreadPoolExecutor(4,4,Long.MAX_VALUE, TimeUnit.SECONDS,new LinkedBlockingDeque<>());
private ThreadPoolExecutor pool3 = new ThreadPoolExecutor(4,4,Long.MAX_VALUE, TimeUnit.SECONDS,new LinkedBlockingDeque<>());
private SnsService snsService;
private UserService userService;
public static void main(String[] args) throws InterruptedException {
WebServer server = new WebServer();
server.setSnsService(new SnsService());
server.setUserService(new UserService());
//int score = server.getScore(1L);
//System.out.println("score:"+score);
List<String> list = server.getUserNames(new Long[]{1L,2L,3L});
list.forEach(a -> System.out.println(a));
server.pool.shutdownNow();
server.pool.awaitTermination(1,TimeUnit.SECONDS);
server.pool2.shutdownNow();
server.pool2.awaitTermination(1,TimeUnit.SECONDS);
server.pool3.shutdownNow();
server.pool3.awaitTermination(1,TimeUnit.SECONDS);
}
private List<String> getUserNames(Long[] uids) {
List<CompletableFuture<String>> cfs = new ArrayList<CompletableFuture<String>>();
int i = 0;
for(Long uid:uids) {
cfs.add( CompletableFuture.supplyAsync(() -> userService.getUserName(uid)).exceptionally(throwable -> {
System.out.println("Unrecoverable error ");
return null;
}));
i++;
}
CompletableFuture<List<String>> cfss = sequence(cfs);
//转化成
try {
List<String> list = cfss.get();return list;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
}
/*public static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) {
List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
return sequence(futureList);
}*/
private int getAnotherAge(Integer future,Long uid) {
return future.intValue() + userService.getUserAge(uid);
}
public int getScore(Long uid) {
long start = System.currentTimeMillis();
CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("ageFuture:"+Thread.currentThread().getName());
return userService.getUserAge(uid);
},pool);
CompletableFuture<Integer> snsFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("snsFuture:"+Thread.currentThread().getName());
return snsService.getFollowCount(uid);},pool);
int a = 0;
try {
//互斥任务
CompletableFuture<Integer> af = ageFuture.thenCombineAsync(CompletableFuture.supplyAsync(() -> {
System.out.println("af:"+Thread.currentThread().getName());
return userService.getUserAge(uid);},pool3),(m,n) -> {
System.out.println("af2:"+Thread.currentThread().getName());
return m+n;},pool2);
a = af.get(6,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println((end - start)/1000+" s 互斥任务:" + a );
int c = 0;
try {
CompletableFuture<Integer> sf = snsFuture.thenComposeAsync(future -> CompletableFuture.supplyAsync(() -> {
System.out.println("sf:"+Thread.currentThread().getName());
return getAnotherAge(future,uid);
},pool2));
c = sf.get(6,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
end = System.currentTimeMillis();
System.out.println((end - start)/1000+" s 连接任务:" + c );
return a + c;
}
public void setSnsService(SnsService snsService) {
this.snsService = snsService;
}
public void setUserService(UserService userService) {
this.userService = userService;
}

加强并发治理

上述的2种做法都是基于客户端Client的改进,但是最终的服务都是Server提供的结果,并发+吞吐量会增加Server的压力。

当系统处于比较紧张的资源消耗或者部分资源不可用时,容易造成系统雪崩现象,请求会瞬间迁移到可用路由或者加剧原本就紧张的服务器上,造成连环崩溃,如何在高并发场景下控制雪崩问题,hytrix提供了一系列解决方案,主要围绕3个功能:

  1. 资源隔离
  2. 降级
  3. 熔断

下面分别3个场景下如何使用hytrix在实际场景中的使用

在dubbo中适配hytrix

在dubbo中,侵入式的hytrix的使用给开发带来了较大的工作量,这里可以将hytrix作为dubbo插件方式提供服务,经过实际环境运行,效果比较可靠。

源码见:
https://github.com/xuminwlt/j360-dubbo-app-all

线程池大小可以通过dubbo参数进行控制,当前其他的参数也可以通过类似的方式进行配置