任务

什么是任务?

任务是进程, 是用于执行特定动作的进程, 通常较少或不与其他进程通信. 任务最常用的情况是异步地计算一个值.

1
task = Task.async(fn -> do_some_work() end)
res  = do_some_other_work()
res + Task.await(task)

任务通常不返回值, 但是有时候我们需要让任务计算一个值,随后读取其计算结果.

async/await 提供了一个非常简单的机制并发地计算.

1
iex(foo@44adb2a6d305)2> task = Task.async(fn -> :math.pow(7,2) + :math.pow(8,2) end)
%Task{pid: #PID<0.97.0>, ref: #Reference<0.0.0.238>}
iex(foo@44adb2a6d305)3> Task.await(task)
113.0

首先来说明异步, Task.async/1创建一个任务,并执行传递给它的函数, 如上述代码第2行所示, Task.async/1调用返回一个对任务的引用.

这行输出很好地阐述了文章开头这么一句话任务是进程.该输出实际上是一个%Task Map, 包含两个元素 进程的Pid, 以及一个引用对象.

随后可以通过调用Task.await/1获取其计算结果. 调用Task.async/1会创建一个新的进程,该进程连接到调用者进程. 任务结果以消息的形式返回给调用者进程.
Task.await/2用于读取由任务发送的消息.

创建任务task1

1
iex(foo@44adb2a6d305)4> task1 = Task.async(fn -> :math.pow(7,2) + :math.pow(8,2) end)
%Task{pid: #PID<0.100.0>, ref: #Reference<0.0.0.245>}

创建任务task2

1
iex(foo@44adb2a6d305)6> task2 = Task.async(fn -> 2+2 end)
%Task{pid: #PID<0.103.0>, ref: #Reference<0.0.0.252>}

分别获取两个任务的计算结果 v1, v2

1
v1 = Task.await(task1)
v2 = Task.await(task2)

再通过第三个任务task3合并计算两个任务的结果

1
iex(foo@44adb2a6d305)15> task3 = Task.async(fn -> v1 + v2 end)
%Task{pid: #PID<0.117.0>, ref: #Reference<0.0.0.288>}
iex(foo@44adb2a6d305)16> Task.await(task3)
117.0

还可以通过start_link/1start_link/3创建任务并挂在到Supervition树中.

Task.async/1不同,Task.start_link/1返回的是一个{:ok,pid}而不是一个任务引用,因此我无法通过Task.await/2获取其计算结果.

1
iex(foo@44adb2a6d305)21> Task.start_link(fn -> 1 + 1 end)
{:ok, #PID<0.128.0>}

这样的任务可以挂在到supervision树, 下面的代码片段显示了如何挂载一个任务到一个supervision树中:

1
import Supervisor.Spec
children = [
    worker(Task, [fn -> IO.puts "ok" end])
]

通过上述几个步骤,我们实现了并发处理. 下面阐述如何把任务分布到集群中的多个节点从而实现分布式.

Supervision 树

Task.Supervisor模块允许开发者创建可动态第添加任务的监控进程(Supervisors):

1
{:ok, pid} = Task.Supervisor.start_link()
Task.Supervisor.async(pid, Module, :function, [arg1, arg2, arg3])

Task.Supervisor还可以在远程节点上创建任务, 只要该监控进程在本地或全局注册过:

1
# 在远程节点
Task.Supervisor.start_link(name: :tasks_sup)
# 在客户端
Task.Supervisor.async({:tasks_sup, :'bar@192.168.8.8'}, Module, :function, [arg1, arg2, arg3])

Task.Supervisor通常在监控树中以下列方式启动

1
import Supervisor.Spec
children = [
    supervisor(Task.Supervisor, [[name: :tasks_sup]
]

需要注意的是, 当处理分布式任务时, 应该使用async/3, 分别传递模块,函数,参数列表, 而不是以一个匿名函数作为参数的async/1.

分布 (Distrbuted)

TODO::