org.apache.hadoop.fs.FSDataInputStream
FSDataInputStream extends DataInputStream
FSDataInputStream
的核心行為由 java.io.DataInputStream
定義,其中延伸會新增系統的主要假設。
Seekable.seek()
來偏移位元組陣列中的位移,而後續的讀取會從此位移開始。檔案透過 FileSystem.open(p)
開啟,如果成功,則會傳回
result = FSDataInputStream(0, FS.Files[p])
串流可以建模為
FSDIS = (pos, data[], isOpen)
具有存取功能
pos(FSDIS) data(FSDIS) isOpen(FSDIS)
隱含不變式:資料串流的大小等於 FileSystem.getFileStatus(Path p)
傳回的檔案大小
forall p in dom(FS.Files[p]) : len(data(FSDIS)) == FS.getFileStatus(p).length
Closeable.close()
java.io.Closeable
的語意定義在 JRE 內的介面定義中。
操作必須是冪等的;下列順序不是錯誤
FSDIS.close(); FSDIS.close();
實作應具有容錯能力。如果內部串流已關閉,應先檢查它是否為 null
。
實作不應在此操作期間引發 IOException
例外(或任何其他例外)。用戶端應用程式通常會忽略這些例外,或可能會意外失敗。
FSDIS' = ((undefined), (undefined), False)
Seekable.getPos()
傳回目前位置。串流關閉時的結果未定義。
isOpen(FSDIS)
result = pos(FSDIS)
InputStream.read()
傳回目前位置的資料。
read()
完成所需時間沒有限制。isOpen(FSDIS)
if ( pos < len(data) ): FSDIS' = (pos + 1, data, True) result = data[pos] else result = -1
InputStream.read(buffer[], offset, length)
將 length
位元組的資料讀取到目標緩衝區中,從位移 offset
開始。資料來源是串流的目前位置,如 pos
中隱含設定。
isOpen(FSDIS) buffer != null else raise NullPointerException length >= 0 offset < len(buffer) length <= len(buffer) - offset pos >= 0 else raise EOFException, IOException
前置條件失敗時可能會引發的例外為
InvalidArgumentException ArrayIndexOutOfBoundsException RuntimeException
並非所有檔案系統都會檢查 isOpen
狀態。
if length == 0 : result = 0 else if pos > len(data): result = -1 else let l = min(length, len(data)-length) : buffer' = buffer where forall i in [0..l-1]: buffer'[o+i] = data[pos+i] FSDIS' = (pos+l, data, true) result = l
java.io
API 指出,如果要讀取的資料量(即 length
),則呼叫必須封鎖,直到可用的資料量大於零,也就是說,直到有一些資料為止。呼叫不必在緩衝區已滿時傳回,或確實封鎖直到串流中沒有資料為止。
換句話說,l
並非簡單地定義為 min(length, len(data)-length)
,它嚴格來說是範圍 1..min(length, len(data)-length)
中的整數。雖然呼叫者可能預期緩衝區會盡可能填滿,但實作回傳較小的數字(可能只有一個位元組)也在規格範圍內。
關鍵在於,除非目的地緩衝區大小為 0,否則呼叫必須封鎖,直到至少回傳一個位元組。因此,對於長度大於 0 的任何資料來源,重複呼叫此 read()
作業最終會讀取所有資料。
Seekable.seek(s)
並非所有子類別都實作 Seek 作業
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
如果支援此作業,則檔案應開啟
isOpen(FSDIS)
有些檔案系統不會執行此檢查,而是依賴 read()
合約來拒絕在封閉串流上讀取(例如 RawLocalFileSystem
)。
seek(0)
一定會成功,因為尋址位置必須為正數且小於串流長度
s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]
如果未符合此條件,有些檔案系統不會引發例外。相反地,它們會在任何 read()
作業中回傳 -1,其中在讀取時,len(data(FSDIS)) < pos(FSDIS)
。
在尋址失敗後,pos(FSDIS)
的值可能會變更。例如,尋址超過 EOF 可能會將讀取位置移至檔案結尾,並引發 EOFException
。
FSDIS' = (s, data, True)
有一個隱含的不變式:尋址到目前位置不會執行任何作業
seek(getPos())
實作可能會辨識此作業並略過所有其他前置條件檢查,讓輸入串流保持不變。
最近連接到物件儲存體的連接器都實作某種形式的「延遲尋址」:seek()
呼叫可能會更新串流,而且 getPos()
的值會更新,但檔案不會在實際讀取資料之前開啟/重新開啟。延遲尋址的實作仍必須根據檔案的已知長度驗證新的尋址位置。但是,檔案的狀態(例如是否存在、目前的長度為何)不需要在此時更新。檔案已刪除或被截斷的事實可能要等到 read()
呼叫才會浮現。
Seekable.seekToNewSource(offset)
此作業指示來源從目前來源的另一個來源擷取 data[]
。這只有在檔案系統支援檔案的多個複本,而且在偏移量 offset
處有多於 1 個資料複本時才相關。
並非所有子類別都實作此作業,而是會引發例外或傳回 False
。
supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]
範例:CompressionInputStream
、HttpFSFileSystem
如果支援,檔案必須開啟
isOpen(FSDIS)
大多數未實作此作業的子類別只會失敗。
if not supported(FSDIS, Seekable.seekToNewSource(s)): result = False
範例:RawLocalFileSystem
、HttpFSFileSystem
如果支援此作業,而且資料有新的位置
FSDIS' = (pos, data', true) result = True
新資料是原始資料(或其更新版本,如下方一致性區段所述),但包含資料的區塊在 offset
處來自不同的複本。
如果沒有其他複本,FSDIS
就不會更新;回應會指出這一點
result = False
在測試方法之外,此方法的主要用途在於 {{FSInputChecker}} 類別,它可以透過嘗試從其他地方取得資料來回應讀取中的檢查碼錯誤。如果可以找到新的來源,它會嘗試重新讀取並重新檢查檔案的那個部分。
CanUnbuffer.unbuffer()
此作業指示來源釋放目前握有的任何系統資源,例如緩衝區、socket、檔案描述子等。任何後續的 IO 作業可能必須重新取得這些資源。在串流需要保持開啟,但預期在近期內不會從串流進行任何 IO 作業的情況下,解除緩衝很有用(範例包括檔案控制代碼快取)。
並非所有子類別都實作此作業。除了實作 CanUnbuffer
之外,子類別必須實作 StreamCapabilities
介面,而且 StreamCapabilities.hasCapability(UNBUFFER)
必須傳回 true。如果子類別實作 CanUnbuffer
,但未透過 StreamCapabilities
報告功能,則呼叫 unbuffer
沒有作用。如果子類別報告它確實實作 UNBUFFER
,但未實作 CanUnbuffer
介面,則會擲回 UnsupportedOperationException
。
supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)
此方法不是執行緒安全的。如果在 read
進行中呼叫 unbuffer
,結果是未定義的。
unbuffer
可以呼叫已關閉的檔案,在這種情況下,unbuffer
什麼都不會做。
大多數未實作此作業的子類別只會什麼都不做。
如果支援此作業,unbuffer
會釋放與串流相關的所有系統資源。這些資源的確切清單通常取決於實作,不過一般來說,它可能包括緩衝區、socket、檔案描述子等。
PositionedReadable
PositionedReadable
作業提供「定位讀取」(「pread」)。它們提供從資料串流中特定位置讀取資料到緩衝區的功能。定位讀取等於在特定偏移量處執行 Seekable.seek
,然後執行 InputStream.read(buffer[], offset, length)
,只不過是單一方法呼叫,而不是先執行 seek
再執行 read
,而且兩個定位讀取可以選擇性地透過 FSDataInputStream
串流的單一執行個體同時執行。
介面宣告定位讀取為執行緒安全(某些實作並未遵循此保證)。
任何與串流作業同時執行的定位讀取(例如 Seekable.seek
、Seekable.getPos()
和 InputStream.read()
)都必須獨立執行;不能相互干擾。
同時執行的定位讀取和串流作業必須是可序列化;其中一個可能會封鎖另一個,讓它們依序執行,但為了提升吞吐量和「活性」,它們應該同時執行。
假設兩個並行的定位讀取,一個在 pos1
處讀取 len1
到緩衝區 dest1
,另一個在 pos2
處讀取 len2
到緩衝區 dest2
,而且在尋至 pos3
之後執行同時的串流讀取,則結果緩衝區必須填入如下資料,即使讀取在底層串流中重疊。
// Positioned read #1 read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] = [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1] // Positioned read #2 read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] = [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1] // Stream read seek(pos3); read(dest3, ... len3) -> dest3[0..len3 - 1] = [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]
請注意,實作不需要是原子性的;作業的中間狀態(getPos()
值的變更)可能會被看見。
並非所有 FSDataInputStream
實作都支援這些作業。未實作 Seekable.seek()
的實作不會實作 PositionedReadable
介面。
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
這可以視為理所當然:如果串流不是 Seekable
,則用戶端無法尋至某個位置。這也是使用 Seekable.seek()
的基底類別實作的副作用。
隱含不變式:對於所有 PositionedReadable
作業,pos
的值在作業結束時不變
pos(FSDIS') == pos(FSDIS)
對於任何失敗的作業,目的地 buffer
的內容都是未定義的。實作可能會在回報失敗之前覆寫緩衝區的部分或全部。在這種情況下,可能會省略檢查串流是否位於檔案結尾。
int PositionedReadable.read(position, buffer, offset, length)
盡可能讀取資料到為其配置的緩衝區空間。
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException] length >= 0 offset >= 0
讀取的資料量為長度或從指定位置可取得的資料量中較小的值
let available = min(length, len(data)-position) buffer'[offset..(offset+available-1)] = data[position..position+available -1] result = available
length==0
隱含地不會讀取任何資料;實作可能會簡化作業並省略任何 IO。在這種情況下,可能會省略檢查串流是否位於檔案結尾。buffer
的最終狀態將是未定義的。void PositionedReadable.readFully(position, buffer, offset, length)
讀取剛好 length
位元的資料到緩衝區中,如果沒有足夠的資料可用,則會失敗。
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] length >= 0 offset >= 0 len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException] (position + length) <= len(data) else raise [EOFException, IOException]
如果在讀取作業期間發生 IO 例外,buffer
的最終狀態將是未定義的。
如果輸入串流中沒有足夠的資料來滿足請求,則 buffer
的最後狀態是不確定的。
從偏移量 offset
開始的緩衝區會填入從 position
開始的資料
buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]
PositionedReadable.readFully(position, buffer)
其語意完全等同於
readFully(position, buffer, 0, len(buffer))
也就是說,緩衝區會完全填入從位置 position
開始的輸入來源內容
default void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
非同步地讀取一串範圍的完整資料。預設實作會遍歷範圍,嘗試根據 minSeekForVectorReads
和 maxReadSizeForVectorReads
的值合併範圍,然後同步讀取每個合併的範圍,但目的是子類別可以實作有效率的實作。支援在直接和堆積緩衝區中讀取。此外,建議客戶端使用 WeakReferencedElasticByteBufferPool
來配置緩衝區,這樣一來,即使是直接緩衝區,只要不再被參照,就會被垃圾回收。
readVectored()
之後,getPos()
傳回的位置是不確定的。
如果在 readVectored()
作業進行時檔案有變更,輸出是不確定的。有些範圍可能有舊資料,有些可能有新資料,有些可能兩者都有。
當 readVectored()
作業進行時,正常的讀取 API 呼叫可能會被封鎖。
注意:不要使用直接緩衝區從 ChecksumFileSystem 讀取,因為這可能會導致 HADOOP-18296 中說明的記憶體分段。
對於每個要求的範圍
range.getOffset >= 0 else raise IllegalArgumentException range.getLength >= 0 else raise EOFException
對於每個要求的範圍
range.getData() returns CompletableFuture<ByteBuffer> which will have data from range.getOffset to range.getLength.
minSeekForVectorReads()
最小的合理搜尋。如果第一個範圍的結束與下一個範圍的開始之間的差異大於這個值,則不會將兩個範圍合併在一起。
maxReadSizeForVectorReads()
合併範圍後,一次可以讀取的最大位元組數。如果要讀取的合併資料大於這個值,則不會將兩個範圍合併在一起。基本上,將這個值設為 0 會停用範圍合併。
FileSystem.open(p)
提供的資料串流 FSDIS 的所有讀取器(本機和遠端)都能在開啟時存取 FS.Files[p]
的資料。在時間 t0
FSDIS0 = FS'read(p) = (0, data0[])
在時間 t1
FS' = FS' where FS'.Files[p] = data1
從時間 t >= t1
開始,FSDIS0
的值未定義。
它可能保持不變
FSDIS0.data == data0 forall l in len(FSDIS0.data): FSDIS0.read() == data0[l]
它可能會擷取新資料
FSDIS0.data == data1 forall l in len(FSDIS0.data): FSDIS0.read() == data1[l]
它可能會不一致,例如,讀取偏移量會傳回來自任何一個資料集的資料
forall l in len(FSDIS0.data): (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l]))
也就是說,讀取的每個值都可能是來自原始檔案或已更新的檔案。
它也可能在重複讀取同一個偏移量時不一致,也就是在時間 t2 > t1
r2 = FSDIS0.read(l)
而在時間 t3 > t2
r3 = FSDIS0.read(l)
r3 != r2
。(也就是說,某些資料可能會快取或複製,而在後續讀取時,會傳回檔案內容的不同版本)。
類似地,如果路徑 p
中的資料被刪除,這個變更在對 FSDIS0
執行讀取作業時可能會或可能不會被看見。