forkOSで結合スレッドを利用する

 第12回で説明したように,forkIOによって作成されるのはユーザー・レベル・スレッドです。

 GHCのマルチスレッド版実行時システムでは,forkIOで作成したユーザー・レベル・スレッドの実行を,複数のCPUコアで動作するネイティブ・スレッドに適切に割り振ってくれます。これにより,それぞれのスレッドが並列に実行されます。しかし,あくまでユーザー・スレッドであり,ネイティブ・スレッドではありません。

 このため,OSのネイティブ・スレッドで使用することを想定したスレッド固有データ(TSD:thread-specific data)などの機能は,forkIOの作成したスレッド上ではきちんと動かない可能性があります(参考リンク)。

 例を見てみましょう。以下のような,Windows APIのスレッド固有データを利用するプログラムを用意します。

tls.h(Cのヘッダ)

#include "windows.h"

void init ();
void setValue (int val);
int getValue ();

tls.c(Cのコード)

#include "tls.h"

static DWORD tlsIndex;

void init () {
    tlsIndex = TlsAlloc();
};

void setValue (int val) {
    TlsSetValue(tlsIndex, (LPVOID) val);
};

int getValue () {
    int val = (int)TlsGetValue(tlsIndex);
    return val;
};

TLSExample.hs(Haskellのコード)

{-# INCLUDE "tls.h" #-}
{-# LANGUAGE ForeignFunctionInterface #-}
module Main where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import GHC.Conc (numCapabilities)

main  = main' forkIO
main' fork = do
    let num = max 6 $ 2 * numCapabilities
        nums = [1..num]
    ms <- sequence $ take num $ repeat newEmptyTMVarIO
    c_init

    zipWithM_ (threadFunc fork) nums ms

    atomically $ mapM_ takeTMVar ms
    where
        threadFunc fork val var = fork $ do
            c_setValue val
            yield
            val <- c_getValue
            print val
            atomically $ putTMVar var ()


foreign import ccall safe "init" c_init :: IO ()
foreign import ccall safe "setValue" c_setValue :: Int -> IO ()
foreign import ccall safe "getValue" c_getValue :: IO Int

 threadFuncの中で,スレッド固有データに対する処理を行っています。threadFuncの中で呼ばれているc_setValueはスレッド固有データを書き込む関数,c_getValueはスレッド固有データを読み込む関数です。これら二つの関数の間にControl.Concurrentモジュールのyield関数を挟み込むことで,固有データの書き込みと読み出しの間に,実行中のスレッドを切り替える機会を与えています。

Prelude Control.Concurrent> :t yield
yield :: IO ()

 ただし,注意点があります。GHCのマルチスレッド版実行時システムでは,forkIOで作成したユーザー・スレッドの実行は,空いているネイティブ・スレッドに適宜割り振られていきます。十分な数のユーザー・スレッドを用意しなければ,すべてのユーザー・スレッドが別々のネイティブ・スレッドの中で実行されることになります。その結果,ユーザー・スレッドの切り替えが起こらなくなってしまいます。これにより,ユーザー・スレッドとネイティブ・スレッドの違いが見えなくなり,今回,説明したい「ユーザー・スレッドでネイティブ・スレッド向けの機能を使った場合に起こる問題」を検出できなくなります。こうした事態を防いでいるのが,「zipWithM_ (threadFunc fork) nums ms」の部分です。

 zipWithM_はControl.Monadモジュールで提供されているzipWithのモナド版です。zipWithM_は値を返しませんが,mapM*やsequence*と同様に,値を返すzipWithMも存在します(参考リンク)。

Prelude Control.Monad> :t zipWithM_
zipWithM_ :: (Monad m) => (a -> b -> m c) -> [a] -> [b] -> m ()
Prelude Control.Monad> :t zipWithM
zipWithM :: (Monad m) => (a -> b -> m c) -> [a] -> [b] -> m [c]

 numsの定義に使われているnumCapabilitiesは,Haskellプログラム内で同時に実行されるネイティブ・スレッドの数を返します。

Prelude GHC.Conc> :t numCapabilities
numCapabilities :: Int

 この値は実行時システムの-N<n>オプションに与えたのと同じものです(参考リンク)。

module Capability where
import GHC.Conc (numCapabilities)

main = print numCapabilities

$ ghc --make Capability.hs -main-is Capability -threaded

[1 of 1] Compiling Capability       ( Capability.hs, Capability.o )
Linking Capability.exe ...

$ ./Capability
1

$ ./Capability +RTS -N2
2

$ ./Capability +RTS -N4
4

 これを使い,「同時に実行されるネイティブ・スレッドの数の2倍,または6のどちらか大きい方」の大きさを持ったリストを作成します。

let num = max 6 $ 2 * numCapabilities
    nums = [1..num]
    ms <- sequence $ take num $ repeat newEmptyTMVarIO

 numsはリストの内包表記です。msはrepeatとtakeによってnumと同じ大きさのリストを作成しています。

 zipWithM_ (threadFunc fork)をこれらの値に適用することで,リストの大きさと同じだけthreadFunc関数で定義したアクションが実行されることになります。main変数でfork変数にはforkIOが与えられているため,これらのリストの大きさと同じ数だけスレッドが作成されるのです。

 コンパイルして実行してみましょう。main'関数にforkIOを与えた結果は以下のようになります。

$ ghc --make TLSExample.hs tls.c
[1 of 1] Compiling Main             ( TLSExample.hs, TLSExample.o )
Linking TLSExample.exe ...

$ ./TLSExample
6
66666




$ ghc --make TLSExample.hs tls.c -threaded
Linking TLSExample.exe ...

$ ./TLSExample +RTS -N4
8777
8
3
6



0

 出力される値がnumsの定義とは異なるのがわかります。複数の数字が横に並んでいる部分は,出力のタイミングによるものなので問題ありません。しかし,66666のように同じ数字が連続していたり,0が含まれていたりするのは,numsの定義からはありえない結果です。

 このように,forkIOの作成したスレッドでは,スレッド固有データなどのネイティブ・スレッドでの実行を必要とする機能をきちんと扱えません。これでは,スレッド固有データなどの機能を利用したライブラリをHaskell側から利用できなくなってしまいます。

 そこで第12回で説明したように,Haskell'やGHCは,ネイティブ・スレッドと結びついた「結合スレッド」を利用するための機能を提供しています。ただし,Haskell'ではこの機能はオプション扱いです。しばらくはGHCの機能として利用することになるでしょう(参考リンク1参考リンク2)。

 結合スレッドを利用するための機能の一つが,第12回で触れたforkOSです。forkOSは,ユーザー・レベル・スレッドの代わりにネイティブ・スレッドを作成して処理を実行します。これを使ってmain変数を書き換えれば,以下のように正しく動作します。

main  = main' forkOS

$ ghc --make TLSExample.hs tls.c -threaded
Linking TLSExample.exe ...

$ ./TLSExample
1
2
3
4
5
6

$ ./TLSExample +RTS -N4
1
32

4
5
6
7
8

 ./TLSExample +RTS -N4の最初の結果が32となっているのはタイミングの問題によるものです。二つのスレッドがそれぞれ3と2を出力したと思ってください。