Manifest Committer 協定

此文件說明 Manifest Committer 的提交協定

背景

術語

術語 意義
提交者 一種可由 MR Spark 呼叫的類別,用於執行任務和工作提交作業。
Spark 驅動程式 安排工作並編排提交作業的 Spark 程序。
工作:在 MapReduce 中 整個應用程式。在 Spark 中,這是工作鏈中的一個單一階段
工作嘗試 嘗試執行工作一次。MR 支援多個工作嘗試,並在部分工作失敗時復原。Spark 則表示「從頭開始」
任務 工作的子區段,例如處理一個檔案或檔案的一部分
任務 ID 任務 ID,在此工作中是唯一的。通常從 0 開始,並用於檔名 (part-0000、part-001 等)
任務嘗試 (TA) 嘗試執行任務。它可能會失敗,這種情況下 MR/spark 將安排另一個任務。
任務嘗試 ID 任務嘗試的唯一 ID。任務 ID + 嘗試計數器。
目的地目錄 工作的最終目的地。
工作嘗試目錄 工作嘗試使用的暫時目錄。這總是位於目的地目錄下方,以確保它與 HDFS、其他檔案系統中的儲存磁碟區等位於相同的加密區域。
任務嘗試目錄 工作嘗試目錄下的目錄,任務嘗試在此建立子目錄以進行自己的工作
任務嘗試工作目錄 每個任務嘗試獨有的目錄,檔案會寫入其中
工作提交 取得任務嘗試的輸出,並使其成為該「成功」任務的最終/獨有結果。
工作提交 彙總所有已提交任務的輸出,並產生工作的最終結果。

提交者的目的是確保工作的完整輸出最終會出現在目的地,即使在任務失敗的情況下也是如此。

  • 完整:輸出包含所有成功任務的工作。
  • 獨有:不成功的任務輸出不存在。
  • 並行:當多個任務並行提交時,輸出與任務提交序列化時相同。這不是工作提交的要求。
  • 可中止:工作和任務可以在工作提交之前中止,之後它們的輸出將不可見。
  • 正確性連續性:一旦工作提交,任何失敗、中止或不成功的任務的輸出都絕不能在未來某個時間點出現。

對於 Hive 的經典階層式目錄結構表格,工作提交需要將所有已提交工作的輸出放入目錄樹中的正確位置。

內建在 hadoop-mapreduce-client-core 模組中的提交器為 FileOutputCommitter

它有兩種演算法,v1 和 v2。

v1 演算法能承受所有形式的工作失敗,但在提交最終彙總輸出時很慢,因為它會將每個新建立的檔案重新命名為表格中正確的位置,一個一個地進行。

v2 演算法不被視為安全,因為輸出在個別工作提交時就能看見,而不是延遲到工作提交時才顯示。多個工作嘗試有可能將其資料放入輸出目錄樹中,如果工作在提交之前失敗/中止,則輸出會可見。

檔案輸出提交器 V1 和 V2

檔案輸出提交器 V1 和 V2 提交演算法

工作嘗試執行 (V1 和 V2)

$dest/__temporary/$jobAttemptId/ 中的工作嘗試目錄包含進行中工作的全部輸出,每個工作嘗試都分配到自己的工作嘗試目錄 $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId

工作的所有工作都寫入工作嘗試目錄下。如果輸出是根目錄下的深度樹狀結構,工作嘗試目錄將會以類似的結構結束,包含它產生的檔案和其上方的目錄。

MapReduce V1 演算法

v1 工作提交

工作嘗試目錄直接在工作嘗試目錄下重新命名

rename(
  $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId
  $dest/__temporary/$jobAttemptId/$taskId)

V1 工作提交

對於每個已提交的工作,其下的所有檔案都會重新命名為目標目錄,其中檔案名稱會從工作的基礎目錄重新對應到目標目錄。

也就是說,$dest/__temporary/$jobAttemptId/$taskId 下的所有內容都會轉換為 $dest 下的路徑。

遞迴樹狀結構會找出每個 TA 目錄中要重新命名的路徑。如果工作目錄樹包含不存在於目標下的子目錄目錄,則會進行一些最佳化:在這種情況下,整個目錄都可以重新命名。如果目錄已經存在,則會對該目錄進行逐檔案合併,而子目錄的動作則再次取決於目標是否存在。

因此,如果每個任務的輸出都進入一個獨立的最終目錄(例如最終分區對單一任務而言是唯一的),則目錄的重新命名為 O(1),與子目錄無關。如果輸出與其他任務位於同一個目錄(或更新現有目錄),則重新命名的效能會變成 O(檔案)。

