同期を陽に扱うParモナド

 このように,Evalモナドを使うと,並列処理のために必要な処理の流れをわかりやすく記述できます。しかし,実際に並列プログラムの実行順序を細かく制御しようとすると,Evalモナドでも不十分な場合があります。

 例を挙げましょう。

do rpar x
   rseq y
   f x y

do x <- rpar x'
   y <- rseq y'
   f x y

 これらの式では,「rpar関数を使って並列処理のタスクを作成し,rseq関数で値を評価して,それらの結果にf関数を適用する」という並列処理の実行手順はきちんと書かれています。しかし,重要な情報が欠けています。Haskellでは遅延評価を行うので,「f x y」に与えられているxとyのうち,どちらが先に評価されるかがこれらの式だけではわかりません。実際の振る舞いは,fの正格性に依存します。fがxに対して正格であるならxが先に評価され,yに対して正格であるならyが先に評価されます。

 これらの式の評価方法がfの定義に依存するのは,並列処理のタスクに対して同期を制御する機能がEvalモナドでは提供されていないためです。Evalモナドではxとyのどちらの結果を先にfに渡すかを定義できません。その結果,並列処理の評価順序がfの定義に依存することになります。

 この問題を回避する目的で,並列処理のタスクの結果を保持する変数に同期処理を組み込んだものが,monad-parパッケージControl.Monad.Parモジュールで提供されているParモナドです(参考リンク1参考リンク2)。

 Parモナドでは,処理を並列化するためのfork関数と,fork関数を使って並列化された処理の結果を受けてfork関数の呼び出し元に渡すためのIVar型を提供しています。これらを使って定義したPar型の並列処理を,runPar関数で実行します。

Prelude Control.Monad.Par> :t fork
fork :: Par () -> Par ()
Prelude Control.Monad.Par> :i IVar 
newtype IVar a
  = Control.Monad.Par.Internal.IVar (GHC.IORef.IORef
                                       (Control.Monad.Par.Internal.IVarContents a))
  	-- Defined in Control.Monad.Par.Internal
Prelude Control.Monad.Par> :t new
new :: Par (IVar a)
Prelude Control.Monad.Par> :t get
get :: IVar a -> Par a
Prelude Control.Monad.Par> :t put
put :: NFData a => IVar a -> a -> Par ()
Prelude Control.Monad.Par> :t put_
put_ :: IVar a -> a -> Par ()
Prelude Control.Monad.Par> :t runPar 
runPar :: Par a -> a

 IVarは「空または値を持つ,一度だけ代入可能な記憶領域を値の保存先として指す変数」です(参考リンク1参考リンク2)。

 普通のHaskellの変数では,「値を代入する前の空の状態」と「値を代入した後の値を持つ状態」はソースコード上では区別されません。そのため,空から値を持つ状態への移行に伴う同期は,コンパイル時に生成されたコードを使って実行時にランタイム・システムで暗黙に行われることになります。

 一方IVarでは,「IVar型に値を代入するput*関数」という操作を利用するため,空から値を持つ状態への移行がソースコード上で可視化されます。これにより,ソースコードのレベルで陽に同期を行えるようになります。

 空のIVar型の作成にはnew関数,IVar型に格納された値の取得にはget関数を使います。第12回で紹介したMVarと同様に,これらの操作には同期処理が組み込まれています。例えば,空のIVarに対してget関数が呼ばれた場合には,put関数などを使ってIVarに値が代入されるまでは,値の取得はブロックされます。

import Control.DeepSeq    (NFData)
import Control.Concurrent (threadDelay)
import Control.Monad.Par  (fork, Par, new, put, get)
import System.IO.Unsafe   (unsafeDupablePerformIO)

putByForkedTask :: (Num a, NFData a) => Par a
putByForkedTask
  = do x <- new
       fork $ unsafeDupablePerformIO {- for to represent long time process -} $
         do threadDelay 1000000
            return $ put x 9
       get x

