全面介绍数砖开发 Delta Lake 的第一篇论文

全面介绍数砖开发 Delta Lake 的第一篇论文

摘要

云对象存储如Amazon S3,作为目前最大且最节约成本的存储系统,用于实现数据仓库和数据湖的存储非常具有吸引力。但由于其实现的本质是键值存储,保证ACID事务性和高性能具有很大的挑战:元数据操作,比如list对象是很昂贵的操作;一致性保证也受限。

在本论文中,我们向大家介绍Delta Lake,一个由Databricks开源的基于云对象存储的ACID表存储层技术。Delta Lake通过使用压缩至Apache Parquent格式的事务性日志来提供ACID,Time Travel以及海量数据集的高性能元数据操作(比如快速搜索查询相关的上亿个表分区)。同时Delta Lake也提供一些高阶的特性,比如自动数据布局优化,upsert,缓存以及审计日志等。Delta Lake表可以通过Apache Spark, Hive, Presto, Redshift等系统访问。Delta Lake目前已被上千个Databriks用户使用,每天处理exabytes级数据,最大的应用实例管理EB级数据集以及上亿对象。

1.引言

云对象存储如Amazon S3以及Azure Blob存储已成为最大且最广泛使用的存储系统,为上百万用户存储EB级数据。除了云服务传统的优点,如按需付费,规模效益,专业的管理等,云对象存储特别具有吸引力的原因是允许用户对存储和计算资源进行分离;举例来说,用户可以存储PB数据,但是仅运行一个集群执行几个小时的查询。

因此,目前许多组织使用云存储来管理数据仓库以及数据湖上的大型结构化数据。主流的开源大数据系统,包括Apache Spark, Hive以及Presto支持Apache Parquet,ORC格式云对象存储的读写。商业服务包括AWS Athena, Google BigQuery和Redshift Spectrum 查询也支持以上这些系统及这些文件格式。

不幸的是,尽管许多系统支持云对象的读写,实现高性能及可变的表存储非常有挑战,同时也使得在其上构建数仓很困难。与分布式文件系统如HDFS, 或者DBMS的定制存储引擎不同,大多数云存储对象都仅仅只是键值存储,并没有跨键的一致性保证。它们的性能特点也与分布式文件系统非常不同因此需要获得特殊的关注。

在云存储对象中存储关系型数据最常见的方式是使用列存储格式,比如Parquet和ORC,每张表都被存储为一系列对象(parquet或者ORC文件)的集合,通过某些列做分区。这种方式在对象文件的数量适中时,扫描文件的性能尚可接受。但对于更复杂的扫描工作,正确性以及性能都将受到挑战。首先,多对象的更新并不是原子的,查询之间没有隔离:举例来说,如果一个查询需要更新表中的多个对象(比如从表的所有parquet文件中删除某个用户的相关记录),由于是逐个object更新,因此读客户端将会看到部分更新。另外,写回滚也很困难:如果一个更新失败,那么表将处于被污染的状态。第二,对于有上百万对象的大表,元数据操作非常昂贵。比如,parquet文件中footer包含了min/max等统计信息在查询时用来帮助跳过读文件。在HDFS上读footer信息只需要几毫秒,云对象存储的延迟非常高使得跳过读操作甚至比实际的查询花费时间还要长。

从我们与云客户工作的经验来看,这些一致性以及性能方面的问题对企业的数据团队产生了很大的挑战。大多数的企业数据是持续更新的,所以需要原子写的解决方案;多数涉及到用户信息的数据需要表范围的更新以满足GDPR这样的合规要求。即使是内部的数据也需要更新操作来修正错误数据以及集成延迟到达的记录。有趣的是,在Databricks提供云服务最初的几年,我们收到的客户服务支持升级中,约有一半都是由于云存储策略导致的数据损毁,一致性以及性能等方面的问题。(比如,取消更新任务失败造成的影响,或者改进读取上万个对象的查询性能)。

为了解决这些挑战,我们设计了Delta Lake,基于云存储对象的ACID表存储层。Delta Lake从2017年开始服务于客户,并于2019年开源。Delta Lake的核心概念很简单:我们使用存储在云对象中的预写日志,以ACID的方式维护了哪些对象属于Delta table这样的信息。对象本身写在parquet文件中,使已经能够处理Parquet格式的引擎可以方便地开发相应的connectors。这样的设计可以让客户端以串行的方式一次更新多个对象,替换一些列对象的子集,同时保持与读写parquet文件本身相同的高并发读写性能。日志包含了为每一个数据文件维护的元数据,如min/max 统计信息。相比“对象存储中的文件”这样的方式,元数据搜索相关数据文件速度有了数量级的提升。 最关键的是,我们设计Delta Lake使所有元数据都在底层对象存储中,并且事务是通过针对对象存储的乐观并发协议实现的(具体细节因云厂商而异)。这意味着不需要单独的服务来维护Delta table的状态;用户只需要在运行查询时启动服务器,享受存储计算扩展分离带来的好处。

基于这样的事务性设计,我们能够加入在传统云数据湖上无法提供的解决用户痛点的特性,包括:

  • Time travel:允许用户查询具体时间点的数据快照或者回滚错误的数据更新。

  • Upsert,delete以及merge操作:高效重写相关对象实现对存储数据的更新以及合规工作流(比如GDPR)

  • 高效的流I/O:流作业以低延迟将小对象写入表中,然后以事务形式将它们合并到大对象中来提高查询性能。支持快速“tail”读取表中新加入数据,因此作业可以将Delta表作为一个消息队列。

  • 缓存:由于Delta表中的对象以及日志是不可变的,集群节点可以安全地将他们缓存在本地存储中。我们在Databricks云服务中利用这个特性为Delta表实现透明的SSD缓存。

  • 数据布局优化:我们的云服务包括一个特性,能够在不影响查询的情况下,自动优化表中对象的大小,以及数据记录的聚类(clustering)(将记录存储成Zorder实现多维度的本地化)。

  • Schema演进:当表的schema变化时,允许在不重写parquet文件的情况下读取旧的parquet文件。

  • 日志审计:基于事务日志的审计功能。

这些特性改进了数据在云对象存储上的可管理性和性能,并且结合了数仓和数据湖的关键特性创造了“湖仓”的典范:直接在廉价的对象存储上使用标准的DBMS管理功能。事实上,我们发现很多Databricks的客户希望使用Delta Lake简化他们整体的数据架构,替换之前分离的数据湖,数仓,以及流存储系统,用Delta表来为所有用例提供适用的功能。表格1展示了一个例子,数据管道包括对象存储,消息队列以及为两个不同商业智能服务的数仓(每一个使用独立的计算资源),替换为只包含云存储对象上的Delta表,使用Delta的流I/O以及性能特性来执行ETL和BI。这种新的管道只用到了廉价的对象存储并产生了更少数量的数据备份,在存储和运维方面降低了成本。

图1: 使用3个存储系统实现的数据pipeline(1个消息队列, 1个对象存储 和 1个数据仓库), 或者使用 Delta Lake去同时实现流和表存储. Delta Lake的实现使得不需要维护多份copy,只需要使用最便宜的对象存储.

Delta Lake目前被大多数Databricks的大客户采用,每天处理exabyte数据(约占我们整体工作需求的一半),其他云厂商如Google Cloud,Alibaba,Tencent,Fivetran,Informatica,Qlik,Talend以及其他产品也支持Delta Lake。在Databricks的客户中,Delta Lake的使用用例非常多样化,从传统的ETL和数仓的工作流,到生物信息学,实时网络安全分析(每天处理数百TB流事件数据),GDPR合规性以及用于机器学习的数据管理(管理数以百万计的图像作为Delta表中的记录而不是S3对象,以获取ACID并提高性能)。我们将在Section 5具体讨论这些使用用例。

有趣的是,Delta Lake将Databricks的云存储相关支持问题比例从一半降为接近于零。Detla Lake为大多数客户改善了负载性能,在某些极端用例下,使用Delta Lake的数据布局以及快速访问统计信息对高维数据集(比如网络安全和生物信息学等场景)查询,甚至可以获得100倍的速度提升。开源的Delta Lake项目包含了Apache Spark(流批), Hive, Presto,AWS Athena,Redshift以及Snowflake的连接器,能够运行在多种云对象存储或者HDFS之上。在本文中,我们将展示Delta Lake的设计初衷,设计理念,用户使用案例以及推动设计的性能测试。

2.动机:云对象存储的特点及挑战

