類別 org.apache.hadoop.fs.FSDataInputStream

類別 FSDataInputStream extends DataInputStream

FSDataInputStream 的核心行為由 java.io.DataInputStream 定義,其中延伸會新增系統的主要假設。

  1. 來源是本機或遠端檔案系統。
  2. 正在讀取的串流會參照有限的位元組陣列。
  3. 資料長度在讀取過程中不會變更。
  4. 資料內容在過程中不會變更。
  5. 原始檔案會在讀取過程中保持存在。
  6. 呼叫者可以使用 Seekable.seek() 來偏移位元組陣列中的位移,而後續的讀取會從此位移開始。
  7. 向前和向後尋查的成本很低。
  8. 串流實作不需要執行緒安全。
  9. 但是,如果串流實作 PositionedReadable,則「定位讀取」必須執行緒安全。

檔案透過 FileSystem.open(p) 開啟,如果成功,則會傳回

result = FSDataInputStream(0, FS.Files[p])

串流可以建模為

FSDIS = (pos, data[], isOpen)

具有存取功能

pos(FSDIS)
data(FSDIS)
isOpen(FSDIS)

隱含不變式:資料串流的大小等於 FileSystem.getFileStatus(Path p) 傳回的檔案大小

forall p in dom(FS.Files[p]) :
len(data(FSDIS)) == FS.getFileStatus(p).length

Closeable.close()

java.io.Closeable 的語意定義在 JRE 內的介面定義中。

操作必須是冪等的;下列順序不是錯誤

FSDIS.close();
FSDIS.close();

實作注意事項

  • 實作應具有容錯能力。如果內部串流已關閉,應先檢查它是否為 null

  • 實作不應在此操作期間引發 IOException 例外(或任何其他例外)。用戶端應用程式通常會忽略這些例外,或可能會意外失敗。

後置條件

FSDIS' = ((undefined), (undefined), False)

Seekable.getPos()

傳回目前位置。串流關閉時的結果未定義。

前置條件

isOpen(FSDIS)

後置條件

result = pos(FSDIS)

InputStream.read()

傳回目前位置的資料。

  1. 串流關閉時,實作應失敗。
  2. read() 完成所需時間沒有限制。

前置條件

isOpen(FSDIS)

後置條件

if ( pos < len(data) ):
   FSDIS' = (pos + 1, data, True)
   result = data[pos]
else
    result = -1

InputStream.read(buffer[], offset, length)

length 位元組的資料讀取到目標緩衝區中,從位移 offset 開始。資料來源是串流的目前位置,如 pos 中隱含設定。

前置條件

isOpen(FSDIS)
buffer != null else raise NullPointerException
length >= 0
offset < len(buffer)
length <= len(buffer) - offset
pos >= 0 else raise EOFException, IOException

前置條件失敗時可能會引發的例外為

InvalidArgumentException
ArrayIndexOutOfBoundsException
RuntimeException

並非所有檔案系統都會檢查 isOpen 狀態。

後置條件

if length == 0 :
  result = 0

else if pos > len(data):
  result = -1

else
  let l = min(length, len(data)-length) :
    buffer' = buffer where forall i in [0..l-1]:
       buffer'[o+i] = data[pos+i]
    FSDIS' = (pos+l, data, true)
    result = l

java.io API 指出,如果要讀取的資料量(即 length),則呼叫必須封鎖,直到可用的資料量大於零,也就是說,直到有一些資料為止。呼叫不必在緩衝區已滿時傳回,或確實封鎖直到串流中沒有資料為止。

換句話說,l 並非簡單地定義為 min(length, len(data)-length),它嚴格來說是範圍 1..min(length, len(data)-length) 中的整數。雖然呼叫者可能預期緩衝區會盡可能填滿,但實作回傳較小的數字(可能只有一個位元組)也在規格範圍內。

關鍵在於,除非目的地緩衝區大小為 0,否則呼叫必須封鎖,直到至少回傳一個位元組。因此,對於長度大於 0 的任何資料來源,重複呼叫此 read() 作業最終會讀取所有資料。

Seekable.seek(s)

前置條件

並非所有子類別都實作 Seek 作業

supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]

如果支援此作業,則檔案應開啟

isOpen(FSDIS)

有些檔案系統不會執行此檢查,而是依賴 read() 合約來拒絕在封閉串流上讀取(例如 RawLocalFileSystem)。

seek(0) 一定會成功,因為尋址位置必須為正數且小於串流長度

s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]

如果未符合此條件,有些檔案系統不會引發例外。相反地,它們會在任何 read() 作業中回傳 -1,其中在讀取時,len(data(FSDIS)) < pos(FSDIS)

在尋址失敗後,pos(FSDIS) 的值可能會變更。例如,尋址超過 EOF 可能會將讀取位置移至檔案結尾,並引發 EOFException

後置條件

FSDIS' = (s, data, True)

有一個隱含的不變式:尋址到目前位置不會執行任何作業

seek(getPos())

實作可能會辨識此作業並略過所有其他前置條件檢查,讓輸入串流保持不變。

