
Conversation

400Ping commented Dec 22, 2025

Purpose of PR

  • Add a pinned host buffer pool and wire it into the dual-stream pipeline so each chunk uses double-buffered pinned staging before H2D copies, reducing repeated malloc/free and GPU idle time (see the sketch below).
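
For readers new to the pattern, here is a minimal, CUDA-free sketch of the double-buffering idea; `stage` and `launch` are hypothetical stand-ins for the pinned memcpy and the async H2D/kernel enqueue steps in the actual pipeline, not APIs from this codebase.

// CUDA-free skeleton of the double-buffering idea: index i % 2 ping-pongs
// between two pinned staging buffers so that filling one buffer overlaps with
// the async H2D copy / kernel launched from the other.
fn double_buffered<T>(
    chunks: &[Vec<T>],
    staging: &mut [Vec<T>; 2],
    mut stage: impl FnMut(&mut Vec<T>, &[T]),
    mut launch: impl FnMut(usize, &[T]),
) {
    for (i, chunk) in chunks.iter().enumerate() {
        let slot = i % 2; // alternate between the two staging buffers
        // In the real pipeline, the per-slot CUDA event is awaited here so the
        // buffer is not overwritten while its previous copy is still in flight.
        stage(&mut staging[slot], chunk.as_slice());
        launch(slot, staging[slot].as_slice());
    }
}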

Related Issues or PRs

Closes #703

Changes Made

  • Bug fix
  • New feature
  • Refactoring
  • Documentation
  • Test
  • CI/CD pipeline
  • Other

Breaking Changes

  • Yes
  • No

Checklist

  • Added or updated unit tests for all changes
  • Added or updated documentation for all changes
  • Successfully built and ran all unit tests or manual tests locally
  • PR title follows "MAHOUT-XXX: Brief Description" format (if related to an issue)
  • Code follows ASF guidelines

@400Ping 400Ping marked this pull request as draft December 22, 2025 13:39
@400Ping 400Ping marked this pull request as ready for review December 22, 2025 13:51
400Ping (Author) commented Dec 22, 2025

@400Ping 400Ping changed the title [QDP] Double-buffered async I/O for read_parquet_batch [QDP] Pinned host buffer + dual-stream event pipeline to overlap copy and compute Dec 22, 2025
rich7420 (Contributor) commented Dec 23, 2025

Thanks @400Ping for the patch!

  1. What's the reason you define an FFI function again?
  2. Some tests failed locally due to a tensor shape problem; either fix that, or update the tests' expected output.

400Ping (Author) commented Dec 23, 2025

Thanks @400Ping for the patch!

  1. What's the reason you define an FFI function again?
  2. Some tests failed locally due to a tensor shape problem; either fix that, or update the tests' expected output.

My bad, just fixed it.

@400Ping 400Ping marked this pull request as draft December 24, 2025 07:54
@400Ping 400Ping marked this pull request as ready for review December 25, 2025 10:18
rich7420 (Contributor) left a comment

@400Ping thanks for the patch!
Left some comments.

@rich7420 rich7420 marked this pull request as draft December 25, 2025 13:24
@400Ping 400Ping changed the title [QDP] Pinned host buffer + dual-stream event pipeline to overlap copy and compute [QDP] Double-buffered pinned I/O pipeline and faster Parquet decode Dec 25, 2025
@400Ping 400Ping marked this pull request as ready for review December 25, 2025 22:51
rich7420 (Contributor) commented

I think maybe we could add some unit tests for this.
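
(For illustration, a minimal sketch of what such a test could assert, using only the try_acquire/available/Drop behavior described in the review below; the pool constructor is deliberately omitted because its exact signature is part of this PR and may take a device handle or return a Result.)

use std::sync::Arc;
// `PinnedBufferPool` comes from the new buffer_pool module added in this PR.

fn assert_handle_returns_on_drop(pool: &Arc<PinnedBufferPool>) {
    let before = pool.available();
    let handle = pool.try_acquire().expect("a free buffer should be available");
    assert_eq!(pool.available(), before - 1); // one buffer is checked out
    drop(handle); // RAII: the handle's Drop returns the buffer to the pool
    assert_eq!(pool.available(), before);
}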

ryankert01 (Contributor) commented

We have 2 improvements in this PR. Based on the benchmark results, I suspect that one of them may not be contributing to the speedup. What's your experience?

This reverts commit 3556b5a.
400Ping (Author) commented Dec 29, 2025

We have 2 improvements in this PR. Based on the benchmark results, I suspect that one of them may not be contributing to the speedup. What's your experience?

I think both contribute. The second one is the change @rich7420 and @guan404ming suggested: switching to a different decompression technique to improve performance. But I think most of the overall speedup comes from the first one.

400Ping (Author) commented Dec 29, 2025

Just tested: the second one doesn't improve performance much, so I'm going to remove it.

@400Ping 400Ping requested a review from rich7420 December 29, 2025 14:25
rich7420 (Contributor) commented

Please fix the pre-commit error.

400Ping (Author) commented Dec 31, 2025

Done, @rich7420 PTAL

400Ping (Author) commented Jan 1, 2026

cc @guan404ming @ryankert01

ryankert01 (Contributor) commented Jan 1, 2026

Please fix pre-commit. I tested locally and got a 2.8% speedup on the Arrow IPC case.
Will look into it further next week.

Copilot AI left a comment

Pull request overview

This PR introduces a double-buffered pinned host memory I/O pipeline to improve GPU data transfer performance. The key optimization is adding a reusable pool of pinned host buffers to eliminate repeated CUDA allocation/deallocation overhead in the streaming Parquet decode path.

  • Implements PinnedBufferPool with automatic RAII-based buffer management
  • Refactors PipelineContext to support multiple event slots for double-buffered synchronization
  • Renames PinnedBuffer to PinnedHostBuffer for clarity
  • Moves norm buffer allocation from per-pipeline to per-chunk
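
(For readers skimming the overview, a hedged sketch of the caller-side shape the first bullet implies: acquire a handle, use it, and let Drop return the buffer. The staging/upload calls are intentionally elided, since their exact accessors live in the diff.)

use std::sync::Arc;
// `PinnedBufferPool` / `try_acquire` come from the new buffer_pool module in this PR.

fn stage_one_chunk(pool: &Arc<PinnedBufferPool>, chunk: &[f64]) {
    if let Some(mut pinned) = pool.try_acquire() {
        // `pinned` derefs to PinnedHostBuffer; copy `chunk` into it and enqueue
        // the async H2D transfer here (accessor names omitted in this sketch).
        let _ = (&mut pinned, chunk);
    } // handle dropped here: the buffer returns to the pool automatically
}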

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 9 comments.

Summary per file:

  • qdp/qdp-core/src/gpu/buffer_pool.rs: New pinned host buffer pool with acquire/release semantics and automatic return-to-pool on drop
  • qdp/qdp-core/src/gpu/pipeline.rs: Extended PipelineContext to support multiple event slots; integrated the pinned buffer pool; improved error handling with Result returns
  • qdp/qdp-core/src/gpu/memory.rs: Renamed PinnedBuffer to PinnedHostBuffer and added an immutable slice accessor
  • qdp/qdp-core/src/lib.rs: Integrated buffer pool types; moved norm buffer allocation to per-chunk scope; updated pipeline event handling
  • qdp/qdp-core/src/gpu/mod.rs: Exposed the new buffer_pool module and its public types
  • qdp/qdp-core/src/gpu/cuda_ffi.rs: Removed a redundant cfg attribute (already applied at module level)
  • qdp/qdp-kernels/tests/amplitude_encode.rs: Refactored the test loop to use an idiomatic iterator pattern instead of direct indexing


Comment on lines 103 to 111
let mut free = self.free.lock().unwrap();
loop {
    if let Some(buffer) = free.pop() {
        return PinnedBufferHandle {
            buffer: Some(buffer),
            pool: Arc::clone(self),
        };
    }
    free = self.available_cv.wait(free).unwrap();

Copilot AI Jan 1, 2026

The .unwrap() calls on mutex lock operations can cause panics if the mutex is poisoned. In a production system, poisoned mutex errors should be handled more gracefully, either by propagating the error or by documenting that panic behavior is intentional in these scenarios.

/// Returns `None` if the pool is currently empty; callers can choose to spin/wait
/// or fall back to synchronous paths.
pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle> {
    let mut free = self.free.lock().unwrap();

Copilot AI Jan 1, 2026

The .unwrap() call on the mutex lock operation can cause a panic if the mutex is poisoned. Consider handling this error more gracefully or documenting the panic behavior.

/// Number of buffers currently available.
pub fn available(&self) -> usize {
    self.free.lock().unwrap().len()

Copilot AI Jan 1, 2026

The .unwrap() call on the mutex lock operation can cause a panic if the mutex is poisoned. Consider handling this error more gracefully or documenting the panic behavior.

Comment on lines +281 to +288
if chunk.len() > CHUNK_SIZE_ELEMENTS {
    return Err(MahoutError::InvalidInput(format!(
        "Chunk size {} exceeds pinned buffer capacity {}",
        chunk.len(),
        CHUNK_SIZE_ELEMENTS
    )));
}


Copilot AI Jan 1, 2026

This check is redundant because the iterator chunks() will never produce a chunk larger than CHUNK_SIZE_ELEMENTS. The check can be removed to simplify the code.

Suggested change
if chunk.len() > CHUNK_SIZE_ELEMENTS {
    return Err(MahoutError::InvalidInput(format!(
        "Chunk size {} exceeds pinned buffer capacity {}",
        chunk.len(),
        CHUNK_SIZE_ELEMENTS
    )));
}

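(For reference, the `chunks()` guarantee this suggestion relies on can be checked directly in a few lines:)

// slice::chunks(n) never yields a chunk longer than n; only the final chunk
// may be shorter, which is why the defensive length check is unreachable.
let data = [0.0_f64; 10];
assert!(data.chunks(4).all(|c| c.len() <= 4));
assert_eq!(data.chunks(4).last().map(|c| c.len()), Some(2)); // 10 = 4 + 4 + 2
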
Comment on lines +331 to +339
let mut norm_buffer = self
    .device
    .alloc_zeros::<f64>(samples_in_chunk)
    .map_err(|e| {
        MahoutError::MemoryAllocation(format!(
            "Failed to allocate norm buffer: {:?}",
            e
        ))
    })?;

Copilot AI Jan 1, 2026

Moving the norm buffer allocation inside the per-chunk loop (line 331-339) reintroduces per-chunk allocation overhead that was previously avoided. The old code comment explicitly stated: "Reuse a single norm buffer across chunks to avoid per-chunk allocations" and warned that "per-chunk allocation + drop can lead to use-after-free when the next chunk reuses the same device memory while the previous chunk is still running." While the use-after-free concern is mitigated by proper stream synchronization, the per-chunk allocation overhead remains. Consider pre-allocating a single norm buffer sized for the maximum expected samples_in_chunk to improve performance.

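(A hedged sketch of the pre-allocation this comment suggests, reusing the same alloc_zeros call as the snippet above; `max_samples_per_chunk` is a hypothetical bound the caller would need to compute before entering the chunk loop.)

// Before the per-chunk loop: allocate once, sized for the largest chunk.
let mut norm_buffer = self
    .device
    .alloc_zeros::<f64>(max_samples_per_chunk)
    .map_err(|e| {
        MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e))
    })?;
// Inside the loop, kernels would then write only the first `samples_in_chunk`
// elements of `norm_buffer` instead of allocating a fresh buffer per chunk.
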
impl Drop for PinnedBufferHandle {
    fn drop(&mut self) {
        if let Some(buf) = self.buffer.take() {
            let mut free = self.pool.free.lock().unwrap();

Copilot AI Jan 1, 2026

The .unwrap() call on the mutex lock operation can cause a panic if the mutex is poisoned. Consider handling this error more gracefully or documenting the panic behavior.

Suggested change
let mut free = self.pool.free.lock().unwrap();
let mut free = match self.pool.free.lock() {
    Ok(guard) => guard,
    Err(poisoned) => poisoned.into_inner(),
};

Comment on lines +37 to +49
    fn deref(&self) -> &Self::Target {
        self.buffer
            .as_ref()
            .expect("Buffer already returned to pool")
    }
}

#[cfg(target_os = "linux")]
impl std::ops::DerefMut for PinnedBufferHandle {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.buffer
            .as_mut()
            .expect("Buffer already returned to pool")

Copilot AI Jan 1, 2026

The panic message "Buffer already returned to pool" may be misleading. This panic occurs when attempting to use a PinnedBufferHandle after it has been dropped and its buffer returned to the pool. Consider a more descriptive message such as "Attempted to use PinnedBufferHandle after buffer was returned to pool (use-after-drop)" to better indicate the programmer error.

guan404ming (Member) commented Jan 1, 2026

I agree that the comment regarding .unwrap() is valid.
Do we want to handle this more gracefully, or is the current panic-on-poison behavior expected? If so, we can document it.

400Ping (Author) commented Jan 1, 2026

I agree that the comment regarding .unwrap() is valid. Do we want to handle this more gracefully, or is the current panic-on-poison behavior expected? If so, we can document it.

I think I will change the code to handle it more gracefully and add some comments to document this behavior.
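
(One way to do that, as a sketch using only std APIs: recover the guard from a poisoned mutex, on the grounds that the pool's free list cannot be left logically invalid by a panicking holder. The helper name is illustrative.)

use std::sync::{Mutex, MutexGuard};

/// Lock `m`, recovering the guard even if the mutex was poisoned.
/// A poisoned lock only means another thread panicked while holding it; the
/// Vec of pinned buffers inside remains structurally valid, so it is safe to
/// keep using it here.
fn lock_unpoisoned<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
    m.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
}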

guan404ming (Member) commented

Need to resolve conflicts, and overall looks good to me!
