Skip to main content
The nined-agents package gives you a single Agent class that handles the entire lifecycle of an AI agent: storing and searching memory, managing tasks with a full audit trail, handing off to other agents, and running autonomous worker loops.

Installation

pip install nined-agents
Zero mandatory dependencies — pure Python stdlib for the core client.

Quick start

from nined.agents import Agent

agent = Agent(
    api_key="your-key",
    workspace="my-project",
)

# Store something in memory
agent.memory.add("Runbook: restart the pods on OOM error, then page the on-call engineer.")

# Retrieve relevant context
context = agent.memory.search("How do I handle an OOM alert?")
for snippet in context.snippets:
    print(f"[{snippet.score:.2f}] {snippet.content}")

# Start and work on a task
agent.start_task("inc-001", title="OOM alert in prod")
state = agent.get_task_state("inc-001")

if state.can("work_on_task"):
    agent.work_on_task("inc-001", "triage_incident", reason="Following runbook")

Constructor

Agent(
    api_key: str,
    workspace: str,
    world: str = "default",
    mode: str = "safe",
    name: str = "",           # Stable identity label. Auto-generated if omitted.
    base_url: str = "https://api.9dlabs.xyz",
)

Modes

ModeCapabilities
safeMemory + task management
teamEverything in safe + handoffs, locks, multi-agent coordination
autonomousEverything in team + worker loops, escalation, full governance contracts

Memory

agent.memory.add(
    content: str,
    title: str = "",
    artifact_type: str = "document",
) -> str  # Returns artifact_id
agent.memory.add_many(items: list[dict]) -> list[str]
# Each item: {"content": "...", "title": "...", "artifact_type": "..."}
agent.memory.search(
    query: str,
    max_tokens: int = 4096,
) -> ContextPack
# ContextPack.snippets: list of Snippet (content, score, artifact_id, span_id, artifact_type)
# ContextPack.token_accounting: budget, used, snippets
# ContextPack.pack_hash: determinism fingerprint
agent.memory.list_artifacts(limit: int = 50, offset: int = 0) -> ArtifactListPage

Tasks

Start a task

agent.start_task(task_id: str, title: str = "", metadata: dict = None) -> TaskReceipt

Check what’s allowed next

state = agent.get_task_state(task_id: str) -> TaskState

state.allowed_actions   # list of action names the agent may perform
state.protocol_hint     # plain-English guidance
state.task_status       # "running", "blocked", "succeeded", "failed", etc.
state.can("work_on_task")  # quick boolean check
state.is_terminal       # True when task is done (succeeded or failed)

Work on, complete, or fail

# High-level — acquires lock, performs action, releases lock atomically
agent.work_on_task(
    task_id: str,
    action: str,
    reason: str = "",
    complete: bool = True,  # Set False for multi-step workflows
) -> TaskReceipt
agent.complete_task(task_id: str, reason: str = "") -> TaskReceipt
agent.fail_task(task_id: str, reason: str = "") -> TaskReceipt

Coordination (team and autonomous modes)

agent.delegate_task(task_id: str, to_actor_id: str, reason: str = "") -> TaskReceipt
agent.escalate_task(task_id: str, reason: str = "") -> TaskReceipt
Delegated tasks appear in the target actor’s queue. The full handoff chain is recorded in the task timeline.

Worker loop (autonomous mode)

Run the agent as a continuous worker that claims and processes tasks automatically.
from nined.agents import Agent, MemoryClient

agent = Agent(api_key="your-key", workspace="ops", mode="autonomous")

def handle_task(task_data: dict, client: MemoryClient) -> dict:
    task_id = task_data["task_id"]
    state = task_data["state"]

    if not state.can("work_on_task"):
        return {"release": True}

    # Your logic here
    client.work_on_task(task_id, "process", reason="Handled by worker")
    return {"done": True}

stats = agent.run_worker_loop(
    worker_type="ops-worker",
    on_task=handle_task,
    idle_sleep_seconds=2.0,
    max_claim_cycles=500,
)
print(f"Completed: {stats.completed}, Released: {stats.released}")
When on_task returns {"done": True}, the worker completes the task and claims the next one. When it returns {"release": True}, it releases without completing — useful for tasks that need a different worker type.

Observability

agent.get_timeline(task_id: str) -> Timeline
# timeline.events: list of TimelineEvent (event_type, actor_id, action_name, timestamp, reason)

agent.tasks(status: str = "", limit: int = 50) -> TaskList
agent.activity(limit: int = 50, actor_id: str = "", action: str = "") -> ActivityFeed
agent.stats() -> DashboardStats   # active workers, task counts
agent.receipts(limit: int = 50) -> ReceiptList  # memory retrieval audit receipts

Full example — incident response agent

from nined.agents import Agent

agent = Agent(api_key="your-key", workspace="sre", mode="team")

# Load runbooks into memory once
agent.memory.add(
    "Runbook P0: 1) page on-call via PagerDuty 2) check DB CPU 3) rollback if > 90% CPU",
    title="P0 Runbook",
)
agent.memory.add(
    "Escalation path: SRE lead → VP Eng → CTO. Max 15min between escalations.",
    title="Escalation Policy",
)

# Handle an incoming alert
def handle_incident(alert: dict):
    task_id = f"inc-{alert['id']}"

    agent.start_task(task_id, title=alert["title"])

    # Pull relevant runbook context
    context = agent.memory.search(f"How to handle: {alert['title']}")
    runbook = "\n".join(s.content for s in context.snippets)

    # Triage
    state = agent.get_task_state(task_id)
    if state.can("work_on_task"):
        agent.work_on_task(task_id, "triage", reason="Initial assessment complete")

    # Hand off to senior engineer for resolution
    if alert.get("severity") == "P0":
        agent.delegate_task(task_id, to_actor_id="senior-sre", reason="P0 requires human")
    else:
        agent.work_on_task(task_id, "resolve", reason="Automated resolution via runbook")

handle_incident({"id": "8234", "title": "DB CPU spike > 90%", "severity": "P0"})
Every action — triage, delegation, resolution — is recorded in the task timeline and queryable via agent.get_timeline(task_id).