最近連接到物件儲存體的連接器都實作某種形式的「延遲尋址」:seek() 呼叫可能會更新串流,而且 getPos() 的值會更新,但檔案不會在實際讀取資料之前開啟/重新開啟。延遲尋址的實作仍必須根據檔案的已知長度驗證新的尋址位置。但是,檔案的狀態(例如是否存在、目前的長度為何)不需要在此時更新。檔案已刪除或被截斷的事實可能要等到 read() 呼叫才會浮現。

Seekable.seekToNewSource(offset)

此作業指示來源從目前來源的另一個來源擷取 data[]。這只有在檔案系統支援檔案的多個複本,而且在偏移量 offset 處有多於 1 個資料複本時才相關。

前置條件

並非所有子類別都實作此作業,而是會引發例外或傳回 False

supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]

範例:CompressionInputStreamHttpFSFileSystem

如果支援,檔案必須開啟

isOpen(FSDIS)

後置條件

大多數未實作此作業的子類別只會失敗。

if not supported(FSDIS, Seekable.seekToNewSource(s)):
    result = False

範例:RawLocalFileSystemHttpFSFileSystem

如果支援此作業,而且資料有新的位置

    FSDIS' = (pos, data', true)
    result = True

新資料是原始資料(或其更新版本,如下方一致性區段所述),但包含資料的區塊在 offset 處來自不同的複本。

如果沒有其他複本,FSDIS 就不會更新;回應會指出這一點

    result = False

在測試方法之外,此方法的主要用途在於 {{FSInputChecker}} 類別,它可以透過嘗試從其他地方取得資料來回應讀取中的檢查碼錯誤。如果可以找到新的來源,它會嘗試重新讀取並重新檢查檔案的那個部分。

CanUnbuffer.unbuffer()

此作業指示來源釋放目前握有的任何系統資源,例如緩衝區、socket、檔案描述子等。任何後續的 IO 作業可能必須重新取得這些資源。在串流需要保持開啟,但預期在近期內不會從串流進行任何 IO 作業的情況下,解除緩衝很有用(範例包括檔案控制代碼快取)。

前置條件

並非所有子類別都實作此作業。除了實作 CanUnbuffer 之外,子類別必須實作 StreamCapabilities 介面,而且 StreamCapabilities.hasCapability(UNBUFFER) 必須傳回 true。如果子類別實作 CanUnbuffer,但未透過 StreamCapabilities 報告功能,則呼叫 unbuffer 沒有作用。如果子類別報告它確實實作 UNBUFFER,但未實作 CanUnbuffer 介面,則會擲回 UnsupportedOperationException

supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)

此方法不是執行緒安全的。如果在 read 進行中呼叫 unbuffer,結果是未定義的。

unbuffer 可以呼叫已關閉的檔案,在這種情況下,unbuffer 什麼都不會做。

後置條件

大多數未實作此作業的子類別只會什麼都不做。

如果支援此作業,unbuffer 會釋放與串流相關的所有系統資源。這些資源的確切清單通常取決於實作,不過一般來說,它可能包括緩衝區、socket、檔案描述子等。

介面 PositionedReadable

PositionedReadable 作業提供「定位讀取」(「pread」)。它們提供從資料串流中特定位置讀取資料到緩衝區的功能。定位讀取等於在特定偏移量處執行 Seekable.seek,然後執行 InputStream.read(buffer[], offset, length),只不過是單一方法呼叫,而不是先執行 seek 再執行 read,而且兩個定位讀取可以選擇性地透過 FSDataInputStream 串流的單一執行個體同時執行。

介面宣告定位讀取為執行緒安全(某些實作並未遵循此保證)。

任何與串流作業同時執行的定位讀取(例如 Seekable.seekSeekable.getPos()InputStream.read())都必須獨立執行;不能相互干擾。

同時執行的定位讀取和串流作業必須是可序列化;其中一個可能會封鎖另一個,讓它們依序執行,但為了提升吞吐量和「活性」,它們應該同時執行。

假設兩個並行的定位讀取,一個在 pos1 處讀取 len1 到緩衝區 dest1,另一個在 pos2 處讀取 len2 到緩衝區 dest2,而且在尋至 pos3 之後執行同時的串流讀取,則結果緩衝區必須填入如下資料,即使讀取在底層串流中重疊。

// Positioned read #1
read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] =
  [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1]

// Positioned read #2
read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] =
  [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1]

// Stream read
seek(pos3);
read(dest3, ... len3) -> dest3[0..len3 - 1] =
  [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]

請注意,實作不需要是原子性的;作業的中間狀態(getPos() 值的變更)可能會被看見。

實作前置條件

並非所有 FSDataInputStream 實作都支援這些作業。未實作 Seekable.seek() 的實作不會實作 PositionedReadable 介面。

supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]

這可以視為理所當然:如果串流不是 Seekable,則用戶端無法尋至某個位置。這也是使用 Seekable.seek() 的基底類別實作的副作用。

隱含不變式:對於所有 PositionedReadable 作業,pos 的值在作業結束時不變

