Go如何保证内存顺序

内存顺序简介

许多编译器(在编译时)和CPU处理器(在运行时)经常通过指令重排进行一些优化,使得指令的执行顺序可能与代码中显示的顺序有所不同,指令的执行顺序也通常称为内存顺序。

当然,指令重排不能是任意的。在某个goroutine中能受指令重排影响的前提条件是,goroutine与其他goroutine共享数据,否则goroutine本身不能检测到指令重排。换句话说,不与其他goroutine共享数据的goroutine,它的代码执行顺序始终与代码编写顺序相同,即使实际上有指令重排也是如此。

对于这样的goroutine,它可以认为其指令执行顺序始终与代码指定的顺序相同,即使实际上在其中进行了指令重新排序也是如此。

但是,如果某些goroutine共享数据,则一个goroutine可能会观察到另一个goroutine中的指令重排,并影响相关goroutine的行为。在goroutine之间共享数据在并发编程中很常见,如果忽略指令重排造成的结果影响,我们的并发程序的行为可能与编译器和CPU有关,并且容易出异常。

这是一个不严谨的Go程序,它不考虑指令重排。该程序是从文档Go内存模型中的示例扩展而来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import "log"
import "runtime"

var a string
var done bool

func setup() {
a = "hello, world"
done = true
if done {
log.Println(len(a)) // always 12 once printed
}
}

func main() {
go setup()

for !done {
runtime.Gosched()
}
log.Println(a) // expected to print: hello, world
}

该程序的行为像我们预期的那样,会打印hello, world.世界文本。但是,该程序的行为取决于编译器和CPU,如果该程序是使用不同的编译器或更高版本的编译器进行编译 ,或者它运行在不同体系结构CPU上,则可能不会打印hello, world.文本,或者可能会打印出不同的文本。原因是编译器和CPU可能会交换setup()函数中前两行的执行顺序,因此setup()函数的最终执行顺序可能是以下代码顺序。

1
2
3
4
5
6
7
func setup() {
done = true
a = "hello, world"
if done {
log.Println(len(a))
}
}

上述程序中的setup()函数的goroutine无法观察到指令重排,因此log.Println(len(a))行将始终打印12(如果setup()能在主线程退出之前执行),但是主goroutine可能会观察到指令重排,这就是打印的文本可能不是hello, world.的原因。

除了内存重新排序的问题外,程序中还存在数据竞争问题。使用变量adone并没有任何同步措施,因此上面的程序充分展示了并发编程的错误。专业的Go程序员不应犯这些错误。

我们可以使用Go工具链中提供的go build -race命令来构建程序,然后运行提取可执行文件来检查程序中是否存在数据竞争。

Go内存模型

有时,我们需要确保一个goroutine中某些代码行的执行必须在另一个goroutine中某些代码行的执行之前(或之后)执行(从这两个goroutine中任何一个的视角来看),以保持代码逻辑的正确性。在这种情况下,指令重排可能会造成一些麻烦。我们应如何防止某些可能的指令重排?

不同的CPU架构提供不同的栅栏指令以防止不同种类的指令重排。某些编程语言提供将这些栅栏指令插入代码中的相应函数,但是,理解和正确使用栅栏指令提高了并发编程的使用标准。

Go的设计理念是使用尽可能少的特性功能以支持尽可能多的用户场景,同时确保足够好的整体代码执行效率。因此,Go内置和标准包不提供直接使用CPU栅栏指令的方法。实际上,CPU栅栏指令体现在Go支持的各种同步技术中。因此,我们应该使用这些同步技术来确保预期的代码执行顺序。

下文将列出Go中某些保证的(和非保证的)代码执行顺序,包括在Go内存模型文档和其他官方Go文档中提到或未提到的。

在下面的描述中,如果我们说事件A肯定在事件B之前发生,则意味着涉及这两个事件的任何goroutine都会观察到一个现象,在源代码中的事件A之前出现的任何语句,都将在事件B之后出现的任何语句之前执行。对于其他不相关的goroutine,观察到的顺序可能与刚刚描述的顺序不同。

goroutine的创建发生在goroutine的执行之前

在以下函数中,赋值x, y = 123, 789语句将在fmt.Println(x)之前调用,并在调用fmt.Println(y)之前执行调用fmt.Println(x)。

1
2
3
4
5
6
7
8
9
10
var x, y int
func f1() {
x, y = 123, 789
go func() {
fmt.Println(x)
go func() {
fmt.Println(y)
}()
}()
}

但是,以下函数中三个语句的执行顺序不确定,此函数存在数据竞争。

1
2
3
4
5
6
7
8
9
10
11
12
var x, y int
func f2() {
go func() {
// Might print 0, 123, or some others.
fmt.Println(x)
}()
go func() {
// Might print 0, 789, or some others.
fmt.Println(y)
}()
x, y = 123, 789
}

channel操作与顺序保证有关系

Go内存模型文档列出了下面三种channel相关的顺序保证。

  • 不管该channel是有缓冲的还是无缓冲的,向该channel的第n次写入成功,都会在从该channel第n次成功读取之前发生。
  • 从容量为m的channel进行的第n次成功读取发生在向该channel的第(n+m)次成功写入之前,特例是如果该channel无缓冲(m == 0),则从该channel进行的第n次成功读取发生在 该channel上的第n个成功写入之前。
  • 读取完成之前关闭channel,如果发生读取动作,会返回零值,因为channel是关闭状态。

实际上,到一个channel的第n个成功写入和第n个成功读取是同一事件。

下面例子展示了使用无缓冲的channel如何保证一些代码的执行顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func f3() {
var a, b int
var c = make(chan bool)

go func() {
a = 1
c <- true
if b != 1 { // impossible
panic("b != 1") // will never happen
}
}()

go func() {
b = 1
<-c
if a != 1 { // impossible
panic("a != 1") // will never happen
}
}()
}

在这里,对于两个新创建的goroutine,以下顺序可以保证:

  • 赋值b = 1的执行绝对在条件b != 1的判断之前结束
  • 赋值a = 1的执行绝对在条件a != 1的判断之前结束

因此,上面示例中的两次panic调用将永远不会执行,但是以下示例中的panic调用可能会执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func f4() {
var a, b, x, y int
c := make(chan bool)

go func() {
a = 1
c <- true
x = 1
}()

go func() {
b = 1
<-c
y = 1
}()

// Many data races are in this goroutine.
// Don't write code as such.
go func() {
if x == 1 {
if a != 1 { // possible
panic("a != 1") // may happen
}
if b != 1 { // possible
panic("b != 1") // may happen
}
}

if y == 1 {
if a != 1 { // possible
panic("a != 1") // may happen
}
if b != 1 { // possible
panic("b != 1") // may happen
}
}
}()
}

在这里,对于第3个goroutine,与channel c上的操作无关。不能保证观察到的顺序和前两个新创建的goroutine所观察到的顺序一致,因此,可能会执行四个panic调用中的任何一个。

实际上,大多数编译器实现的确保证了上面示例中的四个panic调用永远不会执行,但是Go官方文档从未做出这样的保证。因此上面示例中的代码在交叉编译器或交叉编译器版本是不兼容的(会有顺序问题)。我们应该遵循Go官方文档来编写专业的Go代码。

这是一个使用有缓冲的channel的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func f5() {
var k, l, m, n, x, y int
c := make(chan bool, 2)

go func() {
k = 1
c <- true
l = 1
c <- true
m = 1
c <- true
n = 1
}()

go func() {
x = 1
<-c
y = 1
}()
}

可以保证以下顺序:

  • k = 1的执行在y = 1的执行之前结束。
  • x = 1的执行在n = 1的执行之前结束。

但是,不能保证x = 1的执行在l = 1和m = 1的执行之前发生,并且l = 1和m = 1的执行不能保证在y = 1的执行之前发生。

以下是通道关闭的示例。在此示例中,保证k = 1的执行在y = 1的执行之前结束,但不保证在x = 1的执行之前结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func f6() {
var k, x, y int
c := make(chan bool, 1)

go func() {
c <- true
k = 1
close(c)
}()

go func() {
<-c
x = 1
<-c
y = 1
}()
}

互斥与顺序保证有关系

