Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
739389d
fix: remove IGNORED_MESSAGE_TAGS filter in EmailMessage extractContacts
malek10xdev Mar 26, 2026
3aab105
update unit tests
malek10xdev Mar 26, 2026
133e64e
feat(signature): add normalize() in-memory cache
malek10xdev Mar 27, 2026
b63c4be
feat(cache): add setIfNewer to EmailSignatureCache interface
malek10xdev Apr 1, 2026
e64b673
feat(cache): implement setIfNewer with Redis pipeline
malek10xdev Apr 1, 2026
0e66695
feat(signature): add SignatureExtractorCache LLM response decorator
malek10xdev Apr 1, 2026
844cabd
feat(config): add SIGNATURE_LLM_CACHE_TTL_SECONDS env var
malek10xdev Apr 1, 2026
10b2f46
feat(signature-worker): wrap SignatureLLM with SignatureExtractorCache
malek10xdev Apr 1, 2026
3ea9c00
test(signature): add unit tests for SignatureExtractorCache
malek10xdev Apr 1, 2026
61bb342
fix(signature): address lint issues in SignatureExtractorCache
malek10xdev Apr 1, 2026
c809639
fix(signature): remove normalize() cache as it's over-engineered
malek10xdev Apr 1, 2026
1e01eb7
revert: remove normalize cache - over-engineered
malek10xdev Apr 1, 2026
74117aa
fix: properly type mock Redis in tests
malek10xdev Apr 1, 2026
6dc85cd
fix: add skipcq comment for intentional process.exit in worker startup
malek10xdev Apr 1, 2026
845e7e3
docs: add SIGNATURE_LLM_CACHE_TTL_SECONDS to env files
malek10xdev Apr 1, 2026
de7035a
feat: add SIGNATURE_LLM_CACHE_TTL_SECONDS env variable
malek10xdev Apr 1, 2026
7199221
feat(signature): add logging for LLM cache hits/misses with wrapped e…
malek10xdev Apr 3, 2026
dc4555f
fix: revert local test changes and fix DeepSource any type issue
malek10xdev Apr 3, 2026
bebe6a8
feat(redis): add exceptPrefixes param to flushAll to preserve LLM cac…
malek10xdev Apr 3, 2026
52e899a
fix: simplify boolean check
malek10xdev Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@

# ==============| FRONTEND |============= #
FRONTEND_HOST = http://localhost:8082 # ( REQUIRED ) Frontend base URL
SERVER_ENDPOINT = http://localhost:8081 # ( REQUIRED ) Backend API endpoint
IMAGE_REVERSE_PROXY = # ( OPTIONAL ) Proxy for validating or checking image URLs
AVERAGE_EXTRACTION_RATE = 50 # ( OPTIONAL ) Estimated extraction rate (messages processed per second)

ENABLE_CREDIT = true # ( REQUIRED ) Enable or disable credit-based operations
CREDITS_PER_CONTACT = 1 # ( REQUIRED ) Number of credits charged per contact enrichment/export
EXTERNAL_REFILL_CREDITS_LINK = # ( OPTIONAL ) External link for users to purchase or refill credits
NOMINATIM_URL = 'https://nominatim.openstreetmap.org/search'

# ==============| BACKEND |============= #
APP_NAME = leadminer
FRONTEND_HOST = http://localhost:8082 # ( REQUIRED )
LEADMINER_API_PORT = 8081 # ( REQUIRED )
LEADMINER_API_HOST = http://localhost:8081 # ( REQUIRED )
LEADMINER_API_HASH_SECRET = change_me # ( REQUIRED ) Used for hashing secrets
LEADMINER_MINING_ID_GENERATOR_LENGTH = 10 # ( REQUIRED ) Length of the task id

REDIS_PUBSUB_COMMUNICATION_CHANNEL = 'stream-management' # Communication channel between task-manager & workers
REDIS_SIGNATURE_STREAM_NAME = 'email-signature-stream'
REDIS_SIGNATURE_STREAM_CONSUMER_GROUP = 'email-signature-consumer-group'
REDIS_CLEANING_STREAM_CONSUMER_GROUP = 'email-verification-consumer-group' # consumer group for cleaning worker
REDIS_EXTRACTING_STREAM_CONSUMER_GROUP = 'email-messages-consumer-group' # Consumer group for extracting worker
REDIS_EMAIL_SIGNATURE_CONSUMER_BATCH_SIZE = 500
REDIS_EMAIL_VERIFICATION_CONSUMER_BATCH_SIZE = 1000

