MapReduce 教學

目的

此文件全面描述 Hadoop MapReduce 架構的所有使用者介面,並作為教學指南。

先決條件

確保已安裝、設定並執行 Hadoop。更多詳細資訊

概觀

Hadoop MapReduce 是一個軟體架構,可輕鬆撰寫應用程式,在大量商品硬體叢集(數千個節點)中可靠且容錯地平行處理大量資料(多 TB 資料集)。

MapReduce 工作通常會將輸入資料集分割成獨立區塊,再由 對應工作以完全平行的方式處理。架構會對對應產出進行排序,然後輸入到 縮減工作。通常,工作的輸入和輸出都會儲存在檔案系統中。架構會負責排程工作、監控工作,並重新執行失敗的工作。

通常,運算節點和儲存節點是相同的,也就是說,MapReduce 架構和 Hadoop 分散式檔案系統(請參閱 HDFS 架構指南)在同一組節點上執行。此設定讓架構能夠有效地在已存在資料的節點上排程工作,進而產生極高的叢集總頻寬。

MapReduce 架構包含一個主 ResourceManager、每個叢集節點一個工作 NodeManager,以及每個應用程式一個 MRAppMaster(請參閱 YARN 架構指南)。

應用程式至少會指定輸入/輸出位置,並透過實作適當的介面和/或抽象類別來提供 對應縮減 函式。這些和其他的工作參數組成 工作設定

Hadoop 工作用戶端 接著會將工作(jar/可執行檔等)和設定提交給 ResourceManager,然後 ResourceManager 會負責將軟體/設定分發給工作人員、排程工作並監控工作,並提供狀態和診斷資訊給工作用戶端。

雖然 Hadoop 架構是以 Java™ 實作,但 MapReduce 應用程式不需要以 Java 撰寫。

  • Hadoop 串流是一個公用程式,讓使用者能夠使用任何可執行檔(例如 shell 公用程式)作為對應器和/或縮減器來建立和執行工作。

  • Hadoop 管線是一個 SWIG 相容的 C++ API,用於實作 MapReduce 應用程式(非基於 JNI™)。

輸入和輸出

MapReduce 架構只針對 <key, value> 成對執行,也就是說,架構會將工作的輸入視為一組 <key, value> 成對,並產生一組 <key, value> 成對作為工作的輸出,可能為不同類型。

keyvalue 類別必須可由架構序列化,因此需要實作 Writable 介面。此外,key 類別必須實作 WritableComparable 介面,以利架構排序。

MapReduce 工作的輸入和輸出類型

(輸入) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (輸出)

範例:WordCount v1.0

在深入探討細節之前,讓我們先瀏覽一個 MapReduce 應用程式範例,了解它們如何運作。

WordCount 是一個簡單的應用程式,用於計算給定輸入集中的每個字詞出現的次數。

這適用於本機獨立式、偽分散式或完全分散式 Hadoop 安裝 (單一節點設定)。

原始碼

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

用法

假設環境變數設定如下

export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

編譯 WordCount.java 並建立一個 jar

$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class

假設

  • /user/joe/wordcount/input - HDFS 中的輸入目錄
  • /user/joe/wordcount/output - HDFS 中的輸出目錄

範例文字檔案作為輸入

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

執行應用程式

$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

輸出

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

應用程式可以使用 -files 選項指定一個逗號分隔路徑清單,這些路徑會出現在工作目前的目錄中。-libjars 選項允許應用程式將 jar 加入 map 和 reduce 的類別路徑中。-archives 選項允許它們傳遞逗號分隔的檔案清單作為引數。這些檔案會解壓縮,並在工作目前的目錄中建立一個連結,連結名稱為檔案名稱。可以在 命令指南 中找到更多關於命令列選項的詳細資訊。

使用 -libjars-files-archives 執行 wordcount 範例

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output

在此,myarchive.zip 會放置在一個目錄中,並解壓縮為「myarchive.zip」。

使用者可以使用 # 為透過 -files-archives 選項傳遞的檔案和檔案庫指定不同的符號名稱。

例如,

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output

在此,工作可以使用符號名稱 dict1 和 dict2 分別存取檔案 dir1/dict.txt 和 dir2/dict.txt。檔案庫 mytar.tgz 會放置在一個目錄中,並解壓縮為「tgzdir」。

應用程式可以透過在命令列上使用選項 -Dmapreduce.map.env、-Dmapreduce.reduce.env 和 -Dyarn.app.mapreduce.am.env,為 mapper、reducer 和應用程式主工作指定環境變數。

例如,下列設定會為 mapper 和 reducer 設定環境變數 FOO_VAR=bar 和 LIST_VAR=a,b,c,

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -Dmapreduce.map.env.FOO_VAR=bar -Dmapreduce.map.env.LIST_VAR=a,b,c -Dmapreduce.reduce.env.FOO_VAR=bar -Dmapreduce.reduce.env.LIST_VAR=a,b,c input output

逐步解說

WordCount 應用程式相當直觀。

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}

Mapper 實作透過 map 方法逐行處理,由指定的 TextInputFormat 提供。接著,它會透過 StringTokenizer 將該行拆分為以空白分隔的代幣,並發出 < <word>, 1> 的金鑰值配對。

對於給定的範例輸入,第一個映射會發出

< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

第二個映射會發出

< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

我們稍後會在教學課程中深入了解為特定工作產生的映射數量,以及如何精細地控制它們。

    job.setCombinerClass(IntSumReducer.class);

