Manifest Committer 架構

本文說明 Manifest Committer 的架構和其他實作/正確性面向

協定及其正確性涵蓋於 Manifest Committer 協定 中。

Manifest 提交器是工作提交器,可提供 ABFS 在「實際」查詢上的效能,以及 GCS 上的效能和正確性。

此提交器使用 S3A 提交器中提供的延伸點。使用者可以為 abfs://gcs:// URL 宣告新的提交器工廠。它可透過 Hadoop MapReduce 和 Apache Spark 使用。

背景

術語

術語 意義
提交者 一個類別,可由 MR/Spark 呼叫以執行任務和作業提交操作。
Spark 驅動程式 安排工作和編排提交操作的 Spark 程序。
作業 在 MapReduce 中,整個應用程式。在 Spark 中,這是工作鏈中的單一階段
作業嘗試 單一作業嘗試。MR 支援多個作業嘗試,可在部分作業失敗時復原。Spark 則表示「從頭開始」。
任務 作業的子部分,例如處理一個檔案或檔案的一部分
任務 ID 任務的 ID,在這個作業中是唯一的。通常從 0 開始,並用於檔案名稱 (part-0000、part-001 等)。
任務嘗試 (TA) 執行任務的嘗試。它可能會失敗,在這種情況下,MR/Spark 將安排另一個嘗試。
任務嘗試 ID 任務嘗試的唯一 ID。任務 ID + 嘗試計數器。
目的地目錄 工作的最終目的地。
作業嘗試目錄 作業嘗試使用的暫時目錄。這始終在目的地目錄之下,以確保它與 HDFS、其他檔案系統中的儲存磁碟區等位於相同的加密區域中。
任務嘗試目錄 (也稱為「任務嘗試工作目錄」)。每個任務嘗試專屬的目錄,檔案會寫入其中
工作提交 取得任務嘗試的輸出,並使其成為該「成功」任務的最終/專屬結果。
工作提交 彙總所有已提交任務的所有輸出,並產生作業的最終結果。

提交者的目的是確保作業的完整輸出最終會出現在目的地中,即使任務失敗也是如此。

  • 完整:輸出包含所有成功任務的工作。
  • 專屬:不成功的任務輸出不存在。
  • 並行:當多個任務並行提交時,輸出與任務提交序列化時相同。這不是作業提交的要求。
  • 可中止:作業和任務可以在作業提交之前中止,之後它們的輸出將不可見。
  • 正確性連續性:一旦作業提交,任何失敗、中止或不成功的任務的輸出都絕不能在將來某個時間點出現。

對於 Hive 的傳統階層式目錄結構表格,作業提交需要將所有已提交任務的輸出放入目錄樹中的正確位置。

內建於 hadoop-mapreduce-client-core 模組的提交者是 FileOutputCommitter

Manifest 提交者:Spark 在 Azure 和 Google 儲存上效能極高的提交者。

Manifest Committer 是一種效能更高的 committer,適用於透過許多工作在深層目錄樹中建立檔案的 ABFS 和 GCS 儲存。

它也會在 hdfs://file:// URL 上執行,但它經過最佳化,以解決雲端儲存中的列出和重新命名效能和限制問題。

不會與 S3 正確運作,因為它仰賴原子重新命名-不覆寫作業來提交 manifest 檔案。它也會遇到複製而非移動所有產生資料的效能問題。

雖然它會與 MapReduce 一起運作,但沒有處理從先前失敗嘗試中復原的多個工作嘗試。

Manifest

Manifest 檔案經過設計,包含(以及 IOStatistics 和其他一些項目)

  1. 如果不存在,則必須建立的目的地目錄清單。
  2. 要重新命名的檔案清單,記錄為(絕對來源、絕對目的地、檔案大小)條目。

工作提交

