《Disigning Data-Intensive Applications》 Part II(1)

前言

做分布式系统可以获得很多好处:

  • 可扩展性,通过sharding/replica等可以扩展数据量、访问量的
  • 错误容忍/高可用性,可以容忍单点故障
  • 延时,通过一写多读等方案可以降低一些跨地域的延时

Scaling to Higher Load

通过增强单机性能的vertical scaling/scaling up方式来扩展服务是一个直观的方法。在一台单机里面搞定所有事情的方法又可以称之为shared-memory architecture。虽然单机内很多组件是可以热更换,能够满足一些错误容忍的要求,但是单机还是只能部署在一个物理位置,不能够获得延时上的收益。

还有一种架构叫shared-disk architecture,在机器之间通过高速网络共享磁盘阵列,但是竞争和锁争用会限制这种架构的扩展性。

Shared-Nothing Architectures

又被称为horizontal scaling/scaling out,这种架构里面有很多对硬件要求不高的节点。这种架构是很美的,搞分布式系统的同学都很喜欢这种架构,但是这种架构有3宗罪:给系统带来了极大的复杂性,需要考虑分片的分布、一致性处理;会限制使用的数据模型的表示,单机上能work的语法分片了可能就搞不定了;性能上可能还不如同等配置的单机。这是一股Anti-Distributed的思潮啊,很危险,嗯嗯。

Replication Versus Partitioning

副本和分片,这是分布式系统里两个最基本的功能了。有些系统把副本和分片在进程级别就区分开了,比如Mysql/Mongo/Redis等;有些则并没有,比如Tair/Kudu

这一部分是DDIA的重点,一共分为:

  • 复制
  • 分区
  • 事务
  • 分布式系统的问题
  • 一致性和共识

这一部分内容比较多,需要多分几篇。

复制

复制可以带来如下的一些好处:

  • 可以把数据复制到离业务访问更近的地方,比如读写分离的架构,在AB两地都有部署,把A的数据复制到B,就可以B地部署的业务访问更快
  • 提高系统的容灾能力
  • scale out系统的读性能

这些和之前列举的分布式系统的好处基本上是一致的。复制系统的困难在于需要把数据的变化同步到其它副本。这里讨论了3种方案:single-leadermulti-leaderleaderless

Leaders and Followers

在有多个副本时,我们会面临一个问题:怎么保证数据同步到了所有的节点上。最通用的解决方案是使用leader-based replication,也可以称之为active/passive或者是master–slave replication。

这种模式里面,只有leader可以接受写请求,然后把replication log或者change stream同步到其它follower上。请求可以由leader,也可以由follower来响应。

Synchronous Versus Asynchronous Replication

这是有个名词上的差异,作者提到了,semi-synchronous,一般是这样理解的:同步复制是需要follower也写到持久化返回,再返回给客户端;异步复制是leader写到持久化就返回;半同步复制是写到follower的一个缓存里面(比如MySQL的relay log)之后返回。

从作者的书里面描述来看,如果保持一个followerleader是同步复制,其它的followersleader是异步复制,那么这个称之为semi-synchronous,这个从引用看来是出自facebook的这篇博客,但是这里面也是在说MySQL的配置,可能MySQL通常会配置某一个slave是半同步复制,其它都是异步复制。

这里作者还提到了微软的chain replication,不过这个之前没有了解过。

Setting Up New Followers

创建新的follower需要让数据和leader上保持一致,一般的操作时:创建当前数据的快照;把快照拷贝到follower;然后follower会向leader请求获取快照创建之后的数据更改,那么这就要求快照需要和replication log中的位置准确地对应上(这个位置在PostgreSQL里面称为log sequence number,在MySQL中称为binlog coordinates);如果follower追上了leader上的数据更改(catch up),就可以把这些更改应用到自己的本地了。

Handling Node Outages

