Java并发之容器BlockingQueue

Java并发之容器BlockingQueue

阻塞队列,顾名思义,首先它是一个队列,常用的队列主要有以下两种:

  1. 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
  2. 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。

但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。

  • 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
  • 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);

/**
* 获取队列头,并删除。若对列为空,抛出异常 NoSuchElementException
*/
E remove();

/**
* 获取队列头,并删除。若对列为空,返回null
*/
E poll();

/**
* 获取队列头,不删除。 若对列为空,抛出异常 NoSuchElementException
*/
E element();

/**
* 获取队列头,不删除。 若对列为空,返回null
*/
E peek();
}

// BlockingQueue 中的元素不允许为null,插入null时会抛出空指针异常
public interface BlockingQueue<E> extends Queue<E> {
/* 立刻向队列中插入元素e\(不超过容量上限\),
* 成功时返回true,
* 当没有空间时抛出IllegalStateException;
* 使用容量限制队列时,通常最好使用offer。
*/
boolean add(E e);
/* 立刻向队列中插入元素e\(不超过容量上限\),
* 成功时返回true,
* 当没有空间时返回false;
* 使用容量限制队列时,通常最好使用offer。
*/
boolean offer(E e);
/* blockingqueue 提供的阻塞操作
* 向队列插入元素,如果没有空间,阻塞直到插入成功;
* 阻塞时中断会抛出InterruptedException
*/
void put(E e) throws InterruptedException;
/* 向队列插入元素,
* 如果没有空间,等待空间,超时失败;
*/
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
/* 获取并删除队列的头,阻塞到获取成功
* 阻塞时中断会抛出InterruptedException
*/
E take() throws InterruptedException;
/* 获取并删除队列的头,
* 如果队列为空,等待,直到超时退出,返回 null
*/
E poll(long timeout, TimeUnit unit) throws InterruptedException;
/* 返回理想状态下此队列可以添加的其它元素的数量,无阻塞
* 注意,多线程下无法通过remainingCapacity判断一个元素能否插入成功,
* 非原子操作
*/
int remainingCapacity();
/* 从队列中删除某个元素,如果存在该元素equals入参o*/
boolean remove(Object o);
/* 如果队列中存在某个元素,返回true,否则返回false */
public boolean contains(Object o);
/* 一次性从BlockingQueue拿走所有可用的数据对象(还可以指定获取数据的个数),
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。*/
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}

双端阻塞队列(BlockingDeque)

先来看看Deque是什么。Deque是双端队列,允许在队列头和尾部进行入队出队操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public interface Deque<E> extends Queue<E> {
// 队列头部插入元素,失败抛异常
void addFirst(E e);
// 队列尾部插入元素,失败抛异常
void addLast(E e);
// 队列头部插入元素,失败null
boolean offerFirst(E e);
// 队列尾部插入元素,失败null
boolean offerLast(E e);

// 获取头部,失败异常
E removeFirst();
// 获取尾部,失败异常
E removeLast();
// 获取头部,失败null
E pollFirst();
// 获取头部,失败null
E pollLast();
// 查看不删除,头部;失败异常
E getFirst();
// 查看不删除,尾部;失败异常
E getLast();
// 查看不删除,头部;失败null
E peekFirst();
// 查看不删除,尾部;失败null
E peekLast();

// 从头部删除第一个相同元素
boolean removeFirstOccurrence(Object o);
// 从尾部删除第一个相同元素
boolean removeLastOccurrence(Object o);

// *** Queue methods ***
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();

// *** Stack methods ***

// 队首入栈,失败异常
void push(E e);
// 队首出站,失败异常
E pop();

// *** Collection methods ***

// 等同于方法 removeFirstOccurrence(Object)
boolean remove(Object o);
// 是否包含
boolean contains(Object o);
// 当前容量
public int size();
// 头到尾 正向迭代器
Iterator<E> iterator();
// 尾到头,反向迭代器
Iterator<E> descendingIterator();

}

BlockingDeque提供在队列头和尾部进行入队出队操作的阻塞方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
/* 向队列头部插入元素,如果没有空间,阻塞直到插入成功;
* 阻塞时中断会抛出InterruptedException
*/
void putFirst(E e) throws InterruptedException;

/* 向队列尾部插入元素,如果没有空间,阻塞直到插入成功;
* 阻塞时中断会抛出InterruptedException
*/
void putLast(E e) throws InterruptedException;

/* 获取并删除队列的头,阻塞到获取成功
* 阻塞时中断会抛出InterruptedException
*/
E takeFirst() throws InterruptedException;

/* 获取并删除队列的尾,阻塞到获取成功
* 阻塞时中断会抛出InterruptedException
*/
E takeLast() throws InterruptedException;
}
-------------本文结束感谢您的阅读-------------
坚持分享,您的支持将鼓励我继续创作!
0%