Java并发数据结构收藏

Posted by 夏敏的博客 on June 12, 2017

CountDownLatch - 同步工具类

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。或者希望若发生了获取某个变量,必须等某异步线程做完了才能获取到, 否则阻塞.

方法

  // 初始化
  private CountDownLatch mReplySequenceLatch = new CountDownLatch(1);
  // 计数-1
  mReplySequenceLatch.countDown();
  // 等待
  mReplySequenceLatch.await();

CountDownLatch正常就用上面几个方法,一个初始化等待的线程数,然后每个线程运行结束时countDown一下, 在需要阻塞的地方await即可.

我们举个真实用例,来自Google zxing的源码.

final class DecodeThread extends Thread {

  public static final String BARCODE_BITMAP = "barcode_bitmap";

  private final CaptureActivity activity;
  private final Hashtable<DecodeHintType,Object> hints;
  private Handler handler;
  private final CountDownLatch handlerInitLatch;

  DecodeThread(CaptureActivity activity,
               String characterSet,
               ResultPointCallback resultPointCallback) {

    this.activity = activity;
    handlerInitLatch = new CountDownLatch(1);

    hints = new Hashtable<DecodeHintType,Object>();
    Vector<BarcodeFormat> formats = new Vector<BarcodeFormat>();
    formats.add(BarcodeFormat.QR_CODE);
    hints.put(DecodeHintType.POSSIBLE_FORMATS, formats);

    if (characterSet != null) {
      hints.put(DecodeHintType.CHARACTER_SET, characterSet);
    }
    hints.put(DecodeHintType.NEED_RESULT_POINT_CALLBACK, resultPointCallback);
  }

  // 若已经countdown过了,则直接return
  Handler getHandler() {
    try {
      handlerInitLatch.await();                         
    } catch (InterruptedException ie) {
      // continue?
    }
    return handler;
  }

  @Override
  public void run() {
    Looper.prepare();
    handler = new DecodeHandler(activity, hints);
    //
    handlerInitLatch.countDown();                        
    Looper.loop();
  }
}

上述使用CountDownLatch做到了当该类被实类化之后,直接去getHandler是会阻塞的.只有等到运行了,变量准备好了,获取方法才会返回,这保证了get肯定能拿到对象,拿不到的情况就在等待.有点类似阻塞单列模式的味道.

关于更多CountDownLatch


BlockingQueue-阻塞队列

来自concurrent包, 多线程编程时经常用到,尤其是任务分配,或者生产者消费者这种类型的.

BlockingQueue的核心方法:

  • 放入数据:
      offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,     则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
      offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中     加入BlockingQueue,则返回失败。
      put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断     直到BlockingQueue里面有空间再继续.

  • 获取数据:
      poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,     取不到时返回null;
      poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,     队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
      take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到     BlockingQueue有新的数据被加入;
      drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),     通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

常用:

  • ArrayBlockingQueue

  • LinkedBlockingQueue
    于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列

  • PriorityBlockingQueue
    基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。


ConcurrentLinkedQueue-非阻塞队列

BlockingQueue系列对应的是主要是同步操作,是阻塞的,而ConcurrentLinkedQueue是非阻塞的,Queue中元素按FIFO原则进行排序.采用CAS操作,来保证元素的一致性。我们可以根据是否需要阻塞选择使用哪个数据结构.

ConcurrentLinkedQueue应该注意的地方:

  1. 并不是使用ConcurrentLinkedQueue类之后意味着不需要自己进行任何同步或加锁操作,查了下资料, 如果直接使用它提供的函数,比如:queue.add(obj); 或者 queue.poll(obj);,这样我们自己不需要做任何同步。 但如果是非原子操作,比如:
    if(!queue.isEmpty()) {
        queue.poll(obj);
    }
    

    我们很难保证,在调用了isEmpty()之后,poll()之前,这个queue没有被其他线程修改。 所以对于这种情况,我们还是需要自己同步:

    synchronized(queue) {
       if(!queue.isEmpty()) {
         queue.poll(obj);
       }
      }
    

      当然,如果是可以接受的脏读同样可以不用加synchronized

  2. 判断是否还有元素时,ConcurrentLinkedQueue的API原来.size()是要遍历一遍集合的,比较慢,所以尽量要避免用size而改用isEmpty().

ConcurrentHashMap

一个经常被使用的数据结构,因为HashMap的线程不安全,以及Hashtable的低效,,相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力,但同时降低了对读一致性的要求.

ConcurrentHashMap代码中可以看出,它引入了一个“分段锁”的概念,具体可以理解为把一个大的Map拆分成N个小的HashTable,根据key.hashCode()来决定把key放到哪个HashTable中。
在ConcurrentHashMap中,就是把Map分成了N个Segment,put和get的时候,都是现根据key.hashCode()算出放到哪个Segment中.默认是16个段.

concurrentHashmap维护一个segment数组,将元素分成若干段(第一次hash)

/**
* The segments, each of which is a specialized hash table.
*/
final Segment<K,V>[] segments;
segments的每一个segment维护一个链表数组

在大并发的情况下,只会影响某一个segment的rehash而其他segment不会受到影响


Collections.synchronized类方法

    Collection c = Collections.synchronizedCollection(new ArrayList());
    List list = Collections.synchronizedList(new ArrayList());
    Set set = Collections.synchronizedSet(new HashSet());
    Map map = Collections.synchronizedMap(new HashMap());

返回一个线程安全的集合类.

不过需要注意的是, 这些类的同步也只是靠其内部的一个锁来控制的,所以若有需要连续的锁控制的地方,我们还是得自己进行同步控制.说实话这个方法我不常用,因为任何能用到它的地方好像都有方案代替.

参考文章:Collections.synchronizedList()不同锁造成的陷阱


ThreadLocal 为每个线程创建一个单独的变量副本,提供了保持对象的方法和避免参数传递的复杂性

以前的Java版本是通过, Thread的ThreadLocal.Values变量来做的,保持每个线程有独立的变量. jdk1.7开始,改为ThreadLocalMap.即通过为每个线程实现ThreadLocalMap来实现每个线程有独立变量.

  • void set(Object value)设置当前线程的线程局部变量的值。
  • public Object get()该方法返回当前线程所对应的线程局部变量。
  • public void remove()将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是JDK 5.0新增的方法。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。
  • protected Object initialValue()返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第1次调用get()或set(Object)时才执行,并且仅执行1次,ThreadLocal中的缺省实现直接返回一个null。

ThreadLocal博客


本文作者:Anderson/Jerey_Jobs

博客地址 : http://jerey.cn/
简书地址 : Anderson大码渣
github地址 : https://github.com/Jerey-Jobs

作者:Anderson大码渣,欢迎关注我的简书: Anderson大码渣