WordCount 也指定一個 combiner。因此,每個映射的輸出會在以 金鑰排序後,傳遞給本機組合器(根據工作設定,與 Reducer 相同)進行本機聚合。

第一個映射的輸出

< Bye, 1>
< Hello, 1>
< World, 2>

第二個映射的輸出

< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
public void reduce(Text key, Iterable<IntWritable> values,
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}

Reducer 實作透過 reduce 方法將值加總,這些值是每個金鑰的出現次數(在此範例中,即為字詞)。

因此,工作的輸出為

< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

main 方法在 Job 中指定工作的各種面向,例如輸入/輸出路徑(透過命令列傳遞)、金鑰/值類型、輸入/輸出格式等。接著,它會呼叫 job.waitForCompletion 來提交工作並監控其進度。

我們稍後會在教學課程中深入了解 JobInputFormatOutputFormat 和其他介面與類別。

MapReduce - 使用者介面

本節提供 MapReduce 架構中每個使用者面向面向的合理詳細資料。這應有助於使用者以精細的方式實作、設定和調整其工作。不過,請注意,每個類別/介面的 javadoc 仍然是最全面的文件;這僅作為教學課程使用。

讓我們先了解 MapperReducer 介面。應用程式通常會實作它們來提供 mapreduce 方法。

接著,我們將討論其他核心介面,包括 JobPartitionerInputFormatOutputFormat 等。

最後,我們將總結討論框架的一些有用功能,例如 DistributedCacheIsolationRunner 等。

Payload

應用程式通常實作 MapperReducer 介面,以提供 mapreduce 方法。這些方法構成作業的核心。

Mapper

Mapper 將輸入鍵/值對應到一組中間鍵/值對。

Map 是將輸入記錄轉換為中間記錄的個別作業。轉換後的中間記錄不需要與輸入記錄為相同類型。給定的輸入對應到零或多個輸出對。

Hadoop MapReduce 框架會為作業的 InputFormat 所產生的每個 InputSplit 產生一個 Map 作業。

整體而言,會透過 Job.setMapperClass(Class) 方法將 Mapper 實作傳遞給作業。然後,框架會針對該作業的 InputSplit 中的每個鍵/值對呼叫 map(WritableComparable, Writable, Context)。然後,應用程式可以覆寫 cleanup(Context) 方法,以執行任何必要的清理工作。

輸出對不需要與輸入對為相同類型。給定的輸入對應到零或多個輸出對。輸出對會透過呼叫 context.write(WritableComparable, Writable) 來收集。

應用程式可以使用 Counter 來報告其統計資料。

與給定輸出鍵相關的所有中間值,隨後會由框架分組,並傳遞給 Reducer,以決定最終輸出。使用者可以透過 Job.setGroupingComparatorClass(Class) 指定 Comparator 來控制分組。

Mapper 輸出會排序,然後依 Reducer 分割。分割的總數與作業的縮減作業數相同。使用者可以透過實作自訂 Partitioner 來控制哪些鍵(以及記錄)會傳遞到哪些 Reducer

使用者可以選擇透過 Job.setCombinerClass(Class) 指定 combiner,以執行中間輸出的區域聚合,有助於減少從 Mapper 傳輸到 Reducer 的資料量。

中間的排序輸出始終會儲存在簡單的(鍵長度、鍵、值長度、值)格式中。應用程式可以透過 Configuration 控制中間輸出是否要壓縮,以及如何壓縮,以及要使用的 CompressionCodec

有多少個 Map?

Map 的數量通常會由輸入的總大小決定,也就是輸入檔案的區塊總數。

地圖的平行處理最佳層級似乎是每節點 10-100 個地圖,雖然對於非常耗費 CPU 的地圖任務,已設定為 300 個地圖。任務設定需要一段時間,因此最好讓地圖執行時間至少一分鐘。

因此,如果您預期有 10TB 的輸入資料,且區塊大小為 128MB,您將會得到 82,000 個地圖,除非使用 Configuration.set(MRJobConfig.NUM_MAPS, int)(僅提供架構提示)將其設定得更高。

Reducer

Reducer 將一組共用一個金鑰的中間值縮減為一組較小的值。

使用者透過 Job.setNumReduceTasks(int) 設定工作中的縮減數量。

整體而言,Reducer 實作會透過 Job.setReducerClass(Class) 方法傳遞工作中的 Job,並可覆寫它來初始化自己。接著,架構會針對群組輸入中的每個 <金鑰,(值清單)> 對呼叫 reduce(WritableComparable, Iterable<Writable>, Context) 方法。接著,應用程式可以覆寫 cleanup(Context) 方法來執行任何必要的清理。

Reducer 有 3 個主要階段:洗牌、排序和縮減。

洗牌

Reducer 的輸入是映射器的已排序輸出。在此階段,架構會透過 HTTP 取得所有映射器輸出的相關分割區。

排序

在此階段,架構會依金鑰群組 Reducer 輸入(因為不同的映射器可能會輸出相同的金鑰)。

洗牌和排序階段會同時發生;在擷取地圖輸出時會將它們合併。

次要排序

如果中間金鑰的群組等價規則需要與縮減前金鑰的群組規則不同,則可以透過 Job.setSortComparatorClass(Class) 指定 Comparator。由於 Job.setGroupingComparatorClass(Class) 可用於控制中間金鑰的群組方式,因此可以將這些方法結合使用來模擬「依據值進行次要排序」。