最後,如果 mapreduce.fileoutputcommitter.marksuccessfuljobs 為 true,則會寫入一個 0 位元組的 _SUCCESS 檔案。

MapReduce V2 演算法

V2 任務提交

任務嘗試目錄下的檔案會逐一重新命名為目標目錄。不會嘗試最佳化目錄重新命名,因為其他任務可能同時提交工作。因此,它是 O(檔案) 加上列出目錄樹的成本。再次說明:使用遞迴樹狀瀏覽完成,而不是深層的 listFiles(path, recursive=true) API,後者在 HDFS 和(儘管在此無關)S3 上會比較快。

V2 任務提交

如果 mapreduce.fileoutputcommitter.marksuccessfuljobs 為 true,則會寫入一個 0 位元組的 _SUCCESS 檔案。

V2 提交器不正確/不安全的理由

如果任務 T1 的任務嘗試 1 (T1A1) 在提交之前失敗,則驅動程式會排程新的嘗試「T1A2」,並提交它。一切順利。

但是:如果 T1A1 獲准提交,但在提交過程中失敗,則部分輸出可能已寫入目標目錄。

如果接著指示嘗試 T1A2 提交,則只有當它的輸出具有完全相同的檔案名稱組時,才會覆寫任何已重新命名的檔案。如果產生不同的檔案名稱,則輸出將包含 T1A1 和 T1A2 的檔案。

如果 T1A1 在提交過程中進行分區,則任務提交器會排程另一個嘗試並提交其工作。但是,如果 T1A1 仍與檔案系統連線,則它仍可以重新命名檔案。即使使用相同的檔案名稱,兩個任務的輸出也可能混雜在一起。

背景:S3A 提交器

論文 零重新命名提交器,Loughran 等人,涵蓋這些提交器

它也描述了提交問題,定義正確性,並描述 v1 和 v2 提交程序的演算法,以及 S3A 提交程序、IBM Stocator 提交程序,以及我們對 EMR 的 Spark 提交程序的了解。

hadoop-aws JAR 包含一對提交程序,「暫存」和「Magic」。這兩個都是相同問題的實作:安全且快速地將工作提交到 S3 物件儲存。

提交程序利用 S3 提供建立檔案的原子方式:PUT 要求。

檔案存在或不存在。檔案可以直接上傳到其目的地,且僅在上傳完成時,檔案才會顯示出來,覆寫任何現有的副本。

對於大型檔案,多部分上傳允許將此上傳作業分割成一系列 POST 要求

1 initiate-upload (路徑 -> 上傳 ID) 1. 上傳部分 (路徑、上傳 ID、資料[]) -> 檢查碼。 這可以並行處理。最多可以將 10,000 個部分上傳到單一物件。除了最後一個部分之外,所有部分都必須 >= 5MB。1. 完成上傳 (路徑、上傳 ID、清單<檢查碼>) 這會顯示檔案,從由檢查碼排序定義的區塊順序中的部分建立檔案。

S3A 提交程序的祕密在於,即使檔案在任務嘗試執行/提交期間上傳,最終的 POST 要求也可以延遲到工作提交階段。任務嘗試需要確定每個檔案的最終目的地,上傳資料作為多部分作業的一部分,然後將完成上傳所需資訊儲存在稍後由工作提交程序讀取並用於 POST 要求的檔案中。

暫存提交程序

暫存提交程序是根據 Netflix 的 Ryan Blue 的貢獻而建立。它依賴 HDFS 作為一致性儲存,以傳播 .pendingset 檔案。

每個工作嘗試的工作目錄都在本機檔案系統中,即「暫存目錄」。使用 v1 FileOutputCommitter(與叢集 HDFS 檔案系統搭配使用)將完成上傳所需資訊從工作嘗試傳遞給工作提交者。這可確保提交者擁有與 v1 演算法相同的正確性保證。

  1. 工作提交包括將本機檔案系統工作嘗試目錄下的所有檔案上傳至其最終目的地路徑,並暫緩最終清單 POST。
  2. 包含完成工作嘗試中所有檔案上傳所需所有資訊的 JSON 檔案會寫入與 HDFS 搭配使用的包裝提交者的工作嘗試目錄。
  3. 工作提交:載入 HDFS 工作嘗試目錄中的所有清單檔案,然後發出 POST 要求以完成上傳。這些會並行處理。

Magic 提交者

Magic 提交者純粹是 S3A,並利用作者可以在檔案系統用戶端本身內進行變更的事實。

定義「Magic」路徑,當在其中開啟寫入時,會對最終目的地目錄啟動多方上傳。當輸出串流 close() 時,會將零位元組標記檔案寫入 Magic 路徑,並儲存包含完成上傳所需所有資訊的 JSON .pending 檔案。