2.1 对象存储API

云对象存储,比如Amazon S3,Azure Blob存储,Google云存储,以及OpenStack Swift,都提供了简单但容易扩展的键值存储接口。这些系统允许用户创建桶,每个桶存储多个对象,每个对象都是一个二进制blob,大小可到几TB(比如,在S3上对象最大为5TB),每个对象都由一个字符串作为key来标识。通常的方式是将文件系统的路径作为云对象存储的key。但云对象存储不能像文件系统那样,提供廉价的对“文件”或者对象的重命名操作。云对象存储提供元数据API,比如S3的List操作,根据给定的起始键,列出在某个桶中按键的字典序排序的所有对象。这使得通过发起一个LIST请求,给定一个代表目录前缀的key(比如 warehouse/table1/)有效地列出目录下的所有对象成为可能。但很可惜的是,元数据的API操作通常很昂贵,比如S3的LIST调用每次只能返回1000个key,每次调用花费几十至上百毫秒,所以当以顺序的方式列出一个有数百万对象的数据集可能需要好几分钟。

读取对象时,云对象存储支持字节范围的请求,读取某个大对象的某字节范围(比如,从10000字节到20000字节)通常是高效的。这样就可以利用对常用值进行聚类的存储格式。

更新对象通常需要重写整个对象。这些更新需要是原子的,以使读对整个新版本对象或者老版本对象可见。有些系统也支持对象的追加写。

一些云厂商也在blob存储上实现了分布式文件系统接口,比如Azure的ADLS Gen2与Hadoop的HDFS具有相似的语义(比如目录的原子rename)。然而,Delta Lake解决的许多问题,如小文件问题,对多个目录的原子更新问题即使在分布式系统中也依然存在。事实上,有很多用户是在HDFS上使用Delta Lake。

2.2 一致性属性

如引言中所述,大多数云对象存储对单个key提供最终一致性保证,对跨key不提供一致性保证, 这对包含多对象的数据集管理提出了挑战。特别是当客户端提交了新的对象,其他客户端不能够保证在LIST或者读操作中立即看到这个对象。类似地,对现有对象的更新对其他客户端也不能够立即可见。更严重的是,有些对象存储系统,即使同一客户端执行了写操作也不能够立即读到新对象。

精确的一致性模型因不同的云厂商而异,且相当复杂。举个具体的例子,Amazon S3提供了写后读的一致性,S3的客户端在PUT操作后可以通过GET返回这个对象的内容。一个例外是:如果客户端在PUT之前对不存在的Key先调用了GET,那么后续的GET操作可能由于S3的逆向缓存机制在一段时间内读不到这个对象。S3的LIST操作是最终一致的,这意味着在PUT之后LIST操作可能无法返回新的对象。其他的云对象存储提供更强的一致性保证,但在跨key的情况下仍然无法提供原子性操作。

2.3 性能特点

根据我们的经验,通过对象存储实现高吞吐量需要在大型顺序I / O和并行性之间取得平衡。

对于读取,如前所述,最小粒度的操作是读取连续字节范围。每个读取操作通常会有至少5–10 ms的延迟,然后以大约50–100 MB / s的速度读取数据,因此,一个操作需要读取至少数百KB,才能达到顺序读取的峰值吞吐量的一半;读取数MB才能以接近峰值吞吐量。此外,在典型的VM配置上,应用程序需要并行运行多个读取以最大化吞吐量。例如,在AWS上最常用于分析的VM类型具有至少10 Gbps的网络带宽,因此它们需要并行运行8-10次读取才能充分利用此带宽。

LIST操作也需要高并行度才能快速列出大数量的对象。比如S3的LIST操作每个请求只能返回1000个对象,耗时十到数百毫秒,因此客户端对大桶或者目录进行list时需要并行发出上百个LIST请求。在针对云上Apache Spark的优化运行时中,除了在Spark集群的driver节点中并行执行线程外,有时我们还会在worker节点上并行执行LIST操作以使它们更快地运行。在Delta Lake中,可用对象的元数据(包括它们的名称和数据统计信息)是存储在Delta日志中的,但我们还是会并行从该日志中读取数据。

如2.1节所述,写操作通常要求必须重写整个对象(或者追加),这意味着如果一张表期望得到点更新,那么对象文件必须小一些,这与大量读对文件大小的要求是矛盾的。一种替代方案是使用日志结构的存储格式。

表存储的含义。对于分析型工作负载,对象存储的性能特征引出的三点考虑:

  • 将需要经常访问的数据就近连续存储,这通常要求选择列存储格式。

  • 使对象较大,但不能过大。大对象增加了更新数据的成本(例如,删除某个用户的所有数据),因为需要全部重写。

  • 避免使用LIST操作,并在可能的情况下按字典顺序的键范围发送请求。

2.4 现有的表存储方法

基于对象存储的特征,目前主要有三种方法在对象存储之上管理表格数据集。我们将简述这些方法及其面临的挑战。

**1.**目录文件 目前开源大数据技术栈以及云服务支持的最通用的方式是将表存储为对象集合,通常采用列存,比如Parquet。作为一种改进,可以基于一个或多个属性将记录“分区”到目录中。例如,对于具有日期字段的表,我们可以为每个日期创建一个单独的对象目录,例如,mytable / date = 2020-01-01 / obj1 以及mytable / date = 2020-01-01 / obj2用于记录从1月1日的数据,mytable / date = 2020-01-02 / obj1,1月2日的数据,依此类推,然后根据该字段将传入的数据拆分为多个对象。这样的分区减少了LIST操作以及仅访问几个分区的查询读操作的成本。

这种方式具有吸引力是因为整个表仅由一些对象组成,可以通过许多工具访问 而无需运行任何其他数据存储或系统。这种方式起源于HDFS之上的Apache Hive,并且与Parquet,Hive和文件系统上的其他大数据软件配合使用。

如引言中所述,这种方式的挑战是 “一堆文件”在云对象存储上有性能和一致性方面的问题。客户遇到的最常见挑战是:

  • 跨多个对象没有原子性:任何需要写入或更新多个对象的事务都可能导致其他客户端只可见部分写入。此外,如果事务失败,数据将处于损坏状态。

  • 最终一致性:即使事务成功,客户端也有可能只看到部分更新对象。

  • 性能差:查找与查询相关对象时的LIST操作很昂贵,即使它们被键划分到分区目录中。此外,访问存储在Parquet或ORC文件中的对象统计信息很昂贵,因为它需要对每个文件的统计信息进行额外的高延迟读取。

  • 没有管理功能:对象存储没有实现数据仓库中常用的标准工具,例如表版本控制或审核日志。

2. 自定义存储引擎. 为云构建的“闭源”存储引擎,例如Snowake数据仓库[23],可以通过在单独的,高度一致的服务中管理元数据来绕过云对象存储的许多一致性挑战。这种服务保存着哪些对象构成了表这样的事实。在这些引擎中,可以将云对象存储视为笨拙的块设备,并且可以使用标准技术在云对象上实现有效的元数据存储,搜索,更新等。但是,此方法需要运行一个高可用性服务来管理元数据,这可能很昂贵,在使用外部计算引擎查询数据时可能会增加成本,而且有可能将用户锁定在某个特定厂商。

这种方式的挑战:尽管这种从头开始的“闭源”设计是有好处的,但使用这种方法遇到的一些具体挑战是:

  • 所有对表的I / O操作都需要连接元数据服务联系,增加资源成本并降低性能和可用性。例如,当用Spark访问Snow flake数据集时,使用Snow flake的Spark连接器通过Snow service的服务读取数据,与直接从云对象存储中读取数据相比降低了性能。

  • 与重用现有开放格式(例如Parquet)的方法相比,开发现有计算引擎的连接器需要更多的工作量。根据我们的经验,数据团队希望在数据上使用多种计算引擎(例如Spark,TensorFlow,PyTorch等),因此使连接器易于实现非常重要。

  • 专有的元数据服务将用户与特定厂商绑定,相比之下,基于直接访问云存储的方式使用户总是能够使用各种技术访问他们的数据。

图2:一个Delta table的对象layout案例

Apache Hive ACID 使用Hive Metastore(一种事务性关系型数据库,例如MySQL)跟踪每张表相关的更新,更新以多个文件的形式存储在表的元数据信息中,一般为ORC格式。但是,这种方法受限于metastore的性能,根据我们的经验,它可能成为具有数百万个对象的表的瓶颈。

