Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/dockerBuildPush.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Build and Push to Docker Hub

on:
workflow_dispatch:
push:
paths:
- 'src/microbots/environment/local_docker/image_builder/Dockerfile'
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ addopts =
markers =
unit: Unit tests
integration: Integration tests
anthropic_integration: Integration tests requiring a real Anthropic API key
slow: Slow tests
docker: marks tests that require a running Docker daemon and pull container images
4 changes: 3 additions & 1 deletion src/microbots/MicroBot.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,9 @@ def _create_llm(self):
)
elif self.model_provider == ModelProvider.ANTHROPIC:
self.llm = AnthropicApi(
system_prompt=system_prompt_with_tools, deployment_name=self.deployment_name
system_prompt=system_prompt_with_tools,
deployment_name=self.deployment_name,
additional_tools=self.additional_tools,
)
# No Else case required as model provider is already validated using _validate_model_and_provider

Expand Down
150 changes: 137 additions & 13 deletions src/microbots/llm/anthropic_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import os
import re
from dataclasses import asdict
from logging import getLogger
from typing import List, Optional

from dotenv import load_dotenv
from anthropic import Anthropic
Expand All @@ -16,9 +18,49 @@
api_key = os.getenv("ANTHROPIC_API_KEY")



class AnthropicApi(LLMInterface):

def __init__(self, system_prompt, deployment_name=deployment_name, max_retries=3):
def upgrade_tools(self, tools: list) -> list:
    """Swap each plain ``MemoryTool`` for its Anthropic-native variant.

    Tools that are already ``AnthropicMemoryTool`` instances (or are not
    memory tools at all) pass through untouched. Returns a new list;
    the input list is not mutated.
    """
    # Imported lazily to avoid import cycles at module load time.
    from microbots.tools.tool_definitions.anthropic_memory_tool import AnthropicMemoryTool
    from microbots.tools.tool_definitions.memory_tool import MemoryTool

    def _convert(tool):
        # AnthropicMemoryTool presumably subclasses MemoryTool, so the
        # second isinstance check keeps already-native tools as-is.
        if not isinstance(tool, MemoryTool) or isinstance(tool, AnthropicMemoryTool):
            return tool
        logger.info(
            "\U0001f9e0 Auto-upgrading MemoryTool \u2192 AnthropicMemoryTool for Anthropic provider"
        )
        return AnthropicMemoryTool(
            memory_dir=tool.memory_dir,
            usage_instructions=tool.usage_instructions_to_llm,
        )

    return [_convert(tool) for tool in tools]