以下是使用Go中互斥锁保证顺序的场景。

  • 对于同步标准库中的Mutex或RWMutex类型的引用变量m,第n个m.Unlock()方法的成功调用发生在第(n+1)个m.Lock()方法调用返回之前
  • 对于RWMutex类型的引用变量rw,如果返回了第n个rw.Lock()方法调用,则其第n个rw.Unlock()方法的成功调用,发生在rw.RLock()方法调用返回之前,并且发生在第n个rw.Lock()方法调用之后
  • 对于RWMutex类型的引用变量rw,如果返回了第n个rw.RLock()方法调用,则其第m次rw.RUnlock()方法的成功调用(其中m <= n),发生在任何rw.Lock()方法调用之前,发生在第n个rw.RLock()方法调用之后

后面的示例中,保证以下顺序:

  • a = 1的执行在b = 1的执行之前结束。
  • m = 1的执行在n = 1的执行之前结束。
  • x = 1的执行在y = 1的执行之前结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func fab() {
var a, b int
var l sync.Mutex // or sync.RWMutex

l.Lock()
go func() {
l.Lock()
b = 1
l.Unlock()
}()
go func() {
a = 1
l.Unlock()
}()
}

func fmn() {
var m, n int
var l sync.RWMutex

l.RLock()
go func() {
l.Lock()
n = 1
l.Unlock()
}()
go func() {
m = 1
l.RUnlock()
}()
}

func fxy() {
var x, y int
var l sync.RWMutex

l.Lock()
go func() {
l.RLock()
y = 1
l.RUnlock()
}()
go func() {
x = 1
l.Unlock()
}()
}

注意,在以下代码中,根据官方的Go文档,不能保证p = 1的执行在q = 1的执行之前结束,尽管大多数编译器确实提供这种保证。

1
2
3
4
5
6
7
8
var p, q int
func fpq() {
var l sync.Mutex
p = 1
l.Lock()
l.Unlock()
q = 1
}

sync.WaitGroup保证顺序

在给定的时间,假设计数器由一个sync.WaitGroup的引用变量wg维护,并且不为零。如果在指定时间之后,调用一组wg.Add(n)方法,我们可以确保最后一次wg.Done()调用会将wg维护的计数器修改为零,然后保证wg.Add(n)和wg.Done()都在wg.Wait方法调用返回之前发生,该调用在指定逻辑之后调用。

注意,wg.Done()等同于wg.Add(-1)。

请阅读有关sync.WaitGroup类型的说明,以及如何使用sync.WaitGroup。

sync.Once保证顺序

请阅读sync.Once类型的说明以获取sync.Once的值以及如何使用sync.Once保证顺序。

sync.Cond保证顺序

很难清楚地描述sync.Cond的顺序保证。请阅读sync.Cond类型的说明以及如何使用sync.Cond值。

atomic原子变量保证顺序

Go的官方文档都没有提到原子变量同步技术保证的内存顺序。但是,在标准Go编译器的实现中,原子变量操作确实存在一些内存顺序保证。标准包在很大程度上依赖于原子操作提供的保证。

如果使用标准Go编译器1.14进行编译,以下程序将始终打印1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import "fmt"
import "sync/atomic"
import "runtime"

func main() {
var a, b int32 = 0, 0

go func() {
atomic.StoreInt32(&a, 1)
atomic.StoreInt32(&b, 1)
}()

for {
if n := atomic.LoadInt32(&b); n == 1 {
// The following line always prints 1.
fmt.Println(atomic.LoadInt32(&a))
break
}
runtime.Gosched()
}
}

在这里,主goroutine总是会观察到a的修改要早于b的修改。但是,原子操作所提供的顺序保证不会在Go规范和任何其他官方Go文档中列出。但是,跨编译器体系和跨编译器版本兼容的Go代码,安全的建议是,在普世Go编程中,不要依赖于atomic原子变量来保证内存顺序。有一个关于如何对普通编码人员开放使用atomic原子变量的issue。但是,到目前为止( 转到1.14),尚未做出决定。

欢迎阅读本文以了解如何保证代码执行的内存顺序。


【译】Airflow的挑战者Prefect

在数据工程生态系统,Airflow一直是一个重要的工具,技术工作者也利用它开发了很多业务。它开创性地兼备标准DAG模型和Python的灵活性,从而广泛的适用现实生活中的workflow业务。然而,Airflow被当做一个臃肿的批处理调度器来设计,目标使用者是团队打算招来协调组织其他系统的数据工程师,因此其可用性受到了很大限制。

今天,许多数据工程师正在与分析人员更直接地合作,计算存储都是便宜的,使得试错成本低,实验得到注重。加上业务workflow快速动态变化,尽管Airflow在许多事情上做对了,但是其核心目标从未预料到数据应用会变得如此多样化。很显然,它没有亟需的数据抽象和接口来实现那些日异月新的业务活动。

时间追溯到2016年如何改变Airflow来支撑新的多样化数据实践的讨论中,Prefect产品的第一次全面计划就此萌芽。令人失望的是,Aiflow在讨论后并未改进。我们才能开源了现代化的数据平台Prefect,早期的commit和评论反馈令我们很鼓舞。

我们知道用户关心关于Prefect和Airflow的对比,尤其是考虑到Prefect的血统与Airflow相关。本文主要对比阐述Prefect引擎是如何用特性解决一些Airflow无法解决的问题,并不是Prefect详尽的特性说明书,而是给熟悉Airflow的人提供用户指南,指明Prefect解决问题的方法。尽可能地讨论已解决的问题和方法,对于开源仓库中当前不可用内容的介绍试图保持平衡和克制,希望这对社区有益。

内容提要

  • 概览
  • API
  • 调度和时间的关系
  • 调度服务
  • 数据流
  • 参数化workflow
  • 动态workflow
  • 多版本workflow
  • 本地测试
  • UI操作界面
  • 总结

概览

Airflow是用来为按照固定的时间调度运行偏静态的、task流转缓慢的workflow而设计,在这个目标上算是优秀的框架。Airflow也是第一个用代码成功实现强大灵活的workflow的有用榜样。事实证明,无需借助配置文件的定义,也可实现workflow。

然而,从能处理多样化的workflow业务的角度,尤其以现在的标准来看,Airflow为workflow设计提供的数据抽象和接口很有限。开发者为了实现业务,把业务勉强套进Airflow模型会遇到麻烦。Airflow不能很好地满足以下场景:

  • 无调度或者脱离调度的DAG
  • 同一时间多次并发启动的DAG
  • 复杂分支逻辑的DAG
  • 实时task执行的DAG
  • 需要交换数据的DAG
  • 参数化的DAG
  • 动态的DAG

如果你的使用场景与上述任何一种相似,将无法直接使用Airflow,除非你在Airflow的抽象设计层面花费大量的工夫。因此,几乎每个中大型规模的公司最终都会编写自定义的DSL,或者维护大量专有插件来满足内部复杂多变的业务需求,又造成升级和故障维护的困难。

Prefect汲取了多年从事Airflow相关项目的经验。产品的研究覆盖了数百用户和公司,发现了Airflow难以解决的隐患。然后通过支撑适合大多数业务场景的架构抽象设计,它最终实现了难以置信的轻量好用的接口。注意,本文Airflow DAG和Prefect flow都是workflow工作流的意思

API

当workflow能够定义成代码时,他们变得更加
易于维护,方便多版本追踪,好测试,可协作。

– Airflow 官方文档

生产中应用workflow对复杂业务是非常有利的,并且牵扯到技术分工范围的多个利益协作者。因此考虑到workflow在数据处理技术栈处处可用,让workflow足够清晰简单是很重要的。Python是开发workflow的语言的最优选择。Airflow是第一个认识到这一点的框架,并且用Python实现API。

但是,Airflow的API是命令式(imperative)和面向对象(基于类)的。此外,由于Airflow对DAG的代码开发的诸多限制(后面展开讲),编写Airflow DAG代码就像是在开发Airflow特定风格的代码。

Prefect的基本理念是,如果开发者能够保证编写的业务函数按预期正常运行,对workflow框架是无感知的。只有当bug出现,程序抛异常,workflow框架的管理作用才显现出来。这么看,workflow框架可算是流程风险管理工具,并且设计良好的情况下,应该降低workflow框架对业务代码的显式侵入,除非开发者明确需要,否则workflow框架不要让用户在开发路上觉得碍眼。

所以Prefect的设计目标是在业务代码正确的情况下是几乎零侵入透明的,同时在出错时,workflow框架能尽量协助业务重回正轨。无论哪种方式,Prefect都可以为业务提供相应级别的透明度和细节。

