Skip to main content

第 21 章:单子与效果(Monads and Effects)

原文:Bartosz Milewski, Category Theory for Programmers, Scala Edition, Chapter 21. 原书 PDF、.tex 源文件及相关图像采用 CC BY-SA 4.0;本译文按同一许可发布。

现在我们知道单子的用途了,它让我们组合被修饰函数。真正有趣的问题是:为什么被修饰函数在函数式编程中如此重要。我们已经见过一个例子,也就是 Writer 单子,其中修饰让我们能够跨多个函数调用创建并累积日志。一个原本会用不纯函数解决的问题(例如访问和修改某个全局状态),被纯函数解决了。

21.1 问题(The Problem)

下面是一份简短清单,来自 Eugenio Moggi 的奠基论文1,其中所有问题传统上都需要通过放弃函数纯性来解决。

  • 偏性(partiality):可能不终止的计算
  • 非确定性(nondeterminism):可能返回多个结果的计算
  • 副作用:访问或修改状态的计算
  • 只读状态,或者说环境
  • 只写状态,或者说日志
  • 读写状态
  • 异常:可能失败的偏函数
  • 延续:保存程序状态并在需要时恢复它的能力
  • 交互式输入
  • 交互式输出

真正令人惊叹的是,所有这些问题都可以用同一个巧妙技巧解决:转向被修饰函数。当然,每种情况下的修饰都会完全不同。

你必须意识到,在这个阶段并没有要求修饰必须是单子的。只有当我们坚持组合,也就是能够把单个被修饰函数分解成更小的被修饰函数时,才需要单子。再次强调,由于每种修饰都不同,单子组合的实现也会不同,但整体模式相同。它是一个非常简单的模式:满足结合律并配有恒等元的组合。

下一节会大量使用 Haskell 示例。如果你急着回到范畴论,或者已经熟悉 Haskell 对单子的实现,可以快速浏览甚至跳过它。

21.2 解决方案(The Solution)

首先,分析一下我们使用 Writer 单子的方式。我们从一个执行某项任务的纯函数开始:给定参数,它产生某个输出。我们把这个函数替换为另一个函数,后者通过把原始输出与一个字符串配对来修饰输出。这就是我们对日志问题的解决方案。

但不能止步于此,因为一般来说,我们不想处理单体式方案。我们需要能够把一个产生日志的函数分解成更小的产生日志的函数。正是这些更小函数的组合,把我们带到了单子概念。

真正惊人的是,修饰函数返回类型这一相同模式,可以用于很多通常需要放弃纯性的不同问题。让我们逐项走过清单,识别每个问题对应的修饰。

21.2.1 偏性(Partiality)

我们修改每个可能不终止函数的返回类型,把它变成一个“提升”类型,也就是一个包含原类型所有值,再加上特殊 bottom 值 $\bot$ 的类型。例如,Bool 类型作为集合包含两个元素:TrueFalse。提升后的 Bool 包含三个元素。返回提升 Bool 的函数可以产生 TrueFalse,也可以永远执行下去。

有趣的是,在 Haskell 这样的惰性语言中,一个永不结束的函数实际上可能返回一个值,而且这个值还可以被传给下一个函数。我们把这个特殊值称为 bottom。只要这个值不被显式需要(例如用于模式匹配,或作为输出产生),它就可以被传来传去,而不会卡住程序执行。因为每个 Haskell 函数都可能不终止,Haskell 中所有类型都被假定为提升类型。这就是为什么我们经常谈论 Haskell(提升)类型和函数组成的范畴 Hask,而不是更简单的 $Set$。不过,Hask 是否真的是一个范畴并不清楚(参见 Andrej Bauer 的文章2)。

21.2.2 非确定性(Nondeterminism)

如果一个函数可以返回许多不同结果,那么它完全可以一次性返回所有结果。从语义上说,非确定性函数等价于返回结果列表的函数。在惰性、带垃圾回收的语言中,这很有意义。例如,如果你只需要一个值,可以只取列表头部,尾部永远不会被求值。如果需要随机值,可以用随机数生成器选择列表中的第 n 个元素。惰性甚至允许你返回无限结果列表。