def __init__(
self,
system_prompt: str,
deployment_name: str = deployment_name,
max_retries: int = 3,
additional_tools: Optional[List] = None,
):
"""
Parameters
----------
system_prompt : str
System prompt for the LLM.
deployment_name : str
The Anthropic model deployment name.
max_retries : int
Maximum number of retries for invalid LLM responses.
additional_tools : Optional[List]
Tool objects passed from MicroBot. Any provider-agnostic tools
(e.g. ``MemoryTool``) are silently upgraded to their Anthropic-
native variants, and their API schemas are extracted.
"""
self.ai_client = Anthropic(
api_key=api_key,
base_url=endpoint
Expand All @@ -27,30 +69,112 @@ def __init__(self, system_prompt, deployment_name=deployment_name, max_retries=3
self.system_prompt = system_prompt
self.messages = []

# Silently upgrade tools in-place and extract API schemas
tools = additional_tools or []
upgraded = self.upgrade_tools(tools)
# Mutate the original list so the caller (MicroBot) sees upgraded tools
if additional_tools is not None:
additional_tools[:] = upgraded
self._tool_dicts = [
t.to_dict() for t in upgraded
if callable(getattr(t, "to_dict", None))
]
self._pending_tool_response = None

# Set these values here. This logic will be handled in the parent class.
self.max_retries = max_retries
self.retries = 0

def ask(self, message) -> LLMAskResponse:
# ---------------------------------------------------------------------- #
# Internal helpers
# ---------------------------------------------------------------------- #

def _call_api(self) -> object:
    """Invoke the Anthropic messages endpoint with the current conversation.

    Builds the request from the instance state (model, system prompt,
    message history) and attaches the registered tool schemas only when
    at least one is present, so tool-free sessions send a plain request.
    """
    request = {
        "model": self.deployment_name,
        "system": self.system_prompt,
        "messages": self.messages,
        "max_tokens": 4096,
    }
    if self._tool_dicts:
        request["tools"] = self._tool_dicts
    return self.ai_client.messages.create(**request)

def _append_tool_result(self, response, result_text: str) -> None:
    """Record a completed tool round-trip in the conversation history.

    Replays the assistant's tool_use turn verbatim, then appends a user
    turn carrying one ``tool_result`` entry per ``tool_use`` block. Every
    entry shares the same caller-supplied *result_text* — the caller
    provides it on the next ``ask()`` after executing the tool.
    """
    self.messages.append({
        "role": "assistant",
        "content": [block.model_dump() for block in response.content],
    })

    result = str(result_text)
    tool_results = [
        {"type": "tool_result", "tool_use_id": block.id, "content": result}
        for block in response.content
        if block.type == "tool_use"
    ]
    self.messages.append({"role": "user", "content": tool_results})

# ---------------------------------------------------------------------- #
# Public interface
# ---------------------------------------------------------------------- #

def ask(self, message: str) -> LLMAskResponse:
self.retries = 0 # reset retries for each ask. Handled in parent class.

self.messages.append({"role": "user", "content": message})
if self._pending_tool_response:
# Previous response was tool_use — format this message as tool results.
self._append_tool_result(self._pending_tool_response, message)
self._pending_tool_response = None
else:
self.messages.append({"role": "user", "content": message})

valid = False
while not valid:
response = self.ai_client.messages.create(
model=self.deployment_name,
system=self.system_prompt,
messages=self.messages,
max_tokens=4096,
)

# Extract text content from response
response_text = response.content[0].text if response.content else ""
response = self._call_api()

if response.stop_reason == "tool_use":
# Return tool call info as an LLMAskResponse so the
# caller (MicroBot.run) can dispatch the tool.
self._pending_tool_response = response

thoughts = ""
for block in response.content:
if block.type == "text":
thoughts = block.text
break

tool_calls = []
for block in response.content:
if block.type == "tool_use":
tool_calls.append({
"name": block.name,
"id": block.id,
"input": block.input,
})

command = json.dumps({"native_tool_calls": tool_calls})
return LLMAskResponse(task_done=False, thoughts=thoughts, command=command)

# Extract text content from the final response
response_text = ""
for block in response.content:
if block.type == "text":
response_text = block.text
break

logger.debug("Raw Anthropic response (first 500 chars): %s", response_text[:500])

# Try to extract JSON if wrapped in markdown code blocks
import re
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response_text, re.DOTALL)
if json_match:
response_text = json_match.group(1)
Expand Down
9 changes: 9 additions & 0 deletions src/microbots/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ def ask(self, message: str) -> LLMAskResponse:
def clear_history(self) -> bool:
pass

def upgrade_tools(self, tools: list) -> list:
    """Upgrade tools for the specific LLM provider.

    The default implementation is a no-op: it returns *tools* unchanged
    (the same list object, not a copy). Subclasses (e.g.
    ``AnthropicApi``) override this to swap provider-agnostic tools
    with their native equivalents.
    """
    return tools

def _validate_llm_response(self, response: str) -> tuple[bool, LLMAskResponse]:

if self.retries >= self.max_retries:
Expand Down
2 changes: 2 additions & 0 deletions src/microbots/tools/tool_definitions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from microbots.tools.tool_definitions.memory_tool import MemoryTool
from microbots.tools.tool_definitions.anthropic_memory_tool import AnthropicMemoryTool
Loading
Loading