工作提交:1. 列出每個工作嘗試的 Magic 目錄下的所有 .pending 檔案;1. 彙總到 .pendingset 檔案 1. 儲存到工作嘗試目錄,並附上工作 ID。

工作提交

  1. 列出工作嘗試目錄中的 .pendingset 檔案
  2. 使用 POST 要求完成上傳。

Magic 提交者絕對需要一致的 S3 儲存空間,最初使用 S3Guard。由於 S3 現在已一致,因此可以使用原始 S3。它不需要 HDFS 或任何其他具有 rename() 的檔案系統。

正確性

S3A 提交者被視為正確,原因如下

  1. 在工作提交之前,沒有任何資料實體化。
  2. 只能將一個工作嘗試的清單儲存到工作嘗試目錄。因此:只有具有相同工作 ID 的 TA 檔案會被獨家提交。
  3. 暫存提交者使用 HDFS 將明細從 TA 傳遞給工作提交者,確保 S3 的最終一致性不會導致遺漏明細。
  4. 在 S3 一致之前,神奇提交者依賴 S3Guard 在任務和工作提交期間提供所需的清單一致性。
  5. 作者和更廣泛的社群已修正所有在製作中浮現的與提交者相關的問題。

已修正的重要問題包括

  • HADOOP-15961。S3A 提交者:確保有定期 progress() 呼叫。
  • HADOOP-16570。S3A 提交者遇到規模問題。
  • HADOOP-16798。S3A 提交者執行緒池關閉問題。
  • HADOOP-17112。S3A 提交者無法處理路徑中的空白。
  • HADOOP-17318。支援具有相同應用程式嘗試 ID 的並發 S3A 提交工作。
  • HADOOP-17258。MagicS3GuardCommitter 失敗,因為 pendingset 已存在
  • HADOOP-17414。神奇提交者檔案沒有 Spark 收集的寫入位元組數。
  • SPARK-33230 Hadoop 提交者在 spark.sql.sources.writeJobUUID 中取得唯一的作業 ID
  • SPARK-33402 在同一秒啟動的作業有重複的 MapReduce 作業 ID
  • SPARK-33739。透過 S3A 神奇提交者提交的作業不會報告寫入的位元組(取決於 HADOOP-17414)

在影響正確性而非規模/效能/UX 的問題中:HADOOP-17258 涉及在 TA1 任務提交完成後(但未報告)從失敗中復原。SPARK-33402、SPARK-33230 和 HADOOP-17318 都是相關的:如果兩個 Spark 作業/階段在同一秒啟動,它們有相同的作業 ID。這導致暫存提交者使用的 HDFS 目錄混雜在一起。

值得注意的是:這些都是最小整合測試套件未發現的問題。

好消息:我們現在知道這些問題,並且更能避免再次複製它們。而且知道要撰寫哪些測試。

V1 提交者:在 Azure 中速度慢,在 GCS 中速度慢且不安全。

V1 committer 在 ABFS 上效能不佳,原因如下:

  1. 使用 ABFS 時,目錄清單和檔案重新命名會比使用 HDFS 慢一些。
  2. v1 committer 會透過列出每個已提交任務的輸出,循序提交每個任務的輸出,在目的地不存在目錄時移動目錄,將檔案合併到現有目錄中。

V2 committer 在工作提交中快很多,因為它在任務提交中執行清單和重新命名程序。由於這是非原子的,因此使用它被認為很危險。V2 任務提交演算法顯示,可以透過獨家使用逐檔案重新命名來並行提交不同任務的輸出。

V1 committer 在 GCS 上效能不佳,因為即使任務提交作業(目錄重新命名)也是非原子的 O(files) 作業。這也表示它不安全。

如果任務嘗試已分割,而 Spark 驅動程式排程/提交另一個 TA,則任務目錄可能包含來自第一次嘗試的 1 個以上的檔案。


Manifest Committer 協定

儲存體需求

此 committer 支援的儲存體/檔案系統必須

  • 具有相符的清單。
  • 具有原子的 O(1) 檔案重新命名作業。

此 committer 支援的儲存體/檔案系統應該

  • 在負載下成功重新命名檔案。ABFS 無法做到這一點,因此在那裡提供了特殊的復原功能。
  • 實作 HADOOP-17979 的 EtagSource 介面。這用於 ABFS 重新命名復原,以及最終輸出的選用驗證。

此 committer 支援的儲存體/檔案系統可以

  • 具有延遲很高的清單作業。
  • 在負載下拒絕呼叫並提供節流回應,這必須在檔案系統連接器中處理。