实现上述目标的基本方法是使用Prefect的功能API。在此模式下,Prefect的task行为类似于Python函数,可以直接定义输入输出和调用它。简单到只用一行Prefect装饰器代码,就将Python函数转换为task。像是Python函数一样自然,task之间的互相调用就地构建workflow,非常具有Python风格。这使得通常将已有业务代码和脚本转换成全栈Prefect的workflow是轻车熟路的。

不用担心,Prefect同时还为Airflow用户提供Airflow风格的API,这种命令式风格对于构建复杂task依赖的workflow很有用,可以实现底层抽象控制。用户可以根据需求和倾向,来切换这两种风格使用。

调度和时间的关系

时间是一种幻想,就像午餐时间的加倍。

– 银河系漫游指南

对于Airflow的新手来说,可能最常见的困惑是不同的时间概念的用途。如果你运行Airflow教程例子,你会发现要一直纠结这些不同的时间定义是什么意思呢?

1
2
3
4
5
6
7
airflow test tutorial print_date 2015–06–01

## Output
AIRFLOW_CTX_EXECUTION_DATE=2015–06–01T00:00:00+00:00
[2019–04–17 15:54:45,679] {bash_operator.py:110} INFO - Running command: date
[2019–04–17 15:54:45,685] {bash_operator.py:119} INFO - Output:
[2019–04–17 15:54:45,695] {bash_operator.py:123} INFO - Wed Apr 17 15:54:45 PDT 2019

Airflow严格依赖execution_date的时间定义。DAG要设置execution_date才可以运行,同一个DAG不可能生成2个运行execution_date相同的实例。当一个DAG需要同时开始执行两次的呢?Airflow就无法支持。这需要创建2个DAG ID不同业务逻辑完全相同的DAG来生成各自的可执行实例,或者同一个DAG生成execution_date相隔1ms的2个可执行实例,或者使用Hack方式达到目的。

更令人困惑的是,execution_date并不是Airflow DAG可执行实例的start_time,start_time=execution_date+一个周期间隔。这最初是由于ELT编排业务要求所致,execution_date为5月2日时间点的数据作业将在5月3日的同一时间点真正开始运行。时至今日,这都是用户尤其是新用户的最大阻碍。

下一个时间点才开始运行可执行实例产生的间隔设计来源于Airflow严格要求DAG有良好的执行计划。直到现在,它都不可能运行脱离计划调度的DAG,因为调度器无法正确地给这类DAG可执行实例安排start_time。其实临时可执行实例也是可以产生的,只要该临时的可执行实例不与同一个DAG下面其他的可执行实例有同一个start_time。

这意味对于以下需求,Airflow会是一个错误的选择:

  • 不规则调度或者无调度的DAG
  • 同一个DAG生成多份同时开始的可执行实例
  • 维护经常手工触发产生可执行实例的DAG

作为对比,Prefect将workflow的可执行实例当作可以任何时候执行、任意并发的对象。
调度计划是一组预定义好start_time的可执行实例的编排,可以根据需求很灵活的定义简单或者复杂的调度计划。
如果workflow的可执行实例确实要严格遵守时间计划,只需要简单地将时间添加为workflow可执行实例的参数。

调度服务

R2-D2(星球大战的宇航技工机器人),你比起你作为一台奇怪的计算机器了解的更好。

–C-3PO(星球大战的礼仪机器人)

Airflow的调度器是Airflow的核心部分,对Airflow使用有性能影响。主要负责如下:

  • 每隔几秒钟重新分析一次DAG文件夹,生成DAG计划
  • 检查DAG计划以确定是否生成DAG可执行实例来准备就绪
  • 检查所有task依赖项,以确定是否有后续的task可以运行
  • 在数据库设置最终的DAG运行实例和task运行实例的状态

相对的,Prefect将这么多架构设计解耦到不同的独立模块中。

Prefect flow Scheduling

Prefect flow Scheduling(workflow可执行实例调度)是很轻量的,只需要简单创建一个新的可执行实例,并且设置成Scheduled状态。这就是Prefect flow Scheduling实际的唯一责任,调度器绝对不会干涉任何workflow的依赖逻辑。

Prefect flow Logic

Prefect flow Logic(workflow的依赖设置)是独立的编码部分,依赖逻辑绝不侵入业务,也不会把workflow相关的状态管理掺杂到业务里面。作为证据,你可以在本地运行空的workflow的情况,发现workflow进程几乎没有资源开销。

1
2
3
# run your first Prefect flow from the command line

python -c "from prefect import Flow; f = Flow('empty'); f.run()"

Prefect Task Scheduling

当Prefect flow的可执行实例在运行期间,只管调度属于自己的task,这非常重要:

  • 从workflow的架构抽象看,workflow的可执行实例对象是唯一适合接管这个责任的
  • 减轻中枢调度器(Prefect flow Scheduling)的负担
  • 针对各种独特场景做决策,例如动态生成task
  • 将执行细节交给Dask等外部系统负责

最后一点很重要,尽管Airflow已经丰富地支持了多种执行器,包括本地进程执行器、Celery执行器、Dask执行器、Kubernetes执行器,调度器仍然是Airflow使用的瓶颈,调度器执行任何task都需要至少10s的额外时间,5s用来标记已经就绪,5s用来提交执行。不管外部执行系统例如Dask集群有多大,Airflow会每隔10s生成准备和执行task。

相比而言,Prefect采用了更现代化的模式。例如当Prefect在Dask上面执行的时候,可以利用Dask的毫秒级延迟task调度器来尽快运行所有task,并且能充分利用Dask集群的并行度。实际上Prefect Cloud的默认部署模式是在Kubernetes中使用可定制化的Dask集群。

除性能外,如何设计workflow影响很大。Airflow鼓励设计庞大的task,Prefect倾向设计微型的模块化task,也保留了实现大型task的能力。

另外,当在Prefect Clound运行task且数据库是自定义的,task和workflow的可执行实例负责维护更新数据库的数据状态,而不是由调度器维护。

作为对比,强中心化(管理所有设计抽象的调度业务)的Airflow调度循环使得当task启动遇到task依赖处理时,会产生不小的延迟。如果使用场景只涉及到少数几个长时间运行的task,这完全没问题,但是如果你想执行一个包含有大量的时间要求的task的DAG,这可能成为瓶颈。
Airflow将时间计划和workflow紧密耦合一起,这意味着你即使在本地运行DAG,也需要初始化数据库和调度服务。对于生产环境无可厚非,但是对于业务代码的快速迭代和测试是很麻烦的。
Airflow调度器的强中心化特性形成了系统的单点故障。
循环重新解析DAG会造成代码逻辑的严重不一致,很有可能调度器将要执行的task,发现不存在对应的DAG。
强中心化调度意味着多个task之间无法通信(没有通信依赖关系解决方案)。

数据流

它是一个陷阱。

– 《星球大战》贾尔·阿克巴上将

Airflow最常见的用途之一建立某种数据管道,但讽刺的是,Airflow并没有最好的方式实现它。

Airflow提供XCom,一个被设计用来在task之间交换少量数据的工具。如果你希望task A告诉task B大量的数据写到云端的某个可见的位置,这是鼓励的的。但当用户试图使用它建立数据管道,就会产生很多Airflow的使用bug。

XCom使用管理员权限将可执行的序列化数据写到元信息的数据库,有安全隐患。即使采用json格式,也还是有问题。这种数据没有过期时间或者有效期限制,会造成性能和存储成本问题。更重要的是,使用XComs会在Airflow调度器不知情的情况,在task之间创建严格的上下游依赖关系。如果用户不增加额外的编码定义,Airflow实际可能会用错误的顺序执行这些task。考虑以下情况:

Airflow框架是无法明确确定某个task依赖于另一个推类型task的行为。如果开发者没有通过代码明确向Airflow说明,调度器还是会无序编排运行这些task实例。即使开发者代码指定输入输出数据关系,Airflow也无法理解基于通信数据的依赖关系,而且当XCom推送数据失败,Airflow又不知道怎么响应。这是最常见的玄奥难解的Airflow bug之一。

不幸的是,常常怕什么来什么,Airflow新手过度使用XCom堵死了元信息的数据库,我们看到过有人创建了10GB规模的数据,并用XCom传递给各种task。一个DAG实例要是有10个task实例,一共会写100GB的数据到Airflow的元信息的数据库。

