Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 204 additions & 62 deletions src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,37 @@ import type { Newline, ParseResult } from 'cosovo'

import { checkInteger, checkNonNegativeInteger } from './helpers.js'

interface RowStored {
status: 'stored'
range: CSVRange
cells: string[]
firstRangeRow: {
value: number
isEstimate: boolean
}
}

interface RowMissing {
status: 'missing'
leftRange: CSVRange
rightRange?: CSVRange
byteOffset: {
value: number
isEstimate: boolean
}
}

interface RowBeyondEOF {
status: 'beyond-eof'
isEstimate: boolean
}

interface RowUnknown {
status: 'unknown'
}

type RowStatus = RowStored | RowMissing | RowBeyondEOF | RowUnknown

/**
* Cache of parsed rows
*/
Expand Down Expand Up @@ -457,17 +488,6 @@ export class Estimator {
return this.isNumRowsEstimated ? Infinity : this.numRows
}

/**
* Get the cells of a given row
* @param options Options
* @param options.row The row number (0-based)
* @returns The cells of the row, or undefined if the row is not in this range
*/
isStored({ row }: { row: number }): boolean {
const cells = this.#getCells({ row })
return cells !== undefined
}

/**
* Get the cell value at the given row and column
* @param options Options
Expand All @@ -480,13 +500,13 @@ export class Estimator {
if (column >= this.#cache.columnNames.length) {
throw new Error(`Column index out of bounds: ${column}`)
}
const cells = this.#getCells({ row })
if (cells === undefined) {
const status = this.getStatus({ row })
if (status.status !== 'stored') {
return undefined
}
return {
// return empty string for missing columns in existing row
value: cells[column] ?? '',
value: status.cells[column] ?? '',
}
}

Expand All @@ -502,19 +522,73 @@ export class Estimator {
}
}

guessByteOffset({ row }: { row: number }): number | undefined {
// special case: even if averageRowByteCount is undefined or 0, we know the byte offset of row 0
if (row === 0) {
return this.#cache.headerByteCount
/**
* Guess the next missing row, searching from minRow
* @param options Options
* @param options.minRow The minimum row number (0-based)
* @returns An object defining the first missing row, with the byte offset,
* the row number, and if the offset is estimated.
* Returns undefined if all the rows from minRow are already cached,
* or if no estimation is possible.
*/
getFirstMissingRow({ minRow }: { minRow: number }): {
row: number
byteOffset: {
value: number
isEstimate: boolean
}
if (this.#averageRowByteCount === 0 || this.#averageRowByteCount === undefined) {
return undefined
} | undefined {
const status = this.getStatus({ row: minRow })
if (status.status === 'missing') {
return {
row: minRow,
byteOffset: status.byteOffset,
}
}
return Math.max(0,
Math.min(this.#cache.byteLength - 1,
this.#cache.headerByteCount + Math.round(row * this.#averageRowByteCount),
),
)

if (status.status === 'stored') {
const nextRow = status.firstRangeRow.value + status.range.rowsCache.numRows
if (status.range.nextByte >= this.#cache.byteLength) {
return undefined
}
return {
row: nextRow,
byteOffset: {
value: status.range.nextByte,
isEstimate: false, // the previous row is stored, so the offset is exact
},
}
}

// other cases: beyond-eof, unknown
return undefined
}

/**
* Guess the last missing row, searching backwards from maxRow
* @param options Options
* @param options.maxRow The maximum row number (0-based)
* @returns The last missing row number.
* Returns undefined if all the rows before maxRow are already cached,
* or if no estimation is possible.
*/
getLastMissingRowNumber({ maxRow }: { maxRow: number }): number | undefined {
const status = this.getStatus({ row: maxRow })

if (status.status === 'missing') {
return maxRow
}

if (status.status === 'stored') {
const firstRangeRow = status.firstRangeRow.value
if (firstRangeRow === 0) {
return undefined
}
return firstRangeRow - 1
}

// other cases: beyond-eof, unknown
return undefined
}

/**
Expand Down Expand Up @@ -562,50 +636,118 @@ export class Estimator {
}

/**
* Get the cells of a given row
* Get the status of a given row
* @param options Options
* @param options.row The row number (0-based)
* @returns The cells of the row, or undefined if the row is not in this range
* @param options.row The row number (0-based, non-negative integer).
* @returns The status of the row
*/
#getCells({ row }: { row: number }): string[] | undefined {
const cells = this.#cache.serialRange.getRow(row)
if (cells !== undefined) {
return cells
}
// find the range containing this row
// try the last range first
for (const [i, range] of this.#cache.randomRanges.reverse().entries()) {
// due to a bug in cosovo?, the last byte of https://huggingface.co/datasets/Mosab-Rezaei/19th-century-novelists/resolve/main/Dataset - Five Authors .csv
// is not counted. To make the demo work, we allow a 1-byte buffer for the last range.
const hotfixBuffer = 1
const estimatedFirstRow = (i === 0 && range.nextByte >= this.#cache.byteLength - hotfixBuffer && range.rowsCache.numRows > 0)
// special case: last range, and the last stored row is the last row of the file
? this.numRows - range.rowsCache.numRows
// normal case: estimate based on the byte offset
: this.#guessRowNumberInRandomRange({ byteOffset: range.firstByte })
if (estimatedFirstRow === undefined) {
return undefined
}
const cells = range.getRow(row - estimatedFirstRow)
if (cells !== undefined) {
return cells
getStatus({ row }: { row: number }): RowStatus {
checkNonNegativeInteger(row)

if (this.numRows > 0 && row >= this.numRows) {
return {
status: 'beyond-eof',
isEstimate: this.isNumRowsEstimated,
}
}
return undefined
}

#guessRowNumberInRandomRange({ byteOffset }: { byteOffset: number }): number | undefined {
// v8 ignore if -- @preserve
if (this.#averageRowByteCount === undefined) {
// if the cache is complete, there is no random range, so this should not happen
throw new Error('Incoherent state: cannot guess row number in random range when the cache is complete')
let left = {
range: this.#cache.serialRange,
firstRow: 0,
isEstimate: false,
}
if (this.#averageRowByteCount === 0) {
// no estimation available
return undefined

// 4 cases to consider:
// - inside the left range
// - just after the left range
// - after the left range and before the right range (the right range can be undefined, meaning the end of the file)
// - not before the right range (continue to the next range)
for (const rightRange of [...this.#cache.randomRanges, undefined]) {
const leftNextRow = left.firstRow + left.range.rowsCache.numRows

// first case: inside a range
if (row < leftNextRow) {
const cells = left.range.getRow(row - left.firstRow)
// v8 ignore if -- @preserve
if (cells === undefined) {
// sanity check: the range should contain at least one row
throw new Error('Incoherent state: the range should contain at least one row')
}
return {
status: 'stored',
range: left.range,
cells,
firstRangeRow: {
value: left.firstRow,
isEstimate: left.isEstimate,
},
}
}

// second case: just after a range
if (row === leftNextRow) {
return {
status: 'missing',
leftRange: left.range,
rightRange,
byteOffset: {
value: left.range.nextByte,
isEstimate: false, // the previous row is stored, so the offset is exact
},
}
}

// third case: between two ranges

// v8 ignore if -- @preserve
if (this.#averageRowByteCount === undefined) {
// the cache is complete, no need to fetch
throw new Error('Incoherent state: the cache is complete, we should have returned earlier.')
}
if (this.#averageRowByteCount === 0) {
// no estimation available (empty cache, and asking for a row at the middle of the file)
return {
status: 'unknown',
}
}

// Estimate the number of the first row in the right range
const rightFirstRow = rightRange === undefined
? this.numRows
// TODO(SL) restore this logic for end-of-file optimization? I removed it because it can lead to gaps between rows
// special case: if the right range ends at the end of the file, we can compute from the total number of rows
// TODO(SL): beware, it can lead to gap between rows
// row 98477 of http://localhost:5173/?url=https://huggingface.co/datasets/Codatta/MM-Food-100K/resolve/main/MM-Food-100K.csv
// : (rightRange.nextByte >= this.#cache.byteLength - hotfixBuffer) && (rightRange.rowsCache.numRows > 0)
// ? this.numRows - rightRange.rowsCache.numRows
: leftNextRow + Math.round((rightRange.firstByte - left.range.nextByte) / this.#averageRowByteCount)

// third case: between two ranges
if (row < rightFirstRow) {
return {
status: 'missing',
leftRange: left.range,
rightRange,
byteOffset: {
value: left.range.nextByte + Math.round((row - leftNextRow) * this.#averageRowByteCount),
isEstimate: true, // estimated offset
},
}
}

// fourth case: not before the right range (continue to the next range)
// v8 ignore else -- @preserve
if (rightRange !== undefined) {
left = {
range: rightRange,
firstRow: rightFirstRow,
isEstimate: true,
}
}
}
// estimation based on the average row byte count
return Math.max(Math.round((byteOffset - this.#cache.headerByteCount) / this.#averageRowByteCount), 0)

// v8 ignore next -- @preserve
throw new Error('Incoherent state: this point should not be reachable')
}

/**
Expand Down
44 changes: 20 additions & 24 deletions src/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,33 +163,28 @@ export async function csvDataFrame(params: Params): Promise<CSVDataFrame> {
},
})

// Compute the byte range to fetch
for (let r = rowStart; r < rowEnd; r++) {
if (!estimator.isStored({ row: r })) {
break
}
rowStart++
}
for (let r = rowEnd; r > rowStart; r--) {
if (!estimator.isStored({ row: r - 1 })) {
break
}
rowEnd--
}
if (rowEnd <= rowStart) {
// all rows are already cached
return
}

// fetch rows from rowStart to rowEnd (exclusive), with 3 extra rows before and after
const extraRows = 3
const fetchRowStart = Math.max(0, rowStart - extraRows)
const fetchRowEnd = Math.min(rowEnd + extraRows)
const numRowsToFetch = fetchRowEnd - fetchRowStart

const firstByte = estimator.guessByteOffset({ row: fetchRowStart })
if (firstByte === undefined) {
// cannot estimate
const firstMissingRow = estimator.getFirstMissingRow({ minRow: fetchRowStart })
const lastMissingRowNumber = estimator.getLastMissingRowNumber({ maxRow: fetchRowEnd - 1 })
const lastMissingRow = (lastMissingRowNumber ?? (fetchRowEnd - 1)) + 1 // make it exclusive

if (firstMissingRow === undefined) {
// could not estimate the initial byte offset
return
}
// Prepare the parsing options
const firstByte = firstMissingRow.byteOffset.value
// if lastMissingRowNumber is undefined, we use fetchRowEnd as the fallback (see line 173)
const numRowsToFetch = lastMissingRow - firstMissingRow.row
const initialState = firstMissingRow.byteOffset.isEstimate ? 'detect' : 'default'
const ignoreFirstRow = firstMissingRow.byteOffset.isEstimate

if (numRowsToFetch <= 0) {
// nothing to fetch
return
}

Expand All @@ -210,13 +205,14 @@ export async function csvDataFrame(params: Params): Promise<CSVDataFrame> {
chunkSize,
firstByte,
lastByte: byteLength - 1,
initialState: 'detect',
initialState,
})) {
stats.parsedRows++

// Check if the signal has been aborted
checkSignal(signal)

if (stats.parsedRows <= 1) {
if (stats.parsedRows <= 1 && ignoreFirstRow) {
// we might have started parsing in the middle of a row, ignore this first row
stats.ignored += 1
continue
Expand Down
Loading