複数のデータ構造を使用する処理を効率化

 Stream Fusionでは,zipやzipWithといった関数を使って複数のデータ構造を処理する場合でも融合変換が可能です。そのためには,Stream型に格納する「s -> Step a s」型の関数の呼び出しパターンを工夫します。

-- Data.List.Stream モジュールでの定義
------------------------------------------------------------------------
-- * Zipping and unzipping lists

zip :: [a] -> [b] -> [(a, b)]
~ 略 ~
{-# RULES
"zip -> fusible" [~1] forall xs ys.
    zip xs ys = unstream (Stream.zip (stream xs) (stream ys))
-- "zip -> unfused" [1]  forall xs ys.
--     unstream (Stream.zip (stream xs) (stream ys)) = zip xs ys
  #-}

-- Data.Stream モジュールでの定義
zip :: Stream a -> Stream b -> Stream (a, b)
zip = zipWith (,)
{-# INLINE zip #-}

 zip関数はzipWith関数を使って定義されているので,zipWith関数の定義を見てみましょう。stream-fusionパッケージにおけるzipWith関数の定義は以下の通りです。

-- Data.List.Stream モジュールでの定義
zipWith :: (a -> b -> c) -> [a] -> [b] -> [c]
~ 略 ~
{-# RULES
"zipWith -> fusible" [~1] forall f xs ys.
    zipWith f xs ys = unstream (Stream.zipWith f (stream xs) (stream ys))
-- "zipWith -> unfused" [1]  forall f xs ys.
--     unstream (Stream.zipWith f (stream xs) (stream ys)) = zipWith f xs ys
  #-}

-- Data.Stream モジュールでの定義
zipWith :: (a -> b -> c) -> Stream a -> Stream b -> Stream c
zipWith f (Stream next0 sa0) (Stream next1 sb0) = Stream next (sa0 :!: sb0 :!: Nothing)
  where
    {-# INLINE next #-}
    next (sa :!: sb :!: Nothing)     = case next0 sa of
        Done        -> Done
        Skip    sa' -> Skip (sa' :!: sb :!: Nothing)
        Yield a sa' -> Skip (sa' :!: sb :!: Just (L a))

    next (sa' :!: sb :!: Just (L a)) = case next1 sb of
        Done        -> Done
        Skip    sb' -> Skip          (sa' :!: sb' :!: Just (L a))
        Yield b sb' -> Yield (f a b) (sa' :!: sb' :!: Nothing)
{-# INLINE [0] zipWith #-}

 Data.StreamモジュールのzipWith関数では,第1引数「Stream next0 sa0」の状態sa0,第2引数「Stream next1 sb0」の状態sb0,そして「第1引数のStreamと第2引数のStreamのどちらをzipWith関数が現在使っているか」を示すMaybe型の値,の三つの状態を扱います。ただし,Stream型には単一の状態しか格納できません。そこで,zipWith関数では「:!:」演算子を使って三つの状態をまとめています。

 Haskellでは「:」で始まる演算子を中置データ構成子として扱います。したがって:!:演算子は中置データ構成子です。:!:データ構成子は:!:型で定義されています。

data (Unlifted a, Unlifted b) => a :!: b = !a :!: !b

instance (Unlifted a, Unlifted b) => Unlifted (a :!: b) where
  expose (a :!: b) s = expose a (expose b s)
  {-# INLINE expose #-}

 中置データ構成子は,通常のデータ構成子よりも優先順位が低いものとして扱われます。:!:はSデータ構成子やLデータ構成子よりも優先順位が低いので,「L n :!: s」や「S n :!: s」は「(L n) :!: s」や「(S n) :!: s」と解釈されます(参考リンク1参考リンク2)。

module RewriteStream where

data L a = L a

data Infix a b = !a :!: !b

*RewriteStream> :t L 12 :!: 'h'
L 12 :!: 'h' :: (Num t) => Infix (L t) Char

 :!:データ構成子は「a :!: b」のような対や「a :!: b :!: c」のような組を一つの値として定義するのに使われます(参考リンク)。ちょうど,タプルの「,」データ構成子が「(a,b)」のような対や「(a,b,c)」のような組を一つの値として定義するのに使われるのと同じです。

 では,stream-fusionではなぜタプルの「,」構成子ではなく,:!:構成子を新たに定義して使っているのでしょうか? その答えは効率にあります。タプルの「,」構成子を使った対や組では,それぞれのデータ構成子の各フィールドに対して正格性注釈は付けられていません。したがって,各フィールドは遅延評価されます(参考リンク1参考リンク2)。

data  (a,b)   =  (a,b)    deriving (Eq, Ord, Bounded)

data  (a,b,c) =  (a,b,c)  deriving (Eq, Ord, Bounded)

 効率の面では,フィールドのそれぞれの値を遅延評価するのは非効率です。そこでstream-fusionでは,それぞれのフィールドを正格評価する:!:構成子という新たな中置データ構成子を定義しているのです。

 話をzipWith関数に戻しましょう。

-- Data.Stream モジュールでの定義
zipWith :: (a -> b -> c) -> Stream a -> Stream b -> Stream c
zipWith f (Stream next0 sa0) (Stream next1 sb0) = Stream next (sa0 :!: sb0 :!: Nothing)
  where
    {-# INLINE next #-}
    next (sa :!: sb :!: Nothing)     = case next0 sa of
        Done        -> Done
        Skip    sa' -> Skip (sa' :!: sb :!: Nothing)
        Yield a sa' -> Skip (sa' :!: sb :!: Just (L a))

    next (sa' :!: sb :!: Just (L a)) = case next1 sb of
        Done        -> Done
        Skip    sb' -> Skip          (sa' :!: sb' :!: Just (L a))
        Yield b sb' -> Yield (f a b) (sa' :!: sb' :!: Nothing)
{-# INLINE [0] zipWith #-}

 zipWith関数では:!:構成子を使い,三つの状態をまとめて一つの状態にします。このとき,zipWith関数の処理は始まっていないので,三つ目の状態をNothingとした「sa0 :!: sb0 :!: Nothing」がStreamの初期状態になります。

 zipWith関数内のnext関数は,zipWithの第1引数である「Stream next0 sa0」のnext0関数と状態sa0を使って処理を始めます。「next0 sa」の結果が「Yield a sa'」である場合,状態sa0をsa'に更新し,第1引数から取り出したaという値をLデータ構成子とJustデータ構成子で包んで保持します。

 ここで作成された「sa' :!: sb :!: Just (L a)」という状態は,第1引数から値を取り出しているだけなので,「zipWith関数の処理結果を使用する消費者関数」からすれば不完全です。そこで,この処理結果を単なる状態の更新として扱い,すぐにzipWith関数のnext関数を使った処理に戻れるよう,Skip型に包んで返しています。「Skip (sa' :!: sb :!: Just (L a))」という状態はSkip型に包まれているので,「zipWith関数の処理結果を使用する消費者関数」は単に「next (sa' :!: sb :!: Just (L a))」を呼び出します。

 next関数に渡される状態が「sa' :!: sb :!: Just (L a)」である場合,zipWithの第2引数である「Stream next1 sb0」のnext1関数と状態sbを使った処理「next1 sb」が行われます。この結果が「Yield b sb'」である場合,状態sbをsb'に更新し,「f a b」を計算し,すでに使用したaという値を捨てた状態をYieldに包んだ「Yield (f a b) (sa' :!: sb' :!: Nothing)」を返します。

 この結果はYieldなので,「zipWith関数の処理結果を使用する消費者関数」で利用できます。次にnext関数に渡される状態は「sa' :!: sb' :!: Nothing」になるので,関数next0と状態sa'を使った処理が行われます。

 このようにしてzipWith関数では,第1引数のStreamを計算する状態をNothing,第2引数のStreamを計算する状態をJustとして,第1引数と第2引数のそれぞれのStreamを交互に計算していきます。最終的に,第1引数のStream,あるいは第2引数のStreamの状態がDoneになった時点で,計算の終了を示すDoneを返します。

 「第1引数のStream,あるいは第2引数のStreamの状態がDoneになった時点で計算を終了する」というこの振る舞いは,Data.ListのzipやzipWithの振る舞いに沿ったものです。Data.ListのzipやzipWithは,引数として取る二つのリストのうち短いリストの処理が終わった時点で処理を終了します。そこで,Data.StreamのzipやzipWithも第1引数か第2引数のどちらかのStreamの処理が終わり次第,終了を示すDoneを返しているのです。

 このzipWith関数の定義は,「Stream型を引数として取り,Stream型を返す関数」の再帰構造に依存しません。このため,Stream Fusionでzip関数およびzipWith関数を問題なく融合変換できます。