[AD] Scalaアプリケーションの開発・保守は合同会社ミルクソフトにお任せください
この記事では、Future を使用して、非同期的(並列)に処理を実行する方法について解説します。
非同期処理をするには Future を使う
非同期的に処理を行いたいとき、つまり既存の処理に並列して処理を実行したいときには、Future を使用します。
非同期的に処理を行いたい場面としては、例えばディスクIO、ネットワークIO、あるいはコストの大きい計算など、比較的時間のかかる処理を行う場合です。
もっとも簡単な使用例は以下のようになります。
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global Future: for (i <- 1 to 3) Thread.sleep(100) println(s"future: $i") // A println("executed") // B
このコードでは、Future のあとに続くブロック内に時間のかかる処理を記述しています。
実行結果は以下のようになります。
executed future: 1 future: 2 future: 3
コードの書かれている順番としては(B)の println("executed") がいちばん最後ですが、文字列 "executed" は最初に出力されています。
ここでは、Futureブロック内の処理とブロック外の処理が並列に処理されることとなった結果、
ブロック外のprintln (B)が先に実行され、
ブロック内のprintln (A)は遅れて実行されています。
ScalaにおいてはこのようにしてFutureを使って非同期的(並列)に処理を行うことができます。
ただし、以下の説明を読まずにこのコードをそのまま使ってプロダクションコードを書いた場合には、 予期せぬスピードダウンが発生するかもしれません。
ぜひこの記事の最後まで読んでくださいね。
使い方の基本は Future と ExecutionContext の組み合わせ
Scalaで非同期処理をするには、 Future と、 ExecutionContext (実行コンテキスト)について知る必要があります。
「実行コンテキスト」とは何でしょうか?一言でいうとスレッドプールのことです(後述)。
まずは通常の処理がどのように行われているかについて振り返ってみましょう。
通常の処理では、標準の「実行コンテキスト」を使用している
上述のコードから、Futureブロックを剥がして同期的に実行してみます。
for (i <- 1 to 3) Thread.sleep(100) println(s"future: $i") // A println("executed") // B
これらの処理は上から順次、つまり同期的に処理されます。
したがって実行結果は以下のようになります。
future: 1 future: 2 future: 3 executed
(A)がすべて実行されたのち(B)が出力されています。
実際のところ、特に指定しない限り、これらすべての処理はこのコードを実行した「実行コンテキスト」内の「スレッド」において実行されています。
実行コンテキストとは、処理をどのように実行するか定義したグループのことです。
実行の仕組みについては後述しますが、内部に「スレッド」という名の処理レーンが1本以上用意されており、 このレーン上で処理が実行されることになっています。
このスレッドのグループのことを「スレッドプール」と呼びます。
ExecutionContextとはスレッドプールのことだと考えてよいです。
通常の処理はこの「標準の実行コンテキスト」内で実行されるので、 明示的に実行コンテキストを指定する必要はありません。
実行コンテキストの存在を意識せずに処理を実行できているのは、このためです。
非同期処理では、何らかの実行コンテキストを用意する必要がある
そもそも、 実行コンテキストは「コンテキスト(脈絡)」という名の通り、複数存在し得ます。 つまり、標準以外の実行コンテキストを作成して使用することもできます。
通常のように順次処理している分には、実行コンテキストは自明でした。 これ対して非同期処理においては、どのような実行コンテキストにおいて処理を行うべきかが自明ではありません。
なぜ複数種類の実行コンテキストを使用したいことがあるのでしょうか。 それは、高効率なプログラムを書く場合には、扱う処理に対して最適な実装の実行コンテキストを用意して使い分ける必要があるためです。
したがって、使用者が何らかの実行コンテキストを用意し、Futureに対してわかるように指定してあげる必要があるというわけです。
さて、上述の、Futureを使って非同期的に処理をしているコードに戻ってみましょう。
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global Future: for (i <- 1 to 3) Thread.sleep(100) println(s"future: $i") // A println("executed") // B
このコードでは、冒頭の2行で何かをインポートしていますね。
scala.concurrent.Future と scala.concurrent.ExecutionContext.Implicits.global です。
2つ目こそ、Scalaが標準で用意している「実行コンテキスト」です。
つまり、このコードでは、Scalaが標準で用意している実行コンテキストを使用して非同期処理を行うよう指定しているというわけです。
Scalaが用意している ExecutionContext には scala.concurrent.ExecutionContext.global と scala.concurrent.ExecutionContext.Implicits.global があります。
これらのglobalはほぼ同じものを指しています。
ExecutionContext.global は、標準の実行コンテキストそのものです。
ExecutionContext.Implicits.global はそのエイリアスで、違いはimplicitであることだけです。
インポートすると暗黙変数として使用することができます。
Futureを使用している箇所においてExecutionContext.Implicits.globalをインポートすることで、
Futureに対して標準の実行コンテキストにて処理を実行するよう暗黙的に指定したこととなります。
使い方は「Futureに手を突っ込んで中身を操作する」イメージ
一度Futureで包んだら、中身の値を操作したい場合にもできるだけFutureの外から触るようにして、
できるだけ最後まで中身を取り出さないでおくようにします。
なぜこのような扱いをするのかというと、Futureの値を取り出すには非同期処理の完了を待つ必要があるためです。
取り出した後は同期的に処理されることになるため、一連の流れの途中で取り出してしまうと処理効率が低下しがちです。
したがって、Futureから値を取り出すのはできるだけ控えましょう。
難しそうに聞こえますが、Futureには便利なメソッドがたくさん用意されているため、
値を取り出さないまま外から様々な操作を加えることができます。
Futureの中身はmapやflatMapメソッドで操作する
mapやflatMapメソッドを使用すると、Futureの中身の値を操作することができます。
まずはmapメソッドを使用して中身を操作してみましょう。
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global val f: Future[Int] = Future: Thread.sleep(100) 100 f.map(_ * 3) // mapメソッドを使用する .foreach(println)
このコードでは、もとの値100をmapメソッドによって3倍しています。
出力結果は以下のようになります。
300
あるいは、Futureの中にFutureが入れ子になっている場合、以下のようにflattenメソッド使うとうまく扱うことができます。
val f: Future[Future[Int]] = Future: Thread.sleep(100) Future: Thread.sleep(100) 100 val flatF: Future[Int] = f.flatten // flattenメソッドを使用する flatF.map(_ * 3) .foreach(println)
出力結果は以下のようになります(mapのときと同様です)。
300
また、そもそも入れ子になってしまうと扱いづらいです。
入れ子になるのを防ぎたい場合には、以下のようにflatMapメソッド使って処理することができます。
flatMapメソッドは、flattenメソッドとmapメソッドを組み合わせたものです。
val f: Future[Int] = Future { Thread.sleep(100) }.flatMap { _ => Future: Thread.sleep(100) 100 } f.map(_ * 3) // flatMapメソッドを使用する .foreach(println)
出力結果は以下のようになります(map、flattenのときと同様です)。
300
このようにして、入れ子になって積み上がりそうなFutureを適度に潰して平たくして処理したい場合には flatMap メソッドを使用すると便利です。
複数のFutureを扱うならfor式を使う
複数のFutureを扱いたい場合には、for式を使うと便利です。
val f1: Future[Int] = Future(100) val f2: Future[Int] = Future(3) for { a <- f1 b <- f2 } yield println(a * b)
出力結果は以下のようになります
300
for式は、内部的にはflatMap、map、あるいはwithFilterメソッドに変換されて実行されます。
便利メソッドを活用する
Futureには、mapやflatMap以外にも多くの便利メソッドが用意されています。
上で軽く振れたwithFilterメソッドもそうです。
APIドキュメントを参照して、どのようなメソッドがあるのかを確認しておきましょう。
3つの注意点を押さえる
Futureを使用する際に注意すべき点が大きく3つあります。
- 実行順
- 実行コンテキスト
- 例外処理
最後に、この3つについて確認してみましょう。
実行順を把握する
実はFutureは、非同期的(並列)に実行されるとはいえ、一旦変数に置かないと並列処理されません。
以下の例を見てください。
この例では、Futureを返すメソッドを2回呼んでいます。
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global def getF(num: Int): Future[Unit] = Future: for (i <- 1 to 5) Thread.sleep(100) println(s"future $num: $i") for { _ <- getF(1) _ <- getF(2) } yield println("finished")
Futureを使っているので、getF(1)とgetF(2)が並列に実行されてほしいと思う人が多いでしょう。
しかしresult1の結果は以下のようになります。
future 1: 1 future 1: 2 future 1: 3 future 1: 4 future 1: 5 future 2: 1 future 2: 2 future 2: 3 future 2: 4 future 2: 5 finished
どうも様子がおかしいですね。
仕組みとしては、getF(1)の結果を待ってからgetF(2)が実行されているので、このようなことが起こります。
たとえワイルドカード_を使って結果の値を使わないようにしていても、この現象が起こるので注意してください。
さて、もう一つ例を用意しました。
こちらは、メソッド呼び出しの結果のFutureをそれぞれ一旦変数に置いてからfor式に渡しています。
val f1: Future[Unit] = getF(1) val f2: Future[Unit] = getF(2) for { _ <- f1 _ <- f2 } yield println("finished")
一旦変数に束縛しているのが冗長に見えますが、挙動の違いを確認してみましょう。
result2の結果は例えば以下のようになります。
future 1: 1 future 2: 1 future 1: 2 future 2: 2 future 1: 3 future 2: 3 future 1: 4 future 2: 4 future 1: 5 future 2: 5 finished
こんどは順不同に、つまり並列に実行されていることがわかります
(実際には、f1の出力とf2の出力が必ず交互に並ぶとは限りません)。
変数に束縛した時点でそれぞれFutureブロック内の処理が開始されているので、並列に処理できていたというわけです。
広いスコープにおいてはFutureを変数に置かないことはあまりないので心配する必要性は薄いのですが、
狭いスコープ内で処理を確実に並列に実行したい場合には注意してください。
また、for式そのものは並列に処理するための仕組みではないことにも注意してください。
最適な実行コンテキスト(ExecutionContext)を選ぶ
非同期処理を行うということは効率よく処理を行いたいということです。
効率よく処理を行うには、実行コンテキストに関する理解は欠かせません。
内部実装について知る
実行コンテキストは、内部実装的には「スレッドプール」です。
標準の実行コンテキスト(ExecutionContext.global)は使わないようにする
Scalaに標準で用意されている実行コンテキストExecutionContext.globalは、デフォルトでは並列数がCPUのコア数分だけになっています。
例えばコアが1つだけの場合、Futureで時間のかかる処理を実行すると、一発でスレッドプールが詰まります。並列もなにもありません。
あるいは時間のかかる処理がコア数以上に同時に実行された場合にも詰まります。
単純に ExecutionContext.globalを使って非同期処理をしてもいい場面は、並列処理数がコア数を超えない場合のみです。
sbt consoleなどで手元でちょっと実験するためのものと割り切ることをおすすめします。
Scala ExecutionContextって何 / Futureはスレッド立ち上げじゃないよ - ましめも https://mashi.hatenablog.com/entry/2014/11/24/010417
Future内でThread.sleepはするな - ましめも https://mashi.hatenablog.com/entry/2014/12/08/000149
フレームワークがExecutionContextを用意してくれる場合もある
ExecutionContextは、フレームワークが用意してくれることもあります。
Akkaや、AkkaHTTPを使用しているPlayFrameworkなどでは、Akka自体がActorSystemを用意してくれています。
ActorSystemはExecutionContextを継承しています。
Akkaではapplication.confにて詳しく設定することができます。
ExecutionContextを自前で用意する
手頃な実行コンテキストが用意されていない場合は、自分で用意しましょう。
用意する方法は大きく2つあります。
- JavaのAPIでスレッドプールを作成し、
ExecutionContext#fromExecutorや#fromExecutorServiceメソッドに渡してExecutionContextを得る - システムプロパティを設定し、
ExecutionContext.globalの並列数をCPUのn倍にするExecutionContext.global自体が事故の元なので、おすすめはしません
タスクの種類によってExecutionContextを適切に使い分ける必要があります。
ExecutionContextの種類について確認する
Javaで作成できるスレッドプールには以下のようなものがあります。
ForkJoinPool (Java SE 15 & JDK 15) https://docs.oracle.com/javase/jp/15/docs/api/java.base/java/util/concurrent/ForkJoinPool.html
※ scala.concurrent.ExecutionContext.globalは、実装上は「CPUコア数×n倍」を並列数としたForkJoinPoolです。
また、java.util.concurrentExecutorsの以下のようなメソッドを使用して作成することもできます(抜粋)。
newCachedThreadPool()newFixedThreadPool(int nThreads)newScheduledThreadPool(int corePoolSize)newSingleThreadExecutor()newSingleThreadScheduledExecutor()newWorkStealingPool()
Executors (Java SE 15 & JDK 15) https://docs.oracle.com/javase/jp/15/docs/api/java.base/java/util/concurrent/Executors.html
必ずエラーハンドリングをする
エラー処理に関しては、何もしなければ結果を取り出すことがないというFutureの特徴が災いする可能性がありますので、
注意して扱いましょう。
そもそも同期処理の場合には、例外発生時には呼び出し元が処理をします。
catchするなどして対応しなければ処理全体が失敗する原因となりますが、
逆に言えば対応を半ば強いられるという点で、異常発生に気づきやすい仕組みになっています。
これに対して、Futureブロック内で例外が発生した場合には、一旦Future型が例外をキャッチします。
このままでは実はキャッチされた例外は誰にも処理されず、最終的にガベージコレクションによって宇宙の彼方へと消え去ることになります。
「例外が握りつぶされれていることに長い間運用していても気づかない」などということが起こり得るので、忘れずにエラーハンドリングをしましょう。
Futureでの例外処理には onComplete や recover を使う
Future内部で生じた例外を処理するには、onCompleteメソッド、またはrecover/recoverWithメソッドを使用します。
onCompleteメソッドは戻り値がUnitです。したがって、エラーハンドリングが最終結果に影響しない場合に使用します。
val f = Future: throw new Exception("error!") f.onComplete: case Success(_) => println(1) case Failure(_) => println(100)
出力結果は以下のようになります。
100
これに対して、recover/recoverWithメソッドは戻り値がFuture[U]です。
例外処理をしてもとの処理に復帰させたいときに使用します。
val f = Future { throw new Exception("error!") }.recover: case _ => 100 f.foreach(println)
recoverメソッドとrecoverWithメソッドの使い分けは、引数としてFutureに包まれた値をとるかどうかです。
val f = Future { throw new Exception("error!") }.recoverWith: case _ => Future(100) f.foreach(println)
Akkaでは、例外処理していない場合にはログを出力してくれる
ちなみに、Akkaにおいては、ExecutionContextとしてAkkaのActorSystemを使用している限り、
その実装であるActorSystemImplが未処理の例外をログに出力してくれるので、
ログさえ見ておけば例外発生が握りつぶされてしまうことはありません。

