调度器

Sponge 提供了 Scheduler 机制用于允许插件开发者指定希望在未来完成的任务。 Scheduler 提供了一个 Task.Builder 可以用于指定项目的属性如延迟时间、名称、同步或异步、以及一个 Runnable (参见 属性 )。

生成器

首先我们先获取到一个 Task.Builder 的实例:

import org.spongepowered.api.scheduler.Task;

Task.Builder taskBuilder = Task.builder();

唯一必需的属性是 Runnable ,该属性通过 Task.Builder#execute(Runnable) 方法指定。

taskBuilder.execute(new Runnable() {
    public void run() {
        logger.info("Yay! Schedulers!");
    }
});

或者使用 Task.Builder#execute(Runnable runnable) 方法的 Java 8 形式

taskBuilder.execute(
    () -> {
        logger.info("Yay! Schedulers!");
    }
);

或者使用 Task.Builder#execute(Consumer<Task> task) 方法的 Java 8 形式

taskBuilder.execute(
    task -> {
        logger.info("Yay! Schedulers! :" + task.getName());
    }
);

属性

通过使用 Task.Builder ,你可以指定其他可选的属性,如下所述。

属性名 使用的方法 说明
delay

delayTicks(long delay)

delay(long delay,
TimeUnit unit)

在运行前经过的可选的延迟时间。

时间被指定为具有 delayTicks() 方法设置的 Tick 数,或者通过使用 delay() 方法指定 TimeUnit 作为一个更方便的时间单位。

任一方法均可,但不能同时指定。

interval
intervalTicks(
long interval)
interval(long interval,
TimeUnit unit)

两次任务重复之间的延时。若未指定,则任务不会重复。

时间被指定为具有 intervalTicks() 方法设置的 Tick 数,或者通过使用 interval() 方法指定 TimeUnit 以使用更方便的时间单位。

任一方法均可,但不能同时指定。

synchronization async() 同步(Synchronous)任务在游戏的主循环也就是每 Tick 的循环中运行。如果使用 Task.Builder#async ,任务将以异步的方式运行。因此,它将在自己的线程中运行,独立于tick循环,并且可能没有办法安全地使用游戏中的状态(参见 Asynchronous Tasks )。
name name(String name) 用于描述的名称。默认情况下的名称是 PLUGIN_ID “-” ( “A-” | “S-” ) SERIAL_ID 。比如一个可能的默认名称会是这样子:“FooPlugin-A-12”。Sponge 不会默认分发两个 SERIAL_ID 和 同步/异步类型完全相同的名称。如果指定了特殊的名称,那么该名称应该有比较好的描述性,并在用户调试你的插件时起到很大的帮助。

最后,通过使用 Task.Builder#submit(Object) 方法,把属性设置提交给调度器。

就是这样!综上所述,通过下面的代码,我们就可以生成并提交一个功能齐全的调度器。该调度器会以异步的方式每五分钟运行一次,第一次运行将会在设置后的 100 毫秒后发生:

import java.util.concurrent.TimeUnit;

Task task = Task.builder().execute(() -> logger.info("Yay! Schedulers!"))
    .async().delay(100, TimeUnit.MILLISECONDS).interval(5, TimeUnit.MINUTES)
    .name("ExamplePlugin - Fetch Stats from Database").submit(plugin);

若要取消,只需要简单地调用 Task#cancel() 方法就可以了:

task.cancel();

如果你想通过调度器本身来取消的话,你可以转而去使用 Consumer<Task> 以获取实例本身。下面的示例将在调度器中进行从 60 开始的倒计时,并在倒计时达到零时取消自身。

@Listener
public void onGameInit(GameInitializationEvent event) {
    Task task = Task.builder().execute(new CancellingTimerTask())
        .interval(1, TimeUnit.SECONDS)
        .name("Self-Cancelling Timer Task").submit(plugin);
}

private class CancellingTimerTask implements Consumer<Task> {
    private int seconds = 60;
    @Override
    public void accept(Task task) {
        seconds--;
        Sponge.getServer()
            .getBroadcastChannel()
            .send(Text.of("Remaining Time: "+seconds+"s"));
        if (seconds < 1) {
            task.cancel();
        }
    }
}

异步调度器

异步任务应主要用于可能需要相当长时间执行的代码,比如对另一个服务器或数据库的请求。如果在主线程上完成,对另一个服务器的请求可能会大大影响游戏的性能,因为在请求完成之前,游戏不能进行到下一个 Tick。

由于 Minecraft 主要是单线程的,你几乎不能在异步线程中做什么事情。如果你一定要异步运行线程的话,你应该执行所有不使用 SpongeAPI ,并且不影响 Minecraft 的代码,然后注册另一个同步任务来处理需要它们的代码。不过,在 Minecraft 中有一小部分是可以异步工作的,这包括:

  • 聊天
  • Sponge 的内置权限管理系统
  • Sponge 的调度器本身

此外,还有一些异步安全的操作:

  • 独立的网络请求
  • (不包含 Sponge 使用的文件的)文件系统的 IO