工作嘗試透過下列方式提交

  1. 遞迴列出工作嘗試工作目錄以建立
  2. 檔案重新命名的目錄清單。
  3. 要重新命名的檔案清單:來源、目的地、大小和 etag(選用)。
  4. 將這些資訊儲存在工作嘗試目錄中的 manifest 檔案,其檔名衍生自工作 ID。注意:寫入暫存檔,然後重新命名為最終路徑,將用於確保 manifest 建立為原子。

不進行重新命名,檔案會留在其原始位置。

目錄樹狀瀏覽是單執行緒的,然後是 O(directories),每個目錄列出使用一個或多個分頁 LIST 呼叫。

這很簡單,而且對大多數工作而言,掃描會脫離工作的關鍵路徑。

統計分析可能會證明未來轉移至平行掃描是合理的。

工作提交

工作提交包含

  1. 列出工作嘗試目錄中的所有 manifest 檔案。
  2. 載入每個 manifest 檔案,建立尚未存在的目錄,然後重新命名重新命名清單中的每個檔案。
  3. 儲存 JSON _SUCCESS 檔案,其格式與 S3A committer 相同(用於測試;使用寫入和重新命名進行原子儲存)

工作提交階段支援許多工作和每個工作許多檔案的平行化,特別是

  1. 明細任務會載入並在「明細處理器」執行緒池中處理。
  2. 目錄建立和檔案重新命名作業會在「執行器」執行緒池中處理:許多重新命名作業可以使用最少的網路 IO 並行執行。
  3. 工作清除可以並行刪除工作嘗試目錄。這很重要,因為在 Google 雲端儲存空間上刪除目錄是 O(files),在使用 OAuth 驗證時在 ABFS 上也是如此。

祖先目錄準備

選用掃描所有祖先…如果有任何檔案,請刪除。

父目錄建立

  1. 探查共用目錄地圖以找出目錄是否存在。如果找到:作業已完成。
  2. 如果地圖是空的,請在路徑上呼叫 getFileStatus()。找不到:建立目錄、新增項目和所有父路徑的項目找到目錄:新增項目和所有父路徑的項目找到檔案:刪除。然後像以前一樣建立。

有效率地處理目錄的並行建立(或刪除+建立)會是一個麻煩點;會投入一些心力在那裡來建立要建立的目錄組。

檔案重新命名

檔案會並行重新命名。

重新命名之前檢查該路徑上的任何內容(並刪除)是選用的。由於 Spark 會為每個檔案建立新的 UUID,因此不會發生這種情況,而且可以節省 HTTP 要求。

驗證

選用掃描所有提交的檔案並驗證長度,如果已知,則驗證 etag。用於測試和診斷。

優點

  • 將來源樹清單作業推入工作提交階段,這通常會脫離執行作業的關鍵路徑
  • 提供 GCS 的原子工作提交,因為沒有預期目錄重新命名是原子的
  • 可以從明細中的工作人員傳遞 IOStatistics。
  • 允許執行一些類似於 S3A「已分區暫存提交器」的重新命名前作業。可以將此作業設定為刪除所有排定要建立的目錄中的現有項目,或者在這些分區不為空時失敗。請參閱 已分區暫存提交器
  • 允許選用的預飛驗證檢查(驗證沒有不同工作建立的重複檔案)
  • 在開發/除錯期間,可以檢視明細、確定輸出大小等。

缺點

  • 需要新的清單檔案格式。
  • 可能會使任務提交更複雜。

此解決方案對 GCS 來說是必要的,並且應對 ABFS 有益,因為任務提交者會支付列出開銷。

實作詳細資料

限制

主要目標是讓清單提交者保持孤立,既不觸及現有的提交者程式碼,也不觸及 Hadoop 程式碼庫的其他部分。

它必須直接插入 MR 和 Spark,而不需要任何變更,除了已經針對 S3A 提交者實作的變更之外

  • 獨立:不得要求變更 hadoop-common 等。
  • 孤立:不得變更現有的提交者
  • 整合:必須透過 PathOutputCommitterFactory 繫結。

因此,會從其他地方複製和貼上一些內容,例如 org.apache.hadoop.util.functional.TaskPool 是根據 S3ACommitter 的 org.apache.hadoop.fs.s3a.commit.Tasks 為基礎。