**3.**在对象存储中保存元数据 Delta Lake的方法是将事务日志和元数据直接存储在云对象存储中,并在对象存储操作上使用一组协议来实现可序列化。。表中的数据以Parquet格式存储,只要实现一个最基本的连接器去发现要读取的对象集,就可以使用任何已经支持Parquet的软件访问数据。尽管我们认为Delta Lake是第一个使用该设计的系统(从2016年开始),但现在另外两个软件Apache Hudi 和Apache Iceberg 也支持这种方式。Delta Lake提供了一系列这些系统不支持的独特功能,例如Z序聚类,缓存和后台优化。我们将在第8节中详细讨论这些系统之间的异同。

  1. DELTA LAKE存储格式及访问协议

Delta Lake表是云对象存储或文件系统上的一个目录,其中包含具有表内容的数据对象和事务操作日志(包含检查点)。客户端使用我们根据云对象存储的特性量身定制的乐观并发控制协议来更新这些数据结构。在本节中,我们描述了Delta Lake的存储格式以及访问协议。我们还描述了Delta Lake的事务隔离级别,包括序列化(serializable)和快照(snapshot)隔离级别。

3.1 存储格式

图2展示了Delta Table的存储格式。每个表都存储在一个文件系统目录中(本例中是mytable)或者在对象存储中以相同目录作为key前缀的一些对象。

3.1.1 数据对象

表的内容存储在Apache Parquet对象中,可以使用Hive的分区命名规范将其组织到目录中。

例如在图2中,该表按日期列分区,因此对于每个日期,数据对象位于单独的目录中。我们选择Parquet作为我们的数据格式,因为Parquet面向列,提供多种压缩更新,支持半结构化数据的嵌套数据类型,并且已经在许多引擎中实现了高性能。基于现有的开放文件格式,还使Delta Lake可以持续利用Parquet库最新发布的更新并简化其他引擎的连接器开发(第4.8节)。其他开放文件格式,例如ORC [12],也可以类似地工作,但是Parquet在Spark中拥有的支持最为成熟。

每个数据对象在Delta中拥有唯一名字,通常是由writer生成的GUID。哪些对象是表的哪的版本是由事务日志决定的。

3.1.2 日志

日志存储在表的_delta_log子目录中。它包含一系列以零填充的递增数字作为ID的JSON对象用于存储日志记录,并包含对某些特定日志对象的检查点,这些检查点将检查点之前的日志合并为Parquet格式。如3.2节中讨论的,一些简单的访问协议(取决于每个对象存储中可用的原子操作)用于创建新的日志条目或检查点,并使客户端在此基础上支持事务。

每个日志记录对象(比如000003.json)包含了在前一个版本的表基础上进行的操作数组,以产生下一个版本。可用的操作包括:

更改元数据 metaData操作更改表的当前元数据。表的第一个版本必须包含metaData操作。后续的metaData操作将完全覆盖表的当前元数据。元数据是一种数据结构,其中包含模式,分区列名称(如示例中的日期),数据文件的存储格式(通常为Parquet,但提供了可扩展性)以及其他配置选项,例如将表标记为仅追加。

添加或删除文件   添加和删除操作用于通过添加或删除单个数据对象来修改表中的数据。因此,客户可以搜索日志以查找所有尚未删除的已添加对象,以确定组成表的对象集。

数据对象的添加记录还可以包括数据统计信息,例如总记录条数以及每列的最小/最大值和空计数。当表中已存在的路径遇到添加操作时,最新版本的统计信息将替换任何先前版本的统计信息。这样可以在新版Delta Lake中“升级”旧表使其具有更多统计信息。

删除操作包括表明删除发生时间的时间戳。在用户指定的保留时间阈值之后,数据对象会被进行惰性延迟物理删除。此延迟使并发的读取器可以继续对过期的数据快照执行操作。删除操作应作为墓碑保留在日志和所有日志检查点中,直到数据对象被删除为止。

可以将添加或删除操作上的dataChange标志设置为false,以指示当与同一日志记录对象中的其他操作结合使用时,此添加或删除操作仅对现有数据重新排列或添加统计信息。例如,跟踪事务日志的流查询可以使用此标志来跳过不会影响其结果的操作,例如在早期数据文件中更改排序顺序。

协议演进 协议操作用于增加Delta协议的版本号,在读取或写入给定表时需要此版本号。我们使用此操作向存储格式添加新功能,同时指出哪些客户端仍然兼容。

添加来源信息 每个日志记录对象还可以在commitInfo操作中包括来源信息,例如,记录执行操作的用户。

更新应用事务ID。Delta Lake为应用程序提供了一种将应用程序的数据包括在日志记录中的方法,这对于实现端到端事务性应用很有用。例如,写入Delta表的流处理系统需要知道先前已经提交了哪些写入,才能实现“精确一次性”的语义:如果流作业崩溃,则需要知道其哪些写入先前已写入表中,以便它可以从输入流中的正确偏移处开始重播后续写入。为了支持该用例,Delta Lake允许应用程序在其日志记录对象中写入带有appId和版本字段的自定义txn操作,这样该日志对象就可以用来跟踪应用程序特定的信息,例如本示例中输入流的对应偏移量。将此信息与相应的Delta添加和删除操作放置在相同的日志记录中(原子地插入到日志中),应用程序可以确保Delta Lake以原子方式添加新数据并存储其版本字段。每个应用程序可以简单地随机生成其appId来获得唯一的ID。我们在Spark Structured Streaming的Delta Lake connector中中使用此特性。

3.1.3日志检查点

出于性能考虑,有必要定期将日志压缩到检查点中。检查点存储了直到特定日志记录ID的所有非冗余操作,以Parquet格式存储在表的日志中。某些冗余的操作是可以删除的。这些操作包括:

  • 对同一数据对象先执行添加操作,然后执行删除操作。可以删除添加项,因为数据对象不再是表的一部分。根据表的数据retention配置,应将删除操作保留为墓碑具体来说,客户端使用在删除操作中的时间戳来决定何时从存储中删除对象。

  • 同一对象的多个添加项可以被最后一个替换,因为新添加项只能添加统计信息。

  • 来自同一appId的多个txn操作可以被最新的替换,因为最新的txn操作包含其最新版本字段

  • changeMetadata以及协议操作可以进行合并操作以仅保留最新的元数据。

检查点过程的最终结果是一个Parquet文件,其中包含仍在表中的每个对象的添加记录,需要保留直到retention period到期的对象删除记录,以及如txn,协议和changeMetadata等操作的少量记录。这种面向列的文件对于查询表的元数据以及基于数据统计信息查找哪些对象可能包含与选择性查询相关的数据来说是非常理想的存储格式。根据我们的经验,使用Delta Lake检查点查找对象集几乎总是比使用LIST操作和读取对象存储上的Parquet文件的Footer要快得多。

任何客户端都可以尝试创建至指定日志记录ID的检查点,如果成功,则应将其写为对应ID的.parquet文件。例如,000003.parquet将代表直到并包括000003.json记录的检查点。默认情况下,我们的客户端每10个事务会写入一个检查点。

最后,访问Delta Lake表的客户端需要高效地找到最后一个检查点(以及检查点之后的日志),而不需要列出_delta_log目录中的所有对象。检查点writer将会把最新的检查点ID写入_delta_log / _last_checkpoint文件中,前提是写入的检查点ID比该文件中当前的ID更大。请注意,由于云对象存储库最终的一致性问题,即使_last_checkpoint文件不是最新的也没有关系,因为客户端仍会在该文件中的ID之后搜索新的检查点。

3.2 访问协议

Delta Lake的访问协议是为了让用户能依托“对象存储”的接口实现“序列化”级别事务,尽管大部分公有云的“对象存储”只提供“最终一致性”保障。这个选择关键在于需要有一个“日志记录”对象,例如000003.json,此“日志对象”会作为客户端读取数据表的某个版本时使用的核心数据结构。读取了这个“日志对象”的内容,用户就能够从“对象存储”中定位到本张数据表中其他对象,完成后续对数据表中数据的查询,当然由于“对象存储”最终一致性,读取时可能数据对象还不可见,客户端可能需要等一个delay的小段时间。对于“写入”事务,用户需要一种机制去保障只有一个用户能创建下一个“日志记录”(比如,000003.json),这种机制可以理解为一种类似“乐观锁”的控制能力。

3.2.1 读表操作

我们先描述Delta table的read-only读事务。读事务会安全的读到数据表的某个版本。Read-only的读事务有5个步骤:

1.在table的log目录读取_last_checkpoint 对象,如果对象存在,读取最近一次的checkpoint ID

2.在对象存储table的log目录中执行一次LIST操作,如果“最近一次checkpoint ID”存在,则以此ID做start key;如果它不存在,则找到最新的.parquet文件以及其后面的所有.json文件。这个操作提供了数据表从最近一次“快照”去恢复整张表所有状态所需要的所有文件清单。(需注意:因为对象存储是最终一致性语义,这个LIST操作返回的文件清单可能不连续,比如清单中有000004.json和000006.json但是没有000005.json . 这个问题Delta Lake有考虑到,客户端可以使用从表中读取的最大的ID,这里是000006.json,等待所有确实的对象可见后再完成计算)

3.使用“快照”(如果存在)和后续的“日志记录”去重新组成数据表的状态(即,包含add records,没有相关remove records的数据对象)和这些数据对象的统计信息。Delta数据格式被涉及可以并行读取:比如,在使用Spark读取delta格式时,可以使用Spark job去并行读取.parquet的快照文件和.json的”日志记录“。

4.使用统计信息去定位读事务的query相关的数据对象集合。

5.可以在启动的spark cluster或其他计算集群中,并行的读取这些相关数据对象。 需注意,因为对象存储的最终一致性,一些worker节点可能读不到driver在制定执行计划后下发任务的相关数据文件,目前的设计是如果worker读不到,就等一段时间然后retry。

我们注意到这个访问协议的每一步中都有相关的设计去规避对象存储的最终一致性。比如,客户端可能会读取到一个过期的_last_checkpoint文件,仍然可以用它的内容,通过LIST命令去定位新的“日志记录”文件清单,生产最新版本的数据表状态。这个_last_checkpoint文件主要是提供一个最新的快照ID,帮助减少LIST操作的开销。同样的,客户端能容忍在LIST最近对象清单时的不一致(比如,日志记录ID之间的gap),也能容忍在读取日志记录中的数据对象时,还不可见,通过等一等的方式去规避。

3.2.2 写事务

一个写入数据的事务处理,一般会涉及最多5个步骤,具体有几步取决与事务中的具体操作:

1.找到一个最近的日志记录ID,比如r,使用读事务协议的1-2步(比如,从最近的一次checkpoint ID开始往前找).事务会读取表数据的第r个版本(按需),然后尝试去写一个r+1版本的日志记录文件。

2.读取表数据的r版本数据,如果需要,使用读事务相同的步骤(比如,合并最新的checkpoint .parquet 和 较新的所有.json 日志记录文件,生成数据表的最新状态,然后读取数据表相关的数据对象清单)

3.写入事务相关的数据对象到正确的数据表路径,使用GUID生成对象名。这一步可以并行化。最后这些数据对象会被最新的日志记录对象所引用。

4.尝试去写本次写事务的日志记录到r+1版本的.json日志记录对象中,如果没有其他客户端在尝试写入这个对象(乐观锁)。这一步需要是原子的(atomic),我们稍后会探讨在不同的对象存储中如何实现原子性。如果这一步失败了,事务是要重试的;这取决于事务query的语义,在一定情况下客户端还是可以在重试中复用step3产生的数据对象们,然后把这些数据对象们写入到重试事务产生的新的.log日志记录对象中。

5.此步可选。为r+1版本的日志记录对象,写一个新的.parquet 快照对象.(最佳实践中,默认每10条日志记录会做一次快照) 然后,在写事务完成后,更新_last_checkpoint文件内容,指向r+1的快照。

需注意到第5步中,写一个新的.parquet 快照对象,更新_last_checkpoint文件内容,只会影响性能,如果在这一步客户端失败了并不会损害到数据完整性。比如,在生成快照对象时失败了,或者在更新_last_checkpoint文件内容时失败了,其他客户端仍然可以使用老一些的快照去读取数据表的内容。在第4步成功后,事务就算原子性的提交完成了。

原子性的添加日志记录。在写事务协议中很明显的,步骤4,创建r+1版本的.json日志记录对象需要原子性:只能有一个客户端能成功的创建此日志记录。不幸的事,不是所有的大规模对象存储系统有put-if-absent类似的原子操作,我们针对不同的对象存储做了不同的实现去达到原子性的效果:

  • Google Cloud Storage 和 Azure Blob Store 都支持原子性的put-if-absent操作,所以直接使用即可

  • 在类似HDFS的分布式文件系统,我们使用原子的rename操作去rename临时文件到最终位置(如果最终位置文件已存在就fail).Azure Data Lake Storage [18]也提供了文件系统API中的原子rename操作,所以我们直接使用这些系统的这些方法。

  • Amazon S3并没有提供原子性的“put if absent” 或者 “rename” 操作。在Databricks的部署服务中,我们使用了一个单独的轻量级协调服务去保证针对一个指定ID的日志记录,只能有一个客户端能够做添加操作。这个服务只有在写事务时才需要(读事务和非数据相关操作不涉及),所以它的load是相对较低的。在开源的Apache Spark的Delta Lake connector上,我们能保证同一个Spark driver程序(SparkContext object)的进程内部能利用in-memory的状态,在事务之间保证拿到不同的日志记录ID,即用户可以在一个单独的spark集群内针对一张Delta table做并发的操作。我们仍然提供了一个API接口,留给用户足够的自由度去实现一个自己日志存储实现类,从而达成事务操作的独立、强一致性。(LogStore)

3.3  关于隔离级别

在遵循了Delta Lake的并发控制协议后,所有写事务都是线性化隔离级别(serializable)的,也使得事务的日志记录ID的线性增长。这遵循了写事务的提交协议,即每个日志记录ID只有一个写事务能使用。读事务是能达到snapshot isolation或者serializability的。在3.2.1节中描述的读协议只会读取数据表的一个快照,所以客户端使用这个协议就能达成snapshot isolation,但是客户端如果想达到线性化(serializable)的读取,可以发出一个“读after写”的事务,假装mock一次写事务然后再读,来达到线性化。在最佳实践中,Delta Lake 的connector实现了在内存中将每一张已访问过的表的最近“日志记录”ID做cache起来,这样客户端能“读自己所写”,即使客户端使用了snapshot isolation的读能力,也能在多次读操作时读到单调递增的数据表版本。更重要的是,Delta Lake目前只支持单表事务。基于“日志记录”的协议设计,在未来是可以被扩展到管理多张表上去的。

3.4 事务频率

Delta Lake的写事务频率受限于在写新的日志记录时,需要执行put-if-absent操作的延迟(描述与3.2.2章节)。在任何基于乐观锁的并发控制协议中,高频率的写事务都会导致事务commit失败。实际上,对象存储的写入延迟能达到上百毫秒ms,这严重限制了写事务的tps(transactions per second)。但是我们发现对于Delta Lake应用的并发来说这个并发率也够了,即使是一个相对高并行的streaming流式数据任务(打比方 Spark Streaming jobs),负责把数据导入到云存储,也可以把很多数据对象放在一个写事务当中批量提交。如果在未来,更高频的tps成为需求,我们相信去定制开发一个LogStore服务去负责事务日志管理(类似于Databricks在AWS针对S3存储做的commit服务),是能够提供更快的事务提交能力的(比如把事务日志先存储在低延迟的DBMS上,然后再异步写入对象存储)。当然,snapshot isolation隔离级别的读事务是没有竞争的,他们只需要去读对象存储中的对象即可,所以读事务的并发读是很高的,完全不受限的。

4.DELTA中的高级功能

Delta Lake的事务设计允许很多更宽范围的高阶数据管理功能,这和很多传统的分析型DBMS提供的便利能力类似。在本章,我们会探讨一些更广泛被使用的特性,以及客户的case或者说客户的痛点。

4.1 时间穿梭和回滚