处理节点失效是非常高频发生的操作,有些失效是由于错误发生,有些是运维的需求进行重启,这时候需要让整个系统的downtime越小越好。

对于Follower的失效是比较好处理的,follower被重新启动之后,从自己之前处理过的位置把之后的数据更改拉取过来。

leader挂掉之后会麻烦一点,因为涉及到重新选主,然后客户端需要探测到这个变化,然后把写请求都转发给leader(当然,也是可以由follower转发给新的leader)。这个过程称之为failoverfailover可以是手动的,也可以是自动的,对于自动的流程:

  • 探测到主挂掉了。这个探测就很容易出问题,线上实际遇到的就包括:网络抖、容器抖、内核抖、磁盘抖等,一般来说都是通过心跳来进行判活的。
  • 选主。探测到主挂掉了之后就到了选主的时候了,这时候就需要选举的算法,这个后面会讲到。
  • 重新配置系统让访问新的leader。前面这些发生之后,客户端需要重新配置访问到新的leader

failover可能会存在一些问题的,即使是一些已经非常成熟的系统:

  • 如果异步复现,那么有些客户端已经收到确认的修改,在failover之后就可以会丢失
  • 如果有其它系统(比如说缓存)需要跟数据库中的内容有交互时,上面的Discarding writes的过程就会导致系统的最终不一致,还可能会有安全上的问题,这个在实际情况下也发生过
  • 可能会发生split brain(脑裂)的情况,如果两个leader都可以写入,而且没有冲突检测,数据就会有一致性的问题
  • 判活的时间间隔,有时候failover是有可能让系统无法恢复的

Implementation of Replication Logs

刚才讲到了复制的时候,是会依赖到Replication Logs的,下面讲一下Replication Logs的实现。

Statement-based replication

这种复制日志就是把客户端成本执行的请求都记录下面,MySQLSBRRedisreplication log都是这种类型的复制日志。

这种可能会有一些问题,比如说:

  • 依赖一些运行时的状态,比如说SQL中使用了NOW(),到了follower执行这个statement的时候,运行时的状态可能就不一样。
  • 依赖运行的数据。
  • 会有Side effect的语句

MySQL支持SBRVoltDB的复制也是基于Statement的。

Write-ahead log (WAL) shipping

对于前面讨论过的不论是LSM还是基于BTree的引擎而言,都会把修改先写入到一个log当中去的,所以我们可以通过复制WAL来达到复制的效果。这些都是针对于存储引擎的更改,所以是非常确定的。在PostgreSQLOracle中都是这样做的。

但是这种方式有自己的问题,复制日志和存储引擎耦合得太紧密了,如果存储引擎需要升级,那么可以会比较麻烦,要么存储引擎能够做到向前兼容。

Logical (row-based) log replication

An alternative is to use different log formats for replication and for the storage
engine, which allows the replication log to be decoupled from the storage engine
internals. This kind of replication log is called a logical log, to distinguish it from the
storage engine’s (physical) data representation.

使用RBRMySQL就是以这种方式来记录binlog,但是没有直观上的理解,不理解和前面基于WAL的方式具体有什么区别。这个方式明显是作者比较倾向的方式,赞美之词溢于言表。对于关系型数据库,基于行的复制日志:

  • 对于插入,日志需要包含所有列的新值
  • 对于删除,需要唯一地标志要删除的行,那么这时候是需要主键的。如果没有主键,那么删除的行的所有列的旧值都应该被记录下来
  • 对于更新,需要唯一地标志要更新的行,和所有需要更新的列的值

这种方式的好处有二:和存储引擎解耦了,即使以后不同的节点上run不同的存储引擎,也是可以work的;和一些外部系统的整合来说,会更容易,比如说change data capture的系统。

Trigger-based replication

前面的几种都是数据库系统里面已经使用的方式,后面这种是应用上会使用的。比如,如果只想复制数据的一个子集(比如说某些行的数据),或者是想执行一些冲突检测的逻辑(这个不太好理解)。

