com.java.util.concurrent包是java5时添加的,专门处理多线程提供的工具类
一、Atomic
二、Lock
三、BlockingQueue
四、BlockDeque
五、ConcurrnetMap
六、CountDownLatch
七、CyclicBarrier
八、ExecutorService
九、CopyOnWriteList
十、ThreadLocal
AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference
类提供多种方法,可以原子性地为参数取值、赋值、交换值(getAndSet)、比较并且设置值(CAS:compareAndSet)(失败就重试,直到没有冲突为止)等。
为什么需要使用atomic?
先说几个概念:
- 重排序
Java优化程序性能,在编译、处理和内存中对代码进行重排序,重排序是对代码的执行顺序做了修改。
- happens-before
规定在编译、处理、内存中对代码执行重排序的规则。
- as-if-serial语义
规定单线程中重排序和顺序执行的结果一致。
至于为什么需要在多线程使用atomic,有以下几点原因:
1.多个线程不能保证哪个线程先执行。因为不可见主内存值的问题,可能出现脏读的情况。
2.多线程赋值过程非原子性。因为变量在多线程中,修改一个主内存中的值,需要执行多个步骤(读取主内存,放入寄存器,修改值、赋值到主内存中),这么一来可能你在执行这个步骤的过程中,该变量被其他线程修改了,不能保证原子性。
3.使用volatile修饰变量。volatile是变量具有可见性(当寄存器中修改了值,会立即通知主内存和其他寄存器修改值)和有序性,查看AtomicInteger的源码,发现变量被volatile修饰。而原子性是Atomic中使用CAS(修改前判断主内存的值是否和当前的值一致)实现的,可见性就解决了上面问题1的脏读,有序性和原子性就解决了问题2中的问题。
1 | private volatile int value; |
ReentrantLock
- 可重入的互斥锁,即同一线程可以多次获得该锁,线程间是互斥的。使用CAS+CLH(双向链表)实现
ReentrantReadWriteLock
- 可重入的读写锁,是在ReentrantLock的基础上的增强,更细粒度地控制。在特殊场景中会使用到,分为readLock和writeLock,读读共享,读写和写写排他。
StampLock
- 读写并发锁,适用于读远远大于写的场景。
Synchronized加锁实现原理:
Synchronized经过编译,会在同步块的前后分别形成monitorenter和monitorexit这个两个字节码指令。在执行monitorenter指令时,首先要尝试获取对象锁。如果这个对象没被锁定,或者当前线程已经拥有了那个对象锁,把锁的计算器加1,相应的,在执行monitorexit指令时会将锁计算器就减1,当计算器为0时,锁就被释放了。如果获取对象锁失败,那当前线程就要阻塞,直到对象锁被另一个线程释放为止。
Synchronized和ReentrantLock的区别联系:
相同点:
都是加锁实现阻塞式的同步,一个线程获取了锁,其他线程就必须等待。不同点:
使用上。Synchronized直接使用关键字Synchronized,ReentrantLock需要实例化,并且显示地调用lock()加锁和在finally方法块中unlock()解锁。
等待可中断。ReentrantLock可以使用lockInterruptibly()方法中断锁或者设置超时中断。
公平锁。Synchronized是非公平锁,ReentrantLock默认也是非公平锁,可以指定为公平锁。
使用Condition条件,实现线程之间的协作。
在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。
提供了不同的插入移除检查方法,可以支持不同的返回值。
抛异常 | 特定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) |
检查 | get(o) | peek(o) | / | / |
阻塞队列值提供一个队列可以提供遵守FIFO放入和取出的操作,如果队列满了放入就会阻塞,相反队列如果为空,取出就会阻塞。
BlockingQueue是一个接口类,具体有多种实现,主要介绍5种常用的:
ArrayBlockingQueue
数组阻塞队列,故名思议是用数组实现的阻塞队列,是有界的,只能在初始化确定队列容量大小。内部只有一个reentrantLock,读和写使用同一个锁,因此效率不高。
LinkedBlockingQueue
链表阻塞队列,顾名思义是用链表实现的阻塞队列,但它可以是有界的也可以无界的,内部有两个reentrantLock,读写锁是分离的。性能要比ArrayBlockingQueue要高。但创建和销毁Node,高并发对GC有一定压力。
1 | //默认的构造器 |
PriorityBlockingQueue
优先级阻塞队列。基于最小二叉堆实现,线程安全的无界队列。构造器中可以传入初始值和比较器的规则。根据比较器规则对内部元素排序。
SynchronousQueue
同步队列。内部只能存放一个元素。如果满了就插入就阻塞,相反如果为空取出就阻塞。
DelayQueue
延迟队列无界队列。内部使用优先级阻塞队列实现,只有元素过期才能取出来。并且按过期长短排序,队头的是过期最长的元素。使用ReentrantLock实现线程安全。
提供了不同的插入移除检查方法,可以支持不同的返回值。
抛异常 | 特定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) |
移除 | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) |
检查 | getFirst(o) | peekFirst(o) | / | / |
- LinkedBlockingDeque
双端链式阻塞队列。默认是无界的,也可以指定容量。该阻塞队列同时支持FIFO和FILO两种操作方式,队头和队尾都可以执行插入取出的操作。使用一把锁+两个条件维持队列的同步,和ArrayBlockingQueue的原理一样。
支持并发操作的Map。
- ConcurrentHashMap 是ConcurrentMap的具体实现。
1.发展。JDK1.7及之前都是使用Segment分段锁来实现的,因为Segment数量会限制并发量,而且在寻址也会执行两次hash,JDK1.8后取消Segment改为数组+链表+红黑树和CAS原子操作+synchronized实现。
2.初始化参数。
- initialCapacity初始化Map的容量
- loadFactor负载因子
- concurrencyLevel是最好情况下可以达到的并发数
(如果都访问的不同的Segment上)。Segment的个数是大于等于的第一个2的n次方的数,即设置15。即Segment = concurrencyLevel = 24 = 16。默认情况下,initialCapacity等于16,loadFactor等于0.75,concurrencyLevel等于16.
3.关于锁
- 1.7
- Get没有加锁,因为Map中的key,value,nextHashEntry都是使用volatile修饰符修饰,多线程具有可见行。但是会进行两次Hash()方法寻址,第一次确定Segment位置,第二次确定table数组中位置。
- Put使用的分段锁继承来ReentrantLock实现可重入锁。
- 1.8
- Get方法同1.7相似都是没有加锁,一次hash寻址。
- Put方法。使用CAS无锁机制,仅在Hash冲突时候加了synchronized同步锁。
4.扩容
数组容量增加一倍,并迁移链表中的数据
- ConcurrentSkipListMap
- 使用跳表skipList实现,可以支持排序,对应非线程安全的TreeMap是使用红黑树实现的。ConcurrentSkipListMap适用于高并发的写操作(千万级),因为它锁住的节点少,相对于红黑树平衡造成的锁竞争,ConcurrentSkipListMap效率更高。
倒计时控制器(
自己起的名字)。因为他类似于一个倒计时启动的功能。
初始化指定倒计时的值CountDownLatch latch = new CountDownLatch(3)
并使用latch.await()
等待执行,当其他其他线程调用3次latch.countDown()
就触发主线程继续。
栅栏。允许定义N个线程到达栅栏才执行某个方法。
1 | //创建一个栅栏,这里设置2个线程都执行barrier1.await()方法后可以执行barrier1Action方法 |
8.ExecutorService
线程池服务接口,有两种具体的实现方式
- ThreadPoolExecutor
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程池大小 |
2 | maximumPoolSize | int | 最大线程池大小 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue |
线程等待队列 |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
实际上Executors类使用上述参数为他提供了多种预定义的实现。
简单介绍几种预定义实现:
1.FixedThreadPool:可以指定固定数量的核心线程,但是队列使用
LinkedBlockingQueue
是无界的,可能导致内存溢出。2.CachedThreadPool:不限制线程的个数,要设置线程生存的周期,超过这个时间没有使用将自动回收线程。但是队列使用的是
SynchronousQueue
入队时必须出队。因为这些特性,该线程池应该用于类似于Netty中的短连接,快速处理大量耗时短的任务。3.newSingleThreadExecutor:只创建一个线程,但是队列使用
LinkedBlockingQueue
无界队列。
ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor,可以设置核心和最大线程的大小,使用
DelayedWorkQueue
延迟队列。ForkJoinPool
实现了Executor接口,支持将一个大任务分为若干个子任务交给子线程处理,然后合并为一个结果集。采用了分治和递归的思想。内部维护了多个队列。
(挖坑以后用到了再详细写)
CopyOnWriteList是并发场景下的List容器,适用于读远大于写的场景。相对于Vector的线程安全List,Vector所有方法上都有Synchronized同步锁,会造成大量的锁竞争。CopyOnWriteList使用读写分离的机制,它实现了无锁并发读,写操作加锁,发生在新的副本上,写完成后将原容器指向副本。
ThreadLocal用于处理同一线程数据共享的操作类。目的减少参数传递,和不同线程之间的数据隔离。
原理:内部使用静态的ThreadLocalMap对象存放元素,同一线程使用同一个ThreadLocalMap,key是ThreadLocal对象,value是存放的值。
ThreadLocalMap使用Entry数组实现,是一个弱引用对象,当线程被销毁时候ThreadLocalMap也会被回收。
1 | static class Entry extends WeakReference<ThreadLocal<?>> { |