FastAPI Backend — Implementation Draft

Infrastructure,
not Intelligence.

The backend runs the network. It does not think for it.

This document is an implementation draft of the AgentOS backend, containing the full code skeleton for all modules. Design principle: each module does exactly one thing. The decision-making logic is left to the Coordinator, The infrastructure logic is left to Backend. The boundary between the two cannot be blurred.

Python 3.12 FastAPI uvicorn PyGithub SSE Cloudflare Tunnel asyncio pydantic v2
00 /

Architecture Overview

Boundary principle:Backend is the tool and Coordinator is the brain. Backend does not judge whether the task is reasonable, does not decide whether escalation is needed, and does not modify the content of the AMP message. It only does: read and write SSOT, broadcast status changes, receive external webhooks, and expose operation interfaces to Admin.
module Responsibilities do what What not to do
core/ssot.py SSOT read and write layer The only entry point for reading and writing all GitHub repos. Other modules do not call PyGithub directly. Parse AMP semantics
core/amp.py message authentication layer AMP/1.0 schema validation. Build a standard envelope. Reject non-compliant messages. Decide who to send to
core/heartbeat.py heartbeat monitoring The background asyncio task checks the last SSOT write time of each Agent every 60 seconds. Timeout write escalation. Determine the cause
api/webhook.py Webhook receive Receive GitHub/XCode Cloud pushes, verify signatures, update SystemState cache, trigger SSE broadcasts. Modify CI results
api/events.py SSE broadcast Maintain Dashboard client connection pool and push real-time status changes. Filter event content
api/actions.py Admin operation interface approve/reject/override. Validate Admin JWT, write back SSOT, trigger SSE. Bypass JWT validation
main.py Start the portal Register all routers, start the heartbeat background task, and initialize SystemState. Contains business logic
01 /

Project Structure

SHELL Directory Layout
agentos-backend/
├── main.py                    # FastAPI app entrance
├── .env                       # Environment variables (do not submit git)
├── requirements.txt
│
├── core/                      # Pure logic, no HTTP
│   ├── ssot.py               # GitHub SSOT read and write
│   ├── amp.py                # AMP/1.0 schema validation
│   ├── heartbeat.py          # Heartbeat monitoring asyncio task
│   └── state.py              # SystemState memory cache
│
├── api/                       # HTTP layer
│   ├── webhook.py            # POST /webhooks/*
│   ├── events.py             # GET  /api/events (SSE)
│   └── actions.py            # POST /api/tasks/{id}/approve|reject
│
└── models/                    # Pydantic schemas
    ├── amp_models.py         # AMP message pydantic models
    └── task_models.py        # Task / Agent state models
ENV .env — complete configuration listing
# GitHub SSOT
GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxx
GITHUB_SSOT_REPO=kircerta/agentos-ssot      # owner/repo
GITHUB_SSOT_BRANCH=main
GITHUB_WEBHOOK_SECRET=your_webhook_secret

# Telegram
TELEGRAM_BOT_TOKEN=1234567890:AAxxxxxxxxxx
TELEGRAM_ADMIN_CHAT_ID=987654321

# Auth
JWT_SECRET=your_long_random_secret_min_32_chars
JWT_ALGORITHM=HS256

# Runtime
ENVIRONMENT=development                      # development | production
HEARTBEAT_INTERVAL_SEC=60
HEARTBEAT_TIMEOUT_SEC=1800                   # 30min
ACK_TIMEOUT_DISPATCH_SEC=300
ACK_TIMEOUT_REVIEW_SEC=600
TXT requirements.txt
fastapi>=0.115.0
uvicorn[standard]>=0.30.0
PyGithub>=2.3.0
pydantic>=2.7.0
pydantic-settings>=2.3.0
python-jose[cryptography]>=3.3.0
python-dotenv>=1.0.0
httpx>=0.27.0
sse-starlette>=2.1.0
02 /

core/state.py + core/ssot.py