数据工程师的pipeline经常会出逻辑错误,比如有时会把脏数据从外部系统导入到大数据系统中。在传统的数据湖设计方案中,很难去通过给单表做增加对象来实现undo更新语义。更多时候,一些工作比如机器学习训练是需要去针对老版本的数据做重新训练的(比如在同一份数据集上去对比 新/老的两种训练算法的效果)。在Delta Lake技术诞生前,这些问题都给Databricks的用户造成过很大的挑战,需要他们去设计很复杂的数据pipeline纠错辅助工具,或者将数据冗余多份。而有了Delta Lake后,基于它底层数据对象和事务日志的不可修改性,使得读取数据过去的历史快照变得很直接和容易,这是一个经典的MVCC实现。客户端只需要一个老的日志记录ID就能读到数据的历史版本。为了更好的帮用户实现Time travel,Delta Lake允许用户去做每张表级别的数据retention inverval配置,而且支持在sql中使用 timestamp 或者commit_id等隐藏字段等语义去帮助读取历史快照版本。客户端也能通过Delta Lake提供的API,在一次读/写操作后,获取到当下使用的commit ID日志记录。比如,我们在开源的MLflow项目中在每次ML训练任务中,使用这个API去自动记录数据表版本号,作为每次训练的元数据。用户会发现在修复数据pipeline的错误时,time travel功能会特别有用。比如,在需要修复一些用户数据时,有效的undo一个更新操作可以通过在数据表的快照上执行一条MERGE语句的SQL达成目标:

MERGE INTO mytable target USING mytable TIMESTAMP AS OF  source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *

我们还开发了一个CLONE命令,它能够在数据表当下的一份快照上创建一个 copy-onwrite的新版本快照。

4.2 有效的更新,删除和合并

在企业中很多分析型的数据是需要持续更新的。比如根据GDPR[27]的数据隐私合规要求,企业需要能够有能力按要求删除一个用户相关的所有数据。即使不涉及个人隐私的数据,在某些场景下也有更新需求,比如上游数据pipeline的错误导致数据损坏就需要update去修复数据,再比如延迟到达的数据(late-arriving)也会导致需要对老数据进行更新等等。然后,一些聚合数据集也需要不断的更新聚合结果数据集(比如由数据分析时发出的针对一张表的sum query需要根据时间不断重新计算聚合值)。在传统的数据湖存储格式中,比如在S3上直接将Parquet文件放入目录下,很难在同时有并发读取的请求时去执行更新操作。即使要做,更新任务也要执行的非常小心,因为如果在更新时发生任务fail了,会留下一些“部分更新”的数据碎片。在Delta Lake中,所有这些操作都可以以事务进行运行(同时成功同时失败),在Delta log里记录(增加 or 删除)这些相关的被update的数据对象。Delta Lake支持标准的 SQL UPSERT,DELETE和MERGE语法。

4.3 流式的数据导入和消费

很多数据团队期望能使用streaming数据pipeline来实时的将数据进行ETL或者聚合操作,但基于传统的云存储是很难做到的。这些数据团队会使用一些独立的流式消息队列,比如Apache Kafka 或者 AWS Kinesis,在处理不好数据流时容易产生数据冗余,同时也给数据团队带来了很多额外的运维管理复杂度。我们在Delta Lake的设计中使用“日志记录”文件去记录事务追加的提交记录,用户可以把此日志记录当作一个message queue来看待,producer写入,consumer消费。有了递增的“日志记录”文件,用户就不需要额外再部署一套单独的消息队列服务了。这个能力由三个场景推演而来:

写合并. 一个最简化的数据湖就是由一组对象文件组成,这使得写入数据很容易(写新的数据对象即可),但是在 写和读 的性能之间很难找到一个很好的平衡点。如果写入方想要快速的通过写小文件的方式达到快速写入数据的目的,在读取方最终会因为“过多的小文件读”和过多的“元数据”操作而变慢。相反,Delta Lake允许用户去跑一个后台Daemon来以事务的方式达成合并小文件的目标,且不影响读取方。如在3.1.2章描述,在做compact操作时将dataChange flag设置为false,如果已经读了小文件,可以让流streaming的消费者忽略这些compaction操作生成的数据文件。通过写小文件,使下游Streaming应用更快消费到新数据(延迟低,性能略差)。而其他普通的基于老版本数据的查询仍然可以很快。

Exactly-Once 流式写入 . 写入方在日志记录对象中可以使用txn action type字段,用它来跟踪写入指定表的所有相关数据对象,从而达到“exactly-oncce”写入。总的来说,流式处理系统在写入(更新)外部存储时是需要一些机制去保证写入的幂等性的,需要幂等去避免duplicate数据写入,比如在发生写入错误后job重试时。在复写数据时,如果为每个记录设置一个唯一键(unique key),是能达到幂等效果的。或者说,将所有需要写入的记录放在一个“last version written” 事务记录里,一起成功或者一起失败。Delta Lake使用后一种模式让每个spark应用在每次事务中维护一个(appId,version)元组。比如在Spark Structured Streamingg 的Delta Lake connector中,我们用这个feature支持了所有流式计算语义的exactly-once写入(append, aggregation, upsert ).

有效的Log Tailing.  把Delta Lake表当作message queue的终极目标是要能让consumer能有效的找到“增量新写入”. 幸运的是,.json的日志存储格式是一系列按字典序自增ID的日志对象,这就让consumer的增量消费变的容易了:一个消费者可以在对象存储以last log ID为startkey,跑个简单的LIST操作,就能把增量新写入的数据对象找出来了。在日志记录内容中每一条的dataChange flag会允许流式streaming消费者决定是否跳过compact 或者重新整理过的数据,是否直接读新的数据文件(可能是新的小文件).流式计算应用也能通过自己上一次成功处理完成的last record ID点位,完成stop或者restart操作 。

将这三个feature合起来,我们发现很多用户真的可以用Delta Lake,一个搭建在对象存储上的数据格式,去实现消息队列的语义,从而完成“秒级延迟”的流式计算pipeline。而不是依赖于一个独立的消息队列服务,比如kafka。

4.4 数据布局优化

数据layout在分析类系统中对查询性能有很大的影响,特别是在很多分析query都有高度的挑剔复杂度时。因为Delta Lake支持以事务的方式去更新一张数据表,它就必须要能够支持在不影响并发的其他操作的情况下做layout的优化。比如,一个后台Daemon进程会把数据对象做compact,在这些数据对象内部重新排序,甚至去更新这些数据的统计指标、索引信息等等,且要在不影响其他客户端的前提下。我们基于事务这个优势,实现了一些数据layout优化的特性:

OPTIMIZE Command. 用户可以针对一张表手动触发OPTIMIZE命令,这个命令可以在不影响进行中(on-going)事务的前提下进行小文件的合并,同时重新计算缺少的文件统计信息。默认情况下,这个操作的目标是把文件重新规整为每个文件1GB大小,这个值是我们的经验值,当然用户可以按自定义设置这个参数。

Z-Ordering by Multiple Attributes.  很多数据集上跑的query都有很高的选择度,query会有很多条件。比如,一个网络安全的数据集,它的数据包含有网络中 (sourceIp, destIp, time) 这样一个三元组,在这三个维度(属性)都会有很多查询使用。如果使用Apache Hive的简单按照path/directory做分区的办法,只能在数据写入时按一部分维度(属性)做分区,但是要使用多个维度(属性)创建分区,分区数会暴涨,这在hive里是极力避免的。

Delta Lake支持在数据表中按照一些给定的维度(属性)去重新整理记录,使用Z-order[35]技术,在指定的多个维度(属性)上都能达到相对较高的数据本地性。在指定的多维度空间上去计算Z-order曲线还是很容易计算出来的,这个技术的目标是在“经常会涉及到多个维度的查询场景”下,都能达到较好的性能,而不是偏向于某一个维度(在Section 6 的测试中有表现)。用户可以在每张表上设置自己需要的Z-order维度集合,然后跑一个OPTIMIZE命令,就可以达到把数据按Z-order整理好的目的了,用户还可以随时调整Z-order策略。Z-order技术使用了数据统计学技术,能让查询过滤更多的数据,减少读IO开销。在最佳实践中,Z-order技术的目标就是让所有的数据对象,在用户指定的几个维度下,都有一个相对小的值范围,在查询时能保证过滤掉更多的数据对象。

图3:DESCRIBE HISTORY输出了在一张Delta标上每一次的update。

自动优化. 在Databricks的云服务上,用户可以给一张表设置AUTO OPTIMIZE 属性,从而可以自动的去compact新写入的数据对象。

总的来说,Delta Lake的设计也允许在数据表做update时,能够维护index和高计算消耗的统计信息。我们在这个点上开发了不少新feature。

4.5 缓存

