1、AbstractQueue抽象队列、BlockingQueue阻塞队列、Deque双端队列
2、Queue FIFO先进先出,
写入:队列满阻塞等待,取出:队列满阻塞等待生产
3、使用场景:多线程并发处理、线程池
4、阻塞队列(BlockingQueue)——四组API(添加、移除、判断队列首部的场景)
==1、会抛出异常
==2、有返回值,不抛出异常
==3、阻塞等待(一直阻塞等待)
==4、超时等待
=========抛出异常 add()\\remove()\\element()=========
//抛出异常
public static void atest(){
//数组类型的阻塞队列 要指定 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加操作 返回Boolean值
System.out.println(blockingQueue.add(\"a\"));
System.out.println(blockingQueue.add(\"b\"));
System.out.println(blockingQueue.add(\"c\"));
/*
java.lang.IllegalStateException: Queue full
队列已满 抛出异常
*/
// System.out.println(blockingQueue.add(\"d\"));
//检测队首元素
System.out.println(\"队首元素\"+blockingQueue.element());
//移除操作 返回移除的添加的元素
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
/*
java.util.NoSuchElementException
元素为空 不能再移除 抛出异常
*/
// System.out.println(blockingQueue.remove());
} public static void atest(){
//数组类型的阻塞队列 要指定 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加操作 返回Boolean值
System.out.println(blockingQueue.add(\"a\"));
System.out.println(blockingQueue.add(\"b\"));
System.out.println(blockingQueue.add(\"c\"));
/*
java.lang.IllegalStateException: Queue full
队列已满 抛出异常
*/
// System.out.println(blockingQueue.add(\"d\"));
//移除操作 返回移除的添加的元素
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
/*
java.util.NoSuchElementException
元素为空 不能再移除 抛出异常
*/
// System.out.println(blockingQueue.remove());
}
=========有返回值,不抛出异常 add()\\remove()\\element()=========
// 不抛出异常
public static void btest(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加 返回Boolean值 添加成功 返回true
System.out.println(blockingQueue.offer(\"a\"));
System.out.println(blockingQueue.offer(\"b\"));
System.out.println(blockingQueue.offer(\"c\"));
// 队列大小为3 继续添加元素 不能继续添加 返回false
System.out.println(blockingQueue.offer(\"d\"));
//移除 返回元素
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// 队列在元素全部移除后 继续移除操作 不能继续移除 返回null
System.out.println(blockingQueue.poll());
}
=========阻塞等待(一直阻塞等待) add()\\remove()\\element()=========
// 一直阻塞等待 public static void ctest() throws InterruptedException { ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); //添加元素 blockingQueue.put(\"a\"); blockingQueue.put(\"b\"); blockingQueue.put(\"c\"); // 队列已满 如果继续添加元素 则造成阻塞 一直阻塞等待 程序一直在等// blockingQueue.put(\"d\"); //移除元素 System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); //队列为空 如果继续移除元素 则造成阻塞 一直在等待元素来移除 程序一直在等// System.out.println(blockingQueue.take());• }
=========超时等待 add()\\remove()\\element()=========
// 等待超时退出
public static void dtest() throws InterruptedException {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//添加元素 offer(元素,时间,单位) offer方法的重载方法
System.out.println(blockingQueue.offer(\"a\"));
System.out.println(blockingQueue.offer(\"b\"));
System.out.println(blockingQueue.offer(\"c\"));
//队列已满 继续添加元素 等待超过2秒就结束程序
System.out.println(blockingQueue.offer(\"d\", 2, TimeUnit.SECONDS));
//移除元素 poll(时间,单位) poll方法的重载方法
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//队列为空 继续移除元素 等待超过2秒就结束程序
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
}
5、同步队列
==没有容量(即不长时间存储元素)
进去一个元素,必须等待取出(take())后才能再往里边继续添加(put())
==例子:
BlockingQueue<String> blockingQueue = new SynchronousQueue<String>();//同步队列
//添加元素 线程
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+\"put 1\");
blockingQueue.put(\"1\");
System.out.println(Thread.currentThread().getName()+\"put 2\");
blockingQueue.put(\"2\");
System.out.println(Thread.currentThread().getName()+\"put 3\");
blockingQueue.put(\"3\");
} catch (InterruptedException e) {
e.printStackTrace();
}
},\"线程1\").start();
//移除元素 线程
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+\"==>\"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+\"==>\"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+\"==>\"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},\"线程2\").start();
}
AQS
1、一个用来构建锁和同步器的框架,定义了锁的实现机制,并开放出扩展的地方,让子类去实现。(封装得很好,但又有子类扩展的地方)
例如:lock(加锁)时锁的内部机制:
使用锁Lock时,AQS开放state字段,然子类可以根据state字段来决定是否能够获得锁,对于获取不到锁的线程,AQS会自动进行管理,无需子类锁关心。
2、使用AQS能够简单且高效地构造出大量应用广泛的同步器:ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等,都基于AQS。也可以自定义同步器。
3、AQS底层:
==(1)由 同步队列 + 条件队列 联合实现(CLH队列锁)
====一般情况下有同步队列(双向链表)组成,条件队列(单向链表)不是必须存在的,当程序中存在condition时,才会存在此列表。
====同步队列管理获取不到锁的线程的排队和释放
====条件队列是在一定场景下,对同步队列的补充(非必须的),如,获得锁的线程从空队列中拿数据(队列是空的,拿不到数据的),此时,条件队列会管理该线程,使线程阻塞。
==(2)核心思想:AQS内部维护一个CLH队列来管理。
====线程请求共享资源时,
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效线程,并且将共享资源设置为锁定状态;
如果被请求的共享资源被占用,则需要CLH队列锁实现的机制来实现线程阻塞等待以及线程被唤醒使锁的分配:即将暂时获取不到锁的线程加入到队列中。(上边的同步队列)
==(3)CLH锁队列:
====是一个虚拟的双向队列(不存在队列实例,仅存在节点间的关联关系)
====AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个个节点(Node)实现锁的分配。
==(4)AQS中同步队列的工作流程和数据结构:(结合两图理解)
====工作过程:
(1)当前线程获取同步状态失败,同步器会将当线程及等待状态等信息构成一个Node节点,加入CLH队列中,放在队尾,同步器重新设置尾节点。
(2)加入队列后,会阻塞当前线程
(3)同步状态被释放并且同步器重新设置首节点,同步器唤醒等待队列中第一个节点,让其再次获取同步状态。
====AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。
====AQS使用CAS对该同步状态进行原子性操作实现对其值的修改
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
====状态信息通过protected类型的getState,setState,compareAndSetState进行操作
//返回同步状态的当前值
protected final int getState() {
return state;
}
//设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
4、AQS对资源的共享方式
定义了两种资源共享方式:
==(1)Exclusive(独占):又分为公平锁和非公平锁
只有一个线程能执行,如ReentrantLock
====公平锁:按照线程在队列中的排队顺序, 先到者先拿到锁
====非公平锁:当前线程要获取锁时,无视队列内的顺序,直接去强锁,谁抢到了就是谁的。
==(2)Share(共享):
多个线程可以同时执行,如Semaphore\\CyclicBarrier\\ReadWriteLock\\CountDownLatch
==(3)ReentrantReadWriteLock:组合两种资源共享方式的
====读锁运行多个线程同时执行对同一资源进行读操作
====写锁仅允许一个线程能执行对同一资源进行写操作
==(4)不同自定义的同步器有不同的共享资源方式,自定义同步器在实现时,只需要实现共享资源state的获取与释放方式即可。
5、AQS使用的设计模式
(1)基于模板设计模式的
(2)自定义同步器的方式:
==1、继承AbstractQueuedSynchronizer并重写指定的方法
重写方法是指对于共享资源state的获取和释放
==2、将AQS组合在自定义同步组件的实现中,并调用其模板方法
这些模板方法会调用上边继承重写的方法
====AQS提高的模板方法
protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
====除以上的方法外,AQS类中其它方法不能被重写,都为final
====例:
=======独占方式=======
ReentrantLock
(1)state初始化为0,表示未锁定状态;
(2)A线程lock()时,会调用tryAcquire()独占该锁并state+1;
(3)其它线程tryAcquire()时会失败,直到A线程unlock()到state=0,即释放,其它线程才能获取该锁。
(4)A线程释放前,A自己时可以重复获取此锁的(state++),即可重入
(5)A线程释放,一定要将state回归为state=0
=======共享方式=======
CountDownLatch,任务分为N个子线程进行执行
(1)state初始化为N(与线程数一致)
(2)N个子线程并行每执行countDown()一次,state CAS减一
(3)所有子线程都执行完成后即state=0,会unpark()主调用线程
(4)主调用线程从await()函数返回,继续后续的操作
6、AQS组件
(1)Semaphore(信号量):允许多个线程同时访问
====synchronized和ReentrantLock是一次只允许一个线程访问某个资源
(2)CountDownLatch(倒计时器):同步工具类,用来协调多个线程间的同步,通常用来控制线程等待,可以让某个线程等待直到倒计时结束,再开始执行。
(3)CyclicBarrier(循环栅栏):可以实现线程间的技术等待,功能更强大复杂。
== Cyclic(可循环使用)的Barrier(屏障):
====让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障(同步点)时,才会打开屏障,所有被拦截的线程此时才会继续执行。
====CyclicBarrier的默认构造方法:CyclicBarrier(int parties)
=======parties参数:表示屏障拦截的线程数量
=======每个线程调用await()方法告知CyclicBarrier已到达屏障,然后被阻塞。
来源:https://www.cnblogs.com/hexiayuliang666/p/16156939.html
本站部分图文来源于网络,如有侵权请联系删除。