Core
Design decisions:SystemStateIt is an in-memory live image, synchronized from SSOT. All read operations read SystemState first (fast), and write operations must first write SSOT (durable), and then update SystemState (eventually consistent). This order cannot be reversed - writing to memory first and then SSOT will lose state on a crash.
PY SystemState — memory cache core/state.py
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional, Any
import asyncio


@dataclass
class AgentHealth:
    name:           str
    last_heartbeat: Optional[datetime] = None
    status:         str = "unknown"  # ok | monitor | timeout | offline


@dataclass
class TaskSnapshot:
    task_id:      str
    status:       str
    risk_level:   str
    reject_count: int = 0
    branch:       str = ""
    last_msg_id:  Optional[str] = None
    ci_status:    Dict[str, str] = field(default_factory=dict)
    updated_at:   Optional[datetime] = None


class SystemState:
    """
    Singleton. Single source of in-memory truth.
    All reads go here. All writes go to SSOT first, then update here.
    """
    _instance: Optional[SystemState] = None

    def __init__(self):
        self.tasks:   Dict[str, TaskSnapshot]  = {}
        self.agents:  Dict[str, AgentHealth]   = {
            "coordinator": AgentHealth("coordinator"),
            "executor":    AgentHealth("executor"),
            "reviewer":    AgentHealth("reviewer"),
        }
        self._lock = asyncio.Lock()

    @classmethod
    def get(cls) -> "SystemState":
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    async def update_task(self, snapshot: TaskSnapshot) -> None:
        async with self._lock:
            self.tasks[snapshot.task_id] = snapshot

    async def update_agent_heartbeat(self, agent: str, ts: datetime) -> None:
        async with self._lock:
            if agent in self.agents:
                self.agents[agent].last_heartbeat = ts
                self.agents[agent].status = "ok"

    def to_dict(self) -> Dict[str, Any]:
        # Snapshot for SSE broadcast or /api/tasks response
        return {
            "tasks":  {k: vars(v) for k, v in self.tasks.items()},
            "agents": {k: vars(v) for k, v in self.agents.items()},
        }
PY SSOT read and write layer core/ssot.py
from __future__ import annotations
import json, base64
from datetime import datetime, timezone
from typing import Optional, Any
from github import Github, GithubException
from pydantic_settings import BaseSettings
from .state import SystemState, TaskSnapshot


class Settings(BaseSettings):
    github_token:        str
    github_ssot_repo:    str
    github_ssot_branch:  str = "main"
    class Config:
        env_file = ".env"


_settings = Settings()
_gh       = Github(_settings.github_token)
_repo     = _gh.get_repo(_settings.github_ssot_repo)

TASK_PATH = "tasks/{task_id}.json"
MSG_PATH  = "messages/{msg_id}.json"
LOG_PATH  = "audit/log.jsonl"


def read_task(task_id: str) -> Optional[dict]:
    """Read task JSON from SSOT. Returns None if not found."""
    try:
        f = _repo.get_contents(
            TASK_PATH.format(task_id=task_id),
            ref=_settings.github_ssot_branch
        )
        return json.loads(f.decoded_content)
    except GithubException as e:
        if e.status == 404: return None
        raise


def write_task(task_id: str, data: dict, commit_msg: str) -> None:
    """
    Write task JSON to SSOT.
    Create if new, update if exists (requires sha).
    After write succeeds, update SystemState.
    """
    path    = TASK_PATH.format(task_id=task_id)
    content = json.dumps(data, ensure_ascii=False, indent=2)
    try:
        existing = _repo.get_contents(path, ref=_settings.github_ssot_branch)
        _repo.update_file(path, commit_msg, content, existing.sha,
                          branch=_settings.github_ssot_branch)
    except GithubException as e:
        if e.status == 404:
            _repo.create_file(path, commit_msg, content,
                              branch=_settings.github_ssot_branch)
        else: raise

    # Update in-memory cache (fire and forget, non-blocking intent)
    snap = TaskSnapshot(
        task_id=task_id,
        status=data.get("status", "unknown"),
        risk_level=data.get("risk_level", "medium"),
        reject_count=data.get("reject_count", 0),
        branch=data.get("branch", ""),
        updated_at=datetime.now(timezone.utc),
    )
    # Sync call here is acceptable — write_task is only called from sync contexts
    import asyncio
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(SystemState.get().update_task(snap))
    except RuntimeError:
        pass  # no running loop (e.g. startup), will be synced on next read


