The MemoryClient from nined-agents is the low-level client underneath the Agent class. Use it directly when you’re building multi-actor systems where different agents need separate identities, when you want full control over locking and handoff sequences, or when you need to manage worker configurations and policy bundles programmatically.
Start with the Agent class for single-agent use cases. Switch to MemoryClient when you need per-actor identity, explicit lock management, or worker config APIs.

Installation

pip install nined-agents

Constructor

from nined.agents import MemoryClient

client = MemoryClient(
    world_id: str,
    workspace_id: str,
    actor_id: str,
    api_key: str = "",
    base_url: str = "https://api.9dlabs.xyz",
)
| Parameter | Description |
| --- | --- |
| world_id | Top-level namespace shared across workspaces. |
| workspace_id | Workspace scope. All operations are isolated to this workspace. |
| actor_id | Identity of this agent. Used for ownership, locking, and audit trails. |
| api_key | Your API key. |
| base_url | Server URL. Defaults to the hosted API. |

Bind to a different actor

other_client = client.with_actor("agent-beta")
# Same world/workspace/settings, different actor_id

Memory

client.ingest(artifacts: list[dict]) -> IngestResponse
client.context_pack(query: str, max_tokens: int = 4096, profile: str = None) -> ContextPack
client.list_artifacts(limit: int = 50, offset: int = 0) -> ArtifactListPage
ContextPack fields:
ContextPack.snippets: list of Snippet (content, score, artifact_id, span_id, artifact_type)
ContextPack.pack_hash: determinism fingerprint
ContextPack.token_accounting: budget, used, snippet count
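As a rough illustration of consuming a context pack, the sketch below joins snippet contents into one string and drops low-scoring snippets. The `Snippet` dataclass is a local stand-in that mirrors only the `content` and `score` fields listed above. `render_context` and `min_score` are hypothetical names, not part of nined-agents, and the code assumes that a higher score means a more relevant snippet, which this page does not state.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class Snippet:
    """Local stand-in with two of the documented Snippet fields."""
    content: str
    score: float


def render_context(snippets: Iterable[Snippet], min_score: float = 0.0) -> str:
    """Join snippet contents, highest score first, skipping weak matches.

    Assumption: higher score == more relevant.
    """
    kept = [s for s in snippets if s.score >= min_score]
    kept.sort(key=lambda s: s.score, reverse=True)
    return "\n".join(s.content for s in kept)


# Demo data standing in for ContextPack.snippets.
demo = [Snippet("low", 0.1), Snippet("high", 0.9), Snippet("mid", 0.5)]
rendered = render_context(demo, min_score=0.3)
```

With a real client, the same helper could presumably be fed `client.context_pack(query).snippets`, since it only reads `content` and `score`.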

Tasks

client.start_task(task_id, title="", metadata=None) -> TaskReceipt
client.get_task_state(task_id) -> TaskState

# High-level: lock → attempt → complete → release in one call
client.work_on_task(task_id, action_name, reason="", complete=True) -> TaskReceipt

# Low-level individual steps
client.attempt_action(task_id, action_name, reason="", metadata=None) -> TaskReceipt
client.complete_task(task_id, reason="", metadata=None) -> TaskReceipt
client.fail_task(task_id, reason="", metadata=None) -> TaskReceipt

client.delegate_task(task_id, to_actor_id, reason="") -> TaskReceipt
client.escalate_task(task_id, reason="") -> TaskReceipt
TaskState fields: task_status, allowed_actions, protocol_hint, lock_state, is_terminal, can(action)
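One way to use the TaskState fields is to check them before calling a low-level step. The sketch below is a hedged example, not the library's prescribed flow: `FakeState` and `FakeClient` are offline stand-ins, `complete_if_allowed` is a hypothetical helper, and the action name `"complete_task"` passed to `can()` is an assumption based on the `"work_on_task"` usage shown elsewhere on this page.

```python
class FakeState:
    """Stand-in exposing a few documented TaskState fields."""

    def __init__(self, allowed_actions, is_terminal=False):
        self.allowed_actions = list(allowed_actions)
        self.is_terminal = is_terminal

    def can(self, action):
        return action in self.allowed_actions


class FakeClient:
    """Offline stand-in; records calls instead of contacting the API."""

    def __init__(self, state):
        self._state = state
        self.calls = []

    def get_task_state(self, task_id):
        return self._state

    def complete_task(self, task_id, reason="", metadata=None):
        self.calls.append(("complete_task", task_id, reason))


def complete_if_allowed(client, task_id, reason=""):
    """Complete the task only if it is still open and completion is allowed."""
    state = client.get_task_state(task_id)
    # Assumption: can() accepts "complete_task" as an action name.
    if state.is_terminal or not state.can("complete_task"):
        return False
    client.complete_task(task_id, reason=reason)
    return True


open_client = FakeClient(FakeState(["complete_task"]))
done_client = FakeClient(FakeState(["complete_task"], is_terminal=True))
completed_open = complete_if_allowed(open_client, "task-1", reason="checked")
completed_done = complete_if_allowed(done_client, "task-2")
```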

Locks and handoffs

Use explicit lock management when several actions must run under a single lock, so that no other actor can act on the task before you release it.
client.acquire_lock(task_id, ttl_seconds=300) -> TaskReceipt
# Raises ConflictError if already locked by another actor

client.release_lock(task_id) -> TaskReceipt
client.handoff(task_id, to_actor_id, reason="") -> TaskReceipt
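A common way to make sure a lock is always released is to wrap the acquire/release pair in a context manager. The sketch below assumes only the `acquire_lock` and `release_lock` signatures shown above. `held_lock` is a hypothetical helper, and `RecordingClient` is an offline stand-in so the example runs without the API.

```python
from contextlib import contextmanager


@contextmanager
def held_lock(client, task_id, ttl_seconds=300):
    """Hold the task lock for the duration of the with-block.

    If acquire_lock raises (for example on contention), nothing is released.
    """
    client.acquire_lock(task_id, ttl_seconds=ttl_seconds)
    try:
        yield
    finally:
        # Runs even when the body raises, so the lock is not left dangling.
        client.release_lock(task_id)


class RecordingClient:
    """Offline stand-in; records the order of calls."""

    def __init__(self):
        self.events = []

    def acquire_lock(self, task_id, ttl_seconds=300):
        self.events.append("acquire")

    def release_lock(self, task_id):
        self.events.append("release")

    def attempt_action(self, task_id, action_name, reason="", metadata=None):
        self.events.append(f"action:{action_name}")


demo_client = RecordingClient()
with held_lock(demo_client, "deploy-v2", ttl_seconds=120):
    demo_client.attempt_action("deploy-v2", "verify_staging")
    demo_client.attempt_action("deploy-v2", "notify")
```

The TTL still applies while the block runs, so long-running work would need a TTL large enough to cover it.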

Multi-agent handoff pattern

# Agent A: triage and lock
client_a = MemoryClient(world_id="prod", workspace_id="ops", actor_id="agent-a", api_key="key")
client_a.start_task("deploy-v2", title="Deploy to production")
client_a.acquire_lock("deploy-v2", ttl_seconds=120)
client_a.attempt_action("deploy-v2", "verify_staging", reason="Staging looks good")

# Pass to Agent B for final approval
client_a.handoff("deploy-v2", to_actor_id="agent-b", reason="Needs senior sign-off")

# Agent B: picks up the task, completes it
client_b = client_a.with_actor("agent-b")
state = client_b.get_task_state("deploy-v2")
if state.can("work_on_task"):
    client_b.work_on_task("deploy-v2", "approve_deploy")
The full sequence — lock, action, handoff, approval — is recorded in the task timeline and auditable via client.get_timeline("deploy-v2").

Worker runtime

Run a continuous worker loop that claims and processes tasks.
client.claim_next_work(worker_type="generic") -> WorkerClaim
# WorkerClaim.claimed: bool
# WorkerClaim.task_id: str (if claimed)
# WorkerClaim.lease_id: str (heartbeat + release token)

client.worker_heartbeat(lease_id, detail="") -> dict
client.worker_complete(lease_id, detail="") -> dict
client.worker_release(lease_id, detail="") -> dict
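The four calls above can be combined into a single claim cycle. The following is a minimal sketch under stated assumptions: `process_one` is a hypothetical helper, `FakeClaim` and `FakeWorkerClient` are offline stand-ins for the real types, and releasing the lease when the handler fails is one possible policy rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class FakeClaim:
    """Stand-in exposing the three documented WorkerClaim fields."""
    claimed: bool
    task_id: str = ""
    lease_id: str = ""


class FakeWorkerClient:
    """Offline stand-in; hands out queued claims and records calls."""

    def __init__(self, claims: List[FakeClaim]):
        self._claims = list(claims)
        self.log: List[Tuple[str, str]] = []

    def claim_next_work(self, worker_type: str = "generic") -> FakeClaim:
        return self._claims.pop(0) if self._claims else FakeClaim(claimed=False)

    def worker_heartbeat(self, lease_id: str, detail: str = "") -> dict:
        self.log.append(("heartbeat", lease_id))
        return {}

    def worker_complete(self, lease_id: str, detail: str = "") -> dict:
        self.log.append(("complete", lease_id))
        return {}

    def worker_release(self, lease_id: str, detail: str = "") -> dict:
        self.log.append(("release", lease_id))
        return {}


def process_one(client, handler: Callable[[str], None],
                worker_type: str = "generic") -> str:
    """Run one claim cycle and report 'idle', 'completed', or 'released'."""
    claim = client.claim_next_work(worker_type=worker_type)
    if not claim.claimed:
        return "idle"
    try:
        client.worker_heartbeat(claim.lease_id, detail="started")
        handler(claim.task_id)
    except Exception as exc:
        # Assumed policy: give the lease back so another worker can retry.
        client.worker_release(claim.lease_id, detail=f"handler failed: {exc}")
        return "released"
    client.worker_complete(claim.lease_id, detail="done")
    return "completed"


def failing_handler(task_id: str) -> None:
    raise ValueError("bad input")


demo = FakeWorkerClient([FakeClaim(True, "t-1", "lease-1"),
                         FakeClaim(True, "t-2", "lease-2")])
outcomes = [
    process_one(demo, lambda task_id: None),
    process_one(demo, failing_handler),
    process_one(demo, lambda task_id: None),
]
```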

Custom worker loop

def handle_task(task_data: dict, client: MemoryClient) -> dict:
    task_id = task_data["task_id"]
    state = task_data["state"]

    # Pull relevant context from memory
    pack = client.context_pack(f"How to handle: {state.protocol_hint}")
    context = "\n".join(s.content for s in pack.snippets)

    # Decide what to do
    if state.can("work_on_task"):
        client.work_on_task(task_id, "process", reason="Automated handler")
        return {"done": True}

    return {"release": True}

stats = client.run_worker_loop(
    worker_type="processor",
    on_task=handle_task,
    idle_sleep_seconds=2.0,
    max_claim_cycles=1000,
)
WorkerStats fields: claims, completed, released, idle

Policy

Define what each role can do across the workspace.
client.upsert_policy_bundle(
    bundle_version: str,
    role_capabilities: dict[str, list[str]] = None,
    constraints: dict = None,
) -> PolicyBundle

client.get_policy_bundle() -> PolicyBundle
# Example: separate read and write roles
client.upsert_policy_bundle(
    bundle_version="v1",
    role_capabilities={
        "reader": ["context_pack", "list_artifacts"],
        "writer": ["ingest", "context_pack", "feedback"],
        "admin": ["ingest", "context_pack", "feedback", "delete_workspace"],
    },
)
Policy violations surface as PolicyDeniedError with a reason_code and recommended_action.
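Before uploading a bundle, it can help to sanity-check the `role_capabilities` mapping locally. The sketch below is plain dictionary logic and makes no API calls. `roles_allowing` is a hypothetical helper, and the server remains the authority on what is actually permitted.

```python
from typing import Dict, List


def roles_allowing(role_capabilities: Dict[str, List[str]],
                   capability: str) -> List[str]:
    """Return the roles (sorted by name) whose list includes `capability`."""
    return sorted(
        role for role, caps in role_capabilities.items() if capability in caps
    )


# Same mapping as the upsert_policy_bundle example above.
capabilities = {
    "reader": ["context_pack", "list_artifacts"],
    "writer": ["ingest", "context_pack", "feedback"],
    "admin": ["ingest", "context_pack", "feedback", "delete_workspace"],
}

can_ingest = roles_allowing(capabilities, "ingest")
can_delete = roles_allowing(capabilities, "delete_workspace")
```

A check like this catches an accidental grant, such as a destructive capability on a non-admin role, before the bundle is sent.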

Observability

client.get_timeline(task_id: str) -> Timeline
# timeline.events: list of TimelineEvent

client.list_managed_tasks(status="", limit=100) -> TaskList
client.activity_feed(limit=50, actor_id="", action="", outcome="") -> ActivityFeed
client.get_receipts(limit=50) -> ReceiptList
client.dashboard_stats() -> DashboardStats
DashboardStats fields: workers (active/total), tasks (completed/active/…), connections (connected/total)

Determinism check

client.replay(pack_hash: str) -> ReplayResult
# ReplayResult.match: True if the pack can be reproduced byte-for-byte
Use replay to verify that a specific context pack is still reproducible — useful for auditing decisions made at a past point in time.
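For an audit over several stored hashes, the replay check could be applied in a loop. This is a sketch that relies only on `replay(pack_hash)` returning an object with a `match` attribute, as described above. `find_drifted_packs`, `FakeReplayResult`, and `FakeReplayClient` are hypothetical names used so the example runs offline.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class FakeReplayResult:
    """Stand-in exposing the documented `match` field."""
    match: bool


class FakeReplayClient:
    """Offline stand-in; treats a fixed set of hashes as reproducible."""

    def __init__(self, reproducible: Iterable[str]):
        self._reproducible = set(reproducible)

    def replay(self, pack_hash: str) -> FakeReplayResult:
        return FakeReplayResult(match=pack_hash in self._reproducible)


def find_drifted_packs(client, pack_hashes: Iterable[str]) -> List[str]:
    """Return the hashes whose packs can no longer be reproduced."""
    return [h for h in pack_hashes if not client.replay(h).match]


drifted = find_drifted_packs(
    FakeReplayClient({"abc", "def"}), ["abc", "zzz", "def"]
)
```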

Error types

| Exception | When raised |
| --- | --- |
| MemoryAPIError | Base exception. Unexpected API errors. |
| PolicyDeniedError | Action blocked by workspace policy. Has reason_code and recommended_action. |
| ConflictError | Lock contention or stale handoff. |
| AuthorizationError | Missing or invalid credentials. |
| RateLimitError | 429. Has retry_after (seconds). |
from nined.agents import MemoryClient, ConflictError, PolicyDeniedError

try:
    client.acquire_lock("task-123")
except ConflictError:
    print("Task is locked by another agent — will retry")
except PolicyDeniedError as e:
    print(f"Action not allowed: {e.reason_code} ({e.recommended_action})")
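Since RateLimitError carries `retry_after`, a caller could wait that long before retrying. The sketch below shows one possible wrapper. `call_with_retry` is a hypothetical helper, and the `RateLimitError` class here is a local stand-in that mirrors only the documented `retry_after` attribute so the example runs without the library. In real code the exception would be imported from nined.agents instead, assuming it is exported there.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Local stand-in mirroring the documented retry_after attribute."""

    def __init__(self, retry_after: float):
        super().__init__(f"rate limited, retry after {retry_after}s")
        self.retry_after = retry_after


def call_with_retry(fn: Callable[[], T], max_attempts: int = 3,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, waiting retry_after seconds between rate-limited attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RateLimitError as err:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the error to the caller.
            sleep(err.retry_after)
    raise ValueError("max_attempts must be at least 1")


# Demo: fails twice with a rate limit, then succeeds.
attempts = {"n": 0}


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimitError(retry_after=1.5)
    return "ok"


waits = []  # Collect the requested waits instead of actually sleeping.
result = call_with_retry(flaky, sleep=waits.append)
```

Passing `sleep` as a parameter keeps the helper testable. With a real client, the call might look like `call_with_retry(lambda: client.get_receipts(limit=50))`.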