The nined-agents package gives you a single Agent class that handles the entire lifecycle of an AI agent: storing and searching memory, managing tasks with a full audit trail, handing off to other agents, and running autonomous worker loops.
Installation
Zero mandatory dependencies — pure Python stdlib for the core client.
Quick start
from nined.agents import Agent
agent = Agent(
api_key="your-key",
workspace="my-project",
)
# Store something in memory
agent.memory.add("Runbook: restart the pods on OOM error, then page the on-call engineer.")
# Retrieve relevant context
context = agent.memory.search("How do I handle an OOM alert?")
for snippet in context.snippets:
    print(f"[{snippet.score:.2f}] {snippet.content}")
# Start and work on a task
agent.start_task("inc-001", title="OOM alert in prod")
state = agent.get_task_state("inc-001")
if state.can("work_on_task"):
    agent.work_on_task("inc-001", "triage_incident", reason="Following runbook")
Constructor
Agent(
api_key: str,
base_url: str = "https://api.9dlabs.xyz",
workspace: str = "default",
world: str = "default",
mode: str = "safe",
name: str = "", # Stable identity label. Auto-generated if omitted.
)
| Parameter | Description |
|---|---|
| api_key | Your API key. |
| base_url | Server URL. Defaults to the hosted API. Use http://127.0.0.1:8082 for local development. |
| workspace | Workspace scope. Isolates data, tasks, and connections. |
| world | World scope. Use one world per logical project or environment. |
| mode | Trust mode. Controls what the agent is allowed to do (see below). |
| name | Stable identity label. Becomes the actor_id in the runtime. Ownership, locks, handoffs, and audit trails all key on this. Auto-generated UUID if omitted. |
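One way to keep the API key out of source code is to assemble the constructor arguments from environment variables. The sketch below is illustrative only: the helper `agent_kwargs_from_env` and the `NINED_*` variable names are assumptions made for this example, not part of the package. Only the parameter names, their defaults, and the three mode values come from this page.

```python
import os
from typing import Mapping, Optional

# Mode names as documented on this page.
VALID_MODES = ("safe", "team", "autonomous")


def agent_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build keyword arguments for Agent(...) from environment variables.

    The NINED_* names are hypothetical; use whatever fits your deployment.
    """
    env = os.environ if env is None else env

    api_key = env.get("NINED_API_KEY", "")
    if not api_key:
        raise ValueError("NINED_API_KEY is not set")

    mode = env.get("NINED_MODE", "safe")
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode {mode!r}; expected one of {VALID_MODES}")

    kwargs = {
        "api_key": api_key,
        "base_url": env.get("NINED_BASE_URL", "https://api.9dlabs.xyz"),
        "workspace": env.get("NINED_WORKSPACE", "default"),
        "world": env.get("NINED_WORLD", "default"),
        "mode": mode,
    }
    # Leave `name` out when unset so the auto-generated identity applies.
    name = env.get("NINED_AGENT_NAME", "")
    if name:
        kwargs["name"] = name
    return kwargs
```

With this helper in place, construction would look like `Agent(**agent_kwargs_from_env())`. Setting a stable `name` matters most for long-lived workers, since ownership and audit trails key on it.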
Modes
| Mode | Capabilities |
|---|---|
| safe | Memory + task management |
| team | Everything in safe + handoffs, locks, multi-agent coordination |
| autonomous | Everything in team + worker loops, escalation, full governance contracts |
Memory
agent.memory.add(
content: str,
title: str = "",
artifact_type: str = "document",
) -> str # Returns artifact_id
agent.memory.add_many(items: list[dict]) -> list[str]
# Each item: {"content": "...", "title": "...", "type": "document"} # type defaults to "document"
agent.memory.search(
query: str,
max_tokens: int = 4096,
) -> ContextPack
# ContextPack.snippets: list of Snippet (content, score, artifact_id, span_id, artifact_type)
# ContextPack.token_accounting: budget, used, snippets
# ContextPack.pack_hash: determinism fingerprint
agent.memory.list_artifacts(limit: int = 50, offset: int = 0) -> ArtifactListPage
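A common next step after `agent.memory.search` is turning the returned snippets into a block of text for a prompt or a log. The helper below is a minimal sketch, not part of the package: `format_context` is a hypothetical name, and it assumes that a higher `score` means a more relevant snippet. It only reads the `score` and `content` attributes listed above.

```python
from typing import Iterable


def format_context(snippets: Iterable, min_score: float = 0.0, max_snippets: int = 5) -> str:
    """Render retrieved snippets as a numbered block, highest score first.

    Works on any objects exposing `.score` and `.content`.
    """
    # Drop weak matches, then order the rest by descending score.
    kept = [s for s in snippets if s.score >= min_score]
    kept.sort(key=lambda s: s.score, reverse=True)

    lines = [
        f"[{i}] ({s.score:.2f}) {s.content}"
        for i, s in enumerate(kept[:max_snippets], start=1)
    ]
    return "\n".join(lines)
```

It would typically be called as `format_context(agent.memory.search(query).snippets, min_score=0.4)`. A sensible threshold depends on how scores are distributed in your workspace, so treat the value here as a placeholder.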
Connections
Connect external integrations (Gmail, Slack, Jira, etc.) via OAuth.
agent.connect(provider: str, open_browser: bool = True) -> ConnectionInfo
# Opens the browser for OAuth authorization and returns a ConnectionInfo
agent.connections() -> list[ConnectionInfo]
# Lists all integrations connected to the workspace
ConnectionInfo fields: connection_id, provider, status ("pending", "connected", "revoked"), trust_mode
# Connect Gmail, then verify
info = agent.connect("gmail")
# Browser opens → user authorizes → callback completes
for conn in agent.connections():
print(f"{conn.provider}: {conn.status}")
For programmatic connection management (create without browser, set trust modes, test connectivity), use the MemoryClient directly via agent.runtime.
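Because the OAuth flow finishes asynchronously, it can help to summarize which integrations are still waiting on authorization. The following sketch groups the result of `agent.connections()` by status. `providers_by_status` is a hypothetical helper, and it relies only on the `provider` and `status` fields listed above.

```python
from typing import Iterable


def providers_by_status(connections: Iterable) -> dict:
    """Group provider names by connection status.

    The three documented statuses are always present as keys;
    any other status value gets its own key.
    """
    grouped = {"pending": [], "connected": [], "revoked": []}
    for conn in connections:
        grouped.setdefault(conn.status, []).append(conn.provider)
    return grouped
```

For example, `providers_by_status(agent.connections())["pending"]` would list the providers whose authorization has not completed yet.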
Tasks
Start a task
agent.start_task(task_id: str, title: str = "", metadata: dict | None = None) -> TaskReceipt
Check what’s allowed next
state = agent.get_task_state(task_id: str) -> TaskState
state.allowed_actions # list of action names the agent may perform
state.protocol_hint # plain-English guidance
state.task_status # "running", "blocked", "succeeded", "failed", etc.
state.can("work_on_task") # quick boolean check
state.is_terminal # True when task is done (succeeded or failed)
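When another actor is responsible for finishing a task, a caller may want to wait for `state.is_terminal` to become true. The sketch below polls `get_task_state` with a bounded number of attempts. `wait_until_terminal` and its parameters are hypothetical, and the page does not say whether the API offers a push-based alternative, so this is only one possible approach.

```python
import time


def wait_until_terminal(agent, task_id: str, poll_seconds: float = 2.0, max_polls: int = 30):
    """Poll get_task_state until the task is terminal or the poll budget runs out.

    Returns the last TaskState seen; check `state.is_terminal` on the
    result to tell "finished" apart from "gave up waiting".
    """
    if max_polls < 1:
        raise ValueError("max_polls must be at least 1")

    state = agent.get_task_state(task_id)
    for _ in range(max_polls - 1):
        if state.is_terminal:
            break
        time.sleep(poll_seconds)
        state = agent.get_task_state(task_id)
    return state
```

Bounding the loop keeps a stuck task from blocking the caller forever. After a timeout, `state.task_status` and `state.protocol_hint` on the returned object show where the task stopped.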
Work on, complete, or fail
# High-level — acquires lock, performs action, releases lock atomically
agent.work_on_task(
task_id: str,
action: str,
reason: str = "",
complete: bool = True, # Set False for multi-step workflows
) -> TaskReceipt
agent.complete_task(task_id: str, reason: str = "") -> TaskReceipt
agent.fail_task(task_id: str, reason: str = "") -> TaskReceipt
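For a workflow with several steps, the `complete=False` flag lets each action run without closing the task, with an explicit `complete_task` or `fail_task` at the end. The sketch below shows one way to wire that together. `run_steps` is a hypothetical helper, and catching a bare `Exception` is an assumption, since this page does not document which exceptions the client raises.

```python
def run_steps(agent, task_id: str, actions: list) -> bool:
    """Run several actions on one task and complete it after the last one.

    Returns True if every step ran and the task was completed, False otherwise.
    """
    try:
        for action in actions:
            state = agent.get_task_state(task_id)
            if not state.can("work_on_task"):
                # Not ours to work on right now; leave the task untouched.
                return False
            # complete=False keeps the task open for the next step.
            agent.work_on_task(task_id, action, reason=f"step: {action}", complete=False)
    except Exception as exc:
        # Record the failure in the audit trail instead of leaving the task hanging.
        agent.fail_task(task_id, reason=f"step failed: {exc}")
        return False

    agent.complete_task(task_id, reason="all steps finished")
    return True
```

A call such as `run_steps(agent, "inc-001", ["triage", "resolve"])` would record each step and then a single completion in the task timeline.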
Coordination (team and autonomous modes)
agent.delegate_task(task_id: str, to_actor_id: str, reason: str = "") -> TaskReceipt
agent.escalate_task(task_id: str, reason: str = "") -> TaskReceipt
Delegated tasks appear in the target actor’s queue. The full handoff chain is recorded in the task timeline.
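A simple routing rule can decide between the two calls: delegate when a responsible actor is known, escalate otherwise. The sketch below illustrates that idea. `hand_off`, the severity labels, and the `owners` mapping are hypothetical conventions for this example; only `delegate_task` and `escalate_task` come from the API above.

```python
def hand_off(agent, task_id: str, severity: str, owners: dict) -> str:
    """Delegate to the actor registered for a severity, or escalate if none is.

    `owners` maps a severity label to an actor_id.
    Returns "delegated" or "escalated" so callers can log the outcome.
    """
    target = owners.get(severity)
    if target:
        agent.delegate_task(
            task_id,
            to_actor_id=target,
            reason=f"{severity} routed to {target}",
        )
        return "delegated"

    agent.escalate_task(task_id, reason=f"no owner registered for severity {severity}")
    return "escalated"
```

For instance, `hand_off(agent, "inc-8234", "P0", {"P0": "senior-sre"})` would delegate to `senior-sre`, while an unmapped severity would escalate instead.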
Worker loop (autonomous mode)
Run the agent as a continuous worker that claims and processes tasks automatically.
from nined.agents import Agent, MemoryClient
agent = Agent(api_key="your-key", workspace="ops", mode="autonomous")
def handle_task(task_data: dict, client: MemoryClient) -> dict:
    task_id = task_data["task_id"]
    state = task_data["state"]
    if not state.can("work_on_task"):
        return {"release": True}
    # Your logic here
    client.work_on_task(task_id, "process", reason="Handled by worker")
    return {"done": True}
stats = agent.run_worker_loop(
worker_type="ops-worker",
on_task=handle_task,
idle_sleep_seconds=2.0,
max_claim_cycles=500,
)
print(f"Completed: {stats.completed}, Released: {stats.released}")
When on_task returns {"done": True}, the worker completes the task and claims the next one. When it returns {"release": True}, it releases without completing — useful for tasks that need a different worker type.
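This page does not describe what the worker loop does when `on_task` raises, so a defensive wrapper that always returns one of the two documented results may be useful. The decorator below is a sketch under that assumption. `releasing_on_error` is a hypothetical name, and releasing on failure (rather than failing the task) is a design choice you may want to change.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def releasing_on_error(handler: Callable) -> Callable:
    """Wrap an on_task handler so it always returns {"done": True} or {"release": True}."""

    def wrapped(task_data: dict, client) -> dict:
        try:
            result = handler(task_data, client)
        except Exception:
            # Log with traceback, then hand the task back for another attempt.
            logger.exception("handler failed for task %s", task_data.get("task_id"))
            return {"release": True}

        # Pass through well-formed results; treat anything else as "not finished".
        if isinstance(result, dict) and (result.get("done") or result.get("release")):
            return result
        return {"release": True}

    return wrapped
```

It would be applied as `on_task=releasing_on_error(handle_task)` when calling `run_worker_loop`. If repeated failures should end the task instead, the `except` branch is the place to call `fail_task`.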
Observability
agent.get_timeline(task_id: str) -> Timeline
# timeline.events: list of TimelineEvent (event_type, actor_id, action_name, timestamp, reason)
agent.tasks(status: str = "", limit: int = 50) -> TaskList
agent.activity(limit: int = 50, actor_id: str = "", action: str = "") -> ActivityFeed
agent.stats() -> DashboardStats # active workers, task counts
agent.receipts(limit: int = 50) -> ReceiptList # memory retrieval audit receipts
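To read an audit trail quickly, the events returned by `get_timeline` can be rendered one per line. The formatter below is a minimal sketch. `format_timeline` is a hypothetical helper that uses only the `TimelineEvent` fields listed above and assumes `timestamp` has a readable string form.

```python
from typing import Iterable


def format_timeline(events: Iterable) -> str:
    """Render timeline events as one line each: time, actor, event, action, reason."""
    lines = []
    for e in events:
        line = f"{e.timestamp}  {e.actor_id}  {e.event_type}"
        # Action name and reason are optional, so append them only when set.
        if e.action_name:
            line += f" ({e.action_name})"
        if e.reason:
            line += f": {e.reason}"
        lines.append(line)
    return "\n".join(lines)
```

A typical call would be `print(format_timeline(agent.get_timeline("inc-001").events))`.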
Full example — incident response agent
from nined.agents import Agent
agent = Agent(api_key="your-key", workspace="sre", mode="team")
# Load runbooks into memory once
agent.memory.add(
"Runbook P0: 1) page on-call via PagerDuty 2) check DB CPU 3) rollback if > 90% CPU",
title="P0 Runbook",
)
agent.memory.add(
"Escalation path: SRE lead → VP Eng → CTO. Max 15min between escalations.",
title="Escalation Policy",
)
# Handle an incoming alert
def handle_incident(alert: dict):
    task_id = f"inc-{alert['id']}"
    agent.start_task(task_id, title=alert["title"])
    # Pull relevant runbook context
    context = agent.memory.search(f"How to handle: {alert['title']}")
    runbook = "\n".join(s.content for s in context.snippets)
    # Triage (complete=False keeps the task open for the resolution step)
    state = agent.get_task_state(task_id)
    if state.can("work_on_task"):
        agent.work_on_task(task_id, "triage", reason="Initial assessment complete", complete=False)
    # Hand off to senior engineer for resolution
    if alert.get("severity") == "P0":
        agent.delegate_task(task_id, to_actor_id="senior-sre", reason="P0 requires human")
    else:
        agent.work_on_task(task_id, "resolve", reason="Automated resolution via runbook")
handle_incident({"id": "8234", "title": "DB CPU spike > 90%", "severity": "P0"})
Every action — triage, delegation, resolution — is recorded in the task timeline and queryable via agent.get_timeline(task_id).