虚拟线程使我们能够编写更具表现力和直接的并发代码。得益于虚拟线程获得的大规模吞吐量,我们可以轻松采用每个任务一个线程的模型(对于 HTTP 服务器来说,这意味着每个请求一个线程;对于数据库来说,这意味着每个事务一个线程,等等)。换句话说,我们可以为每个并发任务分配一个新的虚拟线程。尝试使用平台线程采用每个任务一个线程的模型将导致吞吐量受到硬件核心数量的限制——这是由 Little 定律(Little's law)解释的,L = λW,或者吞吐量等于平均并发量除以延迟。只要可能,建议避免直接与线程交互。JDK 通过 ExecutorService/Executor API 支持这一点。更具体地说,我们习惯于将任务(Runnable/Callable)提交给 ExecutorService/Executor,并使用返回的 Future 进行操作。这种模式对于虚拟线程也是有效的。因此,我们不必自己编写所有管道代码来采用虚拟线程的每个任务一个线程的模型,因为从 JDK 19 开始,这个模型可以通过 Executors 类获得。更具体地说,是通过 newVirtualThreadPerTaskExecutor() 方法,该方法创建了一个能够创建无限数量虚拟线程的 ExecutorService,这些虚拟线程遵循每个任务一个线程的模型。这个 ExecutorService 公开了允许我们提交任务的方法,如 submit()(如您接下来将看到的)和 invokeAll/Any()(如您稍后将看到的)方法,并返回一个包含异常或结果的 Future。
从 JDK 19 开始,ExecutorService 扩展了 AutoCloseable 接口。换句话说,我们可以使用 try-with-resources 模式使用 ExecutorService。
考虑以下简单的 Runnable 和 Callable:
Runnable taskr = () -> logger.info(
Thread.currentThread().toString());
Callable<Boolean> taskc = () -> {
logger.info(Thread.currentThread().toString());
return true;
};
可以按如下方式执行 Runnable/Callable(在这里,我们提交了 15 个任务(NUMBER_OF_TASKS = 15)):
try (ExecutorService executor
= Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < NUMBER_OF_TASKS; i++) {
executor.submit(taskr); // 执行 Runnable
executor.submit(taskc); // 执行 Callable
}
}
当然,在 Runnable/Callable 的情况下,我们可以捕获一个 Future 并通过阻塞的 get() 方法或我们想要做的任何事情进行相应操作。
Future<?> future = executor.submit(taskr);
Future<Boolean> future = executor.submit(taskc);
可能的输出如下所示:
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-6
VirtualThread[#31]/runnable@ForkJoinPool-1-worker-5
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-7
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#24]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-5
VirtualThread[#26]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#36]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#37]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#35]/runnable@ForkJoinPool-1-worker-7
VirtualThread[#34]/runnable@ForkJoinPool-1-worker-4
VirtualThread[#32]/runnable@ForkJoinPool-1-worker-3
VirtualThread[#33]/runnable@ForkJoinPool-1-worker-2
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-1
检查虚拟线程的 ID。它们在 #22-#37 之间,没有重复。每个任务由其自己的虚拟线程执行。每个任务一个线程的模型也适用于经典线程,通过 newThreadPerTaskExecutor(ThreadFactory threadFactory) 获得。这里是一个示例:
static class SimpleThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
return new Thread(r); // 经典
// return Thread.ofVirtual().unstarted(r); // 虚拟
}
}
try (ExecutorService executor =
Executors.newThreadPerTaskExecutor(
new SimpleThreadFactory())) {
for (int i = 0; i < NUMBER_OF_TASKS; i++) {
executor.submit(taskr); // 执行 Runnable
executor.submit(taskc); // 执行 Callable
}
}
如您所见,newThreadPerTaskExecutor() 可以用于经典线程或虚拟线程。创建的线程数量是无限的。通过简单地修改线程工厂,我们可以在虚拟/经典线程之间切换。可能的输出如下所示:
Thread[#75,Thread-15,5,main]
Thread[#77,Thread-17,5,main]
Thread[#76,Thread-16,5,main]
Thread[#83,Thread-23,5,main]
Thread[#82,Thread-22,5,main]
Thread[#80,Thread-20,5,main]
Thread[#81,Thread-21,5,main]
Thread[#79,Thread-19,5,main]
Thread[#78,Thread-18,5,main]
Thread[#89,Thread-29,5,main]
Thread[#88,Thread-28,5,main]
Thread[#87,Thread-27,5,main]
Thread[#86,Thread-26,5,main]
Thread[#85,Thread-25,5,main]
Thread[#84,Thread-24,5,main]
检查线程的 ID。它们在 #75-#89 之间,没有重复。每个任务由其自己的线程执行。
本文暂时没有评论,来添加一个吧(●'◡'●)