作为对比,Prefect将数据流抽象当做一等公民。task可以输入返回,Prefect用透明方式管理这些输入输出依赖。另外,Prefect永远不会将这些数据写入元信息的数据库,相应的,task实例的阶段结果在需要存储的时候,可以轻松配置安全的结果处理器。这样有许多好处:

  • 开发者可以使用熟悉的Python函数风格编码
  • 引擎知道依赖关系,能提供更加透明流畅的调试体验
  • 尽管Prefect允许使用数据流,并不意味着只能这么用,所以仍然支持无依赖的Airflow风格,有时候鼓励使用无数据依赖风格
  • 因为task可以交换数据,Prefect能支持更复杂的毛细分支逻辑和更丰富的task状态,并且在workflow实例中的task实例之间有更严格的执行约束(例如,一个tas不能在数据库更新下游的task的状态)

参数化workflow

很抱歉Dave,恐怕我做不到。

– 《2001太空漫游》超级电脑HAL9000

有能力处理响应不同的输入是很方便的。例如,一个workflow可能就是一系列的操作步骤,可以重复处理不同外部数据(API、数据库或者ID)的信息,这些不同的数据需要复用相同的处理逻辑。甚至,你可能想用输入参数来控制影响workflow本身。

因为Airflow DAG要按固定的调度计划来执行,并且不接收调用的输入参数,Airflow不能很好的支持更多。当然也可以突破这个限制,但是解决方案涉及到围绕着Airflow调度器持续重复解析DAG文件和动态响应Airflow Variable对象开展工作。如果业务需要利用调度程序的内部实现细节,可能是落地方案错了。

Prefect为参数化workflow提供了方便的抽象。Prefect workflow的Parameter对象是一种支持可选参数的特殊task,它有配置的默认值,在运行期调用是可覆盖的。例如,我们在Prefect Cloud部署中运行时,Parameter对象的值可以通过简单的GraphQL调用或者Prefect的Python客户端来设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from prefect import task, Parameter, Flow


@task
def return_param(p):
return p


with Flow("parameter-example") as flow:
p = Parameter("p", default=42)
result = return_param(p)


flow.run() # uses the value 42
flow.run(p=99) # uses the value 99

这提供了许多好处:

  • 出问题时,数据的来龙去脉是透明的
  • 不需要为不同的起始参数输入创建新的workflow,只需要根据参数生成新的workflow可执行实例
  • 允许设计响应事件的workflow,workflow可执行实例是根据不同的类型内容的事件来选择执行某个分支

早些时候,我们就注意到Airflow没有同时开始运行一个DAG多个可执行实例的抽象。这有部分原因是Airflow DAG没有参数概念。当DAG无法响应输入,同时运行多个实例就没有意义。

但是,有了一等公民的参数抽象,就容易理解为什么我可能希望同时运行workflow的多个可执行实例,例如发送多封电子邮件,或者更新多个数据源,或者任何其他系列活动等workflow业务逻辑相同和起始参数不同的场景。

动态化workflow

你将需要更大的船。

– 电影《大白鲨》

除了参数化的workflow之外,经常遇到workflow内部有task的逻辑需要被重复运行几次的场景。假设task A从数据库查询所有的新客户列表,需要把里面的每一个客户ID交给另一个task做一些事务处理。在Airflow中,只有一个选择,实现一个下游task B,该task B使用客户ID列表,并循环一一处理。这种方式有以下缺点:

  • UI操作界面无法可视化task内部循环造成的动态负载,执行过程不可见
  • 如果任何一条数据记录执行失败,则整个任务失败或者被丢弃形成业务黑洞
  • 重试的话,还要考虑规模和实现幂等逻辑,因为失败中途重试,系统很难理解哪些数据已经处理过,需要跳过不处理

由于这是一种常见场景,因此Prefect将其专门抽象成称之为task mapping的功能。task mapping是指能够运行时根据上游task的输出结果来动态生成不同数量的下游task的能力。映射强大到可以在映射过的任务继续映射,从而轻松创建并行管道。而归纳收集结果就是将mapped task作为Parameter输入到non-mapped task这么简单。考虑一个简单例子,生成一个4项item的list,遍历每一项item两次做+1处理并设置item,然后计算list的item之和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from prefect import task, Flow


@task
def create_list():
return [1, 1, 2, 3]

@task
def add_one(x):
return x + 1

@task
def get_sum(x):
return sum(x)

with Flow("simple-map") as f:
plus_one = add_one.map(create_list)
plus_two = add_one.map(plus_one)
result = get_sum(plus_two)

f.run()

该workflow运行实例明确有10个Prefect task,1个是list创建,8个是4项item X 2次+1映射处理,还有一个求和规约。任务映射提供许多好处:

  • mapping模式易于在workflow中表达
  • 每一个动态产生的task都是单独实例,意味着可以独立于其他task单独重试告警
  • workflow的每一个task实例都能触发动态生成不同数量的下游task实例
  • 作为一等公民的task mapping抽象,UI操作界面也能准确可视化映射关系

多版本workflow

为了通过编码解决工作问题,这还不够。

– 《代码整洁之道》的作者[美]Robert C. Martin

任何涉及代码的系统的都需要一个重要能力,是能够对代码进行版本控制。

回想一下在Airflow里面,DAG是中枢调度器在检测指定的DAG目录包含的Python文件,并执行代码文件来发现DAG定义。这意味着,如果你更新某个DAG,Airflow将会重新加载盲目执行,而并不会感知代码的迭代改变。如果你的DAG定义代码定期更新,这会引起一些麻烦。

  • 能够重新访问甚至运行旧的DAG的能力要求你存储旧版本代码,并且需要在Airflow生态系统中抽象出单独的实体
  • UI操作界面对版本系统一无所知,就无法可视化DAG有效的多版本信息

在实践中,团队只能借助第三方版本工具GitHub控制版本,并将版本信息附加到文件名中实现部分业务目标。这种设计实现,如果你的workflow代码变化缓慢,也不会有负担。但是,由于数据工程日益成为快节奏的科学,包括实验和频繁更新,仅仅支持部署新的代码模型参数,这种方式很快会失败。

在Prefect Cloud中,我们已将版本化数据提升为一等公民。任何workflow都是版本化数据的一部分,以轻松追踪维护历史记录。我们照常有合理的默认值。

  • 一个项目下部署一个同名的workflow,版本系统会自动追踪记录为版本数据
  • 当一个workflow被版本化,会获得递增的版本号,并且对之前的老版本workflow关闭自动调度和进行存档

如果你有更复杂的版本需求,这些都是有可配置自定义的。例如,绕开项目下workflow同名追踪规则,你可以直接指定某一个workflow是另外一个workflow的不同版本。你可以覆盖自动版本升级,来取消存档并留用旧版本(例如,用于A/B测试)。或者,你只是简单使用版本系统来维护workflow历史,而不会复杂化UI操作界面。

本地测试

可能出错的事物和不可能出错的事物之间最大的区别是,
当不可能出错的事物出错时,
通常发现根本无法调试。

– 工具书《Mostly Harmless》

因为Airflow和Prefect都是用Python编写的,因此可以按照标准的Python模式来对独立的task/operator逻辑进行单元测试。例如,在Airflow中,您可以导入DagBag提取单个DAG,并对其结构或包含的task进行各种断言测试。同样,在Prefect中,你可以轻松导入和测试workflow。此外,在Airflow和Prefect中,对于独立的task逻辑,都可以进行Python风格的单元测试。

然而,在Airflow中测试DAG要比Prefect的workflow复杂的多。这是由于以下多种原因:

  • Airflow中的DAG级别执行是由中枢调度器来控制协调的,意味着模拟数据来执行DAG也需要初始化准备数据和调度服务。将其纳入CI持续集成流程中测试对很多人来说都是个大麻烦
  • Airflow的状态类型是字符串,这增加了测试数据传递的复杂性,或者引发什么类型异常,这需要数据库查询来确定

另外,在Prefect中,workflow作为参数简单传递给FlowRunner可以直接本地执行。此外,这类接口都提供了丰富的额外参数,专门用于帮助测试流程,还贴心的包括一种手动指定上游task任意状态的方法。

例如,要确保测试逻辑适用于单个任务。因为Prefect返回上游task的结果是组合完整的状态对象(包括数据、异常状态和重试次数等信息),这样你能通过task_states关键字参数模拟传递所有的上游任务状态,进而你可以只对关心的task返回状态做断言。

UI操作界面

我选择相信。

– 电影《X档案》Fox Mulder

Airflow最受欢迎的是web界面。在UI操作界面上,你可以轻松打开关闭DAG,可视化DAG运行实例的进度,甚至可以对Airflow数据库进行查询。这是一种访问Airflow元信息的非常有用的方法。

