From 4c9e3f2252dc26548b62ccea9370ddd77faf6d9f Mon Sep 17 00:00:00 2001 From: null <60427892+null8626@users.noreply.github.com> Date: Sat, 21 Feb 2026 21:09:44 +0700 Subject: [PATCH 1/9] feat: update to webhooks v2 --- src/webhooks/actix_web.rs | 71 +++++++++--------- src/webhooks/axum.rs | 95 ++++++++++++++---------- src/webhooks/mod.rs | 150 +++++++++++++++++++++++++++++--------- src/webhooks/payload.rs | 103 ++++++++++++++++++++++++++ src/webhooks/rocket.rs | 31 ++++---- src/webhooks/vote.rs | 67 ----------------- src/webhooks/warp.rs | 78 ++++++++------------ 7 files changed, 355 insertions(+), 240 deletions(-) create mode 100644 src/webhooks/payload.rs delete mode 100644 src/webhooks/vote.rs diff --git a/src/webhooks/actix_web.rs b/src/webhooks/actix_web.rs index 9393140..96ffbb7 100644 --- a/src/webhooks/actix_web.rs +++ b/src/webhooks/actix_web.rs @@ -1,62 +1,61 @@ -use crate::Incoming; -use actix_web::{ - dev::Payload, - error::{Error, ErrorBadRequest, ErrorUnauthorized}, - web::Json, - FromRequest, HttpRequest, -}; -use serde::de::DeserializeOwned; +use super::IncomingPayload; use std::{ future::Future, pin::Pin, - task::{ready, Context, Poll}, + task::{Context, Poll, ready}, +}; + +use actix_web::{ + FromRequest, HttpRequest, + dev::Payload, + error::{Error, ErrorBadRequest, ErrorUnauthorized}, }; +use futures_core::stream::Stream; #[doc(hidden)] -pub struct IncomingFut { +pub struct IncomingPayloadFut { req: HttpRequest, - json_fut: as FromRequest>::Future, + payload: Payload, + body: Vec, } -impl Future for IncomingFut -where - T: DeserializeOwned, -{ - type Output = Result, Error>; +impl Future for IncomingPayloadFut { + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Ok(json) = ready!(Pin::new(&mut self.json_fut).poll(cx)) { - let headers = self.req.headers(); - - if let Some(authorization) = headers.get("Authorization") { - if let Ok(authorization) = authorization.to_str() { - return Poll::Ready(Ok(Incoming { - authorization: authorization.to_owned(), - data: json.into_inner(), - })); - } + while let Some(body) = ready!(Pin::new(&mut self.payload).poll_next(cx)) { + match body { + Ok(body) => self.body.extend_from_slice(&body), + + Err(_) => return Poll::Ready(Err(ErrorBadRequest("400"))), } + } + + let headers = self.req.headers(); - return Poll::Ready(Err(ErrorUnauthorized("401"))); + if let (Some(signature), Some(trace)) = ( + headers.get("x-topgg-signature"), + headers.get("x-topgg-trace"), + ) && let (Ok(signature), Ok(trace)) = (signature.to_str(), trace.to_str()) + && let Some(incoming) = IncomingPayload::new(signature, self.body.clone(), trace) + { + return Poll::Ready(Ok(incoming)); } - Poll::Ready(Err(ErrorBadRequest("400"))) + Poll::Ready(Err(ErrorUnauthorized("401"))) } } #[cfg_attr(docsrs, doc(cfg(feature = "actix-web")))] -impl FromRequest for Incoming -where - T: DeserializeOwned, -{ +impl FromRequest for IncomingPayload { type Error = Error; - type Future = IncomingFut; + type Future = IncomingPayloadFut; - #[inline(always)] fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - IncomingFut { + IncomingPayloadFut { req: req.clone(), - json_fut: Json::from_request(req, payload), + payload: payload.take(), + body: vec![], } } } diff --git a/src/webhooks/axum.rs b/src/webhooks/axum.rs index 4175b1b..7a163e5 100644 --- a/src/webhooks/axum.rs +++ b/src/webhooks/axum.rs @@ -1,45 +1,69 @@ -use super::Webhook; +use super::Payload; +use std::sync::Arc; + use axum::{ + Router, extract::State, http::{HeaderMap, StatusCode}, - response::IntoResponse, + response::{IntoResponse, Response}, routing::post, - Router, }; -use serde::de::DeserializeOwned; -use std::sync::Arc; + +/// An axum webhook listener for listening to payloads. +/// +/// # Example +/// +/// ```rust,no_run +/// struct MyTopggListener {} +/// +/// #[async_trait::async_trait] +/// impl topgg::axum::Listener for MyTopggListener { +/// async fn callback(self: Arc, payload: Payload, _trace: &str) -> Response { +/// println!("{payload:?}"); +/// +/// (StatusCode::NO_CONTENT, ()).into_response() +/// } +/// } +/// ``` +#[async_trait::async_trait] +#[cfg_attr(docsrs, doc(cfg(feature = "axum")))] +pub trait Listener: Send + Sync + 'static { + async fn callback(self: Arc, payload: Payload, trace: &str) -> Response; +} struct WebhookState { state: Arc, - password: Arc, + secret: Arc, } impl Clone for WebhookState { - #[inline(always)] fn clone(&self) -> Self { Self { - state: Arc::clone(&self.state), - password: Arc::clone(&self.password), + state: self.state.clone(), + secret: self.secret.clone(), } } } -/// Creates a new axum [`Router`] for receiving vote events. +/// Creates a new axum [`Router`] for receiving webhook payloads. /// /// # Example /// /// ```rust,no_run -/// use axum::{routing::get, Router}; -/// use topgg::{VoteEvent, Webhook}; -/// use tokio::net::TcpListener; +/// use topgg::Payload; /// use std::sync::Arc; /// -/// struct MyVoteListener {} +/// use axum::{http::status::StatusCode, response::{IntoResponse, Response}, routing::get, Router}; +/// use tokio::net::TcpListener; +/// +/// struct MyTopggListener {} /// /// #[async_trait::async_trait] -/// impl Webhook for MyVoteListener { -/// async fn callback(&self, vote: VoteEvent) { -/// println!("A user with the ID of {} has voted us on Top.gg!", vote.voter_id); +/// impl topgg::axum::Listener for MyTopggListener { +/// async fn callback(self: Arc, payload: Payload, _trace: &str) -> Response { +/// println!("{payload:?}"); +/// +/// (StatusCode::NO_CONTENT, ()).into_response() /// } /// } /// @@ -49,11 +73,11 @@ impl Clone for WebhookState { /// /// #[tokio::main] /// async fn main() { -/// let state = Arc::new(MyVoteListener {}); +/// let state = Arc::new(MyTopggListener {}); /// /// let router = Router::new().route("/", get(index)).nest( -/// "/votes", -/// topgg::axum::webhook(env!("MY_TOPGG_WEBHOOK_SECRET").to_string(), Arc::clone(&state)), +/// "/webhook", +/// topgg::axum::webhook(Arc::clone(&state), env!("TOPGG_WEBHOOK_SECRET").to_string()), /// ); /// /// let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); @@ -61,36 +85,31 @@ impl Clone for WebhookState { /// axum::serve(listener, router).await.unwrap(); /// } /// ``` -#[inline(always)] #[cfg_attr(docsrs, doc(cfg(feature = "axum")))] -pub fn webhook(password: String, state: Arc) -> Router +pub fn webhook(state: Arc, secret: String) -> Router where - D: DeserializeOwned + Send, - T: Webhook, + S: Listener, { Router::new() .route( "/", post( - async |headers: HeaderMap, State(webhook): State>, body: String| { - if let Some(authorization) = headers.get("Authorization") { - if let Ok(authorization) = authorization.to_str() { - if authorization == *(webhook.password) { - if let Ok(data) = serde_json::from_str(&body) { - webhook.state.callback(data).await; - - return (StatusCode::NO_CONTENT, ()).into_response(); - } - } - } + async |headers: HeaderMap, State(wrapped_state): State>, body: String| { + if let Some(signature) = headers.get("x-topgg-signature") + && let Ok(signature) = signature.to_str() + && let Some(trace) = headers.get("x-topgg-trace") + && let Ok(trace) = trace.to_str() + && let Some(payload) = Payload::new(signature, &body, &wrapped_state.secret) + { + wrapped_state.state.callback(payload, trace).await + } else { + (StatusCode::UNAUTHORIZED, ()).into_response() } - - (StatusCode::UNAUTHORIZED, ()).into_response() }, ), ) .with_state(WebhookState { state, - password: Arc::new(password), + secret: Arc::new(secret), }) } diff --git a/src/webhooks/mod.rs b/src/webhooks/mod.rs index a5ec591..124daf9 100644 --- a/src/webhooks/mod.rs +++ b/src/webhooks/mod.rs @@ -1,6 +1,6 @@ -mod vote; -#[cfg_attr(docsrs, doc(cfg(feature = "webhooks")))] -pub use vote::*; +mod payload; + +pub use payload::Payload; #[cfg(feature = "actix-web")] mod actix_web; @@ -26,49 +26,127 @@ cfg_if::cfg_if! { cfg_if::cfg_if! { if #[cfg(any(feature = "actix-web", feature = "rocket"))] { - /// An unauthenticated incoming Top.gg webhook request. - #[must_use] + use std::collections::HashMap; + + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + /// An incoming [`Payload`] that is yet to be [authenticated with a secret][IncomingPayload::authenticate]. + /// + /// # Examples + /// + /// With actix-web: + /// + /// ```rust,no_run + /// use topgg::IncomingPayload; + /// use std::io; + /// + /// use actix_web::{ + /// error::{Error, ErrorUnauthorized}, + /// get, post, App, HttpServer, + /// }; + /// + /// #[get("/")] + /// async fn index() -> &'static str { + /// "Hello, World!" + /// } + /// + /// #[post("/webhook")] + /// async fn webhook(payload: IncomingPayload) -> Result<&'static str, Error> { + /// match payload.authenticate(env!("TOPGG_WEBHOOK_SECRET")) { + /// Some(payload) => { + /// println!("{payload:?}"); + /// + /// Ok("ok") + /// } + /// + /// _ => Err(ErrorUnauthorized("401")), + /// } + /// } + /// + /// #[actix_web::main] + /// async fn main() -> io::Result<()> { + /// HttpServer::new(|| App::new().service(index).service(webhook)) + /// .bind("127.0.0.1:8080")? + /// .run() + /// .await + /// } + /// ``` + /// + /// With rocket: + /// + /// ```rust,no_run + /// use topgg::IncomingPayload; + /// + /// use rocket::{get, http::Status, launch, post, routes, Build, Rocket}; + /// + /// #[get("/")] + /// fn index() -> &'static str { + /// "Hello, World!" + /// } + /// + /// #[post("/webhook", data = "")] + /// fn webhook(payload: IncomingPayload) -> Status { + /// match payload.authenticate(env!("TOPGG_WEBHOOK_SECRET")) { + /// Some(payload) => { + /// println!("{payload:?}"); + /// + /// Status::Ok + /// }, + /// _ => { + /// println!("found an unauthorized attacker."); + /// + /// Status::Unauthorized + /// } + /// } + /// } + /// + /// #[launch] + /// fn rocket() -> Rocket { + /// rocket::build().mount("/", routes![index, webhook]) + /// } + /// ``` #[cfg_attr(docsrs, doc(cfg(any(feature = "actix-web", feature = "rocket"))))] - pub struct Incoming { - pub(crate) authorization: String, - pub(crate) data: T, + pub struct IncomingPayload { + t: String, + signature: String, + body: String, + trace: String, } - impl Incoming { - /// Authenticates a valid password with this request. + impl IncomingPayload { + pub(super) fn new(signature: &str, body: Vec, trace: &str) -> Option { + let signature = signature.split(',').filter_map(|p| p.split_once('=')).collect::>(); + + Some(Self { + t: signature.get("t")?.to_string(), + signature: signature.get("v1")?.to_string(), + body: String::from_utf8(body).ok()?, + trace: trace.into(), + }) + } + + /// Tries to authenticate a valid secret with this request. #[must_use] - #[inline(always)] - pub fn authenticate(self, password: &str) -> Option { - if self.authorization == password { - Some(self.data) + pub fn authenticate(&self, secret: &str) -> Option { + let mut hmac = Hmac::::new_from_slice(secret.as_bytes()).ok()?; + + hmac.update(format!("{}.{}", self.t, self.body).as_bytes()); + + let digest = hex::encode(hmac.finalize().into_bytes()); + + if digest == self.signature && let Ok(payload) = serde_json::from_str(&self.body) { + Some(payload) } else { None } } - } - impl Clone for Incoming - where - T: Clone, - { - #[inline(always)] - fn clone(&self) -> Self { - Self { - authorization: self.authorization.clone(), - data: self.data.clone(), - } + /// Retrieves the payload's `x-topgg-trace` header for debugging and correlating requests with Top.gg support. + #[must_use] + pub fn get_trace(&self) -> &str { + &self.trace } } } } - -cfg_if::cfg_if! { - if #[cfg(any(feature = "axum", feature = "warp"))] { - /// Webhook event handler. - #[cfg_attr(docsrs, doc(cfg(any(feature = "axum", feature = "warp"))))] - #[async_trait::async_trait] - pub trait Webhook: Send + Sync + 'static { - async fn callback(&self, data: T); - } - } -} diff --git a/src/webhooks/payload.rs b/src/webhooks/payload.rs new file mode 100644 index 0000000..a62e3b6 --- /dev/null +++ b/src/webhooks/payload.rs @@ -0,0 +1,103 @@ +use super::super::{PartialProject, User, snowflake}; + +use chrono::{DateTime, Utc}; +use serde::Deserialize; + +/// A webhook payload. +#[non_exhaustive] +#[derive(Clone, Debug, Deserialize)] +#[serde(tag = "type", content = "data")] +#[cfg_attr(docsrs, doc(cfg(feature = "webhooks")))] +pub enum Payload { + /// A `integration.create` webhook payload. Fires when a user has connected to your webhook integration. + #[serde(rename = "integration.create")] + IntegrationCreate { + /// The unique identifier for this connection. + #[serde(deserialize_with = "snowflake::deserialize")] + connection_id: u64, + + /// The secret used to verify future webhook deliveries. + #[serde(rename = "webhook_secret")] + secret: String, + + /// The project that the integration refers to. + project: PartialProject, + + /// The user who triggered this event. + user: User, + }, + + /// A `integration.delete` webhook payload. Fires when a user has disconnected from your webhook integration. + #[serde(rename = "integration.delete")] + IntegrationDelete { + /// The unique identifier for this connection. + #[serde(deserialize_with = "snowflake::deserialize")] + connection_id: u64, + }, + + /// A `webhook.test` webhook payload. Fires upon sent test from the project dashboard. + #[serde(rename = "webhook.test")] + Test { + /// The project that the test refers to. + project: PartialProject, + + /// The user who triggered this test. + user: User, + }, + + /// A `vote.create` webhook payload. Fires when a user votes for your project. + #[serde(rename = "vote.create")] + VoteCreate { + /// The vote's ID. + #[serde(deserialize_with = "snowflake::deserialize")] + id: u64, + + /// The number of votes this vote counted for. This is a rounded integer value which determines how many points this individual vote was worth. + weight: u64, + + /// When the vote was cast. + created_at: DateTime, + + /// When the vote expires and the user is required to vote again. + expires_at: DateTime, + + /// The project that received this vote. + project: PartialProject, + + /// The user who voted for this project. + user: User, + }, +} + +impl Payload { + #[cfg(any(feature = "axum", feature = "warp"))] + pub(super) fn new(signature: &str, body: &str, secret: &str) -> Option { + use std::collections::HashMap; + + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + let signature = signature + .split(',') + .filter_map(|p| p.split_once('=')) + .collect::>(); + + let (Some(t), Some(signature)) = (signature.get("t"), signature.get("v1")) else { + return None; + }; + + let mut hmac = Hmac::::new_from_slice(secret.as_bytes()).ok()?; + + hmac.update(format!("{t}.{body}").as_bytes()); + + let digest = hex::encode(hmac.finalize().into_bytes()); + + if &digest == signature + && let Ok(payload) = serde_json::from_str(body) + { + Some(payload) + } else { + None + } + } +} diff --git a/src/webhooks/rocket.rs b/src/webhooks/rocket.rs index 0bfb988..31d0034 100644 --- a/src/webhooks/rocket.rs +++ b/src/webhooks/rocket.rs @@ -1,31 +1,30 @@ -use crate::Incoming; +use super::IncomingPayload; + use rocket::{ - data::{Data, FromData, Outcome}, + data::{Data, FromData, Outcome, ToByteUnit}, http::Status, request::Request, - serde::json::Json, }; -use serde::de::DeserializeOwned; #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] #[rocket::async_trait] -impl<'r, T> FromData<'r> for Incoming -where - T: DeserializeOwned, -{ +impl<'r> FromData<'r> for IncomingPayload { type Error = (); async fn from_data(request: &'r Request<'_>, data: Data<'r>) -> Outcome<'r, Self> { let headers = request.headers(); - if let Some(authorization) = headers.get_one("Authorization") { - return match as FromData>::from_data(request, data).await { - Outcome::Success(data) => Outcome::Success(Self { - authorization: authorization.to_owned(), - data: data.into_inner(), - }), - _ => Outcome::Error((Status::BadRequest, ())), - }; + if let (Some(signature), Some(trace)) = ( + headers.get_one("x-topgg-signature"), + headers.get_one("x-topgg-trace"), + ) { + if let Ok(body) = data.open(2.mebibytes()).into_bytes().await + && let Some(output) = Self::new(signature, body.into_inner(), trace) + { + return Outcome::Success(output); + } + + return Outcome::Error((Status::BadRequest, ())); } Outcome::Error((Status::Unauthorized, ())) diff --git a/src/webhooks/vote.rs b/src/webhooks/vote.rs deleted file mode 100644 index 0bbbb54..0000000 --- a/src/webhooks/vote.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crate::snowflake; -use serde::{Deserialize, Deserializer}; -use std::collections::HashMap; - -#[inline(always)] -fn deserialize_is_test<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - String::deserialize(deserializer).map(|s| s == "test") -} - -fn deserialize_query_string<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - Ok( - String::deserialize(deserializer) - .map(|s| { - let mut output = HashMap::new(); - - for mut it in s - .trim_start_matches('?') - .split('&') - .map(|pair| pair.split('=')) - { - if let (Some(k), Some(v)) = (it.next(), it.next()) { - if let Ok(v) = urlencoding::decode(v) { - output.insert(k.to_owned(), v.into_owned()); - } - } - } - - output - }) - .unwrap_or_default(), - ) -} - -/// A dispatched Top.gg vote event. -#[must_use] -#[derive(Clone, Debug, Deserialize)] -pub struct VoteEvent { - /// The ID of the project that received a vote. - #[serde( - deserialize_with = "snowflake::deserialize", - alias = "bot", - alias = "guild" - )] - pub receiver_id: u64, - - /// The ID of the Top.gg user who voted. - #[serde(deserialize_with = "snowflake::deserialize", rename = "user")] - pub voter_id: u64, - - /// Whether this vote is just a test done from the page settings. - #[serde(deserialize_with = "deserialize_is_test", rename = "type")] - pub is_test: bool, - - /// Whether the weekend multiplier is active, where a single vote counts as two. - #[serde(default, rename = "isWeekend")] - pub is_weekend: bool, - - /// Query strings found on the vote page. - #[serde(default, deserialize_with = "deserialize_query_string")] - pub query: HashMap, -} diff --git a/src/webhooks/warp.rs b/src/webhooks/warp.rs index 51c108b..30d3dba 100644 --- a/src/webhooks/warp.rs +++ b/src/webhooks/warp.rs @@ -1,36 +1,34 @@ -use super::Webhook; -use serde::de::DeserializeOwned; -use std::sync::Arc; -use warp::{body, header, http::StatusCode, path, Filter, Rejection, Reply}; +use super::Payload; + +use bytes::Bytes; +use warp::{Filter, Rejection, body, header, path}; /// Creates a new warp [`Filter`] for receiving webhook events. /// /// # Example /// /// ```rust,no_run -/// use std::{net::SocketAddr, sync::Arc}; -/// use topgg::{VoteEvent, Webhook}; -/// use warp::Filter; -/// -/// struct MyVoteListener {} +/// use std::net::SocketAddr; /// -/// #[async_trait::async_trait] -/// impl Webhook for MyVoteListener { -/// async fn callback(&self, vote: VoteEvent) { -/// println!("A user with the ID of {} has voted us on Top.gg!", vote.voter_id); -/// } -/// } +/// use warp::{http::StatusCode, reply, Filter}; /// /// #[tokio::main] /// async fn main() { -/// let state = Arc::new(MyVoteListener {}); -/// -/// // POST /votes +/// // POST /webhook /// let webhook = topgg::warp::webhook( -/// "votes", -/// env!("MY_TOPGG_WEBHOOK_SECRET").to_string(), -/// Arc::clone(&state), -/// ); +/// "webhook", +/// env!("TOPGG_WEBHOOK_SECRET").to_string() +/// ).then(|payload, _trace| async move { +/// match payload { +/// Some(payload) => { +/// println!("{payload:?}"); +/// +/// reply::with_status("", StatusCode::NO_CONTENT) +/// }, +/// +/// None => reply::with_status("Unauthorized", StatusCode::UNAUTHORIZED) +/// } +/// }); /// /// let routes = warp::get().map(|| "Hello, World!").or(webhook); /// @@ -39,34 +37,20 @@ use warp::{body, header, http::StatusCode, path, Filter, Rejection, Reply}; /// warp::serve(routes).run(addr).await /// } /// ``` +#[must_use] #[cfg_attr(docsrs, doc(cfg(feature = "warp")))] -pub fn webhook( +pub fn webhook( endpoint: &'static str, - password: String, - state: Arc, -) -> impl Filter + Clone -where - D: DeserializeOwned + Send, - T: Webhook, -{ - let password = Arc::new(password); - + secret: String, +) -> impl Filter, String), Error = Rejection> + Clone { warp::post() .and(path(endpoint)) - .and(header("Authorization")) - .and(body::json()) - .then(move |auth: String, data: D| { - let current_state = Arc::clone(&state); - let current_password = Arc::clone(&password); - - async move { - if auth == *current_password { - current_state.callback(data).await; - - StatusCode::NO_CONTENT - } else { - StatusCode::UNAUTHORIZED - } - } + .and(header("x-topgg-signature")) + .and(body::bytes()) + .map(move |signature: String, body: Bytes| { + str::from_utf8(&body) + .ok() + .and_then(|body| Payload::new(&signature, body, &secret)) }) + .and(header("x-topgg-trace")) } From 4fb0852b22cd8e389727f6a5acf8e2d3bf42e403 Mon Sep 17 00:00:00 2001 From: null <60427892+null8626@users.noreply.github.com> Date: Wed, 25 Feb 2026 04:58:29 +0700 Subject: [PATCH 2/9] doc: use an, not a --- src/webhooks/payload.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/webhooks/payload.rs b/src/webhooks/payload.rs index a62e3b6..6723f88 100644 --- a/src/webhooks/payload.rs +++ b/src/webhooks/payload.rs @@ -9,7 +9,7 @@ use serde::Deserialize; #[serde(tag = "type", content = "data")] #[cfg_attr(docsrs, doc(cfg(feature = "webhooks")))] pub enum Payload { - /// A `integration.create` webhook payload. Fires when a user has connected to your webhook integration. + /// An `integration.create` webhook payload. Fires when a user has connected to your webhook integration. #[serde(rename = "integration.create")] IntegrationCreate { /// The unique identifier for this connection. @@ -27,7 +27,7 @@ pub enum Payload { user: User, }, - /// A `integration.delete` webhook payload. Fires when a user has disconnected from your webhook integration. + /// An `integration.delete` webhook payload. Fires when a user has disconnected from your webhook integration. #[serde(rename = "integration.delete")] IntegrationDelete { /// The unique identifier for this connection. From d93289e8195e92c871a71e956eaa8e983f606389 Mon Sep 17 00:00:00 2001 From: null <60427892+null8626@users.noreply.github.com> Date: Wed, 4 Mar 2026 23:19:59 +0700 Subject: [PATCH 3/9] feat: rename created_at to voted_at --- src/webhooks/payload.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/webhooks/payload.rs b/src/webhooks/payload.rs index 6723f88..12b16c1 100644 --- a/src/webhooks/payload.rs +++ b/src/webhooks/payload.rs @@ -56,7 +56,8 @@ pub enum Payload { weight: u64, /// When the vote was cast. - created_at: DateTime, + #[serde(rename = "created_at")] + voted_at: DateTime, /// When the vote expires and the user is required to vote again. expires_at: DateTime, From 8d9e27984f5fd75abf69dbea76791f415ecca99d Mon Sep 17 00:00:00 2001 From: null <60427892+null8626@users.noreply.github.com> Date: Sat, 7 Mar 2026 19:53:58 +0700 Subject: [PATCH 4/9] style: minor dependabot.yml reformatting --- .github/dependabot.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 5505ae2..5ef95a7 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -1,10 +1,10 @@ version: 2 updates: - - package-ecosystem: "cargo" - directory: "/" + - package-ecosystem: cargo + directory: '/' commit-message: - prefix: "deps: " + prefix: 'deps: ' schedule: - day: "saturday" - interval: "weekly" - time: "07:15" \ No newline at end of file + day: saturday + interval: weekly + time: '07:15' \ No newline at end of file From 7fc1e7c48457efd7398d2879e9e36e922cbb053d Mon Sep 17 00:00:00 2001 From: null <60427892+null8626@users.noreply.github.com> Date: Wed, 11 Mar 2026 01:23:53 +0700 Subject: [PATCH 5/9] fix: fallback to warn + 204 instead of returning 422 --- src/webhooks/actix_web.rs | 56 ++++++++++++++++++++++++++++++++++----- src/webhooks/axum.rs | 12 +++++++-- src/webhooks/rocket.rs | 13 ++++++--- 3 files changed, 68 insertions(+), 13 deletions(-) diff --git a/src/webhooks/actix_web.rs b/src/webhooks/actix_web.rs index 96ffbb7..bdc4649 100644 --- a/src/webhooks/actix_web.rs +++ b/src/webhooks/actix_web.rs @@ -1,16 +1,52 @@ use super::IncomingPayload; use std::{ + fmt::{self, Display, Formatter}, future::Future, pin::Pin, task::{Context, Poll, ready}, }; use actix_web::{ - FromRequest, HttpRequest, - dev::Payload, - error::{Error, ErrorBadRequest, ErrorUnauthorized}, + FromRequest, HttpRequest, HttpResponse, ResponseError, body::BoxBody, dev::Payload, + http::StatusCode, }; use futures_core::stream::Stream; +use log::warn; + +#[doc(hidden)] +#[derive(Debug)] +pub enum IncomingPayloadError { + ParseFailure, + Unauthorized, +} + +impl Display for IncomingPayloadError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Self::ParseFailure => "Unable to parse Top.gg webhook payload.", + + Self::Unauthorized => "Unauthorized.", + }) + } +} + +impl ResponseError for IncomingPayloadError { + fn error_response(&self) -> HttpResponse { + match self { + Self::ParseFailure => HttpResponse::NoContent().body(()), + + Self::Unauthorized => HttpResponse::Unauthorized().body("Unauthorized"), + } + } + + fn status_code(&self) -> StatusCode { + match self { + Self::ParseFailure => StatusCode::NO_CONTENT, + + Self::Unauthorized => StatusCode::UNAUTHORIZED, + } + } +} #[doc(hidden)] pub struct IncomingPayloadFut { @@ -20,14 +56,20 @@ pub struct IncomingPayloadFut { } impl Future for IncomingPayloadFut { - type Output = Result; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { while let Some(body) = ready!(Pin::new(&mut self.payload).poll_next(cx)) { match body { Ok(body) => self.body.extend_from_slice(&body), - Err(_) => return Poll::Ready(Err(ErrorBadRequest("400"))), + Err(err) => { + warn!( + "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers: {err:?}" + ); + + return Poll::Ready(Err(IncomingPayloadError::ParseFailure)); + } } } @@ -42,13 +84,13 @@ impl Future for IncomingPayloadFut { return Poll::Ready(Ok(incoming)); } - Poll::Ready(Err(ErrorUnauthorized("401"))) + Poll::Ready(Err(IncomingPayloadError::Unauthorized)) } } #[cfg_attr(docsrs, doc(cfg(feature = "actix-web")))] impl FromRequest for IncomingPayload { - type Error = Error; + type Error = IncomingPayloadError; type Future = IncomingPayloadFut; fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { diff --git a/src/webhooks/axum.rs b/src/webhooks/axum.rs index 7a163e5..b7051c1 100644 --- a/src/webhooks/axum.rs +++ b/src/webhooks/axum.rs @@ -8,6 +8,7 @@ use axum::{ response::{IntoResponse, Response}, routing::post, }; +use log::warn; /// An axum webhook listener for listening to payloads. /// @@ -99,9 +100,16 @@ where && let Ok(signature) = signature.to_str() && let Some(trace) = headers.get("x-topgg-trace") && let Ok(trace) = trace.to_str() - && let Some(payload) = Payload::new(signature, &body, &wrapped_state.secret) { - wrapped_state.state.callback(payload, trace).await + if let Some(payload) = Payload::new(signature, &body, &wrapped_state.secret) { + wrapped_state.state.callback(payload, trace).await + } else { + warn!( + "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers.\n--- BEGIN BODY DUMP ---\n{body}\n--- END BODY DUMP ---" + ); + + (StatusCode::NO_CONTENT, ()).into_response() + } } else { (StatusCode::UNAUTHORIZED, ()).into_response() } diff --git a/src/webhooks/rocket.rs b/src/webhooks/rocket.rs index 31d0034..88afc28 100644 --- a/src/webhooks/rocket.rs +++ b/src/webhooks/rocket.rs @@ -1,5 +1,6 @@ use super::IncomingPayload; +use log::warn; use rocket::{ data::{Data, FromData, Outcome, ToByteUnit}, http::Status, @@ -18,10 +19,14 @@ impl<'r> FromData<'r> for IncomingPayload { headers.get_one("x-topgg-signature"), headers.get_one("x-topgg-trace"), ) { - if let Ok(body) = data.open(2.mebibytes()).into_bytes().await - && let Some(output) = Self::new(signature, body.into_inner(), trace) - { - return Outcome::Success(output); + if let Ok(body) = data.open(2.mebibytes()).into_bytes().await { + return Self::new(signature, body.into_inner(), trace).map_or_else(|| { + warn!( + "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers." + ); + + Outcome::Error((Status::NoContent, ())) + }, Outcome::Success); } return Outcome::Error((Status::BadRequest, ())); From 772c7a79d3730b8d92d2e963b54dded2a1229a38 Mon Sep 17 00:00:00 2001 From: null <60427892+null8626@users.noreply.github.com> Date: Wed, 11 Mar 2026 07:22:41 +0700 Subject: [PATCH 6/9] fix: enforce 2 MiB body limit in warp filter --- src/webhooks/warp.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/webhooks/warp.rs b/src/webhooks/warp.rs index 30d3dba..c95f234 100644 --- a/src/webhooks/warp.rs +++ b/src/webhooks/warp.rs @@ -46,6 +46,7 @@ pub fn webhook( warp::post() .and(path(endpoint)) .and(header("x-topgg-signature")) + .and(body::content_length_limit(2 * 1024 * 1024)) .and(body::bytes()) .map(move |signature: String, body: Bytes| { str::from_utf8(&body) From 7c5f05011f888d99f4860e6fcbe9c2c1ed71c2ae Mon Sep 17 00:00:00 2001 From: null <60427892+null8626@users.noreply.github.com> Date: Wed, 18 Mar 2026 06:30:20 +0700 Subject: [PATCH 7/9] deps: add timeout handling to actix-web, rocket, and axum --- src/webhooks/actix_web.rs | 24 ++++++++++++++++++++++++ src/webhooks/axum.rs | 22 ++++++++++++++++++---- src/webhooks/rocket.rs | 24 +++++++++++++++--------- 3 files changed, 57 insertions(+), 13 deletions(-) diff --git a/src/webhooks/actix_web.rs b/src/webhooks/actix_web.rs index bdc4649..925fb7d 100644 --- a/src/webhooks/actix_web.rs +++ b/src/webhooks/actix_web.rs @@ -4,6 +4,7 @@ use std::{ future::Future, pin::Pin, task::{Context, Poll, ready}, + time::{Duration, Instant}, }; use actix_web::{ @@ -18,6 +19,7 @@ use log::warn; pub enum IncomingPayloadError { ParseFailure, Unauthorized, + Timeout, } impl Display for IncomingPayloadError { @@ -26,6 +28,8 @@ impl Display for IncomingPayloadError { Self::ParseFailure => "Unable to parse Top.gg webhook payload.", Self::Unauthorized => "Unauthorized.", + + Self::Timeout => "Request timed out.", }) } } @@ -36,6 +40,8 @@ impl ResponseError for IncomingPayloadError { Self::ParseFailure => HttpResponse::NoContent().body(()), Self::Unauthorized => HttpResponse::Unauthorized().body("Unauthorized"), + + Self::Timeout => HttpResponse::RequestTimeout().body("Request timed out"), } } @@ -44,6 +50,8 @@ impl ResponseError for IncomingPayloadError { Self::ParseFailure => StatusCode::NO_CONTENT, Self::Unauthorized => StatusCode::UNAUTHORIZED, + + Self::Timeout => StatusCode::REQUEST_TIMEOUT, } } } @@ -53,13 +61,28 @@ pub struct IncomingPayloadFut { req: HttpRequest, payload: Payload, body: Vec, + start: Instant, +} + +impl IncomingPayloadFut { + fn timed_out(&self) -> bool { + self.start.elapsed() > Duration::from_secs(5) + } } impl Future for IncomingPayloadFut { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.timed_out() { + return Poll::Ready(Err(IncomingPayloadError::Timeout)); + } + while let Some(body) = ready!(Pin::new(&mut self.payload).poll_next(cx)) { + if self.timed_out() { + return Poll::Ready(Err(IncomingPayloadError::Timeout)); + } + match body { Ok(body) => self.body.extend_from_slice(&body), @@ -98,6 +121,7 @@ impl FromRequest for IncomingPayload { req: req.clone(), payload: payload.take(), body: vec![], + start: Instant::now(), } } } diff --git a/src/webhooks/axum.rs b/src/webhooks/axum.rs index b7051c1..7c8008d 100644 --- a/src/webhooks/axum.rs +++ b/src/webhooks/axum.rs @@ -1,14 +1,16 @@ use super::Payload; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use axum::{ - Router, - extract::State, + BoxError, Router, + error_handling::HandleErrorLayer, + extract::{DefaultBodyLimit, State}, http::{HeaderMap, StatusCode}, response::{IntoResponse, Response}, routing::post, }; use log::warn; +use tower::{ServiceBuilder, timeout::error::Elapsed}; /// An axum webhook listener for listening to payloads. /// @@ -91,6 +93,16 @@ pub fn webhook(state: Arc, secret: String) -> Router where S: Listener, { + let timeout_layer = ServiceBuilder::new() + .layer(HandleErrorLayer::new(|err: BoxError| async move { + if err.is::() { + (StatusCode::REQUEST_TIMEOUT, "Request timed out") + } else { + (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error") + } + })) + .timeout(Duration::from_secs(5)); + Router::new() .route( "/", @@ -111,11 +123,13 @@ where (StatusCode::NO_CONTENT, ()).into_response() } } else { - (StatusCode::UNAUTHORIZED, ()).into_response() + (StatusCode::UNAUTHORIZED, "Unauthorized").into_response() } }, ), ) + .layer(timeout_layer.into_inner()) + .layer(DefaultBodyLimit::max(2 * 1024 * 1024)) .with_state(WebhookState { state, secret: Arc::new(secret), diff --git a/src/webhooks/rocket.rs b/src/webhooks/rocket.rs index 88afc28..5db63c9 100644 --- a/src/webhooks/rocket.rs +++ b/src/webhooks/rocket.rs @@ -1,4 +1,5 @@ use super::IncomingPayload; +use std::time::Duration; use log::warn; use rocket::{ @@ -6,6 +7,7 @@ use rocket::{ http::Status, request::Request, }; +use tokio::time::timeout; #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] #[rocket::async_trait] @@ -19,17 +21,21 @@ impl<'r> FromData<'r> for IncomingPayload { headers.get_one("x-topgg-signature"), headers.get_one("x-topgg-trace"), ) { - if let Ok(body) = data.open(2.mebibytes()).into_bytes().await { - return Self::new(signature, body.into_inner(), trace).map_or_else(|| { - warn!( - "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers." - ); + return match timeout(Duration::from_secs(5), data.open(2.mebibytes()).into_bytes()).await { + Ok(Ok(body)) => { + Self::new(signature, body.into_inner(), trace).map_or_else(|| { + warn!( + "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers." + ); - Outcome::Error((Status::NoContent, ())) - }, Outcome::Success); - } + Outcome::Error((Status::NoContent, ())) + }, Outcome::Success) + }, - return Outcome::Error((Status::BadRequest, ())); + Err(_) => Outcome::Error((Status::RequestTimeout, ())), + + _ => Outcome::Error((Status::BadRequest, ())), + }; } Outcome::Error((Status::Unauthorized, ())) From 9c246a01ceb5e475c21a4f7d6c9bac8661d6f96e Mon Sep 17 00:00:00 2001 From: null <60427892+null8626@users.noreply.github.com> Date: Fri, 20 Mar 2026 22:27:22 +0700 Subject: [PATCH 8/9] feat: add replay attack mitigation --- src/webhooks/actix_web.rs | 5 ++++- src/webhooks/axum.rs | 5 ++++- src/webhooks/mod.rs | 22 ++++++++++++++-------- src/webhooks/payload.rs | 16 ++++++++++++++-- src/webhooks/rocket.rs | 4 +++- src/webhooks/warp.rs | 5 ++++- 6 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/webhooks/actix_web.rs b/src/webhooks/actix_web.rs index 925fb7d..aa5e070 100644 --- a/src/webhooks/actix_web.rs +++ b/src/webhooks/actix_web.rs @@ -11,6 +11,7 @@ use actix_web::{ FromRequest, HttpRequest, HttpResponse, ResponseError, body::BoxBody, dev::Payload, http::StatusCode, }; +use chrono::{DateTime, Utc}; use futures_core::stream::Stream; use log::warn; @@ -62,6 +63,7 @@ pub struct IncomingPayloadFut { payload: Payload, body: Vec, start: Instant, + now: DateTime, } impl IncomingPayloadFut { @@ -102,7 +104,7 @@ impl Future for IncomingPayloadFut { headers.get("x-topgg-signature"), headers.get("x-topgg-trace"), ) && let (Ok(signature), Ok(trace)) = (signature.to_str(), trace.to_str()) - && let Some(incoming) = IncomingPayload::new(signature, self.body.clone(), trace) + && let Some(incoming) = IncomingPayload::new(&self.now, signature, self.body.clone(), trace) { return Poll::Ready(Ok(incoming)); } @@ -122,6 +124,7 @@ impl FromRequest for IncomingPayload { payload: payload.take(), body: vec![], start: Instant::now(), + now: Utc::now(), } } } diff --git a/src/webhooks/axum.rs b/src/webhooks/axum.rs index 7c8008d..057df74 100644 --- a/src/webhooks/axum.rs +++ b/src/webhooks/axum.rs @@ -9,6 +9,7 @@ use axum::{ response::{IntoResponse, Response}, routing::post, }; +use chrono::Utc; use log::warn; use tower::{ServiceBuilder, timeout::error::Elapsed}; @@ -108,12 +109,14 @@ where "/", post( async |headers: HeaderMap, State(wrapped_state): State>, body: String| { + let now = Utc::now(); + if let Some(signature) = headers.get("x-topgg-signature") && let Ok(signature) = signature.to_str() && let Some(trace) = headers.get("x-topgg-trace") && let Ok(trace) = trace.to_str() { - if let Some(payload) = Payload::new(signature, &body, &wrapped_state.secret) { + if let Some(payload) = Payload::new(&now, signature, &body, &wrapped_state.secret) { wrapped_state.state.callback(payload, trace).await } else { warn!( diff --git a/src/webhooks/mod.rs b/src/webhooks/mod.rs index 124daf9..5a902c4 100644 --- a/src/webhooks/mod.rs +++ b/src/webhooks/mod.rs @@ -28,6 +28,7 @@ cfg_if::cfg_if! { if #[cfg(any(feature = "actix-web", feature = "rocket"))] { use std::collections::HashMap; + use chrono::{DateTime, Utc}; use hmac::{Hmac, Mac}; use sha2::Sha256; @@ -108,22 +109,27 @@ cfg_if::cfg_if! { /// ``` #[cfg_attr(docsrs, doc(cfg(any(feature = "actix-web", feature = "rocket"))))] pub struct IncomingPayload { - t: String, + t: i64, signature: String, body: String, trace: String, } impl IncomingPayload { - pub(super) fn new(signature: &str, body: Vec, trace: &str) -> Option { + pub(super) fn new(now: &DateTime, signature: &str, body: Vec, trace: &str) -> Option { let signature = signature.split(',').filter_map(|p| p.split_once('=')).collect::>(); + let t = signature.get("t")?.to_string().parse::().ok()? * 1000; - Some(Self { - t: signature.get("t")?.to_string(), - signature: signature.get("v1")?.to_string(), - body: String::from_utf8(body).ok()?, - trace: trace.into(), - }) + if (now.timestamp_millis() - (t * 1000)).abs() < 30000 { + None + } else { + Some(Self { + t, + signature: signature.get("v1")?.to_string(), + body: String::from_utf8(body).ok()?, + trace: trace.into(), + }) + } } /// Tries to authenticate a valid secret with this request. diff --git a/src/webhooks/payload.rs b/src/webhooks/payload.rs index 12b16c1..bbb93e1 100644 --- a/src/webhooks/payload.rs +++ b/src/webhooks/payload.rs @@ -72,7 +72,12 @@ pub enum Payload { impl Payload { #[cfg(any(feature = "axum", feature = "warp"))] - pub(super) fn new(signature: &str, body: &str, secret: &str) -> Option { + pub(super) fn new( + now: &DateTime, + signature: &str, + body: &str, + secret: &str, + ) -> Option { use std::collections::HashMap; use hmac::{Hmac, Mac}; @@ -83,10 +88,17 @@ impl Payload { .filter_map(|p| p.split_once('=')) .collect::>(); - let (Some(t), Some(signature)) = (signature.get("t"), signature.get("v1")) else { + let (Some(Ok(t)), Some(signature)) = ( + signature.get("t").map(|t| t.parse::()), + signature.get("v1"), + ) else { return None; }; + if (now.timestamp_millis() - (t * 1000)).abs() < 30000 { + return None; + } + let mut hmac = Hmac::::new_from_slice(secret.as_bytes()).ok()?; hmac.update(format!("{t}.{body}").as_bytes()); diff --git a/src/webhooks/rocket.rs b/src/webhooks/rocket.rs index 5db63c9..a9454aa 100644 --- a/src/webhooks/rocket.rs +++ b/src/webhooks/rocket.rs @@ -1,6 +1,7 @@ use super::IncomingPayload; use std::time::Duration; +use chrono::Utc; use log::warn; use rocket::{ data::{Data, FromData, Outcome, ToByteUnit}, @@ -15,6 +16,7 @@ impl<'r> FromData<'r> for IncomingPayload { type Error = (); async fn from_data(request: &'r Request<'_>, data: Data<'r>) -> Outcome<'r, Self> { + let now = Utc::now(); let headers = request.headers(); if let (Some(signature), Some(trace)) = ( @@ -23,7 +25,7 @@ impl<'r> FromData<'r> for IncomingPayload { ) { return match timeout(Duration::from_secs(5), data.open(2.mebibytes()).into_bytes()).await { Ok(Ok(body)) => { - Self::new(signature, body.into_inner(), trace).map_or_else(|| { + Self::new(&now, signature, body.into_inner(), trace).map_or_else(|| { warn!( "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers." ); diff --git a/src/webhooks/warp.rs b/src/webhooks/warp.rs index c95f234..301de43 100644 --- a/src/webhooks/warp.rs +++ b/src/webhooks/warp.rs @@ -1,6 +1,7 @@ use super::Payload; use bytes::Bytes; +use chrono::Utc; use warp::{Filter, Rejection, body, header, path}; /// Creates a new warp [`Filter`] for receiving webhook events. @@ -49,9 +50,11 @@ pub fn webhook( .and(body::content_length_limit(2 * 1024 * 1024)) .and(body::bytes()) .map(move |signature: String, body: Bytes| { + let now = Utc::now(); + str::from_utf8(&body) .ok() - .and_then(|body| Payload::new(&signature, body, &secret)) + .and_then(|body| Payload::new(&now, &signature, body, &secret)) }) .and(header("x-topgg-trace")) } From bc349d098707bc67ea1b39a338a2dbaf0c8bb750 Mon Sep 17 00:00:00 2001 From: null <60427892+null8626@users.noreply.github.com> Date: Sat, 21 Mar 2026 19:43:53 +0700 Subject: [PATCH 9/9] [refactor,feat]: overhaul webhooks to reduce code repetition --- src/webhooks/actix_web.rs | 39 +++----- src/webhooks/axum.rs | 39 +++++--- src/webhooks/mod.rs | 135 +------------------------ src/webhooks/payload.rs | 204 ++++++++++++++++++++++++++++++++++---- src/webhooks/rocket.rs | 30 +++--- src/webhooks/warp.rs | 45 +++++---- 6 files changed, 267 insertions(+), 225 deletions(-) diff --git a/src/webhooks/actix_web.rs b/src/webhooks/actix_web.rs index aa5e070..ce3209f 100644 --- a/src/webhooks/actix_web.rs +++ b/src/webhooks/actix_web.rs @@ -13,22 +13,18 @@ use actix_web::{ }; use chrono::{DateTime, Utc}; use futures_core::stream::Stream; -use log::warn; #[doc(hidden)] #[derive(Debug)] pub enum IncomingPayloadError { - ParseFailure, - Unauthorized, + BadRequest, Timeout, } impl Display for IncomingPayloadError { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.write_str(match self { - Self::ParseFailure => "Unable to parse Top.gg webhook payload.", - - Self::Unauthorized => "Unauthorized.", + Self::BadRequest => "Bad Request.", Self::Timeout => "Request timed out.", }) @@ -38,9 +34,7 @@ impl Display for IncomingPayloadError { impl ResponseError for IncomingPayloadError { fn error_response(&self) -> HttpResponse { match self { - Self::ParseFailure => HttpResponse::NoContent().body(()), - - Self::Unauthorized => HttpResponse::Unauthorized().body("Unauthorized"), + Self::BadRequest => HttpResponse::BadRequest().body("Bad Request"), Self::Timeout => HttpResponse::RequestTimeout().body("Request timed out"), } @@ -48,9 +42,7 @@ impl ResponseError for IncomingPayloadError { fn status_code(&self) -> StatusCode { match self { - Self::ParseFailure => StatusCode::NO_CONTENT, - - Self::Unauthorized => StatusCode::UNAUTHORIZED, + Self::BadRequest => StatusCode::BAD_REQUEST, Self::Timeout => StatusCode::REQUEST_TIMEOUT, } @@ -85,16 +77,10 @@ impl Future for IncomingPayloadFut { return Poll::Ready(Err(IncomingPayloadError::Timeout)); } - match body { - Ok(body) => self.body.extend_from_slice(&body), - - Err(err) => { - warn!( - "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers: {err:?}" - ); - - return Poll::Ready(Err(IncomingPayloadError::ParseFailure)); - } + if let Ok(body) = body { + self.body.extend_from_slice(&body); + } else { + return Poll::Ready(Err(IncomingPayloadError::BadRequest)); } } @@ -103,13 +89,16 @@ impl Future for IncomingPayloadFut { if let (Some(signature), Some(trace)) = ( headers.get("x-topgg-signature"), headers.get("x-topgg-trace"), - ) && let (Ok(signature), Ok(trace)) = (signature.to_str(), trace.to_str()) - && let Some(incoming) = IncomingPayload::new(&self.now, signature, self.body.clone(), trace) + ) && let (Ok(signature), Ok(trace), Ok(body)) = ( + signature.to_str(), + trace.to_str(), + str::from_utf8(&self.body), + ) && let Some(incoming) = IncomingPayload::new(self.now, body.into(), signature, trace) { return Poll::Ready(Ok(incoming)); } - Poll::Ready(Err(IncomingPayloadError::Unauthorized)) + Poll::Ready(Err(IncomingPayloadError::BadRequest)) } } diff --git a/src/webhooks/axum.rs b/src/webhooks/axum.rs index 057df74..5ea8a13 100644 --- a/src/webhooks/axum.rs +++ b/src/webhooks/axum.rs @@ -1,4 +1,4 @@ -use super::Payload; +use super::{Payload, PayloadResult}; use std::{sync::Arc, time::Duration}; use axum::{ @@ -10,7 +10,6 @@ use axum::{ routing::post, }; use chrono::Utc; -use log::warn; use tower::{ServiceBuilder, timeout::error::Elapsed}; /// An axum webhook listener for listening to payloads. @@ -57,7 +56,12 @@ impl Clone for WebhookState { /// use topgg::Payload; /// use std::sync::Arc; /// -/// use axum::{http::status::StatusCode, response::{IntoResponse, Response}, routing::get, Router}; +/// use axum::{ +/// Router, +/// http::status::StatusCode, +/// response::{IntoResponse, Response}, +/// routing::get, +/// }; /// use tokio::net::TcpListener; /// /// struct MyTopggListener {} @@ -79,9 +83,10 @@ impl Clone for WebhookState { /// async fn main() { /// let state = Arc::new(MyTopggListener {}); /// +/// // POST /webhook /// let router = Router::new().route("/", get(index)).nest( /// "/webhook", -/// topgg::axum::webhook(Arc::clone(&state), env!("TOPGG_WEBHOOK_SECRET").to_string()), +/// topgg::axum::webhook(Arc::clone(&state), env!("TOPGG_WEBHOOK_SECRET").into()), /// ); /// /// let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); @@ -116,17 +121,27 @@ where && let Some(trace) = headers.get("x-topgg-trace") && let Ok(trace) = trace.to_str() { - if let Some(payload) = Payload::new(&now, signature, &body, &wrapped_state.secret) { - wrapped_state.state.callback(payload, trace).await - } else { - warn!( - "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers.\n--- BEGIN BODY DUMP ---\n{body}\n--- END BODY DUMP ---" - ); + match Payload::new(now, body, signature, &wrapped_state.secret) { + PayloadResult::Accepted(payload) => { + wrapped_state.state.callback(payload, trace).await + } - (StatusCode::NO_CONTENT, ()).into_response() + PayloadResult::Forbidden => (StatusCode::FORBIDDEN, "Forbidden").into_response(), + + PayloadResult::BadRequest => (StatusCode::BAD_REQUEST, "Bad Request").into_response(), + + PayloadResult::Unauthorized => { + (StatusCode::UNAUTHORIZED, "Unauthorized").into_response() + } + + PayloadResult::DeserializationFailure => (StatusCode::NO_CONTENT, "").into_response(), + + PayloadResult::InternalServerError => { + (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error").into_response() + } } } else { - (StatusCode::UNAUTHORIZED, "Unauthorized").into_response() + (StatusCode::BAD_REQUEST, "Bad Request").into_response() } }, ), diff --git a/src/webhooks/mod.rs b/src/webhooks/mod.rs index 5a902c4..d358965 100644 --- a/src/webhooks/mod.rs +++ b/src/webhooks/mod.rs @@ -1,6 +1,6 @@ mod payload; -pub use payload::Payload; +pub use payload::{IncomingPayload, Payload, PayloadResult}; #[cfg(feature = "actix-web")] mod actix_web; @@ -23,136 +23,3 @@ cfg_if::cfg_if! { pub mod warp; } } - -cfg_if::cfg_if! { - if #[cfg(any(feature = "actix-web", feature = "rocket"))] { - use std::collections::HashMap; - - use chrono::{DateTime, Utc}; - use hmac::{Hmac, Mac}; - use sha2::Sha256; - - /// An incoming [`Payload`] that is yet to be [authenticated with a secret][IncomingPayload::authenticate]. - /// - /// # Examples - /// - /// With actix-web: - /// - /// ```rust,no_run - /// use topgg::IncomingPayload; - /// use std::io; - /// - /// use actix_web::{ - /// error::{Error, ErrorUnauthorized}, - /// get, post, App, HttpServer, - /// }; - /// - /// #[get("/")] - /// async fn index() -> &'static str { - /// "Hello, World!" - /// } - /// - /// #[post("/webhook")] - /// async fn webhook(payload: IncomingPayload) -> Result<&'static str, Error> { - /// match payload.authenticate(env!("TOPGG_WEBHOOK_SECRET")) { - /// Some(payload) => { - /// println!("{payload:?}"); - /// - /// Ok("ok") - /// } - /// - /// _ => Err(ErrorUnauthorized("401")), - /// } - /// } - /// - /// #[actix_web::main] - /// async fn main() -> io::Result<()> { - /// HttpServer::new(|| App::new().service(index).service(webhook)) - /// .bind("127.0.0.1:8080")? - /// .run() - /// .await - /// } - /// ``` - /// - /// With rocket: - /// - /// ```rust,no_run - /// use topgg::IncomingPayload; - /// - /// use rocket::{get, http::Status, launch, post, routes, Build, Rocket}; - /// - /// #[get("/")] - /// fn index() -> &'static str { - /// "Hello, World!" - /// } - /// - /// #[post("/webhook", data = "")] - /// fn webhook(payload: IncomingPayload) -> Status { - /// match payload.authenticate(env!("TOPGG_WEBHOOK_SECRET")) { - /// Some(payload) => { - /// println!("{payload:?}"); - /// - /// Status::Ok - /// }, - /// _ => { - /// println!("found an unauthorized attacker."); - /// - /// Status::Unauthorized - /// } - /// } - /// } - /// - /// #[launch] - /// fn rocket() -> Rocket { - /// rocket::build().mount("/", routes![index, webhook]) - /// } - /// ``` - #[cfg_attr(docsrs, doc(cfg(any(feature = "actix-web", feature = "rocket"))))] - pub struct IncomingPayload { - t: i64, - signature: String, - body: String, - trace: String, - } - - impl IncomingPayload { - pub(super) fn new(now: &DateTime, signature: &str, body: Vec, trace: &str) -> Option { - let signature = signature.split(',').filter_map(|p| p.split_once('=')).collect::>(); - let t = signature.get("t")?.to_string().parse::().ok()? * 1000; - - if (now.timestamp_millis() - (t * 1000)).abs() < 30000 { - None - } else { - Some(Self { - t, - signature: signature.get("v1")?.to_string(), - body: String::from_utf8(body).ok()?, - trace: trace.into(), - }) - } - } - - /// Tries to authenticate a valid secret with this request. - #[must_use] - pub fn authenticate(&self, secret: &str) -> Option { - let mut hmac = Hmac::::new_from_slice(secret.as_bytes()).ok()?; - - hmac.update(format!("{}.{}", self.t, self.body).as_bytes()); - - let digest = hex::encode(hmac.finalize().into_bytes()); - - if digest == self.signature && let Ok(payload) = serde_json::from_str(&self.body) { - Some(payload) - } else { - None - } - } - - /// Retrieves the payload's `x-topgg-trace` header for debugging and correlating requests with Top.gg support. - #[must_use] - pub fn get_trace(&self) -> &str { - &self.trace - } - } - } -} diff --git a/src/webhooks/payload.rs b/src/webhooks/payload.rs index bbb93e1..05dcfec 100644 --- a/src/webhooks/payload.rs +++ b/src/webhooks/payload.rs @@ -1,7 +1,11 @@ use super::super::{PartialProject, User, snowflake}; +use std::collections::HashMap; use chrono::{DateTime, Utc}; +use hmac::{Hmac, Mac}; +use log::warn; use serde::Deserialize; +use sha2::Sha256; /// A webhook payload. #[non_exhaustive] @@ -71,46 +75,202 @@ pub enum Payload { } impl Payload { + #[allow(clippy::new_ret_no_self)] #[cfg(any(feature = "axum", feature = "warp"))] pub(super) fn new( - now: &DateTime, + now: DateTime, + body: String, signature: &str, - body: &str, secret: &str, - ) -> Option { - use std::collections::HashMap; + ) -> PayloadResult { + IncomingPayload::new(now, body, signature, "").map_or(PayloadResult::BadRequest, |incoming| { + incoming.authenticate(secret) + }) + } +} + +/// A processed [`Payload`]. +#[cfg_attr(docsrs, doc(cfg(feature = "webhooks")))] +pub enum PayloadResult { + /// The payload has been successfully authenticated. + Accepted(Payload), + + /// The timestamp is outside of the accepted time window, possibly being a part of a replay attack. + Forbidden, + + /// The request's headers are missing or invalid. + BadRequest, - use hmac::{Hmac, Mac}; - use sha2::Sha256; + /// The request's signature cannot be authenticated with the correct webhook secret. + Unauthorized, + + /// Unable deserialize payload. This could possibly be a bug with the SDK. + /// + /// It's recommended to return a 200 and 204 status code and report this to the SDK's maintainers when this happens. + DeserializationFailure, + + /// Unable to create a SHA-256 HMAC instance from the specified webhook secret. + InternalServerError, +} + +/// An incoming [`Payload`] that is yet to be [authenticated with a secret][IncomingPayload::authenticate]. +/// +/// # Examples +/// +/// With actix-web: +/// +/// ```rust,no_run +/// use topgg::{IncomingPayload, PayloadResult}; +/// use std::io; +/// +/// use actix_web::{ +/// App, HttpServer, +/// error::{Error, ErrorBadRequest, ErrorForbidden, ErrorInternalServerError, ErrorUnauthorized}, +/// get, post, +/// }; +/// +/// #[get("/")] +/// async fn index() -> &'static str { +/// "Hello, World!" +/// } +/// +/// // POST /webhook +/// #[post("/webhook")] +/// async fn webhook(payload: IncomingPayload) -> Result<&'static str, Error> { +/// match payload.authenticate(env!("TOPGG_WEBHOOK_SECRET")) { +/// PayloadResult::Accepted(payload) => { +/// println!("{payload:?}"); +/// +/// Ok("ok") +/// } +/// +/// PayloadResult::Forbidden => Err(ErrorForbidden("Forbidden")), +/// +/// PayloadResult::BadRequest => Err(ErrorBadRequest("Bad Request")), +/// +/// PayloadResult::Unauthorized => Err(ErrorUnauthorized("Unauthorized")), +/// +/// PayloadResult::DeserializationFailure => Ok(""), +/// +/// PayloadResult::InternalServerError => Err(ErrorInternalServerError("Internal Server Error")), +/// } +/// } +/// +/// #[actix_web::main] +/// async fn main() -> io::Result<()> { +/// HttpServer::new(|| App::new().service(index).service(webhook)) +/// .bind("127.0.0.1:8080")? +/// .run() +/// .await +/// } +/// ``` +/// +/// With rocket: +/// +/// ```rust,no_run +/// use topgg::{IncomingPayload, PayloadResult}; +/// +/// use rocket::{Build, Rocket, get, http::Status, launch, post, routes}; +/// +/// #[get("/")] +/// fn index() -> &'static str { +/// "Hello, World!" +/// } +/// +/// // POST /webhook +/// #[post("/webhook", data = "")] +/// fn webhook(payload: IncomingPayload) -> Status { +/// match payload.authenticate(env!("TOPGG_WEBHOOK_SECRET")) { +/// PayloadResult::Accepted(payload) => { +/// println!("{payload:?}"); +/// +/// Status::NoContent +/// } +/// +/// PayloadResult::Forbidden => Status::Forbidden, +/// +/// PayloadResult::BadRequest => Status::BadRequest, +/// +/// PayloadResult::Unauthorized => Status::Unauthorized, +/// +/// PayloadResult::DeserializationFailure => Status::NoContent, +/// +/// PayloadResult::InternalServerError => Status::InternalServerError, +/// } +/// } +/// +/// #[launch] +/// fn rocket() -> Rocket { +/// rocket::build().mount("/", routes![index, webhook]) +/// } +/// ``` +#[cfg_attr(docsrs, doc(cfg(feature = "webhooks")))] +pub struct IncomingPayload { + timestamp: i64, + now: i64, + signature: Vec, + body: String, + trace: String, +} +impl IncomingPayload { + /// Tries to create a new incoming payload from the current timestamp, a request body, an `x-topgg-signature` header, and an `x-topgg-trace` header. Returns [`None`] if the header values cannot be parsed. + #[must_use] + pub fn new(now: DateTime, body: String, signature: &str, trace: &str) -> Option { let signature = signature .split(',') .filter_map(|p| p.split_once('=')) .collect::>(); - let (Some(Ok(t)), Some(signature)) = ( - signature.get("t").map(|t| t.parse::()), - signature.get("v1"), - ) else { - return None; - }; + if let (Some(timestamp), Some(signature)) = (signature.get("t"), signature.get("v1")) + && let (Ok(timestamp), Ok(signature)) = (timestamp.parse(), hex::decode(signature)) + { + Some(Self { + timestamp, + now: now.timestamp_millis(), + signature, + body, + trace: trace.into(), + }) + } else { + None + } + } - if (now.timestamp_millis() - (t * 1000)).abs() < 30000 { - return None; + /// Tries to authenticate a valid secret with this request. + #[must_use] + pub fn authenticate(&self, secret: &str) -> PayloadResult { + if (self.now - (self.timestamp * 1000)).abs() > 30000 { + return PayloadResult::Forbidden; } - let mut hmac = Hmac::::new_from_slice(secret.as_bytes()).ok()?; + let Ok(mut hmac) = Hmac::::new_from_slice(secret.as_bytes()) else { + warn!( + "Unable to create a SHA-256 HMAC instance from the specified webhook secret. Dismissing payload request." + ); - hmac.update(format!("{t}.{body}").as_bytes()); + return PayloadResult::InternalServerError; + }; - let digest = hex::encode(hmac.finalize().into_bytes()); + hmac.update(format!("{}.{}", self.timestamp, self.body).as_bytes()); - if &digest == signature - && let Ok(payload) = serde_json::from_str(body) - { - Some(payload) + if hmac.verify_slice(&self.signature).is_ok() { + serde_json::from_str(&self.body).map_or_else(|_| { + warn!( + "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers.\n--- BEGIN BODY DUMP ---\n{}\n--- END BODY DUMP ---", + self.body + ); + + PayloadResult::DeserializationFailure + }, PayloadResult::Accepted) } else { - None + PayloadResult::Unauthorized } } + + /// Retrieves the payload's `x-topgg-trace` header for debugging and correlating requests with Top.gg support. + #[must_use] + pub fn get_trace(&self) -> &str { + &self.trace + } } diff --git a/src/webhooks/rocket.rs b/src/webhooks/rocket.rs index a9454aa..38ecc6a 100644 --- a/src/webhooks/rocket.rs +++ b/src/webhooks/rocket.rs @@ -2,7 +2,6 @@ use super::IncomingPayload; use std::time::Duration; use chrono::Utc; -use log::warn; use rocket::{ data::{Data, FromData, Outcome, ToByteUnit}, http::Status, @@ -23,23 +22,26 @@ impl<'r> FromData<'r> for IncomingPayload { headers.get_one("x-topgg-signature"), headers.get_one("x-topgg-trace"), ) { - return match timeout(Duration::from_secs(5), data.open(2.mebibytes()).into_bytes()).await { + match timeout( + Duration::from_secs(5), + data.open(2.mebibytes()).into_bytes(), + ) + .await + { Ok(Ok(body)) => { - Self::new(&now, signature, body.into_inner(), trace).map_or_else(|| { - warn!( - "Unable to parse Top.gg webhook payload. Please report this bug to the SDK maintainers." - ); + if let Ok(body) = String::from_utf8(body.into_inner()) + && let Some(payload) = Self::new(now, body, signature, trace) + { + return Outcome::Success(payload); + } + } - Outcome::Error((Status::NoContent, ())) - }, Outcome::Success) - }, + Err(_) => return Outcome::Error((Status::RequestTimeout, ())), - Err(_) => Outcome::Error((Status::RequestTimeout, ())), - - _ => Outcome::Error((Status::BadRequest, ())), - }; + _ => {} + } } - Outcome::Error((Status::Unauthorized, ())) + Outcome::Error((Status::BadRequest, ())) } } diff --git a/src/webhooks/warp.rs b/src/webhooks/warp.rs index 301de43..6c1a763 100644 --- a/src/webhooks/warp.rs +++ b/src/webhooks/warp.rs @@ -1,4 +1,4 @@ -use super::Payload; +use super::{Payload, PayloadResult}; use bytes::Bytes; use chrono::Utc; @@ -9,27 +9,36 @@ use warp::{Filter, Rejection, body, header, path}; /// # Example /// /// ```rust,no_run +/// use topgg::PayloadResult; /// use std::net::SocketAddr; /// -/// use warp::{http::StatusCode, reply, Filter}; +/// use warp::{Filter, http::StatusCode, reply}; /// /// #[tokio::main] /// async fn main() { /// // POST /webhook -/// let webhook = topgg::warp::webhook( -/// "webhook", -/// env!("TOPGG_WEBHOOK_SECRET").to_string() -/// ).then(|payload, _trace| async move { -/// match payload { -/// Some(payload) => { -/// println!("{payload:?}"); +/// let webhook = +/// topgg::warp::webhook("webhook", env!("TOPGG_WEBHOOK_SECRET").into()).then(|payload, _trace| async move { +/// match payload { +/// PayloadResult::Accepted(payload) => { +/// println!("{payload:?}"); /// -/// reply::with_status("", StatusCode::NO_CONTENT) -/// }, +/// reply::with_status("", StatusCode::NO_CONTENT) +/// } /// -/// None => reply::with_status("Unauthorized", StatusCode::UNAUTHORIZED) -/// } -/// }); +/// PayloadResult::Forbidden => reply::with_status("Forbidden", StatusCode::FORBIDDEN), +/// +/// PayloadResult::BadRequest => reply::with_status("Bad Request", StatusCode::BAD_REQUEST), +/// +/// PayloadResult::Unauthorized => reply::with_status("Unauthorized", StatusCode::UNAUTHORIZED), +/// +/// PayloadResult::DeserializationFailure => reply::with_status("", StatusCode::NO_CONTENT), +/// +/// PayloadResult::InternalServerError => { +/// reply::with_status("Internal Server Error", StatusCode::INTERNAL_SERVER_ERROR) +/// } +/// } +/// }); /// /// let routes = warp::get().map(|| "Hello, World!").or(webhook); /// @@ -43,7 +52,7 @@ use warp::{Filter, Rejection, body, header, path}; pub fn webhook( endpoint: &'static str, secret: String, -) -> impl Filter, String), Error = Rejection> + Clone { +) -> impl Filter + Clone { warp::post() .and(path(endpoint)) .and(header("x-topgg-signature")) @@ -52,9 +61,9 @@ pub fn webhook( .map(move |signature: String, body: Bytes| { let now = Utc::now(); - str::from_utf8(&body) - .ok() - .and_then(|body| Payload::new(&now, &signature, body, &secret)) + String::from_utf8(body.to_vec()).map_or(PayloadResult::BadRequest, |body| { + Payload::new(now, body, &signature, &secret) + }) }) .and(header("x-topgg-trace")) }