netty BlockingOperationException

对netty线程模型了解不深,使用Future的sync()或await()方法不当会产生BlockingOperationException。stackoverflow上有个回答很好的解答了这个问题:stackoverflow

1
BlockingOperationException will be throw by netty if you call sync*or await* on a Future in the same thread that the EventExecutor is using and to which the Future is tied to. This is usually the EventLoop that is used by the Channel itself.

源码解析

Future代表一个异步任务的结果,netty对java.util.concurrent.Future进行了增强:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 扩展了JDK的Future接口
*/

public interface Future<V> extends java.util.concurrent.Future<V> {

//异步操作完成且正常终止
boolean isSuccess();

//异步操作是否可以取消
boolean isCancellable();

//异步操作失败的原因
Throwable cause();

//添加一个监听者,异步操作完成时回调,类比javascript的回调函数
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

//添加多个监听者
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

//移除一个监听者
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

//移除一个监听者
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

//等待任务结束,如果任务产生异常或被中断则抛出异常,否则返回Future自身
Future<V> sync() throws InterruptedException;

//等待任务结束,任务本身不可中断,如果产生异常则抛出异常,否则返回Future自身
Future<V> syncUninterruptibly();

//等待任务结束,如果任务被中断则抛出中断异常,与sync不同的是只抛出中断异常,不抛出任务产生的异常
Future<V> await() throws InterruptedException;

//阻塞直到异步操作完成
Future<V> awaitUninterruptibly();

//同await,加了时间限制
boolean await(long timeout, TimeUnit unit) throws InterruptedException;

//同await,加了时间限制
boolean await(long timeoutMillis) throws InterruptedException;

//同awaitUninterruptibly,加了时间限制
boolean awaitUninterruptibly(long timeout, TimeUnit unit);

//同awaitUninterruptibly,加了时间限制
boolean awaitUninterruptibly(long timeoutMillis);

//非阻塞地返回异步结果,如果尚未完成返回null
V getNow();

//取消任务
@Override
boolean cancel(boolean mayInterruptIfRunning);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
DefaultPromise.java

@Override
public Promise<V> await() throws InterruptedException {
//如果任务已经完成则直接返回
if (isDone()) {
return this;
}

if (Thread.interrupted()) {
throw new InterruptedException(toString());
}

//检查是否产生死锁
checkDeadLock();

synchronized (this) {
while (!isDone()) {
incWaiters();
try {
//等待被唤醒
wait();
} finally {
decWaiters();
}
}
}
return this;
}

protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}

判断执行任务线程和current thread是否时同一个线程,如果是就检测为死锁抛出异常BlockingOperationException