本文說明 Manifest Committer 的架構和其他實作/正確性面向
協定及其正確性涵蓋於 Manifest Committer 協定 中。
Manifest 提交器是工作提交器,可提供 ABFS 在「實際」查詢上的效能,以及 GCS 上的效能和正確性。
此提交器使用 S3A 提交器中提供的延伸點。使用者可以為 abfs://
和 gcs://
URL 宣告新的提交器工廠。它可透過 Hadoop MapReduce 和 Apache Spark 使用。
術語 | 意義 |
---|---|
提交者 | 一個類別,可由 MR/Spark 呼叫以執行任務和作業提交操作。 |
Spark 驅動程式 | 安排工作和編排提交操作的 Spark 程序。 |
作業 | 在 MapReduce 中,整個應用程式。在 Spark 中,這是工作鏈中的單一階段 |
作業嘗試 | 單一作業嘗試。MR 支援多個作業嘗試,可在部分作業失敗時復原。Spark 則表示「從頭開始」。 |
任務 | 作業的子部分,例如處理一個檔案或檔案的一部分 |
任務 ID | 任務的 ID,在這個作業中是唯一的。通常從 0 開始,並用於檔案名稱 (part-0000、part-001 等)。 |
任務嘗試 (TA) | 執行任務的嘗試。它可能會失敗,在這種情況下,MR/Spark 將安排另一個嘗試。 |
任務嘗試 ID | 任務嘗試的唯一 ID。任務 ID + 嘗試計數器。 |
目的地目錄 | 工作的最終目的地。 |
作業嘗試目錄 | 作業嘗試使用的暫時目錄。這始終在目的地目錄之下,以確保它與 HDFS、其他檔案系統中的儲存磁碟區等位於相同的加密區域中。 |
任務嘗試目錄 | (也稱為「任務嘗試工作目錄」)。每個任務嘗試專屬的目錄,檔案會寫入其中 |
工作提交 | 取得任務嘗試的輸出,並使其成為該「成功」任務的最終/專屬結果。 |
工作提交 | 彙總所有已提交任務的所有輸出,並產生作業的最終結果。 |
提交者的目的是確保作業的完整輸出最終會出現在目的地中,即使任務失敗也是如此。
對於 Hive 的傳統階層式目錄結構表格,作業提交需要將所有已提交任務的輸出放入目錄樹中的正確位置。
內建於 hadoop-mapreduce-client-core
模組的提交者是 FileOutputCommitter
。
Manifest Committer 是一種效能更高的 committer,適用於透過許多工作在深層目錄樹中建立檔案的 ABFS 和 GCS 儲存。
它也會在 hdfs://
和 file://
URL 上執行,但它經過最佳化,以解決雲端儲存中的列出和重新命名效能和限制問題。
它不會與 S3 正確運作,因為它仰賴原子重新命名-不覆寫作業來提交 manifest 檔案。它也會遇到複製而非移動所有產生資料的效能問題。
雖然它會與 MapReduce 一起運作,但沒有處理從先前失敗嘗試中復原的多個工作嘗試。
Manifest 檔案經過設計,包含(以及 IOStatistics 和其他一些項目)
工作嘗試透過下列方式提交
不進行重新命名,檔案會留在其原始位置。
目錄樹狀瀏覽是單執行緒的,然後是 O(directories)
,每個目錄列出使用一個或多個分頁 LIST 呼叫。
這很簡單,而且對大多數工作而言,掃描會脫離工作的關鍵路徑。
統計分析可能會證明未來轉移至平行掃描是合理的。
工作提交包含
_SUCCESS
檔案,其格式與 S3A committer 相同(用於測試;使用寫入和重新命名進行原子儲存)工作提交階段支援許多工作和每個工作許多檔案的平行化,特別是
O(files)
,在使用 OAuth 驗證時在 ABFS 上也是如此。選用掃描所有祖先…如果有任何檔案,請刪除。
getFileStatus()
。找不到:建立目錄、新增項目和所有父路徑的項目找到目錄:新增項目和所有父路徑的項目找到檔案:刪除。然後像以前一樣建立。有效率地處理目錄的並行建立(或刪除+建立)會是一個麻煩點;會投入一些心力在那裡來建立要建立的目錄組。
檔案會並行重新命名。
重新命名之前檢查該路徑上的任何內容(並刪除)是選用的。由於 Spark 會為每個檔案建立新的 UUID,因此不會發生這種情況,而且可以節省 HTTP 要求。
選用掃描所有提交的檔案並驗證長度,如果已知,則驗證 etag。用於測試和診斷。
此解決方案對 GCS 來說是必要的,並且應對 ABFS 有益,因為任務提交者會支付列出開銷。
主要目標是讓清單提交者保持孤立,既不觸及現有的提交者程式碼,也不觸及 Hadoop 程式碼庫的其他部分。
它必須直接插入 MR 和 Spark,而不需要任何變更,除了已經針對 S3A 提交者實作的變更之外
PathOutputCommitterFactory
繫結。因此,會從其他地方複製和貼上一些內容,例如 org.apache.hadoop.util.functional.TaskPool
是根據 S3ACommitter 的 org.apache.hadoop.fs.s3a.commit.Tasks
為基礎。
_SUCCESS
檔案必須與 S3A JSON 檔案相容。這是為了確保任何驗證 S3A 提交者輸出的現有測試套件都可以重新鎖定在由清單提交者執行的作業,而不需要任何變更。
何時?建議:持續執行直到重新命名最終完成。
我們希望停止整個作業提交。在每個任務提交者執行緒的 iteraton 處理目錄(或每個檔案的處理?)時,需要檢查某些原子布林值「中止作業」。列出或重新命名失敗將需要升級為停止整個作業提交。這表示在非同步重新命名作業或在任務提交者執行緒中引發的任何 IOE 都必須
commitJob()
呼叫結束時重新擲出如果作業提交階段使用執行緒池進行每個任務作業,例如載入檔案,則不得將同一個執行緒池用於每個任務階段中的平行作業。
由於每個 JobStage
都在任務或作業提交中依序執行,因此可以在各階段共用同一個執行緒池。
在目前的實作中,作業提交中沒有平行「每個清單」作業,除了實際載入檔案之外。建立目錄和重新命名檔案的作業實際上是在不執行個別清單的平行處理的情況下執行的。
目錄準備:合併所有清單的目錄清單,然後排隊建立(希望非常小的)唯一目錄集。
重新命名:逐一瀏覽所有清單,並將其重新命名排入重新命名池中。
執行緒池的生命週期受限於階段組態,這將限制在每個 PathOutputCommitter
方法中進行設定、提交、中止和清理。
這可避免 S3A 提交者的執行緒池生命週期問題。
這是 terasorting 中的失敗,其中許多工作各產生許多檔案;要提交的完整檔案清單(以及每個區塊的 etag)會在執行前建立在記憶體中並驗證。
明細提交者假設儲存在記憶體中的資料量較少,因為不再需要為每個已提交檔案的每個區塊儲存 etag。
合併所有要建立的目錄清單並消除重複項。
實作架構反映了 S3A 連接器的經驗。
提交者會針對其對檔案系統執行/呼叫的所有作業收集持續時間統計資料。* 在工作提交期間收集的資料會儲存到明細中(不包括儲存和重新命名該檔案的時間)* 當這些明細在工作提交期間載入時,這些統計資料會合併以形成整個工作的總計統計資料。* 會儲存到 _SUCCESS
檔案* 以及在由 mapreduce.manifest.committer.summary.report.directory
指定的目錄中該檔案的任何副本(如果已設定)。要儲存。* 類別 org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter
可以載入並列印這些資料。
不會收集查詢中使用的檔案系統和輸入和輸出串流的 IO 統計資料。
透過 PathOutputCommitter
API 呼叫 ManifestCommitter
時,下列屬性會新增到作用中(執行緒)的內容
金鑰 | 值 |
---|---|
ji |
工作 ID |
tai |
任務嘗試 ID |
st |
階段 |
這些也都在執行階段執行工作的所有輔助執行緒中設定。
任何支援稽核的儲存/FS 都能夠收集此資料並包含在記錄檔中。
為了簡化回溯,所有稽核整合都在單一類別 org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration
中。