Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/pipeline/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,35 @@ pub(crate) fn compress_block(
}
}

/// Compress a single block from pre-computed demux output (entropy-encode only).
///
/// Used by the GPU streaming coordinator: the GPU produces LZ77 matches,
/// `demux_lz77_matches` on the CPU turns them into streams plus metadata,
/// and this function performs just the entropy stage — match-finding is
/// never re-run here.
#[cfg(feature = "webgpu")]
pub(crate) fn compress_block_from_demux(
    pipeline: Pipeline,
    original_len: usize,
    streams: Vec<Vec<u8>>,
    pre_entropy_len: usize,
    demux_meta: Vec<u8>,
    options: &CompressOptions,
) -> PzResult<Vec<u8>> {
    // Stage metadata carries the demux byproducts the entropy stage needs.
    let metadata = StageMetadata {
        pre_entropy_len: Some(pre_entropy_len),
        demux_meta,
        ..StageMetadata::default()
    };
    // Synthetic stage block: `data` stays empty because the LZ stage already
    // ran; the demuxed streams are handed over via `streams` instead.
    let staged = StageBlock {
        block_index: 0,
        original_len,
        data: Vec::new(),
        streams: Some(streams),
        metadata,
    };
    entropy_encode(staged, pipeline, original_len, options).map(|block| block.data)
}

/// Decompress a single block using the appropriate pipeline (no container header).
pub(crate) fn decompress_block(
payload: &[u8],
Expand Down
17 changes: 17 additions & 0 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ use crate::bwt;
use crate::lz77;
use crate::{PzError, PzResult};

#[cfg(feature = "webgpu")]
pub(crate) use blocks::compress_block_from_demux;
pub(crate) use blocks::{compress_block, decompress_block};
#[cfg(feature = "webgpu")]
pub(crate) use demux::demux_lz77_matches;
use parallel::{compress_parallel, decompress_parallel};
pub use telemetry::UnifiedSchedulerStats;

Expand Down Expand Up @@ -365,6 +369,19 @@ impl TryFrom<u8> for Pipeline {
}
}

impl Pipeline {
    /// Whether this pipeline uses LZ-demux (match-finding + stream demux).
    ///
    /// Only these pipelines benefit from the GPU coordinator's batched
    /// LZ77 match-finding path. BWT and SortLz pipelines do not.
    pub(crate) fn uses_lz_demux(self) -> bool {
        match self {
            Self::Lzf | Self::Lzfi | Self::LzssR | Self::LzSeqR | Self::LzSeqH => true,
            // Every other pipeline (e.g. BWT- or SortLz-based) handles its
            // own path and gains nothing from batched LZ77 match-finding.
            _ => false,
        }
    }
}

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions src/pipeline/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ fn test_lzseq_r_optimal_round_trip_short() {
}

