Hadoop 文档

General

Common

HDFS

MapReduce

MapReduce REST APIs

YARN

YARN REST APIs

YARN Service

Submarine

Hadoop Compatible File Systems

Auth

Tools

Reference

Configuration

Hadoop流

Hadoop流是Hadoop发行版随附的实用程序。该实用程序允许您使用任何可执行文件或脚本作为映射器和/或化简器来创建和运行Map / Reduce作业。例如:

映射流
  输入myInputDirs \
  -输出myOutputDir \
  -mapper / bin / cat \
  -reducer / usr / bin / wc

流媒体如何工作

在上面的示例中,映射器和化简器都是可执行文件,它们从stdin(逐行)读取输入,并将输出发送到stdout。该实用程序将创建一个Map / Reduce作业,将该作业提交到适当的群集,并监视该作业的进度,直到完成为止。

为映射器指定可执行文件后,初始化映射器时,每个映射器任务都会将可执行文件作为单独的进程启动。当mapper任务运行时,它将其输入转换为行,并将行输入到流程的标准输入中。同时,映射器从流程的标准输出中收集面向行的输出,并将每行转换为键/值对,并将其作为映射器的输出进行收集。默认情况下,第一个制表符前一行前缀,其余行(不包括制表符)将是value。如果该行中没有制表符,则将整行视为键,并且该值为null。但是,可以通过设置-inputformat自定义 命令选项,如稍后讨论。

当为reducer指定可执行文件时,每个reducer任务将以单独的进程启动可执行文件,然后初始化reducer。reducer任务运行时,它将其输入键/值对转换为行,并将这些行馈送到流程的标准输入。同时,Reducer从流程的stdout收集面向行的输出,将每条线转换为键/值对,将其作为reducer的输出收集。默认情况下,直到第一个制表符的行的前缀是键,其余的行(不包括制表符)是值。但是,这可以通过设置-outputformat命令选项来自定义,如稍后所述。

这是Map / Reduce框架与流式Mapper / Reducer之间的通信协议的基础。

用户可以将stream.non.zero.exit.is.failure指定为truefalse,以使以非零状态退出的流任务分别为FailureSuccess。默认情况下,以非零状态退出的流任务被视为失败任务。

流命令选项

流支持流命令选项以及通用命令选项。常规命令行语法如下所示。

注意:确保将通用选项放在流选项之前,否则命令将失败。有关示例,请参见使档案可用于任务

映射的流[genericOptions] [streamingOptions]

Hadoop流命令选项在此处列出:

参数 可选/必需 描述
-输入目录名或文件名 需要 映射器的输入位置
-输出目录名 需要 减速机的输出位置
-mapper可执行文件或JavaClassName 可选的 映射器可执行文件。如果未指定,则将IdentityMapper用作默认值
-reducer可执行文件或JavaClassName 可选的 Reducer可执行文件。如果未指定,则将IdentityReducer用作默认值
-文件名 可选的 使映射器,reducer或combiner可执行文件在计算节点上本地可用
-inputformat JavaClassName 可选的 您提供的类应返回Text类的键/值对。如果未指定,则将TextInputFormat用作默认值
-outputformat JavaClassName 可选的 您提供的类应采用Text类的键/值对。如果未指定,则将TextOutputformat用作默认值
分区JavaClassName 可选的 确定哪个还原键发送到的类
-combiner StreamingCommand或JavaClassName 可选的 组合器可执行文件,用于地图输出
-cmdenv名称=值 可选的 将环境变量传递给流命令
-inputreader 可选的 为了向后兼容:指定记录读取器类(而不是输入格式类)
-冗长 可选的 详细输出
-lazyOutput 可选的 延迟创建输出。例如,如果输出格式基于FileOutputFormat,则仅在第一次调用Context.write时创建输出文件。
-numReduceTasks 可选的 指定减速机数量
-mapdebug 可选的 地图任务失败时调用的脚本
-减少调试 可选的 减少任务失败时调用的脚本

