此文件說明 Manifest Committer 的提交協定
術語 | 意義 |
---|---|
提交者 | 一種可由 MR Spark 呼叫的類別,用於執行任務和工作提交作業。 |
Spark 驅動程式 | 安排工作並編排提交作業的 Spark 程序。 |
工作:在 MapReduce 中 | 整個應用程式。在 Spark 中,這是工作鏈中的一個單一階段 |
工作嘗試 | 嘗試執行工作一次。MR 支援多個工作嘗試,並在部分工作失敗時復原。Spark 則表示「從頭開始」 |
任務 | 工作的子區段,例如處理一個檔案或檔案的一部分 |
任務 ID | 任務 ID,在此工作中是唯一的。通常從 0 開始,並用於檔名 (part-0000、part-001 等) |
任務嘗試 (TA) | 嘗試執行任務。它可能會失敗,這種情況下 MR/spark 將安排另一個任務。 |
任務嘗試 ID | 任務嘗試的唯一 ID。任務 ID + 嘗試計數器。 |
目的地目錄 | 工作的最終目的地。 |
工作嘗試目錄 | 工作嘗試使用的暫時目錄。這總是位於目的地目錄下方,以確保它與 HDFS、其他檔案系統中的儲存磁碟區等位於相同的加密區域。 |
任務嘗試目錄 | 工作嘗試目錄下的目錄,任務嘗試在此建立子目錄以進行自己的工作 |
任務嘗試工作目錄 | 每個任務嘗試獨有的目錄,檔案會寫入其中 |
工作提交 | 取得任務嘗試的輸出,並使其成為該「成功」任務的最終/獨有結果。 |
工作提交 | 彙總所有已提交任務的輸出,並產生工作的最終結果。 |
提交者的目的是確保工作的完整輸出最終會出現在目的地,即使在任務失敗的情況下也是如此。
對於 Hive 的經典階層式目錄結構表格,工作提交需要將所有已提交工作的輸出放入目錄樹中的正確位置。
內建在 hadoop-mapreduce-client-core
模組中的提交器為 FileOutputCommitter
。
它有兩種演算法,v1 和 v2。
v1 演算法能承受所有形式的工作失敗,但在提交最終彙總輸出時很慢,因為它會將每個新建立的檔案重新命名為表格中正確的位置,一個一個地進行。
v2 演算法不被視為安全,因為輸出在個別工作提交時就能看見,而不是延遲到工作提交時才顯示。多個工作嘗試有可能將其資料放入輸出目錄樹中,如果工作在提交之前失敗/中止,則輸出會可見。
$dest/__temporary/$jobAttemptId/
中的工作嘗試目錄包含進行中工作的全部輸出,每個工作嘗試都分配到自己的工作嘗試目錄 $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId
工作的所有工作都寫入工作嘗試目錄下。如果輸出是根目錄下的深度樹狀結構,工作嘗試目錄將會以類似的結構結束,包含它產生的檔案和其上方的目錄。
工作嘗試目錄直接在工作嘗試目錄下重新命名
rename( $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId $dest/__temporary/$jobAttemptId/$taskId)
對於每個已提交的工作,其下的所有檔案都會重新命名為目標目錄,其中檔案名稱會從工作的基礎目錄重新對應到目標目錄。
也就是說,$dest/__temporary/$jobAttemptId/$taskId
下的所有內容都會轉換為 $dest
下的路徑。
遞迴樹狀結構會找出每個 TA 目錄中要重新命名的路徑。如果工作目錄樹包含不存在於目標下的子目錄目錄,則會進行一些最佳化:在這種情況下,整個目錄都可以重新命名。如果目錄已經存在,則會對該目錄進行逐檔案合併,而子目錄的動作則再次取決於目標是否存在。
因此,如果每個任務的輸出都進入一個獨立的最終目錄(例如最終分區對單一任務而言是唯一的),則目錄的重新命名為 O(1),與子目錄無關。如果輸出與其他任務位於同一個目錄(或更新現有目錄),則重新命名的效能會變成 O(檔案)。
最後,如果 mapreduce.fileoutputcommitter.marksuccessfuljobs
為 true,則會寫入一個 0 位元組的 _SUCCESS
檔案。
任務嘗試目錄下的檔案會逐一重新命名為目標目錄。不會嘗試最佳化目錄重新命名,因為其他任務可能同時提交工作。因此,它是 O(檔案)
加上列出目錄樹的成本。再次說明:使用遞迴樹狀瀏覽完成,而不是深層的 listFiles(path, recursive=true)
API,後者在 HDFS 和(儘管在此無關)S3 上會比較快。
如果 mapreduce.fileoutputcommitter.marksuccessfuljobs
為 true,則會寫入一個 0 位元組的 _SUCCESS
檔案。
如果任務 T1 的任務嘗試 1 (T1A1) 在提交之前失敗,則驅動程式會排程新的嘗試「T1A2」,並提交它。一切順利。
但是:如果 T1A1 獲准提交,但在提交過程中失敗,則部分輸出可能已寫入目標目錄。
如果接著指示嘗試 T1A2 提交,則只有當它的輸出具有完全相同的檔案名稱組時,才會覆寫任何已重新命名的檔案。如果產生不同的檔案名稱,則輸出將包含 T1A1 和 T1A2 的檔案。
如果 T1A1 在提交過程中進行分區,則任務提交器會排程另一個嘗試並提交其工作。但是,如果 T1A1 仍與檔案系統連線,則它仍可以重新命名檔案。即使使用相同的檔案名稱,兩個任務的輸出也可能混雜在一起。
論文 零重新命名提交器,Loughran 等人,涵蓋這些提交器
它也描述了提交問題,定義正確性,並描述 v1 和 v2 提交程序的演算法,以及 S3A 提交程序、IBM Stocator 提交程序,以及我們對 EMR 的 Spark 提交程序的了解。
hadoop-aws
JAR 包含一對提交程序,「暫存」和「Magic」。這兩個都是相同問題的實作:安全且快速地將工作提交到 S3 物件儲存。
提交程序利用 S3 提供建立檔案的原子方式:PUT 要求。
檔案存在或不存在。檔案可以直接上傳到其目的地,且僅在上傳完成時,檔案才會顯示出來,覆寫任何現有的副本。
對於大型檔案,多部分上傳允許將此上傳作業分割成一系列 POST 要求
1 initiate-upload (路徑 -> 上傳 ID)
1. 上傳部分 (路徑、上傳 ID、資料[]) -> 檢查碼。
這可以並行處理。最多可以將 10,000 個部分上傳到單一物件。除了最後一個部分之外,所有部分都必須 >= 5MB。1. 完成上傳 (路徑、上傳 ID、清單<檢查碼>)
這會顯示檔案,從由檢查碼排序定義的區塊順序中的部分建立檔案。
S3A 提交程序的祕密在於,即使檔案在任務嘗試執行/提交期間上傳,最終的 POST 要求也可以延遲到工作提交階段。任務嘗試需要確定每個檔案的最終目的地,上傳資料作為多部分作業的一部分,然後將完成上傳所需資訊儲存在稍後由工作提交程序讀取並用於 POST 要求的檔案中。
暫存提交程序是根據 Netflix 的 Ryan Blue 的貢獻而建立。它依賴 HDFS 作為一致性儲存,以傳播 .pendingset
檔案。
每個工作嘗試的工作目錄都在本機檔案系統中,即「暫存目錄」。使用 v1 FileOutputCommitter(與叢集 HDFS 檔案系統搭配使用)將完成上傳所需資訊從工作嘗試傳遞給工作提交者。這可確保提交者擁有與 v1 演算法相同的正確性保證。
Magic 提交者純粹是 S3A,並利用作者可以在檔案系統用戶端本身內進行變更的事實。
定義「Magic」路徑,當在其中開啟寫入時,會對最終目的地目錄啟動多方上傳。當輸出串流 close()
時,會將零位元組標記檔案寫入 Magic 路徑,並儲存包含完成上傳所需所有資訊的 JSON .pending 檔案。
工作提交:1. 列出每個工作嘗試的 Magic 目錄下的所有 .pending
檔案;1. 彙總到 .pendingset
檔案 1. 儲存到工作嘗試目錄,並附上工作 ID。
工作提交
.pendingset
檔案Magic 提交者絕對需要一致的 S3 儲存空間,最初使用 S3Guard。由於 S3 現在已一致,因此可以使用原始 S3。它不需要 HDFS 或任何其他具有 rename()
的檔案系統。
S3A 提交者被視為正確,原因如下
已修正的重要問題包括
pendingset
已存在spark.sql.sources.writeJobUUID
中取得唯一的作業 ID在影響正確性而非規模/效能/UX 的問題中:HADOOP-17258 涉及在 TA1 任務提交完成後(但未報告)從失敗中復原。SPARK-33402、SPARK-33230 和 HADOOP-17318 都是相關的:如果兩個 Spark 作業/階段在同一秒啟動,它們有相同的作業 ID。這導致暫存提交者使用的 HDFS 目錄混雜在一起。
值得注意的是:這些都是最小整合測試套件未發現的問題。
好消息:我們現在知道這些問題,並且更能避免再次複製它們。而且知道要撰寫哪些測試。
V1 committer 在 ABFS 上效能不佳,原因如下:
V2 committer 在工作提交中快很多,因為它在任務提交中執行清單和重新命名程序。由於這是非原子的,因此使用它被認為很危險。V2 任務提交演算法顯示,可以透過獨家使用逐檔案重新命名來並行提交不同任務的輸出。
V1 committer 在 GCS 上效能不佳,因為即使任務提交作業(目錄重新命名)也是非原子的 O(files)
作業。這也表示它不安全。
如果任務嘗試已分割,而 Spark 驅動程式排程/提交另一個 TA,則任務目錄可能包含來自第一次嘗試的 1 個以上的檔案。
此 committer 支援的儲存體/檔案系統必須
O(1)
檔案重新命名作業。此 committer 支援的儲存體/檔案系統應該
EtagSource
介面。這用於 ABFS 重新命名復原,以及最終輸出的選用驗證。此 committer 支援的儲存體/檔案系統可以
此 committer 支援的儲存體/檔案系統不能
O(1)
目錄刪除。CleanupJobStage
假設並非如此,因此並行刪除任務嘗試目錄。create(Path, overwrite=false)
作業。透過寫入包含任務嘗試 ID 的路徑來提交 manifest,然後重新命名為其最終路徑。listFiles(path, recursive=true)
呼叫。此 API 呼叫未使用。與 FileOutputCommitter
相比,已移除的要求為
O(1)
目錄刪除。HDFS 符合所有這些要求,因此不會從此提交者中受益良多,儘管它仍會在其中執行。
S3 儲存體不符合此提交者的重新命名要求,即使現在它是一致的。此提交者在 S3 上使用並不安全。
每個工作都必須有唯一的 ID。
實作預期 Spark 執行時間具有相關的修補程式,以確保這一點。
工作 ID 用於命名暫時目錄,而不是使用 _temporary/0/
的經典遞增自然編號方案。該方案來自 MapReduce,其中嘗試 ID > 1 的工作嘗試尋找由前輩提交的任務,並將其納入其結果中。
此提交者針對 Spark,其中沒有恢復嘗試。透過在路徑中使用工作 ID,如果將工作設定為在工作清理/中止時不刪除所有 _temporary
,則可以使用相同表格作為目的地執行多個工作。
任務 ID 和任務嘗試 ID 將像往常一樣從工作 ID 派生。
預期已寫入檔案的檔名應為唯一。這是 Spark 中 ORC 和 Parquet 檔案所執行的,並允許預設省略對目標檔案的檢查。
給定目標目錄 destDir: Path
ID 為 jobID: String
和嘗試次數為 jobAttemptNumber:int
的工作將使用目錄
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/
作為其工作(注意:它實際上會使用 %02d
格式化該最終子目錄)。
這稱為工作嘗試目錄
在工作嘗試目錄下,會建立子目錄 tasks
。這稱為任務嘗試目錄。每個任務嘗試都將有自己的子目錄,其工作將儲存在其中。
在工作嘗試目錄下,會建立子目錄 manifests
。這稱為y。
所有已提交任務的清單將儲存到此目錄中,檔名為 $taskId-manifest.json
完整路徑
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json
是已提交任務所建立所有檔案的清單的最終位置。它稱為已提交任務的清單路徑。
任務嘗試會將其清單儲存到此目錄中,並使用暫時檔名 $taskAttemptId-manifest.json.tmp
。
這稱為「工作嘗試清單的暫時路徑」。
然後,針對工作和工作操作,定義下列路徑。
let jobDirectory = "$destDir/_temporary/manifest_$jobID/" let jobAttemptDirectory = jobDirectory + "$jobAttemptNumber/" let manifestDirectory = jobAttemptDirectory + "manifests/" let taskAttemptDirectory = jobAttemptDirectory + "tasks/"
而且針對每個工作嘗試,也定義下列路徑
let taskAttemptWorkingDirectory = taskAttemptDirectory + "$taskAttemptId" let taskManifestPath = manifestDirectory + "$taskId-manifest.json" let taskAttemptTemporaryManifestPath = manifestDirectory + "$taskAttemptId-manifest.json"
這份 JSON 檔案經過設計,包含(連同 IO 統計資料和一些診斷資料)
mkdir(jobAttemptDirectory) mkdir(manifestDirectory) mkdir(taskAttemptDirectory)
mkdir(taskAttemptWorkingDirectory)
工作嘗試透過下列方式提交
此時不會進行重新命名:檔案會保留在原始位置,直到在工作提交中重新命名為止。
let (renames, directories) = scan(taskAttemptWorkingDirectory) let manifest = new Manifest(renames, directories) manifest.save(taskAttemptTemporaryManifestPath) rename(taskAttemptTemporaryManifestPath, taskManifestPath)
delete(taskAttemptWorkingDirectory)
工作提交包含
_SUCCESS
檔案(用於測試;使用寫入和重新命名進行原子儲存)工作提交階段支援多個工作和每個工作中的多個檔案的平行化,特別是有一個平行儲存 IO 的執行緒池
let manifestPaths = list("$manifestDirectory/*-manifest.json") let manifests = manifestPaths.map(p -> loadManifest(p)) let directoriesToCreate = merge(manifests.directories) let filesToRename = concat(manifests.files) directoriesToCreate.map(p -> mkdirs(p)) filesToRename.map((src, dest, etag) -> rename(src, dest, etag)) if mapreduce.fileoutputcommitter.marksuccessfuljobs then success.save("$destDir/_SUCCESS")
實作說明
為了協助除錯和開發,摘要可以儲存在相同或不同檔案系統中的位置;中間清單可以重新命名為目標檔案系統中的位置。
if summary.report.directory != "" then success.save("${summary.report.directory}/$jobID.json") if diagnostics.manifest.directory != null then rename($manifestDirectory, "${diagnostics.manifest.directory}/$jobID")
即使工作提交因任何原因失敗,摘要報告仍會儲存
工作清理通常是刪除工作目錄
delete(jobDirectory)
為了解決物件儲存的規模問題,這應先進行所有工作嘗試工作目錄的(平行化)刪除
let taskAttemptWorkingDirectories = list("taskAttemptDirectory") taskAttemptWorkingDirectories.map(p -> delete(p))