在列表单子,也就是 Haskell 对非确定性计算的实现中,join 被实现为 concat。记住,join 应该把容器中的容器压平,而 concat 会把列表的列表连接成单个列表。return 创建单元素列表:

instance Monad [] where
join = concat
return x = [x]
implicit val listMonad = new Monad[List] {
def flatten[A](mma: List[List[A]]): List[A] =
mma.reduce(_ ++ _)

def pure[A](a: A): List[A] = List(a)
}

列表单子的 bind 操作符由通用公式给出:先 fmapjoin。在这里得到:

as >>= k = concat (fmap k as)
def flatMap[A, B](as: List[A])(k: A => List[B]): List[B] =
flatten(fmap(k)(as))

这里,函数 k 自身会产生一个列表,它被应用到列表 as 的每个元素上。结果是一个列表的列表,再用 concat 压平。

从程序员角度看,处理列表比在循环中调用非确定性函数,或者实现一个返回迭代器的函数更容易(尽管在现代 C++3 中,返回惰性 range 几乎等价于在 Haskell 中返回列表)。

创造性使用非确定性的一个好例子来自游戏编程。例如,当计算机和人类下棋时,它无法预测对手下一步会走什么。不过,它可以生成所有可能走法的列表,并逐个分析。类似地,一个非确定性解析器可以为给定表达式生成所有可能解析的列表。

即使我们可以把返回列表的函数解释为非确定性的,列表单子的应用也要广泛得多。这是因为,把产生列表的计算缝合在一起,是命令式编程中迭代结构,也就是循环,的完美函数式替代。单个循环通常可以用 fmap 重写,后者把循环体应用到列表的每个元素。列表单子中的 do 记法可以用来替代复杂嵌套循环。

我最喜欢的例子是生成毕达哥拉斯三元组的程序,也就是可以作为直角三角形三边的正整数三元组。

triples = do
z <- [1..]
x <- [1..z]
y <- [x..z]
guard (x^2 + y^2 == z^2)
return (x, y, z)
// Scala 中的 Stream 可视为惰性列表
def triples = for {
z <- Stream.from(1)
x <- 1 to z
y <- x to z
_ <- guard(x * x + y * y == z * z)
} yield (x, y, z)

第一行告诉我们,z 从正数无限列表 [1..] 中取一个元素。然后 x1z 的(有限)列表 [1..z] 中取一个元素。最后,yxz 的列表中取一个元素。于是我们手头有三个数 1 <= x <= y <= z。函数 guard 接受一个 Bool 表达式,并返回一个 unit 列表:

guard :: Bool -> [()]
guard True = [()]
guard False = []
def guard: Boolean => List[Unit] = {
case true => List(())
case false => List.empty[Unit]
}

这个函数(它属于一个更大的类 MonadPlus)在这里用于过滤掉非毕达哥拉斯三元组。确实,如果你查看 bind(或相关操作符 >>)的实现,会注意到,当它接收空列表时,会产生空列表。另一方面,当它接收非空列表(这里是包含 unit 的单元素列表 [()])时,bind 会调用延续,也就是这里的 return (x, y, z),它产生一个包含已验证毕达哥拉斯三元组的单元素列表。所有这些单元素列表会由外层 bind 连接起来,产生最终的(无限)结果。当然,triples 的调用者永远无法消费整个列表,但这没关系,因为 Haskell 是惰性的。

通常需要三层嵌套循环的问题,在列表单子和 do 记法帮助下被大幅简化。还不止如此,Haskell 还允许你用列表推导式进一步简化这段代码:

triples = [(x, y, z) | z <- [1..]
, x <- [1..z]
, y <- [x..z]
, x^2 + y^2 == z^2]
def triples = for {
z <- Stream.from(1)
x <- 1 to z
y <- x to z
if x * x + y * y == z * z
} yield (x, y, z)

这只是列表单子(严格说,是 MonadPlus)的进一步语法糖。

你可能会在其他函数式或命令式语言中,以生成器和协程的名义看到类似构造。