*Main Control.Monad.Par> runPar putByForkedTask
9

 ただし,IVar型はMVar型とは異なり再代入できません。IVar型への複数回の書き込みはエラーになります。また,takeMVar関数を使ってMVar型の値を参照する場合には値の中身が空になりましたが,get関数を使ってIVar型の値を参照しても中身が空になることはありません。

Prelude Control.Monad.Par> runPar $ do {x <- new; put x 9; get x; get x}
9
Prelude Control.Monad.Par> runPar $ do {x <- new; put x 9; put x 10; get x}
<interactive>: multiple put
Interrupted.
Prelude Control.Monad.Par> runPar $ do {x <- new; put x 9; fork (put x 10); get x}
<interactive>: multiple put
^CInterrupted.
Prelude Control.Monad.Par> runPar $ do {x <- new; fork (put x 9); put x 10; get x}
<interactive>: multiple put

 IVar型ではこうした制限を設けることで,共有変数が持つ非決定性を排除しています。また,NFDataクラスの制約を持たせて,データ構造の中身までを正格評価するput関数をput*関数のデフォルトにすることで,遅延評価固有の問題をなるべくユーザーの目から隠すようにしています。

-- | put a value into a @IVar@.  Multiple 'put's to the same @IVar@
-- are not allowed, and result in a runtime error.
--
-- 'put' fully evaluates its argument, which therefore must be an
-- instance of 'NFData'.  The idea is that this forces the work to
-- happen when we expect it, rather than being passed to the consumer
-- of the @IVar@ and performed later, which often results in less
-- parallelism than expected.
--
-- Sometimes partial strictness is more appropriate: see 'put_'.
--
put :: NFData a => IVar a -> a -> Par ()
put v a = deepseq a (Par $ \c -> Put v a (c ()))

 利便性向上のため,Control.Monad.Parモジュールでは,fork関数とnew関数,put*関数をセットにしたspawn*関数や,spawn関数とreturnメソッドをセットにしたpval関数なども提供されています。

-- | Like 'spawn', but the result is only head-strict, not fully-strict.
spawn_ :: Par a -> Par (IVar a)
spawn_ p = do
  r <- new
  fork (p >>= put_ r)
  return r

-- | Like 'fork', but returns a @IVar@ that can be used to query the
-- result of the forked computataion.
--
-- >  spawn p = do
-- >    r <- new
-- >    fork (p >>= put r)
-- >    return r
--
spawn :: NFData a => Par a -> Par (IVar a)
spawn p = do
  r <- new
  fork (p >>= put r)
  return r

-- | equivalent to @spawn . return@
pval :: NFData a => a -> Par (IVar a)
pval a = spawn (return a)

 これらの関数を利用することで,単純な「IVar型に格納された値」をParモナドで扱う場合には,定型的なコードを除去できます。

 Parモナドでは,以上のような工夫によって雑多な問題をできるだけ排除し,プログラマが並列化に意識を集中できるようにしているのです。

 Parモナドを使うと,第48回のクイックソート関数は以下のように定義できます。

qsortPar :: (Ord a, NFData a) => Int -> [a] -> [a]
qsortPar x ys = runPar $ qsortParWithPivot' selectMiddle 0 x ys

qsortParWithPivot' :: (Ord a, NFData a) => ([a] -> a) -> Int -> Int -> [a] -> Par [a]
qsortParWithPivot' _ _ _ []  = return []
qsortParWithPivot' _ _ _ [x] = return [x]
qsortParWithPivot' f !currentDepth !limit xs
    | currentDepth >= limit = return $ quickSortWithPivot f xs
    | otherwise
        = do lof <- spawn losort
             hi  <-       hisort
             lo  <- get   lof
             return $ lo ++ hi
  where
      x      = f xs
      losort = qsortParWithPivot' f (currentDepth+1) limit [y | y <- xs, y < x]
      hisort = qsortParWithPivot' f (currentDepth+1) limit [y | y <- xs, y >= x]

 この定義では,「losortに対する並列処理のタスクを作成し,hisortの計算を行い,losrtの結果を取得して,それぞれの結果を連結する」という処理の並列化と同期の流れが明確に表現されています。rnfメソッドやrdeepseq関数などを使わない分,全体の処理の見通しが良くなっています。