SIGNATURE_USE_LLM = false
SIGNATURE_OPENROUTER_API_KEY = 'test-key'

THEDIG_URL = http://127.0.0.1:8083/thedig/
THEDIG_API_KEY = _
VOILANORBERT_URL = http://127.0.0.1:8083/voilanorbert/2018-01-08/
VOILANORBERT_USERNAME = any_string
VOILANORBERT_API_KEY = test-api-key
ENRICH_LAYER_URL = https://enrichlayer.com/
ENRICH_LAYER_API_KEY = _

# ZEROBOUNCE_API_KEY =
# MAILERCHECK_API_KEY =
EMAILS_QUOTA_REACHER = 600
EMAILS_QUOTA_MAILERCHECK = 600
EMAILS_QUOTA_ZEROBOUNCE = 3000000
REACHER_HOST = https://stoplight.io/mocks/reacher/backend/68673 # ( REQUIRED )
REACHER_API_KEY = 'test-key'
# REACHER_HEADER_SECRET =
# REACHER_SMTP_FROM =
# REACHER_SMTP_HELLO =
# REACHER_PROXY_PORT =
# REACHER_PROXY_HOST =
# REACHER_PROXY_USERNAME =
# REACHER_PROXY_PASSWORD =
# REACHER_REQUEST_TIMEOUT_MS =
# REACHER_SMTP_CONNECTION_TIMEOUT_SECONDS =
# REACHER_SMTP_CONNECTION_RETRIES =
REACHER_HOTMAIL_USE_HEADLESS = true
# REACHER_MICROSOFT365_USE_API =
REACHER_GMAIL_USE_API = true
# REACHER_YAHOO_USE_API =
REACHER_RATE_LIMITER_REQUESTS = 60
REACHER_RATE_LIMITER_INTERVAL = 60000 # 1 minute

# ==============| MICRO-SERVICES |============= #
EMAIL_FETCHING_SERVICE_PORT = 8084 # ( REQUIRED )
EMAIL_FETCHING_SERVICE_URL = http://localhost:8083 # ( REQUIRED )
EMAIL_FETCHING_SERVICE_NAME = email-fetcher-service # ( REQUIRED )
EMAIL_FETCHING_SERVICE_API_TOKEN = 'test-key' # ( REQUIRED )
FETCHING_BATCH_SIZE_TO_SEND = 200 # Number of messages per sending batch
FETCHING_CHUNK_SIZE_PER_CONNECTION = 2000 # Number of messages fetched per IMAP connection chunk
FETCHING_MAX_CONNECTIONS_PER_FOLDER = 10 # Maximum concurrent connections per IMAP folder

# ==============| ANALYTICS |============= #
POSTHOG_API_KEY = # ( OPTIONAL ) PostHog API key for event tracking
POSTHOG_INSTANCE_ADDRESS = # ( OPTIONAL ) PostHog instance address (URL)
# POSTHOG_PROXY_ADDRESS= # ( OPTIONAL ) PostHog proxy address (URL)
# POSTHOG_INGEST_INSTANCE= # ( OPTIONAL ) PostHog ingest instance (URL)
# POSTHOG_STATIC_ASSETS_PROXY= # ( OPTIONAL ) PostHog static assets proxy (URL)

# ==============| SENTRY |============= #
# SENTRY_DSN_FRONTEND= # ( OPTIONAL ) Sentry dsn
# SENTRY_ENVIRONMENT_FRONTEND= # ( OPTIONAL ) Sentry environment
# SENTRY_DSN_BACKEND = # ( OPTIONAL ) Sentry DSN for reporting and monitoring
# SENTRY_ENVIRONMENT_BACKEND = 'leadminer-backend-prod'


# ==============| LOGGING / MONITORING |============= #
# GRAFANA_LOKI_HOST = # Use Loki transport for logging (Default is console)
LEADMINER_API_LOG_LEVEL = debug # Logging level (debug, info, notice, warning...)
EMAIL_FETCHING_SERVICE_LOG_LEVEL = debug

# ==============| SUPABASE INSTANCES |============= #
SAAS_SUPABASE_PROJECT_URL = http://127.0.0.1:54321 # ( REQUIRED ) SaaS Supabase project URL
SAAS_SUPABASE_ANON_KEY = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0 # ( REQUIRED ) Anon key (respects RLS and policy rules)

