類別 org.apache.hadoop.fs.FutureDataInputStreamBuilder

提供 Builder 模式的介面,用於建立 Java Future 參照,以指向 FSDataInputStream 及其子類別。用於初始化(可能非同步)作業,以開啟現有檔案進行讀取。

歷史

Hadoop 3.3.0:導入 API

HADOOP-15229 新增 FileSystem 基於建構函式的 openFile() API 以符合 createFile()

  • 沒有可用的 opt(String key, long value) 方法。
  • withFileStatus(status) 呼叫需要非空參數。
  • 處理選項和檔案狀態的唯一檔案系統為 S3A;
  • 只有 S3A 特定的選項為 S3 選取和 fs.s3a.experimental.input.fadvise
  • 如果傳入檔案狀態,且檔案狀態的路徑與 openFile(path) 呼叫的路徑不符,S3A 檔案系統會引發 IllegalArgumentException

這是基線實作。若要撰寫保證與此版本相容的程式碼,請使用 opt(String, String)must(String, String) 方法,並將數字明確轉換為字串。

fs.open("s3a://bucket/file")
  .opt("fs.option.openfile.length", Long.toString(length))
  .build().get()

Hadoop 3.3.5:標準化和擴充

HADOOP-16202 改善 openFile() 以提升針對物件儲存體的讀取效能

  • 必須接受 (並忽略) withFileStatus(null)
  • 任何提供的 FileStatus 路徑中,只有檔名部分必須與 openFile(path) 中傳入的檔名相符。
  • 新增 opt(String key, long value) 選項。*這現在已過時,因為它會導致回歸
  • 定義標準 fs.option.openfile 選項。
  • S3A FS 使用 openfile 長度選項,暫時使用尋找開始/結束選項。
  • Azure ABFS 連接器會取得提供的 VersionedFileStatus,並省略物件的任何 HEAD 探測。

Hadoop 3.3.6:API 變更以解決運算子超載錯誤。

新的 optLong()optDouble()mustLong()mustDouble() 建構函式方法。

  • 請參閱 HADOOP-18724 開啟檔案時,S3AFileSystem 會傳回 NumberFormatException,這是由超載的 opt(long) 導致。
  • 規格已更新,宣告無法解析的數字必須視為「未設定」,並使用預設值。

不變數

FutureDataInputStreamBuilder 介面在呼叫 build() 和/或在非同步開啟作業本身期間,不需要參數或 FileSystem 的狀態。

檔案系統狀態的某些方面,可以在最初的 openFile() 呼叫中檢查,前提是已知它們是不變數,且不會在 openFile()build().get() 順序之間變更。例如,路徑驗證。