縮減

在此階段,會針對群組輸入中的每個 <金鑰,(值清單)> 對呼叫 reduce(WritableComparable, Iterable<Writable>, Context) 方法。

通常會透過 Context.write(WritableComparable, Writable) 將縮減任務的輸出寫入 FileSystem

應用程式可以使用 Counter 來報告其統計資料。

Reducer 的輸出未排序

縮減數量?

適當的縮減數量似乎是 (<節點數> * <每個節點的最大容器數>) 乘以 0.951.75

使用 0.95 時,所有縮減作業都可以立即啟動,並在映射作業完成時開始傳輸映射輸出。使用 1.75 時,較快的節點將完成第一輪縮減作業,並啟動第二波縮減作業,以更有效地執行負載平衡。

增加縮減數量會增加架構開銷,但也會增加負載平衡並降低發生故障的成本。

上述縮放係數略小於整數,以便在架構中保留一些縮減時段,以供推測性任務和失敗任務使用。

縮減作業無

如果不需要縮減作業,將縮減任務數量設定為是合法的。

在此情況下,映射任務的輸出會直接傳送到 FileSystem,傳送到 FileOutputFormat.setOutputPath(Job, Path) 設定的輸出路徑中。在將映射輸出寫入 FileSystem 之前,架構不會對其進行排序。

分割器

Partitioner 會分割金鑰空間。

Partitioner 控制中間映射輸出的金鑰分割。金鑰 (或金鑰的子集) 用於衍生分割,通常使用雜湊函數。分割的總數與作業的縮減任務數量相同。因此,這會控制中間金鑰 (以及記錄) 會傳送到哪一個 m 縮減任務中以進行縮減。

HashPartitioner 是預設的 Partitioner

計數器

Counter 是 MapReduce 應用程式用來報告其統計資料的工具。

MapperReducer 實作可以使用 Counter 來報告統計資料。

Hadoop MapReduce 內建一個 函式庫,其中包含一般有用的映射器、縮減器和分割器。

作業設定

作業代表 MapReduce 作業設定。

作業是使用者用來描述 MapReduce 作業給 Hadoop 架構執行的主要介面。架構會嘗試忠實執行由 作業描述的作業,但

作業通常用來指定 Mapper、組合器(如果有)、PartitionerReducerInputFormatOutputFormat 實作。 FileInputFormat 指出輸入檔案集合(FileInputFormat.setInputPaths(Job, Path…)/ FileInputFormat.addInputPath(Job, Path))和(FileInputFormat.setInputPaths(Job, String…)/ FileInputFormat.addInputPaths(Job, String))),以及輸出檔案應寫入的位置(FileOutputFormat.setOutputPath(Path))。

作業(選擇性)用來指定作業的其他進階面向,例如要使用的 Comparator、要放入 DistributedCache 的檔案、中間和/或作業輸出是否要壓縮(以及如何壓縮)、作業工作是否能以推測方式執行(setMapSpeculativeExecution(boolean)/ setReduceSpeculativeExecution(boolean))、每個工作最大嘗試次數(setMaxMapAttempts(int)/ setMaxReduceAttempts(int))等。

當然,使用者可以使用 Configuration.set(String, String)/ Configuration.get(String) 來設定/取得應用程式所需的任意參數。但請使用 DistributedCache 來處理大量(唯讀)資料。

工作執行與環境

MRAppMaster 在獨立的 jvm 中執行 Mapper/Reducer工作作為子處理序。

子工作會繼承父 MRAppMaster 的環境。使用者可以使用 mapreduce.{map|reduce}.java.opts作業中的設定參數,透過子 jvm 向子 jvm 指定其他選項,例如透過 -Djava.library.path=<> 等,讓執行時期連結器搜尋共用函式庫的非標準路徑。如果 mapreduce.{map|reduce}.java.opts 參數包含符號 @taskid@,則會以 MapReduce 工作的 taskid 值內插。

以下是包含多個引數和替換的範例,顯示 jvm GC 記錄,以及無密碼 JVM JMX 代理程式啟動,以便它能與 jconsole 等連線,以觀察子記憶體、執行緒和取得執行緒傾印。它也會將 map 和 reduce 子 jvm 的最大堆積大小分別設定為 512MB 和 1024MB。它也會在子 jvm 的 java.library.path 中新增其他路徑。

<property>
  <name>mapreduce.map.java.opts</name>
  <value>
  -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>
  -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

記憶體管理

使用者/管理員也可以使用 mapreduce.{map|reduce}.memory.mb 指定啟動子工作的最大虛擬記憶體,以及它遞迴啟動的任何子處理序。請注意,這裡設定的值是每個處理序的限制。mapreduce.{map|reduce}.memory.mb 的值應以 MB(百萬位元組)為單位指定。而且值必須大於或等於傳遞給 JavaVM 的 -Xmx,否則 VM 可能無法啟動。

注意:mapreduce.{map|reduce}.java.opts 僅用於配置 MRAppMaster 中啟動的子工作。在 配置 Hadoop Daemon 環境 中記載了配置 Daemon 記憶體選項的文件。

架構中某些部分可用的記憶體也是可配置的。在 MapReduce 工作中,調整影響作業並行性和資料存取磁碟頻率的參數可能會影響效能。監控工作的文件系統計數器,特別是相對於 MapReduce 中的位元組計數,對於調整這些參數非常有幫助。