此 committer 支援的儲存體/檔案系統不能

  • 支援原子目錄重新命名。這只在清理時選擇性使用,其他時候從不使用。
  • 支援 O(1) 目錄刪除。CleanupJobStage 假設並非如此,因此並行刪除任務嘗試目錄。
  • 支援原子 create(Path, overwrite=false) 作業。透過寫入包含任務嘗試 ID 的路徑來提交 manifest,然後重新命名為其最終路徑。
  • 支援快速 listFiles(path, recursive=true) 呼叫。此 API 呼叫未使用。

FileOutputCommitter 相比,已移除的要求為

  • 原子目錄重新命名。
  • O(1) 目錄刪除。
  • 快速目錄清單。
  • 隱含的節流行為不存在。

HDFS 符合所有這些要求,因此不會從此提交者中受益良多,儘管它仍會在其中執行。

S3 儲存體不符合此提交者的重新命名要求,即使現在它是一致的。此提交者在 S3 上使用並不安全。

任務和工作 ID

每個工作都必須有唯一的 ID。

實作預期 Spark 執行時間具有相關的修補程式,以確保這一點。

工作 ID 用於命名暫時目錄,而不是使用 _temporary/0/ 的經典遞增自然編號方案。該方案來自 MapReduce,其中嘗試 ID > 1 的工作嘗試尋找由前輩提交的任務,並將其納入其結果中。

此提交者針對 Spark,其中沒有恢復嘗試。透過在路徑中使用工作 ID,如果將工作設定為在工作清理/中止時刪除所有 _temporary,則可以使用相同表格作為目的地執行多個工作。

任務 ID 和任務嘗試 ID 將像往常一樣從工作 ID 派生。

預期已寫入檔案的檔名應為唯一。這是 Spark 中 ORC 和 Parquet 檔案所執行的,並允許預設省略對目標檔案的檢查。

目錄結構

給定目標目錄 destDir: Path

ID 為 jobID: String 和嘗試次數為 jobAttemptNumber:int 的工作將使用目錄

$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/

作為其工作(注意:它實際上會使用 %02d 格式化該最終子目錄)。

這稱為工作嘗試目錄

在工作嘗試目錄下,會建立子目錄 tasks。這稱為任務嘗試目錄。每個任務嘗試都將有自己的子目錄,其工作將儲存在其中。

在工作嘗試目錄下,會建立子目錄 manifests。這稱為y

所有已提交任務的清單將儲存到此目錄中,檔名為 $taskId-manifest.json

完整路徑

$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json

是已提交任務所建立所有檔案的清單的最終位置。它稱為已提交任務的清單路徑

任務嘗試會將其清單儲存到此目錄中,並使用暫時檔名 $taskAttemptId-manifest.json.tmp

這稱為「工作嘗試清單的暫時路徑」。

然後,針對工作和工作操作,定義下列路徑。

let jobDirectory = "$destDir/_temporary/manifest_$jobID/"
let jobAttemptDirectory = jobDirectory + "$jobAttemptNumber/"
let manifestDirectory = jobAttemptDirectory + "manifests/"
let taskAttemptDirectory = jobAttemptDirectory + "tasks/"

而且針對每個工作嘗試,也定義下列路徑

let taskAttemptWorkingDirectory = taskAttemptDirectory + "$taskAttemptId"
let taskManifestPath = manifestDirectory + "$taskId-manifest.json"
let taskAttemptTemporaryManifestPath = manifestDirectory + "$taskAttemptId-manifest.json"

通訊協定的核心演算法

  1. 每個工作嘗試會將其所有檔案寫入工作嘗試目錄下的唯一目錄樹。
  2. 工作提交包含對該工作嘗試目錄的遞迴掃描,建立目錄清單和檔案清單。
  3. 這些清單儲存為 JSON 清單檔案。
  4. 工作提交包含列出所有 JSON 清單檔案、載入其內容、建立目的地目錄的集合,以及將所有檔案重新命名為其最終目的地。

中間清單

這份 JSON 檔案經過設計,包含(連同 IO 統計資料和一些診斷資料)

  1. 如果不存在,則必須建立的目的地目錄清單。
  2. 作為(絕對來源、絕對目的地、檔案大小)項目重新命名的檔案清單。

工作設定

mkdir(jobAttemptDirectory)
mkdir(manifestDirectory)
mkdir(taskAttemptDirectory)

工作設定

mkdir(taskAttemptWorkingDirectory)

工作提交