和第三方库的兼容性

随着插件在大小和范围上的增长,你可能希望开始使用使用 Java 或兼容 JVM 的许多第三方并发库之一。这些库确实倾向于支持 Java 的 ExecutorService 作为任务在哪个线程上执行的指示。

为了允许这些库和 Sponge 的 Scheduler 一起使用,可以使用以下方法:

需要时刻牢记的一件事是,任何与 Sponge 交互的任务,如果它们与 Sponge 的交互方式,在 Asynchronous Tasks 中列出的交互方式之外,那么需要在使用 Scheduler#createSyncExecutor(Object) 创建的 ExecutorService 上执行才能保证线程安全。

import org.spongepowered.api.scheduler.SpongeExecutorService;

SpongeExecutorService minecraftExecutor = Sponge.getScheduler().createSyncExecutor(plugin);

minecraftExecutor.submit(() -> { ... });

minecraftExecutor.schedule(() -> { ... }, 10, TimeUnit.SECONDS);

几乎所有的库都有一些适应 ExecutorService 的方法来本地调度任务。作为示例,下面的内容将解释 ExecutorService 如何在许多库中使用。

CompletableFuture(Java 8)

在 Java 8 中,对象 CompletableFuture 被添加到标准库中。与 Future 对象相比,它允许开发人员提供一个回调,当 Future 完成时调用,而不是阻塞线程,直到 Future 最终完成。

CompletableFuture 是一个流畅的(Fluent)接口,它的每个功能通常有以下三个变体:

  • CompletableFuture#<function>Async(..., Executor ex) 通过 ex 执行
  • CompletableFuture#<function>Async(...) 通过 ForkJoinPool.commonPool() 执行
  • CompletableFuture#<function>Async(...) 通过之前的 CompletableFuture 完成的任何线程上执行。
import java.util.concurrent.CompletableFuture;

SpongeExecutorService minecraftExecutor = Sponge.getScheduler().createSyncExecutor(plugin);

CompletableFuture.supplyAsync(() -> {
    // ASYNC: ForkJoinPool.commonPool()
    return 42;
}).thenAcceptAsync((awesomeValue) -> {
    // SYNC: minecraftExecutor
}, minecraftExecutor).thenRun(() -> {
    // SYNC: minecraftExecutor
});

RxJava

RxJavaReactive Extensions 理念在 JVM 上的一个实现。

Rx 中的多线程是通过各种 调度器 管理的。使用 Schedulers#from(Executor executor) 方法,Sponge 提供的 Executor 可以变成 Scheduler

就像 CompletableFuture 的默认行为是在整条执行链的前一部分的相同线程上执行的一样,使用 Observable#observeOn(Scheduler scheduler) 在线程之间移动。

需要记住重要的一点是,根 ObservableObservable#subscribe() 被调用的任何线程上被调用。也就是说,如果根 Observable 与 Sponge 交互,它应该被强制使用 Observable#subscribeOn(Scheduler scheduler) 来同步运行。

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

SpongeExecutorService executor = Sponge.getScheduler().createSyncExecutor(plugin);
Scheduler minecraftScheduler = Schedulers.from(executor);

Observable.defer(() -> Observable.from(Sponge.getServer().getOnlinePlayers()))
          .subscribeOn(minecraftScheduler) // defer -> SYNC: minecraftScheduler
          .observeOn(Schedulers.io()) // -> ASYNC: Schedulers.io()
          .filter(player -> {
              // ASYNC: Schedulers.io()
              return "Flards".equals(player.getName());
          })
          .observeOn(minecraftScheduler) // -> SYNC: minecraftScheduler
          .subscribe(player -> {
              // SYNC: minecraftScheduler
              player.kick(Text.of("Computer says no"));
          });

Scala

Scala 有一个内置的 Future Object,有很多 Scala 框架也反应出这一设计。Future 的大多数方法接受一个 ExecutionContext ,它决定了执行操作的哪一部分。这与 CompletableFuture 或 RxJava 不同,因为它们默认在上一个结束的操作的同一线程上执行。

事实上,所有这些试图隐式找到一个 ExecutionContext 的操作意味着你可以很容易地使用默认的 ExecutionContext.global 并具体运行在需要与 Sponge 服务端线程安全的部分。

为了避免通过 Sponge 的 ExecutorContext 的意外调度工作,另一个上下文应该被隐式定义并因此作为默认选择。为了保证线程安全,只有与 Sponge 实际交互的功能需要指定 Sponge 的 Executor。

import scala.concurrent.ExecutionContext

val executor = Sponge.getScheduler().createSyncExecutor(plugin)

import ExecutionContext.Implicits.global
val ec = ExecutionContext.fromExecutorService(executor)

val future = Future {
    // ASYNC: ExecutionContext.Implicits.global
}

future foreach {
    case value => // SYNC: ec
}(ec)

future map {
    case value => 42 // SYNC: ec
}(ec).foreach {
    case value => println(value) // ASYNC: ExecutionContext.Implicits.global
}