Skip to content
This repository was archived by the owner on Jun 1, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ scan:
public: ""
secret: ""

support:
url: "https://support.gemwallet.com"
website_token: "GkTYRSL8msUHt3qcZzDxoDNJ"

parser:
timeout: 1s
shutdown:
Expand Down
1 change: 1 addition & 0 deletions apps/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ reqwest = { workspace = true }

gem_tracing = { path = "../../crates/tracing" }
storage = { path = "../../crates/storage" }
support = { path = "../../crates/support" }
pricer = { path = "../../crates/pricer" }
prices = { path = "../../crates/prices" }
fiat = { path = "../../crates/fiat" }
Expand Down
9 changes: 9 additions & 0 deletions apps/api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod prices;
mod referral;
mod responders;
mod status;
mod support;
mod swap;
mod webhooks;
mod websocket;
Expand Down Expand Up @@ -51,6 +52,7 @@ use settings::Settings;
use settings_chain::{ChainProviders, ProviderFactory};
use storage::Database;
use streamer::{StreamProducer, StreamProducerConfig};
use support::SupportApiClient;
use swap::SwapClient;
use swapper::okx::{OkxClientConfig, OkxProvider};
use swapper::swapper::GemSwapper;
Expand Down Expand Up @@ -98,6 +100,11 @@ fn mount_routes(rocket: Rocket<Build>, admin_enabled: bool) -> Rocket<Build> {
chain::transaction::get_transaction,
chain::transaction::get_transaction_status,
referral::get_rewards_leaderboard,
support::get_support_conversation,
support::get_support_messages,
support::post_support_message,
support::post_support_typing,
support::post_support_last_seen,
swap::post_near_intents_quote,
swap::okx::post_okx_quote,
swap::okx::post_okx_quote_data,
Expand Down Expand Up @@ -237,6 +244,7 @@ async fn rocket_api(settings: Settings) -> Result<Rocket<Build>, Box<dyn std::er
let rewards_client = RewardsClient::new(database.clone(), stream_producer.clone(), ip_security_client, pusher_client.clone());
let redemption_client = RewardsRedemptionClient::new(database.clone(), stream_producer.clone());
let notifications_client = NotificationsClient::new(database.clone());
let support_client = SupportApiClient::new(settings.support.url.clone(), settings.support.website_token.clone(), cacher_client.clone());
let near_intents_client = swap::NearIntentsProxyClient::new(cacher_client.clone());
let okx_provider = OkxProvider::new(
OkxClientConfig {
Expand Down Expand Up @@ -279,6 +287,7 @@ async fn rocket_api(settings: Settings) -> Result<Rocket<Build>, Box<dyn std::er
.manage(Mutex::new(redemption_client))
.manage(Mutex::new(wallets_client))
.manage(Mutex::new(notifications_client))
.manage(Mutex::new(support_client))
.manage(Mutex::new(near_intents_client))
.manage(okx_provider)
.manage(Mutex::new(portfolio_client))
Expand Down
121 changes: 121 additions & 0 deletions apps/api/src/support.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use cacher::{CacheKey, CacherClient};
use primitives::{SupportConversation, SupportMessage, SupportMessageInput, SupportTyping};
use rocket::{State, get, post, serde::json::Json, tokio::sync::Mutex};
use std::{error::Error, future::Future};
use storage::models::DeviceRow;
use support::{ChatwootClient, ChatwootError, ChatwootSession};

use crate::{
devices::guard::AuthenticatedDevice,
responders::{ApiError, ApiResponse},
};

pub struct SupportApiClient {
chatwoot: ChatwootClient,
cacher: CacherClient,
}

impl SupportApiClient {
pub fn new(url: String, website_token: String, cacher: CacherClient) -> Self {
Self {
chatwoot: ChatwootClient::new(url, website_token),
cacher,
}
}

pub async fn conversation(&self, device: &DeviceRow) -> Result<Option<SupportConversation>, Box<dyn Error + Send + Sync>> {
self.with_session(device, |session| async move { self.chatwoot.conversation(&session).await }).await
}

pub async fn messages(&self, device: &DeviceRow, before: Option<i64>, after: Option<i64>) -> Result<Vec<SupportMessage>, Box<dyn Error + Send + Sync>> {
self.with_session(device, |session| async move { self.chatwoot.messages(&session, before, after).await })
.await
}

pub async fn send_message(&self, device: &DeviceRow, input: SupportMessageInput) -> Result<SupportMessage, Box<dyn Error + Send + Sync>> {
let content = input.content;
self.with_session(device, |session| {
let content = content.clone();
async move { self.chatwoot.send_message(&session, content).await }
})
.await
}

pub async fn set_typing(&self, device: &DeviceRow, typing: SupportTyping) -> Result<bool, Box<dyn Error + Send + Sync>> {
let status = typing.status;
self.with_session(device, |session| {
let status = status.clone();
async move { self.chatwoot.set_typing(&session, status).await }
})
.await
}

pub async fn update_last_seen(&self, device: &DeviceRow) -> Result<bool, Box<dyn Error + Send + Sync>> {
self.with_session(device, |session| async move { self.chatwoot.update_last_seen(&session).await }).await
}

async fn with_session<T, F, Fut>(&self, device: &DeviceRow, call: F) -> Result<T, Box<dyn Error + Send + Sync>>
where
F: Fn(ChatwootSession) -> Fut,
Fut: Future<Output = Result<T, ChatwootError>>,
{
let session = self.chatwoot_session(device).await?;
match call(session).await {
Ok(value) => Ok(value),
Err(error) if error.is_session_error() => {
let session = self.refresh_session(device).await?;
Ok(call(session).await?)
}
Err(error) => Err(Box::new(error)),
}
}

async fn chatwoot_session(&self, device: &DeviceRow) -> Result<ChatwootSession, Box<dyn Error + Send + Sync>> {
let cache_key = CacheKey::SupportDeviceSession(&device.device_id);
if let Some(session) = self.cacher.get_cached_optional(cache_key).await? {
return Ok(session);
}
self.refresh_session(device).await
}

async fn refresh_session(&self, device: &DeviceRow) -> Result<ChatwootSession, Box<dyn Error + Send + Sync>> {
let cache_key = CacheKey::SupportDeviceSession(&device.device_id);
let session = self.chatwoot.create_session(&device.as_primitive()).await?;
self.cacher.set_cached(cache_key, &session).await?;
Ok(session)
}
}

#[get("/support")]
pub async fn get_support_conversation(device: AuthenticatedDevice, client: &State<Mutex<SupportApiClient>>) -> Result<ApiResponse<Option<SupportConversation>>, ApiError> {
Ok(client.lock().await.conversation(&device.device_row).await?.into())
}

#[get("/support/messages?<before>&<after>")]
pub async fn get_support_messages(
device: AuthenticatedDevice,
before: Option<i64>,
after: Option<i64>,
client: &State<Mutex<SupportApiClient>>,
) -> Result<ApiResponse<Vec<SupportMessage>>, ApiError> {
Ok(client.lock().await.messages(&device.device_row, before, after).await?.into())
}

#[post("/support/messages", format = "json", data = "<input>")]
pub async fn post_support_message(
device: AuthenticatedDevice,
input: Json<SupportMessageInput>,
client: &State<Mutex<SupportApiClient>>,
) -> Result<ApiResponse<SupportMessage>, ApiError> {
Ok(client.lock().await.send_message(&device.device_row, input.into_inner()).await?.into())
}

#[post("/support/typing", format = "json", data = "<typing>")]
pub async fn post_support_typing(device: AuthenticatedDevice, typing: Json<SupportTyping>, client: &State<Mutex<SupportApiClient>>) -> Result<ApiResponse<bool>, ApiError> {
Ok(client.lock().await.set_typing(&device.device_row, typing.into_inner()).await?.into())
}

#[post("/support/last_seen")]
pub async fn post_support_last_seen(device: AuthenticatedDevice, client: &State<Mutex<SupportApiClient>>) -> Result<ApiResponse<bool>, ApiError> {
Ok(client.lock().await.update_last_seen(&device.device_row).await?.into())
}
4 changes: 3 additions & 1 deletion apps/daemon/src/consumers/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod support_webhook_consumer;
use std::error::Error;
use std::sync::Arc;

use cacher::CacherClient;
use settings::Settings;
use storage::Database;
use streamer::{ConsumerStatusReporter, QueueName, ShutdownReceiver, StreamProducer, StreamProducerConfig, SupportWebhookPayload, run_consumer};
Expand All @@ -18,8 +19,9 @@ pub async fn run_consumer_support(settings: Settings, shutdown_rx: ShutdownRecei
let retry = streamer::Retry::new(settings.rabbitmq.retry.delay, settings.rabbitmq.retry.timeout);
let rabbitmq_config = StreamProducerConfig::new(settings.rabbitmq.url.clone(), retry);
let stream_producer = StreamProducer::new(&rabbitmq_config, "daemon_support_producer", shutdown_rx.clone()).await?;
let cacher = CacherClient::new(&settings.redis.url).await;

let support_client = SupportClient::new(database, stream_producer);
let support_client = SupportClient::new(database, stream_producer, cacher);
let consumer = SupportWebhookConsumer::new(support_client);

let queue = QueueName::SupportWebhooks;
Expand Down
21 changes: 15 additions & 6 deletions apps/daemon/src/consumers/support/support_webhook_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use streamer::SupportWebhookPayload;
use streamer::consumer::MessageConsumer;

use primitives::Device;
use support::{ChatwootWebhookPayload, EVENT_CONVERSATION_STATUS_CHANGED, EVENT_CONVERSATION_UPDATED, EVENT_MESSAGE_CREATED, SupportClient};
use support::{ChatwootWebhookPayload, EVENT_CONVERSATION_STATUS_CHANGED, EVENT_CONVERSATION_UPDATED, EVENT_MESSAGE_CREATED, SupportClient, SupportProcessResult};

pub struct SupportWebhookConsumer {
support_client: SupportClient,
Expand All @@ -17,11 +17,14 @@ impl SupportWebhookConsumer {
Self { support_client }
}

async fn process_notification(&self, device: &Device, webhook: &ChatwootWebhookPayload) -> Result<usize, Box<dyn Error + Send + Sync>> {
async fn process_notification(&self, device: &Device, webhook: &ChatwootWebhookPayload) -> Result<SupportProcessResult, Box<dyn Error + Send + Sync>> {
match webhook.event.as_str() {
EVENT_MESSAGE_CREATED => self.support_client.handle_message_created(device, webhook).await,
EVENT_CONVERSATION_UPDATED | EVENT_CONVERSATION_STATUS_CHANGED => self.support_client.handle_conversation_updated(webhook).map(|_| 0),
_ => Ok(0),
EVENT_CONVERSATION_UPDATED | EVENT_CONVERSATION_STATUS_CHANGED => self.support_client.handle_conversation_updated(device, webhook).await,
_ => Ok(SupportProcessResult {
notifications: 0,
stream_events: 0,
}),
}
}
}
Expand Down Expand Up @@ -52,8 +55,14 @@ impl MessageConsumer<SupportWebhookPayload, bool> for SupportWebhookConsumer {
};

match self.process_notification(&device, &webhook).await {
Ok(notifications) => {
info_with_fields!("support webhook processed", device_id = device_id, event = webhook.event, notifications = notifications);
Ok(result) => {
info_with_fields!(
"support webhook processed",
device_id = device_id,
event = webhook.event,
notifications = result.notifications,
stream_events = result.stream_events
);
Ok(true)
}
Err(error) => {
Expand Down
3 changes: 3 additions & 0 deletions crates/cacher/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum CacheKey<'a> {

// Device keys
InactiveDeviceObserver(&'a str),
SupportDeviceSession(&'a str),

// Fetch consumer keys (chain, address)
FetchCoinAddresses(&'a str, &'a str),
Expand Down Expand Up @@ -79,6 +80,7 @@ impl CacheKey<'_> {
Self::UsernameCreationGlobalDaily => "username:global:daily".to_string(),
Self::UsernameCreationPerCountryDaily(country) => format!("username:country:daily:{}", country),
Self::InactiveDeviceObserver(device_id) => format!("device:inactive_observer:{}", device_id),
Self::SupportDeviceSession(device_id) => format!("support:device_session:{}", device_id),
Self::FetchCoinAddresses(chain, address) => format!("fetch:coin_addresses:{}:{}", chain, address),
Self::FetchTokenAddresses(chain, address) => format!("fetch:token_addresses:{}:{}", chain, address),
Self::FetchNftAssetsAddresses(chain, address) => format!("fetch:nft_assets_addresses:{}:{}", chain, address),
Expand Down Expand Up @@ -117,6 +119,7 @@ impl CacheKey<'_> {
Self::UsernameCreationGlobalDaily => SECONDS_PER_DAY,
Self::UsernameCreationPerCountryDaily(_) => SECONDS_PER_DAY,
Self::InactiveDeviceObserver(_) => 30 * SECONDS_PER_DAY,
Self::SupportDeviceSession(_) => 170 * SECONDS_PER_DAY,
Self::FetchCoinAddresses(_, _) => 7 * SECONDS_PER_DAY,
Self::FetchTokenAddresses(_, _) => 30 * SECONDS_PER_DAY,
Self::FetchNftAssetsAddresses(_, _) => 30 * SECONDS_PER_DAY,
Expand Down
5 changes: 5 additions & 0 deletions crates/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ pub mod websocket;
pub use self::websocket::{WebSocketPriceAction, WebSocketPriceActionType, WebSocketPricePayload};
pub mod stream;
pub use self::stream::{StreamBalanceUpdate, StreamEvent, StreamMessage, StreamMessagePrices, StreamTransactionsUpdate, StreamWalletUpdate, device_stream_channel};
pub mod support;
pub use self::support::{
SupportAgent, SupportConversation, SupportConversationStatus, SupportMessage, SupportMessageDeliveryStatus, SupportMessageInput, SupportMessageSender, SupportStreamEvent,
SupportTyping, SupportTypingStatus,
};
pub mod asset_balance;
pub use self::asset_balance::{AddressBalances, AssetBalance, Balance};
pub mod chain_address;
Expand Down
3 changes: 2 additions & 1 deletion crates/primitives/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use typeshare::typeshare;

use crate::{AssetId, InAppNotification, TransactionId, WalletId, WebSocketPricePayload};
use crate::{AssetId, InAppNotification, SupportStreamEvent, TransactionId, WalletId, WebSocketPricePayload};

pub const DEVICE_STREAM_CHANNEL_PREFIX: &str = "stream:device:";

Expand All @@ -22,6 +22,7 @@ pub enum StreamEvent {
Perpetual(StreamWalletUpdate),
InAppNotification(StreamNotificationlUpdate),
FiatTransaction(StreamWalletUpdate),
Support(SupportStreamEvent),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
Loading
Loading