The MemoryClient from nined-agents is the low-level client underneath the Agent class. Use it directly when you’re building multi-actor systems where different agents need separate identities, when you want full control over locking and handoff sequences, or when you need to manage worker configurations and policy bundles programmatically.
Start with the Agent class for single-agent use cases. Switch to MemoryClient when you need per-actor identity, explicit lock management, or worker config APIs.
Installation
Constructor
from nined.agents import MemoryClient
client = MemoryClient(
world_id: str,
workspace_id: str,
actor_id: str,
api_key: str = "",
base_url: str = "https://api.9dlabs.xyz",
profile: str = "autonomy",
capabilities: list[str] = None,
timeout: int = 30,
retry: RetryConfig = None,
)
| Parameter | Description |
|---|---|
| world_id | Top-level namespace shared across workspaces. |
| workspace_id | Workspace scope. All operations are isolated to this workspace. |
| actor_id | Identity of this agent. Used for ownership, locking, and audit trails. |
| api_key | Your API key. |
| base_url | Server URL. Defaults to the hosted API. Use http://127.0.0.1:8082 for local development. |
| profile | Runtime capability profile: "builder", "team", or "autonomy" (default). |
| capabilities | Override the server capability list. If omitted, uses the preset for profile (builder, team, or autonomy). |
| timeout | HTTP request timeout in seconds. |
| retry | Retry configuration. Pass RetryConfig(max_retries=3) for automatic retries on transient failures. |
from nined.agents import MemoryClient, RetryConfig
client = MemoryClient(
world_id="prod",
workspace_id="ops",
actor_id="agent-a",
api_key="your-key",
timeout=15,
retry=RetryConfig(max_retries=3, backoff_base=0.5),
)
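How RetryConfig spaces its retries is not spelled out on this page. As a rough illustration only, the sketch below assumes plain exponential backoff, where the wait doubles after every attempt starting from backoff_base. The helper name backoff_delays is hypothetical and is not part of nined-agents.

```python
def backoff_delays(max_retries: int, backoff_base: float) -> list[float]:
    """Return the wait in seconds before each retry, doubling every attempt.

    Assumption: delay_n = backoff_base * 2**n. The real RetryConfig may use
    a different schedule (jitter, caps, and so on).
    """
    return [backoff_base * (2 ** attempt) for attempt in range(max_retries)]


if __name__ == "__main__":
    # Mirrors RetryConfig(max_retries=3, backoff_base=0.5) from the example above.
    print(backoff_delays(3, 0.5))
```

Under that assumption, three retries with a base of 0.5 seconds would add at most 3.5 seconds of waiting on top of the per-request timeout.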
Bind to a different actor
other_client = client.with_actor("agent-beta")
# Same world/workspace/settings, different actor_id
Memory
client.ingest(artifacts: list[dict]) -> IngestResponse
client.context_pack(query: str, max_tokens: int = 4096, profile: str = None) -> ContextPack
client.list_artifacts(limit: int = 50, offset: int = 0) -> ArtifactListPage
ContextPack.snippets — list of Snippet (content, score, artifact_id, span_id, artifact_type)
ContextPack.pack_hash — determinism fingerprint
ContextPack.token_accounting — budget, used, snippet count
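One common use of ContextPack.snippets is to fold them into a prompt. The sketch below shows one possible way to do that: order snippets by score and stop once a character budget is reached. FakeSnippet is a local stand-in that carries only the content and score fields listed above, and build_context and max_chars are hypothetical names, not library API.

```python
from dataclasses import dataclass


@dataclass
class FakeSnippet:
    """Stand-in carrying two of the Snippet fields listed above."""
    content: str
    score: float


def build_context(snippets, max_chars: int) -> str:
    """Join snippet contents, highest score first, without exceeding max_chars."""
    parts, used = [], 0
    for s in sorted(snippets, key=lambda s: s.score, reverse=True):
        # Every snippet after the first costs one extra character (the newline).
        cost = len(s.content) + (1 if parts else 0)
        if used + cost > max_chars:
            break
        parts.append(s.content)
        used += cost
    return "\n".join(parts)


if __name__ == "__main__":
    demo = [FakeSnippet("low priority", 0.1), FakeSnippet("high priority", 0.9)]
    print(build_context(demo, max_chars=100))
```

The same function should work on real Snippet objects as long as they expose content and score attributes.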
Tasks
client.start_task(task_id, title="", metadata=None) -> TaskReceipt
client.get_task_state(task_id) -> TaskState
# High-level: lock → attempt → complete → release in one call
client.work_on_task(task_id, action_name, reason="", complete=True) -> TaskReceipt
# Low-level individual steps
client.attempt_action(task_id, action_name, reason="", metadata=None) -> TaskReceipt
client.complete_task(task_id, reason="", metadata=None) -> TaskReceipt
client.fail_task(task_id, reason="", metadata=None) -> TaskReceipt
client.delegate_task(task_id, to_actor_id, reason="") -> TaskReceipt
client.escalate_task(task_id, reason="") -> TaskReceipt
TaskState fields: task_status, allowed_actions, protocol_hint, lock_state, is_terminal, can(action)
Locks and handoffs
Use explicit lock management when you need to guarantee atomicity across multiple actions before releasing.
client.acquire_lock(task_id, ttl_seconds=300) -> TaskReceipt
# Raises ConflictError if already locked by another actor
client.release_lock(task_id) -> TaskReceipt
client.handoff(task_id, to_actor_id, reason="") -> TaskReceipt
Multi-agent handoff pattern
# Agent A: triage and lock
client_a = MemoryClient(world_id="prod", workspace_id="ops", actor_id="agent-a", api_key="key")
client_a.start_task("deploy-v2", title="Deploy to production")
client_a.acquire_lock("deploy-v2", ttl_seconds=120)
client_a.attempt_action("deploy-v2", "verify_staging", reason="Staging looks good")
# Pass to Agent B for final approval
client_a.handoff("deploy-v2", to_actor_id="agent-b", reason="Needs senior sign-off")
# Agent B: picks up the task, completes it
client_b = client_a.with_actor("agent-b")
state = client_b.get_task_state("deploy-v2")
if state.can("work_on_task"):
    client_b.work_on_task("deploy-v2", "approve_deploy")
The full sequence — lock, action, handoff, approval — is recorded in the task timeline and auditable via client.get_timeline("deploy-v2").
Connections
Manage external integrations (Gmail, Slack, Jira, GitHub, Google Sheets, etc.) programmatically.
Create a connection
conn = client.create_connection(
provider: str,
display_name: str = "",
trust_mode: str = "safe",
policy: dict = None,
credentials: dict = None,
) -> Connection
| Parameter | Description |
|---|---|
| provider | Integration type: "gmail", "slack", "jira", "github", "google_sheets", etc. |
| display_name | Human-readable label. Defaults to the provider name. |
| trust_mode | "safe" (read-only), "team" (read + scoped writes), or "autonomous" (full access). |
| policy | Optional action allowlist, e.g. {"allowed_actions": ["read_email", "send_email"]}. |
| credentials | Pre-existing credentials dict. Omit to use OAuth instead. |
OAuth flow
result = client.start_oauth(
provider: str,
display_name: str = "",
extra_scopes: list[str] = None,
) -> OAuthStartResult
# result.authorization_url — redirect the user here
# result.connection_id — tracks the pending connection
client.refresh_connection(connection_id: str) -> dict
# Refresh an expired OAuth token
result = client.start_oauth("gmail")
print(f"Authorize at: {result.authorization_url}")
# After the user authorizes, the callback stores credentials automatically
Manage connections
client.list_connections(provider="", status="", limit=100) -> ConnectionList
client.get_connection(connection_id) -> Connection
client.update_connection(connection_id, display_name=None, trust_mode=None, policy=None) -> Connection
client.delete_connection(connection_id) -> dict
client.revoke_connection(connection_id) -> Connection
client.set_connection_trust_mode(connection_id, trust_mode) -> Connection
client.test_connection(connection_id) -> dict
Connection fields: connection_id, provider, status, trust_mode, display_name, has_credentials
End-to-end example
# Create a Gmail connection via OAuth
result = client.start_oauth("gmail", display_name="Work Gmail")
print(f"Authorize: {result.authorization_url}")
# After authorization completes...
conn = client.get_connection(result.connection_id)
assert conn.status == "connected"
# Verify connectivity
check = client.test_connection(conn.connection_id)
print(check) # {"ok": True, ...}
# Tighten permissions
client.set_connection_trust_mode(conn.connection_id, "safe")
Constraints
Update task constraints mid-flight (e.g. tighten policy, add context).
client.update_constraints(
task_id: str,
action_name: str = "constraint_update",
decision_type: str = "approve",
reason: str = "",
) -> TaskReceipt
Worker runtime
Run a continuous worker loop that claims and processes tasks.
client.claim_next_work(worker_type="generic") -> WorkerClaim
# WorkerClaim.claimed: bool
# WorkerClaim.task_id: str (if claimed)
# WorkerClaim.lease_id: str (heartbeat + release token)
client.worker_heartbeat(lease_id, detail="") -> dict
client.worker_complete(lease_id, detail="") -> dict
client.worker_release(lease_id, detail="") -> dict
Custom worker loop
def handle_task(task_data: dict, client: MemoryClient) -> dict:
    task_id = task_data["task_id"]
    state = task_data["state"]
    # Pull relevant context from memory
    pack = client.context_pack(f"How to handle: {state.protocol_hint}")
    context = "\n".join(s.content for s in pack.snippets)
    # Decide what to do
    if state.can("work_on_task"):
        client.work_on_task(task_id, "process", reason="Automated handler")
        return {"done": True}
    return {"release": True}
stats = client.run_worker_loop(
    worker_type="processor",
    on_task=handle_task,
    idle_sleep_seconds=2.0,
    max_claim_cycles=1000,
)
WorkerStats: claims, completed, released, idle
Policy
Define what each role can do across the workspace.
client.upsert_policy_bundle(
bundle_version: str,
role_capabilities: dict[str, list[str]] = None,
constraints: dict = None,
) -> PolicyBundle
client.get_policy_bundle() -> PolicyBundle
# Example: separate read and write roles
client.upsert_policy_bundle(
bundle_version="v1",
role_capabilities={
"reader": ["context_pack", "list_artifacts"],
"writer": ["ingest", "context_pack", "feedback"],
"admin": ["ingest", "context_pack", "feedback", "delete_workspace"],
},
)
Policy violations surface as PolicyDeniedError with a reason_code and recommended_action.
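The role_capabilities mapping is a plain dict of role name to capability list, so its shape is easy to sanity-check locally before uploading a bundle. The sketch below is only an illustration of that shape: role_can is a hypothetical helper, and the server remains the actual enforcement point.

```python
def role_can(role_capabilities: dict[str, list[str]], role: str, capability: str) -> bool:
    """Return True if `role` lists `capability`; unknown roles get nothing."""
    return capability in role_capabilities.get(role, [])


# Same mapping as the upsert_policy_bundle example above.
ROLES = {
    "reader": ["context_pack", "list_artifacts"],
    "writer": ["ingest", "context_pack", "feedback"],
    "admin": ["ingest", "context_pack", "feedback", "delete_workspace"],
}

if __name__ == "__main__":
    print(role_can(ROLES, "reader", "ingest"))
    print(role_can(ROLES, "writer", "ingest"))
```

A check like this can catch typos in role names early, but it does not replace handling PolicyDeniedError at call time.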
Observability
client.get_timeline(task_id: str) -> Timeline
# timeline.events: list of TimelineEvent
client.list_managed_tasks(status="", limit=100) -> TaskList
client.activity_feed(limit=50, actor_id="", action="", outcome="") -> ActivityFeed
client.get_receipts(limit=50) -> ReceiptList
client.dashboard_stats() -> DashboardStats
DashboardStats fields: workers (active/total), tasks (completed/active/…), connections (connected/total)
Determinism check
result = client.replay(pack_hash: str) -> ReplayResult
# result.match: True if the pack can be reproduced byte-for-byte
Use replay to verify that a specific context pack is still reproducible — useful for auditing decisions made at a past point in time.
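How pack_hash is computed is not documented here. The general idea behind a determinism fingerprint can still be sketched: serialize the content in a canonical form and hash it, so that identical content always yields the same digest. The code below assumes SHA-256 over sorted-key JSON, which is an illustrative choice and not necessarily what the server does; fingerprint is a hypothetical name.

```python
import hashlib
import json


def fingerprint(snippets: list[dict]) -> str:
    """Hash a canonical JSON form so key order and whitespace cannot change the digest."""
    canonical = json.dumps(snippets, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    a = [{"artifact_id": "a1", "content": "hello"}]
    b = [{"content": "hello", "artifact_id": "a1"}]  # same data, different key order
    print(fingerprint(a) == fingerprint(b))
```

A replay check then reduces to recomputing the digest and comparing it with the stored one.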
Action plans
Generate, review, and execute structured action plans from natural-language intent. Plans are validated against available adapter schemas and run through the policy gate.
Generate a plan
plan = client.create_plan(
intent: str,
trust_mode: str = "safe",
task_id: str = "",
model: str = "",
) -> dict
The server calls the LLM-backed plan generator, validates the output against your connected adapter capabilities, and returns a plan in proposed status ready for review.
plan = client.create_plan(
"Send a summary of unread emails to the #updates Slack channel",
trust_mode="team",
)
print(plan["plan_id"], plan["status"]) # "proposed"
for step in plan["steps"]:
    print(f"  {step['step_id']}: {step['description']} (risk: {step['risk_level']})")
Review and approve
client.get_plan(plan_id) -> dict
client.list_plans(status="", limit=100) -> dict
# Approve all pending steps
client.approve_plan(plan_id) -> dict
# Or approve specific steps only
client.approve_plan(plan_id, step_ids=["step_abc", "step_def"]) -> dict
# Edit a step before approving
client.edit_plan_step(
plan_id,
step_id,
description=None,
params=None,
risk_level=None,
step_status=None,
) -> dict
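Partial approval needs a list of step IDs. One way to build that list is to filter the plan's steps by risk_level, as sketched below. The step_id and risk_level keys come from the plan example above; the concrete value "low" and the helper name low_risk_step_ids are assumptions, since the set of risk levels is not listed on this page.

```python
def low_risk_step_ids(plan: dict, allowed=("low",)) -> list[str]:
    """Collect step_id for every step whose risk_level is in `allowed`."""
    return [
        step["step_id"]
        for step in plan.get("steps", [])
        if step.get("risk_level") in allowed
    ]


if __name__ == "__main__":
    # Hand-written sample shaped like the plan dict printed above.
    sample_plan = {
        "plan_id": "plan_1",
        "steps": [
            {"step_id": "step_abc", "description": "Read unread email", "risk_level": "low"},
            {"step_id": "step_def", "description": "Post to Slack", "risk_level": "high"},
        ],
    }
    print(low_risk_step_ids(sample_plan))
```

The resulting list could then be passed as step_ids to approve_plan, leaving riskier steps for manual review.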
Execute
result = client.execute_plan(plan_id, halt_on_failure=True)  # -> dict
print(result["plan"]["status"]) # "completed" or "failed"
print(result["plan"]["summary"]) # "3 succeeded, 0 failed, ..."
Each step runs through the adapter policy gate and generates an audit trail entry.
Cancel
client.cancel_plan(plan_id) -> dict
Capability schema
Inspect what actions are available given the workspace’s active connections.
schema = client.capability_schema() -> dict
# Lists all adapter methods with parameters and risk levels
Worker configs
Manage persistent worker definitions — the agents that appear in the dashboard and can be connected to integrations.
Create and manage
worker = client.create_worker_config(
name: str,
role: str = "",
model: str = "gpt-4o-mini",
llm_provider: str = "auto",
autonomy_level: str = "low",
connection_ids: list[str] = None,
approval_required: bool = True,
metadata: dict = None,
) -> WorkerConfig
| Parameter | Description |
|---|---|
| name | Display name for the worker. |
| role | Freeform role description (e.g. "Handle guest inquiries and FAQ"). |
| model | LLM model to use. |
| autonomy_level | "low" (approval required), "medium", or "high" (fully autonomous). |
| connection_ids | Pre-attach connections at creation time. |
| approval_required | Whether actions need human approval before execution. |
client.list_worker_configs(status="", limit=100) -> WorkerConfigList
client.get_worker_config(worker_id) -> WorkerConfig
client.update_worker_config(
worker_id,
name=None, role=None, model=None, llm_provider=None,
status=None, autonomy_level=None, approval_required=None,
) -> WorkerConfig
client.delete_worker_config(worker_id) -> dict
WorkerConfig fields: worker_id, name, role, model, status, autonomy_level, approval_required, connection_ids
Attach and detach connections
Bind integrations to a worker so it can use them during task execution.
client.attach_worker_connection(worker_id, connection_id) -> WorkerConfig
client.detach_worker_connection(worker_id, connection_id) -> WorkerConfig
# Create a worker and wire up Gmail + Slack
worker = client.create_worker_config(
"Support Agent",
role="Handle support tickets via email, post updates to Slack",
autonomy_level="medium",
)
client.attach_worker_connection(worker.worker_id, gmail_conn.connection_id)
client.attach_worker_connection(worker.worker_id, slack_conn.connection_id)
Adapters
Get a fully-wired, policy-gated adapter instance from a stored connection. The adapter enforces trust mode and audit logging automatically.
adapter = client.get_adapter(connection_id: str) -> Adapter
gmail = client.get_adapter("conn_abc123")
emails = gmail.fetch_unread(max_results=5)
sheets = client.get_adapter("conn_xyz789")
data = sheets.read_range("Sheet1!A1:C10")
Actions blocked by the connection’s trust mode or policy raise PolicyDeniedError. Every adapter call is recorded in the audit trail.
Adapter audit events
Emit custom audit events for adapter decisions (useful when building your own adapters).
client.emit_adapter_audit(
connection_id: str = "",
action_name: str,
target: str = "",
decision: str = "allowed",
blocked_reason: str = "",
task_id: str = "",
metadata: dict = None,
) -> dict
AsyncMemoryClient
AsyncMemoryClient exposes the same methods as MemoryClient as coroutines, with the exception of run_worker_loop, which is sync-only. It uses asyncio.to_thread internally, so it adds no external dependencies.
import asyncio
from nined.agents import AsyncMemoryClient
async def main():
    async with AsyncMemoryClient(
        world_id="prod",
        workspace_id="ops",
        actor_id="async-agent",
        api_key="your-key",
    ) as client:
        pack = await client.context_pack("What happened yesterday?")
        for s in pack.snippets:
            print(s.content)
        state = await client.get_task_state("task-001")
        if state.can("work_on_task"):
            await client.work_on_task("task-001", "process")
asyncio.run(main())
Supports async with as a context manager and with_actor for multi-agent setups:
other = client.with_actor("agent-beta")
await other.start_task("task-002", title="Delegated work")
run_worker_loop is sync-only — it runs a blocking claim/heartbeat loop. Use the sync MemoryClient for worker loops, or call the individual claim_next_work / worker_heartbeat / worker_complete async methods to build your own async loop.
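A hand-rolled async loop built from those individual methods might look like the sketch below. It runs against FakeAsyncClient, an in-memory stand-in written for this example, so it makes no network calls. The method names and the claimed, task_id, and lease_id fields follow the Worker runtime section; async_worker_loop, max_idle_polls, and the release-on-exception behaviour are assumptions, not library behaviour.

```python
import asyncio
from types import SimpleNamespace


class FakeAsyncClient:
    """In-memory stand-in exposing the async worker methods named above."""

    def __init__(self, task_ids):
        self._queue = list(task_ids)
        self.completed, self.released = [], []

    async def claim_next_work(self, worker_type="generic"):
        if not self._queue:
            return SimpleNamespace(claimed=False, task_id="", lease_id="")
        task_id = self._queue.pop(0)
        return SimpleNamespace(claimed=True, task_id=task_id, lease_id=f"lease-{task_id}")

    async def worker_heartbeat(self, lease_id, detail=""):
        return {}

    async def worker_complete(self, lease_id, detail=""):
        self.completed.append(lease_id)
        return {}

    async def worker_release(self, lease_id, detail=""):
        self.released.append(lease_id)
        return {}


async def async_worker_loop(client, handler, worker_type="generic",
                            max_idle_polls=1, idle_sleep=0.0):
    """Claim and process tasks until max_idle_polls consecutive polls find nothing."""
    idle = 0
    while idle < max_idle_polls:
        claim = await client.claim_next_work(worker_type=worker_type)
        if not claim.claimed:
            idle += 1
            await asyncio.sleep(idle_sleep)
            continue
        idle = 0
        await client.worker_heartbeat(claim.lease_id)
        try:
            await handler(claim.task_id)
        except Exception:
            # Give the lease back so another worker can pick the task up.
            await client.worker_release(claim.lease_id, detail="handler failed")
        else:
            await client.worker_complete(claim.lease_id)


async def demo():
    client = FakeAsyncClient(["t1", "t2"])

    async def handler(task_id):
        if task_id == "t2":
            raise RuntimeError("simulated failure")

    await async_worker_loop(client, handler)
    return client.completed, client.released


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

For long-running handlers, a real loop would likely send heartbeats periodically from a background task rather than once per claim.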
Error types
| Exception | When raised |
|---|---|
| MemoryAPIError | Base — unexpected API errors |
| PolicyDeniedError | Action blocked by workspace policy. Has reason_code + recommended_action. |
| ConflictError | Lock contention or stale handoff |
| AuthorizationError | Missing or invalid credentials |
| RateLimitError | 429. Has retry_after (seconds). |
from nined.agents import MemoryClient, ConflictError, PolicyDeniedError
try:
    client.acquire_lock("task-123")
except ConflictError:
    print("Task is locked by another agent; will retry")
except PolicyDeniedError as e:
    print(f"Action not allowed: {e.reason_code} ({e.recommended_action})")
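The "will retry" branch above can be turned into a small helper. The sketch below retries acquire_lock a fixed number of times and re-raises on the last failure. So that it runs offline, it defines a local ConflictError stand-in and a FlakyClient fake; with the real library you would import ConflictError from nined.agents instead. acquire_with_retry, attempts, and delay are hypothetical names.

```python
import time


class ConflictError(Exception):
    """Local stand-in for nined.agents.ConflictError so the sketch runs offline."""


def acquire_with_retry(client, task_id, attempts=3, delay=0.0, ttl_seconds=300):
    """Call acquire_lock up to `attempts` times; re-raise the last ConflictError."""
    for attempt in range(1, attempts + 1):
        try:
            return client.acquire_lock(task_id, ttl_seconds=ttl_seconds)
        except ConflictError:
            if attempt == attempts:
                raise
            time.sleep(delay)  # fixed pause between attempts


class FlakyClient:
    """Fake client whose lock is held by someone else for the first N calls."""

    def __init__(self, busy_calls):
        self.busy_calls = busy_calls
        self.calls = 0

    def acquire_lock(self, task_id, ttl_seconds=300):
        self.calls += 1
        if self.calls <= self.busy_calls:
            raise ConflictError(task_id)
        return {"task_id": task_id, "locked": True}


if __name__ == "__main__":
    client = FlakyClient(busy_calls=2)
    print(acquire_with_retry(client, "task-123"), client.calls)
```

A fixed delay keeps the example short; a production helper might back off between attempts and treat RateLimitError separately using its retry_after value.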