def write_amp_message(msg: dict) -> None:
    """Persist a validated AMP message to SSOT messages/."""
    msg_id  = msg["msg_id"]
    path    = MSG_PATH.format(msg_id=msg_id)
    content = json.dumps(msg, ensure_ascii=False, indent=2)
    _repo.create_file(path, f"AMP message: {msg_id}", content,
                      branch=_settings.github_ssot_branch)


def read_amp_message(msg_id: str) -> Optional[dict]:
    try:
        f = _repo.get_contents(MSG_PATH.format(msg_id=msg_id),
                               ref=_settings.github_ssot_branch)
        return json.loads(f.decoded_content)
    except GithubException as e:
        if e.status == 404: return None
        raise


def append_audit_log(entry: dict) -> None:
    """Append one line to audit/log.jsonl (create if not exists)."""
    path    = LOG_PATH
    line    = json.dumps(entry, ensure_ascii=False) + "\n"
    try:
        existing = _repo.get_contents(path, ref=_settings.github_ssot_branch)
        old      = existing.decoded_content.decode()
        _repo.update_file(path, "audit log", old + line, existing.sha,
                          branch=_settings.github_ssot_branch)
    except GithubException as e:
        if e.status == 404:
            _repo.create_file(path, "init audit log", line,
                              branch=_settings.github_ssot_branch)
        else: raise


def get_all_tasks() -> list[dict]:
    """List all task JSON files from SSOT tasks/ directory."""
    try:
        contents = _repo.get_contents("tasks", ref=_settings.github_ssot_branch)
        results  = []
        for f in contents:
            if f.name.endswith(".json"):
                results.append(json.loads(f.decoded_content))
        return results
    except GithubException:
        return []
03 /

core/amp.py

Schema Validator
역할:AMP/1.0 의 유일한 게이트키퍼. 모든 들어오는 메세지와 나가는 메세지는 이 모듈을 통과해야 한다. 유효하지 않은 메세지는 거부되고 audit log에 기록된다. (Note: The only gatekeeper for AMP. All incoming and outgoing messages must pass through this module. Non-compliant messages are rejected and written to the audit log.)
PY AMP Schema validation + Envelope build core/amp.py
from __future__ import annotations
import time
from datetime import datetime, timezone
from typing import Literal, Optional, Any
from pydantic import BaseModel, field_validator, model_validator


# ── Enums ──────────────────────────────────────────────

MsgType   = Literal["task_dispatch", "task_result", "review_request",
                     "review_verdict", "escalation"]
AgentName = Literal["admin", "coordinator", "executor", "reviewer", "system"]
RiskLevel = Literal["low", "medium", "high"]

FORBIDDEN_BRANCHES = {"main", "master"}

ACK_TIMEOUTS: dict[str, int] = {
    "task_dispatch":   300,
    "task_result":     120,
    "review_request":  600,
    "review_verdict":  60,
    "escalation":      0,   # No ACK required
}


# ── Base Envelope ──────────────────────────────────────