Map 參數

從 Map 中發出的記錄會序列化成一個緩衝區,而元資料會儲存在會計緩衝區中。如以下選項所述,當序列化緩衝區或元資料超過臨界值時,緩衝區的內容會在背景中排序並寫入磁碟,而 Map 會繼續輸出記錄。如果在溢寫進行時任一緩衝區完全填滿,Map 執行緒會封鎖。當 Map 完成時,任何剩餘的記錄會寫入磁碟,而所有磁碟區段會合併成一個單一檔案。將溢寫到磁碟的次數降至最低可以減少 Map 時間,但較大的緩衝區也會減少 Map 程序可用的記憶體。

名稱 類型 說明
mapreduce.task.io.sort.mb int 從 Map 中發出的記錄的序列化和會計緩衝區的累積大小(以 MB 為單位)。
mapreduce.map.sort.spill.percent float 序列化緩衝區中的軟限制。一旦達到,執行緒會開始在背景中將內容溢寫到磁碟。

其他注意事項

  • 如果在溢寫進行時超過任一溢寫臨界值,收集會持續到溢寫完成。例如,如果將 mapreduce.map.sort.spill.percent 設定為 0.33,而緩衝區的剩餘部分在溢寫執行時填滿,下一次溢寫會包含所有收集的記錄,或緩衝區的 0.66,而且不會產生其他溢寫。換句話說,這些臨界值定義的是觸發器,而不是封鎖。

  • 大於序列化緩衝區的記錄會先觸發溢寫,然後溢寫到一個獨立的檔案。此記錄是否會先通過組合器是未定義的。

Shuffle/Reduce 參數

如前所述,每個縮減都會透過 HTTP 從區隔器取得指派給它的輸出並儲存在記憶體中,並定期將這些輸出合併到磁碟中。如果已開啟中間壓縮地圖輸出,每個輸出都會解壓縮到記憶體中。下列選項會影響縮減前這些合併到磁碟的頻率,以及縮減期間分配給地圖輸出的記憶體。

名稱 類型 說明
mapreduce.task.io.soft.factor int 指定同時合併的磁碟區段數。它會限制合併期間開啟的檔案和壓縮編解碼器的數量。如果檔案數量超過這個限制,合併將會進行數次。雖然這個限制也適用於地圖,但大多數工作都應該設定為不太可能達到這個限制。
mapreduce.reduce.merge.inmem.thresholds int 在合併到磁碟之前,會先取得並儲存在記憶體中的已排序地圖輸出數量。就像前一個註解中的溢出臨界值,這並未定義區隔單元,而是一個觸發器。實際上,這通常會設定得非常高 (1000) 或停用 (0),因為在記憶體中合併區段通常比從磁碟合併更便宜 (請參閱此表格後的註解)。這個臨界值只會影響洗牌期間在記憶體中合併的頻率。
mapreduce.reduce.shuffle.merge.percent float 在開始在記憶體中合併之前,已取得的地圖輸出的記憶體臨界值,表示為分配給儲存在記憶體中的地圖輸出的記憶體百分比。由於無法放入記憶體的地圖輸出可能會暫停,因此將此設定得較高可能會降低取得和合併之間的平行性。相反地,對於輸入可以完全放入記憶體的縮減,1.0 的值會很有效。這個參數只會影響洗牌期間在記憶體中合併的頻率。
mapreduce.reduce.shuffle.input.buffer.percent float 相對於通常在 mapreduce.reduce.java.opts 中指定的最大堆積大小,可以分配給在洗牌期間儲存地圖輸出的記憶體百分比。雖然應該為架構保留一些記憶體,但一般來說,將此設定得夠高以儲存大量且眾多的地圖輸出是有利的。
mapreduce.reduce.input.buffer.percent float 相對於縮減期間可以保留地圖輸出的最大堆積大小的記憶體百分比。當縮減開始時,地圖輸出將會合併到磁碟中,直到剩下的輸出低於此定義的資源限制。預設情況下,所有地圖輸出都會在縮減開始前合併到磁碟中,以最大化縮減可用的記憶體。對於記憶體密集度較低縮減,應該增加這個值以避免磁碟行程。

其他注意事項

  • 如果映射輸出大於分配給複製映射輸出的記憶體的 25%,它將直接寫入磁碟,而不會先暫存在記憶體中。

  • 當使用合併器執行時,關於高合併閾值和大型緩衝區的推理可能不成立。對於在擷取所有映射輸出之前啟動的合併,合併器會在溢出到磁碟時執行。在某些情況下,可以透過花費資源合併映射輸出(使磁碟溢出變小,並將溢出和擷取平行化)來獲得更好的縮減時間,而不是積極增加緩衝區大小。

  • 當將記憶體中的映射輸出合併到磁碟以開始縮減時,如果需要中間合併,因為有要溢出的區段,並且磁碟上至少有 mapreduce.task.io.sort.factor 個區段,則記憶體中的映射輸出將會是中間合併的一部分。

已設定參數

下列屬性會在每個工作執行的工作設定中進行區域化