因此从设计Prefect的第一天开始,就注重支持漂亮的实时的UI操作界面的理念,但Prefect并没有Airflow那样直接裸露数据库数据模型的操作,而是将最佳实践都制作成可视化界面,来应对用户使用过程最关心的问题:系统运行状况如何?并且如果出现问题,我如何快速定位?

Prefect UI

Prefect UI操作界面支持如下:

  • 系统概况的仪表版
  • 调度新的输入参数执行
  • 实时更新task和对应的运行状态
  • 人工干预更新状态
  • 流式日志,包括立即跳转最新错误日志的能力
  • 完整的交互式的GraphQL API接口
  • 全局搜索
  • 代理控制
  • 项目的workflow组成
  • 团队管理和权限
  • API访问Token生成
  • 密码管理
  • 全局并发限制
  • 时区(适合Airflow用户)
  • ……还有很多

UI操作界面是Prefect Cloud的一部分,它以能提供生产级workflow管理的底层结构作为支持。但是我们致力于从开源产品的云端免费套餐开始,逐渐将其完整地提供给用户。我们正在研究将元素抽象通过UI操作界面交付给开源用户的方式。

Prefect的简单安装使用

安装

1
pip install prefect

task编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from prefect import task, Flow, Parameter


@task(log_stdout=True)
def say_hello(name):
print("Hello, {}!".format(name))


with Flow("My First Flow") as flow:
name = Parameter('name')
say_hello(name)


flow.run(name='world') # "Hello, world!"
flow.run(name='Marvin') # "Hello, Marvin!"

管理server启动

1
prefect server start

总结

如果我比其他人看的更远,是因为我站在巨人的肩上。

– 艾萨克·牛顿

Airflow普及了很多今天工程师认为理所当然的框架抽象和语义。不幸的是,随着数据科学工程的发展,它无法满足公司更多的动态业务需求。

Prefect是一种新的引擎工具,反映了从数以百计的工业用户那里收集的很多变化的需求,可用于启用定制化workflow,为各种数据应用程序提供丰富的数据抽象和接口。

如果你想了解更多信息,请查看我们的文档或访问我们的源码仓库。如果你想试用Prefect Cloud,可以注册一个免费账号。


golang插件化方案

1. 背景

业务线的活动,每一次新活动都做独立项目开发,有大量重复代码,并且浪费数据服务的连接资源;排序服务也许要经常添加业务代码,目前是停服务发布……这些场景为了开发维护效率、稳定性、安全性和性能都使用了Go语言。Go是静态编译语言,在具体的动态场景该如何实现应用级别的持续交付呢?

基于k8s,nginx网关,队列回溯消费等工具的实现也可以实现不同程度的持续交付,但是持续交付的要求越高,搭建平台和维护的成本也越高。

从应用开发本身出发,可以考虑插件化

插件使用场景特点

  1. 可以热更新式扩展应用程序的功能列表
  2. 应对多变的业务需求,方便功能上下线
  3. 对于任意的go应用,能进行增量架构、代码分发以及代码上下线

插件设计标准

  • 性能:调用插件要尽可能的快;对于任务插件,使用单独的工作空间(协程、线程、进程的池子化处理),大的、慢的、长期运行的插件,要少调用
  • 稳定性:插件依赖的发布平台要少发布,交互API的设计要做好抽象,上下文的环境变量非必须不添加,减少升级需求,甚至能支持多个实例互备热升级
  • 可靠性:如果有失效、崩溃的可能,必须有快速、简单、完整的恢复机制;业务插件的执行不能影响依赖的发布平台的守护进程或者线程的稳定
  • 安全性:应该通过代码签名之类的手段防篡改
  • 扩展性:支持插件热更新和上下线,下线需要健康检查,公共库插件至少能热加载
  • 复用性:业务插件不要太多一次性的上下线
  • 易用性:提供使用简单、功能正交的API,业务插件能够获取依赖的发布平台的上下文和调用公共库

2. Go的插件方式

动态链接库plugin,官方文档

语言本身支持,插件和主程序原生语法交互

  • 进程隔离:无,单进程
  • 主程序调用插件:一切预协定object(包括function、channel)
  • 插件感知主程序上下文:主程序预定义类型参数object(包括function、channel)
  • stream支持:单向,基于channel
  • 插件发现:主程序循环扫描插件目录并维护状态;通过第三方文件diff工具维护,例如git
  • 上线:能
  • 下线:不能
  • 更新:不能
  • 通信:进程内
  • 序列化:不需要
  • 性能:高

Go plugin判断两个插件是否相同是通过比较pluginpath实现的,如果没有指定pluginpath,则由内部的算法生成, 生成的格式为plugin/unnamed-“ + root.Package.Internal.BuildID 。这种情况下,如果两个插件的文件名不同,引用包不同,或者引用的cgo不同,则会生成不同的插件,同时加载不会有问题。但是如果两个插件的文件名相同,相关的引用包也相同,则可能生成相同的插件,即使插件内包含的方法和变量不同,实现也不同。判断插件相同,热加载不会成功,也就意味着老插件不支持覆盖更新。

最好在编译的指定pluginpath,同时方便版本跟踪。目前生产环境建议一些公共库无服务依赖的函数,例如算法库之类的。

go build -ldflags "-pluginpath=plugin/hot-$(date +%s)" -buildmode=plugin -o so/Eng.so eng/greeter.go 

通信+序列化

natefinch/pie,github仓库

  • 进程隔离:有,多进程,provider+comsumer
  • 主程序调用插件:provider模式调用插件进程中预协定method;consumer模式消费插件进程中的预协定参数object(包括function、除了channel)
  • 插件感知主程序上下文:provider模式消费主程序的预定义参数object(包括function、除了channel);consumer模式调用主程序中预定义method
  • stream支持:不支持
  • 插件发现:主程序循环扫描插件目录并维护状态;通过第三方文件diff工具维护,例如git
  • 上线:能
  • 下线:能
  • 更新:能
  • 通信:支持stdin/stdout、pipe、unix socket、tcp、http、jsonrpc
  • 序列化:gob,protobuf, json, xml
  • 性能:中/偏高

基于Go的net/rpc库,无法支持主程序和插件之间的streaming数据交互,有golang的官方包[issue1]和[issue2]直接建议。另外,每一个插件都要开一个进程,因此要注意通信序列化的性能消耗和进程管理,默认使用stdin/stdout建立连接,如下图,一个plugin和主程序之间有两条单向连接。

可以上成产环境,要做好资源管理。

plugin

hashicorp/go-plugin,github仓库

  • 进程隔离:有,多进程,server+client
  • 主程序调用插件:一切协议预协定object
  • 插件感知主程序上下文:一切协议预协定object
  • stream支持:单向和双向,基于http/2
  • 插件发现:主程序循环扫描插件目录并维护状态;通过第三方文件diff工具维护,例如git
  • 上线:能
  • 下线:能
  • 更新:能
  • 通信:支持grpc
  • 序列化:protobuf
  • 性能:中/偏高

基于Google的grpc库,按照微服务的流程定义proto文件,能通信就能互相调用。知名团队出品。可以上成产环境,要做好资源管理。

grpc

go-mangos/mangos,github仓库

  • 进程隔离:有,多进程,provider+comsumer
  • 主程序调用插件:一切预协定object
  • 插件感知主程序上下文:一切预协定object
  • stream支持:单向,基于mq
  • 插件发现:主程序循环扫描插件目录并维护状态;通过第三方文件diff工具维护,例如git
  • 上线:能
  • 下线:能
  • 更新:能
  • 通信:支持mq
  • 序列化:未知
  • 性能:中/偏高

基于消息队列协议通信,nanomsg和ZeroMQ一类的规范包含一组预定义的通信拓扑(称为“可扩展性协议”),涵盖许多不同的场景:Pair,PubSub,Bus,Survey,Pipeline和ReqRep。Mangos是该协议的golang实现,能够灵活方便支地持两个插件交流。

可以上成产环境,要走大量的基础建设开发。

mq

嵌入式脚本语言

一般都是进程内内嵌第三方语言的解释器,需要考虑解释器的工作线程资源的重复利用。

embedscript

  • 进程隔离:无,单进程,解释器有goroutine开销
  • 主程序调用插件:一切语言协定object
  • 插件感知主程序上下文:一切语言协定object
  • stream支持:看语言是否支持channel互通
  • 插件发现:主程序循环扫描插件目录并维护状态;通过第三方文件diff工具维护,例如git
  • 上线:能
  • 下线:能
  • 更新:能
  • 通信:无
  • 序列化:无
  • 性能:中