指定Java类作为Mapper / Reducer

您可以提供Java类作为映射器和/或reducer。

映射流
  输入myInputDirs \
  -输出myOutputDir \
  -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat \
  -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
  -reducer / usr / bin / wc

您可以将stream.non.zero.exit.is.failure指定为truefalse,以使以非零状态退出的流任务分别为FailureSuccess。默认情况下,以非零状态退出的流任务被视为失败任务。

打包文件并提交作业

您可以将任何可执行文件指定为映射器和/或化简器。可执行文件不需要预先存在于群集中的机器上;但是,如果没有,您将需要使用“ -file”选项来告诉框架将可执行文件打包为作业提交的一部分。例如:

映射流
  输入myInputDirs \
  -输出myOutputDir \
  -mapper myPythonScript.py \
  -reducer / usr / bin / wc \
  -文件myPythonScript.py

上面的示例将用户定义的Python可执行文件指定为映射器。选项“ -file myPythonScript.py”将python可执行文件作为作业提交的一部分发送到群集计算机。

除了可执行文件之外,您还可以打包映射器和/或化简器可能使用的其他辅助文件(例如字典,配置文件等)。例如:

映射流
  输入myInputDirs \
  -输出myOutputDir \
  -mapper myPythonScript.py \
  -reducer / usr / bin / wc \
  -文件myPythonScript.py \
  -文件myDictionary.txt

指定作业的其他插件

与正常的Map / Reduce作业一样,您可以为流作业指定其他插件:

 -inputformat JavaClassName
 -outputformat JavaClassName
 分区JavaClassName
 -combiner StreamingCommand或JavaClassName

您为输入格式提供的类应返回Text类的键/值对。如果未指定输入格式类,则将TextInputFormat用作默认格式。由于TextInputFormat返回LongWritable类的键,而这些键实际上不是输入数据的一部分,因此将丢弃这些键;只有值将通过管道传输到流式映射器。

您为输出格式提供的类应该采用Text类的键/值对。如果未指定输出格式类,则将TextOutputFormat用作默认格式。

设置环境变量

要在流命令中设置环境变量,请使用:

 -cmdenv EXAMPLE_DIR = / home / example / dictionaries /

通用命令选项

流支持流命令选项以及通用命令选项。常规命令行语法如下所示。

注意:确保将通用选项放在流选项之前,否则命令将失败。有关示例,请参见使档案可用于任务

hadoop命令[genericOptions] [streamingOptions]

此处列出了可与流一起使用的Hadoop通用命令选项:

参数 可选/必需 描述
-conf配置文件 可选的 指定应用程序配置文件
-D属性=值 可选的 给定属性的使用价值
-fs host:端口或本地 可选的 指定一个名称节点
文件 可选的 指定将逗号分隔的文件复制到Map / Reduce集群
-libjars 可选的 指定逗号分隔的jar文件以包含在类路径中
-档案 可选的 指定以逗号分隔的存档,以在计算机上取消存档

使用-D选项指定配置变量

您可以使用“ -D <属性> = <值>”来指定其他配置变量。

指定目录

要更改本地临时目录,请使用:

 -D dfs.data.dir = / tmp

要指定其他本地临时目录,请使用:

 -D mapred.local.dir = / tmp /本地
 -D mapred.system.dir = / tmp /系统
 -D mapred.temp.dir = / tmp / temp

注意:有关作业配置参数的更多详细信息,请参见:mapred-default.xml

指定仅地图作业

通常,您可能只想使用地图功能处理输入数据。为此,只需将mapreduce.job.reduces设置为零即可。Map / Reduce框架不会创建任何化简任务。而是,映射器任务的输出将是作业的最终输出。

 -D mapreduce.job.reduces = 0

为了向后兼容,Hadoop Streaming还支持“ -reducer NONE”选项,该选项等效于“ -D mapreduce.job.reduces = 0”。

