[AD] Scalaアプリケーションの開発・保守は合同会社ミルクソフトにお任せください
この記事では、Future
を使用して、非同期的(並列)に処理を実行する方法について解説します。
非同期処理をするには Future を使う
非同期的に処理を行いたいとき、つまり既存の処理に並列して処理を実行したいときには、Future
を使用します。
非同期的に処理を行いたい場面としては、例えばディスクIO、ネットワークIO、あるいはコストの大きい計算など、比較的時間のかかる処理を行う場合です。
もっとも簡単な使用例は以下のようになります。
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global Future: for (i <- 1 to 3) Thread.sleep(100) println(s"future: $i") // A println("executed") // B
このコードでは、Future
のあとに続くブロック内に時間のかかる処理を記述しています。
実行結果は以下のようになります。
executed future: 1 future: 2 future: 3
コードの書かれている順番としては(B)の println("executed")
がいちばん最後ですが、文字列 "executed" は最初に出力されています。
ここでは、Future
ブロック内の処理とブロック外の処理が並列に処理されることとなった結果、
ブロック外のprintln
(B)が先に実行され、
ブロック内のprintln
(A)は遅れて実行されています。
ScalaにおいてはこのようにしてFuture
を使って非同期的(並列)に処理を行うことができます。
ただし、以下の説明を読まずにこのコードをそのまま使ってプロダクションコードを書いた場合には、 予期せぬスピードダウンが発生するかもしれません。
ぜひこの記事の最後まで読んでくださいね。
使い方の基本は Future
と ExecutionContext
の組み合わせ
Scalaで非同期処理をするには、 Future
と、 ExecutionContext
(実行コンテキスト)について知る必要があります。
「実行コンテキスト」とは何でしょうか?一言でいうとスレッドプールのことです(後述)。
まずは通常の処理がどのように行われているかについて振り返ってみましょう。
通常の処理では、標準の「実行コンテキスト」を使用している
上述のコードから、Future
ブロックを剥がして同期的に実行してみます。
for (i <- 1 to 3) Thread.sleep(100) println(s"future: $i") // A println("executed") // B
これらの処理は上から順次、つまり同期的に処理されます。
したがって実行結果は以下のようになります。
future: 1 future: 2 future: 3 executed
(A)がすべて実行されたのち(B)が出力されています。
実際のところ、特に指定しない限り、これらすべての処理はこのコードを実行した「実行コンテキスト」内の「スレッド」において実行されています。
実行コンテキストとは、処理をどのように実行するか定義したグループのことです。
実行の仕組みについては後述しますが、内部に「スレッド」という名の処理レーンが1本以上用意されており、 このレーン上で処理が実行されることになっています。
このスレッドのグループのことを「スレッドプール」と呼びます。
ExecutionContext
とはスレッドプールのことだと考えてよいです。
通常の処理はこの「標準の実行コンテキスト」内で実行されるので、 明示的に実行コンテキストを指定する必要はありません。
実行コンテキストの存在を意識せずに処理を実行できているのは、このためです。
非同期処理では、何らかの実行コンテキストを用意する必要がある
そもそも、 実行コンテキストは「コンテキスト(脈絡)」という名の通り、複数存在し得ます。 つまり、標準以外の実行コンテキストを作成して使用することもできます。
通常のように順次処理している分には、実行コンテキストは自明でした。 これ対して非同期処理においては、どのような実行コンテキストにおいて処理を行うべきかが自明ではありません。
なぜ複数種類の実行コンテキストを使用したいことがあるのでしょうか。 それは、高効率なプログラムを書く場合には、扱う処理に対して最適な実装の実行コンテキストを用意して使い分ける必要があるためです。
したがって、使用者が何らかの実行コンテキストを用意し、Future
に対してわかるように指定してあげる必要があるというわけです。
さて、上述の、Future
を使って非同期的に処理をしているコードに戻ってみましょう。
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global Future: for (i <- 1 to 3) Thread.sleep(100) println(s"future: $i") // A println("executed") // B
このコードでは、冒頭の2行で何かをインポートしていますね。
scala.concurrent.Future
と scala.concurrent.ExecutionContext.Implicits.global
です。
2つ目こそ、Scalaが標準で用意している「実行コンテキスト」です。
つまり、このコードでは、Scalaが標準で用意している実行コンテキストを使用して非同期処理を行うよう指定しているというわけです。
Scalaが用意している ExecutionContext
には scala.concurrent.ExecutionContext.global
と scala.concurrent.ExecutionContext.Implicits.global
があります。
これらのglobal
はほぼ同じものを指しています。
ExecutionContext.global
は、標準の実行コンテキストそのものです。
ExecutionContext.Implicits.global
はそのエイリアスで、違いはimplicit
であることだけです。
インポートすると暗黙変数として使用することができます。
Future
を使用している箇所においてExecutionContext.Implicits.global
をインポートすることで、
Future
に対して標準の実行コンテキストにて処理を実行するよう暗黙的に指定したこととなります。
使い方は「Future
に手を突っ込んで中身を操作する」イメージ
一度Future
で包んだら、中身の値を操作したい場合にもできるだけFuture
の外から触るようにして、
できるだけ最後まで中身を取り出さないでおくようにします。
なぜこのような扱いをするのかというと、Future
の値を取り出すには非同期処理の完了を待つ必要があるためです。
取り出した後は同期的に処理されることになるため、一連の流れの途中で取り出してしまうと処理効率が低下しがちです。
したがって、Future
から値を取り出すのはできるだけ控えましょう。
難しそうに聞こえますが、Future
には便利なメソッドがたくさん用意されているため、
値を取り出さないまま外から様々な操作を加えることができます。
Future
の中身はmap
やflatMap
メソッドで操作する
map
やflatMap
メソッドを使用すると、Future
の中身の値を操作することができます。
まずはmap
メソッドを使用して中身を操作してみましょう。
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global val f: Future[Int] = Future: Thread.sleep(100) 100 f.map(_ * 3) // mapメソッドを使用する .foreach(println)
このコードでは、もとの値100
をmapメソッドによって3倍しています。
出力結果は以下のようになります。
300
あるいは、Future
の中にFuture
が入れ子になっている場合、以下のようにflatten
メソッド使うとうまく扱うことができます。
val f: Future[Future[Int]] = Future: Thread.sleep(100) Future: Thread.sleep(100) 100 val flatF: Future[Int] = f.flatten // flattenメソッドを使用する flatF.map(_ * 3) .foreach(println)
出力結果は以下のようになります(map
のときと同様です)。
300
また、そもそも入れ子になってしまうと扱いづらいです。
入れ子になるのを防ぎたい場合には、以下のようにflatMap
メソッド使って処理することができます。
flatMap
メソッドは、flatten
メソッドとmap
メソッドを組み合わせたものです。
val f: Future[Int] = Future { Thread.sleep(100) }.flatMap { _ => Future: Thread.sleep(100) 100 } f.map(_ * 3) // flatMapメソッドを使用する .foreach(println)
出力結果は以下のようになります(map
、flatten
のときと同様です)。
300
このようにして、入れ子になって積み上がりそうなFuture
を適度に潰して平たくして処理したい場合には flatMap
メソッドを使用すると便利です。
複数のFuture
を扱うならfor式を使う
複数のFuture
を扱いたい場合には、for式を使うと便利です。
val f1: Future[Int] = Future(100) val f2: Future[Int] = Future(3) for { a <- f1 b <- f2 } yield println(a * b)
出力結果は以下のようになります
300
for
式は、内部的にはflatMap
、map
、あるいはwithFilter
メソッドに変換されて実行されます。
便利メソッドを活用する
Futureには、map
やflatMap
以外にも多くの便利メソッドが用意されています。
上で軽く振れたwithFilter
メソッドもそうです。
APIドキュメントを参照して、どのようなメソッドがあるのかを確認しておきましょう。
3つの注意点を押さえる
Future
を使用する際に注意すべき点が大きく3つあります。
- 実行順
- 実行コンテキスト
- 例外処理
最後に、この3つについて確認してみましょう。
実行順を把握する
実はFuture
は、非同期的(並列)に実行されるとはいえ、一旦変数に置かないと並列処理されません。
以下の例を見てください。
この例では、Future
を返すメソッドを2回呼んでいます。
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global def getF(num: Int): Future[Unit] = Future: for (i <- 1 to 5) Thread.sleep(100) println(s"future $num: $i") for { _ <- getF(1) _ <- getF(2) } yield println("finished")
Future
を使っているので、getF(1)
とgetF(2)
が並列に実行されてほしいと思う人が多いでしょう。
しかしresult1
の結果は以下のようになります。
future 1: 1 future 1: 2 future 1: 3 future 1: 4 future 1: 5 future 2: 1 future 2: 2 future 2: 3 future 2: 4 future 2: 5 finished
どうも様子がおかしいですね。
仕組みとしては、getF(1)
の結果を待ってからgetF(2)
が実行されているので、このようなことが起こります。
たとえワイルドカード_
を使って結果の値を使わないようにしていても、この現象が起こるので注意してください。
さて、もう一つ例を用意しました。
こちらは、メソッド呼び出しの結果のFuture
をそれぞれ一旦変数に置いてからfor式に渡しています。
val f1: Future[Unit] = getF(1) val f2: Future[Unit] = getF(2) for { _ <- f1 _ <- f2 } yield println("finished")
一旦変数に束縛しているのが冗長に見えますが、挙動の違いを確認してみましょう。
result2
の結果は例えば以下のようになります。
future 1: 1 future 2: 1 future 1: 2 future 2: 2 future 1: 3 future 2: 3 future 1: 4 future 2: 4 future 1: 5 future 2: 5 finished
こんどは順不同に、つまり並列に実行されていることがわかります
(実際には、f1
の出力とf2
の出力が必ず交互に並ぶとは限りません)。
変数に束縛した時点でそれぞれFuture
ブロック内の処理が開始されているので、並列に処理できていたというわけです。
広いスコープにおいてはFuture
を変数に置かないことはあまりないので心配する必要性は薄いのですが、
狭いスコープ内で処理を確実に並列に実行したい場合には注意してください。
また、for式そのものは並列に処理するための仕組みではないことにも注意してください。
最適な実行コンテキスト(ExecutionContext
)を選ぶ
非同期処理を行うということは効率よく処理を行いたいということです。
効率よく処理を行うには、実行コンテキストに関する理解は欠かせません。
内部実装について知る
実行コンテキストは、内部実装的には「スレッドプール」です。
標準の実行コンテキスト(ExecutionContext.global
)は使わないようにする
Scalaに標準で用意されている実行コンテキストExecutionContext.global
は、デフォルトでは並列数がCPUのコア数分だけになっています。
例えばコアが1つだけの場合、Future
で時間のかかる処理を実行すると、一発でスレッドプールが詰まります。並列もなにもありません。
あるいは時間のかかる処理がコア数以上に同時に実行された場合にも詰まります。
単純に ExecutionContext.global
を使って非同期処理をしてもいい場面は、並列処理数がコア数を超えない場合のみです。
sbt console
などで手元でちょっと実験するためのものと割り切ることをおすすめします。
Scala ExecutionContextって何 / Futureはスレッド立ち上げじゃないよ - ましめも https://mashi.hatenablog.com/entry/2014/11/24/010417
Future内でThread.sleepはするな - ましめも https://mashi.hatenablog.com/entry/2014/12/08/000149
フレームワークがExecutionContextを用意してくれる場合もある
ExecutionContext
は、フレームワークが用意してくれることもあります。
Akkaや、AkkaHTTPを使用しているPlayFrameworkなどでは、Akka自体がActorSystem
を用意してくれています。
ActorSystem
はExecutionContext
を継承しています。
Akkaではapplication.confにて詳しく設定することができます。
ExecutionContextを自前で用意する
手頃な実行コンテキストが用意されていない場合は、自分で用意しましょう。
用意する方法は大きく2つあります。
- JavaのAPIでスレッドプールを作成し、
ExecutionContext#fromExecutor
や#fromExecutorService
メソッドに渡してExecutionContextを得る - システムプロパティを設定し、
ExecutionContext.global
の並列数をCPUのn倍にするExecutionContext.global
自体が事故の元なので、おすすめはしません
タスクの種類によってExecutionContext
を適切に使い分ける必要があります。
ExecutionContextの種類について確認する
Javaで作成できるスレッドプールには以下のようなものがあります。
ForkJoinPool (Java SE 15 & JDK 15) https://docs.oracle.com/javase/jp/15/docs/api/java.base/java/util/concurrent/ForkJoinPool.html
※ scala.concurrent.ExecutionContext.global
は、実装上は「CPUコア数×n倍」を並列数としたForkJoinPool
です。
また、java.util.concurrentExecutors
の以下のようなメソッドを使用して作成することもできます(抜粋)。
newCachedThreadPool()
newFixedThreadPool(int nThreads)
newScheduledThreadPool(int corePoolSize)
newSingleThreadExecutor()
newSingleThreadScheduledExecutor()
newWorkStealingPool()
Executors (Java SE 15 & JDK 15) https://docs.oracle.com/javase/jp/15/docs/api/java.base/java/util/concurrent/Executors.html
必ずエラーハンドリングをする
エラー処理に関しては、何もしなければ結果を取り出すことがないというFuture
の特徴が災いする可能性がありますので、
注意して扱いましょう。
そもそも同期処理の場合には、例外発生時には呼び出し元が処理をします。
catch
するなどして対応しなければ処理全体が失敗する原因となりますが、
逆に言えば対応を半ば強いられるという点で、異常発生に気づきやすい仕組みになっています。
これに対して、Future
ブロック内で例外が発生した場合には、一旦Future
型が例外をキャッチします。
このままでは実はキャッチされた例外は誰にも処理されず、最終的にガベージコレクションによって宇宙の彼方へと消え去ることになります。
「例外が握りつぶされれていることに長い間運用していても気づかない」などということが起こり得るので、忘れずにエラーハンドリングをしましょう。
Future
での例外処理には onComplete
や recover
を使う
Future
内部で生じた例外を処理するには、onComplete
メソッド、またはrecover
/recoverWith
メソッドを使用します。
onComplete
メソッドは戻り値がUnit
です。したがって、エラーハンドリングが最終結果に影響しない場合に使用します。
val f = Future: throw new Exception("error!") f.onComplete: case Success(_) => println(1) case Failure(_) => println(100)
出力結果は以下のようになります。
100
これに対して、recover
/recoverWith
メソッドは戻り値がFuture[U]
です。
例外処理をしてもとの処理に復帰させたいときに使用します。
val f = Future { throw new Exception("error!") }.recover: case _ => 100 f.foreach(println)
recover
メソッドとrecoverWith
メソッドの使い分けは、引数としてFuture
に包まれた値をとるかどうかです。
val f = Future { throw new Exception("error!") }.recoverWith: case _ => Future(100) f.foreach(println)
Akkaでは、例外処理していない場合にはログを出力してくれる
ちなみに、Akkaにおいては、ExecutionContext
としてAkkaのActorSystem
を使用している限り、
その実装であるActorSystemImpl
が未処理の例外をログに出力してくれるので、
ログさえ見ておけば例外発生が握りつぶされてしまうことはありません。