go-like脚本语言,agora七牛qlang

agora和qlang都是go语法的动态脚本语言,都好几年没维护了,建议不要用在生产环境,其中Qlang还有用户提[issue]觉得不稳定。

其他脚本语言,js-ottogo-lua5.1go-lua5.2

otta支持目前受欢迎的js语法,star比较多,协定了大部分go原生支持的类型,不包括channel和goroutine,没有提供解释器的工作空间池子化管理,需要开发者使用goroutine和解释器的interrupt接口自行实现,但是从issue和TODO来看,也不适合生产环境。

gopher-lua支持lua5.1语法,和go交互的object类型比较完备,协定了大部分go原生支持的类型,包括channel和goroutine,有提供解释器的工作空间池子化管理,可以上生产环境。

go-lua支持lua5.2语法,目前不建议上生产环境。

3. 思考

  1. 主程序需要怎样设计才能给业务插件预定义完美的上下文呢?例如线程池、redis连接池、mysql连接池、rocketmq、外部服务依赖等等
  2. 公共库插件和业务插件是否适合不同的插件方式?公共库插件方便为业务插件增加提供上下文吗?

人生半坡 想上就努力上

忽然朋友圈刷满了那些年,18岁的天空,很蓝,云朵,很闲。我知道2017的最后一天,大家在虚拟的空间里集合,是要也是该缅怀点什么了,所以一批人生半坡的中年男在朋友圈里回到了18岁,其中一个就是我,但是我已经不认识18的他们了,甚至不认识18岁的自己。

一恍惚,18岁过去了,大学毕业已经5年了。再回头看看,我只有2013年毕业写过一篇一周年总结2013一周年总结,4年“嗖”的一下就过去了。

2014年,我在一家做酒店在线旅游的公司上班,叫快捷酒店管家,那是我呆的第一家创业公司,十几个小伙伴让我觉得很新鲜,leader也很提携后生,在这里有很多事做,我一直忙着写业务,才发现自己的技术基础很薄弱,就算看了很多文章,做具体东西时,还是有点茫然,加上学习贪多又没有形成舒适的节奏,其实做的蛮辛苦,但是当时团队有很多牛逼的资源,所以我还是想留下努力继续试试。无奈后来因为工作环境和人事调整各种原因,我还是选择了离开。啊,这里还有一段失败的不能称之为恋情的感情,还有,这里的同事都是生活的高手,叫我理财,帮我找女朋友,挺有意思的,反而,我觉得我是个没意思的人,而且对生活常识有点失去了信心。这里一方面因为个人的素质偏差,一方面因为工作环境的变化,导致我应该及早抽身离开,却贪恋一些东西而没有及早离开,是一种小遗憾。我在很多时候,还是会怀念一下那一段不一样的日子。

2015年,我离开了颇为眷恋又不想呆的第一家创业公司,在同学ZYQ和XJQ配合准备下,来到了北京,又是一家小的创业公司,叫光点图灵,人不多,不过我已经没有了当年初进创业公司的新鲜感了,同事也都很好,老板一张娃娃脸,人也很nice,最爽的是酒喝得少了。平时我就忙着做事,而且我跟俩同学是有自己的小的创业计划的,虽然最后没成,那又怎样,我们付出了。在和同学的创业过程中,我发现自己的节奏也不是很好,原因还是整体素质和心态不到位。

2016年,我离开了前一家创业公司,因为这家创业公司搬到武汉去了,不过就算不搬,我也要离开了。我选了一家叫赤子城的做海外直播的公司,去之前,拿过果壳和小米的offer,但是这里给很多期权,薪水也不少,我也在一些统计数据网站调查了,公司的数据不错,都D轮了。来干了一年半载,只能说有我喜欢的,也有我不喜欢的地方,而且我现在的想法是做好工作,拿到想要的待遇,没有其他任何想法。

然而2017年底,因为种种原因,我又离开了,我离开了北京,回到武汉斗鱼,有时候默默念叨着“Don’t cry for me, Beijing, I will be back, maybe in a winter.”算是勉励自己吧。在武汉也挺好的,今晚我见到了小时候一起在河里摸鱼儿的玩伴,他也是个成熟稳重的小伙子了,感觉很温暖。

这几年的工作,我还是在逐步上升,越来越相信自己了,总结在以下几点:

产品素质提升,曾经工作的时候,同事们聊业务我很被动,现在我能主导控制了。之所以有这种变化,是因为第一份工作的leader灌输了很多提高产品嗅觉的方法论,不知不觉用上了;而且俩同学的产品素质都很好,经常聊,慢慢长进了;还有一个重要原因是,生活消费慢慢打开买买买,会让人了解主动分析更多的生活逻辑,这是产品逻辑的根本来源。

专业技能提升,这个我觉得,一方面是很多以前不太注意的基础理论,例如操作系统、数据库、网络原理、算法数据结构、分布式,都在恶补,一遍看不懂,再看一遍,再看不懂,看三遍…..直到有自己的理解为止,理论懂的多了,更容易把握项目整体,而且能控制变化;另一方面经历的项目变多变丰富,做事情效率提高很多,也就有更多时间去思考和学习。

生活能力提升,还是要提的,毕竟自己是一个山里娃,曾经不敢消费,节约的观念和家境的忧虑根深蒂固,导致做事畏首畏尾,自己也过得不舒坦,别人也不乐意交往。压抑一些正常的消费行为,也让人对社会生活缺少信心,总之状态差,精神面貌也不好。现在随着朋友和同学的消费理财引导下,我也开始建立正常的消费观念,让生活舒适一些,这一点,很感激他们,虽然现在还没达到理想状态,但是在不断变好。

虽然生活工作爱情还没有满意,而人生已经半坡,但是我一定会握紧泡满枸杞的保温杯,想上就努力上。

具体的执行包括以下几点:

  • 创业,是我永远不会停下的想法,就算最后创业不成功。技术创业,俩同学是我最好的搭档;如果是农业养殖的话,不知道找谁好;如果是搞文化,也有很适合的同学…..总之,不愿做工作的人们,前进,前进,进

  • 产品素质,通过多观察生活,理解人生,使用各种新技术产品,和不同的人多聊聊,来提升自己的产品理解能力和构建能力

  • 专业技能,一是要通过看技术文章和书籍巩固基础理论知识,多动手写代码实践理解;二是要抓热点,吸纳新的知识,紧跟潮流,把控未来

  • 生活能力,多找朋友聊天,多找姑娘聊天,多找亲人聊天,强力刷出自己的存在感,早点定个姑娘,安顿生活

  • 我的文字爱好,照常继续

  • 该做减法了,不擅长或者不喜欢的东西不碰了,要时常自我反省,自娱自乐

我觉得和同学技术创业三人组很像海贼王里的一些角色,哈哈哈。
three

今天是28岁的最后一天,我只想过好28岁,18岁的一切,记得就记得,忘了就忘了,没有啥好强求的,毕竟脑容量有限,遗忘、辞旧才能迎新,至于记得的那部分就是一部分你。

29岁,我告诉你,我不是一个油腻的中年男,也不是一个成功的中年男,但我是一个努力的男人。

go的context

context

包(context)类型是包装了channel通信来关联goroutine服务之间的控制机制,支持树状的上级控制一个或者多个下级,不支持反向控制和平级控制,同理参数的共享是树状的传递流动方向,也可以用来管理有超时依赖的函数,以下代码实例里面,存在forever for循环的才算服务。

参数共享特性

  • context的参数存储不是map,是struct的key和val两个属性,对外暴露操作接口
  • context的参数由父goroutine初始化,子goroutine调用
  • 父context会嵌入子context,同名参数相当于有多个版本
  • 子context获取参数会按照embed的向上查找机制获得多层父级context的参数,同名参数优先使用子context的参数版本
  • 子context初始化非同名参数,相当于增加新参数
  • 子context初始化同名参数,相当于增加一个参数版本
  • context传递的参数注意选择必要的,简单的,size小的
  • context因为只读所以是gorountine安全的,可以多个子gorountine服务共享同一个context的父控制

