forked from CyC2018/CS-Notes
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Java 併發.md
1647 lines (1219 loc) · 59.2 KB
/
Java 併發.md
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<!-- GFM-TOC -->
* [一、線程狀態轉換](#一線程狀態轉換)
* [新建(New)](#新建new)
* [可運行(Runnable)](#可運行runnable)
* [阻塞(Blocked)](#阻塞blocked)
* [無限期等待(Waiting)](#無限期等待waiting)
* [限期等待(Timed Waiting)](#限期等待timed-waiting)
* [死亡(Terminated)](#死亡terminated)
* [二、使用線程](#二使用線程)
* [實現 Runnable 接口](#實現-runnable-接口)
* [實現 Callable 接口](#實現-callable-接口)
* [繼承 Thread 類](#繼承-thread-類)
* [實現接口 VS 繼承 Thread](#實現接口-vs-繼承-thread)
* [三、基礎線程機制](#三基礎線程機制)
* [Executor](#executor)
* [Daemon](#daemon)
* [sleep()](#sleep)
* [yield()](#yield)
* [四、中斷](#四中斷)
* [InterruptedException](#interruptedexception)
* [interrupted()](#interrupted)
* [Executor 的中斷操作](#executor-的中斷操作)
* [五、互斥同步](#五互斥同步)
* [synchronized](#synchronized)
* [ReentrantLock](#reentrantlock)
* [比較](#比較)
* [使用選擇](#使用選擇)
* [六、線程之間的協作](#六線程之間的協作)
* [join()](#join)
* [wait() notify() notifyAll()](#wait-notify-notifyall)
* [await() signal() signalAll()](#await-signal-signalall)
* [七、J.U.C - AQS](#七juc---aqs)
* [CountDownLatch](#countdownlatch)
* [CyclicBarrier](#cyclicbarrier)
* [Semaphore](#semaphore)
* [八、J.U.C - 其它組件](#八juc---其它組件)
* [FutureTask](#futuretask)
* [BlockingQueue](#blockingqueue)
* [ForkJoin](#forkjoin)
* [九、線程不安全示例](#九線程不安全示例)
* [十、Java 內存模型](#十java-內存模型)
* [主內存與工作內存](#主內存與工作內存)
* [內存間交互操作](#內存間交互操作)
* [內存模型三大特性](#內存模型三大特性)
* [先行發生原則](#先行發生原則)
* [十一、線程安全](#十一線程安全)
* [不可變](#不可變)
* [互斥同步](#互斥同步)
* [非阻塞同步](#非阻塞同步)
* [無同步方案](#無同步方案)
* [十二、鎖優化](#十二鎖優化)
* [自旋鎖](#自旋鎖)
* [鎖消除](#鎖消除)
* [鎖粗化](#鎖粗化)
* [輕量級鎖](#輕量級鎖)
* [偏向鎖](#偏向鎖)
* [十三、多線程開發良好的實踐](#十三多線程開發良好的實踐)
* [參考資料](#參考資料)
<!-- GFM-TOC -->
# 一、線程狀態轉換
<div align="center"> <img src="pics/adfb427d-3b21-40d7-a142-757f4ed73079.png" width="600px"> </div><br>
## 新建(New)
創建後尚未啟動。
## 可運行(Runnable)
可能正在運行,也可能正在等待 CPU 時間片。
包含了操作系統線程狀態中的 Running 和 Ready。
## 阻塞(Blocked)
等待獲取一個排它鎖,如果其線程釋放了鎖就會結束此狀態。
## 無限期等待(Waiting)
等待其它線程顯式地喚醒,否則不會被分配 CPU 時間片。
| 進入方法 | 退出方法 |
| --- | --- |
| 沒有設置 Timeout 參數的 Object.wait() 方法 | Object.notify() / Object.notifyAll() |
| 沒有設置 Timeout 參數的 Thread.join() 方法 | 被調用的線程執行完畢 |
| LockSupport.park() 方法 | LockSupport.unpark(Thread) |
## 限期等待(Timed Waiting)
無需等待其它線程顯式地喚醒,在一定時間之後會被系統自動喚醒。
調用 Thread.sleep() 方法使線程進入限期等待狀態時,常常用“使一個線程睡眠”進行描述。
調用 Object.wait() 方法使線程進入限期等待或者無限期等待時,常常用“掛起一個線程”進行描述。
睡眠和掛起是用來描述行為,而阻塞和等待用來描述狀態。
阻塞和等待的區別在於,阻塞是被動的,它是在等待獲取一個排它鎖。而等待是主動的,通過調用 Thread.sleep() 和 Object.wait() 等方法進入。
| 進入方法 | 退出方法 |
| --- | --- |
| Thread.sleep() 方法 | 時間結束 |
| 設置了 Timeout 參數的 Object.wait() 方法 | 時間結束 / Object.notify() / Object.notifyAll() |
| 設置了 Timeout 參數的 Thread.join() 方法 | 時間結束 / 被調用的線程執行完畢 |
| LockSupport.parkNanos() 方法 | LockSupport.unpark(Thread) |
| LockSupport.parkUntil() 方法 | LockSupport.unpark(Thread) |
## 死亡(Terminated)
可以是線程結束任務之後自己結束,或者產生了異常而結束。
# 二、使用線程
有三種使用線程的方法:
- 實現 Runnable 接口;
- 實現 Callable 接口;
- 繼承 Thread 類。
實現 Runnable 和 Callable 接口的類只能當做一個可以在線程中運行的任務,不是真正意義上的線程,因此最後還需要通過 Thread 來調用。可以說任務是通過線程驅動從而執行的。
## 實現 Runnable 接口
需要實現 run() 方法。
通過 Thread 調用 start() 方法來啟動線程。
```java
public class MyRunnable implements Runnable {
public void run() {
// ...
}
}
```
```java
public static void main(String[] args) {
MyRunnable instance = new MyRunnable();
Thread thread = new Thread(instance);
thread.start();
}
```
## 實現 Callable 接口
與 Runnable 相比,Callable 可以有返回值,返回值通過 FutureTask 進行封裝。
```java
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}
```
```java
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
```
## 繼承 Thread 類
同樣也是需要實現 run() 方法,因為 Thread 類也實現了 Runable 接口。
當調用 start() 方法啟動一個線程時,虛擬機會將該線程放入就緒隊列中等待被調度,當一個線程被調度時會執行該線程的 run() 方法。
```java
public class MyThread extends Thread {
public void run() {
// ...
}
}
```
```java
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
```
## 實現接口 VS 繼承 Thread
實現接口會更好一些,因為:
- Java 不支持多重繼承,因此繼承了 Thread 類就無法繼承其它類,但是可以實現多個接口;
- 類可能只要求可執行就行,繼承整個 Thread 類開銷過大。
# 三、基礎線程機制
## Executor
Executor 管理多個異步任務的執行,而無需程序員顯式地管理線程的生命週期。這裡的異步是指多個任務的執行互不干擾,不需要進行同步操作。
主要有三種 Executor:
- CachedThreadPool:一個任務創建一個線程;
- FixedThreadPool:所有任務只能使用固定大小的線程;
- SingleThreadExecutor:相當於大小為 1 的 FixedThreadPool。
```java
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new MyRunnable());
}
executorService.shutdown();
}
```
## Daemon
守護線程是程序運行時在後臺提供服務的線程,不屬於程序中不可或缺的部分。
當所有非守護線程結束時,程序也就終止,同時會殺死所有守護線程。
main() 屬於非守護線程。
在線程啟動之前使用 setDaemon() 方法可以將一個線程設置為守護線程。
```java
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.setDaemon(true);
}
```
## sleep()
Thread.sleep(millisec) 方法會休眠當前正在執行的線程,millisec 單位為毫秒。
sleep() 可能會拋出 InterruptedException,因為異常不能跨線程傳播回 main() 中,因此必須在本地進行處理。線程中拋出的其它異常也同樣需要在本地進行處理。
```java
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
```
## yield()
對靜態方法 Thread.yield() 的調用聲明瞭當前線程已經完成了生命週期中最重要的部分,可以切換給其它線程來執行。該方法只是對線程調度器的一個建議,而且也只是建議具有相同優先級的其它線程可以運行。
```java
public void run() {
Thread.yield();
}
```
# 四、中斷
一個線程執行完畢之後會自動結束,如果在運行過程中發生異常也會提前結束。
## InterruptedException
通過調用一個線程的 interrupt() 來中斷該線程,如果該線程處於阻塞、限期等待或者無限期等待狀態,那麼就會拋出 InterruptedException,從而提前結束該線程。但是不能中斷 I/O 阻塞和 synchronized 鎖阻塞。
對於以下代碼,在 main() 中啟動一個線程之後再中斷它,由於線程中調用了 Thread.sleep() 方法,因此會拋出一個 InterruptedException,從而提前結束線程,不執行之後的語句。
```java
public class InterruptExample {
private static class MyThread1 extends Thread {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
```
```java
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new MyThread1();
thread1.start();
thread1.interrupt();
System.out.println("Main run");
}
```
```html
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at InterruptExample.lambda$main$0(InterruptExample.java:5)
at InterruptExample$$Lambda$1/713338599.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
```
## interrupted()
如果一個線程的 run() 方法執行一個無限循環,並且沒有執行 sleep() 等會拋出 InterruptedException 的操作,那麼調用線程的 interrupt() 方法就無法使線程提前結束。
但是調用 interrupt() 方法會設置線程的中斷標記,此時調用 interrupted() 方法會返回 true。因此可以在循環體中使用 interrupted() 方法來判斷線程是否處於中斷狀態,從而提前結束線程。
```java
public class InterruptExample {
private static class MyThread2 extends Thread {
@Override
public void run() {
while (!interrupted()) {
// ..
}
System.out.println("Thread end");
}
}
}
```
```java
public static void main(String[] args) throws InterruptedException {
Thread thread2 = new MyThread2();
thread2.start();
thread2.interrupt();
}
```
```html
Thread end
```
## Executor 的中斷操作
調用 Executor 的 shutdown() 方法會等待線程都執行完畢之後再關閉,但是如果調用的是 shutdownNow() 方法,則相當於調用每個線程的 interrupt() 方法。
以下使用 Lambda 創建線程,相當於創建了一個匿名內部線程。
```java
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdownNow();
System.out.println("Main run");
}
```
```html
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
如果只想中斷 Executor 中的一個線程,可以通過使用 submit() 方法來提交一個線程,它會返回一個 Future<?> 對象,通過調用該對象的 cancel(true) 方法就可以中斷線程。
```java
Future<?> future = executorService.submit(() -> {
// ..
});
future.cancel(true);
```
# 五、互斥同步
Java 提供了兩種鎖機制來控制多個線程對共享資源的互斥訪問,第一個是 JVM 實現的 synchronized,而另一個是 JDK 實現的 ReentrantLock。
## synchronized
**1. 同步一個代碼塊**
```java
public void func() {
synchronized (this) {
// ...
}
}
```
它只作用於同一個對象,如果調用兩個對象上的同步代碼塊,就不會進行同步。
對於以下代碼,使用 ExecutorService 執行了兩個線程,由於調用的是同一個對象的同步代碼塊,因此這兩個線程會進行同步,當一個線程進入同步語句塊時,另一個線程就必須等待。
```java
public class SynchronizedExample {
public void func1() {
synchronized (this) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
```
```java
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e1.func1());
}
```
```html
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
```
對於以下代碼,兩個線程調用了不同對象的同步代碼塊,因此這兩個線程就不需要同步。從輸出結果可以看出,兩個線程交叉執行。
```java
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e2.func1());
}
```
```html
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
```
**2. 同步一個方法**
```java
public synchronized void func () {
// ...
}
```
它和同步代碼塊一樣,作用於同一個對象。
**3. 同步一個類**
```java
public void func() {
synchronized (SynchronizedExample.class) {
// ...
}
}
```
作用於整個類,也就是說兩個線程調用同一個類的不同對象上的這種同步語句,也會進行同步。
```java
public class SynchronizedExample {
public void func2() {
synchronized (SynchronizedExample.class) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
```
```java
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func2());
executorService.execute(() -> e2.func2());
}
```
```html
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
```
**4. 同步一個靜態方法**
```java
public synchronized static void fun() {
// ...
}
```
作用於整個類。
## ReentrantLock
ReentrantLock 是 java.util.concurrent(J.U.C)包中的鎖。
```java
public class LockExample {
private Lock lock = new ReentrantLock();
public void func() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
} finally {
lock.unlock(); // 確保釋放鎖,從而避免發生死鎖。
}
}
}
```
```java
public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.func());
executorService.execute(() -> lockExample.func());
}
```
```html
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
```
## 比較
**1. 鎖的實現**
synchronized 是 JVM 實現的,而 ReentrantLock 是 JDK 實現的。
**2. 性能**
新版本 Java 對 synchronized 進行了很多優化,例如自旋鎖等,synchronized 與 ReentrantLock 大致相同。
**3. 等待可中斷**
當持有鎖的線程長期不釋放鎖的時候,正在等待的線程可以選擇放棄等待,改為處理其他事情。
ReentrantLock 可中斷,而 synchronized 不行。
**4. 公平鎖**
公平鎖是指多個線程在等待同一個鎖時,必須按照申請鎖的時間順序來依次獲得鎖。
synchronized 中的鎖是非公平的,ReentrantLock 默認情況下也是非公平的,但是也可以是公平的。
**5. 鎖綁定多個條件**
一個 ReentrantLock 可以同時綁定多個 Condition 對象。
## 使用選擇
除非需要使用 ReentrantLock 的高級功能,否則優先使用 synchronized。這是因為 synchronized 是 JVM 實現的一種鎖機制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。並且使用 synchronized 不用擔心沒有釋放鎖而導致死鎖問題,因為 JVM 會確保鎖的釋放。
# 六、線程之間的協作
當多個線程可以一起工作去解決某個問題時,如果某些部分必須在其它部分之前完成,那麼就需要對線程進行協調。
## join()
在線程中調用另一個線程的 join() 方法,會將當前線程掛起,而不是忙等待,直到目標線程結束。
對於以下代碼,雖然 b 線程先啟動,但是因為在 b 線程中調用了 a 線程的 join() 方法,b 線程會等待 a 線程結束才繼續執行,因此最後能夠保證 a 線程的輸出先於 b 線程的輸出。
```java
public class JoinExample {
private class A extends Thread {
@Override
public void run() {
System.out.println("A");
}
}
private class B extends Thread {
private A a;
B(A a) {
this.a = a;
}
@Override
public void run() {
try {
a.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B");
}
}
public void test() {
A a = new A();
B b = new B(a);
b.start();
a.start();
}
}
```
```java
public static void main(String[] args) {
JoinExample example = new JoinExample();
example.test();
}
```
```
A
B
```
## wait() notify() notifyAll()
調用 wait() 使得線程等待某個條件滿足,線程在等待時會被掛起,當其他線程的運行使得這個條件滿足時,其它線程會調用 notify() 或者 notifyAll() 來喚醒掛起的線程。
它們都屬於 Object 的一部分,而不屬於 Thread。
只能用在同步方法或者同步控制塊中使用,否則會在運行時拋出 IllegalMonitorStateException。
使用 wait() 掛起期間,線程會釋放鎖。這是因為,如果沒有釋放鎖,那麼其它線程就無法進入對象的同步方法或者同步控制塊中,那麼就無法執行 notify() 或者 notifyAll() 來喚醒掛起的線程,造成死鎖。
```java
public class WaitNotifyExample {
public synchronized void before() {
System.out.println("before");
notifyAll();
}
public synchronized void after() {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after");
}
}
```
```java
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
WaitNotifyExample example = new WaitNotifyExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
}
```
```html
before
after
```
**wait() 和 sleep() 的區別**
- wait() 是 Object 的方法,而 sleep() 是 Thread 的靜態方法;
- wait() 會釋放鎖,sleep() 不會。
## await() signal() signalAll()
java.util.concurrent 類庫中提供了 Condition 類來實現線程之間的協調,可以在 Condition 上調用 await() 方法使線程等待,其它線程調用 signal() 或 signalAll() 方法喚醒等待的線程。
相比於 wait() 這種等待方式,await() 可以指定等待的條件,因此更加靈活。
使用 Lock 來獲取一個 Condition 對象。
```java
public class AwaitSignalExample {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void before() {
lock.lock();
try {
System.out.println("before");
condition.signalAll();
} finally {
lock.unlock();
}
}
public void after() {
lock.lock();
try {
condition.await();
System.out.println("after");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
```
```java
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
AwaitSignalExample example = new AwaitSignalExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
}
```
```html
before
after
```
# 七、J.U.C - AQS
java.util.concurrent(J.U.C)大大提高了併發性能,AQS 被認為是 J.U.C 的核心。
## CountDownLatch
用來控制一個或者多個線程等待多個線程。
維護了一個計數器 cnt,每次調用 countDown() 方法會讓計數器的值減 1,減到 0 的時候,那些因為調用 await() 方法而在等待的線程就會被喚醒。
<div align="center"> <img src="pics/ba078291-791e-4378-b6d1-ece76c2f0b14.png" width="300px"> </div><br>
```java
public class CountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("run..");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
```
```html
run..run..run..run..run..run..run..run..run..run..end
```
## CyclicBarrier
用來控制多個線程互相等待,只有當多個線程都到達時,這些線程才會繼續執行。
和 CountdownLatch 相似,都是通過維護計數器來實現的。線程執行 await() 方法之後計數器會減 1,並進行等待,直到計數器為 0,所有調用 await() 方法而在等待的線程才能繼續執行。
CyclicBarrier 和 CountdownLatch 的一個區別是,CyclicBarrier 的計數器通過調用 reset() 方法可以循環使用,所以它才叫做循環屏障。
CyclicBarrier 有兩個構造函數,其中 parties 指示計數器的初始值,barrierAction 在所有線程都到達屏障的時候會執行一次。
```java
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
```
<div align="center"> <img src="pics/f71af66b-0d54-4399-a44b-f47b58321984.png" width="300px"> </div><br>
```java
public class CyclicBarrierExample {
public static void main(String[] args) {
final int totalThread = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("before..");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.print("after..");
});
}
executorService.shutdown();
}
}
```
```html
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
```
## Semaphore
Semaphore 類似於操作系統中的信號量,可以控制對互斥資源的訪問線程數。
以下代碼模擬了對某個服務的併發請求,每次只能有 3 個客戶端同時訪問,請求總數為 10。
```java
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
```
```html
2 1 2 2 2 2 2 1 2 2
```
# 八、J.U.C - 其它組件
## FutureTask
在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future<V> 進行封裝。FutureTask 實現了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future<V> 接口,這使得 FutureTask 既可以當做一個任務執行,也可以有返回值。
```java
public class FutureTask<V> implements RunnableFuture<V>
```
```java
public interface RunnableFuture<V> extends Runnable, Future<V>
```
FutureTask 可用於異步獲取執行結果或取消執行任務的場景。當一個計算任務需要執行很長時間,那麼就可以用 FutureTask 來封裝這個任務,主線程在完成自己的任務之後再去獲取結果。
```java
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
result += i;
}
return result;
}
});
Thread computeThread = new Thread(futureTask);
computeThread.start();
Thread otherThread = new Thread(() -> {
System.out.println("other task is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
otherThread.start();
System.out.println(futureTask.get());
}
}
```
```html
other task is running...
4950
```
## BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞隊列的實現:
- **FIFO 隊列** :LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
- **優先級隊列** :PriorityBlockingQueue
提供了阻塞的 take() 和 put() 方法:如果隊列為空 take() 將阻塞,直到隊列中有內容;如果隊列為滿 put() 將阻塞,直到隊列有空閒位置。
**使用 BlockingQueue 實現生產者消費者問題**
```java
public class ProducerConsumer {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
private static class Producer extends Thread {
@Override
public void run() {
try {
queue.put("product");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("produce..");
}
}
private static class Consumer extends Thread {
@Override
public void run() {
try {
String product = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("consume..");
}
}
}
```
```java
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
Producer producer = new Producer();
producer.start();
}
for (int i = 0; i < 5; i++) {
Consumer consumer = new Consumer();
consumer.start();
}
for (int i = 0; i < 3; i++) {
Producer producer = new Producer();
producer.start();
}
}
```
```html
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
```
## ForkJoin
主要用於並行計算中,和 MapReduce 原理類似,都是把大的計算任務拆分成多個小任務並行計算。
```java
public class ForkJoinExample extends RecursiveTask<Integer> {
private final int threshold = 5;
private int first;
private int last;
public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}
@Override
protected Integer compute() {
int result = 0;
if (last - first <= threshold) {
// 任務足夠小則直接計算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分成小任務
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();