class AMPEnvelope(BaseModel):
    msg_id:           str
    protocol_version: str   = "AMP/1.0"
    type:             MsgType
    from_:            AgentName = Field(alias="from")
    to:               AgentName
    task_id:          str
    timestamp:        str
    requires_ack:     bool
    ack_timeout_sec:  int   = 0
    context_ref:      list[str] = []
    payload:          dict[str, Any]

    @field_validator("protocol_version")
    @classmethod
    def check_version(cls, v: str) -> str:
        if v != "AMP/1.0":
            raise ValueError(f"Unsupported protocol version: {v}")
        return v

    @field_validator("msg_id")
    @classmethod
    def check_msg_id_format(cls, v: str) -> str:
        parts = v.split("-")
        if len(parts) < 3:
            raise ValueError(f"Invalid msg_id format: {v}")
        return v

    @field_validator("timestamp")
    @classmethod
    def check_timestamp(cls, v: str) -> str:
        try:
            datetime.fromisoformat(v)
        except ValueError:
            raise ValueError(f"Invalid ISO 8601 timestamp: {v}")
        return v

    @model_validator(mode="after")
    def validate_dispatch_payload(self):
        # Branch safety gate on task_dispatch
        if self.type == "task_dispatch":
            branch = self.payload.get("branch", "")
            if branch.lower() in FORBIDDEN_BRANCHES:
                raise ValueError(
                    f"BRANCH VIOLATION: task_dispatch targets forbidden branch '{branch}'"
                )
            criteria = self.payload.get("acceptance_criteria", [])
            if not criteria:
                raise ValueError("acceptance_criteria must be non-empty for task_dispatch")
        return self

    model_config = {"populate_by_name": True}


# ── Factory helpers ────────────────────────────────────

def make_msg_id(msg_type: str, task_id: str) -> str:
    ms = int(time.time() * 1000)
    return f"{msg_type}-{task_id}-{ms}"


def make_timestamp() -> str:
    return datetime.now(timezone.utc).isoformat()


def validate_inbound(raw: dict) -> AMPEnvelope:
    """
    Validate an inbound AMP message (from any agent writing to SSOT).
    Raises ValidationError on failure — caller must handle.
    """
    return AMPEnvelope.model_validate(raw)


def build_escalation(
    task_id:       str,
    esc_type:      str,
    severity:      str,
    triggered_by:  str,
    description:   str,
    affected_msgs: list[str],
    snapshot:      dict,
    suggested:     list[str] = [],
    auto_suspended:bool = True,
) -> dict:
    """Build a valid escalation AMP envelope. Ready to write to SSOT."""
    msg_id = make_msg_id("escalation", task_id)
    return {
        "msg_id":           msg_id,
        "protocol_version": "AMP/1.0",
        "type":             "escalation",
        "from":             triggered_by,
        "to":               "admin",
        "task_id":          task_id,
        "timestamp":        make_timestamp(),
        "requires_ack":     False,
        "ack_timeout_sec":  0,
        "context_ref":      affected_msgs,
        "payload": {
            "escalation_type":       esc_type,
            "severity":              severity,
            "triggered_by":          triggered_by,
            "description":           description,
            "affected_msgs":         affected_msgs,
            "system_state_snapshot": snapshot,
            "suggested_actions":     suggested,
            "auto_suspended":        auto_suspended,
        }
    }
04 /

core/heartbeat.py

Background Task
PY Heartbeat Monitor — asyncio background loop core/heartbeat.py
from __future__ import annotations
import asyncio, logging
from datetime import datetime, timezone, timedelta
from .state import SystemState
from .amp   import build_escalation
from .ssot  import write_amp_message, append_audit_log
from pydantic_settings import BaseSettings

log = logging.getLogger("heartbeat")


class HBSettings(BaseSettings):
    heartbeat_interval_sec: int = 60
    heartbeat_timeout_sec:  int = 1800  # 30 min
    class Config: env_file = ".env"

_cfg = HBSettings()


async def heartbeat_loop() -> None:
    """
    Runs forever as asyncio background task.
    Every interval: check each agent's last_heartbeat in SystemState.
    If elapsed > timeout: write escalation to SSOT, broadcast via SSE.
    """
    while True:
        await asyncio.sleep(_cfg.heartbeat_interval_sec)
        await _check_agents()


async def _check_agents() -> None:
    state   = SystemState.get()
    now     = datetime.now(timezone.utc)
    timeout = timedelta(seconds=_cfg.heartbeat_timeout_sec)

    async with state._lock:
        for name, agent in state.agents.items():
            if name == "admin":
                continue  # Admin is human, not monitored

            if agent.last_heartbeat is None:
                agent.status = "unknown"
                continue

            elapsed = now - agent.last_heartbeat

            if elapsed > timeout and agent.status != "timeout":
                agent.status = "timeout"
                await _fire_timeout_escalation(name, agent.last_heartbeat, elapsed)

            elif elapsed > timeout * 0.6:
                agent.status = "monitor"  # Warning zone: >18min
            else:
                agent.status = "ok"