SUPABASE_PROJECT_URL = http://127.0.0.1:54321 # ( REQUIRED ) Supabase project URL (e.g., https://db.yourdomain.com for self-hosted/prod)
SUPABASE_ANON_KEY = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0 # ( REQUIRED ) Anon key (respects RLS and policy rules)
PG_CONNECTION_STRING = postgresql://postgres:postgres@127.0.0.1:54322/postgres # ( REQUIRED ) Postgres connection string

# ==============| REDIS INSTANCE |============= #
REDIS_HOST = localhost # ( REQUIRED ) Redis host
REDIS_PORT = 6379 # ( REQUIRED ) Redis port
# REDIS_USERNAME = # ( OPTIONAL ) Redis username
# REDIS_PASSWORD = # ( OPTIONAL ) Redis password
REDIS_EVICTION_POLICY = allkeys-lru # ( REQUIRED ) Redis Eviction policies: noeviction, allkeys-lru, allkeys-random, volatile-lru, volatile-random, volatile-ttl
REDIS_MAXMEMORY = 1gb # ( REQUIRED ) Max memory used by redis, adjust it depending on your server capacity.
REDIS_DB = 0 # ( REQUIRED ) Redis db
REDIS_TLS = false # ( REQUIRED ) Enable or disable TLS for Redis
REDIS_EXPIRATION_TIMEOUT = 7776000 # ( REQUIRED ) Expiration timeout (90 days) for Redis entries
REDIS_CONSUMER_BATCH_SIZE = 200 # ( REQUIRED ) The entries to read from each stream

# ==============| IMAP CONNECTION CONFIG |============= #
IMAP_FETCH_BODY = true # Enable or disable fetching Email bodies
IMAP_MAX_CONNECTIONS = 10 # Maximum number of simultaneous IMAP connections allowed. It is recommended to keep this value between 1 and 15.
IMAP_AUTH_TIMEOUT = 10000 # Milliseconds to wait for authentication after an IMAP connection is established
IMAP_CONNECTION_TIMEOUT = 10000 # Milliseconds to wait for a connection to be established

# ==============| OAUTH CONFIG |============= #
GOOGLE_CLIENT_ID = 21825381029-993l33883t26n48fv11mmm049j6qn6lh.apps.googleusercontent.com # ( REQUIRED ) Google client ID
GOOGLE_SECRET = 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 # ( REQUIRED ) Google secret
AZURE_CLIENT_ID = your_azure_client_id # ( REQUIRED ) Azure client ID
AZURE_SECRET = your_azure_client_secret # ( REQUIRED ) Azure secret


# ==============| GOOGLE CONTACTS Sync |============= #

# Critical Read Operations (API Limit: 90/min)
GOOGLE_CONTACTS_CRITICAL_READ_REQUESTS = 80 # ( OPTIONAL ) Number of requests allowed per interval
GOOGLE_CONTACTS_CRITICAL_READ_INTERVAL = 60 # ( OPTIONAL ) Rate limit interval in seconds

# Critical Write Operations (API Limit: 90/min)
GOOGLE_CONTACTS_CRITICAL_WRITE_REQUESTS = 80 # ( OPTIONAL ) Number of requests allowed per interval
GOOGLE_CONTACTS_CRITICAL_WRITE_INTERVAL = 60 # ( OPTIONAL ) Rate limit interval in seconds

# Read Operations - Groups (API Limit: 120/min)
GOOGLE_CONTACTS_READ_REQUESTS = 110 # ( OPTIONAL ) Number of requests allowed per interval
GOOGLE_CONTACTS_READ_INTERVAL = 60 # ( OPTIONAL ) Rate limit interval in seconds

# Write Operations - Groups/Delete (API Limit: 90/min)
GOOGLE_CONTACTS_WRITE_REQUESTS = 80 # ( OPTIONAL ) Number of requests allowed per interval
GOOGLE_CONTACTS_WRITE_INTERVAL = 60 # ( OPTIONAL ) Rate limit interval in seconds
1 change: 1 addition & 0 deletions .env.docker
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ REDIS_EMAIL_VERIFICATION_CONSUMER_BATCH_SIZE = 1000

SIGNATURE_USE_LLM = false
SIGNATURE_OPENROUTER_API_KEY = 'test-key'
SIGNATURE_LLM_CACHE_TTL_SECONDS = 86400 # LLM response cache TTL in seconds (default: 86400)