很多云用户都会为不同的业务,跑不同的常驻的计算集群,有时候也会根据业务的负载动态伸缩集群规模。在这些集群中,使用本地磁盘将经常访问的数据做caching是一种加速query的很好的机会。比如,AWS i3机型为每个core提供一个237GB的NVME SSD,价格比同等的m5(general-purpose)实例贵个50%。在Databricks,我们在集群针对Delta Lake的数据搞了一层透明的cache处理,这个特性可以帮助访问数据&元数据是都提速。Caching是安全的,这是因为在Delta Lake中,data文件,log文件,checkpoint文件等等一旦写入,都是immutable不可修改的。我们在第6章会看到,使用了cache后,读性能显著的增长了。

4.6 审计日志

Delta Lake的事务日志也可以被用作审计日志,基于日志中的commitinfo记录。在Databricks,我们开发了一个“锁”机制,去防止用户在spark集群使用UDF去直接访问云对象存储,这保证了只有使用runtime引擎才能向日志记录写入commitinfo记录,从而保证了事务日志的不可变性,也就达到了可审计的目标。用户可以使用 DESCRIBE HISTORY命令去看Delta Lake表的历史版本,如Figure3图所示。在开源版本的Delta Lake中Commit information日志也是可见的。审计日志是企业级数据应用合规要求中,在数据安全要求里越来越重要的强制性要求。

4.7 Schema 演变和增强

数据源经过长时期的迭代后,通常都会有schema变更的需求,,但是这也带来了挑战,老的数据文件(old Parquet files)可能会有“过期的/错误的”schema。Delta Lake可以以事务的方式完成schema变更,如果需要甚至还可以按照最新的schema去更新底层的数据的对象(比如删除一个用户不再需要的字段)。把每次的schema变更记录保存在事务日志中并维护一个历史,可以不重写老的Paruqet数据文件(当然只能在add column 加列时)。同等重要的是,Delta的客户端要保证新写入的数据是能符合表的schema的。在有Delta Lake这种写入时check schema的机制之前,将Parquet文件写入一个directory经常会有把schema搞错的事情发生,有了这个简单的check机制就能很好的trace到问题,因为在发生schema错误时会抛出错误。

4.8 Connectors to Query and ETL Engines

Delta Lake在Spark SQL和Structured Streaming通过使用Apache Spark的data source API,提供了全能力的connector。更进一步,它目前和很多系统都提供了“只读”的集成:Apache Hive, Presto, AWS Athena, AWS Redshift, and Snowflake,用户使用这些系统都能去查询Delta table了,跑普通查询甚至用Delta table数据和其他数据源的数据做join也可以。最后,一些ETL和CDC(Change Data Capture)的工具包括Fivetran, Informatica, Qlik and Talend 都是可以写入Delta Lake的 [33, 26]。一些查询引擎的整合使用了特殊的机制,比如Hive里的symbolic links ,会生成叫symlink的manifest文件。一个symlink manifest文件本质上是一个 text file,它包含了“对象存储 or 文件系统”在对应path/directory下可见的文件列表。很多Hive兼容(Hive-compatible)的系统是能够识别这个manifest files的,通常文件叫“_symlink_format_manifest”,当去读一张表对应数据时,可以先去找目录下的这个文件,然后把文件内容里的所有paths作为本张表的数据对象清单。在Delta Lake的上下文中, manifest files的作用就是为读取方提供了一个表的静态快照(包含表的file lists)。要生成一张表的manifest files ,用户需要跑一个简单的SQL指令。然后就可以把数据作为外部表load到Presto, Athena, Redshift or Snowflake等等引擎了。在其他case里,比如Apache Hive,开源社区也有人为Hive设计了一个Delta Lake的connector。

5.DELTA LAKE USE CASES

Delta Lake目前被Databricks中几千个活跃用户所使用,每天使用它处理EB级的数据量,和开源社区里的其他组织一样。这些use cases跨越了很广阔的数据源和应用。Delta Lake的数据源包括:企业级的OLTP系统的Change Data Capture (CDC) logs, 应用logs, 时间序列data, 图数据, 为BI分析用的数据表格的聚合数据, 图片,machine learning(ML)的特征数据等等。在这些数据上跑的应用包括:SQL(最常见的应用类型), BI(business intelligence) , streaming(流计算),data science(数据科学),machine learning(机器学习) and graph analytics(图计算)。Delta Lake对大多数使用Parquet、ORC等存储格式的数据应用来说,是一个很好的补充。

在这些use cases里,我们发现用户会使用Delta Lake来简化他们的企业级数据架构,使用云对象存储,在上面搭建“lakehouse”湖仓一体系统,同时达成数据湖和事务的能力。比如,想象一个从多数据源load数据的典型数据pipeline:从OLTP数据出来的CDC logs和设备产生的sensor data,将两个数据进行一些的ETL然后产生一些服务于数仓和数据科学家的衍生数据表(图1所示)。传统的实现需要集成很多组件,比如使用message queue(Apache Kafka)去承载实时计算的结果;使用一种数据湖存储作为长期存储;再使用一种数据仓库技术(比如Redshift)来为用户提供高速的Analytical分析类查询服务,数仓引擎可能会使用索引技术和告诉的本地磁盘(SSD)。在这些系统中都需要duplicate data,等于同一份数据多了多份拷贝,另外一个挑战是在这些系统中保证数据的一致性。而有了Delta Lake之后,上述的多种存储系统都可以被简单的“单一”云对象存储所取代即可,在上面利用好Delta Lake的ACID事务能力,streaming I/O能力 和 caching能力,这样就能得到同等的性能同时剔除数据架构上的复杂度。虽然Delta Lake不能代替上述系统的所有能力,也不能在所有场景都work的非常好(比如毫秒级的实时系统),但在大多数场景还是能满足需求的。在4.8章我们也介绍了,Delta目前和其他一些查询系统也已经有了connector集成,在后续章节我们会更详细的说一些use case。

5.1 Data Engineering and ETL

很多组织都在将ETL/ELT和数据仓库搬到云上,来减轻管理维护负担,另一边,更多的组织在将自己的业务数据(OLTP系统的交易数据)和其他数据源(web访问或IOT物联网系统)打通从而给下游的其他数据应用赋能,比如机器学习应用。这些应用都需要一个可靠且容易维护的数据工程化以及/ETL能力去处理这些数据。当这些组织将工作搬到云上后,他们都倾向于使用“云对象存储”作为数据的落地存储,这能带来存储开销的缩减,然后从这些原始数据经过计算加工,将加工后数据再导入到“更优的数仓系统”(比如拥有本地SSD存储)。Delta Lake的ACID事务能力,对UPSERT/MERGE的支持,以及time travel等特性是能够让这些公司直接基于对象存储就能“架设”数据仓库的,比如提供数据仓库常见的rollback,time travel,审计日志等能力。更多的好处是,使用Delta Lake后,避免了使用多种存储,避免了复杂数据链路的维护工作。最后,Delta Lake也同时支持SQL和Spark 编程API去写程序,让创建data pipeline更容易了。我们看到,在跨越不同行业比如(金融服务业、healthcare以及media行业)时,数据处理或者ML机器学习类的工作都在技术上都是差不多的,一旦这些公司的最基本的ETL pipeline和数据完成后,这些组织还可以进一步使用这些数据去充分挖掘价值(比如使用PySpark写一些数据科学分析)。可以在云上再开一个独立的计算集群即可,新集群也可以访问底层同一份数据,底层的基于Delta Lake的存储是共享的。还有一些组织将一部分的pipeline改为流式query(使用Spark Structured Streaming的Streaming SQL)。这些都可以通过新的云虚机(VM)来简单的跑起来,同时访问相同的底层数据。

5.2 Data Warehousing and BI

传统的数据仓库系统会使用有效的工具将ETL/ELT的功能结9合起来,来满足交互式的查询能力

比如BI(business intelligence)。支持这些需求的核心技术能力就是使用高效的存储格式(列式存储格式),数据的访问优化比如clustering和indexing,更快的存储介质,和更可靠的查询引擎。Delta Lake能够依托云对象存储直接支撑所有的这些特性,比如列式存储、数据layout优化、min-max统计、SSD caching,所有这些都是依托了它基于事务的ACID设计。之后,为我们还发现很多Delta Lake的用户会基于他们的LakeHouse数据集去跑adhoc query和BI需求,有的直接跑SQL,也有的使用Tableau这样的BI软件。基于这些use case都是常见需求,DataBricks开发了一个新的向量化的专门为BI需求服务的执行引擎,就好像对Spark runtime的优化一样。像其他ETL的case一样,BI直接查询Delta Lake的好处是能给分析师提供更新鲜的新数据,因为数据不再需要被load到另外一个独立的数据仓库系统了。

