Java 核心技术读书笔记——Java并发编程
1 并行计算
- 业务:任务多,数据量大
- 串行 vs 并行
- 串行编程简单,并行编程困难
- 单个计算核频率下降,计算核数增多,整体性能变高
- 并行困难(任务分配和执行过程高度耦合)
- 如何控制粒度,切割任务
- 如何分配任务给线程,监督线程执行过程
- 并行模式
- 主从模式 (Master-Slave)
Worker
模式(Worker-Worker)
- Java并发编程
Thread/Runnable/Thread
组管理
Executor
Fork-Join
框架
2 线程组管理
线程组ThreadGroup
- 线程的集合
- 树形结构,大线程组可以包括小线程组
- 可以通过
enumerate
方法遍历组内的线程,执行操作
- 能够有效管理多个线程,但是管理效率低
- 任务分配和执行过程高度耦合
- 重复创建线程、关闭线程操作,无法重用线程
笔记
activeCount
返回线程组中还处于active
的线程数(估计数);
enumerate
将线程组中active
的线程拷贝到数组中;
interrupt
对线程组中所有的线程发出interrupt
信号;
list
打印线程组中所有的线程信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
// Result.java
package threadgroup;
/**
* 搜索结果类
*
*/
public class Result {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
// Searcher.java
package threadgroup;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class Searcher implements Runnable {
private Result result;
public Searcher(Result result) {
this.result=result;
}
@Override
public void run() {
String name=Thread.currentThread().getName();
System.out.printf("Thread %s: 启动\n",name);
try {
doTask();
result.setName(name);
} catch (InterruptedException e) {
System.out.printf("Thread %s: 被中断\n",name);
return;
}
System.out.printf("Thread %s: 完成\n",name);
}
private void doTask() throws InterruptedException {
Random random=new Random((new Date()).getTime());
int value=(int)(random.nextDouble()*100);
System.out.printf("Thread %s: %d\n",Thread.currentThread().getName(),value);
TimeUnit.SECONDS.sleep(value);
}
}
// Main.java
package threadgroup;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
// 创建线程组
ThreadGroup threadGroup = new ThreadGroup("Searcher");
Result result=new Result();
// 创建一个任务,10个线程完成
Searcher searchTask=new Searcher(result);
for (int i=0; i<10; i++) {
Thread thread=new Thread(threadGroup, searchTask);
thread.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("========华丽丽0=======");
// 查看线程组消息
System.out.printf("active 线程数量: %d\n",threadGroup.activeCount());
System.out.printf("线程组信息明细\n");
threadGroup.list();
System.out.println("========华丽丽1=======");
// 遍历线程组
Thread[] threads=new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);
for (int i=0; i<threadGroup.activeCount(); i++) {
System.out.printf("Thread %s: %s\n",threads[i].getName(),threads[i].getState());
}
System.out.println("========华丽丽2=======");
// Wait for the finalization of the Threadds
waitFinish(threadGroup);
// Interrupt all the Thread objects assigned to the ThreadGroup
threadGroup.interrupt();
}
public static void waitFinish(ThreadGroup threadGroup) {
while (threadGroup.activeCount()>9) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
*
* 运行结果:
* Thread Thread-0: 启动
* Thread Thread-0: 83
* Thread Thread-1: 启动
* Thread Thread-1: 73
* Thread Thread-2: 启动
* Thread Thread-2: 1
* Thread Thread-2: 完成
* Thread Thread-3: 启动
* Thread Thread-3: 92
* Thread Thread-4: 启动
* Thread Thread-4: 73
* Thread Thread-5: 启动
* Thread Thread-5: 64
* Thread Thread-6: 启动
* Thread Thread-6: 89
* Thread Thread-7: 启动
* Thread Thread-7: 80
* Thread Thread-8: 启动
* Thread Thread-8: 8
* Thread Thread-9: 启动
* Thread Thread-9: 99
* ========华丽丽0=======
* active 线程数量: 9
* 线程组信息明细
* java.lang.ThreadGroup[name=Searcher,maxpri=10]
* Thread[Thread-0,5,Searcher]
* Thread[Thread-1,5,Searcher]
* Thread[Thread-3,5,Searcher]
* Thread[Thread-4,5,Searcher]
* Thread[Thread-5,5,Searcher]
* Thread[Thread-6,5,Searcher]
* Thread[Thread-7,5,Searcher]
* Thread[Thread-8,5,Searcher]
* Thread[Thread-9,5,Searcher]
* ========华丽丽1=======
* Thread Thread-0: TIMED_WAITING
* Thread Thread-1: TIMED_WAITING
* Thread Thread-3: TIMED_WAITING
* Thread Thread-4: TIMED_WAITING
* Thread Thread-5: TIMED_WAITING
* Thread Thread-6: TIMED_WAITING
* Thread Thread-7: TIMED_WAITING
* Thread Thread-8: TIMED_WAITING
* Thread Thread-9: TIMED_WAITING
* ========华丽丽2=======
* Thread Thread-7: 被中断
* Thread Thread-5: 被中断
* Thread Thread-1: 被中断
* Thread Thread-9: 被中断
* Thread Thread-3: 被中断
* Thread Thread-8: 被中断
* Thread Thread-0: 被中断
* Thread Thread-6: 被中断
* Thread Thread-4: 被中断
*/
|
3 Java并发框架Executor
3.1 Executor FrameWork
从 JDK 5
开始提供Executor FrameWork
(java.util.concurrent.*
)
- 分离任务的创建和执行者的创建
- 线程重复利用(
new
线程代价很大)
3.2理解共享线程池的概念
- 预设好的多个
Thread
,可弹性增加
- 多次执行很多很小的任务
- 任务创建和执行过程解耦
- 程序员无需关心线程池执行任务过程
主要类:ExecutorService
, ThreadPoolExecutor
,Future
Executors.newCachedThreadPool/newFixedThreadPool
创建线程池
ExecutorService
线程池服务
Callable
具体的逻辑对象(线程类)
笔记
Callable
和Runnable
是等价的,可以用来执行一个任务
Runnable
的run
方法没有返回值,而Callable
的call
方法可以有返回值
Future
返回结果
- 实例1,各方法演示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
// executor/example1/Server.java
package executor.example1;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 执行服务器
*
*/
public class Server {
//线程池
private ThreadPoolExecutor executor;
public Server(){
// 创建一个默认线程池
executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();
// 创建一个固定个数的线程池
//executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);
}
//向线程池提交任务
public void submitTask(Task task){
System.out.printf("Server: A new task has arrived\n");
executor.execute(task); //执行 无返回值
System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize());
System.out.printf("Server: Active Count: %d\n",executor.getActiveCount());
System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount());
}
// 关闭整个线程池
public void endServer() {
executor.shutdown();
}
}
// executor/example1/Task.java
package executor.example1;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* Task 任务类
* @author Tom
*
*/
public class Task implements Runnable {
private String name;
public Task(String name){
this.name=name;
}
//休眠,进行两次输出
public void run() {
try {
Long duration=(long)(Math.random()*1000);
System.out.printf("%s: Task %s: Doing a task during %d seconds\n",Thread.currentThread().getName(),name,duration);
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("%s: Task %s: Finished on: %s\n",Thread.currentThread().getName(),name,new Date());
}
}
// executor/example1/Main.java
package executor.example1;
public class Main {
public static void main(String[] args) throws InterruptedException {
// 创建一个执行服务器
Server server=new Server();
// 创建20个任务,并发给执行器,等待完成
for (int i=0; i<20; i++){
Task task=new Task("Task "+i);
Thread.sleep(10);
//将任务提交给线程池去执行
server.submitTask(task);
}
// 关闭整个线程池
server.endServer();
}
|
- 运行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
Server: A new task has arrived
Server: Pool Size: 1
pool-1-thread-1: Task Task 0: Doing a task during 148 seconds
Server: Active Count: 1
Server: Completed Tasks: 0
Server: A new task has arrived
Server: Pool Size: 2
Server: Active Count: 2
pool-1-thread-2: Task Task 1: Doing a task during 988 seconds
Server: Completed Tasks: 0
Server: A new task has arrived
Server: Pool Size: 3
Server: Active Count: 3
pool-1-thread-3: Task Task 2: Doing a task during 769 seconds
Server: Completed Tasks: 0
Server: A new task has arrived
Server: Pool Size: 4
Server: Active Count: 4
Server: Completed Tasks: 0
pool-1-thread-4: Task Task 3: Doing a task during 698 seconds
Server: A new task has arrived
Server: Pool Size: 5
Server: Active Count: 5
pool-1-thread-5: Task Task 4: Doing a task during 727 seconds
Server: Completed Tasks: 0
Server: A new task has arrived
Server: Pool Size: 6
Server: Active Count: 6
pool-1-thread-6: Task Task 5: Doing a task during 232 seconds
Server: Completed Tasks: 0
Server: A new task has arrived
Server: Pool Size: 7
Server: Active Count: 7
pool-1-thread-7: Task Task 6: Doing a task during 509 seconds
Server: Completed Tasks: 0
Server: A new task has arrived
Server: Pool Size: 8
pool-1-thread-8: Task Task 7: Doing a task during 21 seconds
Server: Active Count: 8
Server: Completed Tasks: 0
Server: A new task has arrived
Server: Pool Size: 9
Server: Active Count: 9
pool-1-thread-9: Task Task 8: Doing a task during 683 seconds
Server: Completed Tasks: 0
Server: A new task has arrived
Server: Pool Size: 10
Server: Active Count: 10
Server: Completed Tasks: 0
pool-1-thread-10: Task Task 9: Doing a task during 616 seconds
pool-1-thread-8: Task Task 7: Finished on: Mon Feb 28 21:37:57 CST 2022
Server: A new task has arrived
Server: Pool Size: 10
pool-1-thread-8: Task Task 10: Doing a task during 403 seconds
Server: Active Count: 10
Server: Completed Tasks: 1
Server: A new task has arrived
Server: Pool Size: 11
Server: Active Count: 11
pool-1-thread-11: Task Task 11: Doing a task during 115 seconds
Server: Completed Tasks: 1
Server: A new task has arrived
Server: Pool Size: 12
Server: Active Count: 12
pool-1-thread-12: Task Task 12: Doing a task during 966 seconds
Server: Completed Tasks: 1
Server: A new task has arrived
Server: Pool Size: 13
Server: Active Count: 13
Server: Completed Tasks: 1
pool-1-thread-13: Task Task 13: Doing a task during 224 seconds
Server: A new task has arrived
pool-1-thread-1: Task Task 0: Finished on: Mon Feb 28 21:37:58 CST 2022
Server: Pool Size: 14
pool-1-thread-14: Task Task 14: Doing a task during 799 seconds
Server: Active Count: 13
Server: Completed Tasks: 2
Server: A new task has arrived
Server: Pool Size: 14
pool-1-thread-1: Task Task 15: Doing a task during 509 seconds
Server: Active Count: 14
Server: Completed Tasks: 2
Server: A new task has arrived
Server: Pool Size: 15
Server: Active Count: 15
pool-1-thread-15: Task Task 16: Doing a task during 780 seconds
Server: Completed Tasks: 2
Server: A new task has arrived
Server: Pool Size: 16
Server: Active Count: 16
Server: Completed Tasks: 2
pool-1-thread-16: Task Task 17: Doing a task during 846 seconds
Server: A new task has arrived
Server: Pool Size: 17
Server: Active Count: 17
Server: Completed Tasks: 2
pool-1-thread-17: Task Task 18: Doing a task during 458 seconds
Server: A new task has arrived
Server: Pool Size: 18
Server: Active Count: 18
pool-1-thread-18: Task Task 19: Doing a task during 759 seconds
Server: Completed Tasks: 2
pool-1-thread-11: Task Task 11: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-6: Task Task 5: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-13: Task Task 13: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-8: Task Task 10: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-7: Task Task 6: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-17: Task Task 18: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-1: Task Task 15: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-10: Task Task 9: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-4: Task Task 3: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-9: Task Task 8: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-5: Task Task 4: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-3: Task Task 2: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-14: Task Task 14: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-15: Task Task 16: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-18: Task Task 19: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-2: Task Task 1: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-16: Task Task 17: Finished on: Mon Feb 28 21:37:58 CST 2022
pool-1-thread-12: Task Task 12: Finished on: Mon Feb 28 21:37:58 CST 2022
|
- 实例2,利用多线程统计1-1000总和,分成10个任务计算。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
package executor.example2;
import java.util.Random;
import java.util.concurrent.Callable;
public class SumTask implements Callable<Integer> {
//定义每个线程计算的区间
private int startNumber;
private int endNumber;
public SumTask(int startNumber, int endNumber){
this.startNumber=startNumber;
this.endNumber=endNumber;
}
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i=startNumber; i<=endNumber; i++)
{
sum = sum + i;
}
Thread.sleep(new Random().nextInt(1000));
System.out.printf("%s: %d\n",Thread.currentThread().getName(),sum);
return sum;
}
}
package executor.example2;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
public class SumTest {
public static void main(String[] args) {
// 执行线程池
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(4);
List<Future<Integer>> resultList=new ArrayList<>();
//统计1-1000总和,分成10个任务计算,提交任务
for (int i=0; i<10; i++){
SumTask calculator=new SumTask(i*100+1, (i+1)*100);
Future<Integer> result=executor.submit(calculator);
resultList.add(result);
}
// 每隔50毫秒,轮询等待10个任务结束
do {
System.out.printf("Main: 已经完成多少个任务: %d\n",executor.getCompletedTaskCount());
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
System.out.printf("Main: Task %d: %s\n",i,result.isDone());
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (executor.getCompletedTaskCount()<resultList.size());
// 所有任务都已经结束了,综合计算结果
int total = 0;
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
Integer sum=null;
try {
sum=result.get();
total = total + sum;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.printf("1-1000的总和:" + total);
// 关闭线程池
executor.shutdown();
}
}
|
- 运行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
|
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 0
Main: Task 0: false
Main: Task 1: false
Main: Task 2: false
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
pool-1-thread-3: 25050
Main: 已经完成多少个任务: 1
Main: Task 0: false
Main: Task 1: false
Main: Task 2: true
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 1
Main: Task 0: false
Main: Task 1: false
Main: Task 2: true
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 1
Main: Task 0: false
Main: Task 1: false
Main: Task 2: true
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 1
Main: Task 0: false
Main: Task 1: false
Main: Task 2: true
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
pool-1-thread-1: 5050
Main: 已经完成多少个任务: 2
Main: Task 0: true
Main: Task 1: false
Main: Task 2: true
Main: Task 3: false
Main: Task 4: false
Main: Task 5: false
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
pool-1-thread-1: 55050
Main: 已经完成多少个任务: 3
Main: Task 0: true
Main: Task 1: false
Main: Task 2: true
Main: Task 3: false
Main: Task 4: false
Main: Task 5: true
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
pool-1-thread-2: 15050
Main: 已经完成多少个任务: 4
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: false
Main: Task 4: false
pool-1-thread-4: 35050
Main: Task 5: true
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
Main: 已经完成多少个任务: 5
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: false
Main: Task 5: true
Main: Task 6: false
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
pool-1-thread-1: 65050
Main: 已经完成多少个任务: 6
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: false
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
pool-1-thread-3: 45050
Main: 已经完成多少个任务: 7
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: false
Main: Task 9: false
pool-1-thread-4: 85050
Main: 已经完成多少个任务: 8
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: false
Main: 已经完成多少个任务: 8
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: false
Main: 已经完成多少个任务: 8
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: false
Main: 已经完成多少个任务: 8
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: false
Main: 已经完成多少个任务: 8
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: false
Main: 已经完成多少个任务: 8
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: false
Main: 已经完成多少个任务: 8
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: false
Main: 已经完成多少个任务: 8
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: false
Main: 已经完成多少个任务: 8
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: false
pool-1-thread-1: 95050
Main: 已经完成多少个任务: 9
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: true
Main: 已经完成多少个任务: 9
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: true
Main: 已经完成多少个任务: 9
Main: Task 0: true
Main: Task 1: true
Main: Task 2: true
Main: Task 3: true
Main: Task 4: true
Main: Task 5: true
Main: Task 6: true
Main: Task 7: false
Main: Task 8: true
Main: Task 9: true
pool-1-thread-2: 75050
1-1000的总和:500500
|
4 Java并发框架Fork-Join
笔记
- Java 7 提供另一种并行框架:分解、治理、合并(分治编程)
- 适合用于整体任务量不好确定的场合(最小任务可确定)
关键类:
-
ForkJoinPool
任务池
-
RecursiveAction
-
RecursiveTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
// SumTask.java
import java.math.BigInteger;
import java.util.concurrent.RecursiveTask;
//分任务求和
public class SumTask extends RecursiveTask<Long> {
private int start;
private int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
public static final int threadhold = 5;
@Override
protected Long compute() {
Long sum = 0L;
// 如果任务小于等于阈值, 就直接执行
boolean canCompute = (end - start) <= threadhold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum = sum + i;
}
} else {
// 任务大于阈值, 分裂为2个任务
int middle = (start + end) / 2;
SumTask subTask1 = new SumTask(start, middle);
SumTask subTask2 = new SumTask(middle + 1, end);
invokeAll(subTask1, subTask2);
Long sum1 = subTask1.join();
Long sum2 = subTask2.join();
// 结果合并
sum = sum1 + sum2;
}
return sum;
}
}
// SumTest.java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
//分任务求和
public class SumTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建执行线程池
ForkJoinPool pool = new ForkJoinPool();
//ForkJoinPool pool = new ForkJoinPool(4);
//创建任务
SumTask task = new SumTask(1, 10000000);
/提交任务
ForkJoinTask<Long> result = pool.submit(task);
//等待结果
do {
System.out.printf("Main: Thread Count: %d\n",pool.getActiveThreadCount());
System.out.printf("Main: Paralelism: %d\n",pool.getParallelism());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
//输出结果
System.out.println(result.get().toString());
}
}
|
5 Java并发数据结构
笔记
- 常用的数据结构是线程不安全的
ArrayList
, HashMap
, HashSet
非同步的
- 多个线程同时读写,可能会抛出异常或数据错误
- 传统
Vector
,Hashtable
等同步集合性能过差
- 并发数据结构:数据添加和删除
- 阻塞式集合:当集合为空或者满时,等待
- 非阻塞式集合:当集合为空或者满时,不等待,返回
null
或异常
5.1 List
Vector
同步安全,写多读少
ArrayList
不安全
Collections.synchronizedList(List list)
基于synchronized
,效率差
CopyOnWriteArrayList
读多写少,基于复制机制,非阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
package list;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class ListTest {
public static void main(String[] args) throws InterruptedException{
//线程不安全
List<String> unsafeList = new ArrayList<String>();
//线程安全
List<String> safeList1 = Collections.synchronizedList(new ArrayList<String>());
//线程安全
CopyOnWriteArrayList<String> safeList2 = new CopyOnWriteArrayList<String>();
ListThread t1 = new ListThread(unsafeList);
ListThread t2 = new ListThread(safeList1);
ListThread t3 = new ListThread(safeList2);
for(int i = 0; i < 10; i++){
Thread t = new Thread(t1, String.valueOf(i));
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t2, String.valueOf(i));
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t3, String.valueOf(i));
t.start();
}
//等待子线程执行完
Thread.sleep(2000);
System.out.println("listThread1.list.size() = " + t1.list.size());
System.out.println("listThread2.list.size() = " + t2.list.size());
System.out.println("listThread3.list.size() = " + t3.list.size());
//输出list中的值
System.out.println("unsafeList:");
for(String s : t1.list){
if(s == null){
System.out.print("null ");
}
else
{
System.out.print(s + " ");
}
}
System.out.println();
System.out.println("safeList1:");
for(String s : t2.list){
if(s == null){
System.out.print("null ");
}
else
{
System.out.print(s + " ");
}
}
System.out.println();
System.out.println("safeList2:");
for(String s : t3.list){
if(s == null){
System.out.print("null ");
}
else
{
System.out.print(s + " ");
}
}
}
}
class ListThread implements Runnable{
public List<String> list;
public ListThread(List<String> list){
this.list = list;
}
@Override
public void run() {
int i = 0;
while(i<10)
{
try {
Thread.sleep(10);
}catch (InterruptedException e){
e.printStackTrace();
}
//把当前线程名称加入list中
list.add(Thread.currentThread().getName());
i++;
}
}
}
/**
*
*
* listThread1.list.size() = 95
* listThread2.list.size() = 100
* listThread3.list.size() = 100
* unsafeList:
* 5 6 3 8 4 7 9 2 0 1 1 2 3 9 6 5 7 8 4 0 5 9 null null null 2 4 7 8 0 6 1 3 9 7 5 4 8 0 1 2 9 3 7 4 5 8 0 7 null null null 5 2 6 4 8 0 1 8 5 9 3 2 7 0 4 2 3 4 9 7 1 null null 5 0 1 9 2 3 5 8 4 7 0 7 5 9 3 8 0 2 1 4
* safeList1:
* 3 2 8 1 0 4 5 7 6 9 7 2 0 4 8 3 6 9 1 5 2 0 8 7 4 3 1 9 6 5 9 3 7 8 2 0 6 4 1 5 9 2 8 7 3 6 0 4 1 5 0 7 8 2 9 4 3 1 6 5 3 7 8 2 1 5 6 4 9 0 1 4 9 7 5 8 0 2 3 6 2 0 8 6 9 7 1 3 5 4 5 2 8 0 4 3 9 6 1 7
* safeList2:
* 6 0 4 3 2 9 5 8 7 1 7 8 6 9 4 5 3 2 0 1 2 8 7 4 0 9 5 6 1 3 0 9 4 2 1 3 6 7 8 5 0 6 8 9 1 7 5 2 4 3 8 0 7 4 9 3 5 6 1 2 6 8 9 3 5 2 1 7 0 4 4 0 8 5 7 1 2 6 3 9 3 5 6 8 2 9 7 1 0 4 2 0 4 9 3 8 6 5 7 1
*
*/
|
5.2 Set
HashSet
不安全
Collections.synchronizedSet(Set set)
基于synchronized
,效率差
CopyOnWriteArraySet
(基于CopyOnWriteArrayList实现) 读多写少,
非阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
package set;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
public class SetTest{
public static void main(String[] args) throws InterruptedException{
//线程不安全
Set<String> unsafeSet = new HashSet<String>();
//线程安全
Set<String> safeSet1 = Collections.synchronizedSet(new HashSet<String>());
//线程安全
CopyOnWriteArraySet<String> safeSet2 = new CopyOnWriteArraySet<String>();
SetThread t1 = new SetThread(unsafeSet);
SetThread t2 = new SetThread(safeSet1);
SetThread t3 = new SetThread(safeSet2);
//unsafeSet的运行测试
for(int i = 0; i < 10; i++){
Thread t = new Thread(t1, String.valueOf(i));
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t2, String.valueOf(i));
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t3, String.valueOf(i));
t.start();
}
//等待子线程执行完
Thread.sleep(2000);
System.out.println("setThread1.set.size() = " + t1.set.size());
System.out.println("setThread2.set.size() = " + t2.set.size());
System.out.println("setThread3.set.size() = " + t3.set.size());
//输出set中的值
System.out.println("unsafeSet:");
for(String element:t1.set){
if(element == null){
System.out.print("null ");
}
else
{
System.out.print(element + " ");
}
}
System.out.println();
System.out.println("safeSet1:");
for(String element:t2.set){
if(element == null){
System.out.print("null ");
}
else
{
System.out.print(element + " ");
}
}
System.out.println();
System.out.println("safeSet2:");
for(String element:t3.set){
if(element == null){
System.out.print("null ");
}
else
{
System.out.print(element + " ");
}
}
}
}
class SetThread implements Runnable{
public Set<String> set;
public SetThread(Set<String> set){
this.set = set;
}
@Override
public void run() {
int i = 0;
while(i<10)
{
i++;
try {
Thread.sleep(10);
}catch (InterruptedException e){
e.printStackTrace();
}
//把当前线程名称加入list中
set.add(Thread.currentThread().getName() + i);
}
}
}
/**
*
*
* setThread1.set.size() = 97
* setThread2.set.size() = 100
* setThread3.set.size() = 100
* unsafeSet:
* 44 88 45 89 02 46 03 47 04 48 05 49 06 07 08 09 110 310 510 710 910 92 93 94 51 95 96 53 97 54 98 55 99 56 13 57 14 58 15 59 16 17 18 19 62 63 64 65 66 23 67 24 68 25 69 26 27 28 29 010 210 410 610 810 73 74 75 32 76 33 77 34 78 35 79 36 37 38 39 83 84 85 42 86 43 87
* safeSet1:
* 88 01 89 02 03 04 05 06 07 08 09 110 510 91 910 92 93 94 95 96 97 98 11 99 12 13 14 15 16 17 18 19 21 22 23 24 25 26 27 28 29 010 410 810 31 32 33 34 35 36 37 38 39 41 42 43 44 45 46 47 48 49 310 710 51 52 53 54 55 56 57 58 59 61 62 63 64 65 66 67 68 69 210 610 71 72 73 74 75 76 77 78 79 81 82 83 84 85 86 87
* safeSet2:
* 21 61 51 41 01 91 81 11 71 31 22 62 32 52 82 92 12 42 02 72 93 23 83 13 33 43 73 53 03 63 24 64 84 34 04 54 94 74 44 14 25 95 15 05 55 35 75 85 65 45 86 36 76 06 56 66 16 96 26 46 27 47 97 57 77 87 67 07 37 17 98 08 38 58 78 48 18 88 68 28 09 39 29 19 49 59 79 89 99 69 610 110 910 510 810 410 010 210 310 710
*
*
*
*/
|
5.3 Map
Hashtable
同步安全,写多读少
HashMap
不安全
Collections.synchronizedMap(Map map)
基于synchronized
,效率差
ConcurrentHashMap
读多写少,非阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
package map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class MapTest{
public static void main(String[] args) throws InterruptedException{
//线程不安全
Map<Integer,String> unsafeMap = new HashMap<Integer,String>();
//线程安全
Map<Integer,String> safeMap1 = Collections.synchronizedMap(new HashMap<Integer,String>());
//线程安全
ConcurrentHashMap<Integer,String> safeMap2 = new ConcurrentHashMap<Integer,String>();
MapThread t1 = new MapThread(unsafeMap);
MapThread t2 = new MapThread(safeMap1);
MapThread t3 = new MapThread(safeMap2);
//unsafeMap的运行测试
for(int i = 0; i < 10; i++){
Thread t = new Thread(t1);
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t2);
t.start();
}
for(int i = 0; i < 10; i++) {
Thread t = new Thread(t3);
t.start();
}
//等待子线程执行完
Thread.sleep(2000);
System.out.println("mapThread1.map.size() = " + t1.map.size());
System.out.println("mapThread2.map.size() = " + t2.map.size());
System.out.println("mapThread3.map.size() = " + t3.map.size());
//输出set中的值
System.out.println("unsafeMap:");
Iterator iter = t1.map.entrySet().iterator();
while(iter.hasNext()) {
Map.Entry<Integer,String> entry = (Map.Entry<Integer,String>)iter.next();
// 获取key
System.out.print(entry.getKey() + ":");
// 获取value
System.out.print(entry.getValue() + " ");
}
System.out.println();
System.out.println("safeMap1:");
iter = t2.map.entrySet().iterator();
while(iter.hasNext()) {
Map.Entry<Integer,String> entry = (Map.Entry<Integer,String>)iter.next();
// 获取key
System.out.print(entry.getKey() + ":");
// 获取value
System.out.print(entry.getValue() + " ");
}
System.out.println();
System.out.println("safeMap2:");
iter = t3.map.entrySet().iterator();
while(iter.hasNext()) {
Map.Entry<Integer,String> entry = (Map.Entry<Integer,String>)iter.next();
// 获取key
System.out.print(entry.getKey() + ":");
// 获取value
System.out.print(entry.getValue() + " ");
}
System.out.println();
System.out.println("mapThread1.map.size() = " + t1.map.size());
System.out.println("mapThread2.map.size() = " + t2.map.size());
System.out.println("mapThread3.map.size() = " + t3.map.size());
}
}
class MapThread implements Runnable
{
public Map<Integer,String> map;
public MapThread(Map<Integer,String> map){
this.map = map;
}
@Override
public void run() {
int i=0;
while(i<100)
{
//把当前线程名称加入map中
map.put(i++,Thread.currentThread().getName());
try {
Thread.sleep(10);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
/**
*
*
*
*
*
* mapThread1.map.size() = 123
* mapThread2.map.size() = 100
* mapThread3.map.size() = 100
* unsafeMap:
* 0:Thread-9 1:Thread-0 2:Thread-5 3:Thread-1 4:Thread-7 5:Thread-7 6:Thread-4 7:Thread-9 8:Thread-8 9:Thread-2 10:Thread-6 11:Thread-5 12:Thread-7 13:Thread-7 14:Thread-0 15:Thread-0 16:Thread-6 17:Thread-7 18:Thread-6 19:Thread-7 20:Thread-7 21:Thread-0 22:Thread-5 23:Thread-0 24:Thread-3 25:Thread-0 26:Thread-5 27:Thread-7 28:Thread-0 29:Thread-2 30:Thread-2 31:Thread-1 32:Thread-3 33:Thread-9 34:Thread-7 35:Thread-0 36:Thread-0 37:Thread-0 38:Thread-7 39:Thread-6 40:Thread-7 41:Thread-7 42:Thread-0 43:Thread-0 44:Thread-6 45:Thread-7 46:Thread-8 47:Thread-6 48:Thread-9 49:Thread-0 50:Thread-1 51:Thread-2 52:Thread-6 53:Thread-7 54:Thread-7 55:Thread-0 56:Thread-2 57:Thread-2 58:Thread-5 59:Thread-9 60:Thread-7 61:Thread-2 62:Thread-0 63:Thread-8 64:Thread-6 65:Thread-8 66:Thread-0 67:Thread-8 68:Thread-8 69:Thread-0 70:Thread-1 71:Thread-7 72:Thread-9 73:Thread-7 74:Thread-9 75:Thread-7 76:Thread-0 77:Thread-7 78:Thread-8 79:Thread-8 80:Thread-8 81:Thread-7 82:Thread-8 83:Thread-7 84:Thread-1 85:Thread-0 86:Thread-5 87:Thread-8 88:Thread-5 89:Thread-8 90:Thread-8 91:Thread-1 92:Thread-8 93:Thread-7 94:Thread-8 95:Thread-0 96:Thread-7 97:Thread-0 98:Thread-5 99:Thread-7
* safeMap1:
* 0:Thread-16 1:Thread-14 2:Thread-14 3:Thread-12 4:Thread-11 5:Thread-15 6:Thread-18 7:Thread-18 8:Thread-18 9:Thread-11 10:Thread-18 11:Thread-12 12:Thread-16 13:Thread-15 14:Thread-18 15:Thread-15 16:Thread-12 17:Thread-13 18:Thread-12 19:Thread-18 20:Thread-12 21:Thread-16 22:Thread-14 23:Thread-19 24:Thread-13 25:Thread-11 26:Thread-15 27:Thread-18 28:Thread-15 29:Thread-19 30:Thread-17 31:Thread-10 32:Thread-18 33:Thread-17 34:Thread-16 35:Thread-16 36:Thread-15 37:Thread-15 38:Thread-13 39:Thread-18 40:Thread-11 41:Thread-15 42:Thread-18 43:Thread-16 44:Thread-18 45:Thread-16 46:Thread-18 47:Thread-10 48:Thread-18 49:Thread-13 50:Thread-18 51:Thread-14 52:Thread-11 53:Thread-15 54:Thread-18 55:Thread-15 56:Thread-10 57:Thread-18 58:Thread-18 59:Thread-14 60:Thread-16 61:Thread-19 62:Thread-16 63:Thread-11 64:Thread-14 65:Thread-16 66:Thread-13 67:Thread-16 68:Thread-16 69:Thread-15 70:Thread-10 71:Thread-16 72:Thread-10 73:Thread-14 74:Thread-17 75:Thread-17 76:Thread-15 77:Thread-16 78:Thread-15 79:Thread-16 80:Thread-16 81:Thread-15 82:Thread-13 83:Thread-15 84:Thread-17 85:Thread-18 86:Thread-10 87:Thread-16 88:Thread-14 89:Thread-16 90:Thread-16 91:Thread-13 92:Thread-16 93:Thread-16 94:Thread-15 95:Thread-15 96:Thread-16 97:Thread-15 98:Thread-16 99:Thread-16
* safeMap2:
* 0:Thread-29 1:Thread-21 2:Thread-23 3:Thread-25 4:Thread-26 5:Thread-24 6:Thread-24 7:Thread-25 8:Thread-24 9:Thread-22 10:Thread-27 11:Thread-21 12:Thread-24 13:Thread-28 14:Thread-22 15:Thread-23 16:Thread-29 17:Thread-22 18:Thread-20 19:Thread-24 20:Thread-26 21:Thread-24 22:Thread-29 23:Thread-28 24:Thread-29 25:Thread-21 26:Thread-29 27:Thread-25 28:Thread-26 29:Thread-29 30:Thread-25 31:Thread-25 32:Thread-27 33:Thread-26 34:Thread-20 35:Thread-23 36:Thread-22 37:Thread-29 38:Thread-24 39:Thread-25 40:Thread-20 41:Thread-24 42:Thread-26 43:Thread-23 44:Thread-25 45:Thread-23 46:Thread-23 47:Thread-22 48:Thread-22 49:Thread-23 50:Thread-23 51:Thread-22 52:Thread-23 53:Thread-20 54:Thread-29 55:Thread-24 56:Thread-26 57:Thread-26 58:Thread-21 59:Thread-29 60:Thread-23 61:Thread-27 62:Thread-24 63:Thread-21 64:Thread-29 65:Thread-24 66:Thread-27 67:Thread-23 68:Thread-24 69:Thread-23 70:Thread-25 71:Thread-23 72:Thread-26 73:Thread-23 74:Thread-25 75:Thread-23 76:Thread-22 77:Thread-23 78:Thread-22 79:Thread-24 80:Thread-24 81:Thread-23 82:Thread-24 83:Thread-23 84:Thread-25 85:Thread-25 86:Thread-21 87:Thread-24 88:Thread-29 89:Thread-24 90:Thread-24 91:Thread-23 92:Thread-24 93:Thread-26 94:Thread-23 95:Thread-23 96:Thread-29 97:Thread-25 98:Thread-29 99:Thread-26
* mapThread1.map.size() = 123
* mapThread2.map.size() = 100
* mapThread3.map.size() = 100
*
*/
|
5.4 Queue & Deque (队列,JDK 1.5 提出)
ConcurrentLinkedQueue
非阻塞
ArrayBlockingQueue/LinkedBlockingQueue
阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
package queue;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
public class QueueTest {
public static void main(String[] args) throws InterruptedException{
//线程不安全
Deque<String> unsafeQueue = new ArrayDeque<String>();
//线程安全
ConcurrentLinkedDeque<String> safeQueue1 = new ConcurrentLinkedDeque<String>();
ArrayBlockingQueue<String> safeQueue2 = new ArrayBlockingQueue<String>(100);
QueueThread t1 = new QueueThread(unsafeQueue);
QueueThread t2 = new QueueThread(safeQueue1);
QueueThread t3 = new QueueThread(safeQueue2);
for(int i = 0; i < 10; i++){
Thread thread1 = new Thread(t1, String.valueOf(i));
thread1.start();
}
for(int i = 0; i < 10; i++) {
Thread thread2 = new Thread(t2, String.valueOf(i));
thread2.start();
}
for(int i = 0; i < 10; i++) {
Thread thread3 = new Thread(t3, String.valueOf(i));
thread3.start();
}
//等待子线程执行完
Thread.sleep(2000);
System.out.println("queueThread1.queue.size() = " + t1.queue.size());
System.out.println("queueThread2.queue.size() = " + t2.queue.size());
System.out.println("queueThread3.queue.size() = " + t3.queue.size());
//输出queue中的值
System.out.println("unsafeQueue:");
for(String s:t1.queue)
{
System.out.print(s + " ");
}
System.out.println();
System.out.println("safeQueue1:");
for(String s:t2.queue)
{
System.out.print(s + " ");
}
System.out.println();
System.out.println("safeQueue2:");
for(String s:t3.queue)
{
System.out.print(s + " ");
}
}
}
class QueueThread implements Runnable{
public Queue<String> queue;
public QueueThread(Queue<String> queue){
this.queue = queue;
}
@Override
public void run() {
int i = 0;
while(i<10)
{
i++;
try {
Thread.sleep(10);
}catch (InterruptedException e){
e.printStackTrace();
}
//把当前线程名称加入list中
queue.add(Thread.currentThread().getName());
}
}
}
/**
*
*
* queueThread1.queue.size() = 83
* queueThread2.queue.size() = 100
* queueThread3.queue.size() = 100
* unsafeQueue:
* 1 8 4 3 7 6 0 0 5 3 2 8 4 0 7 6 6 4 3 2 9 1 5 8 0 4 2 3 5 8 9 1 6 5 7 3 1 9 4 2 0 2 9 1 5 4 3 6 8 0 7 0 5 2 3 1 4 9 8 7 6 9 2 5 2 9 1 4 3 5 6 8 7 0 6 8 0 2 7 3 4 9 1
* safeQueue1:
* 4 8 1 7 2 9 0 6 3 5 3 4 0 9 5 1 7 6 2 8 8 2 9 1 7 0 4 3 6 5 2 4 1 8 7 3 0 9 6 5 4 0 7 3 8 9 1 2 5 6 8 0 7 3 2 1 9 4 6 5 4 7 0 8 6 3 9 1 2 5 3 8 7 4 0 5 2 6 1 9 7 0 2 8 9 1 4 3 6 5 3 8 5 6 0 4 1 9 7 2
* safeQueue2:
* 6 3 2 5 1 9 4 8 0 7 5 8 7 1 3 6 4 0 9 2 6 1 9 7 5 8 0 2 3 4 1 6 7 5 8 3 0 4 9 2 1 2 9 7 5 4 3 0 6 8 0 5 2 8 6 3 4 9 1 7 4 9 1 6 2 0 3 8 7 5 8 2 7 6 0 3 1 9 5 4 6 8 5 0 7 1 2 9 3 4 4 3 9 5 0 2 1 6 8 7
*
*
*
*/
|
6 Java并发协作控制
笔记
Thread/Executor/Fork-Join
synchronized
同步
- 限定只有一个线程才能进入关键区
- 简单粗暴,性能损失有点大
6.1 Lock
- Lock也可以实现同步的效果
- 实现更复杂的临界区结构
tryLock
方法可以预判锁是否空闲
- 允许分离读写的操作,多个读,一个写
- 性能更好
ReentrantLock
类,可重入的互斥锁
ReentrantReadWriteLock
类,可重入的读写锁
lock
和unlock
函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class LockExample {
private static final ReentrantLock queueLock = new ReentrantLock(); //可重入锁
private static final ReentrantReadWriteLock orderLock = new ReentrantReadWriteLock(); //可重入读写锁
/**
* 有家奶茶店,点单有时需要排队
* 假设想买奶茶的人如果看到需要排队,就决定不买
* 又假设奶茶店有老板和多名员工,记单方式比较原始,只有一个订单本
* 老板负责写新订单,员工不断地查看订单本得到信息来制作奶茶,在老板写新订单时员工不能看订单本
* 多个员工可同时看订单本,在员工看时老板不能写新订单
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
buyMilkTea();
//handleOrder(); //需手动关闭
}
public void tryToBuyMilkTea() throws InterruptedException {
boolean flag = true;
while(flag)
{
if (queueLock.tryLock()) {
//queueLock.lock();
long thinkingTime = (long) (Math.random() * 500);
Thread.sleep(thinkingTime);
System.out.println(Thread.currentThread().getName() + ": 来一杯珍珠奶茶,不要珍珠");
flag = false;
queueLock.unlock();
} else {
//System.out.println(Thread.currentThread().getName() + ":" + queueLock.getQueueLength() + "人在排队");
System.out.println(Thread.currentThread().getName() + ": 再等等");
}
if(flag)
{
Thread.sleep(1000);
}
}
}
public void addOrder() throws InterruptedException {
//writeLock,写锁,排他的,只能一个线程拥有
orderLock.writeLock().lock();
long writingTime = (long) (Math.random() * 1000);
Thread.sleep(writingTime);
System.out.println("老板新加一笔订单");
orderLock.writeLock().unlock();
}
public void viewOrder() throws InterruptedException {
//readLock,读锁,可以多个线程共享
orderLock.readLock().lock();
long readingTime = (long) (Math.random() * 500);
Thread.sleep(readingTime);
System.out.println(Thread.currentThread().getName() + ": 查看订单本");
orderLock.readLock().unlock();
}
public static void buyMilkTea() throws InterruptedException {
LockExample lockExample = new LockExample();
int STUDENTS_CNT = 10;
Thread[] students = new Thread[STUDENTS_CNT];
for (int i = 0; i < STUDENTS_CNT; i++) {
students[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
long walkingTime = (long) (Math.random() * 1000);
Thread.sleep(walkingTime);
lockExample.tryToBuyMilkTea();
} catch(InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
);
students[i].start();
}
for (int i = 0; i < STUDENTS_CNT; i++)
students[i].join();
}
public static void handleOrder() throws InterruptedException {
LockExample lockExample = new LockExample();
Thread boss = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
lockExample.addOrder();
long waitingTime = (long) (Math.random() * 1000);
Thread.sleep(waitingTime);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
});
boss.start();
int workerCnt = 3;
Thread[] workers = new Thread[workerCnt];
for (int i = 0; i < workerCnt; i++)
{
workers[i] = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
lockExample.viewOrder();
long workingTime = (long) (Math.random() * 5000);
Thread.sleep(workingTime);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
});
workers[i].start();
}
}
}
/**
*
*
* Thread-2: 再等等
* Thread-0: 来一杯珍珠奶茶,不要珍珠
* Thread-3: 再等等
* Thread-6: 再等等
* Thread-9: 再等等
* Thread-8: 再等等
* Thread-1: 来一杯珍珠奶茶,不要珍珠
* Thread-5: 来一杯珍珠奶茶,不要珍珠
* Thread-7: 再等等
* Thread-4: 来一杯珍珠奶茶,不要珍珠
* Thread-3: 再等等
* Thread-6: 再等等
* Thread-2: 来一杯珍珠奶茶,不要珍珠
* Thread-8: 再等等
* Thread-9: 来一杯珍珠奶茶,不要珍珠
* Thread-7: 来一杯珍珠奶茶,不要珍珠
* Thread-6: 再等等
* Thread-8: 再等等
* Thread-3: 来一杯珍珠奶茶,不要珍珠
* Thread-8: 再等等
* Thread-6: 来一杯珍珠奶茶,不要珍珠
* Thread-8: 来一杯珍珠奶茶,不要珍珠
*
*/
|
6.2 Semaphore
- 信号量,由1965年Dijkstra提出的
- 信号量:本质上是一个计数器
- 计数器大于0,可以使用,等于0不能使用
- 可以设置多个并发量,例如限制10个访问
Semaphore
- 比
Lock
更进一步,可以控制多个同时访问关键区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private final Semaphore placeSemaphore = new Semaphore(5);
public boolean parking() throws InterruptedException {
if (placeSemaphore.tryAcquire()) {
System.out.println(Thread.currentThread().getName() + ": 停车成功");
return true;
} else {
System.out.println(Thread.currentThread().getName() + ": 没有空位");
return false;
}
}
public void leaving() throws InterruptedException {
placeSemaphore.release();
System.out.println(Thread.currentThread().getName() + ": 开走");
}
/**
* 现有一地下车库,共有车位5个,由10辆车需要停放,每次停放时,去申请信号量
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
int tryToParkCnt = 10;
SemaphoreExample semaphoreExample = new SemaphoreExample();
Thread[] parkers = new Thread[tryToParkCnt];
for (int i = 0; i < tryToParkCnt; i++) {
parkers[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
long randomTime = (long) (Math.random() * 1000);
Thread.sleep(randomTime);
if (semaphoreExample.parking()) {
long parkingTime = (long) (Math.random() * 1200);
Thread.sleep(parkingTime);
semaphoreExample.leaving();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
parkers[i].start();
}
for (int i = 0; i < tryToParkCnt; i++) {
parkers[i].join();
}
}
}
/**
*
*
* Thread-5: 停车成功
* Thread-9: 停车成功
* Thread-9: 开走
* Thread-6: 停车成功
* Thread-8: 停车成功
* Thread-2: 停车成功
* Thread-4: 停车成功
* Thread-0: 没有空位
* Thread-4: 开走
* Thread-7: 停车成功
* Thread-2: 开走
* Thread-3: 停车成功
* Thread-6: 开走
* Thread-1: 停车成功
* Thread-5: 开走
* Thread-8: 开走
* Thread-3: 开走
* Thread-1: 开走
* Thread-7: 开走
*
*
*/
|
6.3 Latch
- 等待锁,是一个同步辅助类
- 用来同步执行任务的一个或者多个线程
- 不是用来保护临界区或者共享资源
CountDownLatch
countDown()
计数减1
await()
等待latch变成0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
/**
* 设想百米赛跑比赛 发令枪发出信号后选手开始跑,全部选手跑到终点后比赛结束
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
int runnerCnt = 10;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(runnerCnt);
for (int i = 0; i < runnerCnt; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
System.out.println("准备工作...");
System.out.println("准备工作就绪");
startSignal.countDown(); // let all threads proceed
System.out.println("比赛开始");
doneSignal.await(); // wait for all to finish
System.out.println("比赛结束");
}
static class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
}
void doWork() {
System.out.println(Thread.currentThread().getName() + ": 跑完全程");
}
}
}
/**
*
* 准备工作...
* 准备工作就绪
* 比赛开始
* Thread-0: 跑完全程
* Thread-6: 跑完全程
* Thread-9: 跑完全程
* Thread-5: 跑完全程
* Thread-4: 跑完全程
* Thread-8: 跑完全程
* Thread-3: 跑完全程
* Thread-2: 跑完全程
* Thread-1: 跑完全程
* Thread-7: 跑完全程
* 比赛结束
*
*/
|
6.4 Barrier
- 集合点,也是一个同步辅助类
- 允许多个线程在某一个点上进行同步
CyclicBarrier
- 构造函数是需要同步的线程数量
await
等待其他线程,到达数量后,就放行
笔记
当在Barrier
上await
的线程数量达到预定的要求后,所有的await
的线程不再等待,全部解锁。并且,Barrier
将执行预定的回调动作(本程序中,回调动作就是CalculateFinalResult
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
/**
* 假定有三行数,用三个线程分别计算每一行的和,最终计算总和
* @param args
*/
public static void main(String[] args) {
final int[][] numbers = new int[3][5];
final int[] results = new int[3];
int[] row1 = new int[]{1, 2, 3, 4, 5};
int[] row2 = new int[]{6, 7, 8, 9, 10};
int[] row3 = new int[]{11, 12, 13, 14, 15};
numbers[0] = row1;
numbers[1] = row2;
numbers[2] = row3;
CalculateFinalResult finalResultCalculator = new CalculateFinalResult(results);
CyclicBarrier barrier = new CyclicBarrier(3, finalResultCalculator);
//当有3个线程在barrier上await,就执行finalResultCalculator
for(int i = 0; i < 3; i++) {
CalculateEachRow rowCalculator = new CalculateEachRow(barrier, numbers, i, results);
new Thread(rowCalculator).start();
}
}
}
class CalculateEachRow implements Runnable {
final int[][] numbers;
final int rowNumber;
final int[] res;
final CyclicBarrier barrier;
CalculateEachRow(CyclicBarrier barrier, int[][] numbers, int rowNumber, int[] res) {
this.barrier = barrier;
this.numbers = numbers;
this.rowNumber = rowNumber;
this.res = res;
}
@Override
public void run() {
int[] row = numbers[rowNumber];
int sum = 0;
for (int data : row) {
sum += data;
res[rowNumber] = sum;
}
try {
System.out.println(Thread.currentThread().getName() + ": 计算第" + (rowNumber + 1) + "行结束,结果为: " + sum);
barrier.await(); //等待!只要超过3个(Barrier的构造参数),就放行。
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
class CalculateFinalResult implements Runnable {
final int[] eachRowRes;
int finalRes;
public int getFinalResult() {
return finalRes;
}
CalculateFinalResult(int[] eachRowRes) {
this.eachRowRes = eachRowRes;
}
@Override
public void run() {
int sum = 0;
for(int data : eachRowRes) {
sum += data;
}
finalRes = sum;
System.out.println("最终结果为: " + finalRes);
}
}
/**
*
* Thread-0: 计算第1行结束,结果为: 15
* Thread-2: 计算第3行结束,结果为: 65
* Thread-1: 计算第2行结束,结果为: 40
* 最终结果为: 120
*
*/
|
6.5 Phaser
- 允许执行并发多阶段任务,同步辅助类
- 在每一个阶段结束的位置对线程进行同步,当所有的线程都到达这步,再进行下一步
Phaser
arrive()
arriveAndAwaitAdvance()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
import java.util.concurrent.Phaser;
public class PhaserExample {
/**
* 假设举行考试,总共三道大题,每次下发一道题目,等所有学生完成后再进行下一道
*
* @param args
*/
public static void main(String[] args) {
int studentsCnt = 5;
Phaser phaser = new Phaser(studentsCnt);
for (int i = 0; i < studentsCnt; i++) {
new Thread(new Student(phaser)).start();
}
}
}
class Student implements Runnable {
private final Phaser phaser;
public Student(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
doTesting(1);
phaser.arriveAndAwaitAdvance(); //等到5个线程都到了,才放行
doTesting(2);
phaser.arriveAndAwaitAdvance();
doTesting(3);
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doTesting(int i) throws InterruptedException {
String name = Thread.currentThread().getName();
System.out.println(name + "开始答第" + i + "题");
long thinkingTime = (long) (Math.random() * 1000);
Thread.sleep(thinkingTime);
System.out.println(name + "第" + i + "道题答题结束");
}
}
/**
*
* Thread-1开始答第1题
* Thread-3开始答第1题
* Thread-2开始答第1题
* Thread-4开始答第1题
* Thread-0开始答第1题
* Thread-4第1道题答题结束
* Thread-3第1道题答题结束
* Thread-0第1道题答题结束
* Thread-2第1道题答题结束
* Thread-1第1道题答题结束
* Thread-1开始答第2题
* Thread-4开始答第2题
* Thread-3开始答第2题
* Thread-2开始答第2题
* Thread-0开始答第2题
* Thread-2第2道题答题结束
* Thread-1第2道题答题结束
* Thread-4第2道题答题结束
* Thread-0第2道题答题结束
* Thread-3第2道题答题结束
* Thread-3开始答第3题
* Thread-0开始答第3题
* Thread-4开始答第3题
* Thread-1开始答第3题
* Thread-2开始答第3题
* Thread-0第3道题答题结束
* Thread-1第3道题答题结束
* Thread-4第3道题答题结束
* Thread-2第3道题答题结束
* Thread-3第3道题答题结束
*
*
*/
|
6.6 Exchanger
- 允许在并发线程中互相交换消息
- 允许在2个线程中定义同步点,当两个线程都到达同步点,它们交换数据结构
Exchanger
exchange()
, 线程双方互相交互数据
- 交换数据是双向的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
import java.util.Scanner;
import java.util.concurrent.Exchanger;
public class ExchangerExample {
/**
* 本例通过Exchanger实现学生成绩查询,简单线程间数据的交换
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<String>();
BackgroundWorker worker = new BackgroundWorker(exchanger);
new Thread(worker).start();
Scanner scanner = new Scanner(System.in);
while(true) {
System.out.println("输入要查询的属性学生姓名:");
String input = scanner.nextLine().trim();
exchanger.exchange(input); //把用户输入传递给线程
String value = exchanger.exchange(null); //拿到线程反馈结果
if ("exit".equals(value)) {
break;
}
System.out.println("查询结果:" + value);
}
scanner.close();
}
}
class BackgroundWorker implements Runnable {
final Exchanger<String> exchanger;
BackgroundWorker(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
while (true) {
try {
String item = exchanger.exchange(null);
switch (item) {
case "zhangsan":
exchanger.exchange("90");
break;
case "lisi":
exchanger.exchange("80");
break;
case "wangwu":
exchanger.exchange("70");
break;
case "exit":
exchanger.exchange("exit");
return;
default:
exchanger.exchange("查无此人");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
*
*
* 输入要查询的属性学生姓名:
* lisi
* 查询结果:80
*
*
*/
|
7 Java定时任务执行
笔记
Thread/Executor/Fork-Join
多线程
- 定时执行
7.1 简单定时器机制
- 设置计划任务,也就是在指定的时间开始执行某一个任务。
TimerTask
封装任务
Timer
类 定时器
笔记
一个Timer
对象可以执行多个计划任务,但是这些任务是串行执行的。如果有一个任务执行很慢,将会影响后续的任务准点运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
package timer;
import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
public class TimerTest {
public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
Timer timer = new Timer();
System.out.println("当前时间:"+new Date().toLocaleString());
//当前时间1秒后,每2秒执行一次
timer.schedule(task, 1000, 2000);
Thread.sleep(10000);
task.cancel(); //取消当前的任务
System.out.println("================================");
Calendar now = Calendar.getInstance();
now.set(Calendar.SECOND,now.get(Calendar.SECOND)+3);
Date runDate = now.getTime();
MyTask2 task2 = new MyTask2();
timer.scheduleAtFixedRate(task2,runDate,3000); //固定速率
Thread.sleep(20000);
timer.cancel(); //取消定时器
}
}
class MyTask extends TimerTask {
public void run() {
System.out.println("运行了!时间为:" + new Date());
}
}
class MyTask2 extends TimerTask {
public void run() {
System.out.println("运行了!时间为:" + new Date());
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
*
* 当前时间:2022-3-1 1:07:56
* 运行了!时间为:Tue Mar 01 01:07:57 CST 2022
* 运行了!时间为:Tue Mar 01 01:07:59 CST 2022
* 运行了!时间为:Tue Mar 01 01:08:01 CST 2022
* 运行了!时间为:Tue Mar 01 01:08:03 CST 2022
* 运行了!时间为:Tue Mar 01 01:08:05 CST 2022
* ================================
* 运行了!时间为:Tue Mar 01 01:08:09 CST 2022
* 运行了!时间为:Tue Mar 01 01:08:13 CST 2022
* 运行了!时间为:Tue Mar 01 01:08:17 CST 2022
* 运行了!时间为:Tue Mar 01 01:08:21 CST 2022
* 运行了!时间为:Tue Mar 01 01:08:25 CST 2022
*
*
*
*/
|
7.2 Executor +定时器机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
package schedule;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorTest {
public static void main(String[] a) throws Exception
{
//executeAtFixTime();
//executeFixedRate(); //3s
executeFixedDelay(); //4s
}
public static void executeAtFixTime() throws Exception {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.schedule(
new MyTask(),
1,
TimeUnit.SECONDS);
Thread.sleep(20000);
executor.shutdown();
}
/**
* 周期任务 固定速率 是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,
* 如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
* @throws Exception
*/
public static void executeFixedRate() throws Exception {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(
new MyTask(),
1,
3000,
TimeUnit.MILLISECONDS);
Thread.sleep(20000);
executor.shutdown();
}
/**
* 周期任务 固定延时 是以上一个任务结束时开始计时,period时间过去后,立即执行。
* @throws Exception
*/
public static void executeFixedDelay() throws Exception {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(
new MyTask(),
1,
3000,
TimeUnit.MILLISECONDS);
Thread.sleep(20000);
executor.shutdown();
}
}
class MyTask implements Runnable {
public void run() {
System.out.println("时间为:" + new Date());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println("时间为:" + new Date());
}
}
|
7.3 Quartz
Quartz
是一个较为完善的任务调度框架
- 解决程序中
Timer
零散管理的问题
- 功能更加强大
Timer
执行周期任务,如果中间某一次有异常,整个任务终止执行
Quartz
执行周期任务,如果中间某一次有异常,不影响下次任务执行
- ……
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
// quartz/QuartzTest.java
package quartz;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
public class QuartzTest {
public static void main(String[] args) {
try {
//创建scheduler
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
//定义一个Trigger
Trigger trigger = newTrigger().withIdentity("trigger1", "group1") //定义name/group
.startNow()//一旦加入scheduler,立即生效
.withSchedule(simpleSchedule() //使用SimpleTrigger
.withIntervalInSeconds(2) //每隔2秒执行一次
.repeatForever()) //一直执行
.build();
//定义一个JobDetail
JobDetail job = newJob(HelloJob.class) //定义Job类为HelloQuartz类
.withIdentity("job1", "group1") //定义name/group
.usingJobData("name", "quartz") //定义属性
.build();
//加入这个调度
scheduler.scheduleJob(job, trigger);
//启动
scheduler.start();
//运行一段时间后关闭
Thread.sleep(10000);
scheduler.shutdown(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// quartz/HelloJob.java
package quartz;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.util.Date;
public class HelloJob implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDetail detail = context.getJobDetail();
String name = detail.getJobDataMap().getString("name");
System.out.println("hello from " + name + " at " + new Date());
}
}
/**
*
*
* hello from name at Tue Mar 01 01:34:57 CST 2022
* hello from name at Tue Mar 01 01:34:59 CST 2022
* hello from name at Tue Mar 01 01:35:01 CST 2022
* hello from name at Tue Mar 01 01:35:03 CST 2022
* hello from name at Tue Mar 01 01:35:05 CST 2022
*
*
*/
|