执行阻塞代码

理想情况下,所有的APIs都可以写成异步的形式,但是理想是丰满的,现实是骨感的。

实际上,大部分的库,尤其是java生态圈的库,多数的APIs都是同步阻塞的。一个很常见的例子就是JDBC API,这就是一个内聚同步的例子,也就是说无论如何,Vert.x在这样的库基础上都很难做到100%的异步化。

我们也不可能复写所有的东西,所以我们也提供另外一种“传统”方式来在Vert.x程序中安全的实现阻塞APIs。

正如之前所说的,不能从轮训事件中直接调用阻塞操作,因为那样会阻塞轮训事件处理其他的任务,那么我们该如何执行阻塞代码呢?

答案是通过executeBlocking同时指定要执行的阻塞代码和用来异步调用处理阻塞代码执行结果的结果处理器。

vertx.executeBlocking(future -> {
  // 调用一段很耗时间的操作并等待结果
  String result = someAPI.blockingMethod("hello");
  // 通过future包装异步等待结果
  future.complete(result);
}, res -> {
  // 处理返回的结果
  System.out.println("The result is: " + res.result());
});

通常情况下,如果executeBlocking被同一个context同时调用多次(比如在同一个verticle实例中的多次调用)那么不同的executeBlocking会序列化执行(也就是说一个接着另外一个执行)。

如果执行顺序不影响正常业务,那么通过指定executeBlocking的ordered属性为false,这样的话工作线程就会平行执行executeBlocking任务。

另外一种执行阻塞代码的方法是使用woker verticle。

一个Worker verticle通常是通过一个工作线程池(Worker pool)中的一个线程执行的。

默认情况下阻塞代码是通过Vert.x的阻塞代码执行线程池执行的,可以通过setWorkerPoolSize来配置。

根据不同的需求,也可以指定创建其他的线程池:

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
  // Call some blocking API that takes a significant amount of time to return
  String result = someAPI.blockingMethod("hello");
  future.complete(result);
}, res -> {
  System.out.println("The result is: " + res.result());
});

但是这种情况下,在工作任务完成并且线程池空闲的情况下,一定要记得关闭线程池:

executor.close();

如果多个Worker以相同的名字被创建,那么他们会共享相同的线程池。Worker线程池会在所有worker被关闭之后被销毁。

如果在一个Verticle中创建一个执行器,Vert.x会自动在Verticle被下架之后帮你关闭其中的执行器。

执行器(Worker executors)可以在创建时进行配置:

int poolSize = 10;

// 2 minutes
long maxExecuteTime = 120000;

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime);

Worker pool执行线程池的配置在创建时指定,运行时不可更改。

results matching ""

    No results matching ""