工作嘗試透過下列方式提交

  1. 遞迴列出工作嘗試工作目錄以建構
  2. 將檔案重新命名的目的地目錄清單及其狀態(存在、找不到、檔案)
  3. 要重新命名的檔案清單:來源、目的地、大小和 etag(如果有的話)。
  4. 這些清單會填入 JSON 檔案,也就是「中間清單」。
  5. 工作嘗試會將此檔案儲存至其「工作嘗試清單的暫時路徑」。
  6. 然後,工作嘗試會刪除「已提交工作的清單路徑」,並將其自己的清單檔案重新命名為該路徑。
  7. 如果重新命名成功,則工作提交會視為成功。

此時不會進行重新命名:檔案會保留在原始位置,直到在工作提交中重新命名為止。

let (renames, directories) = scan(taskAttemptWorkingDirectory)
let manifest = new Manifest(renames, directories)

manifest.save(taskAttemptTemporaryManifestPath)
rename(taskAttemptTemporaryManifestPath, taskManifestPath)

工作中止/清除

delete(taskAttemptWorkingDirectory)

工作提交

工作提交包含

  1. 列出工作嘗試目錄中的所有清單檔案。
  2. 載入每個清單檔案,建立尚未存在的目錄,然後重新命名重新命名清單中的每個檔案。
  3. 選擇性地儲存與 S3A 提交器格式相同的 JSON _SUCCESS 檔案(用於測試;使用寫入和重新命名進行原子儲存)

工作提交階段支援多個工作和每個工作中的多個檔案的平行化,特別是有一個平行儲存 IO 的執行緒池

  1. 清單工作會平行載入和處理。
  2. 刪除打算建立目錄的檔案。
  3. 建立葉目錄。
  4. 檔案重新命名。
  5. 在清理和中止中:刪除工作嘗試目錄
  6. 如果啟用輸出的驗證進行測試/除錯:呼叫 getFileStatus 來比較檔案長度,如果可能的話,還包括 etag。
let manifestPaths = list("$manifestDirectory/*-manifest.json")
let manifests = manifestPaths.map(p -> loadManifest(p))
let directoriesToCreate = merge(manifests.directories)
let filesToRename = concat(manifests.files)

directoriesToCreate.map(p -> mkdirs(p))
filesToRename.map((src, dest, etag) -> rename(src, dest, etag))

if mapreduce.fileoutputcommitter.marksuccessfuljobs then
  success.save("$destDir/_SUCCESS")

實作說明

為了協助除錯和開發,摘要可以儲存在相同或不同檔案系統中的位置;中間清單可以重新命名為目標檔案系統中的位置。

if summary.report.directory != "" then
  success.save("${summary.report.directory}/$jobID.json")
if diagnostics.manifest.directory != null then
  rename($manifestDirectory, "${diagnostics.manifest.directory}/$jobID")

即使工作提交因任何原因失敗,摘要報告仍會儲存

工作中止/清理

工作清理通常是刪除工作目錄

delete(jobDirectory)

為了解決物件儲存的規模問題,這先進行所有工作嘗試工作目錄的(平行化)刪除

let taskAttemptWorkingDirectories = list("taskAttemptDirectory")
taskAttemptWorkingDirectories.map(p -> delete(p))

新通訊協定的優點

  • 將來源樹狀清單操作推入工作提交階段,這通常不在執行的重要路徑中。
  • 將探測/建立的目錄數量減少到輸出目錄的集合,並消除所有重複項。
  • 檔案重新命名可以平行化,其限制在於已設定的執行緒池大小和/或任何速率限制約束。
  • 提供 GCS 的原子工作提交,因為不期望目錄重新命名是原子的。
  • 允許透過清單將 IOStatistics 從工作嘗試傳遞到工作提交器。
  • 允許在工作提交器中進行一些重新命名前的操作,類似於 S3A「分割暫存提交器」。這可以設定為刪除所有排定建立的目錄中的現有項目,或是在這些分割區非空時失敗。請參閱分割暫存提交器
  • 允許選擇性的預飛驗證檢查(驗證沒有重複的檔案由不同的工作建立)。
  • 清單可以在開發/除錯期間檢視、決定輸出的大小等。

與 v1 演算法相比,新通訊協定的缺點

  • 需要新的清單檔案格式。
  • 如果工作建立許多檔案和/或子目錄,或如果收集 etag 且這些標籤的長度很重要,清單可能會變大。HTTP 通訊協定將每個 etag 限制為 8 KiB,因此成本可能是每個檔案 8 KiB。
  • 使工作提交比 v1 演算法更複雜。
  • 在個別工作建立唯一輸出目錄的工作上可能次優,因為目錄重新命名永遠不會用於提交目錄。