`實作無關參數。

FutureDataInputStreamBuilder bufferSize(int bufSize)

設定要使用的緩衝區大小。

FutureDataInputStreamBuilder withFileStatus(FileStatus status)

一個 FileStatus 執行個體,用於參照正在開啟的檔案。

實作可能會使用此執行個體來短路檔案檢查,因此可以節省遠端呼叫,特別是對物件儲存體的呼叫。

需求

  • status != null
  • status.getPath().getName() == 正在開啟的檔案名稱。

如果儲存體在開啟檔案時使用 FileStatus,則必須進行路徑驗證,否則可以執行驗證。驗證應延後到 build() 操作。

此操作應視為對檔案系統的提示。

如果檔案系統實作延伸其實作中傳回的 FileStatus,則可以在開啟檔案時使用此資訊。

這與傳回版本/etag 資訊的儲存體相關,- 它們可以使用此資訊來保證開啟的檔案與在清單中傳回的檔案完全相同。

提供的狀態的最後一個 status.getPath().getName() 元素必須等於提供給 openFile(path) 呼叫的路徑的名稱值。

檔案系統不得驗證路徑的其餘部分。這是為了支援 viewfs 和其他掛載點包裝檔案系統,其中架構和路徑不同。這些通常會建立自己的 FileStatus 結果

前提條件

status == null or status.getPath().getName() == path.getName()

檔案系統不得要求 status 的類別等於其實作在 filestatus/list 操作中傳回的任何特定子類別。這是為了支援包裝檔案系統和狀態的序列化/反序列化。

設定選用或強制參數

FutureDataInputStreamBuilder opt(String key, String value)
FutureDataInputStreamBuilder opt(String key, int value)
FutureDataInputStreamBuilder opt(String key, boolean value)
FutureDataInputStreamBuilder optLong(String key, long value)
FutureDataInputStreamBuilder optDouble(String key, double value)
FutureDataInputStreamBuilder must(String key, String value)
FutureDataInputStreamBuilder must(String key, int value)
FutureDataInputStreamBuilder must(String key, boolean value)
FutureDataInputStreamBuilder mustLong(String key, long value)
FutureDataInputStreamBuilder mustDouble(String key, double value)

設定選用或強制參數給建構函式。使用 opt()must(),客戶端可以指定特定於檔案系統的參數,而無需檢查 FileSystem 的具體類型。

範例

out = fs.openFile(path)
    .must("fs.option.openfile.read.policy", "random")
    .optLong("fs.http.connection.timeout", 30_000L)
    .withFileStatus(statusFromListing)
    .build()
    .get();

這裡已指定 random 的讀取政策,要求檔案系統實作必須了解此選項。已提供一個特定於 http 的選項,任何儲存體都可以詮釋此選項;如果開啟檔案的檔案系統無法辨識此選項,則可以安全地忽略它。

何時使用 optmust

optmust 之間的差異在於開啟檔案的 FileSystem 必須如何回應它不認識的選項。

def must(name, value):
  if not name in known_keys:
    raise IllegalArgumentException
  if not name in supported_keys:
    raise UnsupportedException


def opt(name, value):
  if not name in known_keys:
     # ignore option

對於任何已知的金鑰,value 參數的驗證都必須相同,無論如何宣告 (key, value) 配對。

  1. 對於特定於檔案系統的選項,實作如何驗證輸入是實作的選擇。
  2. 對於標準選項,有效 value 的規格定義在此檔案系統規格中,透過合約測試驗證。

實作備註

必須在 build() 操作中檢查支援的選項。

  1. 如果透過 must(key, value)) 宣告的強制參數無法辨識,則必須擲出 IllegalArgumentException

  2. 如果透過 must(key, value) 宣告的強制參數依賴於特定 FileSystem/FileContext 執行個體中辨識但未支援的功能,則必須擲出 UnsupportedException

數值解析應修剪任何字串,如果無法將值解析為數字,則降級為提供的任何預設值。這是為了解決 HADOOP-18724 Open file fails with NumberFormatException for S3AFileSystem,其原因是當傳入長整數值時,超載的 opt() 建構函式參數繫結至 opt(String, double) 而不是 opt(String, long)

解決建構函式方法 (例如 bufferSize()) 和 opt()/must() 設定的參數之間衝突的行為如下

最後指定的選項定義值及其選擇性/強制性狀態。

如果在 withFileStatus() 中傳入的 FileStatus 選項已使用,則實作必須接受 FileStatus 的所有子類別,包括 LocatedFileStatus,而不仅仅是實作所實作的任何特定於 FS 的子類別 (例如 S3AFileStatus)。它們可以簡單地忽略那些不是自訂子類別的子類別。

這對於確保安全使用功能至關重要:目錄清單/狀態序列化/反序列化可能會導致 withFileStatus() 參數不是檔案系統實例自己的 getFileStatus()listFiles()listLocatedStatus() 呼叫等傳回的客製化子類別。

在這種情況下,實作必須

  1. 驗證 status.getPath().getName() 是否與目前的 path.getName() 值相符。路徑的其餘部分不得驗證。
  2. 依需要使用任何狀態欄位 - 例如檔案長度。

即使未使用的狀態值,參數的存在也可以解釋為呼叫者宣告他們相信檔案存在且具有給定的大小。

建構函式介面

CompletableFuture<FSDataInputStream> build()

傳回一個 CompletableFuture<FSDataInputStream>,在成功完成時,傳回一個可以從檔案系統讀取資料的輸入串流。

build() 作業可以執行檔案存在驗證、其類型,因此拒絕嘗試從目錄或不存在的檔案中讀取。或者,檔案存在/狀態檢查可以在傳回的 CompletableFuture<> 中非同步執行。檔案存在/狀態檢查可以延後到在任何讀取(例如 read()PositionedRead)中讀取第一個位元組為止。

也就是說,前提條件 exists(FS, path)isFile(FS, path) 僅保證在傳回的未來上呼叫 get() 並且已嘗試讀取串流後才會滿足。

因此,即使在檔案不存在或為目錄而非檔案時,下列呼叫都必須成功,傳回要評估的 CompletableFuture

Path p = new Path("file://tmp/file-which-does-not-exist");

CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
      .openFile(p)
      .build();

無法存取/讀取檔案必須在未來的 get() 呼叫中引發 IOException 或子類別,或者,對於延遲繫結作業,則在呼叫讀取資料的作業時引發。

因此,在先前範例傳回的 future 上呼叫時,下列順序將會失敗。

  future.get().read();

存取權限檢查具有相同的可見度需求:權限失敗必須延遲到 get() 呼叫,並且可以延遲到後續作業。

注意:輸入串流上的一些作業,例如 seek(),可能根本不會嘗試任何 IO。這些作業在與不存在/無法讀取的檔案互動時,可能不會引發例外狀況。

Hadoop 3.3.3 以後標準的 openFile() 選項

這些是 FileSystemFileContext 實作必須辨識並可能透過適當地變更其輸入串流行為來支援的選項。

Hadoop 3.3.0 新增了 openFile() API;這些標準選項是在後續版本中定義的。因此,雖然它們是「廣為人知」,但除非確定應用程式只會針對知道這些選項的 Hadoop 版本執行,否則應用程式應透過 opt() 呼叫而不是 must() 來設定選項。

透過 openFile() 建構器 API 開啟檔案時,呼叫者可以使用 .opt(key, value).must(key, value) 呼叫來設定標準和特定於檔案系統的選項。

如果設定為 opt() 參數,則必須忽略不支援的「標準」選項,就像必須忽略無法辨識的標準選項一樣。

如果設定為 must() 參數,則必須忽略不支援的「標準」選項。必須拒絕無法辨識的標準選項。

標準的 openFile() 選項定義在 org.apache.hadoop.fs.OpenFileOptions 中;它們都應以 fs.option.openfile. 開頭。

請注意,雖然所有 FileSystem/FileContext 執行個體都應支援這些選項,以 must() 宣告不會失敗的程度,但實作可能會支援它們,以詮釋值的程度。這表示儲存體實際上不需要讀取讀取原則或檔案長度值,並在開啟檔案時使用它們。

除非另有說明,否則應將它們視為提示。

注意:如果新增一個標準選項,如果設定但未支援會產生錯誤,則實作應拒絕它。例如,S3A 檔案系統用戶端支援將 SQL 指令下推的能力。如果曾經標準化類似功能,則檔案系統不支援該功能時,必須拒絕在 opt()must() 引數中使用該選項。

選項:fs.option.openfile.buffer.size

以位元組為單位的讀取緩衝區大小。

這會以選項 io.file.buffer.size 覆寫組態中設定的預設值。

它受到所有檔案系統用戶端支援,這些用戶端允許透過 FileSystem.open(path, buffersize) 設定特定於串流的緩衝區大小。

選項:fs.option.openfile.read.policy

宣告輸入串流的讀取原則。這是輸入串流預期讀取模式的提示。這可能會控制預先讀取、緩衝和其他最佳化。

順序讀取可能會透過預先擷取資料和/或以較大區塊讀取資料來最佳化。有些應用程式(例如 distCp)甚至會在欄狀資料上執行順序 I/O。

相反地,隨機 I/O 會使用一系列的 seek()/read() 或透過 PositionedReadableByteBufferPositionedReadable API 在檔案的不同部分讀取資料。

如果幾乎沒有或沒有預先擷取,隨機 IO 效能可能會最佳化,以及其他可能的最佳化

針對 Apache ORC 和 Apache Parquet 等欄位格式的查詢執行此類隨機 IO;其他資料格式可能最適合使用順序或全檔案政策來讀取。

關鍵在於,針對順序讀取最佳化讀取可能會損害隨機效能,反之亦然。

  1. 搜尋政策是一個提示;即使宣告為 must() 選項,檔案系統仍可能會忽略它。
  2. 政策的詮釋/實作是檔案系統特定的行為,而且可能會隨著 Hadoop 版本和/或特定儲存子系統而變更。
  3. 如果未辨識政策,檔案系統用戶端必須忽略它。
政策 意義
adaptive 儲存體實作的任何適應性政策。
default 此儲存體的預設政策。通常為「adaptive」。
random 針對隨機存取最佳化。
sequential 針對順序存取最佳化。
vector 預計使用 Vectored IO API。
whole-file 將讀取整個檔案。

為輸入來源選擇錯誤的讀取政策可能會沒有效率。

可以提供讀取政策清單;檔案系統辨識/支援的第一個政策應為使用的政策。這允許支援自訂政策,例如針對 HBase HFiles 最佳化的 hbase-hfile 政策。

S3A 和 ABFS 輸入串流都實作 IOStatisticsSource API,而且可以查詢其 IO 效能。

提示:DEBUG 記錄輸入串流的 toString() 值。S3A 和 ABFS 輸入串流會記錄讀取統計資料,這可以提供有關讀取是否有效率的見解。

進一步閱讀

讀取政策 adaptive

嘗試將搜尋政策調整至應用程式的讀取模式。

S3A 用戶端的 normal 政策和 wasb: 用戶端支援的唯一政策都是適應性的,它們假設順序 IO,但一旦進行向後搜尋/定位讀取呼叫,串流就會切換至隨機 IO。

其他檔案系統實作可能會採用類似的策略,和/或擴充演算法以偵測前向尋址和/或在被認為較有效率時從隨機切換到循序 IO。

適應性讀取政策是不具備在 open() API 中宣告尋址政策的能力,因此需要在群集/應用程式組態中宣告(如果可組態的話)。不過,從循序尋址政策切換到隨機尋址政策可能會很昂貴。

如果應用程式明確設定 fs.option.openfile.read.policy 選項,且它們知道自己的讀取計畫,則它們應該宣告哪個政策最合適。

讀取政策 ``

檔案系統實例的預設政策。實作/安裝特定。

讀取政策 sequential

預期從第一個讀取的位元組到檔案結束/串流關閉時循序讀取。

讀取政策 random

預期 seek()/read() 序列,或使用 PositionedReadableByteBufferPositionedReadable API。

讀取政策 vector

這宣告呼叫者打算使用 HADOOP-11867 新增高效能向量化讀取 API 的向量化讀取 API。

這是提示:使用 API 時並非必要。它會告知實作,如果已實作此功能,串流應組態為最佳向量化 IO 效能。

不是獨佔的:同一個串流仍可使用傳統 InputStreamPositionedRead API 呼叫。實作應該使用 random 讀取政策搭配這些操作。

讀取政策 whole-file

這宣告整個檔案將從頭到尾讀取;檔案系統用戶端可以自由啟用任何策略來最大化效能。特別是,較大範圍的讀取/GET 可以透過減少 socket/TLS 設定成本並提供足夠長久的連線,讓 TCP 流量控制決定最佳下載速率,來提供高頻寬。

策略可以包括

  • openFile() 操作中啟動整個檔案的 HTTP GET。
  • 預先擷取資料成大區塊,可能使用平行讀取操作。

知道整個檔案將從已開啟串流中讀取的應用程式應宣告此讀取政策。

選項:fs.option.openfile.length

宣告檔案長度。

這可用於客戶端在開啟檔案時略過查詢遠端儲存體以取得檔案大小/是否存在,類似於透過 withFileStatus() 選項宣告檔案狀態。

如果由檔案系統連接器支援,此選項必須解釋為宣告檔案的最小長度

  1. 如果值為負,選項應視為未設定。
  2. 如果檔案的實際長度大於此值,則不應為錯誤。
  3. read()seek() 和定位讀取呼叫可能會使用跨越/超出此長度但低於檔案實際長度的定位。實作可能會在這種情況下引發 EOFExceptions,或可能會傳回資料。

如果此選項由檔案系統實作使用

實作人員備註

  • fs.option.openfile.length < 0 的值必須忽略。
  • 如果檔案狀態與 fs.opt.openfile.length 中的值一起提供;檔案狀態值優先。

選項:fs.option.openfile.split.startfs.option.openfile.split.end

在檔案已分割成多個部分處理時,宣告分割的開始和結束。

  1. 如果值為負,選項應視為未設定。
  2. 檔案系統可能會假設檔案長度大於或等於 fs.option.openfile.split.end 的值。
  3. 而且,如果客戶端應用程式讀取超過 fs.option.openfile.split.end 中設定的值,它們可能會引發例外狀況。
  4. 這對選項可用於最佳化讀取計畫,例如設定 GET 要求的內容範圍,或使用分割結束作為檔案保證最小長度的隱含宣告。
  5. 如果兩個選項都已設定,且分割開始宣告為大於分割結束,則分割開始應僅重設為零,而不是拒絕作業。

分割結束值可以提供輸入串流結束的提示。分割開始可用於最佳化檔案系統客戶端的任何初始讀取偏移量。

*實作人員注意事項:當應用程式需要讀取在分割結束前開始的記錄/列的結尾時,它們會讀取分割的結尾。

因此,如果檔案實際上比該值長,則必須允許客戶端在 fs.option.openfile.split.end 中設定的長度之後 seek()/read()

S3A 特定選項

S3A 連接器支援自訂的預讀和搜尋政策選項。

名稱 類型 意義
fs.s3a.readahead.range 長整數 預讀範圍(以位元組為單位)
fs.s3a.experimental.input.fadvise 字串 搜尋政策。已由 fs.option.openfile.read.policy 取代
fs.s3a.input.async.drain.threshold 長整數 切換到非同步串流排水的閾值。(自 3.3.5 起)

如果選項設定包含 fs.s3a.select.sql 陳述式中的 SQL 陳述式,則檔案會以 S3 Select 查詢開啟。請參閱 S3A 文件以取得更多詳細資訊。

ABFS 特定選項

ABFS 連接器支援自訂的輸入串流選項。

名稱 類型 意義
fs.azure.buffered.pread.disable 布林值 停用定位讀取作業的快取。

停用透過 PositionedReadable API 讀取資料的快取。

請參閱 ABFS 文件以取得更多詳細資訊。

範例

開啟檔案時宣告搜尋政策和分割限制。

以下是概念驗證 org.apache.parquet.hadoop.util.HadoopInputFile 讀取器的範例,它使用(可為空值)檔案狀態和分割開始/結束。

FileStatus 值總是傳入,但如果它為空值,則分割結束用於宣告檔案長度。

protected SeekableInputStream newStream(Path path, FileStatus stat,
     long splitStart, long splitEnd)
     throws IOException {

   FutureDataInputStreamBuilder builder = fs.openFile(path)
   .opt("fs.option.openfile.read.policy", "vector, random")
   .withFileStatus(stat);

   builder.optLong("fs.option.openfile.split.start", splitStart);
   builder.optLong("fs.option.openfile.split.end", splitEnd);
   CompletableFuture<FSDataInputStream> streamF = builder.build();
   return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
}

因此,無論是直接由檔案清單驅動,還是從 (path, splitStart, splitEnd) 的查詢計畫開啟檔案,都不需要探查遠端儲存空間以取得檔案長度。在使用遠端物件儲存空間時,即使此類探查是異步進行,也可以節省數十到數百毫秒。

如果檔案長度和分割結束都設定,則檔案長度必須被視為「更」具權威性,也就是它真的應該定義檔案長度。如果設定分割結束,則呼叫者可能無法讀取超過它。

如果正在處理壓縮記錄,則 CompressedSplitLineReader 可以讀取分割的結尾。也就是說:它假設未完成的記錄讀取表示檔案長度大於分割長度,且它必須讀取部分讀取記錄的全部內容。其他讀取器可能會表現類似。

因此

  1. FileStatusfs.option.openfile.length 中提供的檔案長度應設定為檔案長度的嚴格上限
  2. 設定在 fs.option.openfile.split.end 中的分割結尾必須視為提示,而非檔案的嚴格結尾。

開啟具有標準和非標準選項的檔案

標準和非標準選項可以在同一個 openFile() 操作中結合使用。

Future<FSDataInputStream> f = openFile(path)
  .must("fs.option.openfile.read.policy", "random, adaptive")
  .opt("fs.s3a.readahead.range", 1024 * 1024)
  .build();

FSDataInputStream is = f.get();

設定在 must() 中的選項必須被理解,或至少被所有檔案系統識別和忽略。在此範例中,S3A 特定的選項可能會被所有其他檔案系統用戶端忽略。

使用較舊版本開啟檔案

並非所有 hadoop 版本都識別 fs.option.openfile.read.policy 選項。

如果選項是透過 opt() 建構函數參數新增,則可以在應用程式程式碼中安全地使用該選項,因為它會被視為未知的選用金鑰,然後可以捨棄。

Future<FSDataInputStream> f = openFile(path)
  .opt("fs.option.openfile.read.policy", "vector, random, adaptive")
  .build();

FSDataInputStream is = f.get();

注意 1 如果選項名稱是由對 org.apache.hadoop.fs.Options.OpenFileOptions 中常數的參考設定,則程式不會連結到沒有特定選項的 Hadoop 版本。因此,為了彈性連結到較舊版本,請使用該值的副本。

注意 2 由於選項驗證是在檔案系統連接器中執行,因此設計為與多個 hadoop 版本搭配使用的第三方連接器可能不支援該選項。

傳遞選項到 MapReduce

Hadoop MapReduce 會自動讀取具有前綴 mapreduce.job.input.file.option.mapreduce.job.input.file.must. 的 MR 工作選項,並在移除特定於 mapreduce 的前綴後,將這些值分別套用為 .opt()must()

這使得傳遞選項到 MR 工作變得簡單。例如,宣告工作應使用隨機 IO 讀取其資料

JobConf jobConf = (JobConf) job.getConfiguration()
jobConf.set(
    "mapreduce.job.input.file.option.fs.option.openfile.read.policy",
    "random");

MapReduce 輸入格式傳播選項

記錄讀取器傳遞選項到它開啟的檔案的範例。

  public void initialize(InputSplit genericSplit,
                     TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit)genericSplit;
    Configuration job = context.getConfiguration();
    start = split.getStart();
    end = start + split.getLength();
    Path file = split.getPath();

    // open the file and seek to the start of the split
    FutureDataInputStreamBuilder builder =
      file.getFileSystem(job).openFile(file);
    // the start and end of the split may be used to build
    // an input strategy.
    builder.optLong("fs.option.openfile.split.start", start);
    builder.optLong("fs.option.openfile.split.end", end);
    FutureIO.propagateOptions(builder, job,
        "mapreduce.job.input.file.option",
        "mapreduce.job.input.file.must");

    fileIn = FutureIO.awaitFuture(builder.build());
    fileIn.seek(start)
    /* Rest of the operation on the opened stream */
  }

FileContext.openFile

org.apache.hadoop.fs.AvroFSInput;檔案會以順序輸入的方式開啟。因為檔案長度已經被探測過,所以長度會被傳遞下來

  public AvroFSInput(FileContext fc, Path p) throws IOException {
    FileStatus status = fc.getFileStatus(p);
    this.len = status.getLen();
    this.stream = awaitFuture(fc.openFile(p)
        .opt("fs.option.openfile.read.policy",
            "sequential")
        .optLong("fs.option.openfile.length",
            Long.toString(status.getLen()))
        .build());
    fc.open(p);
  }

在這個範例中,長度會透過字串傳遞下來(經由 Long.toString()),而不是直接以長整數傳遞。這是為了確保輸入格式會連結到沒有 opt(String, long)must(String, long) 建構參數的 $Hadoop 版本。同樣地,這些值會以選用的方式傳遞,這樣一來,即使應用程式無法辨識,它仍然可以成功執行。

範例:讀取整個檔案

這來自 org.apache.hadoop.util.JsonSerialization

它的 load(FileSystem, Path, FileStatus) 方法 * 宣告整個檔案要從頭到尾讀取。* 傳遞檔案狀態

public T load(FileSystem fs,
        Path path,
        status)
        throws IOException {

 try (FSDataInputStream dataInputStream =
          awaitFuture(fs.openFile(path)
              .opt("fs.option.openfile.read.policy", "whole-file")
              .withFileStatus(status)
              .build())) {
   return fromJsonStream(dataInputStream);
 } catch (JsonProcessingException e) {
   throw new PathIOException(path.toString(),
       "Failed to read JSON file " + e, e);
 }
}