Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions wavefront/server/apps/floware/floware/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ gcp_location = ${GCP_LOCATION}
gcp_key_ring = ${GCP_KMS_KEY_RING}
gcp_crypto_key = ${GCP_KMS_CRYPTO_KEY}
gcp_crypto_key_version = ${GCP_KMS_CRYPTO_KEY_VERSION}
gcp_enc_crypto_key = ${GCP_KMS_ENC_CRYPTO_KEY}

[floware]
asset_storage_bucket=${ASSET_STORAGE_BUCKET}
Expand Down Expand Up @@ -177,3 +178,12 @@ url=${HERMES_URL:http://localhost:8080/flo-hermes}

[workflow]
worker_topic=${WORKFLOW_WORKER_TOPIC}

[triggers_gmail]
client_id=${GOOGLE_OAUTH_CLIENT_ID}
client_secret=${GOOGLE_OAUTH_CLIENT_SECRET}
redirect_uri=${GOOGLE_OAUTH_REDIRECT_URI}
pubsub_project_id=${GCP_PROJECT_ID}
pubsub_topic_prefix=${GMAIL_PUBSUB_TOPIC_PREFIX:agentic-trigger}
push_endpoint_template=${GMAIL_PUSH_ENDPOINT_TEMPLATE}
oidc_service_account_email=${GMAIL_PUBSUB_OIDC_SA_EMAIL}
35 changes: 35 additions & 0 deletions wavefront/server/apps/floware/floware/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
AsyncAgenticExecutionResultConsumer,
)
from agents_module.agents_container import AgentsContainer
from triggers_module.controllers.trigger_controller import trigger_router
from triggers_module.triggers_container import TriggersContainer
from inference_module.inference_container import InferenceContainer
from inference_module.controllers.inference_controller import inference_router

Expand Down Expand Up @@ -202,6 +204,17 @@
cache_manager=db_repo_container.cache_manager,
cloud_storage_manager=common_container.cloud_storage_manager,
)

triggers_container = TriggersContainer(
trigger_repository=db_repo_container.agentic_trigger_repository,
credential_repository=db_repo_container.agentic_trigger_credential_repository,
event_repository=db_repo_container.agentic_trigger_event_repository,
agent_repository=db_repo_container.agent_repository,
workflow_repository=db_repo_container.workflow_repository,
async_agentic_execution_service=agents_container.async_agentic_execution_service,
cache_manager=db_repo_container.cache_manager,
)

scheduler_manager = SchedulerManager()


Expand Down Expand Up @@ -234,6 +247,18 @@ async def lifespan(app: FastAPI):
scheduler_manager.register_stale_lock_recovery(
callback=scheduled_job_service.recover_stale_locks_sync
)

trigger_subscription_renewer = triggers_container.trigger_subscription_renewer()

def _run_trigger_renewer_sync() -> None:
try:
asyncio.run(trigger_subscription_renewer.run_once())
except Exception as exc:
logger.warning(f'Trigger subscription renewer failed: {exc}')

scheduler_manager.register_trigger_subscription_renewer(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will renew the subscription multiple times parallely if there are multiple pods running.

callback=_run_trigger_renewer_sync
)
logger.info('Database connection established.')

# Load API services from database into registry
Expand Down Expand Up @@ -418,6 +443,7 @@ async def metrics(request: Request):
app.include_router(tool_router, prefix='/floware')
app.include_router(message_processor_router, prefix='/floware')
app.include_router(cloud_storage_router, prefix='/floware')
app.include_router(trigger_router, prefix='/floware')


@app.exception_handler(Exception)
Expand Down Expand Up @@ -506,6 +532,7 @@ async def global_exception_handler(request: Request, exc: Exception):
'llm_inference_config_module.controllers',
'tools_module.controllers',
'voice_agents_module.controllers',
'triggers_module.controllers',
],
)

Expand Down Expand Up @@ -538,6 +565,14 @@ async def global_exception_handler(request: Request, exc: Exception):
],
)

triggers_container.wire(
modules=[__name__],
packages=[
'triggers_module.controllers',
'triggers_module.services',
],
)