#[test]
#[ignore] // Takes >60s in debug builds — run with `cargo test -- --ignored`
fn test_lzseq_r_optimal_round_trip_large() {
// Larger data to exercise optimal parsing
let pattern = b"compression and decompression with optimal parsing ";
Expand Down Expand Up @@ -979,6 +980,7 @@ fn test_lzseq_r_quality_level_quality_uses_larger_window() {
}

#[test]
#[ignore] // Takes >60s in debug builds — run with `cargo test -- --ignored`
fn test_lzseq_r_optimal_better_than_lazy_on_structured_data() {
// Verify both parsing strategies round-trip correctly on structured data.
// NOTE: We don't assert optimal < lazy because they make different tradeoffs:
Expand Down
203 changes: 195 additions & 8 deletions src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::pipeline::{
compress_block, decompress_block, resolve_thread_count, write_header, CompressOptions,
DecompressOptions, Pipeline, BLOCK_HEADER_SIZE, FRAMED_SENTINEL, MAGIC, VERSION,
};
#[cfg(feature = "webgpu")]
use crate::pipeline::{compress_block_from_demux, demux_lz77_matches, Backend};
use crate::{PzError, PzResult};

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -92,10 +94,10 @@ pub fn compress_stream<R: Read + Send, W: Write>(
let num_threads = resolve_thread_count(options.threads);

if num_threads <= 1 {
compress_stream_single(input, output, pipeline, options)
} else {
compress_stream_parallel(input, output, pipeline, options, num_threads)
return compress_stream_single(input, output, pipeline, options);
}

compress_stream_parallel(input, output, pipeline, options, num_threads)
}

/// Decompress from a reader to a writer.
Expand Down Expand Up @@ -191,24 +193,198 @@ fn compress_stream_parallel<R: Read + Send, W: Write>(
let (input_tx, input_rx) = mpsc::sync_channel::<(usize, Vec<u8>)>(num_threads);
let input_rx = Arc::new(Mutex::new(input_rx));

// Channel: workers -> writer (bounded for back-pressure)
// Channel: workers + GPU coordinator -> writer (bounded for back-pressure)
let (output_tx, output_rx) =
mpsc::sync_channel::<(usize, Result<(Vec<u8>, usize), PzError>)>(num_threads);

// GPU coordinator: only for LZ-demux pipelines (LZ77 match-finding + demux).
// Non-LZ pipelines (BWT, SortLz) handle their own GPU paths in compress_block.
#[cfg(feature = "webgpu")]
let has_gpu_backend =
matches!(options.backend, Backend::WebGpu) && options.webgpu_engine.is_some();
#[cfg(feature = "webgpu")]
let has_gpu = has_gpu_backend && pipeline.uses_lz_demux();
#[cfg(not(feature = "webgpu"))]
let has_gpu = false;

#[cfg(feature = "webgpu")]
#[allow(clippy::type_complexity)]
let (gpu_tx, gpu_rx): (
Option<mpsc::SyncSender<(usize, Vec<u8>)>>,
Option<mpsc::Receiver<(usize, Vec<u8>)>>,
) = if has_gpu {
let (tx, rx) = mpsc::sync_channel(2);
(Some(tx), Some(rx))
} else {
(None, None)
};

#[cfg(feature = "webgpu")]
let max_dispatch = if has_gpu {
options
.webgpu_engine
.as_ref()
.unwrap()
.max_dispatch_input_size()
} else {
0
};

// Adaptive backpressure: workers check this before try_send to GPU.
// Score ramps up on Full (+2), down on Ok (-1). When score >= limit,
// workers skip GPU entirely. Same mechanism as parallel.rs.
#[cfg(feature = "webgpu")]
let gpu_pressure = if has_gpu {
Some(Arc::new(std::sync::atomic::AtomicUsize::new(0)))
} else {
None
};
#[cfg(feature = "webgpu")]
let gpu_pressure_limit = num_threads.saturating_mul(2).max(4);

// Use thread::scope so all threads are joined before we return.
// The writer runs on the main thread to avoid Send requirements on W.
let writer_result: StreamResult<u64> = std::thread::scope(|scope| {
// Spawn optional GPU coordinator thread.
// Receives blocks from workers via gpu_rx, batches them for GPU
// match-finding, then sends compressed results to output_tx.
#[cfg(feature = "webgpu")]
let gpu_handle = if let Some(gpu_rx) = gpu_rx {
let gpu_output_tx = output_tx.clone();
let opts = options.clone();
let pl = pipeline;
Some(scope.spawn(move || -> StreamResult<()> {
let engine = opts.webgpu_engine.as_ref().unwrap();

while let Ok((first_idx, first_data)) = gpu_rx.recv() {
// Batch-collect: blocking recv got first item, drain rest.
let mut batch_indices = vec![first_idx];
let mut batch_blocks = vec![first_data];

while let Ok((idx, data)) = gpu_rx.try_recv() {
batch_indices.push(idx);
batch_blocks.push(data);
}

// Run GPU match-finding on the batch.
let block_refs: Vec<&[u8]> =
batch_blocks.iter().map(|b| b.as_slice()).collect();
match engine.find_matches_batched(&block_refs) {
Ok(all_matches) => {
for (i, matches) in all_matches.into_iter().enumerate() {
let block_index = batch_indices[i];
let input_data = &batch_blocks[i];
let original_len = input_data.len();

let result = match demux_lz77_matches(input_data, matches, pl) {
Ok(demux_out) => compress_block_from_demux(
pl,
original_len,
demux_out.streams,
demux_out.pre_entropy_len,
demux_out.meta,
&opts,
)
.map(|c| (c, original_len)),
Err(_) => {
// Demux failed: full CPU fallback
compress_block(input_data, pl, &opts)
.map(|c| (c, original_len))
}
};
if gpu_output_tx.send((block_index, result)).is_err() {
return Ok(());
}
}
}
Err(_) => {
// GPU batch failed: fall back all blocks to CPU
for (i, block_data) in batch_blocks.iter().enumerate() {
let block_index = batch_indices[i];
let original_len = block_data.len();
let result = compress_block(block_data, pl, &opts)
.map(|c| (c, original_len));
if gpu_output_tx.send((block_index, result)).is_err() {
return Ok(());
}
}
}
}
}
Ok(())
}))
} else {
None
};

// Workers always compress on CPU. When GPU is active, the GPU
// coordinator handles GPU work; workers must not accidentally route
// through the GPU path inside compress_block → compress_and_demux.
#[cfg(feature = "webgpu")]
let worker_options = if has_gpu {
CompressOptions {
backend: crate::pipeline::Backend::Cpu,
webgpu_engine: None,
..options.clone()
}
} else {
options.clone()
};
#[cfg(not(feature = "webgpu"))]
let worker_options = options.clone();

// Spawn worker threads
for _ in 0..num_threads {
let rx = Arc::clone(&input_rx);
let tx = output_tx.clone();
let opts = options.clone();
let opts = worker_options.clone();
#[cfg(feature = "webgpu")]
let gpu_tx_clone = gpu_tx.clone();
#[cfg(feature = "webgpu")]
let gpu_pressure_clone = gpu_pressure.clone();
scope.spawn(move || {
loop {
let (idx, block_data) = match rx.lock().unwrap().recv() {
let (idx, mut block_data) = match rx.lock().unwrap().recv() {
Ok(msg) => msg,
Err(_) => break, // channel closed
};

// Try to offload to GPU coordinator if available,
// gated by adaptive backpressure.
#[cfg(feature = "webgpu")]
if let Some(ref gpu_sender) = gpu_tx_clone {
let pressure = gpu_pressure_clone
.as_ref()
.map_or(0, |p| p.load(std::sync::atomic::Ordering::Relaxed));
let len = block_data.len();
if pressure < gpu_pressure_limit
&& len >= crate::webgpu::MIN_GPU_INPUT_SIZE
&& len <= max_dispatch
{
match gpu_sender.try_send((idx, block_data)) {
Ok(()) => {
if let Some(ref p) = gpu_pressure_clone {
let _ = p.fetch_update(
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
|v| Some(v.saturating_sub(1)),
);
}
continue;
}
Err(mpsc::TrySendError::Full((_, data))) => {
if let Some(ref p) = gpu_pressure_clone {
p.fetch_add(2, std::sync::atomic::Ordering::Relaxed);
}
block_data = data;
}
Err(mpsc::TrySendError::Disconnected((_, data))) => {
block_data = data;
}
}
}
}

let original_len = block_data.len();
let result = compress_block(&block_data, pipeline, &opts)
.map(|compressed| (compressed, original_len));
Expand All @@ -218,8 +394,10 @@ fn compress_stream_parallel<R: Read + Send, W: Write>(
}
});
}
// Drop the original sender so workers' clones are the only ones.
// Drop senders so downstream channels close when all producers finish.
drop(output_tx);
#[cfg(feature = "webgpu")]
drop(gpu_tx);

// Spawn reader thread
let reader_handle = scope.spawn(move || -> StreamResult<()> {
Expand All @@ -243,7 +421,6 @@ fn compress_stream_parallel<R: Read + Send, W: Write>(
// Runs on the current (scoped) thread.
let mut expected = 0usize;
let mut reorder: BTreeMap<usize, (Vec<u8>, usize)> = BTreeMap::new();

for (idx, result) in output_rx {
let (compressed, original_len) = result?;
reorder.insert(idx, (compressed, original_len));
Expand All @@ -262,6 +439,16 @@ fn compress_stream_parallel<R: Read + Send, W: Write>(
Err(_) => return Err(StreamError::Pz(PzError::InvalidInput)),
}

// Check GPU coordinator for errors
#[cfg(feature = "webgpu")]
if let Some(handle) = gpu_handle {
match handle.join() {
Ok(Ok(())) => {}
Ok(Err(e)) => return Err(e),
Err(_) => return Err(StreamError::Pz(PzError::InvalidInput)),
}
}

Ok(bytes_written)
});

Expand Down