Java并发包常用类用法及原理

com.java.util.concurrent包是java5时添加的,专门处理多线程提供的工具类

一、Atomic

二、Lock

三、BlockingQueue

四、BlockDeque

五、ConcurrnetMap

六、CountDownLatch

七、CyclicBarrier

八、ExecutorService

九、CopyOnWriteList

十、ThreadLocal

1.atomic包

AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference
类提供多种方法,可以原子性地为参数取值、赋值、交换值(getAndSet)、比较并且设置值(CAS:compareAndSet)(失败就重试,直到没有冲突为止)等。

为什么需要使用atomic?
先说几个概念:

  • 重排序

    Java优化程序性能,在编译、处理和内存中对代码进行重排序,重排序是对代码的执行顺序做了修改。

  • happens-before

    规定在编译、处理、内存中对代码执行重排序的规则。

  • as-if-serial语义

    规定单线程中重排序和顺序执行的结果一致。

至于为什么需要在多线程使用atomic,有以下几点原因:

1.多个线程不能保证哪个线程先执行。因为不可见主内存值的问题,可能出现脏读的情况。

2.多线程赋值过程非原子性。因为变量在多线程中,修改一个主内存中的值,需要执行多个步骤(读取主内存,放入寄存器,修改值、赋值到主内存中),这么一来可能你在执行这个步骤的过程中,该变量被其他线程修改了,不能保证原子性。

3.使用volatile修饰变量。volatile是变量具有可见性(当寄存器中修改了值,会立即通知主内存和其他寄存器修改值)和有序性,查看AtomicInteger的源码,发现变量被volatile修饰。而原子性是Atomic中使用CAS(修改前判断主内存的值是否和当前的值一致)实现的,可见性就解决了上面问题1的脏读,有序性和原子性就解决了问题2中的问题。

1
2
3
4
5
private volatile int value;

public AtomicInteger(int initialValue) {
value = initialValue;
}

2.locks

  • ReentrantLock

    • 可重入的互斥锁,即同一线程可以多次获得该锁,线程间是互斥的。使用CAS+CLH(双向链表)实现
  • ReentrantReadWriteLock

    • 可重入的读写锁,是在ReentrantLock的基础上的增强,更细粒度地控制。在特殊场景中会使用到,分为readLock和writeLock,读读共享,读写和写写排他。
  • StampLock

    • 读写并发锁,适用于读远远大于写的场景。

Synchronized加锁实现原理:

Synchronized经过编译,会在同步块的前后分别形成monitorenter和monitorexit这个两个字节码指令。在执行monitorenter指令时,首先要尝试获取对象锁。如果这个对象没被锁定,或者当前线程已经拥有了那个对象锁,把锁的计算器加1,相应的,在执行monitorexit指令时会将锁计算器就减1,当计算器为0时,锁就被释放了。如果获取对象锁失败,那当前线程就要阻塞,直到对象锁被另一个线程释放为止。

Synchronized和ReentrantLock的区别联系:

  • 相同点:
    都是加锁实现阻塞式的同步,一个线程获取了锁,其他线程就必须等待。

  • 不同点:

    • 使用上。Synchronized直接使用关键字Synchronized,ReentrantLock需要实例化,并且显示地调用lock()加锁和在finally方法块中unlock()解锁。

    • 等待可中断。ReentrantLock可以使用lockInterruptibly()方法中断锁或者设置超时中断。

    • 公平锁。Synchronized是非公平锁,ReentrantLock默认也是非公平锁,可以指定为公平锁。

    • 使用Condition条件,实现线程之间的协作。

      在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。

3.BlockingQueue

提供了不同的插入移除检查方法,可以支持不同的返回值。

抛异常 特定值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove(o) poll(o) take(o) poll(timeout, timeunit)
检查 get(o) peek(o) / /

阻塞队列值提供一个队列可以提供遵守FIFO放入和取出的操作,如果队列满了放入就会阻塞,相反队列如果为空,取出就会阻塞。

BlockingQueue是一个接口类,具体有多种实现,主要介绍5种常用的:

  • ArrayBlockingQueue

    数组阻塞队列,故名思议是用数组实现的阻塞队列,是有界的,只能在初始化确定队列容量大小。内部只有一个reentrantLock,读和写使用同一个锁,因此效率不高。

  • LinkedBlockingQueue

    链表阻塞队列,顾名思义是用链表实现的阻塞队列,但它可以是有界的也可以无界的,内部有两个reentrantLock,读写锁是分离的。性能要比ArrayBlockingQueue要高。但创建和销毁Node,高并发对GC有一定压力。

1
2
3
4
5
6
7
8
9
10
//默认的构造器
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//指定容量的构造器
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
  • PriorityBlockingQueue

    优先级阻塞队列。基于最小二叉堆实现,线程安全的无界队列。构造器中可以传入初始值和比较器的规则。根据比较器规则对内部元素排序。

  • SynchronousQueue

    同步队列。内部只能存放一个元素。如果满了就插入就阻塞,相反如果为空取出就阻塞。

  • DelayQueue

    延迟队列无界队列。内部使用优先级阻塞队列实现,只有元素过期才能取出来。并且按过期长短排序,队头的是过期最长的元素。使用ReentrantLock实现线程安全。