Redis里面的event notification感觉有些像这个,写入了之后可以触发一个事件,外部系统接收到了之后进行一些逻辑的处理。

Problems with Replication Lag

讲到了复制,就需要讲一下复制延迟。通过增加follower来提高读性能的read-scaling架构中,如果使用全同步的复制方式是不现实的,任何一个follower的抖动都会让写入失败。

对于异步复制,follower的数据可能是脏的,但是这只是暂时的,如果写入停止了,那么follower上的数据会最终和leader上是一样的,这个称之为eventual consistencyeventual这个词是故意弄得很含糊的,因为replication lag可能是几毫秒,也可以是几秒或者数分钟。下面会介绍几种处理replication lag的方法:

Reading Your Own Writes

用户需要读到自己写入的数据,这被称为read-after-write consistency,又被称为read-your-writes consistency。有以下一些方法可以满足这种约束:

  • 在读数据的时候可以判断读的数据有没有被我自己写过,如果写过,就从Leader读,否则就可以从Follower读,这种是基于数据的策略
  • 如果客户端会写入大部分的数据,那么上面这种方法可能就不太管用了,因为会导致很多读取都到Leader了,这种情况下可以记录上次更新的时间,如果距离上次更新时间超过一定阈值,就可以从Follower读取了。同时也可以监控多个Followerreplication lag
  • 客户端可以记住上次更新的时间戳,然后和请求一向发送到某一个副本,如果这个副本最近的更新记录是早于这个时间戳的,说明客户端请求的数据还没有复制过来,那么这个时候可以转发或者是等(这个和foundationdb里面采用的方法比较像)

如果应用会从多个设备来访问,问题会变得更加复杂一些:

In this case you may want to provide cross-device read-after-write consistency: if the user enters some information on one device and then views it on another device, they should see the information they just entered.

有几点需要考虑的:用户最近访问的时间戳不能只保存在某个设备上了,而需要存储在云端;需要把所有设备的请求都转发到同一个数据中心。

Monotonic Reads

如上图所示,User 2345并没有更改过数据,所以Reading Your Own Writes对它无效,但是它出现第一次读到数据,第二次读不到的情况。这种一致性可以通过约束某一个用户只能从一个Replica上读来实现。

Consistent Prefix Reads

这一部分会关心有因果关系的数据之间的一致性。

如上图所示,明明是Mr. Poons先说话,但是观察者却先看到Mrs. Cake的话。这种问题会发生在有分区的数据库中,因为在不同的分区之后,数据应用的顺序可能是不一样的。

这个方法取名叫Consistent Prefix Reads,原因是数据库不同的分区在应用的时候,一定需要前面有相同的prefix,或者说seq需要是连续的,这个时候就可以避免上面的问题。但是对于很多数据,不同的分区是独立运行的,所以没有全局的顺序保证的。

一种方法是让有因果关系的操作写入到同一分区,但是在实际系统中这样实现不一定会有效率。

Solutions for Replication Lag

上面的一些解决Replication Lag问题的方法都是case by case的,最好是提供一种API来让业务不感知这些底层的复制和分区的细节,这就是transaction。单点的Transaction已经存在很久了,但是到了分布式系统,很多系统就怂了,直接说不支持,还断言 在这种系统里面只能够支持最终一致性。是这样的么?

Multi-Leader Replication

前面讨论的都是Single Leader的复制方式,虽然这是最常见的方式,但还是有一些其它的方式,比如下面要介绍的multi-leader,也可以被称为master–master或者active/active replication

Multi-Leader Replication的应用场景

Multi-datacenter operation

当有多个datacenter的时候,就需要有多个Leader,原因如下:性能,写入到本地的Leader,跨datacenter写入会有问题;容忍datacenter失效;容忍网络延迟。