5.3 合规 & 重新生成数据

传统数据湖存储格式设计初就是为了不可变数据的,但现在越来越多的国家对数据有了合规要求,比如欧盟的GDPR[27],结合业界的最佳实战来看,对企业而言,需要大家有有效的方法去delete或者correct个人用户的隐私数据。我们看到不少组织将云上的数据集转向使用Delta Lake,就是为了使用它的高效 UPSERT,MERGE 和 DELETE 能力。用户还可以使用审计日志(section4.6)功能去做数据治理。Delta Lake的time travel能力对于需要重新使用老数据的数据科学分析和机器学习场景也非常有用。我们把MLflow和Delta Lake做了整合,MLflow是一个开源的模型管理平台,也是Databricks主导的,它能够自动的记录哪个模型使用了哪个版本的数据集进行了什么训练,这样能够方便开发人员重新跑过去的训练。

5.4 Specialized Use Cases

5.4.1 Computer System Event Data

我们见过的使用Delta Lake的一个最大的单场景需求是“安全信息事件管理平台(SIEM)”,来自一家大型的科技公司。这家公司将一大堆的计算机系统事件记录了下来,包含:TCP和UPD的网络流,认证请求,SSH登陆日志,等等,把这些数据都导入到一张大的Delta Lake表中,数据量有PB级。很多的ETL、SQL、图分析作业 以及 机器学习任务都会使用这个数据源,按照一定的已知行为pattern去搜索一些入侵的证据(比如,怀疑一个用户的登陆事件,或者有人从一些服务器上导出了大量的数据)。这些任务中很多是流式计算任务,都是为了尽可能缩小发现问题的时间。更多的是,超过100个分析师会查询这张表数据,直接用这张Delta Lake table去调查怀疑的告警,或者去设计新的自动化监控任务。这个信息安全的case真的很有意思,因为它很容易就自动的收集了大规模的数据(每天上百TB数据),因为这些数据是需要保留很久的,它将被用来作为法庭上分析新发现的入侵方式(有时在事实发生几个月后才定义出来),因为这些数据需要按很多维度被查询。比如,如果一个分析师发现了某个服务器曾经被攻破过,她可能需要去查查在网络里从这个sourceIP地址出去的数据(看看哪些机器可能从这里被攻击了),以这台机器为destination 的IP地址(看看攻击是从哪些源头来到这台机器),需要按时间,按其他一些维度(比如,攻击者拿到的员工的access token)。为PB级的数据集维护重量级的索引结构会是一件很重的事情,所以这个组织使用了Delta Lake的ZORDER BY特性去重新组织Parquet数据对象,从而提供跨越多个维度的聚类。因为法律要求类的查询伴随的这些维度都会经常组合出现(比如,在百万级别数据中找寻1个IP address),Z-ordering 和Delta Lake本身的min/max统计合在一起,能显著的降低每个query要读取的数据对象个数。Delta Lake的 AUTO OPTIMIZE功能, time travel 和 ACID transactions也在保证数据准确性,在百级别工程师协同访问数据等方面,发挥了重要作用。

5.4.2 Bioinformatics

生物信息是另一个我们发现Delta Lake被重度使用的领域,它被用来管理机器产生的数据。这里有很多数据源,包括DNA序列,RNA序列,电子医疗记录,还有医学设备的时间序列数据,这些数据让生物医药公司能够收集到关于病人和疾病更细节的信息。这些数据源一般会用来和公共数据集做join,比如和UI Biobank[44],他拥有序列信息和500,000个体的医疗记录。虽然传统的生物信息工具也使用过定制的数据格式,比如SAM,BAM ,VCF[34, 24],很多组织现在都开始将数据使用数据湖存储格式比如Parquet。大数据基因组学项目[37] 先行使用了这个方法。Delta Lake更进一步的加强了生物信息的工作能力,通过帮助开启全多维分析查询(使用Z-ordering),ACID事务,和高效的UPSERT 和 MERGE。在一些case里,使用这些特性和直接使用Parquet相比快了100x倍。在2019年,Databricks和Regeneron 发布了Glow[28],一个开源的基因组学数据工具集,它使用Delta Lake作为存储。

5.4.3 Media Datasets for Machine Learning

另一个我们看到很令人惊喜的应用是使用Delta Lake去管理多媒体数据集,比如从website上传的一批图片,用作后续的machine learning。虽然图片和其他媒体文件已经用高效的二进制格式编码好了,管理好这些百万级的对象,在对象存储中也是很有挑战的,因为每个对象只有区区几个kb大小。对象存储的LIST操作会跑上几分钟,很难并发的快速读到足够的对象,然后喂给基于GPU上跑的机器学习任务。我们看到很多组织也将这种媒体文件以二进制的记录存储在Delta table里,然后使用Delta做高速的推理查询,流式处理,和ACID事务。比如,头部的电子商务公司以及旅游公司就使用这种办法去管理用户上传的百万级别的图片。

6. 性能实验

在这一章,我们通过一些性能实验来表现Delta Lake的特性。我们首先,(1) 分析有很大规模数量对象 or 分区的开源大数据系统的问题,带着问题去看Delta Lake使用中心化的checkpoint去做metadata和统计信息的技术设计。(2)再分析在一张大表中,当查询条件多样化时 Z-ordering的性能问题。

最后我们还把Delta 的性能和 原生Parquet在TPC-DS数据集上做了对比,在写入场景并没有发现有明显的overhead增加。

6.1 多对象or分区的影响

Delta Lake的很多设计初心都是为了解决云对象存储的 listing和reading 对象的高延迟。这个延迟会让加载一张几千个数据文件的表 or 创建一个Hive风格的有几千个partition的表 变的很重。小文件常常会给HDFS造成问题,但是在性能这块HDFS还是要好过云对象存储的。

图4: 在查询有大量分区时不同系统的性能表现。未使用Delta的系统查询1million分区时太慢了,结果就没有列出来。

图5:在拥有100个对象的表中,使用4个字段“全局排序”或“zorder”能过滤掉的Parquet文件比例

去评估海量数据对象的影响,我们使用Databricks服务,在AWS创建了16-node AWS clusters of i3.2xlarge VMs (where each VM has 8 vCPUs, 61 GB RAM and 1.9 TB SSD storage)  ,还有托管的Apache Hive和Presto。然后我们创建一张33,000,000行的数据表,但是给他分配1000到1,000,000个分区,用这种方式去衡量大规模分区时metadata的overhead,然后在这些记录上跑一个简单的sum query。我们使用Databricks Runtime提供的Spark集群去跑,也用其他的引擎比如Hive 和 Presto ,另一边底层的存储格式同时对比Delta Lake和原生Parquet。

如图4所示,使用Delta Lake的Databricks Runtime组合在性能上有显著的性能优势,即使在没有SSD做cache的情况下。Hive使用了近一个小时才能找到1张表里的10,000个分区,1万个分区是个挺正常的数量,在给一张hive表按时间(天)分区的同时只要再来一个其他分区键就很容易达到这个规模了。Presto在读100,000分区是用了1个小时还要多。而Databricks Runtime引擎用了差不多450秒完成了100,000分区的listing操作,这很大程度上是因为我们优化了基于对象存储的LIST请求,我们把它并行化了,用spark cluster分布式的执行。

但是,在1个million分区这个量级,Delta Lake使用了108秒,如果使用了cache on SSD去把日志记录log做cache,可以把时间压缩到只要17秒。百万级别的hive分区看上去好像不现实,但在真实世界里,PB级别的表里真的有甚至上亿级别的数据对象的,在这种数据集上跑LIST操作是很重很重的。

6.2 Impact of Z-Ordering

要解析Z-Ordering,我们就要评估在访问一张表的数据时能跳过的数据百分比,我们针对这一指标,在使用Z-ordering 和 在数据表只根据一列做partition 或者 sort 来做对比。我们先根据Section5.4.1章节的use case里,基于信息安全数据集的灵感生成一份数据,有4个fields:sourceIP,sourcePort,destIP和destPort ,这几个维度能代表一个网络流量。我们选择32-bit的IP地址和16-bit的端口统一进行随机生成,然后我们把这张数据表存储为100个Parquet对象。然后,我们根据一些query去看能跳过多少对象,这些query的条件中包含一些维度(比如 SELECT SUM(col) WHERE sourceIP = "127.0.0.1")。