名稱 類型 說明
mapreduce.job.id 字串 工作識別碼
mapreduce.job.jar 字串 工作目錄中的 job.jar 位置
mapreduce.job.local.dir 字串 工作特定的共用暫存空間
mapreduce.task.id 字串 工作識別碼
mapreduce.task.attempt.id 字串 工作嘗試識別碼
mapreduce.task.is.map 布林值 這是映射工作嗎?
mapreduce.task.partition int 工作在工作中的識別碼
mapreduce.map.input.file 字串 映射正在讀取的檔案名稱
mapreduce.map.input.start 長整數 映射輸入分割開始的偏移量
mapreduce.map.input.length 長整數 映射輸入分割中的位元組數
mapreduce.task.output.dir 字串 工作的暫時輸出目錄

注意:在串流工作的執行期間,「mapreduce」參數的名稱會被轉換。點 ( . ) 會變成底線 ( _ )。例如,mapreduce.job.id 會變成 mapreduce_job_id,而 mapreduce.job.jar 會變成 mapreduce_job_jar。若要取得串流工作中的對應器/縮減器的值,請使用帶有底線的參數名稱。

工作記錄

NodeManager 會讀取工作的標準輸出 (stdout) 和錯誤 (stderr) 串流以及 syslog,並記錄到 ${HADOOP_LOG_DIR}/userlogs

散布程式庫

DistributedCache」也可以用來分發 jar 檔和原生程式庫,以便在 map 和/或 reduce 任務中使用。子 jvm 始終會將其「目前工作目錄」新增至 java.library.pathLD_LIBRARY_PATH。因此,快取的程式庫可透過 System.loadLibrarySystem.load 載入。有關如何透過分散式快取載入共用程式庫的更多詳細資訊,請參閱 原生程式庫 中的說明。

工作提交和監控

Job 是使用者工作與 ResourceManager 互動的主要介面。

Job 提供提交工作、追蹤工作進度、存取元件任務的報告和記錄、取得 MapReduce 群集狀態資訊等功能。

工作提交程序包括

  1. 檢查工作的輸入和輸出規格。

  2. 計算工作的 InputSplit 值。

  3. 如有必要,設定工作的 DistributedCache 所需的會計資訊。

  4. 將工作的 jar 檔和組態複製到 FileSystem 上的 MapReduce 系統目錄。

  5. 將工作提交至 ResourceManager,並選擇性地監控其狀態。

工作記錄檔也會記錄到使用者指定的目錄 mapreduce.jobhistory.intermediate-done-dirmapreduce.jobhistory.done-dir,預設為工作輸出目錄。

使用者可以使用下列指令在指定的目錄中檢視記錄摘要:$ mapred job -history output.jhist。此指令會列印工作詳細資料、失敗和已終止的提示詳細資料。有關工作的更多詳細資料,例如每個任務的成功任務和任務嘗試,可以使用下列指令檢視:$ mapred job -history all output.jhist

使用者通常使用 Job 來建立應用程式、描述工作的各種面向、提交工作和監控其進度。

工作控制

使用者可能需要串連 MapReduce 工作,以完成無法透過單一 MapReduce 工作執行的複雜任務。這相當容易,因為工作的輸出通常會傳送到分散式檔案系統,而輸出反過來可以當作下一個工作的輸入。

然而,這也表示確保工作完成(成功/失敗)的責任完全落在客戶端身上。在這種情況下,各種工作控制選項為

工作輸入

InputFormat描述 MapReduce 工作的輸入規格。

MapReduce 架構依賴於工作的 InputFormat

  1. 驗證工作的輸入規格。

  2. 將輸入檔案分割成邏輯的 InputSplit 實例,每個實例隨後會指派給個別的 Mapper

  3. 提供 RecordReader 實作,用於從邏輯 InputSplit 中收集輸入記錄,以便 Mapper 處理。

基於檔案的 InputFormat 實作(通常是 FileInputFormat 的子類別)的預設行為,是根據輸入檔案的總大小(以位元組為單位)將輸入分割成 邏輯 InputSplit 實例。然而,輸入檔案的 FileSystem 區塊大小被視為輸入分割的上限。分割大小的下限可透過 mapreduce.input.fileinputformat.split.minsize 設定。

顯然,根據輸入大小進行邏輯分割對於許多應用程式來說是不夠的,因為必須遵守記錄邊界。在這種情況下,應用程式應實作 RecordReader,負責遵守記錄邊界,並向個別工作提供邏輯 InputSplit 的記錄導向檢視。

TextInputFormat 是預設的 InputFormat

如果 TextInputFormat 是特定工作的 InputFormat,架構會偵測具有 .gz 副檔名的輸入檔案,並使用適當的 CompressionCodec 自動解壓縮它們。然而,必須注意的是,具有上述副檔名的壓縮檔案無法被 分割,且每個壓縮檔案都由單一 mapper 整體處理。

InputSplit

InputSplit代表個別 Mapper 要處理的資料。

通常,InputSplit 呈現輸入的位元組導向檢視,而 RecordReader 則負責處理並呈現記錄導向檢視。

FileSplit 是預設的 InputSplit。它將 mapreduce.map.input.file 設定為邏輯分割的輸入檔案路徑。

RecordReader

RecordReaderInputSplit 讀取 <key, value> 配對。

通常,RecordReader 會轉換 InputSplit 提供的輸入位元組導向檢視,並為 Mapper 實作呈現記錄導向檢視以進行處理。因此,RecordReader 負責處理記錄邊界,並向工作提供金鑰和值。

工作輸出

OutputFormat 描述 MapReduce 工作的輸出規格。