THEDIG_URL = http://127.0.0.1:8083/thedig/
THEDIG_API_KEY = _
Expand Down
1 change: 1 addition & 0 deletions backend/.env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ AZURE_SECRET = your_azure_client_secret # ( REQUIRED ) Azure secret
# ==============| API - EMAIL SIGNATURE EXTRACTION WORKER |============= #
SIGNATURE_USE_LLM = false
SIGNATURE_OPENROUTER_API_KEY = "test-key"
SIGNATURE_LLM_CACHE_TTL_SECONDS = 86400 # ( OPTIONAL ) LLM response cache TTL in seconds (default: 86400 = 24 hours)


# ==============| THIRD-PARTY - EMAIL STATUS VERIFICATION SERVICES |============= #
Expand Down
1 change: 1 addition & 0 deletions backend/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const schema = z.object({

SIGNATURE_USE_LLM: boolean(),
SIGNATURE_OPENROUTER_API_KEY: z.string().min(1).optional(),
SIGNATURE_LLM_CACHE_TTL_SECONDS: number().optional().default(86400),

/* SUPABASE + POSTGRES */
SUPABASE_PROJECT_URL: z.string().url(),
Expand Down
35 changes: 23 additions & 12 deletions backend/src/emailSignatureWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import RedisEmailSignatureCache from './services/cache/redis/RedisEmailSignature
import supabaseClient from './utils/supabase';
import { EngineConfig, Signature } from './services/signature';
import { SignatureLLM } from './services/signature/llm';
import SignatureExtractorCache from './services/signature/llm/signature-extractor-cache';
import { checkDomainStatus } from './utils/helpers/domainHelpers';
import { Distribution, TokenBucketRateLimiter } from './services/rate-limiter';
import { SignatureRE } from './services/signature/regex';
Expand All @@ -33,19 +34,28 @@ const signatureEngines: EngineConfig[] = [
];

if (ENV.SIGNATURE_OPENROUTER_API_KEY) {
const signatureLLM = new SignatureLLM(
new TokenBucketRateLimiter({
executeEvenly: false,
intervalSeconds: 60,
uniqueKey: 'email-signature-service',
distribution: Distribution.Memory,
requests: LLMModelsList.every((m) => m.includes('free')) ? 15 : 1000
}),
logger,
LLMModelsList,
ENV.SIGNATURE_OPENROUTER_API_KEY ?? ''
);

const signatureExtractorCache = new SignatureExtractorCache(
signatureLLM,
redisClient,
logger,
ENV.SIGNATURE_LLM_CACHE_TTL_SECONDS ?? 86400
);

signatureEngines.push({
engine: new SignatureLLM(
new TokenBucketRateLimiter({
executeEvenly: false,
intervalSeconds: 60,
uniqueKey: 'email-signature-service',
distribution: Distribution.Memory,
requests: LLMModelsList.every((m) => m.includes('free')) ? 15 : 1000
}),
logger,
LLMModelsList,
ENV.SIGNATURE_OPENROUTER_API_KEY ?? ''
),
engine: signatureExtractorCache,
useAsFallback: false
});
}
Expand Down Expand Up @@ -98,6 +108,7 @@ const emailsStreamConsumer = new EmailSignatureConsumer(
logger.info('Consumer group already created');
} else {
logger.error('Failed to start consumer:', err);
// skipcq: JS-0263 - Intentional: worker must terminate on startup failure, cannot recover
process.exit(1);
}
}
Expand Down
2 changes: 1 addition & 1 deletion backend/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ console.log(
);

(async () => {
await redis.flushAll([ENV.REDIS_SIGNATURE_STREAM_NAME]);
await redis.flushAll([ENV.REDIS_SIGNATURE_STREAM_NAME], ['llm-sig:']);
await redis.initProviders();

const miningSources = new PgMiningSources(
Expand Down
13 changes: 13 additions & 0 deletions backend/src/services/cache/EmailSignatureCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ export default interface EmailSignatureCache {
*/
isNewer(userId: string, email: string, messageDate: string): Promise<boolean>;

/**
* Atomically checks if messageDate is newer and sets the signature
* @returns true if the signature was newer and was set, false otherwise
*/
setIfNewer(
userId: string,
email: string,
signature: string,
messageId: string,
messageDate: string,
miningId: string
): Promise<boolean>;

/**
* Retrieves all signatures associated with a mining operation
* @param miningId - The ID of the mining operation
Expand Down
47 changes: 47 additions & 0 deletions backend/src/services/cache/redis/RedisEmailSignatureCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,53 @@ export default class RedisEmailSignatureCache implements EmailSignatureCache {
return new Date(messageDate) > new Date(existingDateStr);
}

public async setIfNewer(
userId: string,
email: string,
signature: string,
messageId: string,
messageDate: string,
miningId: string
): Promise<boolean> {
const key = RedisEmailSignatureCache.getKey(userId, email);
const messageDateISO = new Date(messageDate).toISOString();

const pipeline = this.client.pipeline();
pipeline.hget(key, 'lastSeenDate');
pipeline.hgetall(key);

const results = await pipeline.exec();
if (!results) return false;

const [dateResult, allResult] = results as [
[Error, string | null],
[Error, Partial<EmailSignatureWithMetadata>]
];

const existingDateStr = dateResult[1];
if (existingDateStr && new Date(messageDate) <= new Date(existingDateStr)) {
return false;
}

const existing = allResult[1];
const isNew = !existing || Object.keys(existing).length === 0;
const firstSeenDate = isNew
? messageDateISO
: (existing.firstSeenDate ?? messageDateISO);

await this.client.hset(key, {
signature,
firstSeenDate,
lastSeenDate: messageDateISO,
messageId,
userId,
email
});

await this.client.sadd(`mining:${miningId}`, key);
return true;
}

public async getAllFromMining(
miningId: string
): Promise<EmailSignatureWithMetadata[]> {
Expand Down
14 changes: 8 additions & 6 deletions backend/src/services/extractors/engines/EmailMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,14 @@ export default class EmailMessage {
field: validContact.sourceField
});

// Eliminate unwanted contacts associated with tags listed in IGNORED_MESSAGE_TAGS
if (
tags.some((t) => EmailMessage.IGNORED_MESSAGE_TAGS.includes(t.name))
) {
return;
}
// TODO: [BUG] Re-enable once refine process properly handles transactional tag
// Previously this filtered out contacts from transactional emails, but it caused
// transactional-tagged contacts to be missing from the refined table.
// if (
// tags.some((t) => EmailMessage.IGNORED_MESSAGE_TAGS.includes(t.name))
// ) {
// return;
// }

if (tags.some((t) => t.reachable === REACHABILITY.DIRECT_PERSON)) {
if (
Expand Down
23 changes: 18 additions & 5 deletions backend/src/services/signature/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,29 @@ export class Signature implements ExtractSignature {
return fallbackEngine?.engine || null;
}

/**
* Get display name for an engine, including wrapped engine if applicable
*/
private static getEngineDisplayName(engine: ExtractSignature): string {
const baseName = engine.constructor.name;
if (engine.wrappedEngineName) {
return `${baseName} (${engine.wrappedEngineName})`;
}
return baseName;
}

async extract(email: string, signature: string): Promise<PersonLD | null> {
const primary = this.getEngine();
const fallback = this.getFallback();
const engine = primary ?? fallback;

if (engine && primary) {
this.logger.debug(`Using primary engine: ${primary.constructor.name}`);
this.logger.debug(
`Using primary engine: ${Signature.getEngineDisplayName(primary)}`
);
} else if (engine && fallback) {
this.logger.debug(
`Primary engine not available, falling back to: ${fallback.constructor.name}`
`Primary engine not available, falling back to: ${Signature.getEngineDisplayName(fallback)}`
);
} else {
this.logger.error('No available engine for signature extraction');
Expand All @@ -63,12 +76,12 @@ export class Signature implements ExtractSignature {

try {
this.logger.debug(
`Attempting extraction with engine: ${engine.constructor.name}`
`Attempting extraction with engine: ${Signature.getEngineDisplayName(engine)}`
);
const result = await engine.extract(email, signature);

this.logger.debug(
`Engine ${engine.constructor.name} extraction completed`,
`Engine ${Signature.getEngineDisplayName(engine)} extraction completed`,
{
success: result !== null
}
Expand All @@ -77,7 +90,7 @@ export class Signature implements ExtractSignature {
return result;
} catch (err) {
this.logger.error(
`Engine ${engine.constructor.name} failed during extraction`,
`Engine ${Signature.getEngineDisplayName(engine)} failed during extraction`,
{
error: err instanceof Error ? err.message : String(err)
}
Expand Down
Loading
Loading