
Installation

pip install raindrop-google-adk google-adk

Quick Start

from raindrop_google_adk import setup_google_adk
from google.adk import Runner
from google.adk.agents import LlmAgent
from google.adk.sessions import InMemorySessionService
from google.genai import types

# Enable automatic tracing for all ADK Runner interactions
rd = setup_google_adk(api_key="your-write-key")

def get_weather(city: str) -> dict:
    """Get weather for a city."""
    return {"temperature": 72, "condition": "sunny", "city": city}

agent = LlmAgent(
    name="weather_assistant",
    tools=[get_weather],
    model="gemini-2.5-flash",
    instruction="You are a helpful assistant that can check weather.",
)

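session_service = InMemorySessionService()
runner = Runner(app_name="weather_app", agent=agent, session_service=session_service)

# Assumption: ADK expects the session to exist before run() is called. In recent
# ADK releases create_session() is a coroutine, so it is driven with asyncio.run().
import asyncio
asyncio.run(session_service.create_session(
    app_name="weather_app", user_id="user123", session_id="session123"))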

user_msg = types.Content(
    parts=[types.Part(text="What's the weather in New York?")],
    role="user",
)

for event in runner.run(user_id="user123", session_id="session123", new_message=user_msg):
    if event.is_final_response():
        print(event.content.parts[0].text)

What Gets Traced

The Google ADK integration automatically captures:
  • Runner invocations — input message, user_id, session_id, app_name
  • Agent responses — final output text from the agent
  • Token usage — prompt_tokens, completion_tokens, total_tokens from usage metadata
  • Tool calls — individual tool spans with name, input, output, duration, and error
  • Model info — model version when available (e.g., gemini-2.5-flash)
  • Agent identity — agent name, author from events
  • Finish reason — why generation stopped (e.g., STOP, SAFETY, MAX_TOKENS)
  • Errors — captured (with error message) and re-raised to the caller
  • Async support — both run() (sync) and run_async() (async) are instrumented
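The "captured and re-raised" behavior for errors can be pictured as a generator wrapper around the event stream. The following is a minimal sketch of that pattern under stated assumptions, not the integration's actual code: `traced_events`, `failing_stream`, and the layout of the `record` dict are hypothetical names chosen for illustration.

```python
from typing import Any, Dict, Iterable, Iterator, List


def traced_events(events: Iterable[Any], record: Dict[str, Any]) -> Iterator[Any]:
    """Yield events unchanged; note any exception in `record`, then re-raise it."""
    record["error"] = False
    try:
        for event in events:
            yield event
    except Exception as exc:
        # Capture the failure for telemetry (hypothetical record layout)...
        record["error"] = True
        record["error_message"] = str(exc)
        # ...and re-raise so the caller still sees the original exception.
        raise


def failing_stream() -> Iterator[str]:
    """Hypothetical event source that fails after its first event."""
    yield "first event"
    raise RuntimeError("model unavailable")


record: Dict[str, Any] = {}
seen: List[str] = []
caught = None
try:
    for event in traced_events(failing_stream(), record):
        seen.append(event)
except RuntimeError as exc:
    caught = str(exc)
```

The caller's own `try`/`except` keeps working as it would without tracing, which is the point of re-raising rather than swallowing the exception.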

Configuration

Use setup_google_adk() to automatically patch all ADK Runner instances:
from raindrop_google_adk import setup_google_adk

rd = setup_google_adk(
    api_key="your-write-key",      # Your Raindrop write key. If None, telemetry is disabled.
    user_id="user-123",            # Optional: default user ID for all events
    convo_id="convo-456",          # Optional: conversation/thread ID
    tracing_enabled=True,          # Optional: enable/disable OTEL tracing (default: True)
    bypass_otel_for_tools=True,    # Optional: bypass OTEL for tool calls (default: True)
)

# All Runner.run() and Runner.run_async() calls are now automatically traced
# Call rd.shutdown() before process exit
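Because a `None` API key disables telemetry, one option is to read the key from the environment so that machines without it run untraced. The sketch below only builds the keyword arguments; `raindrop_kwargs` and the variable names `RAINDROP_WRITE_KEY` and `RAINDROP_TRACING` are hypothetical and not defined by the integration.

```python
import os
from typing import Any, Dict, Mapping, Optional


def raindrop_kwargs(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build keyword arguments for setup_google_adk() from environment variables.

    RAINDROP_WRITE_KEY and RAINDROP_TRACING are hypothetical variable names.
    """
    # An unset or empty key becomes None, which disables telemetry.
    api_key: Optional[str] = env.get("RAINDROP_WRITE_KEY") or None
    tracing = env.get("RAINDROP_TRACING", "1").lower() not in {"0", "false", "no"}
    return {"api_key": api_key, "tracing_enabled": tracing}


# Intended use (requires the raindrop-google-adk package):
#   rd = setup_google_adk(**raindrop_kwargs())
kwargs = raindrop_kwargs(
    {"RAINDROP_WRITE_KEY": "your-write-key", "RAINDROP_TRACING": "false"}
)
```

Keeping the write key out of source code also avoids committing it to version control.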

Manual wrapping

Use create_raindrop_google_adk() to wrap specific Runner instances:
from raindrop_google_adk import create_raindrop_google_adk

rd = create_raindrop_google_adk(
    api_key="your-write-key",
    user_id="user-123",
)

# `runner` and `user_msg` are the objects built in the Quick Start
wrapped_runner = rd.wrap(runner)

for event in wrapped_runner.run(user_id="user123", session_id="s1", new_message=user_msg):
    print(event)

rd.shutdown()

Class-based API

from raindrop_google_adk import RaindropGoogleADK

rd = RaindropGoogleADK(
    api_key="your-write-key",
    user_id="user-123",
    tracing_enabled=True,
    bypass_otel_for_tools=True,
    debug=False,
)

# Auto-patch all Runner instances
rd.setup()

# Or wrap a specific runner
wrapped = rd.wrap(runner)

# Cleanup
rd.shutdown()

Debug Mode

Enable verbose logging to troubleshoot integration issues:
rd = RaindropGoogleADK(
    api_key="your-write-key",
    debug=True,  # Enables DEBUG-level logging for the integration
)
When debug=True, internal telemetry operations (event extraction, interaction lifecycle) are logged at the DEBUG level via Python’s standard logging module.
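Python's logging module only shows DEBUG records when a handler and the logger level allow them, so if nothing appears after setting debug=True, configuring logging explicitly may help. This is a sketch using only the standard library; the logger name `raindrop_google_adk` is an assumption based on the package name and is not confirmed by this page.

```python
import logging

# Send log records to stderr with a timestamp. The root logger stays at INFO
# so that other libraries remain quiet.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Assumption: the integration logs under a logger named after its package.
integration_logger = logging.getLogger("raindrop_google_adk")
integration_logger.setLevel(logging.DEBUG)
```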

Identify

Associate a user with traits for downstream analysis:
rd.identify(
    user_id="user-123",
    traits={"plan": "pro", "company": "Acme"},
)

Track Signal

Track feedback, edits, or custom signals tied to a specific event:
rd.track_signal(
    event_id="evt-abc123",
    name="thumbs_up",
    signal_type="feedback",
    sentiment="POSITIVE",
    comment="Great response!",
)

Tool Call Tracking

When your agent uses tools, individual tool spans are captured automatically with name, input, output, duration, and error status:
def get_stock_price(symbol: str) -> str:
    """Get the current stock price."""
    return "189.50"

agent = LlmAgent(
    name="stock_agent",
    tools=[get_stock_price],
    model="gemini-2.5-flash",
    instruction="Answer questions about stock prices.",
)

# Tool calls are tracked as individual spans and counted in event properties as
# google_adk.tool_calls_count and google_adk.tool_call_names
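To make the span fields concrete, here is a rough sketch of how a tool call could be timed and recorded with name, input, output, duration, and error. It illustrates the idea only and is not how the integration is implemented; `record_span`, the `spans` list, and the `duration_s` key are hypothetical.

```python
import functools
import time
from typing import Any, Callable, Dict, List

spans: List[Dict[str, Any]] = []  # hypothetical in-memory span sink


def record_span(tool: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a tool so each call appends one span-like dict to `spans`."""

    @functools.wraps(tool)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        span: Dict[str, Any] = {
            "name": tool.__name__,
            "input": {"args": args, "kwargs": kwargs},
            "output": None,
            "error": None,
        }
        start = time.perf_counter()
        try:
            span["output"] = tool(*args, **kwargs)
            return span["output"]
        except Exception as exc:
            # Record the error text, then let the exception propagate.
            span["error"] = str(exc)
            raise
        finally:
            # Runs on success and on failure, so every call yields a span.
            span["duration_s"] = time.perf_counter() - start
            spans.append(span)

    return wrapper


@record_span
def get_stock_price(symbol: str) -> str:
    """Get the current stock price."""
    return "189.50"


price = get_stock_price("AAPL")
```

With the integration enabled none of this is needed in your own code; the tool functions stay undecorated.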

Multi-Agent Workflows

Google ADK supports complex agent topologies: sequential agents, parallel agents, and nested sub-agents. The integration captures each top-level Runner invocation regardless of how many agents take part:
from google.adk.agents import LlmAgent, SequentialAgent, ParallelAgent

# search_tool is assumed to be a tool function defined elsewhere in your code
researcher = LlmAgent(name="researcher", model="gemini-2.5-flash",
    instruction="Research the topic.", tools=[search_tool])
writer = LlmAgent(name="writer", model="gemini-2.5-flash",
    instruction="Write a summary based on research.")

pipeline = SequentialAgent(name="research_pipeline", sub_agents=[researcher, writer])

runner = Runner(app_name="research_app", agent=pipeline, session_service=session_service)
# Runner invocations are automatically traced with input/output capture
The integration records one ai_generation event per Runner.run() invocation. ADK can mark one final response per participating agent, so for multi-agent invocations the event output is the last non-empty final agent response. Earlier agent responses are not merged into the displayed conversation output. Output-specific metadata such as model, author, agent name, branch, and finish reason comes from that selected response, while token and tool counts cover the full invocation.
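The "last non-empty final agent response" rule can be sketched as a small selection function. This is an illustration under stated assumptions rather than the integration's code: `FinalResponse` and `select_output` are hypothetical names, and treating whitespace-only text as empty is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FinalResponse:
    """Hypothetical stand-in for an ADK event flagged as a final response."""
    author: str
    text: str


def select_output(finals: List[FinalResponse]) -> Optional[FinalResponse]:
    """Return the last final response whose text is non-empty, else None."""
    for response in reversed(finals):
        if response.text.strip():
            return response
    return None


finals = [
    FinalResponse(author="researcher", text="Notes on the topic."),
    FinalResponse(author="writer", text="A short summary."),
    FinalResponse(author="writer", text=""),
]
chosen = select_output(finals)
```

Here the researcher's notes are not part of the event output, and the trailing empty response is skipped, so the output and its author metadata both come from the writer's summary.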

Async Usage

The wrapper supports both sync and async runner usage:
import asyncio

async def main():
    user_msg = types.Content(
        parts=[types.Part(text="What's the weather?")],
        role="user",
    )
    async for event in runner.run_async(
        user_id="user123",
        session_id="session123",
        new_message=user_msg,
    ):
        if event.is_final_response():
            print(event.content.parts[0].text)

    rd.shutdown()

asyncio.run(main())
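Indexing `event.content.parts[0]` assumes every final event carries a text part. A more defensive way to read the final text from an async event stream might look like the sketch below. `final_text` is a hypothetical helper, and `fake_run_async` is a stand-in for `runner.run_async(...)` so the example runs without ADK installed.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, Optional


async def final_text(events: AsyncIterator[Any]) -> Optional[str]:
    """Return the text of the last final response in an async event stream."""
    text: Optional[str] = None
    async for event in events:
        if not event.is_final_response():
            continue
        parts = getattr(event.content, "parts", None) or []
        # Keep the first part that has text, skipping parts without any.
        for part in parts:
            if getattr(part, "text", None):
                text = part.text
                break
    return text


async def fake_run_async() -> AsyncIterator[Any]:
    """Hypothetical stand-in for runner.run_async(...) that yields two events."""
    part = SimpleNamespace(text="Sunny, 72 degrees.")
    yield SimpleNamespace(is_final_response=lambda: False, content=None)
    yield SimpleNamespace(
        is_final_response=lambda: True,
        content=SimpleNamespace(parts=[part]),
    )


result = asyncio.run(final_text(fake_run_async()))
```

With a real runner, the call would be `await final_text(runner.run_async(...))` inside your own coroutine.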

Finish Reason Tracking

The integration captures the finish_reason from model responses, indicating why generation stopped. Common values include:
| Value | Meaning |
| --- | --- |
| STOP | Normal completion |
| SAFETY | Blocked by safety filters |
| MAX_TOKENS | Hit token limit |
| RECITATION | Blocked for recitation |
This is available in event properties as google_adk.finish_reason.
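One way to act on this property is to flag any run that did not finish with STOP. The sketch below assumes the property arrives as a plain string in a dict of event properties; `describe_finish` and `FINISH_REASON_MEANINGS` are hypothetical names.

```python
from typing import Any, Mapping, Optional

# Values named in this section; anything else is reported as unrecognized.
FINISH_REASON_MEANINGS = {
    "STOP": "Normal completion",
    "SAFETY": "Blocked by safety filters",
    "MAX_TOKENS": "Hit token limit",
    "RECITATION": "Blocked for recitation",
}


def describe_finish(properties: Mapping[str, Any]) -> Optional[str]:
    """Return a warning string for non-STOP finish reasons, or None otherwise."""
    reason = properties.get("google_adk.finish_reason")
    if reason is None or reason == "STOP":
        return None
    meaning = FINISH_REASON_MEANINGS.get(reason, "Unrecognized finish reason")
    return f"{reason}: {meaning}"


warning = describe_finish({"google_adk.finish_reason": "MAX_TOKENS"})
```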

Token Tracking

Token usage is accumulated across all model calls within a single Runner.run() invocation. The following fields are captured:
| Property | Description |
| --- | --- |
| ai.usage.prompt_tokens | Total input tokens across all model calls |
| ai.usage.completion_tokens | Total output tokens across all model calls |
| ai.usage.total_tokens | Total tokens (prompt + completion) |
| ai.usage.cached_tokens | Cached content token count |
| ai.usage.thoughts_tokens | Thinking/reasoning token count |
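As a worked example of the accumulation, consider a run in which the model is called twice: once to request a tool and once to write the answer. The sketch below sums per-call usage the way the descriptions above suggest; `accumulate_usage`, the dict layout, and the token numbers are made up for illustration.

```python
from typing import Dict, Iterable, Mapping


def accumulate_usage(calls: Iterable[Mapping[str, int]]) -> Dict[str, int]:
    """Sum prompt and completion tokens over every model call in one run."""
    totals = {"prompt_tokens": 0, "completion_tokens": 0}
    for usage in calls:
        totals["prompt_tokens"] += usage.get("prompt_tokens", 0)
        totals["completion_tokens"] += usage.get("completion_tokens", 0)
    # Total is defined here as prompt + completion.
    totals["total_tokens"] = totals["prompt_tokens"] + totals["completion_tokens"]
    return totals


calls = [
    {"prompt_tokens": 120, "completion_tokens": 15},  # call that requests a tool
    {"prompt_tokens": 160, "completion_tokens": 40},  # call that writes the answer
]
usage = accumulate_usage(calls)
```

For these two calls the run-level totals are 280 prompt tokens, 55 completion tokens, and 335 total tokens.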

Captured Properties

Each event includes the following properties when available:
| Property | Description |
| --- | --- |
| ai.usage.prompt_tokens | Input token count |
| ai.usage.completion_tokens | Output token count |
| ai.usage.total_tokens | Total token count |
| ai.usage.cached_tokens | Cached content token count |
| ai.usage.thoughts_tokens | Thinking/reasoning token count |
| google_adk.app_name | Runner app name |
| google_adk.user_id | User ID from the runner call |
| google_adk.session_id | Session ID from the runner call |
| google_adk.author | Event author (agent name) |
| google_adk.agent_name | Agent name from the event |
| google_adk.tool_calls_count | Number of tool calls in the run |
| google_adk.tool_call_names | JSON list of tool names called |
| google_adk.finish_reason | Why generation stopped (STOP, SAFETY, etc.) |
| google_adk.error | Whether a Python exception occurred |
| google_adk.error_message | Python exception message if applicable |
| google_adk.error_code | LLM-level error code (e.g. SAFETY, QUOTA) |
| google_adk.llm_error_message | LLM-level error message |
| google_adk.agent_branch | Agent hierarchy path for multi-agent setups |
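Since google_adk.tool_call_names is described as a JSON list, downstream code that reads exported event properties would need to decode it. A small sketch, assuming the properties are available as a dict and the value is a JSON-encoded string; `tool_names` is a hypothetical helper.

```python
import json
from typing import Any, List, Mapping


def tool_names(properties: Mapping[str, Any]) -> List[str]:
    """Decode google_adk.tool_call_names into a Python list of names."""
    raw = properties.get("google_adk.tool_call_names")
    if not raw:
        return []
    # Accept either a JSON string or an already-decoded sequence.
    return list(json.loads(raw)) if isinstance(raw, str) else list(raw)


properties = {
    "google_adk.tool_calls_count": 2,
    "google_adk.tool_call_names": '["get_weather", "get_stock_price"]',
}
names = tool_names(properties)
```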

Flushing and Shutdown

Always call shutdown() before your process exits to ensure all telemetry is shipped:
rd.flush()     # flush pending events without releasing resources
rd.shutdown()  # flush + release resources
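To make sure shutdown() runs even when an agent run raises, the call can be placed in a `finally` block or wrapped in a context manager. The sketch below shows the latter; `shutdown_on_exit` is a hypothetical helper, and `FakeRaindrop` is a stand-in that records calls so the example runs without the package installed.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List


@contextmanager
def shutdown_on_exit(rd: Any) -> Iterator[Any]:
    """Yield `rd` and call rd.shutdown() when the block exits, even on error."""
    try:
        yield rd
    finally:
        rd.shutdown()


class FakeRaindrop:
    """Hypothetical stand-in that records calls instead of sending telemetry."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def flush(self) -> None:
        self.calls.append("flush")

    def shutdown(self) -> None:
        self.calls.append("shutdown")


fake = FakeRaindrop()
try:
    with shutdown_on_exit(fake) as rd:
        rd.flush()
        raise RuntimeError("agent run failed")
except RuntimeError:
    pass
```

In real code the object passed in would be the one returned by setup_google_adk() or create_raindrop_google_adk().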

Factory Functions (backward compatibility)

The setup_google_adk() and create_raindrop_google_adk() factory functions return RaindropGoogleADK instances and accept the same parameters:
from raindrop_google_adk import setup_google_adk, create_raindrop_google_adk

# Auto-patch all Runner instances
rd = setup_google_adk(api_key="your-write-key")

# Or create without auto-patching
rd = create_raindrop_google_adk(api_key="your-write-key")
wrapped = rd.wrap(runner)
The RaindropGoogleADK class provides identify() and track_signal() methods directly, so you don’t need to import the core SDK separately.