很多数据库都是支持multi-leader的,比如说Tair Ldbremote sync,当然,也有很多是依赖于外部工具的,比如说Tair RdbreplicatorMySQLTungsten Replicator。但是multi-leader需要解决写入冲突的问题。

其实这跟通常理解的multi-leader已经不太一样了,这在我们的概念里面,已经是两个不同的集群了,multi-leader通常认为是同一个集群的。

Clients with offline operation

有离线操作的客户端也是需要multi-leader的复制的。如果说前面Multi-datacenter已经和通常理解的multi-leader不一样了,这个就更加风马牛不相及了。

不过这样来理解,每个device都是一个集群,云上的数据也是一个集群,这么多个集群需要相互做数据同步。

Collaborative editing

协作编辑其实和前面的也很像,假设每个用户所操作的数据集都是一个集群,也是需要相互做数据同步的。在协作编辑时,如果一个用户开始操作就把文档锁住,那么问题就会退化成single-leader

Handling Write Conflicts

对于multi-leader的系统,最重要的工作就是处理写冲突。

Synchronous versus asynchronous conflict detection

简单来说,都是异步的冲突检测,如果使用同步的冲突检测,还是用single-leader比较好。

Conflict avoidance

Tair Ldb/Rdb都是通过这种方法,把不同的数据集群的请求路由到不同的datacenter,这样就可以避免冲突的发生。

Converging toward a consistent state

让冲突收敛到一致的状态,这样所有副本的数据到最后都是相同的。有几种方法:

  • 给每一个写入分配唯一的ID,如果使用时间戳,就是我们通常说的last write wins (LWW)
  • 给每一个副本分配唯一的ID,这样从某一个副本发起的write能覆盖其它的
  • 想办法把数据合并到一起,比如说incr时,把每个datacenter的计数分别存放
  • 把冲突的数据都记录下来,留给业务方来处理
Custom conflict resolution logic

让业务来提供个性化的方法来处理冲突检测,按照处理的阶段来分类可以分开为两类:

  • On Write。PostgreSQL的复制工具Bucardo就是采用这种方式,会调用一段事先写好的Perl来处理
  • On Read。把所有的冲突现在都保存下来,在读的时候来处理,CouchDB是这样干的

这里的冲突检测通常是针对单条语句的,而不是一个事务的,对于同一个事务,那有可能每一条更改对应到最后听冲突处理的方案不一样,这就破坏了事务的语义。

有几种自动处理冲突的算法,但是实现都不是特别地成熟:

  • Conflict-free replicated datatypes (CRDTs),这个我们接触得更多一些,在Riak 2.0中实现了
  • Mergeable persistent data structures,这个类似于Git,没有了解过
  • Operational transformation,这个也没了解过,Google Docs会使用

Multi-Leader Replication Topologies

multi-leader复制的拓扑图有如下的几种:

最常用的方法是(c)Tair Ldb/Rdb中使用的复制拓扑就是如此,这种拓扑需要分辨出客户端写入的数据和远端同步过来的数据,如果集群内部有主备,还得需要更加细致的区分。

对于CircularStar这两种架构,要避免复制回环可能会更加复杂一些,需要维护更多的状态。而且,如果有一个节点坏了会中断后续节点的同步,所以还是用All-to-all会更加靠谱一些。当然,All-to-all也有自己的问题,会更容易导致因果依赖的问题。

一些问题我们可以引入这一章中后面会提到的version vectors来解决。但是,在目前存在的multi-leader的系统中,冲突检测技术被实现得很有限。

Leaderless Replication

到目前为止,我们讨论的复制的方法都是基于客户端的写都是写入到Leader节点的,然后数据库系统本身去把写入复制到其它的副本。 Leader节点决定这些写入请求的顺序,Follower会以相同的顺序来应用这些写入。

某些系统采用了完全不一样的方法,完全抛弃了Leader的概念,允许任何副本都能够接受客户端的写请求。曾经有段时间有些系统是这样的,但是自从关系型数据库出来之后就销声匿迹了。但是Dynamo出现之后,这种架构又变得流行起来了,RiakCassandraVoldemort都是使用的这种架构,所以这些数据库也被称为Dynamo-style

