Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 85 additions & 134 deletions src/inputs/plugins/twitter.py
Original file line number Diff line number Diff line change
@@ -1,178 +1,129 @@
import asyncio
import logging
from queue import Empty, Queue
from typing import AsyncIterator, List, Optional
import time
from typing import Optional

import aiohttp
from pydantic import Field

from inputs.base import SensorConfig
from inputs.base import Message, SensorConfig
from inputs.base.loop import FuserInput
from providers.io_provider import IOProvider


class TwitterSensorConfig(SensorConfig):
"""
Configuration for Twitter Sensor.

Parameters
----------
query : str
Query to search for on Twitter.
"""
"""Configuration for TwitterInput."""

query: str = Field(
default="What's new in AI and technology?",
description="Query to search for on Twitter",
)
poll_interval: float = Field(
default=60.0,
description="Seconds between API polls",
)


class TwitterInput(FuserInput[TwitterSensorConfig, Optional[str]]):
"""Context query input handler for RAG."""

def __init__(
self,
config: Optional[TwitterSensorConfig],
):
"""Initialize TwitterInput with configuration.
class TwitterInput(FuserInput[TwitterSensorConfig, Optional[dict]]):
"""RAG-based context input from OpenMind knowledge base."""

Parameters
----------
config : Optional[TwitterSensorConfig]
Configuration object from the runtime
"""
def __init__(self, config: Optional[TwitterSensorConfig] = None):
if config is None:
config = TwitterSensorConfig()

super().__init__(config)

self.buffer: List[str] = []
self.message_buffer: Queue[str] = Queue()
self.io_provider = IOProvider()
self.messages: list[Message] = []
self.descriptor_for_LLM = "TwitterInput CONTEXT"
self.api_url = "https://api.openmind.org/api/core/query"
self.session: Optional[aiohttp.ClientSession] = None
self.context: Optional[str] = None

# Use getattr instead of .get() since config is an object, not a dict
self.query = self.config.query

async def __aenter__(self):
"""Async context manager entry."""
await self._init_session()
return self

async def __aexit__(self, _exc_type, _exc_val, _exc_tb):
"""Async context manager exit."""
if self.session:
await self.session.close()
self.poll_interval = self.config.poll_interval
self._last_poll_time: float = 0
self.session: Optional[aiohttp.ClientSession] = None

async def _init_session(self):
"""Initialize aiohttp session if not exists."""
if self.session is None:
timeout = aiohttp.ClientTimeout(total=10)
self.session = aiohttp.ClientSession(timeout=timeout)
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=10)
)

async def _query_context(self, query: str):
"""Perform context query to RAG endpoint."""
async def _poll(self) -> Optional[dict]:
current_time = time.time()

if current_time - self._last_poll_time < self.poll_interval:
await asyncio.sleep(1.0)
return None

self._last_poll_time = current_time
await self._init_session()

if self.session is None:
return None

try:
async with self.session.post( # type: ignore
async with self.session.post(
self.api_url,
json={"query": query},
json={"query": self.query},
headers={"Content-Type": "application/json"},
) as response:
if response.status == 200:
data = await response.json()
if "results" in data:
documents = data["results"]
context = "\n\n".join(
[
r.get("content", {}).get("text", "")
for r in documents
if r.get("content", {}).get("text", "")
]
)
self.context = context
self.buffer = [context] # Replace buffer with context
else:
error_text = await response.text()
logging.error(
f"Query failed with status {response.status}: {error_text}"
)

return await response.json()
error_text = await response.text()
logging.error(
f"TwitterInput: API error {response.status}: {error_text}"
)
return None
except asyncio.TimeoutError:
logging.error("TwitterInput: Request timed out")
return None
except aiohttp.ClientError as e:
logging.error(f"TwitterInput: Network error: {e}")
return None
except Exception as e:
logging.error(f"Error querying context: {str(e)}")

async def raw_to_text(self, raw_input: Optional[str] = None):
"""Convert raw input to text format and add to buffer.

Parameters
----------
raw_input : Optional[str]
Raw input to process. If None, process from message buffer.

Returns
-------
str
The processed text
"""
if raw_input:
self.message_buffer.put_nowait(raw_input)

if self.message_buffer:
try:
message = self.message_buffer.get_nowait()
if message:
self.buffer.append(message)
logging.debug(f"Added to buffer: {message}")
return message
except Empty:
pass

return ""

async def start(self):
"""Start the input handler with initial query."""
await self._query_context(self.query)
self.message_buffer.put_nowait(self.query)

async def listen(self) -> AsyncIterator[str]:
"""Listen for new messages."""
await self.start()

while True:
message = await self._poll()
if message:
yield message
await asyncio.sleep(0.1)

async def _poll(self) -> Optional[str]:
"""Poll for new messages."""
await asyncio.sleep(0.5)
logging.error(f"TwitterInput: Unexpected error: {e}")
return None

async def _raw_to_text(self, raw_input: Optional[dict]) -> Optional[Message]:
if raw_input is None:
return None

try:
message = self.message_buffer.get_nowait()
return message
except Empty:
documents = raw_input.get("results", [])
context = "\n\n".join(
r.get("content", {}).get("text", "")
for r in documents
if r.get("content", {}).get("text", "")
)

if not context:
return None

return Message(timestamp=time.time(), message=context)

except Exception as e:
logging.error(f"TwitterInput: Error parsing response: {e}")
return None

def formatted_latest_buffer(self) -> Optional[str]:
"""Format and return the context."""
content = (
self.context if self.context else (self.buffer[-1] if self.buffer else None)
)
async def raw_to_text(self, raw_input: Optional[dict]):
"""Process raw input and append to message buffer."""
pending_message = await self._raw_to_text(raw_input)
if pending_message is not None:
self.messages.append(pending_message)

if not content:
def formatted_latest_buffer(self) -> Optional[str]:
"""Return latest message formatted for LLM and clear buffer."""
if not self.messages:
return None

result = f"""
TwitterInput CONTEXT
// START
{content}
// END
"""
return result
latest = self.messages[-1]
result = (
f"\nINPUT: {self.descriptor_for_LLM}\n// START\n"
f"{latest.message}\n// END\n"
)

async def initialize_with_query(self, query: str):
"""Initialize with a query."""
logging.info(f"[TwitterInput] Initializing with query: {query}")
self.message_buffer.put_nowait(query) # Add query to message buffer
await self._query_context(query) # Immediately get context
self.io_provider.add_input(
self.descriptor_for_LLM, latest.message, latest.timestamp
)
self.messages = []
return result
Loading
Loading