Skip to content

Commit 8f46d87

Browse files
authored
Merge pull request #190 from JohnLCaron/H5readConcurrent
H5readConcurrent
2 parents 9c9aefb + db9a3a8 commit 8f46d87

15 files changed

Lines changed: 583 additions & 59 deletions

File tree

core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@ interface Netchdf : AutoCloseable {
1818
// TODO I think the output type is not always the input type
1919
fun <T> readArrayData(v2: Variable<T>, section: SectionPartial? = null) : ArrayTyped<T>
2020

21+
// iterate over all the chunks in section, order is arbitrary.
2122
fun <T> chunkIterator(v2: Variable<T>, section: SectionPartial? = null, maxElements : Int? = null) : Iterator<ArraySection<T>>
2223
}
2324

2425
// the section describes the array chunk reletive to the variable's shape.
2526
data class ArraySection<T>(val array : ArrayTyped<T>, val section : Section)
2627

2728
// Experimental: read concurrently chunks of data, call back with lamda, order is arbitrary.
28-
fun <T> Netchdf.chunkConcurrent(v2: Variable<T>, section: SectionPartial? = null, maxElements : Int? = null, nthreads: Int = 20,
29-
lamda : (ArraySection<T>) -> Unit) {
29+
fun <T> Netchdf.readChunksConcurrent(v2: Variable<T>,
30+
section: SectionPartial? = null,
31+
maxElements : Int? = null,
32+
nthreads: Int = 20,
33+
lamda : (ArraySection<T>) -> Unit) {
3034
val reader = ReadChunkConcurrent()
3135
val chunkIter = this.chunkIterator( v2, section, maxElements)
3236
reader.readChunks(nthreads, chunkIter, lamda)

core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,38 @@ import kotlin.math.min
66
/**
77
* A Tiling divides a multidimensional index space into tiles.
88
* Indices are points in the original multidimensional index space.
9-
* Tiles are points in the tiled space.
9+
* Tiles are points in the "tiled space" ~ varShape / chunkShape
1010
* Each tile has the same index size, given by chunk.
1111
*
1212
* Allows to efficiently find the data chunks that cover an arbitrary section of the variable's index space.
1313
*
14-
* @param varshape the variable's shape
15-
* @param chunk actual data storage has this shape. May be larger than the shape, last dim ignored if rank > varshape.
14+
* @param varShape the variable's shape
15+
* @param chunkShape actual data storage has this shape. May be larger than the shape, last dim ignored if rank > varshape.
1616
*/
17-
class Tiling(varshape: LongArray, chunkIn: LongArray) {
18-
val chunk = chunkIn.copyOf()
17+
class Tiling(varShape: LongArray, chunkShape: LongArray) {
18+
val chunk = chunkShape.copyOf()
1919
val rank: Int
2020
val tileShape : LongArray // overall shape of the dataset's tile space
21-
private val indexShape : LongArray // overall shape of the dataset's index space - may be larger than actual variable shape
22-
private val strider : LongArray // for computing tile index
21+
val indexShape : LongArray // overall shape of the dataset's index space - may be larger than actual variable shape
22+
val tileStrider : LongArray // for computing tile index
2323

2424
init {
2525
// convenient to allow tileSize to have (an) extra dimension at the end
2626
// to accommodate hdf5 storage, which has the element size
27-
require(varshape.size <= chunk.size)
28-
rank = varshape.size
27+
require(varShape.size <= chunk.size)
28+
rank = varShape.size
2929
this.indexShape = LongArray(rank)
3030
for (i in 0 until rank) {
31-
this.indexShape[i] = max(varshape[i], chunk[i])
31+
this.indexShape[i] = max(varShape[i], chunk[i])
3232
}
3333
this.tileShape = LongArray(rank)
3434
for (i in 0 until rank) {
3535
tileShape[i] = (this.indexShape[i] + chunk[i] - 1) / chunk[i]
3636
}
37-
strider = LongArray(rank)
37+
tileStrider = LongArray(rank)
3838
var accumStride = 1L
3939
for (k in rank - 1 downTo 0) {
40-
strider[k] = accumStride
40+
tileStrider[k] = accumStride
4141
accumStride *= tileShape[k]
4242
}
4343
}
@@ -57,27 +57,43 @@ class Tiling(varshape: LongArray, chunkIn: LongArray) {
5757
return tile
5858
}
5959

60-
/** Compute the minimum index of a tile, inverse of tile().
60+
/** Compute the left upper index of a tile, inverse of tile().
6161
* This will match a key in the datachunk btree, up to rank dimensions */
6262
fun index(tile: LongArray): LongArray {
6363
return LongArray(rank) { idx -> tile[idx] * chunk[idx] }
6464
}
6565

6666
/**
67-
* Get order based on which tile the pt belongs to
68-
* LOOK you could do this without using the tile
67+
* Get order based on which tile the index pt belongs to.
68+
* This is the linear ordering of the tile.
6969
*
70-
* @param pt index point
70+
* @param index index point
7171
* @return order number based on which tile the pt belongs to
7272
*/
73-
fun order(pt: LongArray): Long {
74-
val tile = tile(pt)
73+
fun order(index: LongArray): Long {
74+
val tile = tile(index)
7575
var order = 0L
76-
val useRank = min(rank, pt.size) // eg varlen (datatype 9) has mismatch
77-
for (i in 0 until useRank) order += strider[i] * tile[i]
76+
val useRank = min(rank, index.size) // eg varlen (datatype 9) has mismatch
77+
for (i in 0 until useRank) order += tileStrider[i] * tile[i]
7878
return order
7979
}
8080

81+
/** inverse of order() */
82+
fun orderToIndex(order: Long) : LongArray {
83+
// calculate tile
84+
val tile = LongArray(rank)
85+
var rem = order
86+
87+
for (k in 0 until rank) {
88+
tile[k] = rem / tileStrider[k]
89+
rem = rem - (tile[k] * tileStrider[k])
90+
}
91+
print("tile $order = ${tile.contentToString()}")
92+
93+
// convert to index
94+
return index(tile)
95+
}
96+
8197
/**
8298
* Create an ordering of index points based on which tile the point is in.
8399
*
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
@file:OptIn(InternalLibraryApi::class)
2+
3+
package com.sunya.netchdf.hdf5
4+
5+
import com.sunya.cdm.iosp.OpenFileState
6+
import com.sunya.cdm.layout.Tiling
7+
import com.sunya.cdm.util.InternalLibraryApi
8+
import kotlin.collections.mutableListOf
9+
10+
/** a BTree1 that uses OpenFileExtended */
11+
internal class BTree1ext(
12+
val raf: OpenFileExtended,
13+
val rootNodeAddress: Long,
14+
val nodeType : Int, // 0 = group/symbol table, 1 = raw data chunks
15+
varShape: LongArray,
16+
chunkShape: LongArray,
17+
) {
18+
val tiling = Tiling(varShape, chunkShape)
19+
val ndimStorage = chunkShape.size
20+
21+
init {
22+
// println(" BTreeNode varShape ${varShape.contentToString()} chunkShape ${chunkShape.contentToString()}")
23+
require (nodeType == 1)
24+
}
25+
26+
fun rootNode(): BTreeNode = BTreeNode(rootNodeAddress, null)
27+
28+
// here both internal and leaf are the same structure
29+
// Btree nodes Level 1A1 - Version 1 B-trees
30+
inner class BTreeNode(val address: Long, val parent: BTreeNode?) {
31+
val level: Int
32+
val nentries: Int
33+
private val leftAddress: Long
34+
private val rightAddress: Long
35+
36+
// type 1
37+
val keys = mutableListOf<LongArray>()
38+
val values = mutableListOf<DataChunkIF>()
39+
val children = mutableListOf<BTreeNode>()
40+
41+
init {
42+
val state = OpenFileState(raf.getFileOffset(address), false)
43+
val magic: String = raf.readString(state, 4)
44+
check(magic == "TREE") { "DataBTree doesnt start with TREE" }
45+
46+
val type: Int = raf.readByte(state).toInt()
47+
check(type == nodeType) { "DataBTree must be type $nodeType" }
48+
49+
level = raf.readByte(state).toInt() // leaf nodes are level 0
50+
nentries = raf.readShort(state).toInt() // number of children to which this node points
51+
leftAddress = raf.readOffset(state)
52+
rightAddress = raf.readOffset(state)
53+
54+
// println(" BTreeNode level $level nentries $nentries")
55+
56+
for (idx in 0 until nentries) {
57+
val chunkSize = raf.readInt(state)
58+
val filterMask = raf.readInt(state)
59+
val inner = LongArray(ndimStorage) { j -> raf.readLong(state) }
60+
val key = DataChunkKey(chunkSize, filterMask, inner)
61+
val childPointer = raf.readAddress(state) // 4 or 8 bytes, then add fileOffset
62+
if (level == 0) {
63+
keys.add(inner)
64+
values.add(DataChunkEntry1(level, this, idx, key, childPointer))
65+
} else {
66+
children.add(BTreeNode(childPointer, this))
67+
}
68+
}
69+
70+
// note there may be unused entries, "All nodes of a particular type of tree have the same maximum degree,
71+
// but most nodes will point to less than that number of children""
72+
}
73+
74+
// return only the leaf nodes, in any order
75+
fun asSequence(): Sequence<Pair<Long, DataChunkIF>> = sequence {
76+
// Handle child nodes recursively (in-order traversal)
77+
if (children.isNotEmpty()) {
78+
children.forEachIndexed { index, childNode ->
79+
yieldAll(childNode.asSequence()) // Yield all elements from the child
80+
}
81+
} else { // If it's a leaf node (no children)
82+
keys.forEachIndexed { index, key ->
83+
yield(tiling.order(key) to values[index]) // Yield all key-value pairs
84+
}
85+
}
86+
}
87+
}
88+
89+
data class DataChunkKey(val chunkSize: Int, val filterMask : Int, val offsets: LongArray) {
90+
override fun equals(other: Any?): Boolean {
91+
if (this === other) return true
92+
if (other !is DataChunkKey) return false
93+
if (!offsets.contentEquals(other.offsets)) return false
94+
return true
95+
}
96+
97+
override fun hashCode(): Int {
98+
return offsets.contentHashCode()
99+
}
100+
}
101+
102+
// childAddress = data chunk (level 1) else a child node
103+
data class DataChunkEntry1(val level : Int, val parent : BTreeNode, val idx : Int, val key : DataChunkKey, val childAddress : Long) : DataChunkIF {
104+
override fun childAddress() = childAddress
105+
override fun offsets() = key.offsets
106+
override fun isMissing() = (childAddress == -1L)
107+
override fun chunkSize() = key.chunkSize
108+
override fun filterMask() = key.filterMask
109+
110+
override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" +
111+
", tile= ${tiling.tile(key.offsets).contentToString()} idx=$idx"
112+
}
113+
}
114+

core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2j.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,6 @@ internal class BTree2j(private val h5: H5builder, owner: String, address: Long,
9898
readRecords(childAddress, depth - 1, numberOfChildRecords, totalNumberOfChildRecords)
9999
}
100100
}
101-
102-
// bb.limit(bb.position() + 4);
103-
// bb.rewind();
104-
// ChecksumUtils.validateChecksum(bb);
105101
}
106102

107103
// heroic jhdf

core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5TiledData1.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.sunya.cdm.layout.IndexSpace
44
import com.sunya.cdm.layout.IndexND
55
import com.sunya.cdm.layout.Tiling
66

7-
/** wraps BTree1 and BTree2 to handle iterating through tiled data (aka chunked data) */
7+
/** wraps BTree1 to handle iterating through tiled data (aka chunked data) */
88
internal class H5TiledData1(val btree : BTree1, val varShape: LongArray, val chunkShape: LongArray) {
99
private val check = true
1010
private val debug = false

core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5builder.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class H5builder(
3232
) {
3333
val raf: OpenFileIF
3434

35-
private val superblockStart: Long // may be offset for arbitrary metadata
35+
val superblockStart: Long // may be offset for arbitrary metadata
3636
var sizeOffsets: Int = 0
3737
var sizeLengths: Int = 0
3838
var sizeHeapId = 0

core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkIterator.kt

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ package com.sunya.netchdf.hdf5
22

33
import com.sunya.cdm.api.*
44
import com.sunya.cdm.array.ArrayTyped
5+
import com.sunya.cdm.iosp.OpenFileIF
56
import com.sunya.cdm.iosp.OpenFileState
67
import com.sunya.cdm.layout.Chunker
78
import com.sunya.cdm.layout.IndexSpace
89
import com.sunya.cdm.layout.transferMissingNelems
910
import com.sunya.cdm.util.InternalLibraryApi
1011

1112
// TODO assumes BTree1, could it include BTree2? any chunked reader ?
12-
// only used in Netchdf.chunkConcurrent
13+
// only used in Netchdf.readChunksConcurrent
1314

1415
@OptIn(InternalLibraryApi::class)
1516
internal class H5chunkIterator<T>(val h5 : H5builder, val v2: Variable<T>, val wantSection : Section) : AbstractIterator<ArraySection<T>>() {
@@ -31,8 +32,8 @@ internal class H5chunkIterator<T>(val h5 : H5builder, val v2: Variable<T>, val w
3132
elemSize = vinfo.storageDims[vinfo.storageDims.size - 1].toInt() // last one is always the elements size
3233
datatype = h5type.datatype()
3334

34-
val btreeNew = BTree1(h5, vinfo.dataPos, 1, vinfo.storageDims.size)
35-
tiledData = H5TiledData1(btreeNew, v2.shape, vinfo.storageDims)
35+
val btree1 = BTree1(h5, vinfo.dataPos, 1, vinfo.storageDims.size)
36+
tiledData = H5TiledData1(btree1, v2.shape, vinfo.storageDims)
3637
filters = FilterPipeline(v2.name, vinfo.mfp, h5type.isBE)
3738
if (debugChunking) println(" H5chunkIterator tiles=${tiledData.tiling}")
3839

@@ -87,3 +88,26 @@ internal class H5chunkIterator<T>(val h5 : H5builder, val v2: Variable<T>, val w
8788
return ArraySection(array, intersectSpace.section(v2.shape)) // LOOK use space instead of Section ??
8889
}
8990
}
91+
92+
// for H5readConcurrent
93+
class OpenFileExtended(val delegate: OpenFileIF,
94+
val isLengthLong: Boolean,
95+
val isOffsetLong: Boolean,
96+
val startingOffset: Long, ) : OpenFileIF by delegate {
97+
98+
fun readLength(state : OpenFileState): Long {
99+
return if (isLengthLong) delegate.readLong(state) else delegate.readInt(state).toLong()
100+
}
101+
102+
fun readOffset(state : OpenFileState): Long {
103+
return if (isOffsetLong) delegate.readLong(state) else delegate.readInt(state).toLong()
104+
}
105+
106+
fun getFileOffset(address: Long): Long {
107+
return startingOffset + address
108+
}
109+
110+
fun readAddress(state : OpenFileState): Long {
111+
return getFileOffset(readOffset(state))
112+
}
113+
}

core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkReader.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,12 @@ internal class H5chunkReader(val h5 : H5builder) {
7373
}
7474
val ba = ByteArray(sizeBytes.toInt())
7575

76-
val btree = if (vinfo.mdl is DataLayoutBTreeVer1)
76+
val btree1 = if (vinfo.mdl is DataLayoutBTreeVer1)
7777
BTree1(h5, vinfo.dataPos, 1, vinfo.storageDims.size)
7878
else
7979
throw RuntimeException("Unsupprted mdl ${vinfo.mdl}")
8080

81-
val tiledData = H5TiledData1(btree, v2.shape, vinfo.storageDims)
81+
val tiledData = H5TiledData1(btree1, v2.shape, vinfo.storageDims)
8282
val filters = FilterPipeline(v2.name, vinfo.mfp, vinfo.h5type.isBE)
8383
if (debugChunking) println(" readChunkedData tiles=${tiledData.tiling}")
8484

0 commit comments

Comments
 (0)