指定减速机数量

要指定减速器的数量(例如两个),请使用:

映射流
  -D mapreduce.job.reduces = 2 \
  输入myInputDirs \
  -输出myOutputDir \
  -mapper / bin / cat \
  -reducer / usr / bin / wc

自定义如何将行拆分为键/值对

如前所述,当Map / Reduce框架从映射器的stdout读取一行时,它将该行拆分为一个键/值对。默认情况下,第一个制表符之前的行的前缀是键,其余的行(不包括制表符)是值。

但是,您可以自定义此默认值。您可以指定除制表符(默认值)以外的字段分隔符,并且可以指定第n个(n> = 1)字符而不是一行中的第一个字符(默认值)作为键和值之间的分隔符。例如:

映射流
  -D stream.map.output.field.separator =。\
  -D stream.num.map.output.key.fields = 4 \
  输入myInputDirs \
  -输出myOutputDir \
  -mapper / bin / cat \
  -reducer / bin / cat

在上面的示例中,“-D stream.map.output.field.separator =。” 指定“。” 作为地图输出的字段分隔符,并且前缀最多为第四个“。” 一行中的行将是键,其余行(不包括第四个“。”)将是值。如果一行少于四个“。”,则整行将是键,并且值将是一个空的Text对象(例如由ne​​w Text(“”)创建的对象)。

类似地,您可以使用“ -D stream.reduce.output.field.separator = SEP”和“ -D stream.num.reduce.output.fields = NUM​​”将reduce输出行中的第n个字段分隔符指定为键和值之间的分隔符。

同样,您可以将“ stream.map.input.field.separator”和“ stream.reduce.input.field.separator”指定为Map / Reduce输入的输入分隔符。默认情况下,分隔符是制表符。

处理大文件和档案

使用-files和-archives选项,可以使文件和归档可用于任务。该参数是您已经上传到HDFS的文件或档案的URI。这些文件和存档在各个作业之间进行缓存。您可以从fs.default.name配置变量中检索host和fs_port值。

注意: -files和-archives选项是通用选项。确保将通用选项放在命令选项之前,否则命令将失败。

使文件可用于任务

-files选项在任务的当前工作目录中创建指向该文件本地副本的符号链接。

在此示例中,Hadoop在任务的当前工作目录中自动创建一个名为testfile.txt的符号链接。此符号链接指向testfile.txt的本地副本。

-文件hdfs:// host:fs_port / user / testfile.txt

用户可以使用#为-files指定其他符号链接名称。

-文件hdfs:// host:fs_port / user / testfile.txt#testfile

可以这样指定多个条目:

-文件hdfs:// host:fs_port / user / testfile1.txt,hdfs:// host:fs_port / user / testfile2.txt

使档案可用于任务

使用-archives选项,您可以将jar本地复制到任务的当前工作目录,并自动解压缩文件。

在此示例中,Hadoop在当前任务的工作目录中自动创建一个名为testfile.jar的符号链接。此符号链接指向存储上载jar文件的未jarned内容的目录。

-存档hdfs:// host:fs_port / user / testfile.jar

用户可以使用#为-archives指定其他符号链接名称。

-存档hdfs:// host:fs_port / user / testfile.tgz#tgzdir

在此示例中,input.txt文件有两行指定两个文件的名称:cachedir.jar / cache.txt和cachedir.jar / cache2.txt。“ cachedir.jar”是指向归档目录的符号链接,该目录具有文件“ cache.txt”和“ cache2.txt”。

映射流
                -archives'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar'\
                -D mapreduce.job.maps = 1 \
                -D mapreduce.job.reduces = 1 \
                -D mapreduce.job.name =“实验” \
                -输入“ /user/me/samples/cachefile/input.txt” \
                -输出“ / user / me / samples / cachefile / out” \
                -mapper“ xargs cat” \
                减速器“猫”

$ ls test_jar /
cache.txt cache2.txt