_SUCCESS 檔案必須與 S3A JSON 檔案相容。這是為了確保任何驗證 S3A 提交者輸出的現有測試套件都可以重新鎖定在由清單提交者執行的作業,而不需要任何變更。

作業提交中的進度回呼。

何時?建議:持續執行直到重新命名最終完成。

作業提交中的錯誤處理和中止。

我們希望停止整個作業提交。在每個任務提交者執行緒的 iteraton 處理目錄(或每個檔案的處理?)時,需要檢查某些原子布林值「中止作業」。列出或重新命名失敗將需要升級為停止整個作業提交。這表示在非同步重新命名作業或在任務提交者執行緒中引發的任何 IOE 都必須

  1. 被捕獲
  2. 儲存在共用欄位/變數中
  3. 觸發中止
  4. commitJob() 呼叫結束時重新擲出

避免死結

如果作業提交階段使用執行緒池進行每個任務作業,例如載入檔案,則不得將同一個執行緒池用於每個任務階段中的平行作業。

由於每個 JobStage 都在任務或作業提交中依序執行,因此可以在各階段共用同一個執行緒池。

在目前的實作中,作業提交中沒有平行「每個清單」作業,除了實際載入檔案之外。建立目錄和重新命名檔案的作業實際上是在不執行個別清單的平行處理的情況下執行的。

目錄準備:合併所有清單的目錄清單,然後排隊建立(希望非常小的)唯一目錄集。

重新命名:逐一瀏覽所有清單,並將其重新命名排入重新命名池中。

執行緒池生命週期

執行緒池的生命週期受限於階段組態,這將限制在每個 PathOutputCommitter 方法中進行設定、提交、中止和清理。

這可避免 S3A 提交者的執行緒池生命週期問題。

與 S3A HADOOP-16570 類似的規模問題。

這是 terasorting 中的失敗,其中許多工作各產生許多檔案;要提交的完整檔案清單(以及每個區塊的 etag)會在執行前建立在記憶體中並驗證。

明細提交者假設儲存在記憶體中的資料量較少,因為不再需要為每個已提交檔案的每個區塊儲存 etag。

在 dest dir 中重複建立目錄

合併所有要建立的目錄清單並消除重複項。

實作架構

實作架構反映了 S3A 連接器的經驗。

  • 將提交階段與 MR 提交類別隔離,因為它的生命週期很複雜。
  • 改為分解成一系列可以在隔離中測試並鏈接到提供最終協定的階段
  • 不要傳遞 MR 資料類型(taskID 等)到階段,傳遞具有一般類型(字串等)的組態。
  • 也傳遞儲存作業的回呼,以便輕鬆實作假的儲存。
  • 針對每個階段:定義前置條件和後置條件、失敗模式。在隔離中測試。

統計資料

提交者會針對其對檔案系統執行/呼叫的所有作業收集持續時間統計資料。* 在工作提交期間收集的資料會儲存到明細中(不包括儲存和重新命名該檔案的時間)* 當這些明細在工作提交期間載入時,這些統計資料會合併以形成整個工作的總計統計資料。* 會儲存到 _SUCCESS 檔案* 以及在由 mapreduce.manifest.committer.summary.report.directory 指定的目錄中該檔案的任何副本(如果已設定)。要儲存。* 類別 org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter 可以載入並列印這些資料。

不會收集查詢中使用的檔案系統和輸入和輸出串流的 IO 統計資料。

稽核

透過 PathOutputCommitter API 呼叫 ManifestCommitter 時,下列屬性會新增到作用中(執行緒)的內容

金鑰
ji 工作 ID
tai 任務嘗試 ID
st 階段

這些也都在執行階段執行工作的所有輔助執行緒中設定。

任何支援稽核的儲存/FS 都能夠收集此資料並包含在記錄檔中。

為了簡化回溯,所有稽核整合都在單一類別 org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration 中。