4.BlockDeque

提供了不同的插入移除检查方法,可以支持不同的返回值。

抛异常 特定值 阻塞 超时
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
检查 getFirst(o) peekFirst(o) / /
  • LinkedBlockingDeque

    双端链式阻塞队列。默认是无界的,也可以指定容量。该阻塞队列同时支持FIFO和FILO两种操作方式,队头和队尾都可以执行插入取出的操作。使用一把锁+两个条件维持队列的同步,和ArrayBlockingQueue的原理一样。

5.ConcurrentMap

支持并发操作的Map。

  • ConcurrentHashMap 是ConcurrentMap的具体实现。

    1.发展。JDK1.7及之前都是使用Segment分段锁来实现的,因为Segment数量会限制并发量,而且在寻址也会执行两次hash,JDK1.8后取消Segment改为数组+链表+红黑树和CAS原子操作+synchronized实现。

    2.初始化参数。

    • initialCapacity初始化Map的容量
    • loadFactor负载因子
    • concurrencyLevel是最好情况下可以达到的并发数(如果都访问的不同的Segment上)。Segment的个数是大于等于的第一个2的n次方的数,即设置15。即Segment = concurrencyLevel = 24 = 16。默认情况下,initialCapacity等于16,loadFactor等于0.75,concurrencyLevel等于16.

    3.关于锁

    • 1.7
      • Get没有加锁,因为Map中的key,value,nextHashEntry都是使用volatile修饰符修饰,多线程具有可见行。但是会进行两次Hash()方法寻址,第一次确定Segment位置,第二次确定table数组中位置。
      • Put使用的分段锁继承来ReentrantLock实现可重入锁。
    • 1.8
      • Get方法同1.7相似都是没有加锁,一次hash寻址。
      • Put方法。使用CAS无锁机制,仅在Hash冲突时候加了synchronized同步锁。

4.扩容
数组容量增加一倍,并迁移链表中的数据

  • ConcurrentSkipListMap
    • 使用跳表skipList实现,可以支持排序,对应非线程安全的TreeMap是使用红黑树实现的。ConcurrentSkipListMap适用于高并发的写操作(千万级),因为它锁住的节点少,相对于红黑树平衡造成的锁竞争,ConcurrentSkipListMap效率更高。

6.CountDownLatch

倒计时控制器(自己起的名字)。因为他类似于一个倒计时启动的功能。
初始化指定倒计时的值CountDownLatch latch = new CountDownLatch(3)并使用latch.await()等待执行,当其他其他线程调用3次latch.countDown()就触发主线程继续。

7.CyclicBarrier

栅栏。允许定义N个线程到达栅栏才执行某个方法。

1
2
//创建一个栅栏,这里设置2个线程都执行barrier1.await()方法后可以执行barrier1Action方法
CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);

8.ExecutorService
线程池服务接口,有两种具体的实现方式

  • ThreadPoolExecutor
序号 名称 类型 含义
1 corePoolSize int 核心线程池大小
2 maximumPoolSize int 最大线程池大小
3 keepAliveTime long 线程最大空闲时间
4 unit TimeUnit 时间单位
5 workQueue BlockingQueue 线程等待队列
6 threadFactory ThreadFactory 线程创建工厂
7 handler RejectedExecutionHandler 拒绝策略

实际上Executors类使用上述参数为他提供了多种预定义的实现。

简单介绍几种预定义实现:

1.FixedThreadPool:可以指定固定数量的核心线程,但是队列使用LinkedBlockingQueue是无界的,可能导致内存溢出。

2.CachedThreadPool:不限制线程的个数,要设置线程生存的周期,超过这个时间没有使用将自动回收线程。但是队列使用的是SynchronousQueue入队时必须出队。因为这些特性,该线程池应该用于类似于Netty中的短连接,快速处理大量耗时短的任务。

3.newSingleThreadExecutor:只创建一个线程,但是队列使用LinkedBlockingQueue无界队列。

  • ScheduledThreadPoolExecutor

    继承了ThreadPoolExecutor,可以设置核心和最大线程的大小,使用DelayedWorkQueue延迟队列。

  • ForkJoinPool

    实现了Executor接口,支持将一个大任务分为若干个子任务交给子线程处理,然后合并为一个结果集。采用了分治和递归的思想。内部维护了多个队列。
    (挖坑以后用到了再详细写)

9.CopyOnWriteList

CopyOnWriteList是并发场景下的List容器,适用于读远大于写的场景。相对于Vector的线程安全List,Vector所有方法上都有Synchronized同步锁,会造成大量的锁竞争。CopyOnWriteList使用读写分离的机制,它实现了无锁并发读,写操作加锁,发生在新的副本上,写完成后将原容器指向副本。

10.ThreadLocal

ThreadLocal用于处理同一线程数据共享的操作类。目的减少参数传递,和不同线程之间的数据隔离。
原理:内部使用静态的ThreadLocalMap对象存放元素,同一线程使用同一个ThreadLocalMap,key是ThreadLocal对象,value是存放的值。
ThreadLocalMap使用Entry数组实现,是一个弱引用对象,当线程被销毁时候ThreadLocalMap也会被回收。

1
2
3
4
5
6
7
8
9
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}