Java SE 9の新機能としてはあまり目立ってはいませんが、非同期処理ライブラリの「Concurrency Utilities」にも新しい機能が導入されています。Concurrency Utilitesの新機能はJEP 266 More Concurrency Updateで提案されています。導入される主な機能は以下の2つです。

  • Reactive Streamsの導入
  • CompletableFutureクラスの拡充

 前者のReactive Streamsは新しい概念の導入になります。一方、後者のCompletableFutureクラスは様々なメソッドが追加されているものの、基本的な使い方はこれまでと同じです。そこで、今回は前者のReactive Streamsについて紹介していきます。

Reactive Streams

 Reactive Streamsは、非同期ストリーム処理を実現するため、Reactive Streams Special Interest Group(SIG)が策定した仕様です。Javaだけでなく、JavaScriptや.NET向けのインタフェースも定義されています。

 Reactie Streamsを採用しているライブラリとして、RxJavaAkka Streamsなどがあります。Java SE 9では、このReactive Streamsとして策定されたインタフェースを取り込んでいます*1

 Reactive Streamsは「リアクティブプログラミング」の一種としてとらえることができます。リアクティブプログラミングは、データの流れ(ストリーム)に着目したプログラミングスタイルです。一般に、「Publish-Subscribe」モデルを採用してデータストリームを扱います。

 Publish-Subscribeモデルでは、データを作成(生産)してストリームとして流すPublisherと、流れてきたデータを処理(消費)するSubscriberとの間でやり取りします。リアクティブプログラミングでは非同期処理が必須というわけではありませんが、データの生産側と消費側が疎結合になるモデルなため、非同期処理にへの移行が容易になります。

*1 ストリームといってはいますが、java.util.stream.Streamインタフェースとは別ものなので、ご注意ください。

Publish-Subscribeモデルの処理の流れ

 Reactive Streamsでの処理の流れを説明する前に、まず一般的なPublish-Subscribeモデルでの処理の流れを説明しておきましょう。

 図1に正常時の処理の流れをUMLのシーケンス図で示しました。

図1●Publish-Subscribeモデルの処理の流れ

 まず、SubscriberはPublisherに購読(subscribe)の登録を行います。すると、Publisherはデータを生成しSubscriberに通知します。データの生成が続く限り、通知を繰り返します。

 データの生成が完了したら、PublisherはSubscriberにその旨を通知します。また、図1には示していませんが、途中でエラーが発生した場合、Subscriberにエラーを通知します。図1では、Publisher自身がデータを生成しているように記述していますが、このほかにもGUIでのイベントやファイルの読み込み、ネットワークを介した通信といった様々なデータを扱えます。さらに、SubscriberとPublisherを組み合わせ、受け取ったデータを再加工して他のSubscriberに通知することもよく行われます。

この先は会員の登録が必要です。今なら有料会員(月額プラン)登録で6月末まで無料!

日経 xTECHには有料記事(有料会員向けまたは定期購読者向け)、無料記事(登録会員向け)、フリー記事(誰でも閲覧可能)があります。有料記事でも、登録会員向け配信期間は登録会員への登録が必要な場合があります。有料会員と登録会員に関するFAQはこちら