> ## Documentation Index
> Fetch the complete documentation index at: https://docs.openhands.dev/llms.txt
> Use this file to discover all available pages before exploring further.

# Local Agent Server

> Run agents through a local HTTP server with RemoteConversation for client-server architecture.

export const path_to_script_0 = "examples/02_remote_agent_server/01_convo_with_local_agent_server.py"

> A ready-to-run example is available [here](#ready-to-run-example)!

The Local Agent Server demonstrates how to run a remote agent server locally and connect to it using `RemoteConversation`. This pattern is useful for local development, testing, and scenarios where you want to separate the client code from the agent execution environment.

## Key Concepts

### Managed API Server

The ready-to-run example includes a `ManagedAPIServer` context manager that handles starting and stopping the server subprocess:

```python icon="python" focus={1, 2, 4, 5} theme={null}
class ManagedAPIServer:
    """Context manager for subprocess-managed OpenHands API server."""
    
    def __enter__(self):
        """Start the API server subprocess."""
        self.process = subprocess.Popen(
            [
                "python",
                "-m",
                "openhands.agent_server",
                "--port",
                str(self.port),
                "--host",
                self.host,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            env={"LOG_JSON": "true", **os.environ},
        )
```

The server starts with `python -m openhands.agent_server` and automatically handles health checks to ensure it's ready before proceeding.

### Remote Workspace

When connecting to a remote server, you need to provide a `Workspace` that connects to that server:

```python icon="python" theme={null}
workspace = Workspace(host=server.base_url)
result = workspace.execute_command("pwd")
```

When `host` is provided, the `Workspace` returns an instance of `RemoteWorkspace` ([source](https://github.com/OpenHands/software-agent-sdk/blob/main/openhands-sdk/openhands/sdk/workspace/workspace.py)).
The `Workspace` object communicates with the remote server's API to execute commands and manage files.

### RemoteConversation

When you pass a remote `Workspace` to `Conversation`, it automatically becomes a `RemoteConversation` ([source](https://github.com/OpenHands/software-agent-sdk/blob/main/openhands-sdk/openhands/sdk/conversation/conversation.py)):

```python icon="python" focus={1, 3, 7} theme={null}
conversation = Conversation(
    agent=agent,
    workspace=workspace,
    callbacks=[event_callback],
    visualize=True,
)
assert isinstance(conversation, RemoteConversation)
```

`RemoteConversation` handles communication with the remote agent server over WebSocket for real-time event streaming.

### Event Callbacks

Callbacks receive events in real-time as they happen on the remote server:

```python icon="python" theme={null}
def event_callback(event):
    """Callback to capture events for testing."""
    event_type = type(event).__name__
    logger.info(f"🔔 Callback received event: {event_type}\n{event}")
    received_events.append(event)
    event_tracker["last_event_time"] = time.time()
```

This enables monitoring agent activity, tracking progress, and implementing custom event handling logic.

### Conversation State

The conversation state provides access to all events and status:

```python icon="python" theme={null}
# Count total events using state.events
total_events = len(conversation.state.events)
logger.info(f"📈 Total events in conversation: {total_events}")

# Get recent events (last 5) using state.events
all_events = conversation.state.events
recent_events = all_events[-5:] if len(all_events) >= 5 else all_events
```

This allows you to inspect the conversation history, analyze agent behavior, and build custom monitoring tools.

## Ready-to-run Example

<Note>
  This example is available on GitHub: [examples/02\_remote\_agent\_server/01\_convo\_with\_local\_agent\_server.py](https://github.com/OpenHands/software-agent-sdk/blob/main/examples/02_remote_agent_server/01_convo_with_local_agent_server.py)
</Note>

This example shows how to programmatically start a local agent server and interact with it through a `RemoteConversation`:

```python icon="python" expandable examples/02_remote_agent_server/01_convo_with_local_agent_server.py theme={null}
import os
import subprocess
import sys
import tempfile
import threading
import time

from pydantic import SecretStr

from openhands.sdk import LLM, Conversation, RemoteConversation, Workspace, get_logger
from openhands.sdk.event import ConversationStateUpdateEvent
from openhands.tools.preset.default import get_default_agent


logger = get_logger(__name__)


def _stream_output(stream, prefix, target_stream):
    """Stream output from subprocess to target stream with prefix."""
    try:
        for line in iter(stream.readline, ""):
            if line:
                target_stream.write(f"[{prefix}] {line}")
                target_stream.flush()
    except Exception as e:
        print(f"Error streaming {prefix}: {e}", file=sys.stderr)
    finally:
        stream.close()


class ManagedAPIServer:
    """Context manager for subprocess-managed OpenHands API server."""

    def __init__(self, port: int = 8000, host: str = "127.0.0.1"):
        self.port: int = port
        self.host: str = host
        self.process: subprocess.Popen[str] | None = None
        self.base_url: str = f"http://{host}:{port}"
        self.stdout_thread: threading.Thread | None = None
        self.stderr_thread: threading.Thread | None = None

    def __enter__(self):
        """Start the API server subprocess."""
        print(f"Starting OpenHands API server on {self.base_url}...")

        # Start the server process
        self.process = subprocess.Popen(
            [
                "python",
                "-m",
                "openhands.agent_server",
                "--port",
                str(self.port),
                "--host",
                self.host,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            env={"LOG_JSON": "true", **os.environ},
        )

        # Start threads to stream stdout and stderr
        assert self.process is not None
        assert self.process.stdout is not None
        assert self.process.stderr is not None
        self.stdout_thread = threading.Thread(
            target=_stream_output,
            args=(self.process.stdout, "SERVER", sys.stdout),
            daemon=True,
        )
        self.stderr_thread = threading.Thread(
            target=_stream_output,
            args=(self.process.stderr, "SERVER", sys.stderr),
            daemon=True,
        )

        self.stdout_thread.start()
        self.stderr_thread.start()

        # Wait for server to be ready
        max_retries = 30
        for i in range(max_retries):
            try:
                import httpx

                response = httpx.get(f"{self.base_url}/health", timeout=1.0)
                if response.status_code == 200:
                    print(f"API server is ready at {self.base_url}")
                    return self
            except Exception:
                pass

            assert self.process is not None
            if self.process.poll() is not None:
                # Process has terminated
                raise RuntimeError(
                    "Server process terminated unexpectedly. "
                    "Check the server logs above for details."
                )

            time.sleep(1)

        raise RuntimeError(f"Server failed to start after {max_retries} seconds")

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the API server subprocess."""
        if self.process:
            print("Stopping API server...")
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                print("Force killing API server...")
                self.process.kill()
                self.process.wait()

            # Wait for streaming threads to finish (they're daemon threads,
            # so they'll stop automatically)
            # But give them a moment to flush any remaining output
            time.sleep(0.5)
            print("API server stopped.")


api_key = os.getenv("LLM_API_KEY")
assert api_key is not None, "LLM_API_KEY environment variable is not set."

llm = LLM(
    usage_id="agent",
    model=os.getenv("LLM_MODEL", "anthropic/claude-sonnet-4-5-20250929"),
    base_url=os.getenv("LLM_BASE_URL"),
    api_key=SecretStr(api_key),
)
title_gen_llm = LLM(
    usage_id="title-gen-llm",
    model=os.getenv("LLM_MODEL", "openhands/gpt-5-mini-2025-08-07"),
    base_url=os.getenv("LLM_BASE_URL"),
    api_key=SecretStr(api_key),
)

# Use managed API server
with ManagedAPIServer(port=8001) as server:
    # Create agent
    agent = get_default_agent(
        llm=llm,
        cli_mode=True,  # Disable browser tools for simplicity
    )

    # Define callbacks to test the WebSocket functionality
    received_events = []
    event_tracker = {"last_event_time": time.time()}

    def event_callback(event):
        """Callback to capture events for testing."""
        event_type = type(event).__name__
        logger.info(f"🔔 Callback received event: {event_type}\n{event}")
        received_events.append(event)
        event_tracker["last_event_time"] = time.time()

    # Create RemoteConversation with callbacks
    # NOTE: Workspace is required for RemoteConversation
    # Use a temp directory that exists and is accessible in CI environments
    temp_workspace_dir = tempfile.mkdtemp(prefix="agent_server_demo_")
    workspace = Workspace(host=server.base_url, working_dir=temp_workspace_dir)
    result = workspace.execute_command("pwd")
    logger.info(
        f"Command '{result.command}' completed with exit code {result.exit_code}"
    )
    logger.info(f"Output: {result.stdout}")

    conversation = Conversation(
        agent=agent,
        workspace=workspace,
        callbacks=[event_callback],
    )
    assert isinstance(conversation, RemoteConversation)

    try:
        logger.info(f"\n📋 Conversation ID: {conversation.state.id}")

        # Send first message and run
        logger.info("📝 Sending first message...")
        conversation.send_message(
            "Read the current repo and write 3 facts about the project into FACTS.txt."
        )

        # Generate title using a specific LLM
        title = conversation.generate_title(max_length=60, llm=title_gen_llm)
        logger.info(f"Generated conversation title: {title}")

        logger.info("🚀 Running conversation...")
        conversation.run()

        logger.info("✅ First task completed!")
        logger.info(f"Agent status: {conversation.state.execution_status}")

        # Wait for events to stop coming (no events for 2 seconds)
        logger.info("⏳ Waiting for events to stop...")
        while time.time() - event_tracker["last_event_time"] < 2.0:
            time.sleep(0.1)
        logger.info("✅ Events have stopped")

        logger.info("🚀 Running conversation again...")
        conversation.send_message("Great! Now delete that file.")
        conversation.run()
        logger.info("✅ Second task completed!")

        # Demonstrate state.events functionality
        logger.info("\n" + "=" * 50)
        logger.info("📊 Demonstrating State Events API")
        logger.info("=" * 50)

        # Count total events using state.events
        total_events = len(conversation.state.events)
        logger.info(f"📈 Total events in conversation: {total_events}")

        # Get recent events (last 5) using state.events
        logger.info("\n🔍 Getting last 5 events using state.events...")
        all_events = conversation.state.events
        recent_events = all_events[-5:] if len(all_events) >= 5 else all_events

        for i, event in enumerate(recent_events, 1):
            event_type = type(event).__name__
            timestamp = getattr(event, "timestamp", "Unknown")
            logger.info(f"  {i}. {event_type} at {timestamp}")

        # Let's see what the actual event types are
        logger.info("\n🔍 Event types found:")
        event_types = set()
        for event in recent_events:
            event_type = type(event).__name__
            event_types.add(event_type)
        for event_type in sorted(event_types):
            logger.info(f"  - {event_type}")

        # Print all ConversationStateUpdateEvent
        logger.info("\n🗂️  ConversationStateUpdateEvent events:")
        for event in conversation.state.events:
            if isinstance(event, ConversationStateUpdateEvent):
                logger.info(f"  - {event}")

        cost = conversation.conversation_stats.get_combined_metrics().accumulated_cost
        print(f"EXAMPLE_COST: {cost}")

    finally:
        # Clean up
        print("\n🧹 Cleaning up conversation...")
        conversation.close()
```

You can run the example code as-is.

<Note>
  The model name should follow the [LiteLLM convention](https://models.litellm.ai/): `provider/model_name` (e.g., `anthropic/claude-sonnet-4-5-20250929`, `openai/gpt-4o`).
  The `LLM_API_KEY` should be the API key for your chosen provider.
</Note>

<CodeGroup>
  <CodeBlock language="bash" filename="Bring-your-own provider key" icon="terminal" wrap>
    {`export LLM_API_KEY="your-api-key"\nexport LLM_MODEL="anthropic/claude-sonnet-4-5-20250929"  # or openai/gpt-4o, etc.\ncd software-agent-sdk\nuv run python ${path_to_script_0}`}
  </CodeBlock>

  <CodeBlock language="bash" filename="OpenHands Cloud" icon="terminal" wrap>
    {`# https://app.all-hands.dev/settings/api-keys\nexport LLM_API_KEY="your-openhands-api-key"\nexport LLM_MODEL="openhands/claude-sonnet-4-5-20250929"\ncd software-agent-sdk\nuv run python ${path_to_script_0}`}
  </CodeBlock>
</CodeGroup>

<Tip>
  **ChatGPT Plus/Pro subscribers**: You can use `LLM.subscription_login()` to authenticate with your ChatGPT account and access Codex models without consuming API credits. See the [LLM Subscriptions guide](/sdk/guides/llm-subscriptions) for details.
</Tip>

## Next Steps

* **[Docker Sandboxed Server](/sdk/guides/agent-server/docker-sandbox)** - Run server in Docker for isolation
* **[API Sandboxed Server](/sdk/guides/agent-server/api-sandbox)** - Connect to hosted API service
* **[Agent Server Overview](/sdk/guides/agent-server/overview)** - Architecture and implementation details
* **[Agent Server Package Architecture](/sdk/arch/agent-server)** - Remote execution architecture