图5展示了结果,使用了(1)一个全局排序(特别的,按这个顺序 sourceIP, sourcePort, destIP and destPort ) 和 (2)使用这4个field做Z-ordering。在全局排序下,按照source IP搜索结果是能有效的跳过很多数据的,因为可以使用Parquet对象中source IP列的 min/max统计信息(大多数query只需要读这100个Paruqet对象中的1个),但按其他filed查询是则就没有效率了,因为每个文件都包含了很多记录,这些文件里关于其他列的min/max值范围太大了,甚至逼近整张表的min/max,达不到很好的过滤效果。相反,使用这4列做一个Z-ordering,不论按哪个field去查询,都能至少过滤掉43%的Parquet数据对象,平均能过滤率能达到54%(和简单的排序比平局是25%)。

如果有些表的数据对象比1000个更多,那Z-order就能带来更大的整体提升了。比如,在一个500TB的网络流量数据集上要按多个维度查询,按上述办法做Z-ordering后,我们在查询时能过滤掉数据表里的93%的数据对象。

图6 :使用不同查询引擎 +底层存储格式,在TPC-DS压力测试下的表现

6.3 TPC-DS 性能测试

要给Delta Lake在DBMS关系模型来一轮全面的性能基准测试,我们使用了TPC-DS数据集,在Databricks Runtime上来跑(分别以Delta Lake格式和原生Parquet格式存储),然后再交叉使用原生Spark和Presto来测试。每对组合都使用1个master和8个worker,跑在( i3.2xlarge AWS VMs, which have 8 vCPUs )机器上。我们使用TPC-DS在S3存储生成1TB的标准数据集,事实表按代理日key日期列做分区。图6展示了在不同配置下3种组合的平均耗时。我们看到Databricks Runtime和Delta Lake的组合性能是最好的。。(此处有benchmarket 的嫌疑)在这个实验中,Delta Lake在处理大规模分区的优势没有很明显的体现,这是因为整张表太小了,Delta Lake确实也在原生Parquet上做了一些加速,主要是针对benchmark中的大query做加速。Databricks Runtime的执行计划优化会比第三方的Spark服务做的更好一些(大多数基于Apache Spark2.4).

6.4 写入性能

我们同样使用Delta Lake和原生Parquet这两种格式,通过分别加载一个大的数据集,来对比Delta的统计信息是否显著的增加了写入的开销。图7分别展示了几种组合加载数据的Load时间。(Databricks, Delta Databricks, Parquet 3rd-Party Spark , 数据集为400 GB TPC-DS store_sales table),硬件资源为(one i3.2xlarge master and eight i3.2xlarge workers )。可以看到spark在写入Delta Lake的速度和Parquet没有差太多,这表明统计信息的收集并不会在写入数据时增加明显的overhead。

(评:在较大的数据集上,overhead相对小,但是在小数据集,overhead就相对大了。overhead是根据query的基准SLA的一个相对值)

7. 探讨 & 局限

从我们的经验看,Delta Lake展现出他能依托云对象存储来实现企业级数据处理所要求的ACID事务能力,能支持大规模的流式处理,批处理,和交互式查询工作负载。Delta Lake的设计是很有吸引力的,因为他在使用云存储时并不需要一个很重的中间件层服务,这也让他能够很容易被一些支持Parquet的查询引擎所直接使用。Delta Lake的ACID能力带来了很强大的管理能力和性能提升,但说实话,目前的设计还是有很多局限的,这也是未来工作的方向。

首先,Delta Lake目前只支持单表的序列化级别的事务,因为每张表都有它自己的事务日志。如果有跨表的事务日志将能打破这个局限,但这可能会显著的增加并发乐观锁的竞争(在给日志记录文件做append时)。在高TPS的事务场景下,一个coordinator是可以承接事务log写入的,这样能解决事务直接读写对象存储。

然后,在流式工作负载下,Delta Lake受限于云对象存储的latency。比如,使用对象存储的API很难达到ms级的流式延迟要求。另外一边看,我们发现大企业的用户一般都跑并行的任务,在使用Delta Lake去提供秒级的服务延迟在大多数场景下也是能够接受的。

第三,Delta Lake目前不支持二级索引(只有数据对象级别的min/max统计),我们已经开始着手开发一个基于Bloom filter的index了。Delta 的ACID事务能力,允许我们以事务的方式更新这些索引。

8. 相关工作

很多学术界的研究和工业界的项目都在思考使用云环境去做数据管理系统。比如,Branter,在基于S3开发OLTP数据库;在最终一致性的KVstore上去实现因果一致性;AWS Aruora 是一个商业的OLTP DBMS系统,它拥有存储计算分离的架构;Google BigQuery ,AWS Redshift Spectrum [39] 和 Snowflake [23] 都是OLAP的DBMS系统,他们都做到了存储计算分离,单独的计算集群都可以访问共享的云对象存储上的数据。其他一些项目,在考虑怎样自动的让DBMS引擎适应弹性、多租户的工作负载。

Delta Lake参考了这些工作的vision,借助了广泛的云基础设施,但是有一些不同的目标。特别的看,很多云上的DBMS系统需要一个中间件层服务去桥接client端和storage层。(比如Aurora 和 Redshift都有一个frontend server来处理client端连接),这种方式增加了运维的负担(这个frontend节点需要一直保持running),需要考虑扩展性、可用性、在大规模写入数据时可能会造成问题。相反,Delta Lake允许多客户端仅仅依靠云对象存储就能独立的协调工作,不需要再依赖一个单独的服务了(当然在3.2.2章说到,在使用AWS S3时会依赖一个轻量级的日志记录存储服务),这种设计解放了用户的运维压力,同时还保证了弹性扩容读/写的能力。还有,这套架构的HA能力是和底层云对象存储的可用性相同的,在发生灾难事故时,没有什么组件需要重启 or 特殊对待。当然,这个设计能这么从容灵活,也是因为Delta Lake的目标场景天然特性:是OLAP场景,TPS很低频,但事务涉及的数据量很大,因此很适合这种乐观锁的设计。

最接近Delta Lake设计和初衷的系统是 Apache Hudi[8] 和 Apache Iceberg[10],这两者都定义了数据格式,也都基于云对象存储实现了事务语义。这些系统没有能提供Delta Lake的所有能力,比如,其他两个系统都没有提供数据layout优化的特性(Z-Order),也没有提供把数据湖表当作streaming input 源的能力(Delta Lake的日志记录),也没有基于本地SSD做caching的Databricks runtime服务。

还有,Hudi同时只能有一个write(类似悲观锁)。

这些项目都和现在popular的计算引擎有结合,比如Spark 和 Presto,但都缺乏和商业数据仓库组件的connector(Redshift 和 Snowflake)【这个点应该动态发展的去看,有失偏颇】,而在Delta 我们实现了Manifest file以及一些商用的ETL tools。

Apache Hive ACID[32] 也基于“对象存储”/“分布式文件系统”实现了事务能力,但它要依靠Hive metastore区去track每张表的状态。这会在有几百万分区的时候成为瓶颈(把底层mysql替换成兼容mysql协议的分布式NewSQL即可,比如tidb),也增加了用户的运维负担。Hive ACID 也没法做到 time travel。低延迟的基于HDFS的存储、比如HBase、Kudu,都可以在把数据写入HDFS前将很多small write做合并,但都需要一层独立的分布式文件系统或分布式服务层。在合并高性能的OLTP和OLAP负载之间还是有一条很长的距离的,这个领域也有被称为HTAP系统。这些提供通常会有一个单独的为OLTP优化的写入存储,然后有一个为OLAP优化的长期存储。在我们的实际工作中,我们非常想基于对象存储开发一个支持高TPS的并发协议,但不使用一个独立的外部存储系统。

9.结论

我们已经介绍了Delta Lake,一个在云对象存储上搭建的ACID的数据表存储层服务,它给数据仓库带来了很多DBMS-like系统的性能和管理数据的feature,但并没有带来太多overhead。

它只是一种存储格式+一些客户端访问协议,这简化了维护成本,天生的highly available,也让客户端可以以 直接、高带宽的方式访问云对象存储。目前Delta Lake已经被几千家公司所使用,每天处理EB级(成千上万PB)的数据,被用来取代更复杂的基于很多数据系统柔和而成的复杂架构数仓。

最后,Delta Lake是开源的,基于Apache 2 license at  https://delta.io.

本文翻译自:https://databricks.com/wp-content/uploads/2020/08/p975-armbrust.pdf

updatedupdated2024-08-252024-08-25