控制特性

  • 父gorountine服务通过context控制子gorountine服务,由struct里面的空channel实现,父gorountine服务close空channel,子gorountine服务的select监听并可能触发关闭事件,结束生命周期
  • 父gorountine服务达到某种状态,传递信号,结束子gorountine服务
  • 父gorountine服务初始化启动子gorountine服务时,传入截止时刻或者超时时间
  • 父gorountine结束,则所有的子gorountine服务结束
  • 以上三条规则共同决定一个gorountine服务的生命周期

使用

1
2
3
4
5
6
type Context interface {
Done() <-chan struct{} //控制生命周期的空channel
Err() error //获取错误码
Deadline() (deadline time.Time, ok bool) //获取截止时间
Value(key interface{}) interface{} //传递参数
}

子goroutine服务的关联的context是通过WithCancel, WithDeadline, WithTimeout, WithValue复制父goroutine服务的内容,派生出新的context。CancelFunc可以取消所有的后代,移除父goroutine服务在子goroutine服务的关联,并停止所有关联timer。

1
2
3
4
5
6
7
8
9
10
11
package main
import "context"

func tree() {
ctx1 := context.Background()
ctx2, _ := context.WithCancel(ctx1)
ctx3, _ := context.WithTimeout(ctx2, time.Second * 5)
ctx4, _ := context.WithTimeout(ctx3, time.Second * 3)
ctx5, _ := context.WithTimeout(ctx3, time.Second * 6)
ctx6 := context.WithValue(ctx5, "userID", 12)
}

以上代码的Context链下图:
ctx
3秒超时:ctx4超时退出
ctx
5秒超时:ctx3超时退出,导致子节点ctx5和ctx6都退出
ctx
这里只是形象的表示了多级context之间形成的控制关系,实际业务里面ctx5,ctx6所在的goroutine如果在执行业务是不会立即退出的,只有当goroutine在等待IO事件等待数据的时候才会由select响应结束的case。

** Background和TODO **
都是返回一样的无控制条件无参数的empty Context,background在主服务里面通常作为匿名参数生成可控的context变量;todo可能会用于程序兼容处理,当空桩,做空调用

** WithCancel **
传入父Context,返回可控子Context,并且返回结束调用子context的goroutine的控制函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
package main
import "context"

func main() {
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for { //for循环是goroutine 函数作为服务的标志
select {
case <-ctx.Done():
return
case dst <- n: //这里堵塞有可能会成为函数的超时依赖
n++
}
}
}()
return dst
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel() //一定要手动加取消操作

for n := range gen(ctx) {
println(n)
if n == 5 {
break
}
}
}

** WithDeadline **
传入父Context和截止期限,返回包含截止期限的可控子Context,并且返回结束调用子context的goroutine的控制函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
package main

import "context"
import "time"

func main() {
d := time.Now().Add(50 * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
select {
case <-time.After(1 * time.Second):
println("overslept")
case <-ctx.Done():
println(ctx.Err())
}
}

** WithTimeout **
传入父Context和超时时间,返回包含超时时间的可控子Context,并且返回结束调用子context的goroutine的控制函数

1
//func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

** WithValue **
传入父Context和参数key-val,返回包含参数的子Context,如果父context是background,那么子context不可控,否则可控

  • 在服务里面context.Background(),不适合作为有变量接收的WithValue的参数,可以是ctx := context.WithCancel(context.WithValue(context.Background(), k, “Go”))
  • context的key是包内的全局变量,不可导出,避免冲突
  • context的key的数据类型必须支持==和!=,类似于map的key,包括bool,number,string,pointer,channel,interface和包含上述类型的array,struct。不支持slice,map,function或者包含它们的类型。
  • 存储的数据最好是type-safe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//func WithValue(parent Context, key, val interface{}) Context
package main

import "context"

func main() {
type favContextKey string

f := func(ctx context.Context, k favContextKey) {
if v := ctx.Value(k); v != nil {
println("found value:", v)
return
}
println("key not found:", k)
}

k := favContextKey("language")
ctx := context.WithValue(context.Background(), k, "Go")

f(ctx, k)
f(ctx, favContextKey("color"))

}

go vet tool可以检查CancelFuncs的使用情况。context参数不要包裹在struct等一些复杂的数据结构里面,context参数位置放在函数的第一个变量,context参数不要传递nil。

问题

如何实现平级控制,一个子goroutine出现问题同时取消共享父控制的其他子goroutine,参见golang.org/x/sync/errgroup

To avoid allocating when assigning to an interface{}, context keys often have concrete type struct{}. Alternatively, exported context key variables’ static type should be a pointer or interface. 这是什么意思?

pilosa分布式位图数据库(1)

Pilosa

  • 能够轻松横向扩展,并且保证速度的位图数据库,支持数据版本的粗粒度的时间序列(最小粒度到小时)
  • 适合高频次的流数据处理,使用场景例如,如果你的业务主体数据达到10亿量级,并且该数据的附加属性数量达到百万级别,当你需要实时的筛选符合各种属性组合条件的业务主体数据,Pilosa会是个好帮手
  • Pilosa的数据存储不像现有的关系型数据mysql、oracle的行式存储(一行是一个业务实体,一列是同一种数据实体同一个属性),包括index,column、frame、row、cell,是列式存储
  • 一个index类似一张表,数据查询不能跨index
  • 一个column表示一个业务实体,有业务意义,包括多个frame一行row的组合,从平面形象上看是纵向的
  • 一个frame,表示一个业务实体的属性,有业务意义,是row的集合,从平面形象上看是横向的
  • 一个row表示一个属性的一行数据存储,是cell的集合,从平面形象上看是横向的
  • 一个cell就是0/1,是数据存储的逻辑最小单元

安装

MacOS

1
(workspace) ➜  brew install pilosa

二进制包

1
2
3
(workspace) ➜  curl -L -O https://github.com/pilosa/pilosa/releases/download/v0.4.0/pilosa-v0.4.0-darwin-amd64.tar.gz
(workspace) ➜ tar xfz pilosa-v0.4.0-darwin-amd64.tar.gz
(workspace) ➜ cp -i pilosa-v0.4.0-darwin-amd64/pilosa /usr/local/bin

go源码

1
2
3
(workspace) ➜  go get -d github.com/pilosa/pilosa
(workspace) ➜ cd $GOPATH/src/github.com/pilosa/pilosa
(workspace) ➜ make install

docker

1
2
(workspace) ➜  docker pull pilosa/pilosa:latest
(workspace) ➜ docker run --rm pilosa/pilosa:latest help

使用

业务场景

Star Trace是一个跟踪github开源项目的关注情况的业务,有1000条最近有更新并且名称包含go的项目数据,数据字段包括编程语言,标签,项目的关注者。

启server

1
2
(workspace) ➜  pilosa --help
(workspace) ➜ pilosa server

client操作

  • 建表,column是repository,frame是stargazer和language,构成了业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
(workspace) ➜  curl localhost:10101/status
(workspace) ➜ curl localhost:10101/schema
(workspace) ➜ curl localhost:10101/index/repository \
-X POST \
-d '{"options": {"columnLabel": "repo_id"}}'
(workspace) ➜ curl localhost:10101/index/repository/frame/stargazer \
-X POST \
-d '{"options": {"rowLabel": "stargazer_id",
"timeQuantum": "YMD",
"inverseEnabled": true}}'
(workspace) ➜ curl localhost:10101/index/repository/frame/language \
-X POST \
-d '{"options": {"rowLabel": "language_id",
"inverseEnabled": true}}'
  • 数据导入
1
2
3
4
(workspace) ➜  curl -O https://raw.githubusercontent.com/pilosa/getting-started/master/stargazer.csv
(workspace) ➜ curl -O https://raw.githubusercontent.com/pilosa/getting-started/master/language.csv
(workspace) ➜ pilosa import -i repository -f stargazer stargazer.csv
(workspace) ➜ pilosa import -i repository -f language language.csv
  • 查询操作,查询的结果都是列id

查14号用户关注的repository

1
2
3
(workspace) ➜  curl localhost:10101/index/repository/query \
-X POST \
-d 'Bitmap(frame="stargazer", stargazer_id=14)'

查编程语言是5的repository

1
2
3
(workspace) ➜  curl localhost:10101/index/repository/query \
-X POST \
-d 'TopN(frame="language", n=5)'

查14号用户和19号用户的关注的repository交集

1
2
3
(workspace) ➜  curl localhost:10101/index/repository/query \
-X POST \
-d 'Intersect(Bitmap(frame="stargazer", stargazer_id=14), Bitmap(frame="stargazer", stargazer_id=19))'

查14号用户和19号用户的关注的repository并集

