第 17 章:使用 Async 的并发编程(Concurrent Programming with Async)
原文:Anil Madhavapeddy and Yaron Minsky, Real World OCaml: Functional Programming for the Masses, Second Edition, Chapter 17。维护者已确认本书为开源书籍,可翻译并发布用于学习研究。
构建与外部世界交互的程序时,程序逻辑常常被“等待”支配:等待鼠标点击、等待从磁盘取得数据、等待出站网络缓冲区出现可用空间。即使只是稍微复杂一点的交互式应用,通常也是并发的:它需要同时等待多个不同事件,并立即响应最先发生的那个事件。
一种处理并发的方法是使用抢占式系统线程,这是 Java 或 C# 等语言中的主流方法。在这种模型中,每个可能需要同时等待的任务都会得到自己的操作系统线程,因此它可以阻塞,而不会让整个程序停下来。
另一种方法是编写单线程程序,让这一个线程运行一个事件循环。事件循环的职责是响应超时或鼠标点击这样的外部事件,并调用为此注册的回调函数。这种方法出现在 JavaScript 这类具有单线程运行时的语言中,也出现在许多 GUI 工具包中。
这些机制各有取舍。系统线程的每个线程都需要显著的内存和其他资源。此外,操作系统可以任意交错系统线程的执行,这要求程序员小心地用锁和条件变量保护共享资源,而这极其容易出错。
另一方面,单线程事件驱动系统一次只执行一个任务,不需要抢占式线程所需的那种复杂同步。然而,事件驱动程序反转的控制结构通常意味着,你自己的控制流必须尴尬地穿过系统事件循环,从而形成一团事件回调迷宫。
本章介绍 Async 库。它提供一种混合模型,目标是兼取两者之长:既避免抢占式线程带来的性能折中和同步痛苦,又避免事件驱动系统通常伴随的令人困惑的控制反转。
17.1 Async 基础(Async Basics)
回忆一下,在 Core 中通常如何执行 I/O。下面是一个简单例子:
# open Core;;
# #show In_channel.read_all;;
val read_all : string -> string
# Out_channel.write_all "test.txt" ~data:"This is only a test.";;
- : unit = ()
# In_channel.read_all "test.txt";;
- : string = "This is only a test."
从 In_channel.read_all 的类型可以看出,它必然是阻塞操作。特别是,它返回一个具体的字符串,这意味着在读取完成之前它不能返回。这个调用的阻塞性质意味着,在它完成之前,其他任何事情都无法取得进展。
在 Async 中,行为良好的函数永远不会阻塞。相反,它们返回一个 Deferred.t 类型的值,作为一个占位符,最终会被填入结果。例如,看看 In_channel.read_all 的 Async 等价函数的签名:
# #require "async";;
# open Async;;
# #show Reader.file_contents;;
val file_contents : string -> string Deferred.t
我们先在顶层环境中用 #require 加载 Async 包,然后打开该模块。和 Core 一样,Async 被设计成基本编程环境的扩展,并且预期会被打开。
一个 deferred 本质上是某个未来可能计算出来的值的句柄。因此,如果调用 Reader.file_contents,得到的 deferred 起初会是空的,如调用 Deferred.peek 可以看到:
# let contents = Reader.file_contents "test.txt";;
val contents : string Deferred.t = <abstr>
# Deferred.peek contents;;
- : string option = None
contents 中的值尚未确定,部分原因是没有任何正在运行的东西可以执行必要的 I/O。使用 Async 时,I/O 和其他事件的处理由 Async 调度器完成。编写独立程序时,需要显式启动调度器;但 utop 知道 Async,并且可以自动启动调度器。不仅如此,utop 还理解 deferred 值。当你输入一个类型为 Deferred.t 的表达式时,它会确保调度器正在运行,并阻塞直到该 deferred 被确定。因此可以写成:
# contents;;
- : string = "This is only a test."
稍微令人困惑的是,这里显示的类型并不是 contents 的类型。contents 的类型是 string Deferred.t,而这里显示的是 string,也就是该 deferred 内部包含的值的类型。
如果再次 peek,就会看到 contents 的值已经被填入:
# Deferred.peek contents;;
- : string option = Some "This is only a test."
为了真正使用 deferred 做事,我们需要一种等待 deferred 计算完成的方式,这可以通过 Deferred.bind 完成。下面是 bind 的类型签名:
# #show Deferred.bind;;
val bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t
bind 实际上是一种对并发计算排序的方式。具体来说,Deferred.bind d ~f 会在 d 的值被确定之后调用 f。
下面是 bind 的一个简单用法:一个函数把文件替换为其内容的大写版本。
# let uppercase_file filename =
Deferred.bind (Reader.file_contents filename)
~f:(fun text ->
Writer.save filename ~contents:(String.uppercase text));;
val uppercase_file : string -> unit Deferred.t = <fun>
# uppercase_file "test.txt";;
- : unit = ()
# Reader.file_contents "test.txt";;
- : string = "THIS IS ONLY A TEST."
同样,bind 在这里充当排序操作符,使文件只有在先通过 Reader.file_contents 读取完内容之后,才会通过 Writer.save 保存。
显式写出 Deferred.bind 可能相当冗长,因此 Async 为它提供了一个中缀操作符:>>=。使用这个操作符,可以把 uppercase_file 改写如下:
# let uppercase_file filename =
Reader.file_contents filename
>>= fun text ->
Writer.save filename ~contents:(String.uppercase text);;
val uppercase_file : string -> unit Deferred.t = <fun>
这里去掉了 bind 右侧函数周围的括号,也没有为该函数的内容增加一层缩进。这是使用中缀 bind 操作符时的标准写法。
现在再看 bind 的另一个潜在用途。这次编写一个统计文件行数的函数:
# let count_lines filename =
Reader.file_contents filename
>>= fun text ->
List.length (String.split text ~on:'\n');;
Line 4, characters 5-45:
Error: This expression has type int but an expression was expected of type
'a Deferred.t
这看起来足够合理,但可以看到编译器并不满意。问题在于,bind 期待的是一个返回 Deferred.t 的函数,而我们提供的函数直接返回结果。我们需要的是 return,这是 Async 提供的一个函数,它接收普通值,并把它包进 deferred。
# #show_val return;;
val return : 'a -> 'a Deferred.t
# let three = return 3;;
val three : int Deferred.t = <abstr>
# three;;
- : int = 3
使用 return,可以让 count_lines 通过编译:
# let count_lines filename =
Reader.file_contents filename
>>= fun text ->
return (List.length (String.split text ~on:'\n'));;
val count_lines : string -> int Deferred.t = <fun>
bind 和 return 组合在一起,形成了函数式编程中称为单子(monad)的设计模式。除线程之外,你还会在许多应用中遇到这个签名。事实上,我们已经在第 8 章“错误处理”中遇到过单子。
同时调用 bind 和 return 是一种相当常见的模式,因此有一个叫 Deferred.map 的标准简写,它的签名如下:
# #show Deferred.map;;
val map : 'a Deferred.t -> f:('a -> 'b) -> 'b Deferred.t
它也有自己的中缀等价形式 >>|。使用它,可以再次把 count_lines 写得更简洁一些:
# let count_lines filename =
Reader.file_contents filename
>>| fun text ->
List.length (String.split text ~on:'\n');;
val count_lines : string -> int Deferred.t = <fun>
# count_lines "/etc/hosts";;
- : int = 10
注意,count_lines 返回的是一个 deferred,但 utop 会等待该 deferred 变为确定状态,并向我们展示 deferred 的内容。
17.1.1 使用 let 语法(Using Let Syntax)
如第 8 章“错误处理”中所讨论的,有一种特殊语法,我们称为 let 语法,专门用于处理单子。可以通过启用 ppx_let 来使用它。
# #require "ppx_let";;
下面是使用该语法后的 count_lines 的 bind 版本:
# let count_lines filename =
let%bind text = Reader.file_contents filename in
return (List.length (String.split text ~on:'\n'));;
val count_lines : string -> int Deferred.t = <fun>
下面是基于 map 的 count_lines 版本:
# let count_lines filename =
let%map text = Reader.file_contents filename in
List.length (String.split text ~on:'\n');;
val count_lines : string -> int Deferred.t = <fun>
这里的区别只是语法层面的。这些示例会编译成与使用中缀操作符编写的对应示例相同的内容。let 语法的好处在于,它凸显了单子 bind 与 OCaml 内置 let 绑定之间的类比,从而让代码更统一、更可读。
let 语法适用于任何单子;你通过打开相应的 Let_syntax 模块来决定正在使用哪个单子。打开 Async 也会隐式打开 Deferred.Let_syntax,但在某些上下文中,你可能希望显式打开它。
大多数情况下,let 语法更易读、更易用。使用 Async 时应该默认采用它,本章剩余部分也会这样做。
17.1.2 Ivar 与 upon(Ivars and Upon)
Deferred 通常通过组合 bind、map 和 return 构建,但有时你希望构造一个 deferred,并能以编程方式决定它何时被填入。这可以使用 ivar 完成。(ivar 这个术语可以追溯到一门名为 Concurrent ML 的语言,该语言由 John Reppy 在 20 世纪 90 年代初开发。ivar 中的 “i” 代表 incremental。)
使用 ivar 有三个基础操作:可以用 Ivar.create 创建一个;可以用 Ivar.read 读出与该 ivar 对应的 deferred;也可以用 Ivar.fill 填充 ivar,从而让对应的 deferred 变为确定状态。下面展示这些操作:
# let ivar = Ivar.create ();;
val ivar : '_weak1 Ivar.t =
{Async_kernel__.Types.Ivar.cell = Async_kernel__Types.Cell.Empty}
# let def = Ivar.read ivar;;
val def : '_weak1 Deferred.t = <abstr>
# Deferred.peek def;;
- : '_weak2 option = None
# Ivar.fill ivar "Hello";;
- : unit = ()
# Deferred.peek def;;
- : string option = Some "Hello"
Ivar 算是一种低层特性;像 map、bind 和 return 这样的操作符通常更易使用,也更容易推理。不过,当你想构建一种现有工具尚未很好支持的同步模式时,ivar 会很有用。
举个例子,假设我们想要一种方式来调度一系列动作,让它们在固定延迟之后运行。此外,我们还希望保证这些延迟动作按照被调度的顺序执行。下面这个签名描述了这一想法:
# module type Delayer_intf = sig
type t
val create : Time.Span.t -> t
val schedule : t -> (unit -> 'a Deferred.t) -> 'a Deferred.t
end;;
module type Delayer_intf =
sig
type t
val create : Time.Span.t -> t
val schedule : t -> (unit -> 'a Deferred.t) -> 'a Deferred.t
end
一个动作以返回 deferred 的 thunk 的形式传给 schedule。(thunk 是一个参数类型为 unit 的函数。)schedule 的调用者会得到一个 deferred,它最终会被填入该 thunk 返回的 deferred 值的内容。为了实现这一点,我们会使用一个名为 upon 的操作符,它的签名如下:
# #show upon;;
val upon : 'a Deferred.t -> ('a -> unit) -> unit
和 bind 与 return 类似,upon 会在传入的 deferred 被确定时调度一个回调函数执行;但与它们不同的是,它不会创建新的 deferred 来让这个回调填充。
我们的 delayer 实现围绕 thunk 队列组织:每次调用 schedule 都会向队列添加一个 thunk,并且还会在未来调度一个任务,从队列中取出一个 thunk 并运行它。等待部分通过函数 after 完成,该函数接收一个时间跨度,并返回一个在该时间跨度流逝后变为确定状态的 deferred:
# module Delayer : Delayer_intf = struct
type t = { delay: Time.Span.t;
jobs: (unit -> unit) Queue.t;
}
let create delay =
{ delay; jobs = Queue.create () }
let schedule t thunk =
let ivar = Ivar.create () in
Queue.enqueue t.jobs (fun () ->
upon (thunk ()) (fun x -> Ivar.fill ivar x));
upon (after t.delay) (fun () ->
let job = Queue.dequeue_exn t.jobs in
job ());
Ivar.read ivar
end;;
module Delayer : Delayer_intf
这段代码并不长,但很微妙。特别需要注意的是,thunk 队列用于确保排队的动作按照它们被调度的顺序运行,即使由 upon 调度的 thunk 实际运行顺序不同也一样。这种微妙性是涉及 ivar 和 upon 的代码的典型特征。因此,只要可能,你都应该坚持使用更简单的 map/bind/return 风格来处理 deferred。
从 ivar 和 upon 理解 bind(Understanding bind in Terms of Ivars and upon)
下面大致说明当你写 let d' = Deferred.bind d ~f 时会发生什么:
- 创建一个新的 ivar
i,用于保存计算的最终结果;返回对应的 deferred。 - 注册一个函数,在 deferred
d变为确定状态时调用。 - 该函数运行后,会用
d被确定出来的值调用f。 - 再注册另一个函数,在
f返回的 deferred 变为确定状态时调用。 - 当该函数被调用时,它会用结果填充
i,从而让对应的 deferred 变为确定状态。
这听起来很多,但我们可以相对简洁地实现它:
# let my_bind d ~f =
let i = Ivar.create () in
upon d (fun x -> upon (f x) (fun y -> Ivar.fill i y));
Ivar.read i;;
val my_bind : 'a Deferred.t -> f:('a -> 'b Deferred.t) -> 'b Deferred.t =
<fun>
Async 的真实实现有更多优化,因此也更复杂。但上面的实现仍然是理解 bind 底层工作方式的有用的一阶心智模型。它也是另一个很好的例子,说明 upon 和 ivar 可以如何用于构建并发原语。
17.2 示例:回显服务器(Example: An Echo Server)
现在已经掌握 Async 基础,我们来看一个小型独立 Async 程序。具体来说,我们会编写一个回显服务器,也就是一个接受客户端连接,并把客户端发给它的内容原样吐回去的程序。
第一步是创建一个函数,用于把数据从输入复制到输出。这里会使用 Async 的 Reader 和 Writer 模块,它们为处理输入输出通道提供了方便的抽象:
open Core
open Async
(* Copy data from the reader to the writer, using the provided buffer
as scratch space *)
let rec copy_blocks buffer r w =
match%bind Reader.read r buffer with
| `Eof -> return ()
| `Ok bytes_read ->
Writer.write w (Bytes.to_string buffer) ~len:bytes_read;
let%bind () = Writer.flushed w in
copy_blocks buffer r w
代码中使用 bind 来对操作排序,每个 bind 都标记了一个等待位置:
- 首先,调用
Reader.read获取一块输入。 - 当它完成且返回了新块时,把该块写入 writer。
- 最后,等待 writer 的缓冲区刷新;此时递归继续。
如果遇到文件结束条件,循环就结束。调用 copy_blocks 返回的 deferred 只有在遇到文件结束条件后才会变为确定状态。
copy_blocks 的一个重要方面是它提供了反压(pushback)。也就是说,如果进程无法在写入上取得进展,它就会停止读取。如果服务器没有实现反压,那么任何阻止你写入的事情,例如客户端跟不上,都会导致程序分配无界数量的内存,因为它需要记录所有打算写出但尚未真正写出的数据。
尾调用与 deferred 链(Tail-Calls and Chains of Deferreds)
你可能还会担心另一个内存问题,也就是 deferred 的分配。如果思考 copy_blocks 的执行过程,就会发现它正在创建一条 deferred 链,每轮循环创建两个。链的长度没有上界,因此天真来看,随着回显进程继续运行,它似乎会占用无界内存。
幸运的是,Async 知道如何优化这种情况。特别是,这整条 deferred 链应该精确地在链中最后一个 deferred 被确定时变为确定状态;在这里,也就是遇到 Eof 条件时。因此,我们可以安全地把所有这些 deferred 替换成一个 deferred。Async 正是这样做的,所以实际上并不存在内存泄漏。
这本质上是尾调用优化的一种形式,被提升到了 Deferred 单子中。事实上,你判断这里的 bind 不会导致内存泄漏的方式,与判断尾递归优化应该适用的方式大体相同:该 bind 创建的 deferred 处于尾位置。换句话说,一旦创建了这个 deferred,就不会再对它做任何事情;它只是被原样返回。
copy_blocks 提供了处理客户端连接的逻辑,但还需要设置服务器来接收这些连接,并分派给 copy_blocks。为此会使用 Async 的 Tcp 模块,它提供了一组用于创建 TCP 客户端和服务器的工具:
(** Starts a TCP server, which listens on the specified port, invoking
copy_blocks every time a client connects. *)
let run () =
let host_and_port =
Tcp.Server.create
~on_handler_error:`Raise
(Tcp.Where_to_listen.of_port 8765)
(fun _addr r w ->
let buffer = Bytes.create (16 * 1024) in
copy_blocks buffer r w)
in
ignore
(host_and_port
: (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t)
调用 Tcp.Server.create 的结果是一个 Tcp.Server.t,也就是一个服务器句柄,可用于关闭服务器。这里没有使用这个功能,因此显式忽略 server,以抑制未使用变量错误。我们在被忽略的值周围添加类型注解,是为了明确说明正在忽略的值的性质。
Tcp.Server.create 最重要的参数是最后一个,也就是客户端连接处理器。值得注意的是,前面的代码没有显式做任何事情来在通信结束后关闭客户端连接。这是因为一旦处理器返回的 deferred 被确定,服务器就会自动关闭连接。
最后,需要初始化服务器并启动 Async 调度器:
(* Call [run], and then start the scheduler *)
let () =
run ();
never_returns (Scheduler.go ())
Async 新手最常见的错误之一,就是忘记运行调度器。这种错误会令人困惑,因为没有调度器,程序什么也不会做;甚至对 printf 的调用都不会到达终端。
值得注意的是,尽管我们没有花太多显式精力思考多个客户端,这个服务器仍然能够处理许多客户端并发连接、读取和写入数据。
现在有了回显服务器,就可以使用 nc 调用的 netcat 工具连接它。注意,这里使用 dune exec 同时构建并运行可执行文件。使用双短横线是为了避免 Dune 自己的参数解析干扰被执行程序的参数解析。
$ dune exec -- ./echo.exe &
$ echo "This is an echo server" | nc 127.0.0.1 8765
This is an echo server
$ echo "It repeats whatever I write" | nc 127.0.0.1 8765
It repeats whatever I write
$ killall echo.exe
永不返回的函数(Functions that Never Return)
围绕 Scheduler.go 调用加上 never_returns 看起来有点意外,但它有一个目的:向调用 Scheduler.go 的人明确说明该函数永不返回。
默认情况下,一个不返回的函数会被推断出返回类型 'a:
# let rec loop_forever () = loop_forever ();;
val loop_forever : unit -> 'a = <fun>
# let always_fail () = assert false;;
val always_fail : unit -> 'a = <fun>
这有点奇怪,但确实说得通。毕竟,如果一个函数永不返回,我们就可以给它不存在的返回值归入任何类型。因此,从类型角度看,一个永不返回的函数可以适配程序中的任何上下文。
但这本身也可能有问题,尤其是对 Scheduler.go 这样的函数来说,它永不返回这一点也许并不完全明显。never_returns 的意义就是创建一个显式标记,让用户知道所讨论的函数不会返回。
为此,Scheduler.go 被定义为返回 Nothing.t 类型的值。
# #show Scheduler.go;;
val go : ?raise_unhandled_exn:bool -> unit -> never_returns
never_returns 只是 Nothing.t 的别名。
Nothing.t 是无人居住的(uninhabited),这意味着不存在该类型的值。因此,一个函数实际上不可能返回 Nothing.t 类型的值;所以只有永不返回的函数才能具有 Nothing.t 作为返回类型。只需添加一个类型注解,就能让一个永不返回的函数具有 Nothing.t 类型的返回值:
# let rec loop_forever () : Nothing.t = loop_forever ();;
val loop_forever : unit -> never_returns = <fun>
函数 never_returns 消耗一个 Nothing.t 类型的值,并返回不受约束的类型 'a。
# #show_val never_returns;;
val never_returns : never_returns -> 'a
如果你试图编写一个使用 Scheduler.go 的函数,并假设它返回 unit,就会得到有帮助的类型错误:
# let do_stuff n =
let x = 3 in
if n > 0 then Scheduler.go ();
x + n;;
Line 3, characters 19-34:
Error: This expression has type never_returns
but an expression was expected of type unit
because it is in the result of a conditional with no else branch
可以通过插入一个对 never_returns 的调用来修正它,从而让 Scheduler.go 不会返回这一事实对读者可见。
# let do_stuff n =
let x = 3 in
if n > 0 then never_returns (Scheduler.go ());
x + n;;
val do_stuff : int -> int = <fun>
17.2.1 改进回显服务器(Improving the Echo Server)
我们尝试让回显服务器再进一步,逐步完成一些改进。具体来说,将会:
- 用
Command添加一个合适的命令行接口。 - 添加一个标志用于指定监听端口,并添加另一个标志,让服务器返回所收到内容的大写版本。
- 使用 Async 的
Pipe接口简化代码。
下面的代码完成了所有这些事情:
open Core
open Async
let run ~uppercase ~port =
let host_and_port =
Tcp.Server.create
~on_handler_error:`Raise
(Tcp.Where_to_listen.of_port port)
(fun _addr r w ->
Pipe.transfer
(Reader.pipe r)
(Writer.pipe w)
~f:(if uppercase then String.uppercase else Fn.id))
in
ignore
(host_and_port
: (Socket.Address.Inet.t, int) Tcp.Server.t Deferred.t);
Deferred.never ()
let () =
Command.async
~summary:"Start an echo server"
(let%map_open.Command uppercase =
flag
"-uppercase"
no_arg
~doc:" Convert to uppercase before echoing back"
and port =
flag
"-port"
(optional_with_default 8765 int)
~doc:" Port to listen on (default 8765)"
in
fun () -> run ~uppercase ~port)
|> Command_unix.run
注意 run 函数中使用了 Deferred.never。顾名思义,Deferred.never 返回一个永远不会被确定的 deferred。在这个例子中,它表示回显服务器永远不会自行关闭。
前面代码中最大的变化,是使用 Async 的 Pipe。Pipe 是一种异步通信通道,用于连接程序的不同部分。可以把它看作一个消费者/生产者队列,它使用 deferred 来传达什么时候可以从管道读取或写入。这里对管道的使用相当少,但它们是 Async 的重要组成部分,所以值得稍微详细讨论。
管道以相互连接的读/写对形式创建:
# let (r,w) = Pipe.create ();;
val r : '_weak3 Pipe.Reader.t = <abstr>
val w : '_weak3 Pipe.Writer.t = <abstr>
r 和 w 实际上只是同一个底层对象的读句柄和写句柄。注意,r 和 w 具有弱多态类型,如第 9 章“命令式编程”中所讨论的,因此只能包含某个单一但尚未确定的类型的值。
如果只是尝试向 writer 写入,就会看到在 utop 中无限期阻塞。可以按 Control-C 跳出等待:
# Pipe.write w "Hello World!";;
Interrupted.
这是因为管道有一定数量的内部余量,也就是在写操作阻塞之前可以写入管道的槽位数。默认情况下,管道的余量为零,这意味着 write 返回的 deferred 只有在值被从管道读出后才会被确定。
# let (r,w) = Pipe.create ();;
val r : '_weak4 Pipe.Reader.t = <abstr>
val w : '_weak4 Pipe.Writer.t = <abstr>
# let write_complete = Pipe.write w "Hello World!";;
val write_complete : unit Deferred.t = <abstr>
# Pipe.read r;;
- : [ `Eof | `Ok of string ] = `Ok "Hello World!"
# write_complete;;
- : unit = ()
在 run 函数中,我们利用了 Pipe 模块为管道提供的众多工具函数之一。具体来说,我们使用 Pipe.transfer 设置一个过程,从 reader-pipe 取得数据并移动到 writer-pipe。下面是 Pipe.transfer 的类型:
# Pipe.transfer;;
- : 'a Pipe.Reader.t -> 'b Pipe.Writer.t -> f:('a -> 'b) -> unit Deferred.t =
<fun>
被连接的两个管道分别由 Reader.pipe 和 Writer.pipe 调用生成。注意,反压在整个过程中都被保留。因此,如果 writer 被阻塞,writer 的管道就会停止从 reader 的管道拉取数据,这会阻止 reader 继续读入更多数据。
重要的是,Pipe.transfer 返回的 deferred 会在 reader 被关闭,并且最后一个元素从 reader 转移到 writer 之后变为确定状态。一旦该 deferred 被确定,服务器就会关闭对应客户端连接。因此,当客户端断开连接时,其余关闭流程会透明地发生。
这个程序的命令行解析基于第 16 章介绍的 Command 库。打开 Async 会用一个扩展版本遮蔽 Command 模块,该扩展版本包含 async 调用:
# #show Command.async_spec;;
val async_spec :
('a, unit Deferred.t) Async.Command.basic_spec_command Command.with_options
它与普通的 Command.basic 调用不同之处在于,主函数必须返回一个 Deferred.t,并且运行命令时(使用 Command_unix.run)会自动启动 Async 调度器,而无需显式调用 Scheduler.go。
17.3 示例:用 DuckDuckGo 搜索定义(Example: Searching Definitions with DuckDuckGo)
DuckDuckGo 是一个搜索引擎,并提供可自由使用的搜索接口。本节会使用 Async 编写一个小型命令行工具,查询 DuckDuckGo,为一组术语提取定义。
代码会依赖若干其他库,它们都可以用 opam 安装。如果安装需要帮助,请参考安装说明。下面是所需库的列表:
textwrap
: 一个用于换行长文本的库。我们会用它打印结果。
uri
: 一个用于处理 URI 的库。URI 是 “Uniform Resource Identifiers”(统一资源标识符),HTTP URL 就是其中一种。
yojson
: 一个 JSON 解析库,第 19 章会介绍它。
cohttp
: 一个用于创建 HTTP 客户端和服务器的库。我们需要 Async 支持,它由 cohttp-async 包提供。
现在深入实现。
17.3.1 URI 处理(URI Handling)
HTTP URL 用于标识 Web 上的端点,它们实际上属于一个更通用的家族,叫作 Uniform Resource Identifiers(URI)。完整 URI 规范定义在 RFC3986 中,而且相当复杂。幸运的是,uri 库提供了强类型接口,替我们处理了许多麻烦。
我们需要一个函数生成用于查询 DuckDuckGo 服务器的 URI:
open Core
open Async
(* Generate a DuckDuckGo search URI from a query string *)
let query_uri query =
let base_uri =
Uri.of_string "http://api.duckduckgo.com/?format=json"
in
Uri.add_query_param base_uri ("q", [ query ])
Uri.t 由 Uri.of_string 函数构造,然后添加一个查询参数 q,其值为期望的搜索查询。把 URI 输出到网络协议时,该库会负责正确编码。
17.3.2 解析 JSON 字符串(Parsing JSON Strings)
DuckDuckGo 返回的 HTTP 响应是 JSON,这是一种常见且值得庆幸地相当简单的格式,定义在 RFC4627 中。我们会使用 Yojson 库解析 JSON 数据,第 19 章会介绍这个库。
我们期望来自 DuckDuckGo 的响应以 JSON 记录形式传来,在 Yojson 的 JSON 变体中由 Assoc 标签表示。还期望定义本身出现在键 "Abstract" 或 "Definition" 下。因此,下面的代码会同时查看这两个键,返回第一个被定义且非空的值:
(* Extract the "Definition" or "Abstract" field from the DuckDuckGo
results *)
let get_definition_from_json json =
match Yojson.Safe.from_string json with
| `Assoc kv_list ->
let find key =
match List.Assoc.find ~equal:String.equal kv_list key with
| None | Some (`String "") -> None
| Some s -> Some (Yojson.Safe.to_string s)
in
(match find "Abstract" with
| Some _ as x -> x
| None -> find "Definition")
| _ -> None
17.3.3 执行 HTTP 客户端查询(Executing an HTTP Client Query)
现在看看使用 Cohttp 库通过 HTTP 分派搜索查询的代码:
(* Execute the DuckDuckGo search *)
let get_definition word =
let%bind _, body = Cohttp_async.Client.get (query_uri word) in
let%map string = Cohttp_async.Body.to_string body in
word, get_definition_from_json string
为了更好地理解发生了什么,可以在 utop 中查看 Cohttp_async.Client.get 的类型:
# #require "cohttp-async";;
# #show Cohttp_async.Client.get;;
val get :
?interrupt:unit Deferred.t ->
?ssl_config:Conduit_async.V2.Ssl.Config.t ->
?headers:Cohttp.Header.t ->
Uri.t -> (Cohttp.Response.t * Cohttp_async.Body.t) Deferred.t
get 调用接收一个 URI 作为必需参数,并返回一个 deferred 值,其中包含一个 Cohttp.Response.t(这里会忽略)和一个 pipe reader,请求体会被流式写入该 reader。
在这个例子中,HTTP body 可能并不大,因此调用 Cohttp_async.Body.to_string,把连接中的数据收集成单个 deferred 字符串,而不是增量消费数据。
从并发角度看,运行单次搜索并不有趣,所以我们来编写并行分派多个搜索的代码。首先,需要用于格式化并打印搜索结果的代码:
(* Print out a word/definition pair *)
let print_result (word, definition) =
printf
"%s\n%s\n\n%s\n\n"
word
(String.init (String.length word) ~f:(fun _ -> '-'))
(match definition with
| None -> "No definition found"
| Some def ->
String.concat ~sep:"\n" (Wrapper.wrap (Wrapper.make 70) def))
这里使用 textwrap 包中的 Wrapper 模块执行换行。这段例程使用了 Async,尽管这一点可能并不明显:这里调用的 printf 实际上是 Async 的专用 printf,它通过 Async 调度器输出,而不是直接打印。打开 Async 时,原始的 printf 定义会被这个新定义遮蔽。它的一个重要副作用是,如果编写 Async 程序时忘记启动调度器,像 printf 这样的调用实际上不会产生任何输出。
下一个函数并行分派搜索,等待结果,然后打印:
(* Run many searches in parallel, printing out the results after
they're all done. *)
let search_and_print words =
let%map results = Deferred.all (List.map words ~f:get_definition) in
List.iter results ~f:print_result
我们使用 List.map 对每个词调用 get_definition,并用 Deferred.all 等待所有结果。下面是 Deferred.all 的类型:
# Deferred.all;;
- : 'a Deferred.t list -> 'a list Deferred.t = <fun>
Deferred.all 返回的列表会反映传入 deferred 的顺序。因此,无论查询以什么顺序返回,定义都会按照搜索词传入的顺序打印。这也意味着,在所有结果到达之前不会发生任何打印。
可以把这段代码改写成在结果一收到就打印,因此可能不按顺序打印:
(* Run many searches in parallel, printing out the results as you
go *)
let search_and_print words =
Deferred.all_unit
(List.map words ~f:(fun word ->
get_definition word >>| print_result))
区别在于,这里在传给 map 的闭包中既分派查询,也打印结果,而不是等待所有结果都返回后再一起打印。我们使用 Deferred.all_unit,它接收一个 unit deferred 列表,并返回单个 unit deferred,该 deferred 会在输入列表中的每个 deferred 被确定后变为确定状态。可以在 utop 中看到这个函数的类型:
# Deferred.all_unit;;
- : unit Deferred.t list -> unit Deferred.t = <fun>
最后,使用 Command.async 创建命令行接口:
let () =
Command.async
~summary:"Retrieve definitions from duckduckgo search engine"
(let%map_open.Command words =
anon (sequence ("word" %: string))
in
fun () -> search_and_print words)
|> Command_unix.run
这就是构建一个简单但可用的定义搜索器所需的一切:
$ dune exec -- ./search.exe "Concurrent Programming" "OCaml"
Concurrent Programming
----------------------
"Concurrent computing is a form of computing in which several
computations are executed during overlapping time
periods—concurrently—instead of sequentially. This is a property
of a system—this may be an individual program, a computer, or a
network—and there is a separate execution point or \"thread of
control\" for each computation. A concurrent system is one where a
computation can advance without waiting for all other computations to
complete."
OCaml
-----
"OCaml, originally named Objective Caml, is the main implementation of
the programming language Caml, created by Xavier Leroy, Jérôme
Vouillon, Damien Doligez, Didier Rémy, Ascánder Suárez and others
in 1996. A member of the ML language family, OCaml extends the core
Caml language with object-oriented programming constructs."
17.4 异常处理(Exception Handling)
使用外部资源编程时,错误无处不在。从不稳定服务器、网络中断,到本地资源耗尽,都可能导致运行时错误。在 OCaml 编程中,其中一些错误会显式出现在函数返回类型中,另一些则会以异常形式出现。第 8 章已经介绍过 OCaml 中的异常处理,但接下来会看到,并发程序中的异常处理会带来一些新的挑战。
为了更好地理解 Async 中的异常如何工作,我们创建一个会(有时)因异常失败的异步计算。函数 maybe_raise 会阻塞半秒,然后要么抛出异常,要么返回 unit;后续调用会在这两种行为之间交替:
# let maybe_raise =
let should_fail = ref false in
fun () ->
let will_fail = !should_fail in
should_fail := not will_fail;
let%map () = after (Time.Span.of_sec 0.5) in
if will_fail then raise Exit else ();;
val maybe_raise : unit -> unit Deferred.t = <fun>
# maybe_raise ();;
- : unit = ()
# maybe_raise ();;
Exception: (monitor.ml.Error Exit ("Caught by monitor block_on_async"))
在 utop 中,maybe_raise () 抛出的异常只会终止该表达式的求值;但在独立程序中,未捕获异常会让整个进程崩溃。
那么,如何捕获并处理这样的异常呢?你可能会尝试使用 OCaml 内置的 try/with 表达式,但可以看到,这并不能奏效:
# let handle_error () =
try
let%map () = maybe_raise () in
"success"
with _ -> return "failure";;
val handle_error : unit -> string Deferred.t = <fun>
# handle_error ();;
- : string = "success"
# handle_error ();;
Exception: (monitor.ml.Error Exit ("Caught by monitor block_on_async"))
这不起作用,是因为 try/with 只能捕获其内部同步执行代码抛出的异常;而 maybe_raise 调度了一个 Async 任务,这个任务会在未来,也就是 try/with 表达式已经退出之后,抛出异常。
可以使用 Async 提供的 try_with 函数捕获这种异步错误。try_with f 接收一个返回 deferred 的 thunk f,并返回一个 deferred。该 deferred 要么被确定为 Ok,其中包含 f 返回的内容;要么在 f 的返回值变为确定之前抛出异常时,被确定为 Error exn。
下面是 try_with 的一个简单示例:
# let handle_error () =
match%map try_with (fun () -> maybe_raise ()) with
| Ok () -> "success"
| Error _ -> "failure";;
val handle_error : unit -> string Deferred.t = <fun>
# handle_error ();;
- : string = "success"
# handle_error ();;
- : string = "failure"
17.4.1 监视器(Monitors)
try_with 是处理 Async 异常的有用工具,但它并不是全部。Async 的所有异常处理机制,包括 try_with,都建立在 Async 的监视器(monitor)系统之上。该系统受 Erlang 中同名错误处理机制启发。监视器相当低层,只有偶尔会被直接使用,但理解它们的工作方式仍然值得。
在 Async 中,monitor 是一个上下文,它决定发生未处理异常时该做什么。每个 Async 任务都在某个 monitor 的上下文中运行;当该任务正在运行时,这个 monitor 称为当前 monitor。调度新的 Async 任务时,例如使用 bind 或 map,它会继承生成它的任务的当前 monitor。
Monitor 被组织成一棵树:创建新 monitor 时,例如使用 Monitor.create,它会成为当前 monitor 的子节点。可以使用 within 在某个 monitor 中显式运行任务,它接收返回非 deferred 值的 thunk;也可以使用 within',它接收返回 deferred 的 thunk。下面是一个例子:
# let blow_up () =
let monitor = Monitor.create ~name:"blow up monitor" () in
within' ~monitor maybe_raise;;
val blow_up : unit -> unit Deferred.t = <fun>
# blow_up ();;
- : unit = ()
# blow_up ();;
Exception: (monitor.ml.Error Exit ("Caught by monitor blow up monitor"))
除了普通堆栈跟踪外,异常还会显示它经过的 monitor 跟踪,从我们创建的名为 "blow up monitor" 的 monitor 开始。你看到的其他 monitor 来自 utop 对 deferred 的特殊处理。
Monitor 不仅可以增强异常的错误跟踪。你还可以用 monitor 显式处理传递给该 monitor 的错误。Monitor.detach_and_get_error_stream 调用尤其重要。它会把 monitor 从父 monitor 分离出来,并返回原本会传递给父 monitor 的错误流。这允许你对错误进行自定义处理,其中可能包括把错误重新抛给父 monitor。下面是一个非常简单的函数示例,它会捕获并忽略自己启动的进程中的错误:
# let swallow_error () =
let monitor = Monitor.create () in
Stream.iter (Monitor.detach_and_get_error_stream monitor)
~f:(fun _exn -> printf "an error happened\n");
within' ~monitor (fun () ->
let%bind () = after (Time.Span.of_sec 0.25) in
failwith "Kaboom!");;
val swallow_error : unit -> 'a Deferred.t = <fun>
这个函数返回的 deferred 永远不会被确定,因为计算以异常而不是返回值结束。这意味着,如果在 utop 中运行该函数,就永远拿不回提示符。
可以使用 Deferred.any 加上超时来修复这个问题,得到一个我们知道最终会被确定的 deferred。Deferred.any 接收一个 deferred 列表,并返回一个 deferred;只要其中任意一个参数变为确定状态,它就会变为确定状态。
# Deferred.any [ after (Time.Span.of_sec 0.5)
; swallow_error () ];;
an error happened
- : unit = ()
可以看到,消息 "an error happened" 会在超时到期之前打印出来。
下面是一个 monitor 示例,它会把某些异常传递给父 monitor,而处理其他异常。异常通过 Monitor.send_exn 发送给父 monitor;调用 Monitor.current 可以找到当前 monitor,也就是新创建 monitor 的父 monitor。
# exception Ignore_me;;
exception Ignore_me
# let swallow_some_errors exn_to_raise =
let child_monitor = Monitor.create () in
let parent_monitor = Monitor.current () in
Stream.iter
(Monitor.detach_and_get_error_stream child_monitor)
~f:(fun error ->
match Monitor.extract_exn error with
| Ignore_me -> printf "ignoring exn\n"
| _ -> Monitor.send_exn parent_monitor error);
within' ~monitor:child_monitor (fun () ->
let%bind () = after (Time.Span.of_sec 0.25) in
raise exn_to_raise);;
val swallow_some_errors : exn -> 'a Deferred.t = <fun>
注意,这里使用 Monitor.extract_exn 来取得被抛出的底层异常。Async 会用额外信息包装它捕获到的异常,包括 monitor 跟踪。因此,如果你想依赖原始异常的细节,就需要取得底层异常。
如果传入的不是 Ignore_me,例如内置异常 Not_found,那么该异常会被传递给父 monitor,并像平常一样被交付:
# exception Another_exception;;
exception Another_exception
# Deferred.any [ after (Time.Span.of_sec 0.5)
; swallow_some_errors Another_exception ];;
Exception:
(monitor.ml.Error (Another_exception) ("Caught by monitor (id 69)")).
如果改为使用 Ignore_me,异常会被忽略,计算会在超时到期时完成:
# Deferred.any [ after (Time.Span.of_sec 0.5)
; swallow_some_errors Ignore_me ];;
ignoring exn
- : unit = ()
实践中,你应该很少直接使用 monitor,而应该使用建立在 monitor 之上的函数,例如 try_with 和 Monitor.protect。一个直接使用 monitor 的库示例是 Tcp.Server.create,它会跟踪处理网络连接的逻辑抛出的异常,也会跟踪响应单个请求的回调抛出的异常。在任一种情况下,它都会通过关闭连接来响应异常。构建这种自定义错误处理时,monitor 会很有帮助。
17.4.2 示例:用 DuckDuckGo 处理异常(Example: Handling Exceptions with DuckDuckGo)
现在回到 DuckDuckGo 客户端,并改进它的异常处理。具体来说,我们会修改它,使任何失败的查询都被报告出来,同时不会阻止其他查询完成。
当前搜索代码很少失败,所以做一个改动,让我们可以更可预测地触发失败。方法是让请求可以分布到多个服务器上。然后,我们会处理其中某个服务器指定错误时出现的错误。
首先,需要修改 query_uri,让它接收一个参数,用于指定要连接的服务器:
(* Generate a DuckDuckGo search URI from a query string *)
let query_uri ~server query =
let base_uri =
Uri.of_string
(String.concat [ "http://"; server; "/?format=json" ])
in
Uri.add_query_param base_uri ("q", [ query ])
此外,还需要做必要改动,从命令行取得服务器列表,并把搜索查询以轮询方式分配到这些服务器上。
现在,重新构建应用,并在两个服务器上运行它,其中一个服务器不会响应查询,看看会发生什么:
$ dune exec -- ./search.exe -servers localhost,api.duckduckgo.com "Concurrent Programming" "OCaml"
(monitor.ml.Error (Unix.Unix_error "Connection refused" connect 127.0.0.1:80)
("Raised by primitive operation at file \"duniverse/async_unix/src/unix_syscalls.ml\", line 1046, characters 17-74"
"Called from file \"duniverse/async_kernel/src/deferred1.ml\", line 17, characters 40-45"
"Called from file \"duniverse/async_kernel/src/job_queue.ml\", line 170, characters 6-47"
"Caught by monitor Tcp.close_sock_on_error"))
[1]
可以看到,出现了一个 "Connection refused" 失败,并导致整个程序结束,尽管两个查询中的一个本来可以自己成功完成。可以在每次调用 get_definition 时使用 try_with 函数,分别处理各个连接的失败,如下所示:
(* Execute the DuckDuckGo search *)
let get_definition ~server word =
match%map
try_with (fun () ->
let%bind _, body =
Cohttp_async.Client.get (query_uri ~server word)
in
let%map string = Cohttp_async.Body.to_string body in
word, get_definition_from_json string)
with
| Ok (word, result) -> word, Ok result
| Error _ -> word, Error "Unexpected failure"
这里先用 try_with 捕获异常,然后使用 match%map(ppx_let 提供的另一种语法)把错误转换成想要的形式:一个二元组,第一个元素是被搜索的词,第二个元素是可能出错的结果。
现在只需要修改 print_result 的代码,使其能够处理新类型:
(* Print out a word/definition pair *)
let print_result (word, definition) =
printf
"%s\n%s\n\n%s\n\n"
word
(String.init (String.length word) ~f:(fun _ -> '-'))
(match definition with
| Error s -> "DuckDuckGo query failed: " ^ s
| Ok None -> "No definition found"
| Ok (Some def) ->
String.concat ~sep:"\n" (Wrapper.wrap (Wrapper.make 70) def))
现在,如果运行同一个查询,就会看到连接失败被单独处理:
$ dune exec -- ./search.exe -servers localhost,api.duckduckgo.com "Concurrent Programming" OCaml
Concurrent Programming
----------------------
DuckDuckGo query failed: Unexpected failure
OCaml
-----
"OCaml, originally named Objective Caml, is the main implementation of
the programming language Caml, created by Xavier Leroy, Jérôme
Vouillon, Damien Doligez, Didier Rémy, Ascánder Suárez and others
in 1996. A member of the ML language family, OCaml extends the core
Caml language with object-oriented programming constructs."
现在,只有发送到 localhost 的查询失败了。
注意,在这段代码中,我们依赖这样一个事实:Cohttp_async.Client.get 会在异常之后自行清理,特别是会关闭它的文件描述符。如果需要直接实现这样的功能,可能会想使用 Monitor.protect 调用,它类似于第 8 章中描述的 protect 调用。
17.5 超时、取消与选择(Timeouts, Cancellation, and Choices)
在并发程序中,经常需要组合来自同一程序中多个不同并发子计算的结果。我们已经在 DuckDuckGo 示例中见过这一点,当时使用 Deferred.all 和 Deferred.all_unit 等待一组 deferred 变为确定状态。另一个有用的原语是 Deferred.both,它允许你等待两个不同类型的 deferred 返回,并以元组形式返回两个值。这里使用函数 sec,它是创建等于给定秒数的时间跨度的简写:
# let string_and_float =
Deferred.both
(let%map () = after (sec 0.5) in "A")
(let%map () = after (sec 0.25) in 32.33);;
val string_and_float : (string * float) Deferred.t = <abstr>
# string_and_float;;
- : string * float = ("A", 32.33)
不过,有时只想等待多个事件中最先发生的一个。这在处理超时时尤其常见。在这种情况下,可以使用 Deferred.any 调用。给定一个 deferred 列表,它会返回单个 deferred,一旦列表中的任意值被确定,该 deferred 就会变为确定状态。
# Deferred.any
[ (let%map () = after (sec 0.5) in "half a second")
; (let%map () = after (sec 1.0) in "one second")
; (let%map () = after (sec 4.0) in "four seconds")
];;
- : string = "half a second"
用它给 DuckDuckGo 搜索添加超时。下面的代码是 get_definition 的包装器,它接收一个超时时间(形式为 Time.Span.t),并返回定义;如果耗时过长,则返回错误:
let get_definition_with_timeout ~server ~timeout word =
Deferred.any
[ (let%map () = after timeout in
word, Error "Timed out")
; (match%map get_definition ~server word with
| word, Error _ -> word, Error "Unexpected failure"
| word, (Ok _ as x) -> word, x)
]
上面使用 let%map 来转换正在等待的 deferred 值,使 Deferred.any 可以在相同类型的值之间选择。
这段代码的问题在于,get_definition 启动的 HTTP 查询在超时触发时实际上并不会关闭。因此,get_definition_with_timeout 可能泄漏一个打开的连接。幸运的是,Cohttp 确实提供了一种关闭客户端的方法。可以把一个 deferred 通过 interrupt 标签传给 Cohttp_async.Client.get。一旦 interrupt 被确定,客户端连接就会关闭。
下面的代码展示如何修改 get_definition 和 get_definition_with_timeout,使超时到期时可以取消 get 调用:
(* Execute the DuckDuckGo search *)
let get_definition ~server ~interrupt word =
match%map
try_with (fun () ->
let%bind _, body =
Cohttp_async.Client.get ~interrupt (query_uri ~server word)
in
let%map string = Cohttp_async.Body.to_string body in
word, get_definition_from_json string)
with
| Ok (word, result) -> word, Ok result
| Error _ -> word, Error "Unexpected failure"
接下来,修改 get_definition_with_timeout,创建一个 deferred 传给 get_definition,这个 deferred 会在超时到期时变为确定状态:
let get_definition_with_timeout ~server ~timeout word =
match%map
get_definition ~server ~interrupt:(after timeout) word
with
| word, (Ok _ as x) -> word, x
| word, Error _ -> word, Error "Unexpected failure"
这会导致连接在超时时干净关闭;但代码不再显式知道超时是否已经触发。特别是,超时时的错误消息现在会是 "Unexpected failure",而不是之前实现中的 "Timed out"。
可以用 Async 的 choose 函数更精确地处理超时。choose 允许你在一组不同的 deferred 之间选择,并只对其中一个做出反应。每个 deferred 都通过函数 choice 与一个函数配对;当且仅当这个 deferred 被选择时,该函数才会被调用。下面是 choice 和 choose 的类型签名:
# choice;;
- : 'a Deferred.t -> ('a -> 'b) -> 'b Deferred.Choice.t = <fun>
# choose;;
- : 'a Deferred.Choice.t list -> 'a Deferred.t = <fun>
注意,并不保证胜出的 deferred 一定是最先变为确定状态的那个。但 choose 保证只有一个 choice 会被选中,并且只有被选中的 choice 会执行所附带的函数。
下面的例子使用 choose 来确保 interrupt deferred 当且仅当超时 deferred 被选中时才变为确定状态。代码如下:
let get_definition_with_timeout ~server ~timeout word =
let interrupt = Ivar.create () in
choose
[ choice (after timeout) (fun () ->
Ivar.fill interrupt ();
word, Error "Timed out")
; choice
(get_definition ~server ~interrupt:(Ivar.read interrupt) word)
(fun (word, result) ->
let result' =
match result with
| Ok _ as x -> x
| Error _ -> Error "Unexpected failure"
in
word, result')
]
现在,如果用足够小的超时时间运行,就会看到一个查询成功,另一个以超时形式失败:
$ dune exec -- ./search.exe "concurrent programming" ocaml -timeout 0.1s
concurrent programming
----------------------
"Concurrent computing is a form of computing in which several
computations are executed during overlapping time
periods—concurrently—instead of sequentially. This is a property
of a system—this may be an individual program, a computer, or a
network—and there is a separate execution point or \"thread of
control\" for each computation. A concurrent system is one where a
computation can advance without waiting for all other computations to
complete."
ocaml
-----
DuckDuckGo query failed: Timed out
17.6 使用系统线程(Working with System Threads)
虽然还没有使用过,但 OCaml 确实内置支持真正的系统线程,也就是内核级线程,其交错执行由操作系统控制。本章开头讨论过 Async 协作式线程模型相对系统线程的优点。不过,即使大多数时候使用 Async,OCaml 的系统线程有时仍然必要,值得理解。
OCaml 系统线程最令人惊讶的一点,是它们并不会给你任何物理并行能力。这是因为 OCaml 运行时有一个单一运行时锁,同一时间最多只能有一个线程持有它。
既然线程不提供物理并行,那它们为什么还有用?
使用系统线程最常见的原因是,有些操作系统调用没有非阻塞替代方案,这意味着不能在 Async 这样的系统中直接运行它们,否则会阻塞整个程序。因此,Async 维护了一个线程池来运行这类调用。大多数时候,作为 Async 用户你不需要思考这件事,但它确实在幕后发生。
拥有多个线程的另一个原因,是为了处理非 OCaml 库。这些库可能有自己的事件循环,或者出于其他原因需要自己的线程。在这种情况下,有时在外部线程上运行一些 OCaml 代码会很有用,作为与主程序通信的一部分。OCaml 的外部函数接口会在第 23 章更详细讨论。
多核 OCaml(Multicore OCaml)
今天的 OCaml 尚不支持真正并行的线程,但很快会支持。当前 OCaml 开发分支预计会在 2022 年作为 OCaml 5.0 发布,其中包含期待已久的支持多核的垃圾收集器,这是多年研究和艰难实现工作的成果。
这里不会讨论多核 GC,部分原因是它尚未发布,部分原因是关于 OCaml 程序应该如何以安全、方便且高性能的方式利用多核,仍有许多开放问题。综合来看,我们今天还不知道足够多,无法写出一章关于多核的内容。
无论如何,虽然多核 OCaml 还没有到来,但它是 OCaml 近期未来中令人兴奋的一部分。
系统线程的另一个偶尔用途,是更好地与计算密集型 OCaml 代码互操作。在 Async 中,如果有一个长时间运行的计算从不调用 bind 或 map,那么该计算会阻塞 Async 运行时直到自身完成。
处理这一问题的一种方式,是显式把计算拆成更小的片段,并在片段之间用 bind 分隔。但有时这种显式让出控制权并不实际,因为它可能需要侵入式修改现有代码库。另一个解决办法是把相关代码放到单独线程中运行。Async 的 In_thread 模块提供了多种设施来完成这件事,其中 In_thread.run 最简单。可以直接写成:
# let def = In_thread.run (fun () -> List.range 1 10);;
val def : int list Deferred.t = <abstr>
# def;;
- : int list = [1; 2; 3; 4; 5; 6; 7; 8; 9]
这会让 List.range 1 10 在 Async 的某个工作线程上运行。计算完成后,结果会被放入 deferred,之后就可以在 Async 中以普通方式使用它。
Async 和系统线程之间的互操作可能相当棘手。考虑下面这个函数,用于测试 Async 的响应性。该函数接收一个返回 deferred 的 thunk;它先运行该 thunk,然后使用 Clock.every 每 100 毫秒醒来一次并打印时间戳,直到返回的 deferred 变为确定状态,此时再打印最后一个时间戳:
# let log_delays thunk =
let start = Time.now () in
let print_time () =
let diff = Time.diff (Time.now ()) start in
printf "%s, " (Time.Span.to_string diff)
in
let d = thunk () in
Clock.every (sec 0.1) ~stop:d print_time;
let%bind () = d in
printf "\nFinished at: ";
print_time ();
printf "\n";
Writer.flushed (force Writer.stdout);;
val log_delays : (unit -> unit Deferred.t) -> unit Deferred.t = <fun>
如果把一个简单的超时 deferred 传给这个函数,它会如你所期望的那样,大约每 100 毫秒醒来一次:
# log_delays (fun () -> after (sec 0.5));;
37.670135498046875us, 100.65722465515137ms, 201.19547843933105ms, 301.85389518737793ms, 402.58693695068359ms,
Finished at: 500.67615509033203ms,
- : unit = ()
现在看看,如果等待的不是时钟事件,而是等待忙循环完成,会发生什么:
# let busy_loop () =
let x = ref None in
for i = 1 to 100_000_000 do x := Some i done;;
val busy_loop : unit -> unit = <fun>
# log_delays (fun () -> return (busy_loop ()));;
Finished at: 874.99594688415527ms,
- : unit = ()
可以看到,log_delays 并没有每秒醒来 10 次,而是在 busy_loop 忙碌期间完全被挡住了。
另一方面,如果使用 In_thread.run 把它转移到另一个系统线程,行为就会不同:
# log_delays (fun () -> In_thread.run busy_loop);;
31.709671020507812us, 107.50102996826172ms, 207.65542984008789ms, 307.95812606811523ms, 458.15873146057129ms, 608.44659805297852ms, 708.55593681335449ms, 808.81166458129883ms,
Finished at: 840.72136878967285ms,
- : unit = ()
现在 log_delays 的确有机会运行,但不再是整齐的 100 毫秒间隔。原因是,一旦使用系统线程,就要任由操作系统决定每个线程何时被调度。线程行为非常依赖操作系统以及它的配置方式。
处理 OCaml 线程的另一个棘手方面与分配有关。编译为原生代码时,OCaml 线程只有在与分配器交互时才有机会让出运行时锁。因此,如果有一段代码完全不分配,那么它永远不会允许另一个 OCaml 线程运行。字节码没有这种行为,所以如果在字节码中运行一个不分配的循环,计时器进程就有机会运行:
# let noalloc_busy_loop () =
for i = 0 to 100_000_000 do () done;;
val noalloc_busy_loop : unit -> unit = <fun>
# log_delays (fun () -> In_thread.run noalloc_busy_loop);;
32.186508178710938us, 116.56808853149414ms, 216.65477752685547ms, 316.83063507080078ms, 417.13213920593262ms,
Finished at: 418.69187355041504ms,
- : unit = ()
但如果把它编译成原生代码可执行文件,那么这个不分配的忙循环会阻塞其他所有东西运行:
$ dune exec -- native_code_log_delays.exe
197.41058349609375us,
Finished at: 1.2127914428710938s,
这些例子的要点是,预测线程交错执行是一件微妙的事情。保持在 Async 范围内确实有其限制,但它会带来更可预测的行为。
17.6.1 线程安全与加锁(Thread-Safety and Locking)
一旦开始使用系统线程,就需要小心可变数据结构。大多数可变 OCaml 数据结构在被多个线程并发访问时都会表现出非确定性行为。你可能遇到的问题从运行时异常到数据结构损坏都有。这意味着,在不同系统线程之间共享可变数据时,几乎总是应该使用互斥锁。即使某些数据结构看起来似乎应该安全,但如果底层是可变的,例如惰性值,从多个线程访问时也可能产生令人意外的行为。
OCaml 有两个常用的互斥锁包:标准库中的 Mutex 模块,它只是操作系统级互斥锁的包装;以及 Nano_mutex,这是一个更高效的替代方案,利用 OCaml 运行时已经执行的一些加锁,在很多时候避免创建操作系统级互斥锁。因此,创建 Nano_mutex.t 比创建 Mutex.t 快 20 倍,而获取互斥锁大约快 40%。
总体而言,结合 Async 和线程相当棘手,但如果满足下面两个条件,它会相当简单:
- 各个相关线程之间没有共享可变状态。
- 由
In_thread.run执行的计算不会调用 Async 库。
话虽如此,即使违反这些约束,也可以安全地以某些方式使用线程。特别是,外部线程可以使用 Async 中 Thread_safe 模块的调用获取 Async 锁,从而安全地运行 Async 计算。这是一种把线程连接到 Async 世界的非常灵活的方式,但属于复杂用例,超出了本章范围。