有一个项目用来负责调度集群中的”cron任务”,比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的”活性”;当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够”均衡”的分布.
其中”负载均衡”,很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:
1) “相对平均”怎么设计
2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次”均衡”,任务在原来的server上没有触发,在新的server上又过了时间..
3) 迁移任务时,还需要考虑”最少移动”次数,不能大面积迁移任务;只能从”负载高”的server上迁移到”负载低”的.
例如:
sid1: w1 w2 w3 w4
sid2: w5
sid3:w6
期望迁移之后:
sid1:w1 w2
sid2:w5 w3
sid3:w4 w6
而不是(这种结果迁移的面积太大,只需要把”多余”的任务迁移出去即可,而不是重新洗牌再均衡)
sid1:w6 w5
sid2:w2 w3
sid3:w1 w4
经过提取,”相对平均”的设计代码如下,仅作备忘:
1. package
2.
3. import
4. import
5. import
6.
7. public class
8. private List servers = new
9. private Map> current = new
10. private Set workers = new
11.
12. public static void
13. new BufferedReader(new
14. String line;
15. new
16. new
17. try
18. while ((line = br.readLine()) != null) {
19. if (line.startsWith("addWorker")) {
20. balancer.addWorkers(line);
21. else if (line.startsWith("addServer")) {
22. balancer.addServers(line);
23. else
24. "???");
25. continue;
26. }
27. balancer.rebalance();
28. }
29. catch
30. e.printStackTrace();
31. }
32. "--END---");
33. }
34.
35. public void
36. int index = source.indexOf(" ");
37. if (index == -1) {
38. return;
39. }
40. 1).split(" ");
41. if (values == null || values.length == 0) {
42. return;
43. }
44. for
45. servers.add(server);
46. if(current.get(server) == null){
47. new
48. }
49. }
50. }
51.
52. public void
53. int index = source.indexOf(" ");
54. if (index == -1) {
55. return;
56. }
57. 1).split(" ");
58. if (values == null || values.length == 0) {
59. return;
60. }
61. //当有新的worker提交时,将咱有一台机器接管
62. 0);
63. List sw = current.get(sid);
64. if(sw == null){
65. new
66. }
67. for
68. workers.add(worker);
69. sw.add(worker);
70. }
71.
72. }
73.
74. public void
75. try
76. if
77. return;
78. }
79. for
80. if (current.get(sid) == null) {
81. new
82. }
83. }
84. //根据每个sid上的worker个数,整理成一个排序的map
85. new
86. for
87. int
88. List sl = counterMap.get(total);
89. if (sl == null) {
90. new
91. counterMap.put(total, sl);
92. }
93. //sid
94. }
95. int
96. int
97. int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数
98. while (true) {
99. //大于平均数的列表, >= avg + 1
100. //与平均数差值为2的 _currentWorkers = current.get(_fromSid);
121. if (_currentWorkers == null
122. it.remove();
123. current.remove(_fromSid);
124. continue;
125. }
126. List _ltServers = lt.getValue();
127. if
128. counterMap.remove(ltKey);
129. break;
130. }
131. //取出需要交换出去的任务id
132. int
133. 1);
134. 0);
135. //从_fromSid的worker列表中移除低workerId
136. //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部
137. //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数"
138. 1);
139. it.remove();
140. 0);
141. //将此workerId添加到_toSid的worker列表中
142. List _ltWorkers = current.get(_toSid);
143. if (_ltWorkers == null) {
144. new
145. current.put(_toSid, _ltWorkers);
146. }
147. _ltWorkers.add(_wid);
148. //将gt的key降低一个数字
149. 1);
150. if (_next == null) {
151. new
152. 1, _next);
153. }
154. _next.add(_fromSid);
155. //将lt的key提升一个数字
156. 1);
157. //从lt的countMap中移除,因为它将被放置在key + 1的新位置
158. Iterator _ltIt = _ltServers.iterator();
159. while
160. if
161. _ltIt.remove();
162. break;
163. }
164. }
165. if (_prev == null) {
166. new
167. 1, _prev);
168. }
169. _prev.add(_toSid);
170. }
171. }
172. //dump info
173. for
174. "Sid:"
175. System.out.println(entry.getValue().toString());
176. }
177. catch
178. e.printStackTrace();
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
相关推荐: ChatGPT之问艺道:如何借助神级算法Prompt,让你轻松get到更高质量答案?
摘要:本文由葡萄城技术团队编写,文章的内容借鉴于Ibrahim John的《The Art of Asking ChatGPT》(向ChatGPT提问的艺术)。 前言 当今,ChatGPT赢得越来越多人的青睐,人们通过它输入问题并获取答案。但除了简单的一问一答…