$ jar cvf cachedir.jar -C test_jar /。
添加清单
添加:cache.txt(in = 30)(out = 29)(缩小3%)
添加:cache2.txt(输入= 37)(输出= 35)(缩小5%)

$ hdfs dfs -put cachedir.jar样本/缓存文件

$ hdfs dfs -cat /user/me/samples/cachefile/input.txt
cachedir.jar / cache.txt
cachedir.jar / cache2.txt

$ cat test_jar / cache.txt
这只是缓存字符串

$ cat test_jar / cache2.txt
这只是第二个缓存字符串

$ hdfs dfs -ls / user / me / samples / cachefile / out
找到2项
-rw-r--r- * 1个我超级组0 2013-11-14 17:00 / user / me / samples / cachefile / out / _SUCCESS
-rw-r--r- * 1 me超级组69 2013-11-14 17:00 / user / me / samples / cachefile / out / part-00000

$ hdfs dfs -cat / user / me / samples / cachefile / out / part-00000
这只是缓存字符串
这只是第二个缓存字符串

更多用法示例

Hadoop分区程序类

Hadoop具有一个库类KeyFieldBasedPartitioner,该类对许多应用程序很有用。此类允许Map / Reduce框架根据某些键字段(而不是整个键)对地图输出进行分区。例如:

映射流
  -D stream.map.output.field.separator =。\
  -D stream.num.map.output.key.fields = 4 \
  -D map.output.key.field.separator =。\
  -D mapreduce.partition.keypartitioner.options = -k1,2 \
  -D mapreduce.job.reduces = 12 \
  输入myInputDirs \
  -输出myOutputDir \
  -mapper / bin / cat \
  -reducer / bin / cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

在这里,-D stream.map.output.field.separator =。-D stream.num.map.output.key.fields = 4如前面的示例中所述。流使用这两个变量来标识映射器的键/值对。

上面的Map / Reduce作业的map输出键通常有四个用“。”分隔的字段。但是,Map / Reduce框架将使用-D mapred.text.key.partitioner.options = -k1,2选项按键的前两个字段对地图输出进行分区。在此,-D map.output.key.field.separator =。指定分区的分隔符。这样可以确保将键中前两个字段相同的所有键/值对都划分为相同的reducer。

这实际上等效于将前两个字段指定为主键,然后将后两个字段指定为主键。主键用于分区,主键和辅助键的组合用于排序。一个简单的例子如下所示:

地图输出(按键)

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

划分为3个reducer(前2个字段用作分区的键)

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

在reducer的每个分区内排序(用于排序的所有4个字段)

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3

Hadoop比较器类

Hadoop有一个库类KeyFieldBasedComparator,对许多应用程序都非常有用。此类提供了Unix / GNU Sort提供的功能的子集。例如:

映射流
  -D mapreduce.job.output.key.comparator.class = org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
  -D stream.map.output.field.separator =。\
  -D stream.num.map.output.key.fields = 4 \
  -D mapreduce.map.output.key.field.separator =。\
  -D mapreduce.partition.keycomparator.options = -k2,2nr \
  -D mapreduce.job.reduces = 1 \
  输入myInputDirs \
  -输出myOutputDir \
  -mapper / bin / cat \
  -reducer / bin / cat

上面的Map / Reduce作业的map输出键通常有四个用“。”分隔的字段。但是,Map / Reduce框架将使用-D mapreduce.partition.keycomparator.options = -k2,2nr选项按键的第二个字段对输出进行排序。在这里,-n指定排序是数字排序,而-r指定应该反转结果。一个简单的图示如下所示:

地图输出(按键)

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

归约器的排序输出(其中第二个字段用于排序)

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1

Hadoop聚合软件包

