SPARK-25299: 改进Spark Shuffle可靠性
背景动机
在分布式计算中,shuffle表示多个不同计算单元之间的数据交换。spark用shuffle来表示不同executor之间的数据重组,最常见于RDD的重新分区。合并和聚合类的RDD 需要通过shuffle来重建新分区,相同分区键值的数据集需要重新定位到同一个executor中。
已经有很多工作来改善shuffle的性能和可靠性。Spark Shuffle一个重要的行为就是Spark shuffle时将读写大量的磁盘文件,同时维护一份这些文件在不同executor 节点上的记录。在Spark 1.2 中,开发者实现了external shuffle service。 该组件将shuffle 分区文件的 位置,文件块映射及偏移保存在一个文件中。这样,当executor失效时,shuffle service还能继续提供shuffle服务,而不像之前,如果executor失败,则需要重新计算。
External Shuffle Service存在的问题
当前external shuffle service通过远程shuffle服务来读取不同节点上的数据,而executor进程仍然将shuffle数据写入本地存储。这种架构存在一些不足,尤其在容器环境如K8s和Docker。下图
当前实现存在以下问题:
缺乏隔离机制
扩展问题
设计目标
方案讨论
方案1:通过Shuffle Service 上传Shuffle 数据
shuffle service作为文件服务器接收shuffle 写入数据流。
优点:
- 后端shuffle文件流程(indexing、caching等)对应用完全透明,应用层无需其他配置;
缺点:
- 每次shuffle文件需要写3次:executor-> ess proxy, ess proxy -> shuffle service host, shuffle service host -> back storage
方案2:shuffle service提供远程文件URI
executor使用远程文件系统代替本地文件系统存储这些shuffle文件,shuffle service充当数据库维护shuffle文件路径。
优点
相比方案一,只有元数据需要跨多路存储;
相比方案1更少的改动;
缺点
spark 应用配置增加后端存储相关配置;
所有应用能直接访问后端存储;
方案3:使用分布式存储
应用程序本身可以维护有关该应用程序的shuffle文件的所有信息,因此完全不需要外部shuffle服务。该实现大致如下: 应用程序本身会跟踪分区ID到文件位置+文件内偏移量之间的映射。 所有文件位置都在某个远程文件存储层中。 执行程序直接针对后端存储系统打开和关闭流以shuffle文件。
在此解决方案中,我们删除了外部随机播放服务,因此剩下的唯一组件是随机播放数据存储系统和Spark应用程序本身。 Spark应用程序可以隔离,而存储系统可以与访问其数据的计算作业分离。此解决方案解决了隔离方面描述的问题。
由于我们没有外部随机播放服务,因此该解决方案的可扩展性通常会落在Spark随机播放操作的可扩展性以及随机数据存储系统的可扩展性上。后者是一个众所周知的领域,有大量现有工作,而前者可能会有所改进,但需要单独努力。
执行程序可以自由关闭,并且写入的随机数据仍然可以通过持久后备存储使用。我们不再需要担心单独系统的正常运行时间。因此,该解决方案解决了围绕可靠性,停机时间和工作浪费而描述的问题。
在容器中运行的Spark应用程序可以访问远程存储系统以进行随机读取和写入。该解决方案与容器化的运行时兼容。
只要在分布式文件系统中的各个节点之间复制混洗块,该系统中就不应存在单个瓶颈。我们没有一个单一的组件可以一次加载所有索引文件-所有执行程序都将导出需要以分布式方式获取的元数据和数据。
实现细节:
优点
- 架构及实现简单,无须独立的shuffle service;
缺点
清理数据是不可靠的。假设Spark应用程序收到kill -9信号。在这种情况下,Spark应用程序无法清理自己的随机播放文件。没有其他组件知道此类文件的存在和生命周期。因此,文件可能会无限期地保留在那里。
有效读取索引文件比较困难。以前,如果两个不同的执行者向shuffle服务请求相同的索引文件,则shuffle服务可以在两次调用之间缓存索引文件。但是,现在混洗元数据必须从后备存储中读取两次-每个执行者一次。索引文件的跨执行者缓存变得更加困难。
- 支持本地缓存的分布式存储有所缓解。
方案4:备份数据至分布式文件系统
相比方案3直接将数据写入远端存储,该方案先将数据写入到本地存储,后异步将数据上传至远端分布式存储系统中。
优点
- executor不崩溃时拥有更少的网络hops;