Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ interface Netchdf : AutoCloseable {
// TODO I think the output type is not always the input type
fun <T> readArrayData(v2: Variable<T>, section: SectionPartial? = null) : ArrayTyped<T>

// iterate over all the chunks in section, order is arbitrary.
fun <T> chunkIterator(v2: Variable<T>, section: SectionPartial? = null, maxElements : Int? = null) : Iterator<ArraySection<T>>
}

// the section describes the array chunk reletive to the variable's shape.
data class ArraySection<T>(val array : ArrayTyped<T>, val section : Section)

// Experimental: read concurrently chunks of data, call back with lamda, order is arbitrary.
fun <T> Netchdf.chunkConcurrent(v2: Variable<T>, section: SectionPartial? = null, maxElements : Int? = null, nthreads: Int = 20,
lamda : (ArraySection<T>) -> Unit) {
fun <T> Netchdf.readChunksConcurrent(v2: Variable<T>,
section: SectionPartial? = null,
maxElements : Int? = null,
nthreads: Int = 20,
lamda : (ArraySection<T>) -> Unit) {
val reader = ReadChunkConcurrent()
val chunkIter = this.chunkIterator( v2, section, maxElements)
reader.readChunks(nthreads, chunkIter, lamda)
Expand Down
56 changes: 36 additions & 20 deletions core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,38 @@ import kotlin.math.min
/**
* A Tiling divides a multidimensional index space into tiles.
* Indices are points in the original multidimensional index space.
* Tiles are points in the tiled space.
* Tiles are points in the "tiled space" ~ varShape / chunkShape
* Each tile has the same index size, given by chunk.
*
* Allows to efficiently find the data chunks that cover an arbitrary section of the variable's index space.
*
* @param varshape the variable's shape
* @param chunk actual data storage has this shape. May be larger than the shape, last dim ignored if rank > varshape.
* @param varShape the variable's shape
* @param chunkShape actual data storage has this shape. May be larger than the shape, last dim ignored if rank > varshape.
*/
class Tiling(varshape: LongArray, chunkIn: LongArray) {
val chunk = chunkIn.copyOf()
class Tiling(varShape: LongArray, chunkShape: LongArray) {
val chunk = chunkShape.copyOf()
val rank: Int
val tileShape : LongArray // overall shape of the dataset's tile space
private val indexShape : LongArray // overall shape of the dataset's index space - may be larger than actual variable shape
private val strider : LongArray // for computing tile index
val indexShape : LongArray // overall shape of the dataset's index space - may be larger than actual variable shape
val tileStrider : LongArray // for computing tile index

init {
// convenient to allow tileSize to have (an) extra dimension at the end
// to accommodate hdf5 storage, which has the element size
require(varshape.size <= chunk.size)
rank = varshape.size
require(varShape.size <= chunk.size)
rank = varShape.size
this.indexShape = LongArray(rank)
for (i in 0 until rank) {
this.indexShape[i] = max(varshape[i], chunk[i])
this.indexShape[i] = max(varShape[i], chunk[i])
}
this.tileShape = LongArray(rank)
for (i in 0 until rank) {
tileShape[i] = (this.indexShape[i] + chunk[i] - 1) / chunk[i]
}
strider = LongArray(rank)
tileStrider = LongArray(rank)
var accumStride = 1L
for (k in rank - 1 downTo 0) {
strider[k] = accumStride
tileStrider[k] = accumStride
accumStride *= tileShape[k]
}
}
Expand All @@ -57,27 +57,43 @@ class Tiling(varshape: LongArray, chunkIn: LongArray) {
return tile
}

/** Compute the minimum index of a tile, inverse of tile().
/** Compute the left upper index of a tile, inverse of tile().
* This will match a key in the datachunk btree, up to rank dimensions */
fun index(tile: LongArray): LongArray {
return LongArray(rank) { idx -> tile[idx] * chunk[idx] }
}

/**
* Get order based on which tile the pt belongs to
* LOOK you could do this without using the tile
* Get order based on which tile the index pt belongs to.
* This is the linear ordering of the tile.
*
* @param pt index point
* @param index index point
* @return order number based on which tile the pt belongs to
*/
fun order(pt: LongArray): Long {
val tile = tile(pt)
fun order(index: LongArray): Long {
val tile = tile(index)
var order = 0L
val useRank = min(rank, pt.size) // eg varlen (datatype 9) has mismatch
for (i in 0 until useRank) order += strider[i] * tile[i]
val useRank = min(rank, index.size) // eg varlen (datatype 9) has mismatch
for (i in 0 until useRank) order += tileStrider[i] * tile[i]
return order
}

/** inverse of order() */
fun orderToIndex(order: Long) : LongArray {
// calculate tile
val tile = LongArray(rank)
var rem = order

for (k in 0 until rank) {
tile[k] = rem / tileStrider[k]
rem = rem - (tile[k] * tileStrider[k])
}
print("tile $order = ${tile.contentToString()}")

// convert to index
return index(tile)
}

/**
* Create an ordering of index points based on which tile the point is in.
*
Expand Down
114 changes: 114 additions & 0 deletions core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1ext.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
@file:OptIn(InternalLibraryApi::class)

package com.sunya.netchdf.hdf5

import com.sunya.cdm.iosp.OpenFileState
import com.sunya.cdm.layout.Tiling
import com.sunya.cdm.util.InternalLibraryApi
import kotlin.collections.mutableListOf

/** a BTree1 that uses OpenFileExtended */
internal class BTree1ext(
val raf: OpenFileExtended,
val rootNodeAddress: Long,
val nodeType : Int, // 0 = group/symbol table, 1 = raw data chunks
varShape: LongArray,
chunkShape: LongArray,
) {
val tiling = Tiling(varShape, chunkShape)
val ndimStorage = chunkShape.size

init {
// println(" BTreeNode varShape ${varShape.contentToString()} chunkShape ${chunkShape.contentToString()}")
require (nodeType == 1)
}

fun rootNode(): BTreeNode = BTreeNode(rootNodeAddress, null)

// here both internal and leaf are the same structure
// Btree nodes Level 1A1 - Version 1 B-trees
inner class BTreeNode(val address: Long, val parent: BTreeNode?) {
val level: Int
val nentries: Int
private val leftAddress: Long
private val rightAddress: Long

// type 1
val keys = mutableListOf<LongArray>()
val values = mutableListOf<DataChunkIF>()
val children = mutableListOf<BTreeNode>()

init {
val state = OpenFileState(raf.getFileOffset(address), false)
val magic: String = raf.readString(state, 4)
check(magic == "TREE") { "DataBTree doesnt start with TREE" }

val type: Int = raf.readByte(state).toInt()
check(type == nodeType) { "DataBTree must be type $nodeType" }

level = raf.readByte(state).toInt() // leaf nodes are level 0
nentries = raf.readShort(state).toInt() // number of children to which this node points
leftAddress = raf.readOffset(state)
rightAddress = raf.readOffset(state)

// println(" BTreeNode level $level nentries $nentries")

for (idx in 0 until nentries) {
val chunkSize = raf.readInt(state)
val filterMask = raf.readInt(state)
val inner = LongArray(ndimStorage) { j -> raf.readLong(state) }
val key = DataChunkKey(chunkSize, filterMask, inner)
val childPointer = raf.readAddress(state) // 4 or 8 bytes, then add fileOffset
if (level == 0) {
keys.add(inner)
values.add(DataChunkEntry1(level, this, idx, key, childPointer))
} else {
children.add(BTreeNode(childPointer, this))
}
}

// note there may be unused entries, "All nodes of a particular type of tree have the same maximum degree,
// but most nodes will point to less than that number of children""
}

// return only the leaf nodes, in any order
fun asSequence(): Sequence<Pair<Long, DataChunkIF>> = sequence {
// Handle child nodes recursively (in-order traversal)
if (children.isNotEmpty()) {
children.forEachIndexed { index, childNode ->
yieldAll(childNode.asSequence()) // Yield all elements from the child
}
} else { // If it's a leaf node (no children)
keys.forEachIndexed { index, key ->
yield(tiling.order(key) to values[index]) // Yield all key-value pairs
}
}
}
}

data class DataChunkKey(val chunkSize: Int, val filterMask : Int, val offsets: LongArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is DataChunkKey) return false
if (!offsets.contentEquals(other.offsets)) return false
return true
}

override fun hashCode(): Int {
return offsets.contentHashCode()
}
}

// childAddress = data chunk (level 1) else a child node
data class DataChunkEntry1(val level : Int, val parent : BTreeNode, val idx : Int, val key : DataChunkKey, val childAddress : Long) : DataChunkIF {
override fun childAddress() = childAddress
override fun offsets() = key.offsets
override fun isMissing() = (childAddress == -1L)
override fun chunkSize() = key.chunkSize
override fun filterMask() = key.filterMask

override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" +
", tile= ${tiling.tile(key.offsets).contentToString()} idx=$idx"
}
}

4 changes: 0 additions & 4 deletions core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2j.kt
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ internal class BTree2j(private val h5: H5builder, owner: String, address: Long,
readRecords(childAddress, depth - 1, numberOfChildRecords, totalNumberOfChildRecords)
}
}

// bb.limit(bb.position() + 4);
// bb.rewind();
// ChecksumUtils.validateChecksum(bb);
}

// heroic jhdf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.sunya.cdm.layout.IndexSpace
import com.sunya.cdm.layout.IndexND
import com.sunya.cdm.layout.Tiling

/** wraps BTree1 and BTree2 to handle iterating through tiled data (aka chunked data) */
/** wraps BTree1 to handle iterating through tiled data (aka chunked data) */
internal class H5TiledData1(val btree : BTree1, val varShape: LongArray, val chunkShape: LongArray) {
private val check = true
private val debug = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class H5builder(
) {
val raf: OpenFileIF

private val superblockStart: Long // may be offset for arbitrary metadata
val superblockStart: Long // may be offset for arbitrary metadata
var sizeOffsets: Int = 0
var sizeLengths: Int = 0
var sizeHeapId = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package com.sunya.netchdf.hdf5

import com.sunya.cdm.api.*
import com.sunya.cdm.array.ArrayTyped
import com.sunya.cdm.iosp.OpenFileIF
import com.sunya.cdm.iosp.OpenFileState
import com.sunya.cdm.layout.Chunker
import com.sunya.cdm.layout.IndexSpace
import com.sunya.cdm.layout.transferMissingNelems
import com.sunya.cdm.util.InternalLibraryApi

// TODO assumes BTree1, could it include BTree2? any chunked reader ?
// only used in Netchdf.chunkConcurrent
// only used in Netchdf.readChunksConcurrent

@OptIn(InternalLibraryApi::class)
internal class H5chunkIterator<T>(val h5 : H5builder, val v2: Variable<T>, val wantSection : Section) : AbstractIterator<ArraySection<T>>() {
Expand All @@ -31,8 +32,8 @@ internal class H5chunkIterator<T>(val h5 : H5builder, val v2: Variable<T>, val w
elemSize = vinfo.storageDims[vinfo.storageDims.size - 1].toInt() // last one is always the elements size
datatype = h5type.datatype()

val btreeNew = BTree1(h5, vinfo.dataPos, 1, vinfo.storageDims.size)
tiledData = H5TiledData1(btreeNew, v2.shape, vinfo.storageDims)
val btree1 = BTree1(h5, vinfo.dataPos, 1, vinfo.storageDims.size)
tiledData = H5TiledData1(btree1, v2.shape, vinfo.storageDims)
filters = FilterPipeline(v2.name, vinfo.mfp, h5type.isBE)
if (debugChunking) println(" H5chunkIterator tiles=${tiledData.tiling}")

Expand Down Expand Up @@ -87,3 +88,26 @@ internal class H5chunkIterator<T>(val h5 : H5builder, val v2: Variable<T>, val w
return ArraySection(array, intersectSpace.section(v2.shape)) // LOOK use space instead of Section ??
}
}

// for H5readConcurrent
class OpenFileExtended(val delegate: OpenFileIF,
val isLengthLong: Boolean,
val isOffsetLong: Boolean,
val startingOffset: Long, ) : OpenFileIF by delegate {

fun readLength(state : OpenFileState): Long {
return if (isLengthLong) delegate.readLong(state) else delegate.readInt(state).toLong()
}

fun readOffset(state : OpenFileState): Long {
return if (isOffsetLong) delegate.readLong(state) else delegate.readInt(state).toLong()
}

fun getFileOffset(address: Long): Long {
return startingOffset + address
}

fun readAddress(state : OpenFileState): Long {
return getFileOffset(readOffset(state))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ internal class H5chunkReader(val h5 : H5builder) {
}
val ba = ByteArray(sizeBytes.toInt())

val btree = if (vinfo.mdl is DataLayoutBTreeVer1)
val btree1 = if (vinfo.mdl is DataLayoutBTreeVer1)
BTree1(h5, vinfo.dataPos, 1, vinfo.storageDims.size)
else
throw RuntimeException("Unsupprted mdl ${vinfo.mdl}")

val tiledData = H5TiledData1(btree, v2.shape, vinfo.storageDims)
val tiledData = H5TiledData1(btree1, v2.shape, vinfo.storageDims)
val filters = FilterPipeline(v2.name, vinfo.mfp, vinfo.h5type.isBE)
if (debugChunking) println(" readChunkedData tiles=${tiledData.tiling}")

Expand Down
Loading
Loading