有些leaderless的实现中,客户端直接写入到多个副本中;其它的一些是写入到一个coordinator node,不同于leader节点的是,coordinator不保证写入的顺序。

Writing to the Database When a Node Is Down

leaderless的系统中,客户端会将写入请求并发地发送到多个节点中,如下图所示,客户端会将请求发送到3个节点中:

如果有一个节点Replica 3在写入的时候挂掉了,那么客户端能写成功两个,会忽略掉失败的这个节点。等Replica 3恢复的时候,它就可以接受请求了,但这个时候它可能已经丢失了一些数据了。

和写策略类似,读的时候也会read requests are also sent to several nodes in parallel。这时候就会发现Replica 3中的数据是旧的,不会使用这个脏数据。

Read repair and anti-entropy

上面的例子里,Replica 3中的数据是旧的,那么问题来了,这个数据什么时候被修复呢?这里有两种方法:

  • Read repair,上面的例子中客户端读的时候能够发现Replica 3中的数据是旧的,这时候客户端可以将读到的新数据再写入到Replica 3中,这样的方法称为读修复。对于会被经常读到的数据,这个方法是ok的。
  • Anti-entropy process,不通过客户端请求触发,而是服务端有后台的任务来校验数据,进行修复。

并不是所有的系统都实现了这两种方法,比如Voldemort就没有Anti-entropy process。这样会导致一些持久化的风险。

Quorums for reading and writing

这里有3个符号:n,操作的副本数,每次写操作都必须并发地写入到n个副本;w,写操作需要等待至少w个副本的返回;r,读操作需要至少请求r个副本。

只要w+r>n,我们就可以保证能获取到最新的值。

Normally, reads and writes are always sent to all n replicas in parallel. The parameters w and r determine how many nodes we wait for

如果写入等到的确认不足w,我们就会认为写入失败,但是对于leaderless来说,写入失败也没有办法回滚。要么返回失败的意义是?

Limitations of Quorum Consistency

通常来说,w和r会选取n的大多数(more than n/2),因为这些能容忍的失效节点数是最多的,但是这样选择并不是必须的。

如果w + r ≤ n,那么读可能会拿不到最新的值,但是可以提供更低的延时和可用性。而且,即使w + r > n,在下面的一些情况下依然会读到脏数据:

  • sloppy quorum被使用时,就不能保证wr之间有重叠了;
  • 当有并发的写入时,最保险的方法是能够合并并发的写入,但是使用LWW的话,也会导致数据的丢失
  • 当读写有并发的时候,会不确认读会读到新的值还是老的值,就比如说执行两次Read,第一次读到新的值,第二次可能会读到老的值
  • 写入失败时,是不会回滚写入成功的结点的,那么读的时候可能会返回这次失败的写入的值,有可能不会
  • 如果一个包含新值的节点失败,然后从一个包含老值的节点恢复了数据,那这个时候,包括新值的节点数就会比w少,这里涉及到恢复数据时节点选取的策略了

quorums看起来能保证读的时候能够读到最新的数据,其实不然,需要了解到这一点。

Monitoring staleness

Leader-base replication中,监控同步延迟是非常容易的,比较在replication log中位点的不同即可。

对于leaderless的系统来说,是很难衡量的,对于没有anti-entropy的系统来说,如果旧值没有被读到,就很难被修复了。Eventual consistency是个模糊的概念,很难被量化,但是现在在这方面已经有一些研究了,但是还没有在实践中被使用。

Sloppy Quorums and Hinted Handoff

在一个在集群中(比n要大很多的集群),对于某一个key,它对应的n个副本所在的机器访问不了了,但是其它机器都是ok的,那么这时我们就面临一个选择:是让这个key的所有读写访问都失败,还是临时写入到其它可以访问的节点呢?

