

The MemoryClient from nined-agents is the low-level client underneath the Agent class. Use it directly when you’re building multi-actor systems where different agents need separate identities, when you want full control over locking and handoff sequences, or when you need to manage worker configurations and policy bundles programmatically.
Start with the Agent class for single-agent use cases. Switch to MemoryClient when you need per-actor identity, explicit lock management, or worker config APIs.

Installation

pip install nined-agents

Constructor

from nined.agents import MemoryClient

client = MemoryClient(
    world_id: str,
    workspace_id: str,
    actor_id: str,
    api_key: str = "",
    base_url: str = "https://api.9dlabs.xyz",
    profile: str = "autonomy",
    capabilities: list[str] = None,
    timeout: int = 30,
    retry: RetryConfig = None,
)
| Parameter | Description |
| --- | --- |
| `world_id` | Top-level namespace shared across workspaces. |
| `workspace_id` | Workspace scope. All operations are isolated to this workspace. |
| `actor_id` | Identity of this agent. Used for ownership, locking, and audit trails. |
| `api_key` | Your API key. |
| `base_url` | Server URL. Defaults to the hosted API. Use `http://127.0.0.1:8082` for local development. |
| `profile` | Runtime capability profile: `"builder"`, `"team"`, or `"autonomy"` (default). |
| `capabilities` | Override the server capability list. If omitted, uses the preset for `profile` (builder, team, or autonomy). |
| `timeout` | HTTP request timeout in seconds. |
| `retry` | Retry configuration. Pass `RetryConfig(max_retries=3)` for automatic retries on transient failures. |
from nined.agents import MemoryClient, RetryConfig

client = MemoryClient(
    world_id="prod",
    workspace_id="ops",
    actor_id="agent-a",
    api_key="your-key",
    timeout=15,
    retry=RetryConfig(max_retries=3, backoff_base=0.5),
)

Bind to a different actor

other_client = client.with_actor("agent-beta")
# Same world/workspace/settings, different actor_id

Memory

client.ingest(artifacts: list[dict]) -> IngestResponse
client.context_pack(query: str, max_tokens: int = 4096, profile: str = None) -> ContextPack
client.list_artifacts(limit: int = 50, offset: int = 0) -> ArtifactListPage
ContextPack fields:
- ContextPack.snippets — list of Snippet (content, score, artifact_id, span_id, artifact_type)
- ContextPack.pack_hash — determinism fingerprint
- ContextPack.token_accounting — budget, used, snippet count
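As a minimal sketch of how ingest and retrieval compose (the artifact field names `content` and `artifact_type` are illustrative assumptions, not the confirmed ingest schema):

```python
def remember_and_recall(client, note: str, query: str) -> str:
    """Store one artifact, then pull related context back as a single
    prompt-ready string. Works with any MemoryClient-compatible object.
    The artifact dict shape here is illustrative -- check the ingest
    schema for the exact field names."""
    client.ingest([{"content": note, "artifact_type": "note"}])
    pack = client.context_pack(query, max_tokens=1024)
    # Join snippet bodies into one context block for an LLM prompt
    return "\n".join(s.content for s in pack.snippets)
```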

Tasks

client.start_task(task_id, title="", metadata=None) -> TaskReceipt
client.get_task_state(task_id) -> TaskState

# High-level: lock → attempt → complete → release in one call
client.work_on_task(task_id, action_name, reason="", complete=True) -> TaskReceipt

# Low-level individual steps
client.attempt_action(task_id, action_name, reason="", metadata=None) -> TaskReceipt
client.complete_task(task_id, reason="", metadata=None) -> TaskReceipt
client.fail_task(task_id, reason="", metadata=None) -> TaskReceipt

client.delegate_task(task_id, to_actor_id, reason="") -> TaskReceipt
client.escalate_task(task_id, reason="") -> TaskReceipt
TaskState fields: task_status, allowed_actions, protocol_hint, lock_state, is_terminal, can(action)
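A sketch of the low-level flow (essentially what `work_on_task` wraps for you; the action name `"process"` and reason strings are illustrative):

```python
def run_task(client, task_id: str) -> bool:
    """Start a task, attempt one action, then complete it only if the
    protocol state machine allows. Returns True on completion."""
    client.start_task(task_id, title="Automated run")
    client.attempt_action(task_id, "process", reason="Picked up by handler")
    state = client.get_task_state(task_id)
    if state.can("complete_task"):
        client.complete_task(task_id, reason="Done")
        return True
    # The protocol disallows completion from this state; record a failure
    client.fail_task(task_id, reason="Completion not permitted by protocol")
    return False
```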

Locks and handoffs

Use explicit lock management when you need to guarantee atomicity across multiple actions before releasing.
client.acquire_lock(task_id, ttl_seconds=300) -> TaskReceipt
# Raises ConflictError if already locked by another actor

client.release_lock(task_id) -> TaskReceipt
client.handoff(task_id, to_actor_id, reason="") -> TaskReceipt

Multi-agent handoff pattern

# Agent A: triage and lock
client_a = MemoryClient(world_id="prod", workspace_id="ops", actor_id="agent-a", api_key="key")
client_a.start_task("deploy-v2", title="Deploy to production")
client_a.acquire_lock("deploy-v2", ttl_seconds=120)
client_a.attempt_action("deploy-v2", "verify_staging", reason="Staging looks good")

# Pass to Agent B for final approval
client_a.handoff("deploy-v2", to_actor_id="agent-b", reason="Needs senior sign-off")

# Agent B: picks up the task, completes it
client_b = client_a.with_actor("agent-b")
state = client_b.get_task_state("deploy-v2")
if state.can("work_on_task"):
    client_b.work_on_task("deploy-v2", "approve_deploy")
The full sequence — lock, action, handoff, approval — is recorded in the task timeline and auditable via client.get_timeline("deploy-v2").

Connections

Manage external integrations (Gmail, Slack, Jira, GitHub, Google Sheets, etc.) programmatically.

Create a connection

conn = client.create_connection(
    provider: str,
    display_name: str = "",
    trust_mode: str = "safe",
    policy: dict = None,
    credentials: dict = None,
) -> Connection
| Parameter | Description |
| --- | --- |
| `provider` | Integration type: `"gmail"`, `"slack"`, `"jira"`, `"github"`, `"google_sheets"`, etc. |
| `display_name` | Human-readable label. Defaults to the provider name. |
| `trust_mode` | `"safe"` (read-only), `"team"` (read + scoped writes), or `"autonomous"` (full access). |
| `policy` | Optional action allowlist, e.g. `{"allowed_actions": ["read_email", "send_email"]}`. |
| `credentials` | Pre-existing credentials dict. Omit to use OAuth instead. |

OAuth flow

result = client.start_oauth(
    provider: str,
    display_name: str = "",
    extra_scopes: list[str] = None,
) -> OAuthStartResult
# result.authorization_url — redirect the user here
# result.connection_id — tracks the pending connection

client.refresh_connection(connection_id: str) -> dict
# Refresh an expired OAuth token
result = client.start_oauth("gmail")
print(f"Authorize at: {result.authorization_url}")
# After the user authorizes, the callback stores credentials automatically

Manage connections

client.list_connections(provider="", status="", limit=100) -> ConnectionList
client.get_connection(connection_id) -> Connection
client.update_connection(connection_id, display_name=None, trust_mode=None, policy=None) -> Connection
client.delete_connection(connection_id) -> dict
client.revoke_connection(connection_id) -> Connection
client.set_connection_trust_mode(connection_id, trust_mode) -> Connection
client.test_connection(connection_id) -> dict
Connection fields: connection_id, provider, status, trust_mode, display_name, has_credentials

End-to-end example

# Create a Gmail connection via OAuth
result = client.start_oauth("gmail", display_name="Work Gmail")
print(f"Authorize: {result.authorization_url}")

# After authorization completes...
conn = client.get_connection(result.connection_id)
assert conn.status == "connected"

# Verify connectivity
check = client.test_connection(conn.connection_id)
print(check)  # {"ok": True, ...}

# Tighten permissions
client.set_connection_trust_mode(conn.connection_id, "safe")

Constraints

Update task constraints mid-flight (e.g. tighten policy, add context).
client.update_constraints(
    task_id: str,
    action_name: str = "constraint_update",
    decision_type: str = "approve",
    reason: str = "",
) -> TaskReceipt
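For example (the reason string is illustrative; the receipt lands on the task timeline like any other action):

```python
def tighten_constraints(client, task_id: str):
    """Record a mid-flight constraint change against a running task."""
    return client.update_constraints(
        task_id,
        decision_type="approve",
        reason="Restrict deploys to business hours",
    )
```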

Worker runtime

Run a continuous worker loop that claims and processes tasks.
client.claim_next_work(worker_type="generic") -> WorkerClaim
# WorkerClaim.claimed: bool
# WorkerClaim.task_id: str (if claimed)
# WorkerClaim.lease_id: str (heartbeat + release token)

client.worker_heartbeat(lease_id, detail="") -> dict
client.worker_complete(lease_id, detail="") -> dict
client.worker_release(lease_id, detail="") -> dict
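These primitives compose into a single claim cycle like this (a sketch; the actual work is elided):

```python
def claim_once(client, worker_type: str = "generic") -> bool:
    """Claim one unit of work, heartbeat, and complete it.
    Returns False immediately if nothing was available."""
    claim = client.claim_next_work(worker_type=worker_type)
    if not claim.claimed:
        return False
    try:
        client.worker_heartbeat(claim.lease_id, detail="processing")
        # ... do the actual work for claim.task_id here ...
        client.worker_complete(claim.lease_id, detail="done")
        return True
    except Exception:
        # Give the lease back so another worker can pick the task up
        client.worker_release(claim.lease_id, detail="handler error")
        raise
```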

Custom worker loop

def handle_task(task_data: dict, client: MemoryClient) -> dict:
    task_id = task_data["task_id"]
    state = task_data["state"]

    # Pull relevant context from memory
    pack = client.context_pack(f"How to handle: {state.protocol_hint}")
    context = "\n".join(s.content for s in pack.snippets)

    # Decide what to do
    if state.can("work_on_task"):
        client.work_on_task(task_id, "process", reason="Automated handler")
        return {"done": True}

    return {"release": True}

stats = client.run_worker_loop(
    worker_type="processor",
    on_task=handle_task,
    idle_sleep_seconds=2.0,
    max_claim_cycles=1000,
)
WorkerStats: claims, completed, released, idle

Policy

Define what each role can do across the workspace.
client.upsert_policy_bundle(
    bundle_version: str,
    role_capabilities: dict[str, list[str]] = None,
    constraints: dict = None,
) -> PolicyBundle

client.get_policy_bundle() -> PolicyBundle
# Example: separate read and write roles
client.upsert_policy_bundle(
    bundle_version="v1",
    role_capabilities={
        "reader": ["context_pack", "list_artifacts"],
        "writer": ["ingest", "context_pack", "feedback"],
        "admin": ["ingest", "context_pack", "feedback", "delete_workspace"],
    },
)
Policy violations surface as PolicyDeniedError with a reason_code and recommended_action.

Observability

client.get_timeline(task_id: str) -> Timeline
# timeline.events: list of TimelineEvent

client.list_managed_tasks(status="", limit=100) -> TaskList
client.activity_feed(limit=50, actor_id="", action="", outcome="") -> ActivityFeed
client.get_receipts(limit=50) -> ReceiptList
client.dashboard_stats() -> DashboardStats
DashboardStats fields: workers (active/total), tasks (completed/active/…), connections (connected/total)
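A sketch of turning a timeline into a readable audit log (the event attribute names `actor_id` and `action` are assumptions; inspect TimelineEvent on your install for the real fields):

```python
def audit_lines(client, task_id: str) -> list[str]:
    """Flatten a task timeline into human-readable one-liners."""
    timeline = client.get_timeline(task_id)
    # Event attribute names here are illustrative
    return [f"{event.actor_id}: {event.action}" for event in timeline.events]
```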

Determinism check

result = client.replay(pack_hash: str) -> ReplayResult
# result.match: True if the pack can be reproduced byte-for-byte
Use replay to verify that a specific context pack is still reproducible — useful for auditing decisions made at a past point in time.
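One way to apply it: fingerprint the context behind a decision at decision time, so the pack can be re-verified later (a sketch):

```python
def audited_context(client, query: str):
    """Build a context pack and immediately confirm its hash replays.
    Persist pack_hash alongside the decision it informed; a later
    client.replay(pack_hash) re-verifies reproducibility."""
    pack = client.context_pack(query)
    return pack.pack_hash, client.replay(pack.pack_hash).match
```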

Action plans

Generate, review, and execute structured action plans from natural-language intent. Plans are validated against available adapter schemas and run through the policy gate.

Generate a plan

plan = client.create_plan(
    intent: str,
    trust_mode: str = "safe",
    task_id: str = "",
    model: str = "",
) -> dict
The server calls the LLM-backed plan generator, validates the output against your connected adapter capabilities, and returns a plan in proposed status ready for review.
plan = client.create_plan(
    "Send a summary of unread emails to the #updates Slack channel",
    trust_mode="team",
)
print(plan["plan_id"], plan["status"])  # "proposed"
for step in plan["steps"]:
    print(f"  {step['step_id']}: {step['description']} (risk: {step['risk_level']})")

Review and approve

client.get_plan(plan_id) -> dict
client.list_plans(status="", limit=100) -> dict

# Approve all pending steps
client.approve_plan(plan_id) -> dict

# Or approve specific steps only
client.approve_plan(plan_id, step_ids=["step_abc", "step_def"]) -> dict

# Edit a step before approving
client.edit_plan_step(
    plan_id,
    step_id,
    description=None,
    params=None,
    risk_level=None,
    step_status=None,
) -> dict

Execute

result = client.execute_plan(plan_id, halt_on_failure=True) -> dict
print(result["plan"]["status"])   # "completed" or "failed"
print(result["plan"]["summary"])  # "3 succeeded, 0 failed, ..."
Each step runs through the adapter policy gate and generates an audit trail entry.

Cancel

client.cancel_plan(plan_id) -> dict

Capability schema

Inspect what actions are available given the workspace’s active connections.
schema = client.capability_schema() -> dict
# Lists all adapter methods with parameters and risk levels

Worker configs

Manage persistent worker definitions — the agents that appear in the dashboard and can be connected to integrations.

Create and manage

worker = client.create_worker_config(
    name: str,
    role: str = "",
    model: str = "gpt-4o-mini",
    llm_provider: str = "auto",
    autonomy_level: str = "low",
    connection_ids: list[str] = None,
    approval_required: bool = True,
    metadata: dict = None,
) -> WorkerConfig
| Parameter | Description |
| --- | --- |
| `name` | Display name for the worker. |
| `role` | Freeform role description (e.g. "Handle guest inquiries and FAQ"). |
| `model` | LLM model to use. |
| `autonomy_level` | `"low"` (approval required), `"medium"`, or `"high"` (fully autonomous). |
| `connection_ids` | Pre-attach connections at creation time. |
| `approval_required` | Whether actions need human approval before execution. |
client.list_worker_configs(status="", limit=100) -> WorkerConfigList
client.get_worker_config(worker_id) -> WorkerConfig
client.update_worker_config(
    worker_id,
    name=None, role=None, model=None, llm_provider=None,
    status=None, autonomy_level=None, approval_required=None,
) -> WorkerConfig
client.delete_worker_config(worker_id) -> dict
WorkerConfig fields: worker_id, name, role, model, status, autonomy_level, approval_required, connection_ids

Attach and detach connections

Bind integrations to a worker so it can use them during task execution.
client.attach_worker_connection(worker_id, connection_id) -> WorkerConfig
client.detach_worker_connection(worker_id, connection_id) -> WorkerConfig
# Create a worker and wire up Gmail + Slack
worker = client.create_worker_config(
    "Support Agent",
    role="Handle support tickets via email, post updates to Slack",
    autonomy_level="medium",
)
client.attach_worker_connection(worker.worker_id, gmail_conn.connection_id)
client.attach_worker_connection(worker.worker_id, slack_conn.connection_id)

Adapters

Get a fully-wired, policy-gated adapter instance from a stored connection. The adapter enforces trust mode and audit logging automatically.
adapter = client.get_adapter(connection_id: str) -> Adapter
gmail = client.get_adapter("conn_abc123")
emails = gmail.fetch_unread(max_results=5)

sheets = client.get_adapter("conn_xyz789")
data = sheets.read_range("Sheet1!A1:C10")
Actions blocked by the connection’s trust mode or policy raise PolicyDeniedError. Every adapter call is recorded in the audit trail.

Adapter audit events

Emit custom audit events for adapter decisions (useful when building your own adapters).
client.emit_adapter_audit(
    action_name: str,
    connection_id: str = "",
    target: str = "",
    decision: str = "allowed",
    blocked_reason: str = "",
    task_id: str = "",
    metadata: dict = None,
) -> dict
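For instance, logging a blocked send from a custom adapter (the target address and blocked reason are illustrative):

```python
def log_blocked_send(client, connection_id: str, task_id: str):
    """Record that a custom adapter refused an outbound email."""
    return client.emit_adapter_audit(
        action_name="send_email",
        connection_id=connection_id,
        target="user@example.com",  # illustrative target
        decision="blocked",
        blocked_reason="Recipient outside the allowed domain",
        task_id=task_id,
    )
```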

AsyncMemoryClient

AsyncMemoryClient provides the exact same API as MemoryClient, but every method is async. It uses asyncio.to_thread internally — zero external dependencies.
import asyncio
from nined.agents import AsyncMemoryClient

async def main():
    async with AsyncMemoryClient(
        world_id="prod",
        workspace_id="ops",
        actor_id="async-agent",
        api_key="your-key",
    ) as client:
        pack = await client.context_pack("What happened yesterday?")
        for s in pack.snippets:
            print(s.content)

        state = await client.get_task_state("task-001")
        if state.can("work_on_task"):
            await client.work_on_task("task-001", "process")

asyncio.run(main())
Supports async with as a context manager and with_actor for multi-agent setups:
other = client.with_actor("agent-beta")
await other.start_task("task-002", title="Delegated work")
run_worker_loop is sync-only — it runs a blocking claim/heartbeat loop. Use the sync MemoryClient for worker loops, or call the individual claim_next_work / worker_heartbeat / worker_complete async methods to build your own async loop.
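A minimal async loop built from those individual methods might look like this (a sketch; the handler contract and worker_type are up to you):

```python
import asyncio


async def async_worker_loop(client, handle, idle_sleep: float = 2.0,
                            max_cycles: int = 100) -> int:
    """Claim/process/complete loop for AsyncMemoryClient.
    `handle` is an async callable taking (client, task_id).
    Returns the number of tasks completed."""
    completed = 0
    for _ in range(max_cycles):
        claim = await client.claim_next_work(worker_type="processor")
        if not claim.claimed:
            # Nothing to do; back off before the next claim attempt
            await asyncio.sleep(idle_sleep)
            continue
        try:
            await handle(client, claim.task_id)
            await client.worker_complete(claim.lease_id, detail="done")
            completed += 1
        except Exception:
            # Return the lease so another worker can pick the task up
            await client.worker_release(claim.lease_id, detail="handler error")
    return completed
```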

Error types

| Exception | When raised |
| --- | --- |
| `MemoryAPIError` | Base — unexpected API errors |
| `PolicyDeniedError` | Action blocked by workspace policy. Has `reason_code` + `recommended_action`. |
| `ConflictError` | Lock contention or stale handoff |
| `AuthorizationError` | Missing or invalid credentials |
| `RateLimitError` | 429. Has `retry_after` (seconds). |
from nined.agents import MemoryClient, ConflictError, PolicyDeniedError

try:
    client.acquire_lock("task-123")
except ConflictError:
    print("Task is locked by another agent — will retry")
except PolicyDeniedError as e:
    print(f"Action not allowed: {e.reason_code} (try: {e.recommended_action})")