[Core Java] Java Concurrent Programming

Core Java reading notes: Java concurrent programming

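1 Parallel Computing

  • Business need: many tasks, large data volumes
  • Serial vs. parallel
    • Serial programming is simple; parallel programming is hard
    • Individual cores run at lower clock frequencies, but there are more of them, so overall performance is higher
  • Why parallel programming is hard (task assignment and execution are tightly coupled)
    • How to control granularity and split the work into tasks
    • How to assign tasks to threads and monitor their execution
  • Parallel patterns
    • Master-slave pattern (Master-Slave)
    • Worker pattern (Worker-Worker)
  • Concurrent programming in Java
    • Thread / Runnable / ThreadGroup management
    • Executor
    • Fork-Join framework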
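
To make the coupling between task assignment and execution concrete, here is a minimal sketch that sums 1..n with hand-managed threads. The class name ManualSplitSum and the method parallelSum are hypothetical, chosen only for illustration. The same method has to decide the chunk size, create the threads, and wait for them.

```java
import java.util.concurrent.atomic.AtomicLong;

class ManualSplitSum {

  // Sums 1..n by splitting the range by hand across `workers` threads.
  static long parallelSum(int n, int workers) throws InterruptedException {
    AtomicLong total = new AtomicLong();
    Thread[] threads = new Thread[workers];
    int chunk = n / workers;
    for (int w = 0; w < workers; w++) {
      final int start = w * chunk + 1;
      // The last worker also takes any remainder of the range
      final int end = (w == workers - 1) ? n : (w + 1) * chunk;
      threads[w] = new Thread(() -> {
        long partial = 0;
        for (int i = start; i <= end; i++) {
          partial += i;
        }
        total.addAndGet(partial);
      });
      threads[w].start();
    }
    // The caller must also supervise execution: wait for every thread
    for (Thread t : threads) {
      t.join();
    }
    return total.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(parallelSum(1000, 4)); // prints 500500
  }
}
```

Splitting, thread creation, and joining all live in one method here, which is the coupling that the Executor and Fork-Join frameworks remove.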

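2 Thread Group Management

ThreadGroup

  • A collection of threads
  • Tree-structured: a larger thread group can contain smaller thread groups
  • The enumerate method lets you walk the threads in a group and act on them
  • Can manage several threads together, but the management itself is inefficient
  • Task assignment and execution remain tightly coupled
  • Threads are created and shut down over and over; they cannot be reused
Notes
  • activeCount returns the number of threads in the group that are still active (an estimate);
  • enumerate copies the active threads in the group into an array;
  • interrupt sends an interrupt signal to every thread in the group;
  • list prints information about every thread in the group.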
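
The tree structure can be sketched as follows. This example is not from the original notes; the class name ThreadGroupTreeDemo, the method describe, and the group and thread names are hypothetical. It nests one group inside another, enumerates recursively, and interrupts both levels through the parent.

```java
import java.util.concurrent.CountDownLatch;

class ThreadGroupTreeDemo {

  // Builds a parent group containing a child group, starts one thread in each,
  // and returns "<name of the child's parent>:<threads found recursively>".
  static String describe() throws InterruptedException {
    ThreadGroup parent = new ThreadGroup("parent");
    ThreadGroup child = new ThreadGroup(parent, "child");

    CountDownLatch started = new CountDownLatch(2);
    Runnable sleeper = () -> {
      started.countDown();
      try {
        Thread.sleep(60_000);
      } catch (InterruptedException e) {
        // Interrupted through the group: just exit
      }
    };
    Thread t1 = new Thread(parent, sleeper, "in-parent");
    Thread t2 = new Thread(child, sleeper, "in-child");
    t1.start();
    t2.start();
    started.await(); // both threads are now running

    // activeCount is only an estimate, so leave some slack in the array
    Thread[] all = new Thread[parent.activeCount() + 2];
    int found = parent.enumerate(all, true); // true: recurse into subgroups

    parent.interrupt(); // reaches threads in the parent and in its subgroups
    t1.join();
    t2.join();
    return child.getParent().getName() + ":" + found;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(describe());
  }
}
```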
// Result.java
package threadgroup;

/**
 * Holds the search result.
 */
public class Result {
  
  private String name;  
  public String getName() {
    return name;
  } 
  public void setName(String name) {
    this.name = name;
  }

}
// Searcher.java
package threadgroup;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class Searcher implements Runnable {

  // Shared by every thread that runs this task
  private Result result;
  
  public Searcher(Result result) {
    this.result=result;
  }

  @Override
  public void run() {
    String name=Thread.currentThread().getName();
    System.out.printf("Thread %s: started\n",name);
    try {
      doTask();
      result.setName(name);
    } catch (InterruptedException e) {
      System.out.printf("Thread %s: interrupted\n",name);
      return;
    }
    System.out.printf("Thread %s: finished\n",name);
  }
  
  // Simulates a search by sleeping for a random 0-99 seconds
  private void doTask() throws InterruptedException {
    Random random=new Random((new Date()).getTime());
    int value=(int)(random.nextDouble()*100);
    System.out.printf("Thread %s: %d\n",Thread.currentThread().getName(),value);
    TimeUnit.SECONDS.sleep(value);
  }
}

// Main.java
package threadgroup;

import java.util.concurrent.TimeUnit;

public class Main {

  public static void main(String[] args) {

    // Create the thread group
    ThreadGroup threadGroup = new ThreadGroup("Searcher");
    Result result=new Result();

    // Create one task and run it on 10 threads, started one second apart
    Searcher searchTask=new Searcher(result);
    for (int i=0; i<10; i++) {
      Thread thread=new Thread(threadGroup, searchTask);
      thread.start();
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    System.out.println("======== separator 0 =======");
    
    // Print information about the thread group
    System.out.printf("Active thread count: %d\n",threadGroup.activeCount());
    System.out.printf("Thread group details\n");
    threadGroup.list();
    System.out.println("======== separator 1 =======");

    // Walk the thread group. Use the count returned by enumerate: activeCount
    // is only an estimate and may change between calls.
    Thread[] threads=new Thread[threadGroup.activeCount()];
    int copied=threadGroup.enumerate(threads);
    for (int i=0; i<copied; i++) {
      System.out.printf("Thread %s: %s\n",threads[i].getName(),threads[i].getState());
    }
    System.out.println("======== separator 2 =======");

    // Wait until at least one of the 10 threads has finished
    waitFinish(threadGroup);
    
    // Interrupt all remaining threads in the thread group
    threadGroup.interrupt();
  }

  // Polls once per second until fewer than 10 threads are still active
  public static void waitFinish(ThreadGroup threadGroup) {
    while (threadGroup.activeCount()>9) {
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}
/**
 * 
 * Output:
 * Thread Thread-0: started
 * Thread Thread-0: 83
 * Thread Thread-1: started
 * Thread Thread-1: 73
 * Thread Thread-2: started
 * Thread Thread-2: 1
 * Thread Thread-2: finished
 * Thread Thread-3: started
 * Thread Thread-3: 92
 * Thread Thread-4: started
 * Thread Thread-4: 73
 * Thread Thread-5: started
 * Thread Thread-5: 64
 * Thread Thread-6: started
 * Thread Thread-6: 89
 * Thread Thread-7: started
 * Thread Thread-7: 80
 * Thread Thread-8: started
 * Thread Thread-8: 8
 * Thread Thread-9: started
 * Thread Thread-9: 99
 * ======== separator 0 =======
 * Active thread count: 9
 * Thread group details
 * java.lang.ThreadGroup[name=Searcher,maxpri=10]
 *     Thread[Thread-0,5,Searcher]
 *     Thread[Thread-1,5,Searcher]
 *     Thread[Thread-3,5,Searcher]
 *     Thread[Thread-4,5,Searcher]
 *     Thread[Thread-5,5,Searcher]
 *     Thread[Thread-6,5,Searcher]
 *     Thread[Thread-7,5,Searcher]
 *     Thread[Thread-8,5,Searcher]
 *     Thread[Thread-9,5,Searcher]
 * ======== separator 1 =======
 * Thread Thread-0: TIMED_WAITING
 * Thread Thread-1: TIMED_WAITING
 * Thread Thread-3: TIMED_WAITING
 * Thread Thread-4: TIMED_WAITING
 * Thread Thread-5: TIMED_WAITING
 * Thread Thread-6: TIMED_WAITING
 * Thread Thread-7: TIMED_WAITING
 * Thread Thread-8: TIMED_WAITING
 * Thread Thread-9: TIMED_WAITING
 * ======== separator 2 =======
 * Thread Thread-7: interrupted
 * Thread Thread-5: interrupted
 * Thread Thread-1: interrupted
 * Thread Thread-9: interrupted
 * Thread Thread-3: interrupted
 * Thread Thread-8: interrupted
 * Thread Thread-0: interrupted
 * Thread Thread-6: interrupted
 * Thread Thread-4: interrupted
 */

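3 The Java Concurrency Framework: Executor

3.1 Executor Framework

The Executor framework (java.util.concurrent.*) has been available since JDK 5.

  • Separates creating tasks from creating the threads that execute them
  • Reuses threads (creating a new thread is expensive)

3.2 Understanding the shared thread pool

  • A set of pre-created threads that can grow elastically
  • Suited to repeatedly executing many small tasks
  • Task creation and task execution are decoupled
  • The programmer does not need to manage how the pool executes the tasks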
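
Thread reuse can be observed directly with a small sketch. The class name PoolReuseDemo and the method workerNames are hypothetical, and the pool size and task count are arbitrary. Many small tasks are submitted to a fixed pool, and the names of the threads that ran them are recorded.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolReuseDemo {

  // Runs `tasks` small tasks on a pool of `poolSize` threads and returns
  // the names of the worker threads that actually executed them.
  static Set<String> workerNames(int poolSize, int tasks) throws InterruptedException {
    Set<String> names = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    for (int i = 0; i < tasks; i++) {
      // The caller only describes the work; the pool decides which thread runs it
      pool.execute(() -> names.add(Thread.currentThread().getName()));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return names;
  }

  public static void main(String[] args) throws InterruptedException {
    Set<String> names = workerNames(2, 20);
    System.out.println(names.size() + " worker thread(s) ran 20 tasks: " + names);
  }
}
```

However many tasks are submitted, the set never holds more names than the pool size, because the same worker threads pick up task after task.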

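Main classes: ExecutorService, ThreadPoolExecutor, Future

  • Executors.newCachedThreadPool / newFixedThreadPool: create a thread pool
  • ExecutorService: the thread pool service
  • Callable: the concrete task logic (a task object, not a thread)
Notes
  • Callable and Runnable play the same role: each describes a task to execute
  • Runnable's run method has no return value, whereas Callable's call method can return one
  • Future: holds the returned result
  • Example 1, demonstrating these methods: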
    
    // executor/example1/Server.java
    package executor.example1;

    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;

    /**
     * The server that executes submitted tasks.
     */
    public class Server {

      // The thread pool
      private ThreadPoolExecutor executor;

      public Server(){
        // Create a cached thread pool, which grows on demand
        executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();
        // Alternatively, create a pool with a fixed number of threads
        //executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);
      }

      // Submit a task to the thread pool
      public void submitTask(Task task){
        System.out.printf("Server: A new task has arrived\n");
        executor.execute(task); // execute returns no value

        System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize());
        System.out.printf("Server: Active Count: %d\n",executor.getActiveCount());
        System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount());
      }

      // Shut down the thread pool; tasks already submitted still run to completion
      public void endServer() {
        executor.shutdown();
      }
    }

    // executor/example1/Task.java
    package executor.example1;

    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    /**
     * The task class.
     * @author Tom
     */
    public class Task implements Runnable {

      private String name;

      public Task(String name){
        this.name=name;
      }

      // Sleeps for a random time, printing once before and once after
      @Override
      public void run() {
        try {
          // The duration is in milliseconds. The message below says "seconds";
          // it is left unchanged so that it matches the sample output.
          long duration=(long)(Math.random()*1000);
          System.out.printf("%s: Task %s: Doing a task during %d seconds\n",Thread.currentThread().getName(),name,duration);
          Thread.sleep(duration);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }

        System.out.printf("%s: Task %s: Finished on: %s\n",Thread.currentThread().getName(),name,new Date());
      }

    }

    // executor/example1/Main.java
    package executor.example1;

    public class Main {

      public static void main(String[] args) throws InterruptedException {
        // Create the execution server
        Server server=new Server();

        // Create 20 tasks and hand them to the executor, 10 ms apart
        for (int i=0; i<20; i++){
          Task task=new Task("Task "+i);
          Thread.sleep(10);
          // Submit the task to the thread pool for execution
          server.submitTask(task);
        }
        // Shut down the thread pool
        server.endServer();
      }
    }

    • Output
      
      Server: A new task has arrived
      Server: Pool Size: 1
      pool-1-thread-1: Task Task 0: Doing a task during 148 seconds
      Server: Active Count: 1
      Server: Completed Tasks: 0
      Server: A new task has arrived
      Server: Pool Size: 2
      Server: Active Count: 2
      pool-1-thread-2: Task Task 1: Doing a task during 988 seconds
      Server: Completed Tasks: 0
      Server: A new task has arrived
      Server: Pool Size: 3
      Server: Active Count: 3
      pool-1-thread-3: Task Task 2: Doing a task during 769 seconds
      Server: Completed Tasks: 0
      Server: A new task has arrived
      Server: Pool Size: 4
      Server: Active Count: 4
      Server: Completed Tasks: 0
      pool-1-thread-4: Task Task 3: Doing a task during 698 seconds
      Server: A new task has arrived
      Server: Pool Size: 5
      Server: Active Count: 5
      pool-1-thread-5: Task Task 4: Doing a task during 727 seconds
      Server: Completed Tasks: 0
      Server: A new task has arrived
      Server: Pool Size: 6
      Server: Active Count: 6
      pool-1-thread-6: Task Task 5: Doing a task during 232 seconds
      Server: Completed Tasks: 0
      Server: A new task has arrived
      Server: Pool Size: 7
      Server: Active Count: 7
      pool-1-thread-7: Task Task 6: Doing a task during 509 seconds
      Server: Completed Tasks: 0
      Server: A new task has arrived
      Server: Pool Size: 8
      pool-1-thread-8: Task Task 7: Doing a task during 21 seconds
      Server: Active Count: 8
      Server: Completed Tasks: 0
      Server: A new task has arrived
      Server: Pool Size: 9
      Server: Active Count: 9
      pool-1-thread-9: Task Task 8: Doing a task during 683 seconds
      Server: Completed Tasks: 0
      Server: A new task has arrived
      Server: Pool Size: 10
      Server: Active Count: 10
      Server: Completed Tasks: 0
      pool-1-thread-10: Task Task 9: Doing a task during 616 seconds
      pool-1-thread-8: Task Task 7: Finished on: Mon Feb 28 21:37:57 CST 2022
      Server: A new task has arrived
      Server: Pool Size: 10
      pool-1-thread-8: Task Task 10: Doing a task during 403 seconds
      Server: Active Count: 10
      Server: Completed Tasks: 1
      Server: A new task has arrived
      Server: Pool Size: 11
      Server: Active Count: 11
      pool-1-thread-11: Task Task 11: Doing a task during 115 seconds
      Server: Completed Tasks: 1
      Server: A new task has arrived
      Server: Pool Size: 12
      Server: Active Count: 12
      pool-1-thread-12: Task Task 12: Doing a task during 966 seconds
      Server: Completed Tasks: 1
      Server: A new task has arrived
      Server: Pool Size: 13
      Server: Active Count: 13
      Server: Completed Tasks: 1
      pool-1-thread-13: Task Task 13: Doing a task during 224 seconds
      Server: A new task has arrived
      pool-1-thread-1: Task Task 0: Finished on: Mon Feb 28 21:37:58 CST 2022
      Server: Pool Size: 14
      pool-1-thread-14: Task Task 14: Doing a task during 799 seconds
      Server: Active Count: 13
      Server: Completed Tasks: 2
      Server: A new task has arrived
      Server: Pool Size: 14
      pool-1-thread-1: Task Task 15: Doing a task during 509 seconds
      Server: Active Count: 14
      Server: Completed Tasks: 2
      Server: A new task has arrived
      Server: Pool Size: 15
      Server: Active Count: 15
      pool-1-thread-15: Task Task 16: Doing a task during 780 seconds
      Server: Completed Tasks: 2
      Server: A new task has arrived
      Server: Pool Size: 16
      Server: Active Count: 16
      Server: Completed Tasks: 2
      pool-1-thread-16: Task Task 17: Doing a task during 846 seconds
      Server: A new task has arrived
      Server: Pool Size: 17
      Server: Active Count: 17
      Server: Completed Tasks: 2
      pool-1-thread-17: Task Task 18: Doing a task during 458 seconds
      Server: A new task has arrived
      Server: Pool Size: 18
      Server: Active Count: 18
      pool-1-thread-18: Task Task 19: Doing a task during 759 seconds
      Server: Completed Tasks: 2
      pool-1-thread-11: Task Task 11: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-6: Task Task 5: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-13: Task Task 13: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-8: Task Task 10: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-7: Task Task 6: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-17: Task Task 18: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-1: Task Task 15: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-10: Task Task 9: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-4: Task Task 3: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-9: Task Task 8: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-5: Task Task 4: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-3: Task Task 2: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-14: Task Task 14: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-15: Task Task 16: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-18: Task Task 19: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-2: Task Task 1: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-16: Task Task 17: Finished on: Mon Feb 28 21:37:58 CST 2022
      pool-1-thread-12: Task Task 12: Finished on: Mon Feb 28 21:37:58 CST 2022
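
Example 1 calls shutdown() and returns without waiting for the pool to drain. If the caller needs to know that every task has finished, one common pattern is to follow shutdown() with awaitTermination(). The sketch below is an illustration under assumptions rather than part of the original example: the class name GracefulShutdownDemo, the method runAndShutDown, the 50 ms task duration, and the 5 second timeout are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownDemo {

  // Submits `tasks` short tasks, shuts the pool down, and waits for it to drain.
  // Returns the number of tasks that ran to completion.
  static int runAndShutDown(int tasks) throws InterruptedException {
    ExecutorService pool = Executors.newCachedThreadPool();
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> {
        try {
          Thread.sleep(50); // stand-in for real work
          completed.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt status
        }
      });
    }
    pool.shutdown(); // no new tasks are accepted; submitted tasks still run
    if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
      // Timed out: interrupt the workers and collect tasks that never started
      List<Runnable> neverStarted = pool.shutdownNow();
      System.out.println("Dropped " + neverStarted.size() + " task(s)");
    }
    return completed.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Completed: " + runAndShutDown(20));
  }
}
```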
      
  • Example 2: use multiple threads to sum the integers 1-1000, split into 10 tasks.
    
    // executor/example2/SumTask.java
    package executor.example2;

    import java.util.Random;
    import java.util.concurrent.Callable;

    public class SumTask implements Callable<Integer> {
      // The range of numbers this task sums (both ends inclusive)
      private int startNumber;
      private int endNumber;

      public SumTask(int startNumber, int endNumber){
        this.startNumber=startNumber;
        this.endNumber=endNumber;
      }

      @Override
      public Integer call() throws Exception {
        int sum = 0;
        for(int i=startNumber; i<=endNumber; i++)
        {
          sum = sum + i;
        }

        // Simulate a task that takes up to one second
        Thread.sleep(new Random().nextInt(1000));

        System.out.printf("%s: %d\n",Thread.currentThread().getName(),sum);
        return sum;
      }
    }

    // executor/example2/SumTest.java
    package executor.example2;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;


    public class SumTest {

      public static void main(String[] args) {

        // The thread pool that executes the tasks
        ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(4);

        List<Future<Integer>> resultList=new ArrayList<>();

        // Sum 1-1000 as 10 tasks of 100 numbers each; submit the tasks
        for (int i=0; i<10; i++){
          SumTask calculator=new SumTask(i*100+1, (i+1)*100);
          Future<Integer> result=executor.submit(calculator);
          resultList.add(result);
        }

        // Poll every 50 ms until all 10 tasks have finished
        do {
          System.out.printf("Main: Number of completed tasks: %d\n",executor.getCompletedTaskCount());
          for (int i=0; i<resultList.size(); i++) {
            Future<Integer> result=resultList.get(i);
            System.out.printf("Main: Task %d: %s\n",i,result.isDone());
          }
          try {
            Thread.sleep(50);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        } while (executor.getCompletedTaskCount()<resultList.size());

        // All tasks have finished; combine the partial results
        int total = 0;
        for (int i=0; i<resultList.size(); i++) {
          Future<Integer> result=resultList.get(i);
          try {
            Integer sum=result.get();
            total = total + sum;
          } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
          }
        }
        System.out.printf("Sum of 1-1000: %d\n", total);

        // Shut down the thread pool
        executor.shutdown();
      }
    }

    • Output

      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 0
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: false
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      pool-1-thread-3: 25050
      Main: Number of completed tasks: 1
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: true
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 1
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: true
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 1
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: true
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 1
      Main: Task 0: false
      Main: Task 1: false
      Main: Task 2: true
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      pool-1-thread-1: 5050
      Main: Number of completed tasks: 2
      Main: Task 0: true
      Main: Task 1: false
      Main: Task 2: true
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: false
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      pool-1-thread-1: 55050
      Main: Number of completed tasks: 3
      Main: Task 0: true
      Main: Task 1: false
      Main: Task 2: true
      Main: Task 3: false
      Main: Task 4: false
      Main: Task 5: true
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      pool-1-thread-2: 15050
      Main: Number of completed tasks: 4
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: false
      Main: Task 4: false
      pool-1-thread-4: 35050
      Main: Task 5: true
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      Main: Number of completed tasks: 5
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: false
      Main: Task 5: true
      Main: Task 6: false
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      pool-1-thread-1: 65050
      Main: Number of completed tasks: 6
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: false
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      pool-1-thread-3: 45050
      Main: Number of completed tasks: 7
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: false
      Main: Task 9: false
      pool-1-thread-4: 85050
      Main: Number of completed tasks: 8
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: false
      Main: Number of completed tasks: 8
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: false
      Main: Number of completed tasks: 8
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: false
      Main: Number of completed tasks: 8
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: false
      Main: Number of completed tasks: 8
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: false
      Main: Number of completed tasks: 8
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: false
      Main: Number of completed tasks: 8
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: false
      Main: Number of completed tasks: 8
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: false
      Main: Number of completed tasks: 8
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: false
      pool-1-thread-1: 95050
      Main: Number of completed tasks: 9
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: true
      Main: Number of completed tasks: 9
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: true
      Main: Number of completed tasks: 9
      Main: Task 0: true
      Main: Task 1: true
      Main: Task 2: true
      Main: Task 3: true
      Main: Task 4: true
      Main: Task 5: true
      Main: Task 6: true
      Main: Task 7: false
      Main: Task 8: true
      Main: Task 9: true
      pool-1-thread-2: 75050
      Sum of 1-1000: 500500
      

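4 Java Concurrency Framework: Fork-Join

Notes
  • Java 7 introduced another parallel framework based on divide and conquer: split the work, process the pieces, merge the results
  • Suited to workloads whose total size is hard to determine up front, as long as the smallest unit of work is well defined

Key classes:

  • ForkJoinPool: the pool that runs the tasks

  • RecursiveAction: a task that returns no result

  • RecursiveTask: a task that returns a result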

// SumTask.java
import java.util.concurrent.RecursiveTask;

// Sums the integers in [start, end] by recursively splitting the range
public class SumTask extends RecursiveTask<Long> {

  // Ranges with (end - start) at or below this value are summed directly
  public static final int THRESHOLD = 5;

  private final int start;
  private final int end;

  public SumTask(int start, int end) {
    this.start = start;
    this.end = end;
  }

  @Override
  protected Long compute() {
    // Small enough: sum the range directly
    if (end - start <= THRESHOLD) {
      long sum = 0L;
      for (int i = start; i <= end; i++) {
        sum += i;
      }
      return sum;
    }

    // Too large: split into two subtasks
    int middle = start + (end - start) / 2;
    SumTask subTask1 = new SumTask(start, middle);
    SumTask subTask2 = new SumTask(middle + 1, end);

    // Run both subtasks and wait for them to complete
    invokeAll(subTask1, subTask2);

    // Merge the results
    return subTask1.join() + subTask2.join();
  }
}

// SumTest.java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Sums a range by splitting it into subtasks
public class SumTest {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Create the pool that executes the tasks
    ForkJoinPool pool = new ForkJoinPool();
    //ForkJoinPool pool = new ForkJoinPool(4);

    // Create the task
    SumTask task = new SumTask(1, 10000000);

    // Submit the task
    ForkJoinTask<Long> result = pool.submit(task);

    // Poll until the task completes
    do {
      System.out.printf("Main: Thread Count: %d\n", pool.getActiveThreadCount());
      System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    } while (!task.isDone());

    // Print the result
    System.out.println(result.get().toString());
  }
}
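
RecursiveAction appears in the list of key classes but is not exercised by the example above. The following is a minimal sketch of how it might be used for work that produces no return value. The class name DoubleAction, the helper doubledRange, and the cut-off of 1,000 elements are illustrative assumptions, not part of the original notes.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: doubles every element of an array in place.
class DoubleAction extends RecursiveAction {

  // Assumed cut-off: slices of at most this many elements are processed directly
  private static final int THRESHOLD = 1_000;

  private final long[] data;
  private final int from; // inclusive
  private final int to;   // exclusive

  DoubleAction(long[] data, int from, int to) {
    this.data = data;
    this.from = from;
    this.to = to;
  }

  @Override
  protected void compute() {
    if (to - from <= THRESHOLD) {
      // Small slice: do the work in this thread
      for (int i = from; i < to; i++) {
        data[i] *= 2;
      }
      return;
    }
    // Large slice: split in half; invokeAll returns once both halves are done
    int middle = from + (to - from) / 2;
    invokeAll(new DoubleAction(data, from, middle),
              new DoubleAction(data, middle, to));
  }

  // Helper for the demo: fills an array with 0..n-1 and doubles it in parallel
  static long[] doubledRange(int n) {
    long[] data = new long[n];
    for (int i = 0; i < n; i++) {
      data[i] = i;
    }
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.invoke(new DoubleAction(data, 0, n));
    } finally {
      pool.shutdown();
    }
    return data;
  }

  public static void main(String[] args) {
    long[] result = doubledRange(10_000);
    System.out.println(result[1] + " " + result[9_999]);
  }
}
```

Because the subtasks write to disjoint index ranges of the same array, no further synchronization is needed in this sketch.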

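5 Java Concurrent Data Structures

Notes
  • The commonly used data structures are not thread-safe
    • ArrayList, HashMap, and HashSet are unsynchronized
    • When several threads read and write at once, they may throw exceptions or corrupt data
  • The legacy synchronized collections such as Vector and Hashtable perform poorly
  • Concurrent data structures: adding and removing data
    • Blocking collections: when the collection is empty or full, the caller waits
    • Non-blocking collections: when the collection is empty or full, the call does not wait and returns null or throws an exception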
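
The difference between the two families can be sketched in a few lines. This is an illustrative example under stated assumptions: the class name BlockingVsNonBlocking and its helper methods are made up for the demo, and only standard java.util.concurrent calls are used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class contrasting non-blocking and blocking queue calls.
class BlockingVsNonBlocking {

  // Non-blocking: poll() on an empty ConcurrentLinkedQueue returns null at once
  static String pollEmptyNonBlocking() {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    return queue.poll();
  }

  // Bounded queue: the untimed offer() does not wait and reports failure when full
  static boolean offerToFullQueue() {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    queue.offer("first");
    return queue.offer("second");
  }

  // Blocking: the timed poll() waits for an element before giving up;
  // returns how long the call took, in milliseconds
  static long timedPollMillis() {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    long start = System.nanoTime();
    try {
      queue.poll(100, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  public static void main(String[] args) {
    System.out.println("poll on empty non-blocking queue: " + pollEmptyNonBlocking());
    System.out.println("offer to full bounded queue: " + offerToFullQueue());
    System.out.println("timed poll waited about " + timedPollMillis() + " ms");
  }
}
```

The fully blocking calls on a BlockingQueue are put() and take(), which wait indefinitely until space or an element becomes available.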

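5.1 List

  • Vector: synchronized and thread-safe; suited to write-heavy, read-light use
  • ArrayList: not thread-safe
  • Collections.synchronizedList(List list): based on synchronized; inefficient
  • CopyOnWriteArrayList: suited to read-heavy, write-light use; copies the backing array on each write; non-blocking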
package list;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListTest {    
 
    public static void main(String[] args) throws InterruptedException{

        // Not thread-safe
        List<String> unsafeList = new ArrayList<String>();
        // Thread-safe
        List<String> safeList1 = Collections.synchronizedList(new ArrayList<String>());
        // Thread-safe
        CopyOnWriteArrayList<String> safeList2 = new CopyOnWriteArrayList<String>();

        ListThread t1 = new ListThread(unsafeList);
        ListThread t2 = new ListThread(safeList1);
        ListThread t3 = new ListThread(safeList2);

        for(int i = 0; i < 10; i++){
            Thread t = new Thread(t1, String.valueOf(i));
            t.start();
        }
        for(int i = 0; i < 10; i++) {
            Thread t = new Thread(t2, String.valueOf(i));
            t.start();
        }
        for(int i = 0; i < 10; i++) {
            Thread t = new Thread(t3, String.valueOf(i));
            t.start();
        }

        // Wait for the worker threads to finish
        Thread.sleep(2000);
 
        System.out.println("listThread1.list.size() = " + t1.list.size());
        System.out.println("listThread2.list.size() = " + t2.list.size());
        System.out.println("listThread3.list.size() = " + t3.list.size());

        // Print the values in each list
        System.out.println("unsafeList:");
        for(String s : t1.list){
            if(s == null){
              System.out.print("null  ");
            }
            else
            {
              System.out.print(s + "  ");
            }
        }
        System.out.println();
        System.out.println("safeList1:");
        for(String s : t2.list){
          if(s == null){
              System.out.print("null  ");
            }
            else
            {
              System.out.print(s + "  ");
            }
        }
        System.out.println();
        System.out.println("safeList2:");
        for(String s : t3.list){
          if(s == null){
              System.out.print("null  ");
            }
            else
            {
              System.out.print(s + "  ");
            }
        }
    }
}

class ListThread implements Runnable{
  public List<String> list;

    public ListThread(List<String> list){
        this.list = list;
    }

    @Override
    public void run() {
      int i = 0;
      while(i<10)
      {
        try {
                Thread.sleep(10);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            // Add the current thread's name to the list
            list.add(Thread.currentThread().getName());
            i++;
      }        
    }
}
/**
 * 
 * 
 * listThread1.list.size() = 95
 * listThread2.list.size() = 100
 * listThread3.list.size() = 100
 * unsafeList:
 * 5  6  3  8  4  7  9  2  0  1  1  2  3  9  6  5  7  8  4  0  5  9  null  null  null  2  4  7  8  0  6  1  3  9  7  5  4  8  0  1  2  9  3  7  4  5  8  0  7  null  null  null  5  2  6  4  8  0  1  8  5  9  3  2  7  0  4  2  3  4  9  7  1  null  null  5  0  1  9  2  3  5  8  4  7  0  7  5  9  3  8  0  2  1  4  
 * safeList1:
 * 3  2  8  1  0  4  5  7  6  9  7  2  0  4  8  3  6  9  1  5  2  0  8  7  4  3  1  9  6  5  9  3  7  8  2  0  6  4  1  5  9  2  8  7  3  6  0  4  1  5  0  7  8  2  9  4  3  1  6  5  3  7  8  2  1  5  6  4  9  0  1  4  9  7  5  8  0  2  3  6  2  0  8  6  9  7  1  3  5  4  5  2  8  0  4  3  9  6  1  7  
 * safeList2:
 * 6  0  4  3  2  9  5  8  7  1  7  8  6  9  4  5  3  2  0  1  2  8  7  4  0  9  5  6  1  3  0  9  4  2  1  3  6  7  8  5  0  6  8  9  1  7  5  2  4  3  8  0  7  4  9  3  5  6  1  2  6  8  9  3  5  2  1  7  0  4  4  0  8  5  7  1  2  6  3  9  3  5  6  8  2  9  7  1  0  4  2  0  4  9  3  8  6  5  7  1  
 *
 */
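
The copy-on-write behavior of CopyOnWriteArrayList also affects iteration: an iterator works on a snapshot of the array taken when it was created, so modifying the list during iteration does not throw. The sketch below illustrates this in a single thread; the class name SnapshotIteration and the helper iterateWhileAdding are hypothetical names chosen for the demo.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class: modifies a list while iterating over it.
class SnapshotIteration {

  // Returns the number of elements the loop visited,
  // or -1 if the iterator detected the modification and threw.
  static int iterateWhileAdding(List<String> list) {
    list.add("a");
    list.add("b");
    list.add("c");
    int visited = 0;
    try {
      for (String s : list) {
        visited++;
        if (s.equals("a")) {
          list.add("d"); // structural change during iteration
        }
      }
    } catch (ConcurrentModificationException e) {
      return -1;
    }
    return visited;
  }

  public static void main(String[] args) {
    // ArrayList's fail-fast iterator notices the change and throws
    System.out.println("ArrayList: " + iterateWhileAdding(new ArrayList<>()));
    // CopyOnWriteArrayList iterates over the three-element snapshot
    System.out.println("CopyOnWriteArrayList: " + iterateWhileAdding(new CopyOnWriteArrayList<>()));
  }
}
```

The trade-off is that every write copies the whole backing array, which is why the notes recommend this class for read-heavy, write-light use.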

5.2 Set

  • HashSet: not thread-safe
  • Collections.synchronizedSet(Set set): based on synchronized; inefficient
  • CopyOnWriteArraySet (built on CopyOnWriteArrayList): suited to read-heavy, write-light use; non-blocking
package set;

import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

public class SetTest{  
 
    public static void main(String[] args) throws InterruptedException{

        // Not thread-safe
        Set<String> unsafeSet = new HashSet<String>();
        // Thread-safe
        Set<String> safeSet1 = Collections.synchronizedSet(new HashSet<String>());
        // Thread-safe
        CopyOnWriteArraySet<String> safeSet2 = new CopyOnWriteArraySet<String>();

        SetThread t1 = new SetThread(unsafeSet);
        SetThread t2 = new SetThread(safeSet1);
        SetThread t3 = new SetThread(safeSet2);

        // Start the worker threads (10 per set), beginning with unsafeSet
        for(int i = 0; i < 10; i++){
          Thread t = new Thread(t1, String.valueOf(i));
          t.start();
        }
        for(int i = 0; i < 10; i++) {
          Thread t = new Thread(t2, String.valueOf(i));
            t.start();
        }
        for(int i = 0; i < 10; i++) {
          Thread t = new Thread(t3, String.valueOf(i));
            t.start();
        }

        // Wait for the worker threads to finish
        Thread.sleep(2000);
 
        System.out.println("setThread1.set.size() = " + t1.set.size());
        System.out.println("setThread2.set.size() = " + t2.set.size());
        System.out.println("setThread3.set.size() = " + t3.set.size());

        // Print the values in each set
        System.out.println("unsafeSet:");
        for(String element:t1.set){
            if(element == null){
              System.out.print("null  ");
            }
            else
            {
              System.out.print(element + "  ");
            }
        }
        System.out.println();
        System.out.println("safeSet1:");
        for(String element:t2.set){
          if(element == null){
              System.out.print("null  ");
            }
            else
            {
              System.out.print(element + "  ");
            }
        }
        System.out.println();
        System.out.println("safeSet2:");
        for(String element:t3.set){
          if(element == null){
              System.out.print("null  ");
            }
            else
            {
              System.out.print(element + "  ");
            }
        }
    }
}

class SetThread implements Runnable{
  public Set<String> set;

    public SetThread(Set<String> set){
        this.set = set;
    }

    @Override
    public void run() {
      int i = 0;
      while(i<10)
      {
        i++;
        try {
                Thread.sleep(10);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            // Add the current thread's name plus the loop counter to the set
            set.add(Thread.currentThread().getName() + i);
      }        
    }
}

/**
 * 
 * 
 * setThread1.set.size() = 97
 * setThread2.set.size() = 100
 * setThread3.set.size() = 100
 * unsafeSet:
 * 44  88  45  89  02  46  03  47  04  48  05  49  06  07  08  09  110  310  510  710  910  92  93  94  51  95  96  53  97  54  98  55  99  56  13  57  14  58  15  59  16  17  18  19  62  63  64  65  66  23  67  24  68  25  69  26  27  28  29  010  210  410  610  810  73  74  75  32  76  33  77  34  78  35  79  36  37  38  39  83  84  85  42  86  43  87  
 * safeSet1:
 * 88  01  89  02  03  04  05  06  07  08  09  110  510  91  910  92  93  94  95  96  97  98  11  99  12  13  14  15  16  17  18  19  21  22  23  24  25  26  27  28  29  010  410  810  31  32  33  34  35  36  37  38  39  41  42  43  44  45  46  47  48  49  310  710  51  52  53  54  55  56  57  58  59  61  62  63  64  65  66  67  68  69  210  610  71  72  73  74  75  76  77  78  79  81  82  83  84  85  86  87  
 * safeSet2:
 * 21  61  51  41  01  91  81  11  71  31  22  62  32  52  82  92  12  42  02  72  93  23  83  13  33  43  73  53  03  63  24  64  84  34  04  54  94  74  44  14  25  95  15  05  55  35  75  85  65  45  86  36  76  06  56  66  16  96  26  46  27  47  97  57  77  87  67  07  37  17  98  08  38  58  78  48  18  88  68  28  09  39  29  19  49  59  79  89  99  69  610  110  910  510  810  410  010  210  310  710 
 * 
 * 
 * 
 */

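5.3 Map

  • Hashtable: synchronized and thread-safe; suited to write-heavy, read-light use
  • HashMap: not thread-safe
  • Collections.synchronizedMap(Map map): based on synchronized; inefficient
  • ConcurrentHashMap: suited to read-heavy, write-light use; non-blocking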
package map;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class MapTest{    
 
    public static void main(String[] args) throws InterruptedException{

        // Not thread-safe
        Map<Integer,String> unsafeMap = new HashMap<Integer,String>();
        // Thread-safe
        Map<Integer,String> safeMap1 = Collections.synchronizedMap(new HashMap<Integer,String>());
        // Thread-safe
        ConcurrentHashMap<Integer,String> safeMap2 = new ConcurrentHashMap<Integer,String>();

        MapThread t1 = new MapThread(unsafeMap);
        MapThread t2 = new MapThread(safeMap1);
        MapThread t3 = new MapThread(safeMap2);

        // Start the worker threads (10 per map), beginning with unsafeMap
        for(int i = 0; i < 10; i++){
          Thread t = new Thread(t1);
          t.start();
        }       
        for(int i = 0; i < 10; i++) {
          Thread t = new Thread(t2);
            t.start();
        }
        for(int i = 0; i < 10; i++) {
          Thread t = new Thread(t3);
            t.start();
        }

        // Wait for the worker threads to finish
        Thread.sleep(2000);
 
        System.out.println("mapThread1.map.size() = " + t1.map.size());
        System.out.println("mapThread2.map.size() = " + t2.map.size());
        System.out.println("mapThread3.map.size() = " + t3.map.size());

        // Print the entries in each map as key:value pairs
        System.out.println("unsafeMap:");
        for (Map.Entry<Integer,String> entry : t1.map.entrySet()) {
            System.out.print(entry.getKey() + ":" + entry.getValue() + " ");
        }
        System.out.println();

        System.out.println("safeMap1:");
        for (Map.Entry<Integer,String> entry : t2.map.entrySet()) {
            System.out.print(entry.getKey() + ":" + entry.getValue() + " ");
        }
        System.out.println();

        System.out.println("safeMap2:");
        for (Map.Entry<Integer,String> entry : t3.map.entrySet()) {
            System.out.print(entry.getKey() + ":" + entry.getValue() + " ");
        }
        System.out.println();
        System.out.println("mapThread1.map.size() = " + t1.map.size());
        System.out.println("mapThread2.map.size() = " + t2.map.size());
        System.out.println("mapThread3.map.size() = " + t3.map.size());
    }
}

class MapThread implements Runnable
{
  public Map<Integer,String> map;

    public MapThread(Map<Integer,String> map){
        this.map = map;
    }

    @Override
    public void run() {
        int i=0;
        
        while(i<100)
        {
          // Put the current thread's name into the map
            map.put(i++,Thread.currentThread().getName());
            try {
                Thread.sleep(10);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }        
    }
}
/**
 * 
 * 
 * 
 * 
 * 
 * mapThread1.map.size() = 123
 * mapThread2.map.size() = 100
 * mapThread3.map.size() = 100
 * unsafeMap:
 * 0:Thread-9 1:Thread-0 2:Thread-5 3:Thread-1 4:Thread-7 5:Thread-7 6:Thread-4 7:Thread-9 8:Thread-8 9:Thread-2 10:Thread-6 11:Thread-5 12:Thread-7 13:Thread-7 14:Thread-0 15:Thread-0 16:Thread-6 17:Thread-7 18:Thread-6 19:Thread-7 20:Thread-7 21:Thread-0 22:Thread-5 23:Thread-0 24:Thread-3 25:Thread-0 26:Thread-5 27:Thread-7 28:Thread-0 29:Thread-2 30:Thread-2 31:Thread-1 32:Thread-3 33:Thread-9 34:Thread-7 35:Thread-0 36:Thread-0 37:Thread-0 38:Thread-7 39:Thread-6 40:Thread-7 41:Thread-7 42:Thread-0 43:Thread-0 44:Thread-6 45:Thread-7 46:Thread-8 47:Thread-6 48:Thread-9 49:Thread-0 50:Thread-1 51:Thread-2 52:Thread-6 53:Thread-7 54:Thread-7 55:Thread-0 56:Thread-2 57:Thread-2 58:Thread-5 59:Thread-9 60:Thread-7 61:Thread-2 62:Thread-0 63:Thread-8 64:Thread-6 65:Thread-8 66:Thread-0 67:Thread-8 68:Thread-8 69:Thread-0 70:Thread-1 71:Thread-7 72:Thread-9 73:Thread-7 74:Thread-9 75:Thread-7 76:Thread-0 77:Thread-7 78:Thread-8 79:Thread-8 80:Thread-8 81:Thread-7 82:Thread-8 83:Thread-7 84:Thread-1 85:Thread-0 86:Thread-5 87:Thread-8 88:Thread-5 89:Thread-8 90:Thread-8 91:Thread-1 92:Thread-8 93:Thread-7 94:Thread-8 95:Thread-0 96:Thread-7 97:Thread-0 98:Thread-5 99:Thread-7 
 * safeMap1:
 * 0:Thread-16 1:Thread-14 2:Thread-14 3:Thread-12 4:Thread-11 5:Thread-15 6:Thread-18 7:Thread-18 8:Thread-18 9:Thread-11 10:Thread-18 11:Thread-12 12:Thread-16 13:Thread-15 14:Thread-18 15:Thread-15 16:Thread-12 17:Thread-13 18:Thread-12 19:Thread-18 20:Thread-12 21:Thread-16 22:Thread-14 23:Thread-19 24:Thread-13 25:Thread-11 26:Thread-15 27:Thread-18 28:Thread-15 29:Thread-19 30:Thread-17 31:Thread-10 32:Thread-18 33:Thread-17 34:Thread-16 35:Thread-16 36:Thread-15 37:Thread-15 38:Thread-13 39:Thread-18 40:Thread-11 41:Thread-15 42:Thread-18 43:Thread-16 44:Thread-18 45:Thread-16 46:Thread-18 47:Thread-10 48:Thread-18 49:Thread-13 50:Thread-18 51:Thread-14 52:Thread-11 53:Thread-15 54:Thread-18 55:Thread-15 56:Thread-10 57:Thread-18 58:Thread-18 59:Thread-14 60:Thread-16 61:Thread-19 62:Thread-16 63:Thread-11 64:Thread-14 65:Thread-16 66:Thread-13 67:Thread-16 68:Thread-16 69:Thread-15 70:Thread-10 71:Thread-16 72:Thread-10 73:Thread-14 74:Thread-17 75:Thread-17 76:Thread-15 77:Thread-16 78:Thread-15 79:Thread-16 80:Thread-16 81:Thread-15 82:Thread-13 83:Thread-15 84:Thread-17 85:Thread-18 86:Thread-10 87:Thread-16 88:Thread-14 89:Thread-16 90:Thread-16 91:Thread-13 92:Thread-16 93:Thread-16 94:Thread-15 95:Thread-15 96:Thread-16 97:Thread-15 98:Thread-16 99:Thread-16 
 * safeMap2:
 * 0:Thread-29 1:Thread-21 2:Thread-23 3:Thread-25 4:Thread-26 5:Thread-24 6:Thread-24 7:Thread-25 8:Thread-24 9:Thread-22 10:Thread-27 11:Thread-21 12:Thread-24 13:Thread-28 14:Thread-22 15:Thread-23 16:Thread-29 17:Thread-22 18:Thread-20 19:Thread-24 20:Thread-26 21:Thread-24 22:Thread-29 23:Thread-28 24:Thread-29 25:Thread-21 26:Thread-29 27:Thread-25 28:Thread-26 29:Thread-29 30:Thread-25 31:Thread-25 32:Thread-27 33:Thread-26 34:Thread-20 35:Thread-23 36:Thread-22 37:Thread-29 38:Thread-24 39:Thread-25 40:Thread-20 41:Thread-24 42:Thread-26 43:Thread-23 44:Thread-25 45:Thread-23 46:Thread-23 47:Thread-22 48:Thread-22 49:Thread-23 50:Thread-23 51:Thread-22 52:Thread-23 53:Thread-20 54:Thread-29 55:Thread-24 56:Thread-26 57:Thread-26 58:Thread-21 59:Thread-29 60:Thread-23 61:Thread-27 62:Thread-24 63:Thread-21 64:Thread-29 65:Thread-24 66:Thread-27 67:Thread-23 68:Thread-24 69:Thread-23 70:Thread-25 71:Thread-23 72:Thread-26 73:Thread-23 74:Thread-25 75:Thread-23 76:Thread-22 77:Thread-23 78:Thread-22 79:Thread-24 80:Thread-24 81:Thread-23 82:Thread-24 83:Thread-23 84:Thread-25 85:Thread-25 86:Thread-21 87:Thread-24 88:Thread-29 89:Thread-24 90:Thread-24 91:Thread-23 92:Thread-24 93:Thread-26 94:Thread-23 95:Thread-23 96:Thread-29 97:Thread-25 98:Thread-29 99:Thread-26 
 * mapThread1.map.size() = 123
 * mapThread2.map.size() = 100
 * mapThread3.map.size() = 100
 * 
 */
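
The test above only calls put, so it does not show compound updates. A read-modify-write sequence such as "get, add one, put" is not atomic even on a ConcurrentHashMap, but the map offers atomic alternatives such as merge. The following is a minimal sketch; the class name HitCounter, the method countConcurrently, and the key "hits" are illustrative assumptions. It also waits for the workers with join() rather than a fixed sleep.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class: several threads increment one shared counter.
class HitCounter {

  static int countConcurrently(int threads, int incrementsPerThread) {
    Map<String, Integer> counts = new ConcurrentHashMap<>();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        for (int i = 0; i < incrementsPerThread; i++) {
          // merge performs the read-modify-write atomically for this key
          counts.merge("hits", 1, Integer::sum);
        }
      });
      workers[t].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join(); // wait for completion instead of sleeping a fixed time
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting", e);
      }
    }
    return counts.getOrDefault("hits", 0);
  }

  public static void main(String[] args) {
    System.out.println(countConcurrently(4, 1_000));
  }
}
```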

5.4 Queue & Deque (queues; Queue was introduced in JDK 1.5, Deque in JDK 1.6)

  • ConcurrentLinkedQueue: non-blocking
  • ArrayBlockingQueue/LinkedBlockingQueue: blocking
package queue;

import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;

public class QueueTest {

    public static void main(String[] args) throws InterruptedException{

        // Not thread-safe
        Deque<String> unsafeQueue = new ArrayDeque<String>();
        // Thread-safe, non-blocking
        ConcurrentLinkedDeque<String> safeQueue1 = new ConcurrentLinkedDeque<String>();
        // Thread-safe, blocking, bounded to 100 elements
        ArrayBlockingQueue<String> safeQueue2 = new ArrayBlockingQueue<String>(100);

        QueueThread t1 = new QueueThread(unsafeQueue);
        QueueThread t2 = new QueueThread(safeQueue1);
        QueueThread t3 = new QueueThread(safeQueue2);

        for(int i = 0; i < 10; i++){
            Thread thread1 = new Thread(t1, String.valueOf(i));
            thread1.start();
        }
        for(int i = 0; i < 10; i++) {
            Thread thread2 = new Thread(t2, String.valueOf(i));
            thread2.start();
        }
        for(int i = 0; i < 10; i++) {
            Thread thread3 = new Thread(t3, String.valueOf(i));
            thread3.start();
        }

        // Wait for the worker threads to finish
        Thread.sleep(2000);
 
        System.out.println("queueThread1.queue.size() = " + t1.queue.size());
        System.out.println("queueThread2.queue.size() = " + t2.queue.size());
        System.out.println("queueThread3.queue.size() = " + t3.queue.size());

        // Print the values in each queue
        System.out.println("unsafeQueue:");
        for(String s:t1.queue)
        {
          System.out.print(s + " ");
        }
        System.out.println();
        System.out.println("safeQueue1:");
        for(String s:t2.queue)
        {
          System.out.print(s + " ");
        }
        System.out.println();
        System.out.println("safeQueue2:");
        for(String s:t3.queue)
        {
          System.out.print(s + " ");
        }
    }
}

class QueueThread implements Runnable{
  public Queue<String> queue;

    public QueueThread(Queue<String> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
      int i = 0;
      while(i<10)
      {
        i++;
        try {
                Thread.sleep(10);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            // Add the current thread's name to the queue
            queue.add(Thread.currentThread().getName());
      }        
    }
}
/**
 * 
 * 
 * queueThread1.queue.size() = 83
 * queueThread2.queue.size() = 100
 * queueThread3.queue.size() = 100
 * unsafeQueue:
 * 1 8 4 3 7 6 0 0 5 3 2 8 4 0 7 6 6 4 3 2 9 1 5 8 0 4 2 3 5 8 9 1 6 5 7 3 1 9 4 2 0 2 9 1 5 4 3 6 8 0 7 0 5 2 3 1 4 9 8 7 6 9 2 5 2 9 1 4 3 5 6 8 7 0 6 8 0 2 7 3 4 9 1 
 * safeQueue1:
 * 4 8 1 7 2 9 0 6 3 5 3 4 0 9 5 1 7 6 2 8 8 2 9 1 7 0 4 3 6 5 2 4 1 8 7 3 0 9 6 5 4 0 7 3 8 9 1 2 5 6 8 0 7 3 2 1 9 4 6 5 4 7 0 8 6 3 9 1 2 5 3 8 7 4 0 5 2 6 1 9 7 0 2 8 9 1 4 3 6 5 3 8 5 6 0 4 1 9 7 2 
 * safeQueue2:
 * 6 3 2 5 1 9 4 8 0 7 5 8 7 1 3 6 4 0 9 2 6 1 9 7 5 8 0 2 3 4 1 6 7 5 8 3 0 4 9 2 1 2 9 7 5 4 3 0 6 8 0 5 2 8 6 3 4 9 1 7 4 9 1 6 2 0 3 8 7 5 8 2 7 6 0 3 1 9 5 4 6 8 5 0 7 1 2 9 3 4 4 3 9 5 0 2 1 6 8 7 
 * 
 * 
 * 
 */ 
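
The test above only adds elements, so the blocking behavior of ArrayBlockingQueue never comes into play. A typical use is a producer-consumer handoff, where put() waits while the queue is full and take() waits while it is empty. The sketch below is illustrative only: the class name ProducerConsumer, the capacity of 4, and the sentinel value -1 that signals the end of the stream are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: one producer hands integers to one consumer.
class ProducerConsumer {

  // Assumed sentinel telling the consumer that no more items will arrive
  private static final int POISON = -1;

  static long produceAndConsume(int items) {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);
    long[] sum = new long[1]; // written by the consumer, read after join()

    Thread producer = new Thread(() -> {
      try {
        for (int i = 1; i <= items; i++) {
          queue.put(i); // waits while the queue is full
        }
        queue.put(POISON);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          int value = queue.take(); // waits while the queue is empty
          if (value == POISON) {
            break;
          }
          sum[0] += value;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    try {
      producer.join();
      consumer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return sum[0];
  }

  public static void main(String[] args) {
    System.out.println(produceAndConsume(100));
  }
}
```

The small capacity forces the producer to wait for the consumer, which is the back-pressure a bounded blocking queue is meant to provide.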

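6 Java Concurrency Coordination

Notes
  • Thread/Executor/Fork-Join
    • Threads start, run, and finish
    • There is little coordination between threads
  • synchronized
    • Allows only one thread into the critical section at a time
    • Simple and blunt, with a noticeable performance cost

6.1 Lock

  • Lock can also provide synchronization
    • Supports more complex critical-section structures
    • tryLock attempts to take the lock without waiting, so a thread can find out whether the lock is free
    • Allows reads and writes to be separated: many readers, one writer
    • Can perform better
  • ReentrantLock: a reentrant mutual-exclusion lock
  • ReentrantReadWriteLock: a reentrant read-write lock
  • lock and unlock methods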
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockExample {

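  private static final ReentrantLock queueLock = new ReentrantLock(); // reentrant lock
  private static final ReentrantReadWriteLock orderLock = new ReentrantReadWriteLock(); // reentrant read-write lock
  
  /**
   * A milk tea shop sometimes has a queue at the counter.
   * Assume a customer who sees a queue decides not to buy for now.
   * Also assume the shop has one boss and several employees, and orders are kept the old-fashioned way, in a single order book.
   * The boss writes new orders; the employees keep checking the order book to find out what to make. While the boss is writing, employees cannot read the book.
   * Several employees can read the book at the same time, and while any of them is reading, the boss cannot write a new order.
   * @param args
   * @throws InterruptedException 
   */
  public static void main(String[] args) throws InterruptedException {
    buyMilkTea();
    //handleOrder(); // runs forever; must be stopped manually
  }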
  
  public void tryToBuyMilkTea() throws InterruptedException {
    boolean flag = true;
    while(flag)
    {
      if (queueLock.tryLock()) {
        //queueLock.lock();
        try {
          long thinkingTime = (long) (Math.random() * 500);
          Thread.sleep(thinkingTime);
          System.out.println(Thread.currentThread().getName() + ": 来一杯珍珠奶茶,不要珍珠");
          flag = false;
        } finally {
          // Release the lock even if sleep() is interrupted
          queueLock.unlock();
        }
      } else {
        //System.out.println(Thread.currentThread().getName() + ":" + queueLock.getQueueLength() + "人在排队");
        System.out.println(Thread.currentThread().getName() + ": 再等等");
      }
      if(flag)
      {
        Thread.sleep(1000);
      }
    }
    
  }
  
  public void addOrder() throws InterruptedException {
    // writeLock: exclusive, held by at most one thread
    orderLock.writeLock().lock();
    try {
      long writingTime = (long) (Math.random() * 1000);
      Thread.sleep(writingTime);
      System.out.println("老板新加一笔订单");
    } finally {
      orderLock.writeLock().unlock();
    }
  }
  
  public void viewOrder() throws InterruptedException {
    // readLock: shared, may be held by several threads at once
    orderLock.readLock().lock();
    try {
      long readingTime = (long) (Math.random() * 500);
      Thread.sleep(readingTime);
      System.out.println(Thread.currentThread().getName() + ": 查看订单本");
    } finally {
      orderLock.readLock().unlock();
    }
  }
  
  public static void buyMilkTea() throws InterruptedException {
    LockExample lockExample = new LockExample();
    int STUDENTS_CNT = 10;
    
    Thread[] students = new Thread[STUDENTS_CNT];
    for (int i = 0; i < STUDENTS_CNT; i++) {
      students[i] = new Thread(new Runnable() {

        @Override
        public void run() {
          try {
            long walkingTime = (long) (Math.random() * 1000);
            Thread.sleep(walkingTime);
            lockExample.tryToBuyMilkTea();
          } catch(InterruptedException e) {
            System.out.println(e.getMessage());
          }
        }
        
      }
      );
      
      students[i].start();
    }
    
    for (int i = 0; i < STUDENTS_CNT; i++)
      students[i].join();

  }
  
  
  public static void handleOrder() throws InterruptedException {
    LockExample lockExample = new LockExample();
    
    
    Thread boss = new Thread(new Runnable() {

      @Override
      public void run() {
        while (true) {
          try {
            lockExample.addOrder();
            long waitingTime = (long) (Math.random() * 1000);
            Thread.sleep(waitingTime);
          } catch (InterruptedException e) {
            System.out.println(e.getMessage());
          }
        }
      }
    });
    boss.start();

    int workerCnt = 3;
    Thread[] workers = new Thread[workerCnt];
    for (int i = 0; i < workerCnt; i++)
    {
      workers[i] = new Thread(new Runnable() {

        @Override
        public void run() {
          while (true) {
            try {
                lockExample.viewOrder();
                long workingTime = (long) (Math.random() * 5000);
                Thread.sleep(workingTime);
              } catch (InterruptedException e) {
                System.out.println(e.getMessage());
              }
            }
        }
        
      });
      
      workers[i].start();
    }
    
  }
}

/**
 * 
 * 
 * Thread-2: 再等等
 * Thread-0: 来一杯珍珠奶茶,不要珍珠
 * Thread-3: 再等等
 * Thread-6: 再等等
 * Thread-9: 再等等
 * Thread-8: 再等等
 * Thread-1: 来一杯珍珠奶茶,不要珍珠
 * Thread-5: 来一杯珍珠奶茶,不要珍珠
 * Thread-7: 再等等
 * Thread-4: 来一杯珍珠奶茶,不要珍珠
 * Thread-3: 再等等
 * Thread-6: 再等等
 * Thread-2: 来一杯珍珠奶茶,不要珍珠
 * Thread-8: 再等等
 * Thread-9: 来一杯珍珠奶茶,不要珍珠
 * Thread-7: 来一杯珍珠奶茶,不要珍珠
 * Thread-6: 再等等
 * Thread-8: 再等等
 * Thread-3: 来一杯珍珠奶茶,不要珍珠
 * Thread-8: 再等等
 * Thread-6: 来一杯珍珠奶茶,不要珍珠
 * Thread-8: 来一杯珍珠奶茶,不要珍珠
 *
 */
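
Two Lock idioms from this section can be isolated in a smaller sketch: releasing the lock in a finally block, and using the timed tryLock to give up after a bounded wait. The class name SafeLocking and its methods are hypothetical names for the demo; the Lock calls themselves are standard.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a counter guarded by a ReentrantLock.
class SafeLocking {

  private final ReentrantLock lock = new ReentrantLock();
  private int counter;

  // Unconditional locking: always pair lock() with unlock() in finally
  void increment() {
    lock.lock();
    try {
      counter++;
    } finally {
      lock.unlock();
    }
  }

  // Timed attempt: returns false if the lock was not obtained in time
  boolean tryIncrement(long timeoutMillis) {
    boolean acquired;
    try {
      acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    if (!acquired) {
      return false;
    }
    try {
      counter++;
      return true;
    } finally {
      lock.unlock();
    }
  }

  int get() {
    lock.lock();
    try {
      return counter;
    } finally {
      lock.unlock();
    }
  }

  // The calling thread holds the lock while another thread makes a timed attempt
  static boolean contendedAttempt() {
    SafeLocking shared = new SafeLocking();
    boolean[] result = new boolean[1];
    shared.lock.lock();
    try {
      Thread other = new Thread(() -> result[0] = shared.tryIncrement(50));
      other.start();
      other.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      shared.lock.unlock();
    }
    return result[0];
  }

  public static void main(String[] args) {
    System.out.println("attempt while lock is held: " + contendedAttempt());
  }
}
```

Unlike synchronized, a Lock is not released automatically when the block exits, so the finally clause is what keeps an exception from leaving the lock held forever.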

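6.2 Semaphore

  • Semaphore: proposed by Dijkstra in 1965
  • A semaphore is essentially a counter
  • While the counter is greater than 0 the resource can be used; at 0 it cannot
  • It can allow a set level of concurrency, for example limiting access to 10 at a time
  • Semaphore
    • acquire obtains a permit
    • release returns a permit
  • Goes a step beyond Lock: it can admit several threads to the critical section at the same time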
import java.util.concurrent.Semaphore;

public class SemaphoreExample {

  private final Semaphore placeSemaphore = new Semaphore(5);
  
  public boolean parking() throws InterruptedException {
    if (placeSemaphore.tryAcquire()) {
      System.out.println(Thread.currentThread().getName() + ": 停车成功");
      return true;
    } else {
      System.out.println(Thread.currentThread().getName() + ": 没有空位");
      return false;
    }

  }
  
  public void leaving() throws InterruptedException {
    placeSemaphore.release();
    System.out.println(Thread.currentThread().getName() + ": 开走");
  }
  
  /**
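   * An underground garage has 5 parking spaces and 10 cars that want to park; each car requests a permit from the semaphore when it tries to park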
   * @param args
   * @throws InterruptedException 
   */
  public static void main(String[] args) throws InterruptedException {
    int tryToParkCnt = 10;
    
    SemaphoreExample semaphoreExample = new SemaphoreExample();
    
    Thread[] parkers = new Thread[tryToParkCnt];
    
    for (int i = 0; i < tryToParkCnt; i++) {
      parkers[i] = new Thread(new Runnable() {

        @Override
        public void run() {
          try {
            long randomTime = (long) (Math.random() * 1000);
            Thread.sleep(randomTime);
            if (semaphoreExample.parking()) {
              long parkingTime = (long) (Math.random() * 1200);
              Thread.sleep(parkingTime);
              semaphoreExample.leaving();
            }
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      });
      
      parkers[i].start();
    }

    for (int i = 0; i < tryToParkCnt; i++) {
      parkers[i].join();
    } 
  }
}

/**
 * 
 * 
 * Thread-5: 停车成功
 * Thread-9: 停车成功
 * Thread-9: 开走
 * Thread-6: 停车成功
 * Thread-8: 停车成功
 * Thread-2: 停车成功
 * Thread-4: 停车成功
 * Thread-0: 没有空位
 * Thread-4: 开走
 * Thread-7: 停车成功
 * Thread-2: 开走
 * Thread-3: 停车成功
 * Thread-6: 开走
 * Thread-1: 停车成功
 * Thread-5: 开走
 * Thread-8: 开走
 * Thread-3: 开走
 * Thread-1: 开走
 * Thread-7: 开走
 * 
 * 
 */
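
The parking example uses tryAcquire, so cars that find no space simply give up. With the blocking acquire, every thread eventually gets in, but never more than the permitted number at once. The sketch below measures that bound; the class name BoundedAccess, the method maxConcurrent, and the 20 ms of simulated work are illustrative assumptions.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: limits how many threads run a section at once.
class BoundedAccess {

  // Returns the highest number of threads observed inside the section together
  static int maxConcurrent(int permits, int threads) {
    Semaphore semaphore = new Semaphore(permits);
    AtomicInteger inside = new AtomicInteger();
    AtomicInteger maxSeen = new AtomicInteger();

    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        try {
          semaphore.acquire(); // waits until a permit is free
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        try {
          int now = inside.incrementAndGet();
          maxSeen.accumulateAndGet(now, Math::max);
          Thread.sleep(20); // simulated work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          inside.decrementAndGet();
          semaphore.release(); // always return the permit
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting", e);
      }
    }
    return maxSeen.get();
  }

  public static void main(String[] args) {
    System.out.println("max threads inside at once: " + maxConcurrent(3, 10));
  }
}
```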

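6.3 Latch

  • A latch (wait lock) is a synchronization aid
  • It synchronizes one or more threads that are carrying out tasks
  • It is not meant to protect a critical section or a shared resource
  • CountDownLatch
    • countDown() decrements the count by 1
    • await() waits until the latch count reaches 0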

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

  /**
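   * Picture a 100-meter race: once the starting gun fires the runners set off, and the race ends when every runner has reached the finish line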
   * 
   * @param args
   * @throws InterruptedException
   */
  public static void main(String[] args) throws InterruptedException {
    int runnerCnt = 10;
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(runnerCnt);

    for (int i = 0; i < runnerCnt; ++i) // create and start threads
      new Thread(new Worker(startSignal, doneSignal)).start();

    System.out.println("准备工作...");
    System.out.println("准备工作就绪");
    startSignal.countDown(); // let all threads proceed
    System.out.println("比赛开始");
    doneSignal.await(); // wait for all to finish
    System.out.println("比赛结束");
  }

  static class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
    }

    public void run() {
      try {
        startSignal.await();
        doWork();
        doneSignal.countDown();
      } catch (InterruptedException ex) {
      } // return;
    }

    void doWork() {
      System.out.println(Thread.currentThread().getName() + ": 跑完全程");
    }
  }
}

/**
 * 
 * 准备工作...
 * 准备工作就绪
 * 比赛开始
 * Thread-0: 跑完全程
 * Thread-6: 跑完全程
 * Thread-9: 跑完全程
 * Thread-5: 跑完全程
 * Thread-4: 跑完全程
 * Thread-8: 跑完全程
 * Thread-3: 跑完全程
 * Thread-2: 跑完全程
 * Thread-1: 跑完全程
 * Thread-7: 跑完全程
 * 比赛结束
 * 
 */
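
await() as described above blocks until the count reaches 0, however long that takes. A minimal sketch of the timed variant, await(timeout, unit), which returns false when the deadline passes first, might look like the following. The class name LatchTimeoutSketch, the method runAndWait, and the chosen durations are hypothetical and used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchTimeoutSketch {

  /**
   * Starts `workers` threads that each count down once after `workMillis`,
   * then waits at most `timeoutMillis` for all of them.
   * Returns true if the latch reached zero before the timeout.
   */
  static boolean runAndWait(int workers, long workMillis, long timeoutMillis)
      throws InterruptedException {
    CountDownLatch done = new CountDownLatch(workers);
    for (int i = 0; i < workers; i++) {
      Thread t = new Thread(() -> {
        try {
          Thread.sleep(workMillis); // simulated work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          done.countDown(); // count down even if the work was interrupted
        }
      });
      t.setDaemon(true); // do not keep the JVM alive if we stop waiting
      t.start();
    }
    // Returns false if the count is still above zero when the timeout elapses
    return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("fast workers finished in time: " + runAndWait(3, 50, 2000));
    System.out.println("slow workers finished in time: " + runAndWait(3, 2000, 100));
  }
}
```

A timed wait is one way to keep a coordinator from hanging when a worker never reaches countDown(); whether a timeout is appropriate depends on the application.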

6.4 Barrier

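  • A barrier is a rendezvous point and, like a latch, a synchronization aid
  • Lets multiple threads synchronize at a common point
  • CyclicBarrier
    • The constructor takes the number of threads that must synchronize
    • await() waits for the other threads; once the required number has arrived, all of them are released
Note
Once the number of threads waiting in await() on the barrier reaches the configured count, all waiting threads stop waiting and are released together. The barrier also runs its predefined barrier action (in this program, the action is CalculateFinalResult); the action runs after the last thread arrives and before any waiting thread is released.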

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
  
  /**
   * Given three rows of numbers, three threads each compute the sum of one row;
   * the grand total is computed once all three are done.
   * @param args unused
   */
  public static void main(String[] args) {
    final int[][] numbers = {
      {1, 2, 3, 4, 5},
      {6, 7, 8, 9, 10},
      {11, 12, 13, 14, 15}
    };
    final int[] results = new int[3];
    
    CalculateFinalResult finalResultCalculator = new CalculateFinalResult(results);
    // Once 3 threads are waiting in await() on the barrier, finalResultCalculator runs
    CyclicBarrier barrier = new CyclicBarrier(3, finalResultCalculator);
    
    for (int i = 0; i < 3; i++) {
      CalculateEachRow rowCalculator = new CalculateEachRow(barrier, numbers, i, results);
      new Thread(rowCalculator).start();
    }   
  }
}

class CalculateEachRow implements Runnable {

  final int[][] numbers;
  final int rowNumber;
  final int[] res;
  final CyclicBarrier barrier;
  
  CalculateEachRow(CyclicBarrier barrier, int[][] numbers, int rowNumber, int[] res) {
    this.barrier = barrier;
    this.numbers = numbers;
    this.rowNumber = rowNumber;
    this.res = res;
  }
  
  @Override
  public void run() {
    int[] row = numbers[rowNumber];
    int sum = 0;
    for (int data : row) {
      sum += data;
    }
    res[rowNumber] = sum; // publish the row sum once, after the loop
    try {
      System.out.println(Thread.currentThread().getName() + ": finished row " + (rowNumber + 1) + ", sum: " + sum);
      barrier.await(); // wait here; all threads are released once 3 (the constructor argument) have arrived
    } catch (InterruptedException | BrokenBarrierException e) {
      e.printStackTrace();
    }
  }
  
}


class CalculateFinalResult implements Runnable {
  final int[] eachRowRes;
  int finalRes;
  public int getFinalResult() {
    return finalRes;
  }
  
  CalculateFinalResult(int[] eachRowRes) {
    this.eachRowRes = eachRowRes;
  }
  
  @Override
  public void run() {
    int sum = 0;
    for (int data : eachRowRes) {
      sum += data;
    }
    finalRes = sum;
    System.out.println("Final result: " + finalRes);
  }
  
}

/**
 * 
 * Thread-0: finished row 1, sum: 15
 * Thread-2: finished row 3, sum: 65
 * Thread-1: finished row 2, sum: 40
 * Final result: 120
 * 
 */
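
The example above passes through the barrier only once. The "cyclic" part of CyclicBarrier is that the same barrier resets after each release and can be reused. The following is a minimal sketch of that reuse, assuming three threads and four rounds; the class name BarrierReuseSketch and the method run are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseSketch {

  /** Runs `parties` threads through `rounds` barrier points; returns how often the action ran. */
  static int run(int parties, int rounds) throws InterruptedException {
    AtomicInteger actionRuns = new AtomicInteger();
    // The barrier action runs once per round, after the last party arrives
    CyclicBarrier barrier = new CyclicBarrier(parties, () -> actionRuns.incrementAndGet());

    Thread[] threads = new Thread[parties];
    for (int i = 0; i < parties; i++) {
      threads[i] = new Thread(() -> {
        try {
          for (int r = 0; r < rounds; r++) {
            barrier.await(); // the same barrier object is reused in every round
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
          // another party failed or was interrupted; give up on the remaining rounds
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    return actionRuns.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("barrier action ran " + run(3, 4) + " times"); // prints "barrier action ran 4 times"
  }
}
```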

6.5 Phaser

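  • A synchronization aid for running concurrent tasks that have multiple phases
  • Threads are synchronized at the end of each phase; only when every thread has reached that point does the next phase begin
  • Phaser
    • arrive(): signals arrival without waiting for the other threads
    • arriveAndAwaitAdvance(): signals arrival and waits until all other threads have arrived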

import java.util.concurrent.Phaser;

public class PhaserExample {

  /**
   * Models an exam with three questions: one question is handed out at a time,
   * and the next one is handed out only after every student has finished.
   * 
   * @param args unused
   */
  public static void main(String[] args) {

    int studentsCnt = 5;
    Phaser phaser = new Phaser(studentsCnt);

    for (int i = 0; i < studentsCnt; i++) {
      new Thread(new Student(phaser)).start();
    }
  }
}

class Student implements Runnable {

  private final Phaser phaser;

  public Student(Phaser phaser) {
    this.phaser = phaser;
  }

  @Override
  public void run() {
    try {
      doTesting(1);
      phaser.arriveAndAwaitAdvance(); // released only after all 5 threads have arrived
      doTesting(2);
      phaser.arriveAndAwaitAdvance();
      doTesting(3);
      phaser.arriveAndAwaitAdvance();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt status
      e.printStackTrace();
    }
  }

  private void doTesting(int i) throws InterruptedException {
    String name = Thread.currentThread().getName();
    System.out.println(name + " starts question " + i);
    long thinkingTime = (long) (Math.random() * 1000);
    Thread.sleep(thinkingTime);
    System.out.println(name + " finished question " + i);
  }
}

/**
 * 
 * Thread-1 starts question 1
 * Thread-3 starts question 1
 * Thread-2 starts question 1
 * Thread-4 starts question 1
 * Thread-0 starts question 1
 * Thread-4 finished question 1
 * Thread-3 finished question 1
 * Thread-0 finished question 1
 * Thread-2 finished question 1
 * Thread-1 finished question 1
 * Thread-1 starts question 2
 * Thread-4 starts question 2
 * Thread-3 starts question 2
 * Thread-2 starts question 2
 * Thread-0 starts question 2
 * Thread-2 finished question 2
 * Thread-1 finished question 2
 * Thread-4 finished question 2
 * Thread-0 finished question 2
 * Thread-3 finished question 2
 * Thread-3 starts question 3
 * Thread-0 starts question 3
 * Thread-4 starts question 3
 * Thread-1 starts question 3
 * Thread-2 starts question 3
 * Thread-0 finished question 3
 * Thread-1 finished question 3
 * Thread-4 finished question 3
 * Thread-2 finished question 3
 * Thread-3 finished question 3
 * 
 */
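
Phase numbers are counted by the Phaser itself, starting at 0. As a minimal sketch, one way to observe each phase transition is to override the protected onAdvance(phase, registeredParties) hook, which is called once per phase when the last party arrives and terminates the phaser when it returns true. The class name PhaserPhasesSketch and the method run are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

class PhaserPhasesSketch {

  /** Runs `parties` threads through `phases` phases; returns the phase numbers seen by onAdvance. */
  static List<Integer> run(int parties, int phases) throws InterruptedException {
    List<Integer> completed = new CopyOnWriteArrayList<>();
    Phaser phaser = new Phaser(parties) {
      @Override
      protected boolean onAdvance(int phase, int registeredParties) {
        completed.add(phase);       // record the phase that has just been completed
        return phase >= phases - 1; // returning true terminates the phaser after the last phase
      }
    };

    Thread[] threads = new Thread[parties];
    for (int i = 0; i < parties; i++) {
      threads[i] = new Thread(() -> {
        for (int p = 0; p < phases; p++) {
          phaser.arriveAndAwaitAdvance(); // wait for every party before the next phase
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    return completed;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("completed phases: " + run(5, 3)); // prints "completed phases: [0, 1, 2]"
  }
}
```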

6.6 Exchanger

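  • Lets concurrent threads exchange messages with each other
  • Defines a synchronization point between 2 threads; when both threads have reached it, they swap their objects
  • Exchanger
    • exchange(): the two threads hand their data to each other
    • The exchange is bidirectional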

import java.util.Scanner;
import java.util.concurrent.Exchanger;

public class ExchangerExample {
  
  /**
   * Uses an Exchanger to implement a student score lookup,
   * as a simple example of passing data between two threads.
   * @param args unused
   * @throws InterruptedException if the main thread is interrupted during an exchange
   */
  public static void main(String[] args) throws InterruptedException {
    Exchanger<String> exchanger = new Exchanger<>();
    BackgroundWorker worker = new BackgroundWorker(exchanger);
    new Thread(worker).start();
    
    Scanner scanner = new Scanner(System.in);
    while (true) {
      System.out.println("Enter the student name to query:");
      String input = scanner.nextLine().trim();
      exchanger.exchange(input); // pass the user input to the worker thread
      String value = exchanger.exchange(null); // receive the worker thread's answer
      if ("exit".equals(value)) {
        break;
      }
      System.out.println("Result: " + value);
    }
    scanner.close();
  } 
}

class BackgroundWorker implements Runnable {

  final Exchanger<String> exchanger;
  BackgroundWorker(Exchanger<String> exchanger) {
    this.exchanger = exchanger;
  }
  @Override
  public void run() {
    while (true) {
      try {
        String item = exchanger.exchange(null); // receive the query from the main thread
        switch (item) {
        case "zhangsan": 
          exchanger.exchange("90");
          break;
        case "lisi":
          exchanger.exchange("80");
          break;
        case "wangwu":
          exchanger.exchange("70");
          break;
        case "exit":
          exchanger.exchange("exit");
          return;
        default:
          exchanger.exchange("no such student");
        }         
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt status
        return; // stop instead of looping again after an interrupt
      }       
    }
  }   
}

/**
 * 
 * 
 * Enter the student name to query:
 * lisi
 * Result: 80
 * 
 * 
 */
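
The lookup example above uses two exchange() calls per query and passes null in the unused direction each time. Because an exchange is bidirectional, a single rendezvous can carry data both ways. A minimal sketch of that, with the hypothetical class name ExchangerSwapSketch and method swap:

```java
import java.util.concurrent.Exchanger;

class ExchangerSwapSketch {

  /** Two threads meet once at the exchanger; returns {what main received, what the peer received}. */
  static String[] swap(String fromMain, String fromPeer) throws InterruptedException {
    Exchanger<String> exchanger = new Exchanger<>();
    String[] received = new String[2];

    Thread peer = new Thread(() -> {
      try {
        received[1] = exchanger.exchange(fromPeer); // hands over fromPeer, gets fromMain back
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    peer.start();

    received[0] = exchanger.exchange(fromMain); // hands over fromMain, gets fromPeer back
    peer.join(); // after join, the peer's write to received[1] is visible here
    return received;
  }

  public static void main(String[] args) throws InterruptedException {
    String[] r = swap("query: lisi", "score: 80");
    System.out.println("main received: " + r[0]); // prints "main received: score: 80"
    System.out.println("peer received: " + r[1]); // prints "peer received: query: lisi"
  }
}
```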

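7 Scheduled Task Execution in Java

Note
  • Thread/Executor/Fork-Join multithreading
    • Tasks run immediately
    • Scheduling is left to the framework
  • Scheduled execution
    • Run at a fixed point in time
    • Run at a fixed period

7.1 Simple Timer Mechanism

  • Set up a scheduled task, that is, start a task at a specified time.
  • TimerTask wraps the task
  • The Timer class is the timer
Note
A single Timer object can run multiple scheduled tasks, but it runs them serially. If one task is slow, the tasks after it will no longer start on time.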
package timer;

import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

public class TimerTest {
  public static void main(String[] args) throws InterruptedException {
    MyTask task = new MyTask();
    Timer timer = new Timer();

    // toLocaleString() is deprecated; it is kept so the sample output below still matches
    System.out.println("Current time: " + new Date().toLocaleString());
    // First run 1 second from now, then repeat every 2 seconds
    timer.schedule(task, 1000, 2000);

    Thread.sleep(10000);
    task.cancel();  // cancel this task only; the timer itself keeps running

    System.out.println("================================");

    Calendar now = Calendar.getInstance();
    now.add(Calendar.SECOND, 3); // first run 3 seconds from now
    Date runDate = now.getTime();
    MyTask2 task2 = new MyTask2();
    // Fixed rate with a 3-second period; each run takes 4 seconds,
    // so in practice the runs start about every 4 seconds
    timer.scheduleAtFixedRate(task2, runDate, 3000);

    Thread.sleep(20000);
    timer.cancel();  // cancel the timer and all of its scheduled tasks
  }
}

class MyTask extends TimerTask {
  @Override
  public void run() {
    System.out.println("Ran at: " + new Date());
  }
}

class MyTask2 extends TimerTask {
  @Override
  public void run() {
    System.out.println("Ran at: " + new Date());
    try {
      Thread.sleep(4000); // simulate a task that takes longer than its period
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

/**
 * 
 * Current time: 2022-3-1 1:07:56
 * Ran at: Tue Mar 01 01:07:57 CST 2022
 * Ran at: Tue Mar 01 01:07:59 CST 2022
 * Ran at: Tue Mar 01 01:08:01 CST 2022
 * Ran at: Tue Mar 01 01:08:03 CST 2022
 * Ran at: Tue Mar 01 01:08:05 CST 2022
 * ================================
 * Ran at: Tue Mar 01 01:08:09 CST 2022
 * Ran at: Tue Mar 01 01:08:13 CST 2022
 * Ran at: Tue Mar 01 01:08:17 CST 2022
 * Ran at: Tue Mar 01 01:08:21 CST 2022
 * Ran at: Tue Mar 01 01:08:25 CST 2022
 * 
 */
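
The note at the start of this section says that one Timer runs its tasks serially, so a slow task delays the ones behind it. A minimal sketch that measures this effect follows; it assumes a slow task of 500 ms and a quick task requested for 100 ms after scheduling. The class name TimerSerialSketch and the method quickTaskStartDelay are hypothetical.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

class TimerSerialSketch {

  /** Returns how many milliseconds after scheduling the quick task actually started. */
  static long quickTaskStartDelay(long slowMillis, long quickDelayMillis)
      throws InterruptedException {
    Timer timer = new Timer(true); // daemon timer thread
    CountDownLatch quickRan = new CountDownLatch(1);
    AtomicLong quickStartedAt = new AtomicLong();
    long t0 = System.nanoTime();

    // Slow task: due immediately, occupies the single timer thread for slowMillis
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        try {
          Thread.sleep(slowMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }, 0);

    // Quick task: requested after quickDelayMillis, but it has to wait for the slow task
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        quickStartedAt.set(System.nanoTime());
        quickRan.countDown();
      }
    }, quickDelayMillis);

    quickRan.await();
    timer.cancel();
    return (quickStartedAt.get() - t0) / 1_000_000;
  }

  public static void main(String[] args) throws InterruptedException {
    long delay = quickTaskStartDelay(500, 100);
    System.out.println("quick task requested after 100 ms, started after about " + delay + " ms");
  }
}
```

On a typical run the quick task starts roughly 500 ms after scheduling rather than 100 ms, because it shares the timer thread with the slow task.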

7.2 Executor + Timer Mechanism

  • ScheduledExecutorService
    • One-shot delayed tasks
    • Periodic tasks
package schedule;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorTest {

  public static void main(String[] a) throws Exception {
    //executeAtFixTime();
    //executeFixedRate();  // runs start every 3 s
    executeFixedDelay();   // runs start every 4 s (1 s task + 3 s delay)
  }

  /**
   * One-shot task: runs once, 1 second after it is scheduled.
   * @throws Exception 
   */
  public static void executeAtFixTime() throws Exception {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    executor.schedule(
        new MyTask(),
        1,
        TimeUnit.SECONDS);

    Thread.sleep(20000);
    executor.shutdown();
  }

  /**
   * Periodic task at a fixed rate: the period is measured from the start of the previous run.
   * When the period has elapsed, the next run starts immediately if the previous run has finished;
   * if the previous run is still in progress, the next run starts as soon as it finishes.
   * @throws Exception 
   */
  public static void executeFixedRate() throws Exception {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    executor.scheduleAtFixedRate(
        new MyTask(),
        1,     // initial delay in milliseconds
        3000,  // period in milliseconds
        TimeUnit.MILLISECONDS);

    Thread.sleep(20000);
    executor.shutdown();
  }

  /**
   * Periodic task with a fixed delay: the delay is measured from the end of the previous run,
   * and the next run starts as soon as the delay has elapsed.
   * @throws Exception 
   */
  public static void executeFixedDelay() throws Exception {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    executor.scheduleWithFixedDelay(
        new MyTask(),
        1,     // initial delay in milliseconds
        3000,  // delay between the end of one run and the start of the next, in milliseconds
        TimeUnit.MILLISECONDS);

    Thread.sleep(20000);
    executor.shutdown();
  }
}

class MyTask implements Runnable {
  @Override
  public void run() {
    System.out.println("Time: " + new Date());
    try {
      Thread.sleep(1000); // simulate 1 second of work
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    //System.out.println("Time: " + new Date());
  }
}
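
The difference between the two periodic modes can be made measurable. The following is a minimal sketch that records the start time of the first few runs and returns the average gap between them; with a 100 ms task and a 300 ms period, fixed rate should give gaps of about 300 ms and fixed delay about 400 ms. The class name RateVsDelaySketch, the method averageGapMillis, and the chosen durations are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class RateVsDelaySketch {

  /** Returns the average gap in ms between the start times of the first `runs` executions. */
  static long averageGapMillis(boolean fixedRate, long taskMillis, long periodMillis, int runs)
      throws InterruptedException {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    List<Long> starts = new CopyOnWriteArrayList<>();
    CountDownLatch enough = new CountDownLatch(runs);

    Runnable task = () -> {
      starts.add(System.nanoTime()); // record when this run started
      enough.countDown();
      try {
        Thread.sleep(taskMillis);    // simulated work
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    if (fixedRate) {
      executor.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
    } else {
      executor.scheduleWithFixedDelay(task, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    enough.await();         // wait until `runs` executions have started
    executor.shutdownNow(); // stop further executions

    long totalNanos = starts.get(runs - 1) - starts.get(0);
    return totalNanos / (runs - 1) / 1_000_000;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("fixed rate, average gap (ms):  " + averageGapMillis(true, 100, 300, 3));
    System.out.println("fixed delay, average gap (ms): " + averageGapMillis(false, 100, 300, 3));
  }
}
```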

7.3 Quartz

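  • Quartz is a fairly complete task scheduling framework
  • It addresses the problem of Timer instances being scattered across a program and managed piecemeal
  • It is more powerful
    • When a Timer runs a periodic task and one run throws an exception, the timer thread terminates and the task stops running entirely
    • When Quartz runs a periodic task and one run throws an exception, the next run is not affected
    • ...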
// quartz/QuartzTest.java
package quartz;

import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

public class QuartzTest {

    public static void main(String[] args) {
        try {
            // Create the scheduler
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

            // Define a Trigger
            Trigger trigger = newTrigger().withIdentity("trigger1", "group1") // set name/group
                    .startNow() // takes effect as soon as it is added to the scheduler
                    .withSchedule(simpleSchedule() // use a SimpleTrigger
                            .withIntervalInSeconds(2) // fire every 2 seconds
                            .repeatForever()) // repeat indefinitely
                    .build();

            // Define a JobDetail
            JobDetail job = newJob(HelloJob.class) // the Job class is HelloJob
                    .withIdentity("job1", "group1") // set name/group
                    .usingJobData("name", "quartz") // set a job data property
                    .build();

            // Register the job with its trigger
            scheduler.scheduleJob(job, trigger);

            // Start the scheduler
            scheduler.start();

            // Let it run for a while, then shut down
            Thread.sleep(10000);
            scheduler.shutdown(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// quartz/HelloJob.java
package quartz;

import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

import java.util.Date;

public class HelloJob implements Job {
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDetail detail = context.getJobDetail();
        String name = detail.getJobDataMap().getString("name");
        System.out.println("hello from " + name + " at " + new Date());
    }
}
/**
 * 
 * 
 * hello from quartz at Tue Mar 01 01:34:57 CST 2022
 * hello from quartz at Tue Mar 01 01:34:59 CST 2022
 * hello from quartz at Tue Mar 01 01:35:01 CST 2022
 * hello from quartz at Tue Mar 01 01:35:03 CST 2022
 * hello from quartz at Tue Mar 01 01:35:05 CST 2022
 * 
 * 
 */
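
The Timer behavior mentioned in the comparison at the start of this section can be observed with plain java.util.Timer. In the minimal sketch below, one task throws an unchecked exception; the timer thread then terminates, and a later attempt to schedule on the same Timer is rejected with an IllegalStateException. The class name TimerFailureSketch and the method timerDiesAfterException are hypothetical, and the sketch does not use Quartz.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;

class TimerFailureSketch {

  /** Returns true if the Timer refuses new tasks after one of its tasks threw an exception. */
  static boolean timerDiesAfterException() throws InterruptedException {
    Timer timer = new Timer("failing-timer", true);
    Thread[] timerThread = new Thread[1];
    CountDownLatch started = new CountDownLatch(1);

    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        timerThread[0] = Thread.currentThread(); // remember the timer thread
        started.countDown();
        // The uncaught exception is reported on stderr and ends the timer thread
        throw new IllegalStateException("simulated failure in a task");
      }
    }, 0);

    started.await();
    timerThread[0].join(); // wait until the timer thread has actually died

    try {
      timer.schedule(new TimerTask() {
        @Override
        public void run() { }
      }, 0);
      return false; // the timer still accepted work
    } catch (IllegalStateException e) {
      return true;  // the timer behaves as if cancel() had been called
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("timer unusable after a task failed: " + timerDiesAfterException());
  }
}
```

One common way to avoid this with Timer is to catch exceptions inside run() so that they never reach the timer thread.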