MapReduce 架構依賴工作的 OutputFormat

  1. 驗證工作的輸出規格;例如,檢查輸出目錄是否已存在。

  2. 提供用於寫入工作輸出檔案的 RecordWriter 實作。輸出檔案儲存在 FileSystem 中。

TextOutputFormat 是預設的 OutputFormat

OutputCommitter

OutputCommitter 描述 MapReduce 工作的任務輸出提交。

MapReduce 架構依賴工作的 OutputCommitter

  1. 在初始化期間設定工作。例如,在工作的初始化期間建立工作的暫時輸出目錄。工作設定是由一個獨立的任務在工作處於 PREP 狀態且初始化任務之後完成。設定任務完成後,工作將移至 RUNNING 狀態。

  2. 在工作完成後清理工作。例如,在工作完成後移除暫時輸出目錄。工作清理是由一個獨立的任務在工作結束時完成。清理任務完成後,工作會宣告 SUCCEDED/FAILED/KILLED。

  3. 設定任務暫時輸出。任務設定是作為任務初始化期間的同一任務的一部分完成。

  4. 檢查任務是否需要提交。這是為了在任務不需要提交時避免提交程序。

  5. 提交任務輸出。任務完成後,任務會在需要時提交其輸出。

  6. 捨棄工作提交。如果工作已失敗/終止,輸出將會清除。如果工作無法清除(在例外區塊中),將會啟動一個具有相同嘗試 ID 的獨立工作來執行清除。

FileOutputCommitter 是預設的 OutputCommitter。工作設定/清除工作會佔用映射或縮減容器,視 NodeManager 上哪一種可用而定。而且,JobCleanup 工作、TaskCleanup 工作和 JobSetup 工作具有最高優先順序,而且依此順序執行。

工作副作用檔案

在某些應用程式中,組件工作需要建立和/或寫入副作用檔案,這些檔案與實際工作輸出檔案不同。

在這種情況下,可能會出現兩個相同 MapperReducer 執行個體同時執行(例如,推測性工作)的問題,它們會嘗試在 FileSystem 上開啟和/或寫入相同的檔案(路徑)。因此,應用程式撰寫者必須為每個工作嘗試(使用嘗試 ID,例如 attempt_200709221812_0001_m_000000_0)選擇唯一名稱,而不能只為每個工作選擇名稱。

為了避免這些問題,當 OutputCommitterFileOutputCommitter 時,MapReduce 架構會在 FileSystem 上為每個工作嘗試維護一個特殊的 ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} 子目錄,可透過 ${mapreduce.task.output.dir} 存取,工作嘗試的輸出會儲存在此目錄中。在工作嘗試成功完成後,${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}(僅此目錄)中的檔案會提升${mapreduce.output.fileoutputformat.outputdir}。當然,架構會捨棄未成功工作嘗試的子目錄。這個程序對應用程式來說完全透明。

應用程式撰寫者可以在工作執行期間透過 FileOutputFormat.getWorkOutputPath(Conext)${mapreduce.task.output.dir} 中建立任何必要的副作用檔案來利用此功能,而架構會以類似的方式為成功的任務嘗試提升這些檔案,因此無需為每個任務嘗試選擇唯一的路徑。

請注意:在特定工作嘗試執行期間,${mapreduce.task.output.dir} 的值實際上是 ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid},而且這個值是由 MapReduce 架構設定的。因此,只要在 MapReduce 工作傳回的 FileOutputFormat.getWorkOutputPath(Conext) 路徑中建立任何副作用檔案即可利用此功能。

由於在這種情況下,映射的輸出會直接傳送至 HDFS,因此整個討論也適用於縮減器為 NONE(即 0 個縮減器)的工作映射。

記錄寫入器

RecordWriter 會將輸出 <key, value> 對寫入輸出檔案。

RecordWriter 實作會將工作輸出寫入 FileSystem

其他有用的功能

將工作提交至佇列

使用者會將工作提交至佇列。佇列作為工作的集合,允許系統提供特定功能。例如,佇列會使用 ACL 來控制哪些使用者可以向其提交工作。預期佇列主要由 Hadoop 排程器使用。

Hadoop 已設定為使用稱為「預設」的單一強制佇列。佇列名稱會定義在 Hadoop 站台設定的 mapreduce.job.queuename 屬性中。某些工作排程器,例如 容量排程器,支援多個佇列。

工作會透過 mapreduce.job.queuename 屬性或 Configuration.set(MRJobConfig.QUEUE_NAME, String) API 定義需要提交至的佇列。設定佇列名稱是選用的。如果提交工作時未附帶佇列名稱,則會提交至「預設」佇列。

計數器

Counters 代表全域計數器,由 MapReduce 架構或應用程式定義。每個 Counter 可以是任何 Enum 類型。特定 Enum 的計數器會分組為 Counters.Group 類型的群組。

應用程式可以定義任意 Counters (Enum 類型),並透過 Counters.incrCounter(Enum, long) 或 Counters.incrCounter(String, String, long) 在 map 和/或 reduce 方法中更新它們。這些計數器會由架構進行全域彙總。

分散式快取

DistributedCache 有效地分發應用程式特定的、大型、唯讀檔案。

DistributedCache 是 MapReduce 架構提供的設施,用於快取應用程式需要的檔案 (文字、檔案、jar 等)。