1
2
3
(workspace) ➜  curl localhost:10101/index/repository/query \
-X POST \
-d 'Union(Bitmap(frame="stargazer", stargazer_id=14), Bitmap(frame="stargazer", stargazer_id=19))'

查14号用户和19号用户的共同关注的并且语言是1的repository

1
2
3
(workspace) ➜  curl localhost:10101/index/repository/query \
-X POST \
-d 'Intersect(Bitmap(frame="stargazer", stargazer_id=14), Bitmap(frame="stargazer", stargazer_id=19), Bitmap(frame="language", language_id=1))'

在frame为stargazer里面,加一行关注者为99999数据,列上repository为77777的cell为1

1
2
3
(workspace) ➜  curl localhost:10101/index/repository/query \
-X POST \
-d 'SetBit(frame="stargazer", repo_id=77777, stargazer_id=99999)'

经过基本的使用,我觉得pilosa比传统的关系型数据库更侧重于关系,而通过列式存储的架构,方便了大数据的实时聚合计算,所以pilosa是为了在某些场景替代传统关系型数据库,对于文档数据库mongo和嵌入式数据库没有影响。在一些业务庞大的公司里面应该是可以考虑引入的。如果要继续了解pilosa,请阅读pilosa分布式位图数据库(2)

pilosa分布式位图数据库(2)

几种数据结构

Pilosa的元逻辑存储结构是一个bool矩阵,每一个cell的0/1表示行列是否存在关系。行和列可以表示任何东西,甚至可以表示相同的事物(这一点对基于pilosa思维优化查询和存储非常重要)。

基本的逻辑数据结构决定了业务的架构和封装模式。Pilosa的数据结构比文档数据库和传统关系数据库都多,是否能满足各种业务设计呢?是否满足就适合用呢?

Index

类似于mysql的table,mongo的collection,表示一个业务数据的命名空间,无法对两个以上的index做联合查询。

Column

column用ID表示,是顺序递增的整数,并且要赋予业务意义,是同一个index下面的所有frame共用,在index的作用域里是唯一的。

Row

row用ID表示,是顺序递增的整数,并且要赋予业务意义,在frame的作用域里是唯一的。

Frame

pilosa
frame其实是对应传统关系型数据库中的每一列的属性,是pilosa的row的高维结构,由多row组成,从平面形象上看,将数据表格上多行row的整体看做一个frame,这样划分可以对0/1的二元表示意义进行扩展,满足业务多状态的需求

  • ranked cache of a frame
    pilosa
    有序frame是通过行ID维护列计数的高速缓存,该缓存便于TopN查询,缓存大小默认为50,000

  • LRU cache of a frame
    pilosa
    LRU缓存维护frame最近访问的行

Slice 与分布式有关

切片是列组,是pilosa的column的高维结构,包含多个column,每个切片包含固定数量的列,有SliceWidth。和column的关系,从平面形象上看类似于frame和row的关系,但是frame是有业务意义的,slice是为了分布式设计,通过预设宽度划分column成切片,用一致性哈希算法将切片分布到各cluster

Attribute

在传统关系数据库里面关系表的数据,分为两种,一种是纯粹的关系,只存储ID,通过增删或者update status来表示关系的状态;另外一种关系同时已经是一种新的业务实体了,除了ID关系,还有业务属性。
第二种情况的附加业务属性就需要Attribute来支撑。

Time Quantums

time quantums在frame上设置支持时间序列,会产生额外的数据冗余,这允许范围查询缩小到指定的时间间隔。 例如 - 如果时间量被设置为YMD,则支持范围查询到一天的粒度。

如果帧具有时间量化,则为每个定义的时间段生成视图。 例如,对于时间量级为YMD的帧,以下SetBit()查询将导致下图中描述的数据:

View

视图表示frame的数据物理布局存在的可能情况

  • Standard view of frame

标准视图的行列是输入的行列对应的,也是index中一定存在的一个布局

  • Inverse view of frame
    pilosa
    反向视图的行列是输入的行列颠倒的,即输入的行对应frame的列,输入的列对应frame的行,是index中可选的一个布局,会和标准视图及其他视图并存,在行列的业务意义是相同的时候,比较有意义,例如行列都是user_id

  • Time Quantums of frame
    pilosa
    时间序列数据产生的多个布局,时间单位最小到小时,小时的布局包括年、月、日、时四个布局

几种业务案例

(未完待续……)

go 无数据抽象设计

interface是golang的抽象设计的根基,是方法集合的接口,是一个非常强大的并且规范的指针,可以引用任意实现了该接口的方法集合的struct,不能定义属性,意味着在抽象设计里是不允许有数据的,使语言的编译运行管理更纯粹方便。

一切属性都是setter/getter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

type Surface struct{
skin string
}

func (s *Surface) Skin() string{
return "My skin is " + s.skin
}

type Men interface {
GetSurface() Surface
}

type European struct {
surface Surface
}

func (p *European) GetSurface() Surface{
return p.surface
}

type African struct {
surface Surface
}

func (p *African) GetSurface() Surface{
return p.surface
}

func Introduce(men Men){
s := men.GetSurface()
println(s.Skin())
}

func main() {
var s Surface
s = Surface{"white"}
e := European{s}
Introduce(&e)
s = Surface{"black"}
a := African{s}
Introduce(&a)
}

上例中的多态,通过setter/getter变相完成了property在interface定义,用方法多态来实现属性多态。

父类方法A调用子类方法B

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

type Men interface {
Age() int
}

type Parent struct {
Men //隐式
m Men //显式
}

func (p *Parent) Age() int{
return 80
}

func (p *Parent) Display1() {
println(p.Men.Age())
}

func (p *Parent) Display2() {
println(p.m.Age())
}

type Child struct {
Parent
}

func (c *Child) Age() int{
return 40
}

func main() {
var child *Child
var parent *Parent

parent = &Parent{parent, nil}
child = &Child{Parent: *parent}
println("====parent")
child.Display1()

parent = &Parent{child, nil}
child = &Child{Parent: *parent}
println("====child")
child.Display1()

child.m = parent
println("====parent")
child.Display2()

child.m = child
println("====child")
child.Display2()
}

上例中的实现了在父类的A方法中调用父类或者子类的B方法,类似于抽象方法。因为go是静态语言,没有动态语言强大的查找机制,也没有虚函数(virtural method)和抽象方法(abstract method)之类的辅助高级别的多态。

幸好,golang有指针,即interface,又名接口,通过在父类定义接口类型的属性,并在实例化的时候动态绑定该属性的真正类型,来完成父类方法中一些多态方法的灵活调用,增加了公共部分的复用性。

Airflow的基本搭建

airflow是一个基于celery任务DAG管理工具,包括

  • manage system
  • worker
  • scheduler

支持redis、rabbitmq、mongo、mysql、SQLite、postgresql作为queue,支持同时使用flower作为可视化任务的管理后台。
管理平台的数据落地支持mongo、mysql、SQLite、postgresql.

安装依赖

1
2
3
4
5
6
[lavenderuni@~]# pip install airflow
pip install "airflow[crypto, password]"
pip install "airflow[mysql]"
pip install "airflow[hive]"
pip install "airflow[celery]"
pip install "airflow[rabbitmq]"

初始化管理平台的数据结构到mysql

1
2
3
4
5
6
7
8
create database airflow;
grant all privileges on airflow.* to 'ct'@'localhost' identified by '152108';
flush privileges;
select * from mysql.user;
airflow initdb # 任意目录下执行
cd ~/airflow/
vi airflow.cfg # 替换sql_alchemy_conn = mysql://ct:152108@localhost/airflow
airflow initdb # 再次执行

初始化数据

1
2
3
4
5
6
cd ~/airflow/
vi airflow.cfg # 替换broker_url = redis://127.0.0.1:6379/15
和 celery_result_backend = redis://127.0.0.1:6379/16

airflow webserver --debug &
airflow scheduler

设置执行任务的broker即queue

1
2
3
4
5
6
cd ~/airflow/
vi airflow.cfg # 替换broker_url = redis://127.0.0.1:6379/15
和 celery_result_backend = redis://127.0.0.1:6379/16

airflow webserver --debug &
airflow scheduler

启动三大组件

1
2
3
airflow webserver --debug & #启动管理平台,访问http://127.0.0.1:8080/
airflow worker #启动工作者
airflow scheduler #启动任务调度分发器

管理平台如下图所示
airflow

airflow的dag文件例子参照testlab的airflow/dags