feat(server): Upload endpoint #5638

Open

jjbayer wants to merge 54 commits into test/wait-for-log from feat/tus-upload-endpoint

Conversation

@jjbayer
Member

@jjbayer jjbayer commented Feb 13, 2026

This PR implements the first version of the /upload endpoint for large files.

  • It uses TUS-compliant headers but, in violation of the protocol, only supports "Creation With Upload", not "Creation" with subsequent uploads (see the illustrative request after this list).
  • The endpoint dispatches to either the upload service or the upstream relay.
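
(For reference, a hedged sketch of such a single-request "Creation With Upload" call; host, project id and key are placeholders, and the response handling is not taken from this PR.)

```rust
// Illustrative client call, not code from this PR. It mirrors the integration
// test further down: a single POST carries the TUS headers and the whole body.
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let data = b"hello world".to_vec();
    let response = Client::new()
        // Placeholder URL and key; the real route is /api/{project_id}/upload/.
        .post("http://127.0.0.1:3000/api/42/upload/?sentry_key=<public_key>")
        .header("Tus-Resumable", "1.0.0")
        .header("Upload-Length", data.len().to_string())
        .header("Content-Type", "application/offset+octet-stream")
        .body(data)
        .send()
        .await?;

    // Per the TUS creation extension, a successful create answers with
    // 201 Created, an Upload-Offset header, and a Location header.
    println!("{} {:?}", response.status(), response.headers().get("Location"));
    Ok(())
}
```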

TODO:

  • Wait for project config without subscription
  • Use separate credentials

Follow-up (?):

  • Let service handle project-wait?
  • ...

Closes https://linear.app/getsentry/issue/INGEST-724/implement-minimal-tus-protocol.

jjbayer and others added 30 commits February 10, 2026 09:17
Add a new `/api/{project_id}/upload/` endpoint that implements the TUS
protocol "Creation With Upload" feature. This allows uploading data in
a single POST request with TUS headers.

The endpoint validates:
- Tus-Resumable header (must be "1.0.0")
- Upload-Length header (must match body size)
- Upload-Metadata header (optional, base64-encoded key-value pairs)

Actual upload storage is not yet implemented (marked with TODOs).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
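
(Illustration, not part of the commit: a minimal parser for the optional Upload-Metadata format described above, i.e. comma-separated `key base64value` pairs per the TUS spec. The PR's actual `tus.rs` will differ.)

```rust
// Sketch only: parse "key1 dmFsdWUx,key2" into a map, where each value is
// base64-encoded and may be omitted entirely.
use std::collections::BTreeMap;

use base64::{engine::general_purpose::STANDARD, Engine as _};

fn parse_upload_metadata(header: &str) -> Result<BTreeMap<String, Vec<u8>>, String> {
    let mut metadata = BTreeMap::new();
    for pair in header.split(',') {
        let mut parts = pair.trim().splitn(2, ' ');
        let key = parts.next().filter(|k| !k.is_empty()).ok_or("empty key")?;
        let value = match parts.next() {
            Some(encoded) => STANDARD.decode(encoded).map_err(|e| e.to_string())?,
            None => Vec::new(), // a key without a value is allowed
        };
        metadata.insert(key.to_owned(), value);
    }
    Ok(metadata)
}
```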
Process the upload body as a stream instead of buffering the entire
upload into memory. This makes the endpoint suitable for large file
uploads.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
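
(Illustration, not part of the commit: the streaming shape described above, assuming an axum `Body`; the actual handler differs.)

```rust
// Sketch only: consume the body chunk by chunk instead of buffering the full
// upload, which is what makes large uploads feasible.
use axum::body::Body;
use futures::StreamExt;

async fn consume_streaming(body: Body) -> Result<u64, axum::Error> {
    let mut stream = body.into_data_stream();
    let mut total = 0u64;
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        // In the real endpoint, the chunk would be forwarded to storage here.
        total += chunk.len() as u64;
    }
    Ok(total)
}
```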
Split `upload.rs` into `upload/mod.rs` (endpoint handler) and
`upload/tus.rs` (protocol constants, error types, metadata parsing).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… tests

Replace ContentLengthMismatch (400) with PayloadTooLarge (413) when the
streaming body exceeds the declared Upload-Length.

Add integration tests covering: successful upload, missing/invalid TUS
headers, and the 413 response for oversized bodies.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a new `limits.max_upload_size` config option and apply it as
axum's DefaultBodyLimit on the TUS upload route.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
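
(Illustration, not part of the commit: roughly how a configured limit maps onto axum's `DefaultBodyLimit`; `max_upload_size` and `handle` are stand-in names, and the axum 0.8-style path capture matches the routes shown further down.)

```rust
use axum::{extract::DefaultBodyLimit, routing::post, Router};

// Placeholder handler; the real one streams and stores the body.
async fn handle() -> &'static str {
    "created"
}

fn upload_router(max_upload_size: usize) -> Router {
    Router::new()
        .route("/api/{project_id}/upload/", post(handle))
        // Bodies larger than the configured limit are rejected before the
        // handler gets to stream them.
        .layer(DefaultBodyLimit::max(max_upload_size))
}
```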
Fix the upload endpoint to properly handle results from both the
upstream relay and the local upload service:

- Sink::upload now returns Result<SinkResult, UploadError> with proper
  error types for each sink variant
- Fix UploadError::UploadFailed to delegate to BadStoreRequest's
  IntoResponse (it previously called .to_string(), which produced a 200 OK)
- Add UploadError variants for Forward, UploadService, ServiceUnavailable
  with appropriate status code mapping (503, 504, 500)
- Fix handle_stream to create objectstore session with scoping and
  stream the body via put_stream
- Add scoping field to UploadStream so objectstore can scope uploads
- Change UploadStream response to AsyncResponse<Result<(), Error>> for
  proper error propagation
- Remove dbg!() call and dead code

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Wrap UploadStream in Managed<> for proper outcome tracking:

- Add expected_length field and implement Counted for UploadStream
- Change FromMessage impl to accept Managed<UploadStream>
- Create Managed<UploadStream> in the endpoint using envelope.wrap()
  to inherit scoping and outcome tracking metadata
- Update handle_stream to accept/reject the Managed wrapper
- On load-shed, properly reject the Managed instance

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Introduce ExactStream, a Sync streaming wrapper that validates the total
byte count against the announced Upload-Length header. It returns an
error if the stream provides more or fewer bytes than expected, catching
protocol violations at the stream level rather than after buffering.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move ExactStream from the upload service module to relay-server utils,
making it generic over the inner stream type. The Stream impl requires
S: Unpin, which is naturally satisfied by boxed streams (Pin<Box<…>>).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
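
(Illustration, not part of the commits: a condensed sketch of the ExactStream idea. The real implementation in relay-server's utils is generic over the error type and differs in details, so the io::Error-based errors and other names here are made up.)

```rust
// Wrap an inner byte stream and fail if it yields more or fewer bytes than
// the announced Upload-Length.
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::Stream;

pub struct ExactStream<S> {
    inner: S,
    remaining: u64,
}

impl<S> ExactStream<S> {
    pub fn new(inner: S, expected_length: u64) -> Self {
        Self { inner, remaining: expected_length }
    }
}

impl<S> Stream for ExactStream<S>
where
    S: Stream<Item = Result<Bytes, io::Error>> + Unpin,
{
    type Item = Result<Bytes, io::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Ready(Some(Ok(chunk))) => {
                if (chunk.len() as u64) > this.remaining {
                    this.remaining = 0;
                    return Poll::Ready(Some(Err(io::Error::other(
                        "body is longer than the announced Upload-Length",
                    ))));
                }
                this.remaining -= chunk.len() as u64;
                Poll::Ready(Some(Ok(chunk)))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            Poll::Ready(None) if this.remaining > 0 => {
                // The inner stream ended early; report the shortfall once.
                this.remaining = 0;
                Poll::Ready(Some(Err(io::Error::other(
                    "body is shorter than the announced Upload-Length",
                ))))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
```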
Comment on lines +49 to +72
match self {
    Error::Tus(_) => StatusCode::BAD_REQUEST,
    Error::Request(error) => return error.into_response(),
    Error::Upload(error) => match error {
        upload::Error::Forward(error) => return error.into_response(),
        upload::Error::Upstream(status) => status,
        upload::Error::InvalidLocation | upload::Error::SigningFailed => {
            StatusCode::INTERNAL_SERVER_ERROR
        }
        upload::Error::ServiceUnavailable => StatusCode::SERVICE_UNAVAILABLE,
        upload::Error::UploadService(service_error) => match service_error {
            ServiceError::Timeout => StatusCode::GATEWAY_TIMEOUT,
            ServiceError::LoadShed => StatusCode::SERVICE_UNAVAILABLE,
            ServiceError::UploadFailed(error) => match error {
                objectstore::Error::Reqwest(error) => match error.status() {
                    Some(status) => status,
                    None => StatusCode::INTERNAL_SERVER_ERROR,
                },
                _ => StatusCode::INTERNAL_SERVER_ERROR,
            },
            ServiceError::Uuid(_) => StatusCode::INTERNAL_SERVER_ERROR,
        },
    },
}
Member Author

This is a large match expression, but I wanted control over the status codes in a single place.

Contributor

I think it's perfectly fine like this.

Comment on lines 166 to 173
if project.state().is_pending() {
    state.project_cache_handle().fetch(public_key);
    while project.state().is_pending() {
        relay_log::trace!("Waiting for project state");
        let _ = state.project_cache_handle().changes().recv().await;
        project = state.project_cache_handle().get(public_key);
    }
}
Member Author

This should probably be a function exposed from the project cache that does not need to check every update.

Member

I'd not ship this; I'd first figure out a better way to await a project config. This may be quite the footgun and potentially a DoS vector.
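
(Editorial sketch, not from the PR: one way to blunt that footgun is to put a deadline on whatever wait the project cache ends up exposing, so a config that never arrives becomes an error response instead of a parked request.)

```rust
// Illustrative only: bound an arbitrary "wait for project state" future.
use std::future::Future;
use std::time::Duration;

async fn wait_bounded<F>(wait_for_state: F, limit: Duration) -> Result<(), &'static str>
where
    F: Future<Output = ()>,
{
    tokio::time::timeout(limit, wait_for_state)
        .await
        .map_err(|_| "timed out waiting for project config")
}
```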

@jjbayer jjbayer changed the base branch from master to test/wait-for-log February 13, 2026 16:18
@jjbayer jjbayer marked this pull request as ready for review February 13, 2026 16:18
@jjbayer jjbayer requested a review from a team as a code owner February 13, 2026 16:18
.route("/api/{project_id}/events/{event_id}/attachments/", post(attachments::handle))
.route("/api/{project_id}/unreal/{sentry_key}/", unreal::route(config));
.route("/api/{project_id}/unreal/{sentry_key}/", unreal::route(config))
.route("/api/{project_id}/upload/", upload::route(config));
Member Author

We only support "creation with upload", so we don't need /upload/{id}/ for now.

Member

Random bikeshedding about the name: I assume you went with upload to be independent of attachments and other things, i.e. generically uploading data? Any other options you considered?

Comment on lines +40 to +41
/// `application/vnd.sentry.attachment-ref`
AttachmentRef,
Member Author

I added this because I need it for my pseudo-envelope, but we're going to need it anyway to allow SDKs / downstream relays to send placeholders for uploads.

Member

Is this actually used anywhere yet?

Comment on lines 72 to 79
if has_source(&e, |source| {
    source
        .downcast_ref::<http_body_util::LengthLimitError>()
        .is_some()
        || source
            .downcast_ref::<io::Error>()
            .is_some_and(|e| e.kind() == io::ErrorKind::FileTooLarge)
}) {
Member Author

This is getting annoying; I'm open to better ideas for how to unpack this error.

Contributor

Best idea I have is a macro :/
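
(For what it's worth, a loose sketch of what such a macro could look like; purely illustrative, not code from the PR.)

```rust
// Expands a list of `Type => predicate` arms into the chained downcast
// checks, so the closure passed to `has_source` stays short.
macro_rules! source_matches {
    ($source:expr, $($ty:ty => $pred:expr),+ $(,)?) => {{
        let source: &(dyn std::error::Error + 'static) = $source;
        false $(|| source.downcast_ref::<$ty>().is_some_and($pred))+
    }};
}

// Hypothetical call site, matching the shape of the snippet above:
// has_source(&e, |source| source_matches!(
//     source,
//     http_body_util::LengthLimitError => |_| true,
//     std::io::Error => |e| e.kind() == std::io::ErrorKind::FileTooLarge,
// ))
```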

///
/// This is currently the easiest way to guarantee that the upload gets checked the same way as
/// the envelope.
async fn check_request(
Member Author

I'm willing to refactor this to work without envelopes. I believe I'm still missing the state.memory_checker().check_memory() call from handle_envelope, so the check is incomplete anyway.

Member

I think this is fine for a start, but we should consider changing this: either a separate path or, better, a generic path which can work with envelopes and other items at the same time.


Comment on lines +112 to +113
.await
.map_err(Error::from)?;
Contributor

This looks like it should already be handled by the ?.

Suggested change
.await
.map_err(Error::from)?;
.await?;

}

let content_type = headers.get(hyper::header::CONTENT_TYPE);
if content_type.is_none_or(|ct| ct != EXPECTED_CONTENT_TYPE) {
Contributor

@loewenheim loewenheim Feb 16, 2026

For the version check you do a comparison with Some(&TUS_VERSION). I would settle on one of the two idioms.
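
(For illustration, the two idioms in question side by side; names are placeholders.)

```rust
use axum::http::{HeaderMap, HeaderValue};

// Illustrative only: either compare the Option directly, or use a combinator;
// the review asks to pick one style for both header checks.
fn version_matches(headers: &HeaderMap, expected: &HeaderValue) -> bool {
    // Idiom 1: comparison against `Some(&expected)`.
    let by_comparison = headers.get("Tus-Resumable") == Some(expected);
    // Idiom 2: combinators such as `is_some_and` / `is_none_or`.
    let by_combinator = headers.get("Tus-Resumable").is_some_and(|v| v == expected);
    debug_assert_eq!(by_comparison, by_combinator);
    by_combinator
}
```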

Comment on lines 514 to 520
/// Returns the attachment payload size.
///
/// For AttachmentV2, returns only the size of the actual payload, excluding the attachment meta.
/// For Attachment, returns the size of the entire payload.
///
/// **Note:** This relies on the `meta_length` header, which might not be correct; as such, this
/// is best effort.
Contributor

This docstring could use a sentence about the newly added case.

#[serde(flatten, skip_serializing_if = "Option::is_none")]
parent_id: Option<ParentId>,

/// Size of the attachment this reference rate limits.
Contributor

I don't entirely understand the reference to rate limiting here.

uri.as_bytes(),
&SignatureHeader {
timestamp: Some(Utc::now()),
signature_algorithm: None, //Some(SignatureAlgorithm::Prehashed),
Contributor

What does this comment mean?

url = { workspace = true, features = ["serde"] }
uuid = { workspace = true, features = ["v5"] }
zstd = { workspace = true }
http.workspace = true
Contributor

Nit: Might want to sort this alphabetically and use the conventional formatting.

uuid = { workspace = true }
reqwest = { workspace = true, features = ["gzip", "native-tls-vendored"] }
mimalloc = { workspace = true, features = ["v3", "override", "debug_in_debug"] }
http.workspace = true
Contributor

See relay-server/Cargo.toml.

Comment on lines +28 to +37
response = relay.post(
    "/api/%s/upload/?sentry_key=%s"
    % (project_id, mini_sentry.get_dsn_public_key(project_id)),
    headers={
        "Tus-Resumable": "1.0.0",
        "Upload-Length": str(len(data)),
        "Content-Type": "application/offset+octet-stream",
    },
    data=data,
)
Contributor

This uploads the payload to relay, which forwards it to mini_sentry, which has a dummy implementation of the upload route. Is that correct?

(length,) = query_params["length"]
assert length == "11"
(signature,) = query_params["signature"]
print(signature)
Contributor

Is this intentionally left in the test?

.route("/api/{project_id}/events/{event_id}/attachments/", post(attachments::handle))
.route("/api/{project_id}/unreal/{sentry_key}/", unreal::route(config));
.route("/api/{project_id}/unreal/{sentry_key}/", unreal::route(config))
.route("/api/{project_id}/upload/", upload::route(config));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random bikeshedding about the name, I assume you went with upload to be independent of attachments and other things, generically uploading data? Any other options you considered?

.route("/api/{project_id}/events/{event_id}/attachments/", post(attachments::handle))
.route("/api/{project_id}/unreal/{sentry_key}/", unreal::route(config));
.route("/api/{project_id}/unreal/{sentry_key}/", unreal::route(config))
.route("/api/{project_id}/upload/", upload::route(config));
Member

Possibly we should do some versioning for this endpoint?

response.headers_mut().insert(
    tus::UPLOAD_OFFSET,
    HeaderValue::from_str(&upload_length.to_string())
        .expect("integer should always be a valid header"),
Member

Nit: In most (?) places we just turn this into error responses so as not to have a panic path. I'd also consider that here, or make a utility which converts integers to headers so we have an auditable place.
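
(One possible shape for that utility; as far as I recall the http crate already ships infallible From impls for the common integer types, which removes the panic path outright.)

```rust
use http::HeaderValue;

// Sketch of the suggested utility: no `expect`, no error mapping needed,
// assuming `upload_length` is an integer type such as u64.
fn int_header_value(value: u64) -> HeaderValue {
    HeaderValue::from(value)
}
```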


Comment on lines +126 to +129
let signature = config
    .credentials()
    .ok_or(Error::SigningFailed)?
    .secret_key
Member

I think we should provision separate credentials here: right now different Relay instances can have different credentials, but then they wouldn't be able to verify validity. This also keeps us from giving the credentials to a different layer of Relays.

key,
length,
} = self;
format!("/api/{project_id}/upload/{key}/?length={length}")
Member

If this is the best way to do this via TUS, I guess we have to, but this feels wrong, and I'd much rather have something much more explicit.


format!("/api/{project_id}/upload/{key}/?length={length}")
}

fn try_sign(self, config: &Config) -> Result<SignedLocation, Error> {
Member

This seems to be entirely untested, and there is no test to verify the signature, including timestamps etc.

}

/// An identifier for the upload.
pub struct Location {
Member

There is a lot of logic attached here, with signing etc. I think it'd be great to add all this to the docs.

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.

let notify = Arc::clone(&guard.notify);
drop(guard); // don't hold the guard across await points
notify.notified().await;
}

Race condition in ready_project_inner may miss notification

Medium Severity

In ready_project_inner, the Notified future is created (via notify.notified()) after checking is_pending() and dropping the guard. Tokio's notify_waiters() uses a counter-based mechanism — a Notified future only captures notifications that occur after it's created. If set_project_state fires notify_waiters() between drop(guard) and notify.notified(), the notification is permanently missed, causing the loop to block until timeout or the next periodic project refresh. Moving notified() creation to before the is_pending() check would close this race window.
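
(A minimal sketch of the ordering the bot suggests, with made-up Shared/pending names standing in for the real project-cache types.)

```rust
// Enable the Notified future while the pending state is still known, so a
// notify_waiters() that fires right after the lock is released cannot be
// missed.
use std::sync::Arc;

use tokio::sync::{Mutex, Notify};

struct Shared {
    pending: bool,
    notify: Arc<Notify>,
}

async fn wait_until_ready(shared: &Mutex<Shared>) {
    loop {
        let guard = shared.lock().await;
        if !guard.pending {
            return;
        }
        let notify = Arc::clone(&guard.notify);
        let notified = notify.notified();
        tokio::pin!(notified);
        // Register with the Notify before releasing the lock; only enabled
        // futures are woken by notify_waiters().
        notified.as_mut().enable();
        drop(guard); // still avoid holding the guard across the await point
        notified.await;
    }
}
```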