應用程式會透過 Job 中的網址 (hdfs://) 指定要快取的檔案。DistributedCache 假設透過 hdfs:// 網址指定的檔案已存在於 FileSystem 中。

在工作中的任何工作在該節點上執行之前,架構會將必要的檔案複製到工作節點。其效率源自於檔案只會針對每個工作複製一次,以及解壓縮在工作節點上解壓縮的檔案的能力。

DistributedCache 會追蹤快取檔案的修改時間戳記。顯然在工作執行時,快取檔案不應由應用程式或外部修改。

DistributedCache 可用於分發簡單的唯讀資料/文字檔,以及更複雜的類型,例如檔案和 jar。檔案(zip、tar、tgz 和 tar.gz 檔)會在工作節點上「解壓縮」。檔案已設定「執行權限」。

檔案/檔案可透過設定屬性 mapreduce.job.cache.{files |archives} 來分發。如果必須分發多個檔案/檔案,它們可以作為逗號分隔的路徑新增。這些屬性也可以透過 API Job.addCacheFile(URI)/ Job.addCacheArchive(URI)Job.setCacheFiles(URI[])/ Job.setCacheArchives(URI[]) 來設定,其中 URI 的格式為 hdfs://host:port/absolute-path#link-name。在串流中,檔案可透過命令列選項 -cacheFile/-cacheArchive 來分發。

DistributedCache 也可用作地圖和/或縮減任務中使用的基本軟體分發機制。它可用於分發 jar 和原生程式庫。Job.addArchiveToClassPath(Path)Job.addFileToClassPath(Path) API 可用於快取檔案/jar,並將它們新增到子 jvm 的「classpath」中。透過設定組態屬性 mapreduce.job.classpath.{files |archives} 也能執行相同的動作。同樣地,連結到任務工作目錄中的快取檔案可用於分發原生程式庫並載入它們。

私人和公開 DistributedCache 檔案

DistributedCache 檔案可以是私人或公開的,這會決定它們如何在工作節點上共用。

  • 「私人」DistributedCache 檔案快取在需要這些檔案的使用者專用的本機目錄中。這些檔案僅由特定使用者的所有任務和工作共用,而無法由工作節點上其他使用者的工作存取。DistributedCache 檔案會因為其在檔案系統(通常是 HDFS)上傳檔案的權限而變為私人。如果檔案沒有世界可讀取存取權,或者導致檔案的目錄路徑沒有世界可執行存取權限以進行查詢,則檔案會變為私人。

  • 「公開」DistributedCache 檔案快取在全域目錄中,且檔案存取設定為公開,以便所有使用者都能看到。這些檔案可由工作節點上所有使用者的任務和工作共用。DistributedCache 檔案會因為其在檔案系統(通常是 HDFS)上傳檔案的權限而變為公開。如果檔案有世界可讀取存取權限,而且導致檔案的目錄路徑有世界可執行存取權限以進行查詢,則檔案會變為公開。換句話說,如果使用者打算讓檔案公開給所有使用者,則必須設定檔案權限為世界可讀取,且導致檔案的路徑上的目錄權限必須為世界可執行。

剖析

剖析是一種取得內建 Java 剖析器代表性範例 (2 或 3) 的公用程式,用於範例的對應和縮減。

使用者可以透過設定組態屬性 mapreduce.task.profile 來指定系統是否應收集工作中部分任務的剖析器資訊。可以使用 API Configuration.set(MRJobConfig.TASK_PROFILE, boolean) 設定值。如果值設定為 true,則會啟用任務剖析。剖析器資訊會儲存在使用者日誌目錄中。預設情況下,工作不會啟用剖析。

使用者在設定需要剖析後,可以使用組態屬性 mapreduce.task.profile.{maps|reduces} 來設定要剖析的 MapReduce 任務範圍。可以使用 API Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES, String) 設定值。預設情況下,指定的範圍為 0-2

使用者也可以透過設定組態屬性 mapreduce.task.profile.params 來指定剖析器組態引數。可以使用 API Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String) 指定值。如果字串包含 %s,則會在任務執行時以剖析輸出檔案的名稱取代它。這些參數會透過命令列傳遞給任務子 JVM。剖析參數的預設值為 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s

偵錯

MapReduce 架構提供一個讓使用者執行自訂指令碼以進行偵錯的功能。當 MapReduce 任務失敗時,使用者可以執行偵錯指令碼,例如處理任務日誌。指令碼會取得任務的 stdout 和 stderr 輸出、syslog 和 jobconf 的存取權。偵錯指令碼的 stdout 和 stderr 輸出會顯示在主控台診斷中,也會顯示在工作使用者介面的部分內容中。

在以下各節中,我們會說明如何提交偵錯指令碼和工作。指令碼檔案需要散布並提交至架構。

如何散布指令碼檔案

使用者需要使用 DistributedCache散布建立符號連結至指令碼檔案。

如何提交指令碼

提交偵錯指令碼的快速方法是設定屬性 mapreduce.map.debug.scriptmapreduce.reduce.debug.script 的值,分別用於偵錯 map 和 reduce 任務。這些屬性也可以使用 API Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String)Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String) 來設定。在串流模式中,偵錯指令碼可以使用命令列選項 -mapdebug-reducedebug 來提交,分別用於偵錯 map 和 reduce 任務。

指令碼的引數是任務的 stdout、stderr、syslog 和 jobconf 檔案。在 MapReduce 任務失敗的節點上執行的偵錯命令是
$script $stdout $stderr $syslog $jobconf