Hadoop有一个名为Aggregate的库包。Aggregate提供一个特殊的reducer类和一个特殊的Combiner类,以及一系列简单的聚合器,这些聚合器对一系列值执行聚合,例如“ sum”,“ max”,“ min”等。通过Aggregate,您可以定义一个映射器插件类,该类将为映射器的每个输入键/值对生成“可聚合项”。合并器/归约器将通过调用适当的聚合器来聚合那些可聚合项。

要使用聚合,只需指定“ -reducer聚合”:

映射流
  输入myInputDirs \
  -输出myOutputDir \
  -mapper myAggregatorForKeyCount.py \
  -减速器合计\
  -文件myAggregatorForKeyCount.py \

python程序myAggregatorForKeyCount.py看起来像:

#!/ usr / bin / python

进口系统;

def generateLongCountToken(id):
    返回“ LongValueSum:” + id +“ \ t” +“ 1”

def main(argv):
    行= sys.stdin.readline();
    尝试:
        而线:
            line = line&#91;:-1];
            字段= line.split(“ \ t”);
            打印generateLongCountToken(fields&#91; 0]);
            行= sys.stdin.readline();
    “文件末尾”除外:
        不返回
如果__name__ ==“ __main__”:
     主要(sys.argv)

Hadoop字段选择类

Hadoop具有一个库类FieldSelectionMapReduce,该类有效地允许您处理文本数据,例如unix“ cut”实用程序。该类中定义的map函数将每个输入键/值对视为字段列表。您可以指定字段分隔符(默认为制表符)。您可以选择任意字段列表作为地图输出键,也可以选择任意字段列表作为地图输出值。类似地,在类中定义的reduce函数将每个输入键/值对视为字段列表。您可以选择任意字段列表作为reduce输出键,并选择任意字段列表作为reduce输出值。例如:

映射流
  -D mapreduce.map.output.key.field.separator =。\
  -D mapreduce.partition.keypartitioner.options = -k1,2 \
  -D mapreduce.fieldsel.data.field.separator =。\
  -D mapreduce.fieldsel.map.output.key.value.fields.spec = 6,5,1-3:0- \
  -D mapreduce.fieldsel.reduce.output.key.value.fields.spec = 0-2:5- \
  -D mapreduce.map.output.key.class = org.apache.hadoop.io.Text \
  -D mapreduce.job.reduces = 12 \
  输入myInputDirs \
  -输出myOutputDir \
  -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
  -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

选项“ -D mapreduce.fieldsel.map.output.key.value.fields.spec = 6,5,1-3:0-”为地图输出指定键/值选择。键选择规范和值选择规范以“:”分隔。在这种情况下,地图输出键将由字段6、5、1、2和3组成。地图输出值将由所有字段组成(0-表示字段0和所有后续字段)。

选项“ -D mapreduce.fieldsel.reduce.output.key.value.fields.spec = 0-2:5-”指定缩减输出的键/值选择。在这种情况下,reduce输出关键字将包含字段0、1、2(对应于原始字段6、5、1)。减少输出值将包含从字段5开始的所有字段(对应于所有原始字段)。

经常问的问题

如何使用Hadoop Streaming运行任意(半)独立任务集?

通常,您不需要Map Reduce的全部功能,而只需要运行同一程序的多个实例-可以在数据的不同部分或在相同的数据上使用不同的参数。您可以使用Hadoop Streaming做到这一点。

如何处理文件,每个地图一个?

例如,考虑跨Hadoop集群压缩(压缩)一组文件的问题。您可以通过使用Hadoop Streaming和自定义映射器脚本来实现:

  • 生成一个包含输入文件的完整HDFS路径的文件。每个映射任务将获得一个文件名作为输入。

  • 创建一个映射器脚本,该脚本具有给定的文件名,可以将文件获取到本地磁盘,对文件进行gzip压缩,然后将其放回所需的输出目录中。

我应该使用多少个减速器?

有关详细信息,请参见MapReduce教程:Reducer

如果我在shell脚本中设置了别名,那么在-mapper之后可以使用吗?

例如,说我这样做:别名c1 ='cut -f1'。-mapper“ c1”可以工作吗?

使用别名将不起作用,但是如以下示例所示,允许变量替换:

$ hdfs dfs -cat / user / me / samples / student_marks
爱丽丝50
布鲁斯70
查理80
丹75

$ c2 ='cut -f2'; 映射流
  -D mapreduce.job.name ='实验'\
  -输入/ user / me / samples / student_marks \
  输出/ user / me / samples / student_out \
  -mapper“ $ c2” -reducer'cat'

$ hdfs dfs -cat / user / me / samples / student_out / part-00000
50
70
75
80

我可以使用UNIX管道吗?

例如,将-mapper“切割-f1 | sed s / foo / bar / g”起作用?

当前,这不起作用,并给出“ java.io.IOException:管道中断”错误。这可能是需要调查的错误。

如果出现“设备上没有剩余空间”错误该怎么办?

例如,当我通过-file选项通过分发大型可执行文件(例如3.6G)来运行流作业时,出现“设备上没有剩余空间”错误。

jar打包发生在配置变量stream.tmpdir指向的目录中。stream.tmpdir的默认值为/ tmp。将值设置为具有更多空间的目录:

-D stream.tmpdir = / export / bigspace / ...

如何指定多个输入目录?

您可以使用多个“ -input”选项指定多个输入目录:

映射流
  -输入'/ user / foo / dir1'-输入'/ user / foo / dir2'\
    (命令的其余部分)

如何生成gzip格式的输出文件?

除了纯文本文件,您还可以生成gzip文件作为生成的输出。将'-D mapreduce.output.fileoutputformat.compress = true -D mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.GzipCodec'作为选项传递给流作业。

如何通过流提供自己的输入/输出格式?

您可以通过打包它们并将自定义jar放入$ HADOOP_CLASSPATH来指定自己的自定义类。

如何使用流解析XML文档?

您可以使用记录读取器StreamXmlRecordReader来处理XML文档。

映射流
  -inputreader“ StreamXmlRecord,begin = BEGIN_STRING,end = END_STRING” \
    (命令的其余部分)

在BEGIN_STRING和END_STRING之间找到的任何内容都将被视为地图任务的一条记录。

StreamXmlRecordReader理解的名称-值属性是:

  • (字符串)'begin'-标记记录开始的字符,和'end'-标记记录结束的字符。
  • (布尔值)'slowmatch'-切换以查找开始和结束字符,但在CDATA中而不是常规标记中。默认为false。
  • (整数)'lookahead'-使用'slowmatch'时同步CDATA的最大lookahead字节应大于'maxrec'。默认为2 *'maxrec'。
  • (整数)'maxrec'-在'slowmatch'期间每次匹配之间读取的最大记录大小。默认为50000字节。

如何更新流应用程序中的计数器?

流处理过程可以使用stderr发出计数器信息。记者:计数器:<组>,<计数器>,<金额>应该发送到stderr以更新计数器。

如何更新流式应用程序中的状态?

流处理过程可以使用stderr发出状态信息。要设置状态,应该将reporter:status:<message>发送到stderr。

如何在流作业的映射器/缩减程序中获取Job变量?

请参阅配置的参数。在执行流作业期间,将转换“ mapred”参数的名称。点(。)变成下划线(_)。例如,mapreduce.job.id变为mapreduce_job_id,而mapreduce.job.jar变为mapreduce_job_jar。在您的代码中,将参数名称与下划线一起使用。

如果收到“错误= 7,参数列表过长”该怎么办?

该作业会将整个配置复制到环境中。如果作业正在处理大量输入文件,则将作业配置添加到环境中可能会导致环境溢出。环境中的作业配置副本对于运行作业不是必不可少的,可以通过以下方式将其截断:

-D stream.jobconf.truncate.limit = 20000

默认情况下,值不被截断(-1)。零(0)仅复制名称,而不复制值。在几乎所有情况下,20000是一个安全值,可以防止环境溢出。