21.2.3 只读状态(Read-Only State)

一个对某些外部状态或环境有只读访问的函数,总可以替换为一个把环境作为额外参数的函数。纯函数 (a, e) -> b(其中 e 是环境类型)乍看不像 Kleisli 箭头。但只要把它柯里化为 a -> (e -> b),就能认出这个修饰正是我们的老朋友 reader 函子:

newtype Reader e a = Reader (e -> a)
case class Reader[E, A](run: E => A)

你可以把返回 Reader 的函数解释为产生一个迷你可执行程序:一个给定环境后产生所需结果的动作。有一个辅助函数 runReader 可以执行这样的动作:

runReader :: Reader e a -> e -> a
runReader (Reader f) e = f e
def runReader[E, A]: Reader[E, A] => E => A = {
case Reader(f) => e => f(e)
}

它可能会对不同环境值产生不同结果。注意,返回 Reader 的函数以及 Reader 动作本身都是纯的。

为了实现 Reader 单子的 bind,首先注意你必须产生一个函数,它接受环境 e 并产生一个 b

ra >>= k = Reader (\e -> ...)
def flatMap[A, B](ra: Reader[E, A])(k: A => Reader[E, B]): Reader[E, B] =
Reader { e => ... }

在 lambda 内部,可以执行动作 ra 来产生一个 a

ra >>= k = Reader (\e -> let a = runReader ra e
in ...)
def flatMap[A, B](ra: Reader[E, A])(k: A => Reader[E, B]): Reader[E, B] =
Reader { e =>
val a = runReader(ra)(e)
...
}

然后可以把这个 a 传给延续 k,得到一个新动作 rb

ra >>= k = Reader (\e -> let a  = runReader ra e
rb = k a
in ...)
def flatMap[A, B](ra: Reader[E, A])(k: A => Reader[E, B]): Reader[E, B] =
Reader { e =>
val a = runReader(ra)(e)
val rb = k(a)
...
}

最后,可以用环境 e 运行动作 rb

ra >>= k = Reader (\e -> let a  = runReader ra e
rb = k a
in runReader rb e)
def flatMap[A, B](ra: Reader[E, A])(k: A => Reader[E, B]): Reader[E, B] =
Reader { e =>
val a = runReader(ra)(e)
val rb = k(a)
runReader(rb)(e)
}

为了实现 return,我们创建一个忽略环境并返回原值的动作。

把这些放在一起,再做一些简化,得到下面的定义:

instance Monad (Reader e) where
ra >>= k = Reader (\e -> runReader (k (runReader ra e)) e)
return x = Reader (\e -> x)
implicit def readerMonad[E] = new Monad[Reader[E, ?]] {
def pure[A](x: A): Reader[E, A] =
Reader(e => x)

def flatMap[A, B](ra: Reader[E, A])(k: A => Reader[E, B]): Reader[E, B] =
Reader(e =>
runReader(k(runReader(ra)(e)))(e))
}

21.2.4 只写状态(Write-Only State)

这就是我们最初的日志例子。修饰由 Writer 函子给出:

newtype Writer w a = Writer (a, w)
case class Writer[W, A](run: (A, W))

为完整起见,还有一个简单辅助函数 runWriter,用于拆开数据构造器:

runWriter :: Writer w a -> (a, w)
runWriter (Writer (a, w)) = (a, w)
def runWriter[W, A]: Writer[W, A] => (A, W) = {
case Writer((a, w)) => (a, w)
}

正如我们之前见过的,要让 Writer 可组合,w 必须是幺半群。下面是用 bind 操作符写出的 Writer 单子实例:

instance (Monoid w) => Monad (Writer w) where
(Writer (a, w)) >>= k = let (a', w') = runWriter (k a)
in Writer (a', w `mappend` w')
return a = Writer (a, mempty)
implicit def writerMonad[W: Monoid] = new Monad[Writer[W, ?]] {
def flatMap[A, B](wa: Writer[W, A])(k: A => Writer[W, B]): Writer[W, B] =
wa match {
case Writer((a, w)) =>
val ((a1, w1)) = runWriter(k(a))
Writer((a1, Monoid[W].combine(w, w1)))
}

def pure[A](a: A) = Writer(a, Monoid[W].empty)
}