管道程式將 c++ 程式名稱作為第五個命令引數。因此,對於管道程式,命令是
$script $stdout $stderr $syslog $jobconf $program

預設行為

對於管道,預設指令碼會執行以在 gdb 下處理核心傾印、列印堆疊追蹤並提供有關執行緒的資訊。

資料壓縮

Hadoop MapReduce 提供了讓應用程式撰寫者為中間 map 輸出和作業輸出(即 reduce 的輸出)指定壓縮的工具。它也隨附 CompressionCodec 實作,用於 zlib 壓縮演算法。gzipbzip2snappylz4 檔案格式也受支援。

Hadoop 也提供上述壓縮編解碼器的原生實作,原因在於效能(zlib)和 Java 函式庫不可用。有關其使用和可用性的更多詳細資訊,請參閱 這裡

中間輸出

應用程式可以透過 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) API 和透過 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) API 使用的 CompressionCodec 來控制中間 map 輸出的壓縮。

作業輸出

應用程式可以透過 FileOutputFormat.setCompressOutput(Job, boolean) API 來控制作業輸出的壓縮,而透過 FileOutputFormat.setOutputCompressorClass(Job, Class) API 可以指定要使用的 CompressionCodec

如果作業輸出要儲存在 SequenceFileOutputFormat 中,則可以透過 SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) API 指定所需的 SequenceFile.CompressionType(即 RECORD / BLOCK - 預設為 RECORD)。

略過不良記錄

Hadoop 提供一個選項,在處理映射輸入時,可以略過一組不良輸入記錄。應用程式可透過 SkipBadRecords 類別控制此功能。

當映射工作在特定輸入上確定性地崩潰時,可以使用此功能。這通常是因映射函數中的錯誤所致。通常,使用者必須修正這些錯誤。然而,有時這並不可行。例如,錯誤可能出在第三方函式庫中,而其原始碼不可用。在這種情況下,即使經過多次嘗試,工作也永遠無法順利完成,且工作會失敗。有了這個功能,只有不良記錄周圍的一小部分資料會遺失,而這對某些應用程式來說可能是可以接受的(例如,對非常大型資料執行統計分析的應用程式)。

預設情況下,此功能已停用。若要啟用它,請參閱 SkipBadRecords.setMapperMaxSkipRecords(Configuration, long)SkipBadRecords.setReducerMaxSkipGroups(Configuration, long)

啟用此功能後,框架在一定數量的映射失敗後會進入「略過模式」。有關更多詳細資訊,請參閱 SkipBadRecords.setAttemptsToStartSkipping(Configuration, int)。在「略過模式」中,映射工作會維護正在處理的記錄範圍。為此,框架依賴於已處理記錄計數器。請參閱 SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDSSkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS。此計數器使框架能夠知道已成功處理了多少記錄,因此,哪些記錄範圍導致工作崩潰。在後續嘗試中,會略過此記錄範圍。

略過的記錄數取決於應用程式增加已處理記錄計數器的頻率。建議在處理每筆記錄後增加此計數器。在通常會批次處理的某些應用程式中,這可能不可行。在這種情況下,框架可能會略過不良記錄周圍的其他記錄。使用者可以透過 SkipBadRecords.setMapperMaxSkipRecords(Configuration, long)SkipBadRecords.setReducerMaxSkipGroups(Configuration, long) 控制略過記錄的數量。框架會嘗試使用類似二元搜尋的方法縮小略過記錄的範圍。略過的範圍會分成兩半,只執行其中一半。在後續失敗中,框架會找出哪一半包含不良記錄。工作會一直重新執行,直到達到可接受的略過值或耗盡所有工作嘗試。若要增加工作嘗試的數量,請使用 Job.setMaxMapAttempts(int)Job.setMaxReduceAttempts(int)

略過的記錄會寫入 HDFS 中的順序檔案格式,以供後續分析。可以透過 SkipBadRecords.setSkipOutputPath(JobConf, Path) 變更位置。

範例:WordCount v2.0

以下是一個更完整的 WordCount,它使用了我們到目前為止討論過的 MapReduce 架構提供的許多功能。

這需要 HDFS 啟動並執行,特別是對於 DistributedCache 相關功能。因此,它只能與 偽分散式完全分散式 Hadoop 安裝一起使用。

原始碼

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

範例執行

範例文字檔案作為輸入

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World, Bye World!

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.

執行應用程式

$ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output

輸出

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1

請注意,輸入與我們查看的第一個版本不同,以及它們如何影響輸出。

現在,讓我們透過 DistributedCache 插入一個列出要忽略的字詞模式的模式檔案。

$ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to

再次執行它,這次使用更多選項

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

正如預期,輸出

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1

再次執行它,這次關閉大小寫敏感性

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

果然,輸出

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2

重點

第二版的 WordCount 透過使用 MapReduce 架構提供的一些功能,改進了前一版

  • 展示應用程式如何存取 Mapper (和 Reducer) 實作的 setup 方法中的組態參數。

  • 展示如何使用 DistributedCache 來散發作業所需的唯讀資料。在此,它允許使用者指定在計算時要略過的字詞模式。

  • 展示 GenericOptionsParser 的實用性,以處理一般 Hadoop 命令列選項。

  • 展示應用程式如何使用 Counters,以及如何設定傳遞給 map (和 reduce) 方法的應用程式特定狀態資訊。

Java 和 JNI 是 Oracle America, Inc. 在美國和其他國家的商標或註冊商標。