Hadoop-aws

Introduction

Hadoop S3A client

The Hadoop "S3A" client provides a high-performance, filesystem-compatible implementation on top of the Amazon S3 object store. A minimal usage sketch follows the feature list below.

  • Reads and writes S3 objects directly;

  • Compatible with standard S3 clients;

  • Compatible with files created by the older s3n:// client and Amazon EMR's s3:// client;

  • Supports multipart upload of large (multi-GB) files;

  • Provides a high-performance random IO model for columnar data formats such as Apache ORC and Apache Parquet;

  • Uses the Amazon S3 SDK for Java and supports the latest S3 features and authentication schemes;

  • Supports the following authentication mechanisms: environment variables, Hadoop configuration properties, the Hadoop key manager, and IAM roles;

  • Supports per-bucket configuration;

  • With S3Guard, offers consistent, high-performance metadata and directory read operations, balancing speed and consistency;

  • Supports S3 server-side encryption for reads and writes, including SSE-S3, SSE-KMS, and SSE-C;

  • Integrates with Hadoop metrics;

  • Actively maintained by the open-source community.
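As a quick illustration of the filesystem-compatible interface, here is a minimal sketch that reads an object through the standard Hadoop FileSystem API. The bucket name and object path are placeholders, and credentials are assumed to be supplied by one of the mechanisms listed above.

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class S3AReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Credentials are resolved by the S3A credential provider chain
    // (environment variables, Hadoop properties, IAM roles, ...).
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);

    // Read an object exactly as if it were a file on HDFS.
    try (InputStream in = fs.open(new Path("s3a://example-bucket/data/sample.txt"))) {
      IOUtils.copyBytes(in, System.out, 4096, false);
    }
  }
}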

Other S3 connectors

Several other Hadoop S3 connectors have existed; only S3A is actively maintained by the Hadoop project.

  • The original Apache Hadoop s3:// client. No longer shipped with Hadoop;

  • Amazon EMR's s3:// client. Maintained by the Amazon EMR team;

  • The Apache Hadoop s3n:// client. No longer maintained; migrate to the newer s3a://.

Getting started

S3A depends on two JARs, alongside hadoop-common and its dependencies:

  • hadoop-aws

  • aws-java-sdk-bundle

The versions of hadoop-common and hadoop-aws must be identical, as in the Maven snippet below:

<properties>
 <!-- Your exact Hadoop version here-->
  <hadoop.version>3.0.0</hadoop.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
</dependencies>

Caveats

Amazon S3 is an object store. To achieve scalability, low cost, and high durability, it relaxes, like other cloud object stores, some of the guarantees of a classic POSIX filesystem.

S3Guard attempts to provide some of these missing features for S3.

Caveat 1: the S3 consistency model

  • A file newly created through the Hadoop FileSystem API may not be immediately visible (a sketch of this follows the list);

  • File delete and update operations may not propagate immediately; old copies of a file may remain visible for an indeterminate amount of time;

  • Directory delete and rename operations are implemented recursively over the individual files. They take time at least proportional to the number of files, and partially completed updates may be visible while they run. If an operation is interrupted, the filesystem is left in an intermediate state.
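A hedged sketch of what this can look like in practice (without S3Guard): the directory listing below may lag behind the write on an eventually consistent store. The bucket and paths are placeholders.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConsistencyExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), new Configuration());
    Path newFile = new Path("s3a://example-bucket/tmp/new-file.txt");

    fs.create(newFile).close();   // upload an empty object

    // Reading the object back by its full key is usually fine,
    // but a directory listing may not include it yet.
    FileStatus[] listing = fs.listStatus(new Path("s3a://example-bucket/tmp/"));
    System.out.println("entries visible so far: " + listing.length);
  }
}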

Caveat 2: directories are mimicked

Directories in the S3A client are mimicked:

  • A mkdirs call creates a stub entry, which is deleted once a file is added to the directory;

  • Listing a directory returns all paths whose keys begin with that directory's prefix;

  • Renaming a directory first lists the relevant objects, then copies each one to a new object at the destination via S3;

  • Deleting a directory lists the relevant objects and then deletes them in bulk;

  • Renaming or deleting a directory tree lists all the relevant objects and then performs the operation on each object individually.

Some consequences of mimicked directories:

  • Directories may lack modification times; Hadoop operations that rely on them may behave unexpectedly;

  • Directory listing can be slow; prefer listFiles(path, recursive) for efficient recursive listings (see the sketch after this list);

  • A file can end up being created underneath another file if the caller tries hard enough;

  • The duration of a directory rename depends on the number and size of the files underneath it;

  • Directory rename is not atomic; it can fail partway through, and callers cannot safely rely on it as an all-or-nothing operation;

  • Directory delete is not atomic and can also fail partway through.
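The sketch below shows the recursive listFiles(path, recursive) call mentioned above, which issues one flat, paged listing instead of walking the tree directory by directory. The bucket and prefix are placeholders.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class RecursiveListExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), new Configuration());

    // One recursive listing of every object under the prefix,
    // instead of a treewalk of per-directory listStatus() calls.
    RemoteIterator<LocatedFileStatus> it =
        fs.listFiles(new Path("s3a://example-bucket/datasets/"), true);
    while (it.hasNext()) {
      LocatedFileStatus status = it.next();
      System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
    }
  }
}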

Caveat 3: object stores have a different authorization model

The S3 authorization model is very different from that of HDFS and traditional filesystems. The S3A client simply reports stub information when metadata is queried through the API (see the sketch after this list):

  • The file owner is reported as the current user;

  • The file group is also reported as the current user (empty before Hadoop 2.8.0);

  • Directory permissions are reported as 777;

  • File permissions are reported as 666.
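A small sketch of what these stub values look like when queried through the API; the path is a placeholder.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PermissionExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), new Configuration());
    FileStatus status = fs.getFileStatus(new Path("s3a://example-bucket/data/sample.txt"));

    // S3A reports stub values: owner/group are the current user,
    // permissions are 666 for files and 777 for directories.
    System.out.println("owner      = " + status.getOwner());
    System.out.println("group      = " + status.getGroup());
    System.out.println("permission = " + status.getPermission());
  }
}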

Caveat 4: your AWS credentials are valuable

AWS credentials are not just what pays for the service; they also grant read and write access to the data. Anyone holding the credentials can not only read your data but delete it as well.

Avoid inadvertently leaking credentials by, for example:

  • Including secrets in configuration files checked into SCM;

  • Logging them to a console or terminal;

  • Including secrets in bug reports;

  • Exposing them through environment variables beginning with AWS_.

Protecting AWS credentials
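One common way to keep keys out of plaintext configuration and SCM is the Hadoop credential provider framework. The sketch below is only an illustration: the JCEKS path is a placeholder, and it assumes the keystore holding fs.s3a.access.key and fs.s3a.secret.key was created beforehand with the hadoop credential command.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class CredentialProviderExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Resolve fs.s3a.access.key / fs.s3a.secret.key from an encrypted
    // JCEKS keystore rather than from plaintext XML or environment variables.
    conf.set("hadoop.security.credential.provider.path",
        "jceks://hdfs@namenode:8020/user/alice/s3a.jceks");

    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    System.out.println("connected to " + fs.getUri());
  }
}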

S3A client configuration

All S3A client options use the fs.s3a. prefix; the main options and their default values are listed below:

<property>
  <name>fs.s3a.connection.maximum</name>
  <value>15</value>
  <description>Controls the maximum number of simultaneous connections to S3.</description>
</property>

<property>
  <name>fs.s3a.connection.ssl.enabled</name>
  <value>true</value>
  <description>Enables or disables SSL connections to S3.</description>
</property>

<property>
  <name>fs.s3a.endpoint</name>
  <description>AWS S3 endpoint to connect to. An up-to-date list is
    provided in the AWS Documentation: regions and endpoints. Without this
    property, the standard region (s3.amazonaws.com) is assumed.
  </description>
</property>

<property>
  <name>fs.s3a.path.style.access</name>
  <value>false</value>
  <description>Enable S3 path style access ie disabling the default virtual hosting behaviour.
    Useful for S3A-compliant storage providers as it removes the need to set up DNS for virtual hosting.
  </description>
</property>

<property>
  <name>fs.s3a.proxy.host</name>
  <description>Hostname of the (optional) proxy server for S3 connections.</description>
</property>

<property>
  <name>fs.s3a.proxy.port</name>
  <description>Proxy server port. If this property is not set
    but fs.s3a.proxy.host is, port 80 or 443 is assumed (consistent with
    the value of fs.s3a.connection.ssl.enabled).</description>
</property>

<property>
  <name>fs.s3a.proxy.username</name>
  <description>Username for authenticating with proxy server.</description>
</property>

<property>
  <name>fs.s3a.proxy.password</name>
  <description>Password for authenticating with proxy server.</description>
</property>

<property>
  <name>fs.s3a.proxy.domain</name>
  <description>Domain for authenticating with proxy server.</description>
</property>

<property>
  <name>fs.s3a.proxy.workstation</name>
  <description>Workstation for authenticating with proxy server.</description>
</property>

<property>
  <name>fs.s3a.attempts.maximum</name>
  <value>20</value>
  <description>How many times we should retry commands on transient errors.</description>
</property>

<property>
  <name>fs.s3a.connection.establish.timeout</name>
  <value>5000</value>
  <description>Socket connection setup timeout in milliseconds.</description>
</property>

<property>
  <name>fs.s3a.connection.timeout</name>
  <value>200000</value>
  <description>Socket connection timeout in milliseconds.</description>
</property>

<property>
  <name>fs.s3a.paging.maximum</name>
  <value>5000</value>
  <description>How many keys to request from S3 when doing
     directory listings at a time.</description>
</property>

<property>
  <name>fs.s3a.threads.max</name>
  <value>10</value>
  <description> Maximum number of concurrent active (part)uploads,
  which each use a thread from the threadpool.</description>
</property>

<property>
  <name>fs.s3a.socket.send.buffer</name>
  <value>8192</value>
  <description>Socket send buffer hint to amazon connector. Represented in bytes.</description>
</property>

<property>
  <name>fs.s3a.socket.recv.buffer</name>
  <value>8192</value>
  <description>Socket receive buffer hint to amazon connector. Represented in bytes.</description>
</property>

<property>
  <name>fs.s3a.threads.keepalivetime</name>
  <value>60</value>
  <description>Number of seconds a thread can be idle before being
    terminated.</description>
</property>

<property>
  <name>fs.s3a.max.total.tasks</name>
  <value>5</value>
  <description>Number of (part)uploads allowed to the queue before
  blocking additional uploads.</description>
</property>

<property>
  <name>fs.s3a.multipart.size</name>
  <value>100M</value>
  <description>How big (in bytes) to split upload or copy operations up into.
    A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
  </description>
</property>

<property>
  <name>fs.s3a.multipart.threshold</name>
  <value>2147483647</value>
  <description>How big (in bytes) to split upload or copy operations up into.
    This also controls the partition size in renamed files, as rename() involves
    copying the source file(s).
    A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
  </description>
</property>

<property>
  <name>fs.s3a.multiobjectdelete.enable</name>
  <value>true</value>
  <description>When enabled, multiple single-object delete requests are replaced by
    a single 'delete multiple objects'-request, reducing the number of requests.
    Beware: legacy S3-compatible object stores might not support this request.
  </description>
</property>

<property>
  <name>fs.s3a.acl.default</name>
  <description>Set a canned ACL for newly created and copied objects. Value may be Private,
    PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
    or BucketOwnerFullControl.</description>
</property>

<property>
  <name>fs.s3a.multipart.purge</name>
  <value>false</value>
  <description>True if you want to purge existing multipart uploads that may not have been
     completed/aborted correctly</description>
</property>

<property>
  <name>fs.s3a.multipart.purge.age</name>
  <value>86400</value>
  <description>Minimum age in seconds of multipart uploads to purge</description>
</property>

<property>
  <name>fs.s3a.signing-algorithm</name>
  <description>Override the default signing algorithm so legacy
    implementations can still be used</description>
</property>

<property>
  <name>fs.s3a.server-side-encryption-algorithm</name>
  <description>Specify a server-side encryption algorithm for s3a: file system.
    Unset by default. It supports the following values: 'AES256' (for SSE-S3), 'SSE-KMS'
     and 'SSE-C'
  </description>
</property>

<property>
    <name>fs.s3a.server-side-encryption.key</name>
    <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
    has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
    should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
    you'll be using your default's S3 KMS key, otherwise you should set this property to
    the specific KMS key id.</description>
</property>

<property>
  <name>fs.s3a.buffer.dir</name>
  <value>${hadoop.tmp.dir}/s3a</value>
  <description>Comma separated list of directories that will be used to buffer file
    uploads to.</description>
</property>

<property>
  <name>fs.s3a.block.size</name>
  <value>32M</value>
  <description>Block size to use when reading files using s3a: file system.
  </description>
</property>

<property>
  <name>fs.s3a.user.agent.prefix</name>
  <value></value>
  <description>
    Sets a custom value that will be prepended to the User-Agent header sent in
    HTTP requests to the S3 back-end by S3AFileSystem.  The User-Agent header
    always includes the Hadoop version number followed by a string generated by
    the AWS SDK.  An example is "User-Agent: Hadoop 2.8.0, aws-sdk-java/1.10.6".
    If this optional property is set, then its value is prepended to create a
    customized User-Agent.  For example, if this configuration property was set
    to "MyApp", then an example of the resulting User-Agent would be
    "User-Agent: MyApp, Hadoop 2.8.0, aws-sdk-java/1.10.6".
  </description>
</property>

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  <description>The implementation class of the S3A Filesystem</description>
</property>

<property>
  <name>fs.AbstractFileSystem.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3A</value>
  <description>The implementation class of the S3A AbstractFileSystem.</description>
</property>

<property>
  <name>fs.s3a.readahead.range</name>
  <value>64K</value>
  <description>Bytes to read ahead during a seek() before closing and
  re-opening the S3 HTTP connection. This option will be overridden if
  any call to setReadahead() is made to an open stream.</description>
</property>

<property>
  <name>fs.s3a.list.version</name>
  <value>2</value>
  <description>Select which version of the S3 SDK's List Objects API to use.
  Currently support 2 (default) and 1 (older API).</description>
</property>
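Any of the properties above can also be set programmatically on the Configuration object before the filesystem is created. A small sketch, with purely illustrative values:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3AConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Override a few of the defaults shown above (values are illustrative).
    conf.set("fs.s3a.connection.maximum", "50");   // more parallel connections
    conf.set("fs.s3a.paging.maximum", "1000");     // smaller listing pages
    conf.set("fs.s3a.multipart.size", "64M");      // smaller multipart blocks

    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    System.out.println("fs.s3a.connection.maximum = "
        + fs.getConf().get("fs.s3a.connection.maximum"));
  }
}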

Retries and recovery

Handling read-during-overwrite

How S3A writes data to S3

The original S3A client implemented file writes by buffering all data written to the OutputStream on disk, and only starting the upload when the stream was closed. This makes output slow and, for large files, can exhaust local disk space.

Hadoop 2.7 added S3AFastOutputStream as a replacement for the original S3AOutputStream (which will eventually be removed from Hadoop).

The fast output stream works roughly as follows (a usage sketch follows this list):

  • Large files are uploaded as a series of fixed-size blocks, sized by fs.s3a.multipart.size;

  • Blocks are buffered on disk (the default) or in on-heap/off-heap memory;

  • Blocks are uploaded in parallel by background threads;

  • A buffered block is uploaded as soon as it reaches the configured block size;

  • When buffering to disk, the directories given by fs.s3a.buffer.dir are used.
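A sketch of a write that exercises this block upload path. The fs.s3a.fast.upload switch applies to the Hadoop 2.x line described here (newer releases use block-based upload by default), and the bucket, output path, and sizes are illustrative assumptions.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FastUploadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.fast.upload", "true");     // use the block-based output stream
    conf.set("fs.s3a.multipart.size", "64M");   // size of each uploaded block
    conf.set("fs.s3a.buffer.dir", "/tmp/s3a");  // where blocks are buffered on disk

    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    byte[] chunk = new byte[8 * 1024 * 1024];

    // Blocks are queued for upload as soon as they fill; close() completes
    // the multipart upload.
    try (FSDataOutputStream out = fs.create(new Path("s3a://example-bucket/output/large.bin"))) {
      for (int i = 0; i < 16; i++) {
        out.write(chunk);
      }
    }
  }
}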

References

  1. Apache Hadoop Amazon Web Services support – Hadoop-AWS module: Integration with Amazon Web Services

  2. Apache Hadoop Amazon Web Services support – S3Guard: Consistency and Metadata Caching for S3A