21.2.5 状态(State)

具有读写状态访问能力的函数,结合了 Reader 和 Writer 的修饰。你可以把它们看作纯函数,它们接受状态作为额外参数,并产生一对值/状态作为结果:(a, s) -> (b, s)。柯里化之后,我们把它们变成 Kleisli 箭头形式 a -> (s -> (b, s)),其中修饰被抽象在 State 函子中:

newtype State s a = State (s -> (a, s))
case class State[S, A](run: S => (A, S))

同样,可以把 Kleisli 箭头看作返回一个动作,这个动作可以用辅助函数执行:

runState :: State s a -> s -> (a, s)
runState (State f) s = f s
def runState[S, A]: State[S, A] => S => (A, S) = {
case State(f) => s => f(s)
}

不同初始状态不仅可能产生不同结果,还可能产生不同最终状态。

State 单子的 bind 实现与 Reader 单子的实现非常相似,只是必须注意在每一步传递正确状态:

sa >>= k = State (\s -> let (a, s') = runState sa s
sb = k a
in runState sb s')
def flatMap[A, B](sa: State[S, A])(k: A => State[S, B]): State[S, B] =
State { s =>
val (a, s1) = runState(sa)(s)
val sb = k(a)
runState(sb)(s1)
}

完整实例如下:

instance Monad (State s) where
sa >>= k = State (\s -> let (a, s') = runState sa s
in runState (k a) s')
return a = State (\s -> (a, s))
implicit def stateMonad[S] = new Monad[State[S, ?]] {
def flatMap[A, B](sa: State[S, A])(k: A => State[S, B]): State[S, B] =
State { s =>
val (a, s1) = runState(sa)(s)
runState(k(a))(s1)
}

def pure[A](a: A): State[S, A] = State(s => (a, s))
}

还有两个辅助 Kleisli 箭头,可用于操作状态。一个取回状态以供检查:

get :: State s s
get = State (\s -> (s, s))
def get: State[S, S] = State(s => (s, s))

另一个用全新状态替换它:

put :: s -> State s ()
put s' = State (\s -> ((), s'))
def put(s1: S): State[S, Unit] = State(s => ((), s1))

21.2.6 异常(Exceptions)

会抛出异常的命令式函数实际上是偏函数,也就是在某些参数值上没有定义的函数。用纯全函数实现异常的最简单方式,是使用 Maybe 函子。一个偏函数被扩展为全函数:当有意义时返回 Just a,无意义时返回 Nothing。如果还想返回关于失败原因的信息,可以改用 Either 函子(并把第一个类型固定为例如 String)。

下面是 Maybe 的单子实例:

instance Monad Maybe where
Nothing >>= k = Nothing
Just a >>= k = k a
return a = Just a
implicit val optionMonad = new Monad[Option] {
def flatMap[A, B](ma: Option[A])(k: A => Option[B]): Option[B] =
ma match {
case None => None
case Some(a) => k(a)
}

def pure[A](a: A): Option[A] = Some(a)
}

注意,当检测到错误时,Maybe 的单子组合会正确地短路计算(延续 k 永远不会被调用)。这正是我们期待异常具备的行为。

21.2.7 延续(Continuations)

这就像求职面试之后可能遇到的“别打给我们,我们会打给你!”情况。你不会得到直接答案,而是应该提供一个处理器,也就是一个在结果可用时被调用的函数。这种编程风格在调用时结果尚不可知的情况下尤其有用,例如结果正在另一个线程中求值,或者来自远程网站。此时的 Kleisli 箭头返回一个接受处理器的函数,而处理器表示“计算的其余部分”:

data Cont r a = Cont ((a -> r) -> r)
case class Cont[R, A](run: (A => R) => R)

处理器 a -> r 最终被调用时,会产生类型为 r 的结果,而这个结果最后会被返回。延续由结果类型参数化。(实践中,这通常是某种状态指示器。)

还有一个辅助函数用于执行 Kleisli 箭头返回的动作。它接受处理器并把它传给延续:

runCont :: Cont r a -> (a -> r) -> r
runCont (Cont k) h = k h
def runCont[R, A]: Cont[R, A] => (A => R) => R = {
case Cont(k) => h => k(h)
}

延续的组合出了名地困难,因此通过单子,尤其是 do 记法来处理它,优势非常大。

让我们推导 bind 的实现。先看剥离后的签名:

(>>=) :: ((a -> r) -> r) ->
(a -> (b -> r) -> r) ->
((b -> r) -> r)
def flatMap[A, B]: ((A => R) => R) =>
(A => (B => R) => R) =>
((B => R) => R)

我们的目标是创建一个函数,它接受处理器 b -> r 并产生结果 r。所以这是起点:

ka >>= kab = Cont (\hb -> ...)
def flatMap[A, B](ka: Cont[R, A])(kab: A => Cont[R, B]): Cont[R, B] =
Cont(hb => ...)

在 lambda 内部,我们想用合适的处理器调用函数 ka,这个处理器表示计算的其余部分。把这个处理器实现为一个 lambda:

runCont ka (\a -> ...)
runCont(ka)(a => ...)

在这个情形中,计算剩余部分包括:先用 a 调用 kab,再把 hb 传给得到的动作 kb

runCont ka (\a -> let kb = kab a
in runCont kb hb)
runCont(ka) { a =>
val kb = kab(a)
runCont(kb)(hb)
}

可以看到,延续是从内向外组合的。最终处理器 hb 从计算最内层被调用。完整实例是:

instance Monad (Cont r) where
ka >>= kab = Cont (\hb -> runCont ka (\a -> runCont (kab a) hb))
return a = Cont (\ha -> ha a)
implicit def contMonad[R] = new Monad[Cont[R, ?]] {
def flatMap[A, B](ka: Cont[R, A])(kab: A => Cont[R, B]): Cont[R, B] =
Cont { hb =>
runCont(ka)(a => runCont(kab(a))(hb))
}

def pure[A](a: A): Cont[R, A] =
Cont(ha => ha(a))
}

21.2.8 交互式输入(Interactive Input)

这是最棘手的问题,也是许多困惑的来源。显然,像 getChar 这样的函数,如果它要返回键盘输入的字符,就不可能是纯函数。但如果它把字符放在容器中返回呢?只要没有办法从这个容器中提取字符,我们就可以声称这个函数是纯的。每次调用 getChar,它都会返回完全相同的容器。概念上,这个容器会包含所有可能字符的叠加。

如果你熟悉量子力学,理解这个类比应该没有问题。它就像装着薛定谔的猫的盒子,只不过没有任何办法打开或偷看盒子内部。这个盒子用特殊的内置 IO 函子定义。在我们的例子中,getChar 可以声明为 Kleisli 箭头:

getChar :: () -> IO Char
def getChar: Unit => IO[Unit]

(实际上,由于从 unit 类型出发的函数等价于选择返回类型中的一个值,getChar 的声明被简化为 getChar :: IO Char。)

作为函子,IO 允许你用 fmap 操作其内容。而且作为函子,它可以存储任何类型的内容,不只是字符。当你考虑到在 Haskell 中 IO 是单子时,这种做法的真正实用性就显现出来了。这意味着你可以组合产生 IO 对象的 Kleisli 箭头。

你可能会以为 Kleisli 组合允许你偷看 IO 对象的内容(如果继续使用量子类比,就是“坍缩波函数”)。确实,可以把 getChar 与另一个 Kleisli 箭头组合,后者接受字符并比如说把它转换为整数。问题是,第二个 Kleisli 箭头也只能把这个整数作为 IO Int 返回。于是你又得到所有可能整数的叠加,如此继续下去。薛定谔的猫永远不会从袋子里出来。一旦进入 IO 单子,就没有办法离开它。IO 单子没有对应于 runStaterunReader 的东西。没有 runIO

那么,除了把 Kleisli 箭头的结果,也就是 IO 对象,与另一个 Kleisli 箭头组合,你还能做什么?你可以从 main 返回它。在 Haskell 中,main 的签名是:

main :: IO ()
def main: IO[Unit]

你可以自由地把它想成 Kleisli 箭头:

main :: () -> IO ()
def main: Unit => IO[Unit]

从这个角度看,Haskell 程序只是 IO 单子中的一个大 Kleisli 箭头。你可以用单子组合从更小的 Kleisli 箭头组合出它。至于如何处理得到的 IO 对象(也称为 IO 动作),则交给运行时系统。

注意,箭头本身是纯函数,从头到尾都是纯函数。脏活被交给系统。当它最终执行从 main 返回的 IO 动作时,会做各种讨厌的事情,比如读取用户输入、修改文件、打印刺眼消息、格式化磁盘,等等。Haskell 程序从不弄脏自己的手(好吧,除非它调用 unsafePerformIO,但那是另一个故事)。

当然,因为 Haskell 是惰性的,main 几乎会立即返回,脏活也马上开始。正是在执行 IO 动作期间,纯计算的结果会按需被请求和求值。所以,真实情况中,程序执行是纯(Haskell)代码与脏(系统)代码的交错。

还有一种更离奇但作为数学模型完全合理的 IO 单子解释。它把整个宇宙当作程序中的一个对象。注意,概念上命令式模型把宇宙当作一个外部全局对象,因此执行 I/O 的过程通过与这个对象交互而具有副作用。它们既可以读取宇宙状态,也可以修改宇宙状态。

我们已经知道如何在函数式编程中处理状态,也就是使用状态单子。不过,与简单状态不同,宇宙状态无法轻松用标准数据结构描述。但只要我们从不直接与它交互,就不必描述它。假设存在一个类型 RealWorld,并且通过某种宇宙工程奇迹,运行时能够提供这个类型的对象,这就足够了。IO 动作只是一个函数:

type IO a = RealWorld -> (a, RealWorld)
type IO[A] = RealWorld => (A, RealWorld)

或者用 State 单子表示:

type IO = State RealWorld
type IO[A] = State[RealWorld, A]

不过,IO 单子的 >=>return 必须内建在语言中。

21.2.9 交互式输出(Interactive Output)

同一个 IO 单子也用于封装交互式输出。RealWorld 应该包含所有输出设备。你可能会问,为什么不能直接从 Haskell 调用输出函数,并假装它们什么都不做。例如,为什么要有:

putStr :: String -> IO ()
def putStr: String => IO[Unit]

而不是更简单的:

putStr :: String -> ()
def putStr: String => Unit

有两个原因:Haskell 是惰性的,所以它永远不会调用一个输出(这里是 unit 对象)没有任何用处的函数。而且,即使它不是惰性的,也仍然可以自由改变这类调用的顺序,从而弄乱输出。在 Haskell 中强制两个函数顺序执行的唯一方式是数据依赖:一个函数的输入必须依赖另一个函数的输出。让 RealWorldIO 动作之间传递,可以强制顺序。

概念上,在这个程序中:

main :: IO ()
main = do
putStr "Hello "
putStr "World!"
def main: IO[Unit] = for {
_ <- putStr("Hello ")
_ <- putStr("World!")
} yield ()

打印 “World!” 的动作接收的输入,是一个屏幕上已经有 “Hello ” 的宇宙。它输出一个新的宇宙,其中屏幕上有 “Hello World!”。

21.3 结论(Conclusion)

当然,我只是触及了单子编程的表面。单子不仅用纯函数完成了命令式编程中通常用副作用完成的事情,而且以高度控制和类型安全的方式完成。不过,它们并非没有缺点。对单子的主要抱怨是,它们彼此之间不容易组合。当然,你可以用 monad transformer 库组合大多数基本单子。创建一个结合状态与异常的单子栈相对容易,但并不存在把任意单子堆叠在一起的公式。

Footnotes

  1. Eugenio Moggi, Notions of Computation and Monads.

  2. Andrej Bauer, Hask is not a category.

  3. Eric Niebler, Range comprehensions.