后面一种方法就是sloppy quorum,直译过来是“草率的多数派”。假设原来对应的n个副本所在的机器是RG-old,临时选的n个副本所有的机器是RG-new,刚网络恢复,RG-old可以访问时,RG-new会把所有写入复制加RG-old,这个过程被称为hinted handoff

sloppy quorum能够提高系统的可用性,只有集群中有n个节点可用,就可以接受写入。但是代价就是读的时候可以拿不到最新的值,因为可能hinted handoff正在进行中。在Riak中,sloppy quorum是默认打开的,在CassandraVoldemort中是默认关闭的。

对于多数据中心的复制,CassandraVoldemort直接在leaderless的模型里面支持了,也就是说所有的数据中心的节点都认为是在一个集群中,然后w设置成本地数据中心节点就能满足的值。Riak的多数据中心复制和multi-leader是类似的。

Detecting Concurrent Writes

multi-leader replication类似,leaderless replication需要对写入的冲突进行处理。

以上图的例子为例, Node 2Node 3返回的数据版本号都是2,但是值是不一样的,Node 1返回的数据版本是1,那应该用哪个值来修复Node 1中的数据呢?这时就会有最终不一致的风险,因为无法判定Node 2Node 3中的值哪个是最新的。

Last write wins (discarding concurrent writes)

在每一个写入带上一个时间戳,那么对于每一个记录,都会带上一个时间戳存储,如果接收到比这个时间戳要小的请求,就直接忽略掉了。这种方法被称为LWW

后面说的LWW会丢失数据这一点不太理解,如果有并发的写入,只有一个最终会写成功,其它的写入都被忽略掉了,这在我看来并不是数据丢失啊。如果是更新一个key的值,这个是可以接受的,如果是incr之类的操作,不太明白作者想表达什么意思。

The “happens-before” relationship and concurrency

这一部分内容不知道想讲什么,可以就想讲因果依赖这件事,对于两个操作A、B,有3种可能:A依赖于B;B依赖于A;A、B是并发的。

Capturing the happens-before relationship

系统需要有能力探测到有因果关系的写入。

比如上图中,Client 1Client 2的某些操作是并发的,某些操作是有依赖关系的,因果关系如下图所示:

系统可以通过如下算法来判断两个操作是否可以并发:

  • 对于每一个key, 服务器维护一个版本号,每次有写入时就增加版本号,把版本号和value存放在一起,会保留多个版本
  • 读取的时候,服务器会返回所有没有被overwrititten的数据版本,也包含了最终的版本号。
  • 客户端在写入key时,需要包含之前读到的最新的版本,并将自己之前读到的内容都merge起来。写入请求的返回和读取一样,会返回所有有效版本的value
  • 服务端收到一个带特定版本的写入请求时,可以将这个版本的value覆盖掉,但是需要把value和最新的版本号一起保存。

如果写入请求没有带特定的版本号,那么这个写入就跟其它写入没有因果依赖了。

Merging concurrently written values

客户端需要自己来处理读到多个版本的数据,在Riak中称这些并发的数据为siblings。在上面的例子当中, 我们合并sibling的操作很简单,就是取并集。但是如果允许客户端从购物车中删除东西时,就会存在问题。这时候可以引入tombstone来解决。Riak中支持了CRDT,这也和它leaderless的架构是有关系的。

Version vectors

在上面的例子中我们可以看到对于某个key,有很多版本的数据,那么在多副本的情况下, 对于每一个副本,都保存一个对应版本的数据。所有副本中版本集合称之为version vector,其中Riak使用的变种,dotted version vector是很多意思的。version vector可以从一个副本中读取,并安全地写入到另外一个副本中去。这里如果有例子会更有利于理解。

这个其实挺有意思,可以再多了解一下dotted version vectorpaper

最后

关于复制的话题讲到这儿,下一篇讲分区。