数据湖(Delta Lake)

数据湖(Delta Lake)

简介

  • Data lake这个术语由Pentaho公司创始人兼CTO詹姆斯·狄克逊(James Dixon)发明,他对数据湖的解释是: 把你以前在磁带上拥有的东西倒入到数据湖,然后开始探索这些数据

  • 数据湖的想法是将企业中的所有数据(从原始数据开始保存,这意味着源系统数据的精确副本)保存于同一个存储介质中,以用于各种任务(包括报告,可视化,分析和机器学习)。

  • 数据湖创建了一个适用于所有格式数据的集中式数据存储,包括:关系数据库的数据(行和列),半结构化数据(CSV,日志,XML,JSON),非结构化数据(电子邮件,文档,PDF)甚至二进制数据(图像,音频,视频)

  • 在数据湖方案里,数据移动和管理的工具非常重要。

数据湖与数据仓库、数据集市的区别

  • 数据库:侧重事务处理能力,OLTP,关注:QPS, TPS, IOPS这些指标;

  • 数据仓库:侧重分析能力,数据入库时有一定的scheme,OLAP;

  • 数据湖:入库时没有scheme,

  • 湖仓一体:

主要功能

  • ACID 事务:Delta Lake 提供多次写入之间的 ACID 事务。每次写入都是一个事务,并且在事务日志中顺序记录了写入的序列。事务日志跟踪文件级别的写入并使用乐观并发控制,这非常适合数据湖泊,因为不会经常发生多个写入尝试修改相同的文件的情况。在存在冲突的情况下,Delta Lake 会抛出并发修改异常以遍用户处理并重试其作业。 Delta Lake 还提供强大的可序列化隔离级别,允许工程师持续写入一个目录或表,并允许消费者持续从同一目录或表中进行读取。读取者将会看到开始读取时存在的最新快照。
  • Schema 管理:Delta Lake 自动验证正在写入的 DataFrame 的 Schema 是否与表的 Schema 兼容。表中存在但不在 DataFrame 中的列设置为 null。如果 DataFrame 中有 Schema 中没有定义的列,则此操作会引发异常。 Delta Lake 具有显式添加新列的 DDL 以及自动更新 Schema 的能力。
  • 可扩展的元数据处理:Delta Lake 将表或目录的元数据信息存储在事务日志中,而不是 Metastore 中,这允许Delta Lake在恒定时间内列出大型目录中的文件,并在读取数据时更高效。
  • 数据版本控制和时间旅行:Delta Lake 允许用户读取表或目录的历史快照。在写入或者修改文件时,Delta Lake会创建较新版本的文件并保留旧版本。当用户想要读取旧版本的表或目录时,他们可以为 Apache Spark 的读取 API 提供时间戳或版本号,Delta Lake 根据事务日志中的信息构建该时间戳或版本的完整快照。这允许用户重现实验和报告,并在需要时将表还原为旧版本。
  • 统一批处理和流式处理:除了批量写入,Delta Lake 还可以用作 Apache Spark Structured Streaming 的高效流式接收器。结合 ACID 事务和可扩展的元数据处理,高效的流式传输接收器现在可以实现大量近实时分析用例,而无需维护复杂的流式传输和批处理管道。
  • 记录更新和删除(即将发布):Delta Lake 提供合并、更新和删除这些 DML 命令。这允许工程师很方便的对数据进行修改和删除,简化数据变更捕获以及 GDPR。因为 Delta Lake 按文件粒度跟踪数据变更,所以比一次处理所有分区或表要高效的多。
  • 数据期望(即将推出):Delta Lake 还将支持新的 API 来设置表或目录的数据期望。工程师将能够指定布尔条件并调整严重程度以处理数据期望。当 Apache Spark 作业写入表或目录时,Delta Lake 将自动验证记录,当存在违规时,它将根据提供的严重程度配置对记录进行处理。

deltalake

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
//# spark version: 2.4.6
//# jdk: 1.8.0_172
//# scala: 2.11
//$ cd $SPARK_HOME
//$ bin/spark-shell --jars ./jars/delta-core_2.11-0.6.0.jar 

val data = spark.range(0, 5)
data.write.format("delta").save("file:///mnt/cfs/spark/delta-table")

val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("file:///mnt/cfs/spark/delta-table")

//du数据
val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

//
val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

参考

  1. https://help.aliyun.com/document_detail/148369.html?spm=a2c6h.12873639.0.0.43197803PJYFDj

updatedupdated2024-12-152024-12-15