pos(FSDIS') == pos(FSDIS)

失敗狀態

對於任何失敗的作業,目的地 buffer 的內容都是未定義的。實作可能會在回報失敗之前覆寫緩衝區的部分或全部。在這種情況下,可能會省略檢查串流是否位於檔案結尾。

int PositionedReadable.read(position, buffer, offset, length)

盡可能讀取資料到為其配置的緩衝區空間。

前置條件

position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
length >= 0
offset >= 0

後置條件

讀取的資料量為長度或從指定位置可取得的資料量中較小的值

let available = min(length, len(data)-position)
buffer'[offset..(offset+available-1)] = data[position..position+available -1]
result = available
  1. -1 的回傳值表示串流沒有更多可用的資料。
  2. 呼叫 length==0 隱含地不會讀取任何資料;實作可能會簡化作業並省略任何 IO。在這種情況下,可能會省略檢查串流是否位於檔案結尾。
  3. 如果在讀取作業期間發生 IO 例外,buffer 的最終狀態將是未定義的。

void PositionedReadable.readFully(position, buffer, offset, length)

讀取剛好 length 位元的資料到緩衝區中,如果沒有足夠的資料可用,則會失敗。

前置條件

position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
length >= 0
offset >= 0
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
(position + length) <= len(data) else raise [EOFException, IOException]

如果在讀取作業期間發生 IO 例外,buffer 的最終狀態將是未定義的。

如果輸入串流中沒有足夠的資料來滿足請求,則 buffer 的最後狀態是不確定的。

後置條件

從偏移量 offset 開始的緩衝區會填入從 position 開始的資料

buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]

PositionedReadable.readFully(position, buffer)

其語意完全等同於

readFully(position, buffer, 0, len(buffer))

也就是說,緩衝區會完全填入從位置 position 開始的輸入來源內容

default void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)

非同步地讀取一串範圍的完整資料。預設實作會遍歷範圍,嘗試根據 minSeekForVectorReadsmaxReadSizeForVectorReads 的值合併範圍,然後同步讀取每個合併的範圍,但目的是子類別可以實作有效率的實作。支援在直接和堆積緩衝區中讀取。此外,建議客戶端使用 WeakReferencedElasticByteBufferPool 來配置緩衝區,這樣一來,即使是直接緩衝區,只要不再被參照,就會被垃圾回收。

readVectored() 之後,getPos() 傳回的位置是不確定的。

如果在 readVectored() 作業進行時檔案有變更,輸出是不確定的。有些範圍可能有舊資料,有些可能有新資料,有些可能兩者都有。

readVectored() 作業進行時,正常的讀取 API 呼叫可能會被封鎖。

注意:不要使用直接緩衝區從 ChecksumFileSystem 讀取,因為這可能會導致 HADOOP-18296 中說明的記憶體分段。

前置條件

對於每個要求的範圍

range.getOffset >= 0 else raise IllegalArgumentException
range.getLength >= 0 else raise EOFException

後置條件

對於每個要求的範圍

range.getData() returns CompletableFuture<ByteBuffer> which will have data
from range.getOffset to range.getLength.

minSeekForVectorReads()

最小的合理搜尋。如果第一個範圍的結束與下一個範圍的開始之間的差異大於這個值,則不會將兩個範圍合併在一起。

maxReadSizeForVectorReads()

合併範圍後,一次可以讀取的最大位元組數。如果要讀取的合併資料大於這個值,則不會將兩個範圍合併在一起。基本上,將這個值設為 0 會停用範圍合併。

一致性

  • 預期從 FileSystem.open(p) 提供的資料串流 FSDIS 的所有讀取器(本機和遠端)都能在開啟時存取 FS.Files[p] 的資料。
  • 如果在讀取過程中變更基礎資料,這些變更可能會或可能不會顯示。
  • 顯示的變更可能會部分顯示。

在時間 t0

FSDIS0 = FS'read(p) = (0, data0[])

在時間 t1

FS' = FS' where FS'.Files[p] = data1

從時間 t >= t1 開始,FSDIS0 的值未定義。

它可能保持不變

FSDIS0.data == data0

forall l in len(FSDIS0.data):
  FSDIS0.read() == data0[l]

它可能會擷取新資料

FSDIS0.data == data1

forall l in len(FSDIS0.data):
  FSDIS0.read() == data1[l]

它可能會不一致,例如,讀取偏移量會傳回來自任何一個資料集的資料

forall l in len(FSDIS0.data):
  (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l]))

也就是說,讀取的每個值都可能是來自原始檔案或已更新的檔案。

它也可能在重複讀取同一個偏移量時不一致,也就是在時間 t2 > t1

r2 = FSDIS0.read(l)

而在時間 t3 > t2

r3 = FSDIS0.read(l)

r3 != r2。(也就是說,某些資料可能會快取或複製,而在後續讀取時,會傳回檔案內容的不同版本)。

類似地,如果路徑 p 中的資料被刪除,這個變更在對 FSDIS0 執行讀取作業時可能會或可能不會被看見。