Java笔记_13
并发
创建线程
- 将线程任务放在一个类的
run()
中, 这个类需要实现Runnable
接口
1 | public interface Runnable { |
Runnable
是个函数式接口, 所以可以使用lambda
表达式创建实例
1 | Runnable r = () -> { |
- 从这个
Runnable
构造一个Thread
对象Thread t = new Thread(r);
- 启动线程
t.start();
- 还可以通过建立
Thread
类的子类定义线程
1 | class MyThread extends Thread { |
- 然后构造这个子类的对象并调用
start()
, 但是现在一般不用这个方法, 如果有多个任务, 每个任务都要创建一个线程的开销的太大, 一般使用线程池 - 不要调用
Thread
类或者Runnable
对象的run()
, 直接调用run()
会在同一个线程中执行这个任务, 而不会启动新的线程 - 应该调用
Thread.start()
创建一个新的线程执行run()
线程的状态
使用
getState()
可以确定当前的状态
New
新建Runnable
可运行Blocked
阻塞Waiting
等待Time waiting
计时等待Terminated
终止
New
- 使用
new
新建一个线程, 比如new Thread(r)
, 线程还没有开始运行
Runnable
- 调用
start()
后, 线程就是Runnable
状态, 可以是正在运行, 也可以没有在运行
Blocked
& Waiting
& Time waiting
- 处于阻塞或等待状态时, 线程不活动, 不执行任何代码, 消耗最少的资源
- 当一个线程试图获取一个内部的对象锁, 不是
java.util.concurrent.Lock
, 而这个锁目前被其他线程占有, 该线程就会阻塞; 当其他线程都释放了这个锁, 并且调度器允许该线程持有锁时, 该线程转换为非阻塞状态 - 当线程等待另一个线程通知调度器出现某个条件时, 线程进入等待状态, 阻塞状态和等待状态没有太大的区别. 调用
Object.wait(), Thread.join()
, 或者等待java.util.concurrent
中的Lock, Condition
时会进入等待状态 - 有几个方法有超时参数, 调用这些方法会让线程进入计时等待状态, 这个状态将会一直保持到计时器满, 或者接收到适当通知. 带有超时参数的方法有
Thread.sleep()
和计时版的Object.wait(), Thread.join(), Lock.tryLock(), Condition.await()
- 当一个线程试图获取一个内部的对象锁, 不是
Terminated
- 由于
run()
正常退出, 线程自然终止 - 因为一个没有捕获的异常终止
run()
, 线程意外终止 stop()
会抛出一个ThreadDeath
错误对象, 终止线程, 但是现在已经废弃不用
线程的属性
中断线程
- 当线程执行了最后一条语句, 或者抛出了一个没有捕获的异常, 线程就会终止
stop()
可以强制停止, 但是已经废弃, 现在没有方法可以强制停止一个线程- 可以使用
interrupt()
请求终止一个线程, 对一个线程调用interrupt()
, 设置线程为中断状态, 每个线程都会不时检测这个boolean
标志, 判断线程是否被中断了- 使用
Thread.currentThread().isInterrupted()
判断当前线程是否处于中断状态 - 如果线程被阻塞, 就无法查看中断状态, 需要引入
InterruptedException
- 在一个被
sleep(), wait()
阻塞的线程上调用interrupt()
, 那个阻塞调用将被InterruptedException
中断 - 如果在线程循环中调用了
sleep()
, 就没有必要使用isInterrupted()
检测了, 因为如果设置了中断状态,sleep()
也只会清除中断装填并直接抛出InterruptedException
, 所以还要循环调用了sleep()
, 可以直接try-catch``InterruptedException
- 使用
interrupted()
是一个静态方法, 检查当前线程是否被中断, 调用该方法会清除该线程中断状态isInterrupted()
是一个实例方法, 检查是否有线程被中断, 不会清除线程中断状态
- 如果
catch(InterruptedException e)
没有什么需要做的, 可以Thread.currentThread().interrupt()
设置中断状态, 或者直接不try-catch
, 而是throws InterruptedException
守护线程
t.setDaemon(true)
将一个线程转换为守护线程, 唯一的作用是为其他线程提供服务- 比如计时器线程, 或者清空过时缓存项的线程, 如果只剩下守护线程,
JVM
就会退出, 因为只有守护线程就没有必要运行程序了
线程名
t.setName("Name");
可以为任何线程设置一个名字
未捕获异常的处理器
- 线程的
run()
不能抛出任何检查型异常, 如果有非检查型异常则会导致线程终止, 最终线程死亡 - 对于可以传播的异常, 也没有
catch
子句, 因为在线程死亡之前, 异常会传递到一个用于处理未捕获异常的处理器 - 这个处理器必须属于实现了
Thread.UncaughtExceptionHelper
接口的类, 接口只有一个方法void uncaughtException(Thread t, Throwable e);
- 可以调用
setUncaughtExceptionHandler()
为任何线程设置处理器, 也可以使用静态方法Thread.setDefaultUncaughtExceptionHandler()
为所有线程安装一个默认处理器 - 如果没有安装默认处理器, 则为
null
, 如果没有为单个线程安装处理器, 则处理器就是该线程的ThreadGroup
对象 - 建议不要在自己的程序中使用线程组
ThreadGroup
类实现了Thread.UncaughtExceptionhandler
接口,uncaughtException()
执行以下操作- 如果该线程组有父线程组, 调用父线程组的
uncaughtExcpeiton()
- 否则, 如果
Thread.getDefaultUncaughtExceptionHandler()
返回一个非null
的处理器, 则调用该处理器 - 否则, 如果
Throwable
是ThreadDeath
的一个实例, 则什么都不做 - 否则, 将线程的名字以及
Throwable
的栈轨迹输出到System.err
- 如果该线程组有父线程组, 调用父线程组的
线程优先级
- 每个线程都有一个优先级, 默认一个线程会继承构造他的线程的优先级
- 可以使用
setPriority()
设置优先级,MIN_PRIORITY = 1
,MAX_PRIORITY = 10
,NORM_PRIORITY = 5
- 调度器选择新的线程时优先选择优先级高的线程, 优先级高度依赖于系统
- 早期优先级可能很有用, 现在不要使用线程优先级
同步
javap -c -p xxx
可以反编译xxx.class
, 查看虚拟机字节码- 两种机制可以防止并发访问一个代码块,
synchronized
关键字和Java 5
引入的ReentrantLock
类
1 | myLock.lock(); |
-
如果两个线程尝试访问同一个对象, 可以保证串行化访问; 如果两个线程访问不同的对象, 每个线程都会得到不同的锁对象, 两个线程都不会阻塞
- 这个锁称为重入锁, 因为线程可以反复获得已经拥有的锁, 锁持有一个计数器跟踪对
lock
方法的嵌套调用 ReentrantLock(boolean fail)
可以构造一个采用公平策略的锁, 但是公平锁比常规锁慢的多, 而且就算使用公平锁, 也不能保证就可以公平处理
- 这个锁称为重入锁, 因为线程可以反复获得已经拥有的锁, 锁持有一个计数器跟踪对
-
线程进入临界区以后发现需要满足某个条件才能继续执行, 可以使用条件对象(条件变量)管理那些已经获得锁但是不能有效工作的线程
1 | public void transfer(int from, int to, int amount) { |
signalAll()
会重新激活所有满足条件的线程, 从等待集中移出, 再次变为可运行状态await()
一般放在一个循环中while(!(OK is proceed)) condition.await()
- 最终都需要有一个其他线程调用
signalAll()
, 因为当一个线程调用await()
时, 没有办法自行激活- 如果没有别的线程调用
signalAll()
, 将永远阻塞, 导致死锁 - 只要一个对象状态有变化, 并且可能有利于正在等待的线程, 就可以调用
signalAll()
signal()
可以随机选择等待集的一个线程, 解除其阻塞状态, 比解除所有线程阻塞状态更加高效- 但是如果随机解除阻塞状态的线程发现自己仍然无法运行, 就会再次阻塞, 此时没有其他线程调用
signal()
以后就会导致死锁
- 如果没有别的线程调用
synchronized
-
锁用来保护代码段, 一次只允许一个线程执行被保护的代码段
-
锁可以用来管理试图进入被保护的代码段的线程
-
一个锁可以有一个或者多个关联的条件对象
-
每个条件对象管理那些已经进入被保护代码段, 但是还不能执行的线程
-
如果一个方法声明时有
synchronized
关键字, 则对象的锁将会保护整个方法, 所以如果需要调用这个方法, 则线程必须获得内部对象锁
1 | public synchronized void method() { |
-
内部对象锁只有一个关联条件,
wait()
将一个线程添加到等待集,notifyAll(), notify()
可以解除等待集线程阻塞 -
内部锁和条件存在一些限制
- 不能中断一个正在尝试获得锁的线程
- 不能指定尝试获得锁的线程超时时间
- 没有锁只有一个条件对象, 比较低效
同步块
1 | synchronized(obj) { |
监视器
- 监视器是指包含私有字段的类
- 监视器类的每个对象都有一个关联的锁
- 所有方法由这个锁锁定, 如果客户端调用
obj.method()
, 调用开始时就会自动获得obj
对象的锁, 并且在返回时自动释放这个锁- 因为所有的字段都是私有的, 这样就可以保证一个线程处理字段时, 其他线程都无法访问
- 锁可以有任意多个关联的条件
volatile
- 如果写一个对象, 这个对象接下来可能被另一个线程读取; 或者读一个对象, 这个对象可能已经被另一个线程写入, 就必须使用同步
volatile
关键字为实例字段的同步访问提供了一种免锁机制- 如果声明一个字段时
volatile
, 那编译器或虚拟机就会考虑这个字段可能被另一个线程并发更新
1 | private boolean done; |
final
变量
final var accounts = new HashMap<String, Double>()
声明为final
以后, 其他线程会在构造器完成构造以后才能看到这个accounts
- 如果不使用
final
, 不能保证其他线程看到的是accounts
更新以后的值, 可能都只是null
, 而不是信构造的HashMap
原子性
- 假设对共享变量除了赋值以外不做其他操作, 可以使用
volatile
java.util.concurrent.atomic
包中包含很多原子操作, 使用了机器指令, 没使用锁AtomicInteger
类中包含incrementAndGet, decrementAndGet
以原子方式对一个整数完成自增自减
- 如果有大量的线程要访问相同的原子值, 性能会大幅度下降, 因为乐观更新需要叫多次重试
LongAdder
和LongAccumulator
类解决了这个问题
stop()
和 suspend()
stop()
方法就不安全, 会终止所有未完成的方法, 包括run()
- 一个线程终止时, 会立即释放被它锁定的对象的锁, 导致对象处于不一致状态
suspend()
不会破坏对象, 但是如果用来挂起一个持有锁的线程, 在这个线程恢复运行之前这个锁不可用- 如果调用
suspend()
的线程试图获得同一个锁, 那么就会导致死锁问题
- 如果调用
按需初始化
JVM
会在第一次使用类时初始化一个静态初始化器, 并且只会执行一次JVM
利用一个锁来确保这一点, 但是需要确保构造器不会抛出任何异常
线程局部变量
-
如果一个类中设置静态变量
1
2
3
4
5
6
7public static final SimpleDateFormat dataFormat = new SimpleDateFormat("yyyy-mm-dd");
// 如果此时两个线程都执行如下操作
String dataStamp = dataFormat.format(new Date());
// 则dataFormat的内部结构可能会被破坏
public static final ThreadLocal<SimpleDateFormat> dataFormat = ThreadLocal.withInitial() -> new SimpleDateFormat("yyyy-mm-dd");
// 如果需要格式化方法, 可以调用
String dataStamp = dataFormat.get().format(new Date()); -
java.util.Random
是线程安全的, 但是如果多个线程需要等待一个共享随机数生成器, 就很低效1
2int random = ThreadLocalRandom.current().nextInt(upperBound);
// ThreadLocalRandom.current()会返回当前线程的一个随机数实例 -
如果需要共享一个数据库连接
1
2
3
4
5public static final ThreadLocal<Connection> connection = ThreadLocal.withInitial(() -> null);
// 任务开始时可以初始化这个连接
connection.set(connect(url, username, password));
// 任务调用某些方法, 所有方法都在一个线程, 其中一个方法需要这个连接
var result = connection.get().executeQuery(query); -
上述都要求只有一个任务使用线程, 如果是一个线程池执行任务, 可能不想共享相同线程的其他任务提供数据库连接, 就不能使用上述方法
线程安全的集合
- 有的并发散列表映射较大, 使用
size()
返回int
类型, 如果超过了20亿则无法正常返回, 可以使用mappingCount()
方法返回long
类型数据 - 集合返回弱一致性迭代器, 表示迭代器不一定能够反映出构造之后所做的全部更改, 但是他们不会将同一个值返回两次, 也不会抛出
ConcurrentModificationException
java.util
包中的集合, 如果集合在迭代器构造之后发生改变, 将会抛出一个``ConcurrentModificationException`ConcurrentHashMap
不允许有null
, 如果传入compute, merge
的函数返回null
, 就会从映射中删除现有的条目
并发散列映射的批操作
-
批操作会遍历映射映射, 处理遍历过程中找到的元素, 不会冻结映射的当前快照
- 搜索
search
: 为每个键或值应用一个函数, 直到函数生成一个非null
的结果, 然后函数终止, 返回这个结果 - 规约
reduce
: 组合所有的键值, 这里要使用所提供的一个累加函数 forEach
为所有键值应用一个函数
- 搜索
-
所有操作都需要指定一个参数化阈值, 如果希望批操作在一个线程中运行, 可以使用
Long.MAX_VALUE
, 如果希望用尽可能多的线程运行批操作, 可以使用阈值1 -
希望找到出现次数超过1000次的单词:
1
2String res = map.search(threshold, (k, v) -> v > 1000 ? k : null);
// res最终是第一个匹配的单词, 如果所有单词都不匹配, 则res是null -
forEach
有两种形式, 第一种是为每个条目应用一个消费者函数1
map.forEach(threshold, (k, v) -> System.out.println(k + "->" + v));
-
第二种形式接受一个额外的转换器, 先应用转换器, 再传递到消费者函数
-
比如只打印很大的条目:
1
2
3map.forEach(threshold,
(k, v) -> v > 1000 ? k + "->" + v : null,
System.out::prinln); -
reduce
操作用一个累加函数组合输入, 比如计算所有值的总和Long sum = map.reduceValues(threshold, Long::sum);
-
同样可以使用一个转换器函数, 比如计算最长的键的长度
Integer mLen = map.reduceKeys(threshold, String::length, Integer::max);
-
CopyOnWriteArrayList
和CopyOnWriteArraySet
是线程安全的集合, 所有更改器会建立底层数组的副本- 如果迭代访问集合的线程数超过更改集合的线程数, 这个更改就很有用
- 构造一个迭代器, 包含对当前数组的引用, 如果这个数组后来被更改了, 迭代器仍然会引用原来的数组, 尽管集合的数组已经被替换了, 所以迭代器可以访问一致, 但是过时的视图, 并不存在同步开销
并行数组算法
-
Arrays
提供了大量的并行化操作, 比如Arrays.parallelSort()
可以对一个基本类型或对象数组排序- 对对象数组排序, 可以提供一个
Comparator
数组Arrays.parallelSort(words, Comparator.comparing(String::length));
- 对对象数组排序, 可以提供一个
-
任何集合类都可以使用同步包装器变成线程安全的
1
2List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>());
Map<K, V> synchHashMap = Collections.synchronizedMap(New HashMap<K, V>());- 得到的集合方法会使用一个锁加以保护
- 如果希望迭代访问一个集合, 同时另一个线程仍然可能修改这个集合, 就要使用客户端锁定
1
2
3
4
5
6synchronized(synchHashMap) {
Iterator<K> iter = synchHashMap.keySet().iterator();
while(iter.hasNext()) {
...
}
}
任务和线程池
- 如果程序中使用了大量生命周期很短的线程, 不能将每个任务映射到一个单独的线程, 而是应该使用一个线程池
Callable
, Future
Runnable
封装了一个异步运行任务, 可以想象成一个没有参数和返回值的异步方法Callable
与Runnable
相似, 只是有返回值,Callable
是一个参数化接口, 只有一个方法call
1
2
3public interface Callable<V> {
V call() throws Exception;
}Future
可以保存异步计算的结果, 可以启动一个计算, 将Future
对象交给某个方法, 然后忘掉他- 计算得到结果的时候,
Future
对象的所有者就会得到这个结果
1 | // Future接口具有下面的方法 |
-
取消一个任务涉及两个步骤, 找到并中断底层线程,
call()
方法中必须感知到中断, 并放弃工作 -
可以使用
FutureTask
执行Callable
, 实现了Future
和Runnable
接口 -
也可以将一个
Callable
传递到执行器来执行 -
newCachedThreadPool()
构造一个线程池, 立即执行各种任务, 如果有空线程可以使用, 就使用空线程, 如果没有就创建 -
newFixedThreadPool()
构造一个大小固定的线程池, 如果提交任务数大于空线程数, 没有得到服务的任务就放到队列中 -
newSingleThreadPool()
退化的大小为1的线程池, 顺序执行所提交的任务 -
上述三个方法返回一个实现了
ExecutorService
接口的ThreadPoolExecutor
类的对象 -
如果线程生存期很短, 或者大量时间都在阻塞, 可以使用一个缓存线程池
-
为了得到最优的运行速度, 并发线程数等于处理器内核个数, 这种情况应该使用固定线程池, 这样并发线程总数会有一个上限
-
单线程执行器对性能测试有帮助, 可以临时使用一个单线程池替换固定线程池, 测试不并发的情况下性能降低的量
-
使用线程池时所做的工作:
- 调用
Executors
类的静态方法newCachedTreadPool
或newFixedThreadPool
- 调用
submit
提交Runnable
或Callable
对象 - 保留返回的
Future
对象, 以便得到结果或者取消任务 - 不想再提交任务时可以调用
shutdown
- 调用
-
invokeAny
提交一个Callable
对象集合中的所有对象, 并返回某一个已经完成任务的结果, 不知道会返回哪个, 一般都是速度最快的 -
对于搜索问题, 可以使用这个方法
-
invokeAll
提交一个Callable
对象集合中的所有对象, 方法阻塞, 直到所有任务都完成了, 并返回一个Future
对象列表, 包含所有答案1
2
3
4
5
6List<Callable<T>> tasks = ...;
List<Future<T>> res = executor.invokeAll(tasks);
for (Future<T> r: res) {
processFuture(r.get());
}
// get()方法会阻塞, 直到获得了一个结果 -
可以使用
ExecutorCompletionService
管理, 将任务提交到这个完成服务中, 服务会管理一个Future
对象的阻塞队列\1
2
3
4
5var service = new ExecutorCompletionService<T>(executor);
for (Callable<T> task: tasks) service.submit(task);
for (int i = 0; i < tasks.size(); i ++) {
processFuture(service.task().get());
} -
有的应用使用大量线程, 但是大部分是空闲的
- 比如服务器为每个连接使用一个线程;
- 或者处理器内核使用一个线程执行计算密集型任务, 比如图像或者视频处理
Java 7
引入了fork-join
框架支持后一类应用- 比如一个任务可以分解为两个子任务分别计算, 需要扩展
Recursive<T>
的类, 或者扩展RecursiveAction
的类 - 前者生成一个
T
结果, 后者不生成结果, 再覆盖compute()
生成并调用子任务, 合并结果
1 | class Counter extends Recuresive<T> { |
- 后台中,
fork-join
框架使用了工作密取的启发式方法平衡可用线程的工作负载, 每个工作线程都有任务的一个双端队列- 一个工作线程将子任务压入双端队列的队头, 只有一个线程可以访问队头, 所以不需要加锁
- 一个工作线程空闲时, 会从另一个双端队列的队尾密取一个任务, 由于大的子任务都在队尾, 这种密取很少见
fork-join
是对非阻塞任务进行负载优化的, 对于阻塞任务就失效了, 需要使用ForkJoinPool.ManagedBlocker
接口解决这个问题
异步计算
可完成Future
- 如果有一个
Future
对象, 需要调用get()
获得值, 方法会阻塞, 直到值可以使用 CompletableFuture
类实现了Future
接口, 可以注册一个回调, 一旦结果可用, 就会在某个线程中利用该结果调用这个回调- 采用这种方法, 一旦结果可用就可以对结果进行处理, 而不需要阻塞
1 | public CompletableFuture<String> readPage(Url url) { |
进程
- 有时候需要执行另一个程序, 可以使用
ProcessBuilder, Process
类Process
类在单一操作系统进程中执行一个命令ProcessBuilder
类允许配置Process
对象, 可以取代Runtime.exec
调用
创建进程
- 指定需要执行的命令, 或者传入一个
List<String>
var builder = new ProcessBuilder("gcc", "myapp.c");
- 可以使用
directory
改变工作目录
builder = builder.directory(path.toFile());
- 然后需要指定处理进程的标准输入, 输出, 错误流, 默认情况分别是一个管道
1
2
3OutputStream processIn = p.getOutputStream();
InputStream processOut = p.getInputStream();
InputStream processErr = p.getErrorStream(); - 如果希望使用管道将一个进程的输出作为另一个进程的输入, 可以使用
Java 9
提供的startpipeline()
进程句柄
- 可以用四种方法得到一个
ProcessHandler
- 给定一个
Process
对象p
,p.toHandler()
会生成他的ProcessHandler
- 给定一个
Long
类型的进程ID
,ProcessHandler.of(ID)
可以生成这个进程的句柄 Process.current()
是运行这个JVM
的进程句柄ProcessHandler.allProcesses()
可以生成对当前进程可见的所有操作系统进程的Stream<ProcessHandler>