async def _fire_timeout_escalation(
    agent_name:     str,
    last_heartbeat: datetime,
    elapsed:        timedelta,
) -> None:
    log.warning(f"Heartbeat timeout: {agent_name}, elapsed {elapsed}")

    # Find a task_id to associate — use the most recently active task
    state     = SystemState.get()
    tasks     = list(state.tasks.values())
    active    = [t for t in tasks if t.status not in ("approved", "locked")]
    task_id   = active[0].task_id if active else "SYSTEM"

    escalation = build_escalation(
        task_id=task_id,
        esc_type="heartbeat_timeout",
        severity="critical",
        triggered_by="system",
        description=(
            f"{agent_name.upper()} has not written to SSOT for "
            f"{int(elapsed.total_seconds() / 60)} minutes. "
            f"Last heartbeat: {last_heartbeat.isoformat()}. "
            f"System is suspending dependent operations."
        ),
        affected_msgs=[],
        snapshot={
            "agent":          agent_name,
            "last_heartbeat": last_heartbeat.isoformat(),
            "elapsed_sec":    int(elapsed.total_seconds()),
        },
        suggested=[
            f"Manually restart {agent_name} CLI",
            "Check Mac Studio process list",
            "Verify API credentials have not expired",
        ]
    )

    write_amp_message(escalation)
    append_audit_log({
        "timestamp": escalation["timestamp"],
        "actor":     "system",
        "action":    "heartbeat_timeout",
        "detail":    f"{agent_name} timed out",
        "msg_id":    escalation["msg_id"],
    })

    # Broadcast to Dashboard via SSE (imported lazily to avoid circular)
    from api.events import broadcast
    await broadcast({"type": "escalation", "payload": escalation})
05 /

api/webhook.py

Inbound
Signature verification cannot be skipped.A webhook endpoint without signature verification is equivalent to opening the permission to write SSOT to the public network. Cloudflare Tunnel provides in-transit encryption but does not authenticate message origin. Signature verification is the only mechanism to confirm that a message actually came from GitHub.
PY GitHub + XCode Cloud Webhook Receive api/webhook.py
from __future__ import annotations
import hashlib, hmac, logging
from datetime import datetime, timezone
from typing import Annotated
from fastapi import APIRouter, Request, Header, HTTPException
from pydantic_settings import BaseSettings
from core.state import SystemState
from core.ssot  import read_task, write_task, append_audit_log
from api.events import broadcast

log    = logging.getLogger("webhook")
router = APIRouter(prefix="/webhooks")


class WHSettings(BaseSettings):
    github_webhook_secret: str
    class Config: env_file = ".env"

_cfg = WHSettings()


