调度器
Sponge 提供了 Scheduler 机制用于允许插件开发者指定希望在未来完成的任务。 Scheduler
提供了一个 Task.Builder 可以用于指定项目的属性如延迟时间、名称、同步或异步、以及一个 Runnable
(参见 属性 )。
生成器
首先我们先获取到一个 Task.Builder
的实例:
import org.spongepowered.api.scheduler.Task;
Task.Builder taskBuilder = Task.builder();
唯一必需的属性是 Runnable ,该属性通过 Task.Builder#execute(Runnable) 方法指定。
taskBuilder.execute(new Runnable() {
public void run() {
logger.info("Yay! Schedulers!");
}
});
或者使用 Task.Builder#execute(Runnable runnable)
方法的 Java 8 形式
taskBuilder.execute(
() -> {
logger.info("Yay! Schedulers!");
}
);
或者使用 Task.Builder#execute(Consumer<Task> task)
方法的 Java 8 形式
taskBuilder.execute(
task -> {
logger.info("Yay! Schedulers! :" + task.getName());
}
);
属性
通过使用 Task.Builder
,你可以指定其他可选的属性,如下所述。
属性名 |
使用的方法 |
说明 |
---|---|---|
delay |
delayTicks(long delay)
|
在运行前经过的可选的延迟时间。 时间被指定为具有 任一方法均可,但不能同时指定。 |
interval |
|
两次任务重复之间的延时。若未指定,则任务不会重复。 时间被指定为具有 任一方法均可,但不能同时指定。 |
synchronization |
async() |
同步(Synchronous)任务在游戏的主循环也就是每 Tick 的循环中运行。如果使用 |
name |
name(String name) |
用于描述的名称。默认情况下的名称是 PLUGIN_ID “-” ( “A-” | “S-” ) SERIAL_ID 。比如一个默认名称可能会是这样子:“fooplugin-A-12”。Sponge 不会让两个名称的 SERIAL_ID 和 同步/异步类型都完全相同。如果你指定了特殊的名称,那么这一名称应该有比较好的描述性,并应在调试插件时起到很大的帮助。 |
最后,通过使用 Task.Builder#submit(Object) 方法,把属性设置提交给调度器。
就是这样!综上所述,通过下面的代码,我们就可以生成并提交一个功能齐全的调度器。该调度器会以异步的方式每五分钟运行一次,第一次运行将会在设置后的 100 毫秒后发生:
import java.util.concurrent.TimeUnit;
PluginContainer plugin = ...;
Task task = Task.builder().execute(() -> logger.info("Yay! Schedulers!"))
.async().delay(100, TimeUnit.MILLISECONDS).interval(5, TimeUnit.MINUTES)
.name("ExamplePlugin - Fetch Stats from Database").submit(plugin);
若要取消,只需要简单地调用 Task#cancel() 方法就可以了:
task.cancel();
如果你想通过调度器本身来取消的话,你可以转而去使用 Consumer<Task>
以获取实例本身。下面的示例将在调度器中进行从 60 开始的倒计时,并在倒计时达到零时取消自身。
@Listener
public void onGameInit(GameInitializationEvent event) {
Task task = Task.builder().execute(new CancellingTimerTask())
.interval(1, TimeUnit.SECONDS)
.name("Self-Cancelling Timer Task").submit(plugin);
}
private class CancellingTimerTask implements Consumer<Task> {
private int seconds = 60;
@Override
public void accept(Task task) {
seconds--;
Sponge.getServer()
.getBroadcastChannel()
.send(Text.of("Remaining Time: "+seconds+"s"));
if (seconds < 1) {
task.cancel();
}
}
}
警告
任何进入调度器的任务都应监视当前 Game 的 state 或在不需要的时候取消注册(比如在 GameStoppingServerEvent 发布的时候)。这一点对客户端相当关键,因为它可以反复开关服务器。
异步调度器
异步任务应主要用于可能需要相当长时间执行的代码,比如对另一个服务器或数据库的请求。如果在主线程上完成,对另一个服务器的请求可能会大大影响游戏的性能,因为在请求完成之前,游戏不能进行到下一个 Tick。
由于 Minecraft 主要是单线程的,你几乎不能在异步线程中做什么事情。如果你一定要异步运行线程的话,你应该执行所有不使用 SpongeAPI ,并且不影响 Minecraft 的代码,然后注册另一个同步任务来处理需要它们的代码。不过,在 Minecraft 中有一小部分是可以异步工作的,这包括:
聊天
Sponge 的内置权限管理系统
Sponge 的调度器本身
此外,还有一些异步安全的操作:
独立的网络请求
(不包含 Sponge 使用的文件的)文件系统的 IO
警告
在非主线程上访问游戏对象会导致崩溃、行为不一致等诸多问题,应尽可能避免。错误的实现会引发 ConcurrentModificationException
,有可能会伴随服务器崩溃,甚至是玩家/世界/服务器数据损坏。
警告
任何进入调度器的任务都应监视当前 Game 的 state 或在不需要的时候取消注册(比如在 GameStoppingServerEvent 发布的时候)。这一点对客户端相当关键,因为它可以反复开关服务器。
和第三方库的兼容性
随着插件在大小和范围上的增长,你可能希望开始使用使用 Java 或兼容 JVM 的许多第三方并发库之一。这些库确实倾向于支持 Java 的 ExecutorService 作为任务在哪个线程上执行的指示。
为了允许这些库和 Sponge 的 Scheduler
一起使用,可以使用以下方法:
Scheduler#createSyncExecutor(Object) 方法创建一个 SpongeExecutorService 用于通过 Sponge 的同步调度器执行任务。
Scheduler#createAsyncExecutor(Object) 方法创建一个
SpongeExecutorService
用于通过 Sponge 的异步调度器执行任务。任务的限制在 Asynchronous Tasks 部分里也有所提及。
需要时刻牢记的一件事是,任何与 Sponge 交互的任务,如果它们与 Sponge 的交互方式,在 Asynchronous Tasks 中列出的交互方式之外,那么需要在使用 Scheduler#createSyncExecutor(Object)
创建的 ExecutorService 上执行才能保证线程安全。
import org.spongepowered.api.scheduler.SpongeExecutorService;
PluginContainer plugin = ...;
SpongeExecutorService minecraftExecutor = Sponge.getScheduler().createSyncExecutor(plugin);
minecraftExecutor.submit(() -> { ... });
minecraftExecutor.schedule(() -> { ... }, 10, TimeUnit.SECONDS);
几乎所有的库都有一些适应 ExecutorService
的方法来本地调度任务。作为示例,下面的内容将解释 ExecutorService
如何在许多库中使用。
CompletableFuture(Java 8)
在 Java 8 中,对象 CompletableFuture 被添加到标准库中。与 Future
对象相比,它允许开发人员提供一个回调,当 Future 完成时调用,而不是阻塞线程,直到 Future 最终完成。
CompletableFuture 是一个流畅的(Fluent)接口,它的每个功能通常有以下三个变体:
CompletableFuture#<function>Async(..., Executor ex)
通过ex
执行CompletableFuture#<function>Async(...)
通过ForkJoinPool.commonPool()
执行CompletableFuture#<function>Async(...)
通过之前的CompletableFuture
完成的任何线程上执行。
import java.util.concurrent.CompletableFuture;
PluginContainer plugin = ...;
SpongeExecutorService minecraftExecutor = Sponge.getScheduler().createSyncExecutor(plugin);
CompletableFuture.supplyAsync(() -> {
// ASYNC: ForkJoinPool.commonPool()
return 42;
}).thenAcceptAsync((awesomeValue) -> {
// SYNC: minecraftExecutor
}, minecraftExecutor).thenRun(() -> {
// SYNC: minecraftExecutor
});
RxJava
RxJava 是 Reactive Extensions 理念在 JVM 上的一个实现。
Rx 中的多线程是通过各种 调度器 管理的。使用 Schedulers#from(Executor executor)
方法,Sponge 提供的 Executor
可以变成 Scheduler
。
就像 CompletableFuture
的默认行为是在整条执行链的前一部分的相同线程上执行的一样,使用 Observable#observeOn(Scheduler scheduler)
在线程之间移动。
需要记住重要的一点是,根 Observable
在 Observable#subscribe()
被调用的任何线程上被调用。也就是说,如果根 Observable 与 Sponge 交互,它应该被强制使用 Observable#subscribeOn(Scheduler scheduler)
来同步运行。
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;
PluginContainer plugin = ...;
SpongeExecutorService executor = Sponge.getScheduler().createSyncExecutor(plugin);
Scheduler minecraftScheduler = Schedulers.from(executor);
Observable.defer(() -> Observable.from(Sponge.getServer().getOnlinePlayers()))
.subscribeOn(minecraftScheduler) // defer -> SYNC: minecraftScheduler
.observeOn(Schedulers.io()) // -> ASYNC: Schedulers.io()
.filter(player -> {
// ASYNC: Schedulers.io()
return "Flards".equals(player.getName());
})
.observeOn(minecraftScheduler) // -> SYNC: minecraftScheduler
.subscribe(player -> {
// SYNC: minecraftScheduler
player.kick(Text.of("Computer says no"));
});
Scala
Scala 有一个内置的 Future Object,有很多 Scala 框架也反应出这一设计。Future 的大多数方法接受一个 ExecutionContext ,它决定了执行操作的哪一部分。这与 CompletableFuture 或 RxJava 不同,因为它们默认在上一个结束的操作的同一线程上执行。
事实上,所有这些试图隐式找到一个 ExecutionContext
的操作意味着你可以很容易地使用默认的 ExecutionContext.global
并具体运行在需要与 Sponge 服务端线程安全的部分。
为了避免通过 Sponge 的 ExecutorContext
的意外调度工作,另一个上下文应该被隐式定义并因此作为默认选择。为了保证线程安全,只有与 Sponge 实际交互的功能需要指定 Sponge 的 Executor。
import scala.concurrent.ExecutionContext
val executor = Sponge.getScheduler().createSyncExecutor(plugin)
import ExecutionContext.Implicits.global
val ec = ExecutionContext.fromExecutorService(executor)
val future = Future {
// ASYNC: ExecutionContext.Implicits.global
}
future foreach {
case value => // SYNC: ec
}(ec)
future map {
case value => 42 // SYNC: ec
}(ec).foreach {
case value => println(value) // ASYNC: ExecutionContext.Implicits.global
}