
feat: Integrate Celestia Data Availability layer in sequencer #130

Open
nobel-sh wants to merge 11 commits into feature/parajuliswopnil/twine-sequencer from nobel/nest-da

Conversation

@nobel-sh
Contributor

No description provided.

@nobel-sh nobel-sh marked this pull request as ready for review December 22, 2025 07:29

Copilot AI left a comment


Pull request overview

This PR integrates Celestia as the Data Availability (DA) layer for the sequencer, enabling verified batches to be posted to Celestia and verified on L1 via the SP1 Blobstream contract. The implementation adds a DA worker service with separate poster and verifier threads, updates the state verifier to send batches to DA, and includes comprehensive L1 verification logic.

Key changes:

  • Adds DA worker service with poster and verifier threads for asynchronous batch processing
  • Integrates Celestia SDK for blob submission and retrieval with L1 verification via SP1 Blobstream
  • Refactors state verification logic into separate methods for better maintainability

Reviewed changes

Copilot reviewed 21 out of 24 changed files in this pull request and generated 19 comments.

Show a summary per file
File — Description
  • crates/sequencer/src/da/workers.rs — New DA worker service with poster/verifier loops for handling batch submission and L1 verification
  • crates/sequencer/src/da/celestia/da.rs — Celestia DA implementation with blob posting and proof retrieval
  • crates/sequencer/src/da/celestia/l1.rs — L1 verification logic via SP1 Blobstream contract interactions
  • crates/sequencer/src/da/celestia/proof.rs — Tendermint proof fetching for data root inclusion
  • crates/sequencer/src/da/types.rs — Type definitions for DA operations (BatchInfo, DACommitment, etc.)
  • crates/sequencer/src/da/traits.rs — DA trait interface for data availability providers
  • crates/sequencer/src/verification/state_verifier.rs — Refactored to extract verification methods and integrate DA posting
  • crates/sequencer/src/instance/instance.rs — Initializes DA worker and passes sender to state verifier
  • crates/sequencer/src/config/config.rs — Adds DAConfig structure and environment variable handling
  • crates/sequencer/src/errors.rs — Adds DAError variant for DA-specific errors
  • crates/sequencer/src/common/consts.rs — Adds DA-related constants and identifiers
  • bin/nest/config/sequencer.yaml — Adds default DA configuration section with Celestia settings
  • crates/sequencer/Cargo.toml — Adds celestia-rpc, celestia-types, and tendermint dependencies
  • crates/sequencer/src/chain_state/chains/twine/* — Minor refactoring: renames module and improves string handling
  • bin/nest/src/main.rs — Removes redundant tokio import


Comment thread crates/sequencer/src/da/workers.rs Outdated
Comment on lines +258 to +260
"Failed to verify DA checkpoint on L1 for batch {batch_number}: {e}
"
)));

Copilot AI Dec 22, 2025


The error message has trailing whitespace and an extra newline within the string literal. The closing quote should be on the same line as the error text.
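The fix is mechanical: keep the closing quote on the same line as the error text so no newline or trailing spaces leak into the message. A minimal std-only check of the corrected single-line `format!`:

```rust
fn main() {
    let batch_number = 42u64;
    let e = "timeout"; // stand-in for the real error value
    // Closing quote on the same line: the message carries no embedded
    // newline and no trailing whitespace.
    let msg = format!("Failed to verify DA checkpoint on L1 for batch {batch_number}: {e}");
    assert!(!msg.contains('\n'));
    assert!(msg.ends_with("timeout"));
}
```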

Comment thread crates/sequencer/src/da/celestia/l1.rs Outdated
.parse()
.map_err(|e| TwineSequencerError::Other(format!("Invalid contract address: {e}")))?;

let provider = ProviderBuilder::new().connect_http(config.eth_rpc_url.parse().unwrap());

Copilot AI Dec 22, 2025


Using unwrap() on the RPC URL parse could cause a panic if the configuration contains an invalid URL. This should return a proper error instead.
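A std-only sketch of the suggested fix, mirroring the `map_err` pattern already used for the contract address two lines above. `std::net::SocketAddr` stands in for the real URL type, and the single-variant `TwineSequencerError` here is a stand-in for the PR's error enum:

```rust
#[derive(Debug)]
enum TwineSequencerError {
    Other(String),
}

// Propagate a parse failure as a typed error instead of panicking.
fn parse_rpc_url(raw: &str) -> Result<std::net::SocketAddr, TwineSequencerError> {
    raw.parse()
        .map_err(|e| TwineSequencerError::Other(format!("Invalid ETH RPC URL '{raw}': {e}")))
}

fn main() {
    assert!(parse_rpc_url("127.0.0.1:8545").is_ok());
    // A bad config value now surfaces as Err, not a panic.
    assert!(parse_rpc_url("not a url").is_err());
}
```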

Comment on lines +165 to +189
self.record_verified_batch(batch_number).await?;

tracing::info!(
target = "final_verifier",
"VERIFIED BATCH. batch_number: {} | hash: {:x}",
batch_number,
reference_state.batch_hash
);

// Send batch to DA worker thread
let batch_info = BatchInfo {
batch_num: batch_number,
batch_hash: reference_state.batch_hash,
};
if let Err(e) = self.da_sender.send(batch_info).await {
tracing::error!(
target = "final_verifier",
batch_number = batch_number,
error = %e,
"Failed to send batch to DA worker"
);
return Err(TwineSequencerError::DAError(format!(
"Failed to send batch to DA worker: {e}"
)));
}

Copilot AI Dec 22, 2025


The DA sending happens after recording the verified batch in the database. If the DA send fails, the batch is already marked as verified in the DB but hasn't been posted to DA. This creates an inconsistency. Consider either: 1) sending to DA before recording in DB, or 2) recording DA status separately in the DB to track which batches have been posted.
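Option 2 can be sketched with a stand-in key/value store. The namespaces and key names below are illustrative, not the PR's actual constants; the point is that a failed DA send leaves a visible "pending" row rather than silent drift between the verified-batch record and DA:

```rust
use std::collections::HashMap;

// Minimal stand-in for the PR's namespaced key/value database.
struct Db {
    rows: HashMap<(String, String), String>,
}

impl Db {
    fn insert(&mut self, ns: &str, key: &str, val: &str) {
        self.rows.insert((ns.to_string(), key.to_string()), val.to_string());
    }
    fn get(&self, ns: &str, key: &str) -> Option<&String> {
        self.rows.get(&(ns.to_string(), key.to_string()))
    }
}

// Record verification and DA-posting status under separate keys, so a
// failed send to the DA worker is recoverable on restart.
fn record_batch(db: &mut Db, batch: u64, da_send_ok: bool) {
    db.insert("chain_state_verifier", "verified_batch", &batch.to_string());
    let da_status = if da_send_ok {
        "posted".to_string()
    } else {
        format!("pending:{batch}")
    };
    db.insert("da_watcher", "da_status", &da_status);
}

fn main() {
    let mut db = Db { rows: HashMap::new() };
    record_batch(&mut db, 42, false);
    assert_eq!(db.get("chain_state_verifier", "verified_batch").unwrap(), "42");
    assert_eq!(db.get("da_watcher", "da_status").unwrap(), "pending:42");
}
```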


impl StateVerifier {
/// Creates a new StateVerifier instance.
/// Creates a new `StateVerifier` instance

Copilot AI Dec 22, 2025


Missing period at the end of the documentation comment for consistency with other doc comments in the file.

Suggested change
/// Creates a new `StateVerifier` instance
/// Creates a new `StateVerifier` instance.

Comment on lines +16 to +266
/// DA Worker service
#[derive(Debug)]
pub struct DAWorker {
/// Channel to send batch info for DA posting
pub batch_tx: Sender<BatchInfo>,
}

impl DAWorker {
/// Create a new DA Worker instance from config and spawn background workers
/// Returns the worker and join handles for both poster and verifier threads
pub async fn from_config(
da_config: DAConfig,
) -> Result<
(
Self,
Vec<tokio::task::JoinHandle<Result<(), TwineSequencerError>>>,
),
TwineSequencerError,
> {
tracing::info!(target: "da_worker", "Initializing Celestia DA worker");

let verifier_poll_interval = da_config.verifier_poll_interval;
let provider = CelestiaDA::from_config(da_config).await?;

tracing::info!(target: "da_worker", "Celestia DA provider initialized successfully");

// channel for incoming batches to be posted to DA
let (batch_tx, batch_rx) = channel::<BatchInfo>(100);

// channel for verifying commitments after posting to DA
// large buffer since verification is slow (~1 hour)
let (commitment_tx, commitment_rx) = channel::<(BatchInfo, DACommitment)>(1000);

let provider_arc = Arc::new(provider);

let poster_handle = tokio::spawn(Self::poster_loop(
provider_arc.clone(),
batch_rx,
commitment_tx,
));

let verifier_handle = tokio::spawn(Self::verifier_loop(
provider_arc,
commitment_rx,
verifier_poll_interval,
));

let worker = Self { batch_tx };

Ok((worker, vec![poster_handle, verifier_handle]))
}

/// Send a batch to the DA worker for processing
pub async fn post_batch(&self, batch_info: BatchInfo) -> Result<(), TwineSequencerError> {
self.batch_tx.send(batch_info).await.map_err(|e| {
TwineSequencerError::Other(format!("Failed to send batch to DA worker: {e}"))
})
}

/// Poster worker loop
async fn poster_loop<T: DA>(
da_provider: Arc<T>,
mut batch_rx: Receiver<BatchInfo>,
commitment_tx: Sender<(BatchInfo, DACommitment)>,
) -> Result<(), TwineSequencerError> {
tracing::info!(target = "da_poster", "DA poster thread started");

while let Some(batch_info) = batch_rx.recv().await {
let batch_number = batch_info.batch_num;

let commitment = match da_provider.post_to_da(batch_info.clone()).await {
Ok(c) => {
tracing::info!(
target = "da_poster",
batch_number = batch_number,
da_height = c.height,
"Batch posted to DA successfully"
);
c
}
Err(e) => {
tracing::error!(
target = "da_poster",
batch_number = batch_number,
error = %e,
"Failed to post batch to DA"
);
continue;
}
};

// Forward to verifier
if let Err(e) = commitment_tx.send((batch_info, commitment)).await {
tracing::error!(
target = "da_poster",
batch_number = batch_number,
error = %e,
"Failed to send commitment to verifier"
);
} else {
tracing::debug!(
target = "da_poster",
batch_number = batch_number,
"Commitment forwarded to verifier"
);
}
}

tracing::info!(target = "da_poster", "DA poster thread stopped");
Ok(())
}

/// Verifier worker loop
async fn verifier_loop<T: DA>(
da_provider: Arc<T>,
mut commitment_rx: Receiver<(BatchInfo, DACommitment)>,
verifier_poll_interval_ms: u64,
) -> Result<(), TwineSequencerError> {
tracing::info!(
target = "da_verifier",
verifier_poll_interval_ms = verifier_poll_interval_ms,
"DA verifier thread started"
);

let mut pending_commitments = VecDeque::<(BatchInfo, DACommitment)>::new();
let mut ticker = interval(Duration::from_millis(verifier_poll_interval_ms));

loop {
ticker.tick().await;

while let Ok((batch_info, commitment)) = commitment_rx.try_recv() {
tracing::debug!(
target = "da_verifier",
batch_number = batch_info.batch_num,
celestia_height = commitment.height,
"Received new commitment for verification"
);
pending_commitments.push_back((batch_info, commitment));
}

if pending_commitments.is_empty() {
continue;
}

tracing::debug!(
target = "da_verifier",
pending_count = pending_commitments.len(),
"Processing pending commitments"
);

// Only process the first commitment since heights are ordered
// If height N is not available, height N+1 won't be either
if let Some((batch_info, commitment)) = pending_commitments.front() {
let batch_number = batch_info.batch_num;
let celestia_height = commitment.height;

// Check if the height is available on L1 before attempting to get proof
let is_available = match da_provider.height_exists_on_l1(celestia_height).await {
Ok(available) => available,
Err(e) => {
tracing::warn!(
target = "da_verifier",
batch_number = batch_number,
celestia_height = celestia_height,
error = %e,
"Failed to check if height is available on L1, will retry next tick"
);
continue;
}
};

if !is_available {
tracing::debug!(
target = "da_verifier",
batch_number = batch_number,
celestia_height = celestia_height,
"Height not yet available on L1, will retry next tick"
);
continue;
}

// Remove from queue and process
let (batch_info, commitment) = pending_commitments.pop_front().unwrap();

tracing::info!(
target = "da_verifier",
batch_number = batch_number,
celestia_height = celestia_height,
"Height is available on L1, fetching DA existence proof"
);

// Get DA existence proof
let proof = match da_provider
.get_da_existence_proof(batch_info.clone(), commitment.clone())
.await
{
Ok(p) => {
tracing::info!(
target = "da_verifier",
batch_number = batch_number,
proof_nonce = p.proof_nonce,
"DA existence proof obtained"
);
p
}
Err(e) => {
tracing::error!(
target = "da_verifier",
batch_number = batch_number,
error = %e,
"Failed to get DA existence proof"
);
return Err(TwineSequencerError::DAError(format!(
"Failed to get DA existence proof for batch {batch_number}: {e}
"
)));
}
};

// Verify on L1
let checkpoint = DACheckpoint {
batch_info,
da_commitment: commitment,
da_existence_proof: proof,
};

match da_provider.verify_da_on_l1(checkpoint).await {
Ok(()) => {
tracing::info!(
target = "da_verifier",
batch_number = batch_number,
"DA checkpoint verified on L1 successfully"
);
}
Err(e) => {
tracing::error!(
target = "da_verifier",
batch_number = batch_number,
error = %e,
"Failed to verify DA checkpoint on L1"
);
return Err(TwineSequencerError::DAError(format!(
"Failed to verify DA checkpoint on L1 for batch {batch_number}: {e}
"
)));
}
}
}
}
}
}

Copilot AI Dec 22, 2025


The DA worker implementation lacks test coverage. Consider adding tests for: 1) poster loop handling successful and failed posts, 2) verifier loop processing commitments, 3) error handling when channels are closed, 4) behavior when DA heights are not yet available on L1.
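The first suggested test can be sketched without tokio by factoring the forwarding decision out of the loop into a pure helper. `Da`, `MockDa`, and `poster_step` below are hypothetical, synchronous stand-ins for the PR's async trait and poster loop:

```rust
// Synchronous stand-in for the PR's DA trait: post returns the DA height.
trait Da {
    fn post(&self, batch: u64) -> Result<u64, String>;
}

// Mock provider: even-numbered batches succeed, odd-numbered ones fail.
struct MockDa;
impl Da for MockDa {
    fn post(&self, batch: u64) -> Result<u64, String> {
        if batch % 2 == 0 { Ok(1000 + batch) } else { Err("rpc down".into()) }
    }
}

// One iteration of the poster loop: forward on success, skip on failure
// (the PR logs the error and continues to the next batch).
fn poster_step<T: Da>(da: &T, batch: u64, forwarded: &mut Vec<(u64, u64)>) {
    if let Ok(height) = da.post(batch) {
        forwarded.push((batch, height));
    }
}

fn main() {
    let mut forwarded = Vec::new();
    for b in 0..4 {
        poster_step(&MockDa, b, &mut forwarded);
    }
    // Only the successful posts reach the verifier channel.
    assert_eq!(forwarded, vec![(0, 1000), (2, 1002)]);
}
```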

Comment thread crates/sequencer/src/da/workers.rs Outdated
Comment on lines +229 to +230
"Failed to get DA existence proof for batch {batch_number}: {e}
"

Copilot AI Dec 22, 2025


The error message has trailing whitespace and an extra newline within the string literal. The closing quote should be on the same line as the error text.

Comment thread crates/sequencer/src/da/workers.rs Outdated
}

// Remove from queue and process
let (batch_info, commitment) = pending_commitments.pop_front().unwrap();

Copilot AI Dec 22, 2025


The DA worker threads will panic if this unwrap fails. Since we just checked that pending_commitments is not empty and we're only processing the front element, this should be safe. However, for defensive programming, consider using if-let or expect with a descriptive message.

Suggested change
let (batch_info, commitment) = pending_commitments.pop_front().unwrap();
let (batch_info, commitment) = pending_commitments
.pop_front()
.expect("pending_commitments was empty in DA verifier after confirming a front element");

Comment on lines +76 to +146
/// Verify state consistency across all chains
async fn verify_state_consistency(
&self,
batch_number: u64,
states: &std::collections::HashMap<String, crate::chain_state::state::State>,
) -> Result<crate::chain_state::state::State, TwineSequencerError> {
tracing::debug!(
target = "final_verifier",
"verifying batch {} ({} chains)",
batch_number,
states.len()
);

// take the first state as the reference
let reference_state = states
.values()
.next()
.expect("non-empty states map guaranteed above")
.clone();

for (chain, state) in states {
if state != &reference_state {
tracing::error!(
target = "final_verifier",
"mismatched l2 state for batch: {} on chain: {}",
batch_number,
chain,
);
tracing::debug!(
target = "final_verifier",
"expected: {:?}, got: {:?}",
reference_state,
state,
);

let error = TwineSequencerError::StateRecordMismatched(format!(
"batch: {batch_number}, chain: {chain}, expected: {reference_state:?}, got: {state:?}"
));

// On verification failure send kill signal to stop block producer
let shutdown_signal = ShutdownSignal::VerificationFailure(error.clone());
tracing::error!(
target = "final_verifier",
"sending shutdown signal: {}",
shutdown_signal
);
self.kill_sig_sender.send(shutdown_signal).map_err(|e| {
TwineSequencerError::Other(format!(
"Channel error sending shutdown signal: {e}",
))
})?;
return Err(error);
}
}

Ok(reference_state)
}

/// Record verified batch in database
async fn record_verified_batch(&self, batch_number: u64) -> Result<(), TwineSequencerError> {
self.db
.lock()
.await
.insert(
NS_CHAIN_STATE_VERIFIER.to_string(),
VERIFIED_BATCH.to_string(),
batch_number.to_string(),
)
.await
.map_err(|e| TwineSequencerError::Other(e.to_string()))
}

Copilot AI Dec 22, 2025


The new verify_state_consistency and record_verified_batch methods lack test coverage. Consider adding tests to verify: 1) state mismatch detection, 2) shutdown signal sending on verification failure, 3) database recording success and failure scenarios.
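The mismatch-detection case can be tested by isolating the comparison logic in a pure helper. `State` and `check_consistency` below are reduced stand-ins for the PR's types (no shutdown channel or database), kept just close enough to exercise the reference-state comparison:

```rust
use std::collections::HashMap;

// Reduced stand-in for the PR's chain state.
#[derive(Clone, Debug, PartialEq)]
struct State {
    batch_hash: u64,
}

// Mirrors verify_state_consistency: take the first state as the
// reference and fail on any chain that disagrees with it.
fn check_consistency(states: &HashMap<String, State>) -> Result<State, String> {
    let reference = states.values().next().expect("non-empty states map").clone();
    for (chain, state) in states {
        if *state != reference {
            return Err(format!("mismatched l2 state on chain: {chain}"));
        }
    }
    Ok(reference)
}

fn main() {
    let mut ok = HashMap::new();
    ok.insert("ethereum".to_string(), State { batch_hash: 7 });
    ok.insert("solana".to_string(), State { batch_hash: 7 });
    assert!(check_consistency(&ok).is_ok());

    let mut bad = ok.clone();
    bad.insert("solana".to_string(), State { batch_hash: 9 });
    assert!(check_consistency(&bad).is_err());
}
```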

Comment thread crates/sequencer/src/common/consts.rs Outdated
/// List of all registered L1 chains for verification
pub const REGISTERED_L1_CHAINS: &[&str] = &[ETHEREUM_CHAIN_IDENTIFIER, SOLANA_CHAIN_IDENTIFIER];
pub const REGISTERED_L1_CHAINS: &[&str] = &[
DA_CHAIN_IDENTIFIER,

Copilot AI Dec 22, 2025


Adding DA_CHAIN_IDENTIFIER to REGISTERED_L1_CHAINS may be conceptually incorrect. Celestia is a Data Availability layer, not an L1 chain in the same sense as Ethereum and Solana. The constants NS_DA_WATCHER and DA_PROCESSED_BATCH are defined but there's no corresponding DA watcher implementation in this PR. This suggests incomplete implementation or misleading naming.

Suggested change
DA_CHAIN_IDENTIFIER,

Comment on lines +143 to +264

Copilot AI Dec 22, 2025


The verifier loop has no exit condition when the commitment channel is closed. Once all senders are dropped, the loop will continuously tick forever doing nothing. Add a check for when the channel is closed and return gracefully.
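The PR's loop uses tokio's mpsc, but the exit pattern is the same as with std channels: distinguish `Empty` (keep ticking) from `Disconnected` (all senders dropped, return gracefully). A std-only sketch of the drain step with an exit signal:

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

// Drain any queued batches; return false when every sender has been
// dropped, so the caller can exit its loop instead of ticking forever.
fn drain_or_exit(rx: &Receiver<u64>, pending: &mut Vec<u64>) -> bool {
    loop {
        match rx.try_recv() {
            Ok(batch) => pending.push(batch),
            Err(TryRecvError::Empty) => return true,         // keep looping
            Err(TryRecvError::Disconnected) => return false, // channel closed
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    let mut pending = Vec::new();
    tx.send(7).unwrap();
    assert!(drain_or_exit(&rx, &mut pending)); // channel still open
    assert_eq!(pending, vec![7]);
    drop(tx);
    assert!(!drain_or_exit(&rx, &mut pending)); // all senders dropped: exit
}
```

With tokio's `try_recv`, the corresponding variant to match is its disconnected error; the control flow is otherwise identical.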