def _verify_github_sig(body: bytes, sig_header: str) -> bool:
    if not sig_header.startswith("sha256="): return False
    expected = hmac.new(
        _cfg.github_webhook_secret.encode(),
        body, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", sig_header)


@router.post("/github")
async def github_webhook(
    request:   Request,
    x_hub_sig: Annotated[str | None, Header(alias="x-hub-signature-256")] = None,
):
    body = await request.body()
    if not x_hub_sig or not _verify_github_sig(body, x_hub_sig):
        raise HTTPException(401, "Invalid webhook signature")

    event   = request.headers.get("x-github-event", "unknown")
    payload = await request.json()

    if event == "push":
        await _handle_push(payload)
    elif event == "pull_request":
        await _handle_pr(payload)
    elif event == "check_run":
        await _handle_ci(payload, source="github")

    return {"ok": True}


@router.post("/xcode")
async def xcode_webhook(request: Request):
    # XCode Cloud uses its own auth header — extend here when integrating
    payload = await request.json()
    await _handle_ci(payload, source="xcode_cloud")
    return {"ok": True}


async def _handle_push(payload: dict) -> None:
    # A push to SSOT repo may contain new AMP messages from agents
    # Parse commits for task JSON updates, update SystemState accordingly
    branch = payload.get("ref", "").replace("refs/heads/", "")
    if branch != "main": return

    modified = [f for c in payload.get("commits", [])
                  for f in c.get("modified", []) + c.get("added", [])]

    task_files = [f for f in modified if f.startswith("tasks/") and f.endswith(".json")]

    for path in task_files:
        task_id = path.split("/")[1].replace(".json", "")
        task    = read_task(task_id)
        if task:
            from core.state import TaskSnapshot
            snap = TaskSnapshot(
                task_id=task_id,
                status=task.get("status", "unknown"),
                risk_level=task.get("risk_level", "medium"),
                reject_count=task.get("reject_count", 0),
                branch=task.get("branch", ""),
                updated_at=datetime.now(timezone.utc),
            )
            await SystemState.get().update_task(snap)
            await broadcast({"type": "task_update", "task_id": task_id, "snapshot": vars(snap)})

            # Update agent heartbeat if this push is from a known agent
            writer = task.get("last_writer")
            if writer in ("coordinator", "executor", "reviewer"):
                await SystemState.get().update_agent_heartbeat(
                    writer, datetime.now(timezone.utc)
                )


async def _handle_pr(payload: dict) -> None:
    action = payload.get("action")
    pr     = payload.get("pull_request", {})
    branch = pr.get("head", {}).get("ref", "")

    await broadcast({
        "type":   "pr_update",
        "action": action,
        "pr":     pr.get("number"),
        "branch": branch,
        "title":  pr.get("title"),
        "url":    pr.get("html_url"),
    })


async def _handle_ci(payload: dict, source: str) -> None:
    # Extract CI status and update relevant task
    conclusion = payload.get("conclusion") or payload.get("status", "unknown")
    branch     = (payload.get("check_run", {}).get("check_suite", {})
                         .get("head_branch", ""))

    await broadcast({
        "type":       "ci_update",
        "source":     source,
        "branch":     branch,
        "conclusion": conclusion,
    })

    append_audit_log({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "actor":     source,
        "action":    "ci_result",
        "detail":    f"{source}: {conclusion} on {branch}",
    })
06 /

api/events.py

SSE
PY Server-Sent Events Broadcast api/events.py
from __future__ import annotations
import asyncio, json, logging
from typing import Any
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
from core.state import SystemState

log    = logging.getLogger("events")
router = APIRouter(prefix="/api")

# Global broadcast queue — all connected clients share this
_clients: list[asyncio.Queue] = []
_lock = asyncio.Lock()


async def broadcast(event: dict[str, Any]) -> None:
    """Push an event to all connected Dashboard clients."""
    async with _lock:
        dead = []
        for q in _clients:
            try:
                q.put_nowait(event)
            except asyncio.QueueFull:
                dead.append(q)
        for q in dead:
            _clients.remove(q)
            log.warning("Removed full SSE client queue")


@router.get("/events")
async def sse_stream(request: Request):
    """
    Dashboard subscribes here. One long-lived connection per browser tab.
    On connect: immediately send full current state.
    On disconnect: clean up queue.
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)

    async with _lock:
        _clients.append(queue)

    async def event_generator():
        # Send initial state snapshot on connect
        yield {
            "event": "init",
            "data":  json.dumps(SystemState.get().to_dict()),
        }

        try:
            while True:
                if await request.is_disconnected():
                    break
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=30.0)
                    yield {
                        "event": event.get("type", "update"),
                        "data":  json.dumps(event),
                    }
                except asyncio.TimeoutError:
                    # Keepalive ping
                    yield {"event": "ping", "data": ""}
        finally:
            async with _lock:
                try: _clients.remove(queue)
                except ValueError: pass

    return EventSourceResponse(event_generator())


@router.get("/tasks")
async def get_tasks():
    return SystemState.get().to_dict()["tasks"]


@router.get("/health")
async def get_health():
    return SystemState.get().to_dict()["agents"]
07 /

api/actions.py

Admin Operations
JWT validation is not optional.This endpoint writes back to SSOT and triggers the Coordinator to continue execution. Any unauthorized calls amount to falsified Admin decisions. Dashboard must also use JWT when running locally - verification cannot be bypassed due to "local access".
PY Admin Approve / Reject / Override api/actions.py
from __future__ import annotations
import logging
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from pydantic_settings import BaseSettings
from core.ssot  import read_task, write_task, append_audit_log
from api.events import broadcast

log    = logging.getLogger("actions")
router = APIRouter(prefix="/api/tasks")
oauth2 = OAuth2PasswordBearer(tokenUrl="token")


class AuthSettings(BaseSettings):
    jwt_secret:    str
    jwt_algorithm: str = "HS256"
    class Config: env_file = ".env"

_auth = AuthSettings()


def verify_admin_token(token: str = Depends(oauth2)) -> str:
    try:
        payload = jwt.decode(token, _auth.jwt_secret, algorithms=[_auth.jwt_algorithm])
        role    = payload.get("role")
        if role != "admin":
            raise HTTPException(403, "Admin role required")
        return payload.get("sub", "admin")
    except JWTError:
        raise HTTPException(401, "Invalid or expired token")


class ActionRequest(BaseModel):
    reason: str = ""


@router.post("/{task_id}/approve")
async def approve_task(
    task_id:  str,
    body:     ActionRequest,
    admin_id: str = Depends(verify_admin_token),
):
    task = read_task(task_id)
    if not task:
        raise HTTPException(404, f"Task {task_id} not found in SSOT")

    if task["status"] != "awaiting_approval":
        raise HTTPException(409, f"Task {task_id} is not in awaiting_approval state")

    task["status"]           = "approved"
    task["approved_by"]      = admin_id
    task["approved_at"]      = datetime.now(timezone.utc).isoformat()
    task["approval_reason"]   = body.reason
    task["last_writer"]      = "admin"

    write_task(task_id, task, f"Admin approved {task_id}")

    entry = {
        "timestamp": task["approved_at"],
        "actor":     "admin",
        "action":    "approved",
        "task_id":   task_id,
        "detail":    body.reason or "No reason provided",
    }
    append_audit_log(entry)
    await broadcast({"type": "task_approved", "task_id": task_id})

    log.info(f"Task {task_id} approved by {admin_id}")
    return {"ok": True, "task_id": task_id, "status": "approved"}


@router.post("/{task_id}/reject")
async def reject_task(
    task_id:  str,
    body:     ActionRequest,
    admin_id: str = Depends(verify_admin_token),
):
    task = read_task(task_id)
    if not task:
        raise HTTPException(404, f"Task {task_id} not found")

    if not body.reason:
        raise HTTPException(422, "reason is required for rejection")

    task["status"]          = "rejected"
    task["rejected_by"]     = admin_id
    task["rejected_at"]     = datetime.now(timezone.utc).isoformat()
    task["rejection_reason"] = body.reason
    task["last_writer"]     = "admin"

    write_task(task_id, task, f"Admin rejected {task_id}")
    append_audit_log({
        "timestamp": task["rejected_at"],
        "actor":     "admin",
        "action":    "rejected",
        "task_id":   task_id,
        "detail":    body.reason,
    })
    await broadcast({"type": "task_rejected", "task_id": task_id, "reason": body.reason})
    return {"ok": True, "task_id": task_id, "status": "rejected"}


@router.post("/{task_id}/override")
async def override_task(
    task_id:  str,
    body:     ActionRequest,
    admin_id: str = Depends(verify_admin_token),
):
    """
    Force-set task status. USE WITH EXTREME CAUTION.
    Bypasses normal flow. reason is mandatory. Written to audit log.
    """
    if not body.reason:
        raise HTTPException(422, "reason is mandatory for override")

    task = read_task(task_id)
    if not task:
        raise HTTPException(404, f"Task {task_id} not found")

    prev_status       = task["status"]
    task["status"]    = "override"
    task["last_writer"] = "admin"

    write_task(task_id, task, f"Admin OVERRIDE {task_id}: {body.reason}")
    append_audit_log({
        "timestamp":   datetime.now(timezone.utc).isoformat(),
        "actor":       "admin",
        "action":      "override",
        "task_id":     task_id,
        "prev_status": prev_status,
        "detail":      body.reason,
    })
    await broadcast({"type": "task_override", "task_id": task_id})
    return {"ok": True, "task_id": task_id, "prev_status": prev_status}
08 /

main.py

Entrypoint
PY FastAPI App + Startup main.py
from __future__ import annotations
import asyncio, logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from core.state     import SystemState
from core.ssot      import get_all_tasks
from core.heartbeat import heartbeat_loop
from api.webhook    import router as webhook_router
from api.events     import router as events_router
from api.actions    import router as actions_router

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(name)s %(levelname)s %(message)s")
log = logging.getLogger("main")


@asynccontextmanager
async def lifespan(app: FastAPI):
    # ── Startup ──────────────────────────────────────
    log.info("AgentOS Backend starting...")

    # 1. Warm up SystemState from SSOT
    tasks = get_all_tasks()
    for t in tasks:
        from core.state import TaskSnapshot
        from datetime import timezone
        import datetime as dt
        snap = TaskSnapshot(
            task_id=t["task_id"],
            status=t.get("status", "unknown"),
            risk_level=t.get("risk_level", "medium"),
            reject_count=t.get("reject_count", 0),
            branch=t.get("branch", ""),
            updated_at=dt.datetime.now(timezone.utc),
        )
        await SystemState.get().update_task(snap)
    log.info(f"Loaded {len(tasks)} tasks from SSOT")

    # 2. Start heartbeat monitor as background task
    hb_task = asyncio.create_task(heartbeat_loop())
    log.info("Heartbeat monitor started")

    yield  # ── Running ──

    # ── Shutdown ─────────────────────────────────────
    hb_task.cancel()
    try: await hb_task
    except asyncio.CancelledError: pass
    log.info("AgentOS Backend shut down cleanly")


app = FastAPI(
    title="AgentOS Backend",
    version="0.1.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:*", "https://*.zixiangzhang.com"],
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(webhook_router)
app.include_router(events_router)
app.include_router(actions_router)
09 /

Deploy — Mac Studio Setup

SHELL Install + Run
# 1. Create venv
python3 -m venv .venv
source .venv/bin/activate

# 2. Install deps
pip install -r requirements.txt

# 3. Copy and fill .env
cp .env.example .env
# Edit .env with your tokens

# 4. Start backend
uvicorn main:app \
  --host 0.0.0.0 \
  --port 8000 \
  --reload

# 5. Start Cloudflare Tunnel
# (separate terminal)
cloudflared tunnel \
  --url http://localhost:8000

# 6. Register tunnel URL in GitHub
# Settings → Webhooks → Add webhook
# URL: https://<id>.trycloudflare.com/webhooks/github
# Secret: matches GITHUB_WEBHOOK_SECRET in .env
# Events: push, pull_request, check_run
SHELL Generate Admin JWT (one-time)
# Run once to generate your admin token
python3 - <<'EOF'
from jose import jwt
from datetime import datetime, timezone, timedelta
import os

secret = os.environ["JWT_SECRET"]
token  = jwt.encode(
    {
        "sub":  "kircerta",
        "role": "admin",
        "iat":  datetime.now(timezone.utc),
        "exp":  datetime.now(timezone.utc)
                + timedelta(days=365),
    },
    secret,
    algorithm="HS256",
)
print(token)
EOF

# Store the token in your Dashboard
# or Telegram bot for API calls

# Health check
curl http://localhost:8000/api/health

# Test SSE stream
curl -N http://localhost:8000/api/events
Known limitation (Draft v0.1): The sync/async boundary handling in write_task is a temporary solution—PyGithub is synchronous. In an async FastAPI context it should be wrapped with run_in_executor to avoid blocking the event loop. This is acceptable in the Draft stage, but before production it should be replaced with an async GitHub client or a threadpool executor.
AgentOS Backend — Draft v0.1 · 2026.02.26 Authored by Reviewer Provider (claude/gemini/deepseek) · Known limitation documented in §09