inference_container.wire(
modules=[__name__],
packages=['inference_module.controllers'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ def register_stale_lock_recovery(self, callback: Callable):
replace_existing=True,
)

def register_trigger_subscription_renewer(self, callback: Callable):
"""Runs every 6 hours to renew provider subscriptions about to expire."""
if self.scheduler is None:
raise RuntimeError('Scheduler must be started before registering jobs')
self.scheduler.add_job(
callback,
trigger=CronTrigger(hour='*/6'),
id='trigger-subscription-renewer',
replace_existing=True,
)

def shutdown(self):
if self.scheduler and self.scheduler.running:
# wait=True ensures in-flight jobs finish before shutdown,
Expand Down
2 changes: 2 additions & 0 deletions wavefront/server/apps/floware/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies = [
"tools-module",
"api-services-module",
"voice-agents-module",
"triggers-module",


"fastapi>=0.115.2,<1.0.0",
Expand All @@ -48,6 +49,7 @@ llm-inference-config-module = {workspace = true}
tools-module = {workspace = true}
api-services-module = {workspace = true}
voice-agents-module = {workspace = true}
triggers-module = {workspace = true}

[build-system]
requires = ["hatchling"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
include=[
'celery_worker.tasks.agent_task',
'celery_worker.tasks.workflow_task',
'celery_worker.tasks.trigger_event_task',
],
task_serializer='json',
accept_content=['json'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,22 @@
DB_HOST: str = os.environ['DB_HOST']
DB_PORT: str = os.environ['DB_PORT']
DB_NAME: str = os.environ['DB_NAME']


def _required_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise RuntimeError(f'Missing required environment variable: {name}')
return value


# Triggers — Gmail OAuth + Pub/Sub
GOOGLE_OAUTH_CLIENT_ID: str = _required_env('GOOGLE_OAUTH_CLIENT_ID')

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had assumed this was part of the db schema for creating triggers ? Are we using environment variables ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should go in db, and should be part of the configuration

GOOGLE_OAUTH_CLIENT_SECRET: str = _required_env('GOOGLE_OAUTH_CLIENT_SECRET')
GOOGLE_OAUTH_REDIRECT_URI: str = _required_env('GOOGLE_OAUTH_REDIRECT_URI')
GCP_PROJECT_ID: str = _required_env('GCP_PROJECT_ID')
GMAIL_PUBSUB_TOPIC_PREFIX: str = os.getenv(
'GMAIL_PUBSUB_TOPIC_PREFIX', 'agentic-trigger'
)
GMAIL_PUSH_ENDPOINT_TEMPLATE: str = _required_env('GMAIL_PUSH_ENDPOINT_TEMPLATE')
GMAIL_PUBSUB_OIDC_SA_EMAIL: str = _required_env('GMAIL_PUBSUB_OIDC_SA_EMAIL')
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import asyncio
from typing import Any, Dict
from uuid import UUID

from common_module.log.logger import logger

from celery_worker.celery_app import app
from celery_worker.env import MAX_RETRIES, RETRY_DELAY
from celery_worker.worker_setup import get_services


@app.task(
name='celery_worker.tasks.trigger_event_task.process_trigger_event_task',
bind=True,
max_retries=MAX_RETRIES,
default_retry_delay=RETRY_DELAY,
)
def process_trigger_event_task(
self, trigger_id: str, raw_payload: Dict[str, Any], push_message_id: str
) -> Dict[str, Any]:
services = get_services()
processor = services.trigger_event_processor

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
parsed_trigger_id = UUID(trigger_id)
return loop.run_until_complete(
processor.process(trigger_id=parsed_trigger_id, raw_payload=raw_payload)
)
except ValueError:
logger.error(f'Invalid trigger_id for process_trigger_event_task: {trigger_id}')
raise
except Exception as exc:
logger.exception(
f'process_trigger_event_task failed for trigger {trigger_id} '
f'(push_message_id={push_message_id}): {exc}'
)
if self.request.retries < self.max_retries:
raise self.retry(exc=exc)
raise
Comment thread
coderabbitai[bot] marked this conversation as resolved.
finally:
pending = asyncio.all_tasks(loop)
for task in pending:
task.cancel()
try:
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
except Exception:
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
pass
loop.close()
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from flo_cloud.cloud_storage import CloudStorageManager
from plugins_module.plugins_container import PluginsContainer
from tools_module.tools_container import ToolsContainer
from triggers_module.services.trigger_event_processor import TriggerEventProcessor
from triggers_module.triggers_container import TriggersContainer

from celery_worker.env import (
AGENT_YAML_BUCKET,
Expand All @@ -35,6 +37,13 @@
DB_PASSWORD,
DB_PORT,
DB_USERNAME,
GCP_PROJECT_ID,
GMAIL_PUBSUB_OIDC_SA_EMAIL,
GMAIL_PUBSUB_TOPIC_PREFIX,
GMAIL_PUSH_ENDPOINT_TEMPLATE,
GOOGLE_OAUTH_CLIENT_ID,
GOOGLE_OAUTH_CLIENT_SECRET,
GOOGLE_OAUTH_REDIRECT_URI,
WORKFLOW_WORKER_TOPIC,
)

Expand All @@ -46,6 +55,7 @@ class WorkerServices:
cloud_storage: CloudStorageManager
cache: CacheManager
execution_bucket: str
trigger_event_processor: TriggerEventProcessor


_lock = threading.Lock()
Expand Down Expand Up @@ -145,12 +155,36 @@ def get_services() -> WorkerServices:
# floware's CacheManager uses config.env_config.app_name as its namespace
cache = CacheManager(namespace=APP_NAME)

triggers_container = TriggersContainer(
trigger_repository=db_repo_container.agentic_trigger_repository,
credential_repository=db_repo_container.agentic_trigger_credential_repository,
event_repository=db_repo_container.agentic_trigger_event_repository,
agent_repository=db_repo_container.agent_repository,
workflow_repository=db_repo_container.workflow_repository,
async_agentic_execution_service=agents_container.async_agentic_execution_service,
)
triggers_container.config.from_dict(
{
'cloud_config': {'cloud_provider': CLOUD_PROVIDER},
'triggers_gmail': {
'client_id': GOOGLE_OAUTH_CLIENT_ID,
'client_secret': GOOGLE_OAUTH_CLIENT_SECRET,
'redirect_uri': GOOGLE_OAUTH_REDIRECT_URI,
'pubsub_project_id': GCP_PROJECT_ID,
'pubsub_topic_prefix': GMAIL_PUBSUB_TOPIC_PREFIX,
'push_endpoint_template': GMAIL_PUSH_ENDPOINT_TEMPLATE,
'oidc_service_account_email': GMAIL_PUBSUB_OIDC_SA_EMAIL or None,
},
}
)

_services = WorkerServices(
agent_inference=agents_container.agent_inference_service(),
workflow_inference=agents_container.workflow_inference_service(),
cloud_storage=common_container.cloud_storage_manager(),
cache=cache,
execution_bucket=AGENTIC_EXECUTIONS_BUCKET,
trigger_event_processor=triggers_container.trigger_event_processor(),
)

return _services
2 changes: 2 additions & 0 deletions wavefront/server/background_jobs/celery_worker/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies = [
"tools-module",
"api-services-module",
"common-module",
"triggers-module",
"celery[redis]>=5.4.0,<6.0.0",
"python-dotenv>=1.1.0,<2.0.0",
]
Expand All @@ -27,6 +28,7 @@ flo-utils = { workspace = true }
tools-module = { workspace = true }
api-services-module = { workspace = true }
common-module = { workspace = true }
triggers-module = { workspace = true }

[tool.uv]
package = true
Expand Down
43 changes: 43 additions & 0 deletions wavefront/server/docker/celery_worker.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
FROM python:3.11-slim

WORKDIR /app

COPY --from=ghcr.io/astral-sh/uv:0.8.6 /uv /uvx /bin/

RUN apt-get update && apt-get install -y \
libpq-dev \
gcc \
libgl1 \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
Comment on lines +7 to +12

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -nP --iglob '*Dockerfile*' 'apt-get\s+install\s+-y(?!\s+--no-install-recommends)'

Repository: rootflo/wavefront

Length of output: 613


Add --no-install-recommends to apt-get install to reduce image size/attack surface.

wavefront/server/docker/celery_worker.Dockerfile (and other Dockerfiles in the same folder: workflow.Dockerfile, inference_app.Dockerfile, floware.Dockerfile, floconsole.Dockerfile, call_processing.Dockerfile) use apt-get install -y without --no-install-recommends.

💡 Proposed fix
-RUN apt-get update && apt-get install -y \
+RUN apt-get update && apt-get install -y --no-install-recommends \
     libpq-dev \
     gcc \
     libgl1 \
     libglib2.0-0 \
     && rm -rf /var/lib/apt/lists/*
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
RUN apt-get update && apt-get install -y \
libpq-dev \
gcc \
libgl1 \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq-dev \
gcc \
libgl1 \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
🧰 Tools
🪛 Trivy (0.69.3)

[error] 7-12: 'apt-get' missing '--no-install-recommends'

'--no-install-recommends' flag is missed: 'apt-get update && apt-get install -y libpq-dev gcc libgl1 libglib2.0-0 && rm -rf /var/lib/apt/lists/*'

Rule: DS-0029

Learn more

(IaC/Dockerfile)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@wavefront/server/docker/celery_worker.Dockerfile` around lines 7 - 12, The
apt-get install invocation in the RUN line should include
--no-install-recommends to avoid installing unnecessary packages; update the RUN
line in celery_worker.Dockerfile (and the sibling Dockerfiles
workflow.Dockerfile, inference_app.Dockerfile, floware.Dockerfile,
floconsole.Dockerfile, call_processing.Dockerfile) so it reads apt-get install
-y --no-install-recommends \ followed by the same packages and keep the existing
rm -rf /var/lib/apt/lists/*; ensure you only add the flag and do not remove the
existing cleanup step.


COPY wavefront/server/pyproject.toml wavefront/server/uv.lock ./

COPY wavefront/server/modules/common_module /app/modules/common_module
COPY wavefront/server/modules/db_repo_module /app/modules/db_repo_module
COPY wavefront/server/modules/knowledge_base_module /app/modules/knowledge_base_module
COPY wavefront/server/modules/llm_inference_config_module /app/modules/llm_inference_config_module
COPY wavefront/server/modules/agents_module /app/modules/agents_module
COPY wavefront/server/modules/plugins_module /app/modules/plugins_module
COPY wavefront/server/modules/tools_module /app/modules/tools_module
COPY wavefront/server/modules/api_services_module /app/modules/api_services_module
COPY wavefront/server/modules/triggers_module /app/modules/triggers_module

COPY wavefront/server/packages/flo_cloud /app/packages/flo_cloud
COPY wavefront/server/packages/flo_utils /app/packages/flo_utils

COPY wavefront/server/plugins/datasource /app/plugins/datasource
COPY wavefront/server/plugins/authenticator /app/plugins/authenticator

COPY wavefront/server/background_jobs/celery_worker /app/background_jobs/celery_worker

RUN uv sync --package celery-worker --frozen --no-dev

RUN useradd -m -u 1000 celery && \
chown -R celery:celery /app

USER celery

WORKDIR /app/background_jobs/celery_worker

CMD ["uv", "run", "celery", "-A", "celery_worker.celery_app", "worker", "--loglevel=info", "--pool=solo"]
1 change: 1 addition & 0 deletions wavefront/server/docker/floware.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ COPY wavefront/server/modules/inference_module /app/modules/inference_module
COPY wavefront/server/modules/tools_module /app/modules/tools_module
COPY wavefront/server/modules/voice_agents_module /app/modules/voice_agents_module
COPY wavefront/server/modules/api_services_module /app/modules/api_services_module
COPY wavefront/server/modules/triggers_module /app/modules/triggers_module

COPY wavefront/server/packages/flo_cloud /app/packages/flo_cloud
COPY wavefront/server/packages/flo_utils /app/packages/flo_utils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
from db_repo_module.models.message_processors import MessageProcessors
from db_repo_module.models.scheduled_job import ScheduledJob
from db_repo_module.models.scheduled_job_execution import ScheduledJobExecution
from db_repo_module.models.async_agentic_execution import AsyncAgenticExecution
from db_repo_module.models.agentic_trigger_credential import AgenticTriggerCredential
from db_repo_module.models.agentic_trigger import AgenticTrigger
from db_repo_module.models.agentic_trigger_event import AgenticTriggerEvent
from dotenv import load_dotenv
from sqlalchemy import engine_from_config
from sqlalchemy import pool
Expand Down Expand Up @@ -80,6 +84,10 @@
MessageProcessors,
ScheduledJob,
ScheduledJobExecution,
AsyncAgenticExecution,
AgenticTriggerCredential,
AgenticTrigger,
AgenticTriggerEvent,
]
target_metadata = Base.metadata

Expand Down
Loading
Loading