Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Low-Level Design: Large-scale Agentic Engineering

详细设计文档(应付“50% AI Coding“运动)

Date: 2026-03-01
Version: 1.0
Status: Design Complete


1. System Architecture

1.1 High-Level Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Large-scale Agentic Engineering              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                   OpenClaw (Main Brain)                  │   │
│  │  - Model: qwen3.5-plus                                   │   │
│  │  - Role: Orchestrator, Decision Maker                    │   │
│  │  - Lifetime: Long-running (weeks to months)              │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              │                                  │
│         ┌────────────────────┼────────────────────┐            │
│         │ sessions_spawn()   │ sessions_send()    │            │
│         ▼                    ▼                    ▼             │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐       │
│  │ Sub-Agent 1 │     │ Sub-Agent 2 │     │ Sub-Agent N │       │
│  │ (Analyzer)  │     │ (Migrator)  │     │ (Guardian)  │       │
│  │ qwen3.5+    │     │ qwen3.5+    │     │ qwen3.5+    │       │
│  │ Disposable  │     │ Disposable  │     │ Long-running│       │
│  └─────────────┘     └─────────────┘     └─────────────┘       │
│                                                                 │
│                              │                                  │
│                              ▼                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                  Persistent State (.rd-os/)              │   │
│  │  - progress.db (SQLite): Definitive progress store      │   │
│  │  - agent-states/: Per-agent checkpoints (JSON)          │   │
│  │  - artifacts/: Generated reports, outputs               │   │
│  │  - Survives: OpenClaw restart, sub-agent death          │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.2 Component Responsibilities

ComponentResponsibilityLifetimeModel
OpenClawOrchestration, decisions, recoveryWeeks-monthsqwen3.5-plus
Analyzer AgentsRepo analysis, value scoringMinutes-hoursqwen3.5-plus
Migrator AgentsCode migration, build updatesMinutes-hoursqwen3.5-plus
Guardian AgentsContinuous monitoring, PR reviewDays-weeksqwen3.5-plus
State StoreProgress.db, checkpointsPermanentN/A

2. Data Model

2.1 SQLite Schema (progress.db)

-- Repository registry
CREATE TABLE repos (
    repo_id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    full_name TEXT,
    priority TEXT,  -- P0, P1, P2, P3
    category TEXT,  -- product, platform, tool, docs, sdk
    github_url TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Analysis state
CREATE TABLE analysis_state (
    repo_id TEXT PRIMARY KEY,
    status TEXT NOT NULL,  -- pending, running, done, failed
    progress_percent INTEGER DEFAULT 0,
    value_score INTEGER,   -- 0-100
    tier TEXT,             -- S, A, B, C
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    result_json TEXT,      -- Full analysis result
    error_message TEXT,
    last_checkpoint TEXT,
    FOREIGN KEY (repo_id) REFERENCES repos(repo_id)
);

-- Migration state
CREATE TABLE migration_state (
    repo_id TEXT PRIMARY KEY,
    status TEXT NOT NULL,  -- pending, running, done, failed
    phase TEXT,            -- prep, transfer, integrate, validate
    progress_percent INTEGER DEFAULT 0,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    result_json TEXT,
    error_message TEXT,
    FOREIGN KEY (repo_id) REFERENCES repos(repo_id)
);

-- Sub-agent registry
CREATE TABLE sub_agents (
    agent_id TEXT PRIMARY KEY,
    agent_type TEXT NOT NULL,  -- analyzer, migrator, guardian
    repo_id TEXT,
    status TEXT NOT NULL,      -- active, idle, paused, completed, failed
    spawned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    last_heartbeat TIMESTAMP,
    checkpoint_path TEXT,
    FOREIGN KEY (repo_id) REFERENCES repos(repo_id)
);

-- Checkpoints
CREATE TABLE checkpoints (
    checkpoint_id TEXT PRIMARY KEY,
    checkpoint_type TEXT NOT NULL,  -- micro, batch, milestone
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    state_snapshot TEXT,  -- JSON of full state
    recoverable BOOLEAN DEFAULT TRUE
);

-- Event log (for debugging/audit)
CREATE TABLE events (
    event_id TEXT PRIMARY KEY,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    event_type TEXT NOT NULL,
    agent_id TEXT,
    repo_id TEXT,
    details TEXT
);

-- Indexes for fast queries
CREATE INDEX idx_analysis_status ON analysis_state(status);
CREATE INDEX idx_migration_status ON migration_state(status);
CREATE INDEX idx_agent_status ON sub_agents(status);
CREATE INDEX idx_events_timestamp ON events(timestamp);
CREATE INDEX idx_repos_priority ON repos(priority);

2.2 JSON State Format

// .rd-os/state/agent-states/{repo_id}-analysis.json
{
  "agent_id": "analyzer-tidb-001",
  "repo_id": "tidb",
  "status": "completed",
  "created_at": "2026-03-01T10:00:00Z",
  "updated_at": "2026-03-01T10:30:00Z",
  
  "work": {
    "phase": "analysis",
    "subtask": "dependency_mapping",
    "progress_percent": 100,
    "items_total": 50,
    "items_completed": 50,
    "items_failed": 0
  },
  
  "result": {
    "success": true,
    "output_path": ".rd-os/store/artifacts/tidb-analysis.json",
    "summary": {
      "lines_of_code": 652000,
      "dependencies": 127,
      "test_coverage": 78.5,
      "last_commit": "2026-02-28",
      "merge_recommendation": "P0-migrate"
    }
  },
  
  "checkpoint": {
    "last_action": "wrote_dependency_graph",
    "last_action_time": "2026-03-01T10:30:00Z",
    "can_resume": false,
    "resume_point": null
  },
  
  "errors": []
}

3. OpenClaw Main Loop

3.1 Orchestration Logic

class OpenClawOrchestrator:
    """
    OpenClaw main orchestration loop
    """
    
    def __init__(self, db_path: str, max_concurrent: int = 50):
        self.db = load_database(db_path)
        self.max_concurrent = max_concurrent
        self.active_agents = 0
        self.lock = asyncio.Lock()
    
    async def run(self):
        """
        Main orchestration loop
        """
        # 1. Recovery (after restart)
        await self.recover_state()
        
        # 2. Main loop
        while not self.is_complete():
            # 2.1 Check progress
            progress = await self.load_progress()
            
            # 2.2 Make scheduling decisions
            decisions = await self.make_scheduling_decisions(progress)
            
            # 2.3 Spawn sub-agents for new work
            for decision in decisions:
                if self.active_agents < self.max_concurrent:
                    if decision.action == 'analyze':
                        await self.spawn_analyzer(decision.repo)
                    elif decision.action == 'migrate':
                        await self.spawn_migrator(decision.repo)
                    elif decision.action == 'deep_dive':
                        await self.spawn_deep_analysis_team(decision.repo)
            
            # 2.4 Check for completed sub-agents
            completed = await self.check_completed_sub_agents()
            for result in completed:
                await self.process_result(result)
            
            # 2.5 Handle escalations
            await self.handle_escalations()
            
            # 2.6 Update progress
            await self.update_progress()
            
            # 2.7 Checkpoint
            await self.checkpoint()
            
            # 2.8 Wait (avoid busy loop)
            await asyncio.sleep(60)
        
        # 3. Completion
        await self.generate_final_report()
    
    async def recover_state(self):
        """
        Recover state after OpenClaw restart
        """
        # Load progress DB
        incomplete = self.db.query("""
            SELECT repo_id, progress_percent, last_checkpoint
            FROM analysis_state
            WHERE status = 'running'
        """)
        
        for task in incomplete:
            # Check if sub-agent has checkpoint
            checkpoint_path = f".rd-os/state/agent-states/{task.repo_id}-analysis.checkpoint.json"
            
            if exists(checkpoint_path):
                # Resume from checkpoint
                checkpoint = read_json(checkpoint_path)
                await self.resume_analyzer(task.repo_id, checkpoint)
            else:
                # No checkpoint, restart
                await self.spawn_analyzer(task.repo_id)
    
    async def spawn_analyzer(self, repo: Repo):
        """
        Spawn a sub-agent to analyze a repo
        """
        task = f"""
        Analyze repository: {repo.name}
        
        Output to: .rd-os/state/agent-states/{repo.id}-analysis.json
        
        Steps:
        1. Read repo metadata from GitHub API
        2. Analyze code structure
        3. Map dependencies
        4. Assess code quality
        5. Generate merge recommendation
        
        Checkpoint after each step.
        Report completion via sessions_send().
        """
        
        # Spawn sub-agent (qwen3.5-plus, cheap)
        session = await sessions_spawn(
            task=task,
            model='qwen3.5-plus',
            cleanup='delete',  # Destroy after completion
            label=f'analyzer-{repo.id}'
        )
        
        # Register sub-agent
        self.db.execute("""
            INSERT INTO sub_agents (agent_id, type, repo_id, status, spawned_at)
            VALUES (?, 'analyzer', ?, 'running', ?)
        """, (session.id, repo.id, now()))
        
        self.active_agents += 1

3.2 Sub-Agent Task Template

# Template for sub-agent tasks

ANALYZER_TASK_TEMPLATE = """
You are a Repository Analyzer Agent.

TASK: Analyze {repo_name}
OUTPUT: .rd-os/state/agent-states/{repo_id}-analysis.json

INSTRUCTIONS:
1. Read repo metadata from .rd-os/store/repos/{repo_id}.json
2. Analyze code structure (use GitHub API or local clone)
3. Map dependencies (go.mod, package.json, requirements.txt)
4. Assess code quality (tests, docs, lint)
5. Generate merge recommendation (P0/P1/P2/P3/archive)

VALUE SCORING (0-100):
- Activity (0-25): Last commit frequency, active contributors
- Impact (0-25): Stars, forks, import count, deployment instances
- Strategic (0-25): Core product, platform component, critical dependency
- Quality (0-15): Test coverage, documentation, code standards
- Feasibility (0-10): Dependency complexity, team support, tech stack match

CHECKPOINTING:
- After each step, write checkpoint to:
  .rd-os/state/agent-states/{repo_id}-analysis.checkpoint.json
- Include: step_completed, partial_results, can_resume

COMPLETION:
- Write final output to: .rd-os/state/agent-states/{repo_id}-analysis.json
- Send completion message via sessions_send():
  "Analysis complete: {repo_id}, output: {output_path}"

MODEL: qwen3.5-plus
TIMEOUT: 30 minutes
CLEANUP: delete (session destroyed after completion)
"""

4. State Persistence

4.1 Checkpoint Strategy

Checkpoint TypeFrequencyContentUse Case
MicroEvery actionAgent stateCrash recovery
BatchEvery N itemsBatch summaryBatch resume
MilestonePhase completeFull state snapshotPhase resume
PeriodicEvery N minutesAggregated progressTime-based recovery

4.2 Checkpoint Implementation

class CheckpointManager:
    """
    Manage checkpoints for recovery
    """
    
    def __init__(self, base_path: str):
        self.base_path = base_path
        self.state_path = f"{base_path}/state"
        self.store_path = f"{base_path}/store"
    
    def save_agent_state(self, agent_id: str, state: dict):
        """Save per-agent checkpoint (micro)"""
        path = f"{self.state_path}/agent-states/{agent_id}.state.json"
        state['checkpoint_time'] = now()
        write_json(path, state)
        
        # Also update SQLite
        db.execute("""
            INSERT OR REPLACE INTO sub_agents (agent_id, state_json, updated_at)
            VALUES (?, ?, ?)
        """, (agent_id, json.dumps(state), now()))
    
    def save_batch_progress(self, batch_id: str, progress: dict):
        """Save batch progress (batch)"""
        path = f"{self.state_path}/progress/{batch_id}.json"
        write_json(path, progress)
        
        # Update SQLite summary
        db.execute("""
            UPDATE batch_progress
            SET progress_json = ?, updated_at = ?
            WHERE batch_id = ?
        """, (json.dumps(progress), now(), batch_id))
    
    def save_milestone(self, milestone_name: str):
        """Save full state snapshot (milestone)"""
        checkpoint_id = f"checkpoint-{milestone_name}-{timestamp()}"
        path = f"{self.state_path}/checkpoints/{checkpoint_id}/"
        
        # Snapshot everything
        snapshot = {
            'milestone': milestone_name,
            'timestamp': now(),
            'analysis_state': db.query_all("SELECT * FROM analysis_state"),
            'migration_state': db.query_all("SELECT * FROM migration_state"),
            'agent_state': db.query_all("SELECT * FROM sub_agents"),
            'progress_summary': self.calculate_progress_summary()
        }
        
        write_json(f"{path}/snapshot.json", snapshot)
        
        # Record in SQLite
        db.execute("""
            INSERT INTO checkpoints (checkpoint_id, checkpoint_type, created_at, state_snapshot, recoverable)
            VALUES (?, ?, ?, ?, ?)
        """, (checkpoint_id, 'milestone', now(), json.dumps(snapshot), True))
        
        return checkpoint_id
    
    def load_checkpoint(self, checkpoint_id: str) -> dict:
        """Load checkpoint for recovery"""
        path = f"{self.state_path}/checkpoints/{checkpoint_id}/snapshot.json"
        return read_json(path)
    
    def get_recovery_state(self) -> dict:
        """Get current state for recovery"""
        return {
            'analysis': db.query_all("SELECT * FROM analysis_state WHERE status != 'done'"),
            'migration': db.query_all("SELECT * FROM migration_state WHERE status != 'done'"),
            'agents': db.query_all("SELECT * FROM sub_agents WHERE status != 'idle'"),
            'latest_checkpoint': db.query_one("SELECT * FROM checkpoints ORDER BY created_at DESC LIMIT 1")
        }

5. Recovery Protocol

5.1 Recovery Flow

OpenClaw Restarts
       │
       ▼
┌─────────────────────────────────────────────────────────────┐
│  1. Load State from .rd-os/store/progress.db                │
│     - Query: What repos are analyzed?                       │
│     - Query: What repos are in progress?                    │
│     - Query: What sub-agents were running?                  │
└─────────────────────────────────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────────────────────────┐
│  2. Reconcile Sub-Agent State                               │
│     - Find sub-agents marked 'running'                      │
│     - Check if they have checkpoints                        │
│     - If checkpoint exists → respawn with resume            │
│     - If no checkpoint → restart from beginning             │
└─────────────────────────────────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────────────────────────┐
│  3. Resume Orchestration                                    │
│     - Continue main loop                                    │
│     - Spawn new sub-agents for pending work                 │
│     - Resume from last checkpoint                           │
└─────────────────────────────────────────────────────────────┘

Result: OpenClaw can restart anytime, progress is never lost

5.2 Recovery Example

async def recover_after_restart():
    """
    Recovery after OpenClaw restart
    """
    # Load durable state
    db = load_database(".rd-os/store/progress.db")
    
    # Find incomplete analysis
    incomplete = db.query("""
        SELECT repo_id, progress_percent, last_checkpoint
        FROM analysis_state
        WHERE status = 'running' OR status = 'pending'
    """)
    
    for task in incomplete:
        if task.progress_percent > 0:
            # Has progress - try to resume
            checkpoint = load_checkpoint(task.last_checkpoint)
            await resume_analysis(task.repo_id, checkpoint)
        else:
            # No progress - restart
            await start_analysis(task.repo_id)
    
    # Find incomplete migrations
    # ... similar logic
    
    # Resume agents
    agents = db.query("SELECT * FROM sub_agents WHERE status = 'active'")
    for agent in agents:
        await resume_agent(agent.agent_id)
    
    log.info(f"Recovery complete: {len(incomplete)} tasks resumed")

6. Concurrency Control

6.1 Agent Pool Manager

class AgentPoolManager:
    """
    Manage sub-agent concurrency
    """
    
    def __init__(self, max_concurrent: int = 50):
        self.max_concurrent = max_concurrent
        self.active_count = 0
        self.lock = asyncio.Lock()
    
    async def acquire(self) -> bool:
        """
        Acquire a slot for new sub-agent
        """
        async with self.lock:
            if self.active_count < self.max_concurrent:
                self.active_count += 1
                return True
            return False
    
    async def release(self):
        """
        Release a slot when sub-agent completes
        """
        async with self.lock:
            self.active_count -= 1
    
    def get_utilization(self) -> float:
        return self.active_count / self.max_concurrent
    
    def get_available_slots(self) -> int:
        return self.max_concurrent - self.active_count

6.2 Batch Processing

async def process_in_batches(repos: List[Repo], batch_size: int = 50):
    """
    Process repos in batches (avoid overwhelming system)
    """
    for i in range(0, len(repos), batch_size):
        batch = repos[i:i+batch_size]
        
        log.info(f"Processing batch {i//batch_size + 1}: {len(batch)} repos")
        
        # Spawn sub-agents for batch
        tasks = [spawn_analyzer(repo) for repo in batch]
        
        # Wait for batch to complete (with timeout)
        await asyncio.gather(*tasks, return_exceptions=True)
        
        # Checkpoint after batch
        await checkpoint(f'batch-{i//batch_size}')
        
        # Rate limit (avoid API throttling)
        await asyncio.sleep(60)

7. API Specifications

7.1 GitHub API Integration

class GitHubAPIClient:
    """
    GitHub API client for repo metadata
    """
    
    def __init__(self, token: str):
        self.token = token
        self.base_url = "https://api.github.com"
        self.rate_limit = 5000  # requests/hour
        self.requests_made = 0
    
    async def get_repo(self, owner: str, repo: str) -> dict:
        """
        Get repository metadata
        """
        url = f"{self.base_url}/repos/{owner}/{repo}"
        return await self._request(url)
    
    async def get_repos(self, org: str, per_page: int = 100) -> List[dict]:
        """
        Get all repositories for an organization
        """
        repos = []
        page = 1
        while True:
            url = f"{self.base_url}/orgs/{org}/repos"
            params = {"sort": "stars", "direction": "desc", "per_page": per_page, "page": page}
            result = await self._request(url, params)
            if not result:
                break
            repos.extend(result)
            page += 1
        return repos
    
    async def _request(self, url: str, params: dict = None) -> dict:
        """
        Make authenticated request with rate limiting
        """
        if self.requests_made >= self.rate_limit:
            await self._wait_for_reset()
        
        headers = {"Authorization": f"token {self.token}"}
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers, params=params) as response:
                self.requests_made += 1
                return await response.json()

7.2 sessions_spawn Interface

async def sessions_spawn(
    task: str,
    model: str = 'qwen3.5-plus',
    cleanup: str = 'delete',
    label: str = None,
    timeout_seconds: int = 1800
) -> Session:
    """
    Spawn a sub-agent session
    
    Args:
        task: Task description for the sub-agent
        model: Model to use (default: qwen3.5-plus)
        cleanup: 'delete' (destroy after completion) or 'keep'
        label: Optional label for the session
        timeout_seconds: Timeout in seconds (default: 30 minutes)
    
    Returns:
        Session object with id and methods
    """
    # Implementation via OpenClaw sessions_spawn API
    pass

7.3 sessions_send Interface

async def sessions_send(
    session_key: str = None,
    label: str = None,
    message: str = None,
    timeout_seconds: int = 60
):
    """
    Send a message to/from a session
    
    Args:
        session_key: Target session key (or label)
        label: Target session label
        message: Message to send
        timeout_seconds: Timeout in seconds
    """
    # Implementation via OpenClaw sessions_send API
    pass

8. Directory Structure

mono-repo/
└── .rd-os/
    ├── state/                      # Runtime state (can rebuild)
    │   ├── agent-states/           # Per-agent checkpoint
    │   │   ├── repo-001.state.json
    │   │   ├── repo-002.state.json
    │   │   └── ...
    │   ├── progress/               # Aggregated progress
    │   │   ├── analysis-progress.json
    │   │   ├── migration-progress.json
    │   │   └── daily-summary/
    │   │       ├── 2026-03-01.json
    │   │       └── ...
    │   └── checkpoints/            # Milestone snapshots
    │       ├── checkpoint-001-analysis-complete/
    │       ├── checkpoint-002-p0-migrated/
    │       └── ...
    │
    └── store/                      # Durable store (source of truth)
        ├── progress.db             # SQLite: definitive progress
        ├── agents.db               # SQLite: agent registry
        ├── artifacts/              # Generated outputs
        │   ├── analysis-report.json
        │   ├── migration-log.jsonl
        │   └── ...
        └── config/                 # Configuration
            ├── agents.yaml
            ├── workflows.yaml
            └── policies.yaml

9. Cost Estimate

9.1 Token Usage

PhaseReposTokens/RepoTotal TokensCost (@$0.002/1K)
Analysis40010K4M~$8
Deep Analysis150 (S/A)50K7.5M~$15
Migration40050K20M~$40
Ongoing (monthly)--3M~$6
Total (Year 1)--~35M~$70

9.2 Infrastructure

ResourceEstimateCost
Storage100GB SSD~$10/month
ComputeLocal (existing)$0
GitHub APIFree tier (5K/hr)$0
Total (monthly)-~$10

9.3 Total Cost (Year 1)

CategoryCost
LLM Tokens~$70
Infrastructure~$120
Total~$190

10. Risk Mitigation

10.1 Technical Risks

RiskProbabilityImpactMitigation
API Rate LimitMediumMediumBatch requests, add delays, use multiple tokens
Sub-Agent FailureHighLowCheckpoint + retry, idempotent operations
OpenClaw RestartMediumLowRecovery from progress.db, automatic resume
Token OverrunLowMediumMonitor usage, set limits, alert on threshold
Poor Quality OutputMediumMediumHuman review, iterate template, add validation

10.2 Operational Risks

RiskProbabilityImpactMitigation
Data LossLowHighFull backups before each batch, SQLite WAL mode
Build FailuresMediumMediumComprehensive tests, canary deploys, rollback
Performance DegradationMediumMediumIncremental builds, remote caching, parallel execution

11. Testing Strategy

11.1 Unit Tests

# Test checkpoint manager
def test_save_agent_state():
    manager = CheckpointManager(".rd-os")
    state = {"agent_id": "test-001", "status": "running", "progress": 50}
    manager.save_agent_state("test-001", state)
    
    # Verify file created
    assert exists(".rd-os/state/agent-states/test-001.state.json")
    
    # Verify SQLite updated
    result = db.query_one("SELECT * FROM sub_agents WHERE agent_id = ?", ("test-001",))
    assert result is not None

# Test recovery
def test_recovery_after_restart():
    # Simulate restart
    orchestrator = OpenClawOrchestrator(".rd-os/store/progress.db")
    await orchestrator.recover_state()
    
    # Verify incomplete tasks resumed
    incomplete = db.query_all("SELECT * FROM analysis_state WHERE status = 'running'")
    for task in incomplete:
        assert task.repo_id in orchestrator.active_tasks

11.2 Integration Tests

# Test full analysis workflow
async def test_full_analysis_workflow():
    # Setup
    repos = [Repo("tidb"), Repo("tiflow")]
    
    # Run analysis
    await process_in_batches(repos, batch_size=2)
    
    # Verify results
    for repo in repos:
        state = db.query_one("SELECT * FROM analysis_state WHERE repo_id = ?", (repo.id,))
        assert state.status == "done"
        assert state.result_json is not None
    
    # Verify checkpoints
    assert exists(".rd-os/state/checkpoints/checkpoint-batch-0/")

11.3 Recovery Tests

# Test recovery after crash
async def test_recovery_after_crash():
    # Start analysis
    orchestrator = OpenClawOrchestrator(".rd-os/store/progress.db")
    task = asyncio.create_task(orchestrator.run())
    
    # Wait for some progress
    await asyncio.sleep(300)  # 5 minutes
    
    # Simulate crash
    task.cancel()
    await task
    
    # Restart
    orchestrator2 = OpenClawOrchestrator(".rd-os/store/progress.db")
    await orchestrator2.recover_state()
    
    # Verify progress preserved
    progress = await orchestrator2.load_progress()
    assert progress['analyzed'] > 0
    assert progress['in_progress'] >= 0

12. Deployment Plan

12.1 Phase 1: Infrastructure Setup (Week 1-2)

Week 1:
- Create .rd-os/ directory structure
- Initialize progress.db schema
- Implement OpenClaw main loop
- Implement checkpoint manager

Week 2:
- Create sub-agent task templates
- Implement recovery protocol
- Test restart recovery
- Test sub-agent failure recovery

12.2 Phase 2: 400-Repo Analysis (Week 3-4)

Week 3:
- Fetch all 400 repos via GitHub API
- Run initial scan (all repos)
- Score and tier repos

Week 4:
- Deep analysis for S/A-tier repos
- Generate analysis report
- Create migration priority list

12.3 Phase 3: Migration (Week 5-16)

Week 5-7:  P0 repos (50 repos)
Week 8-11: P1 repos (100 repos)
Week 12-15: P2-P3 repos (150 repos)
Week 16:   P4-P5 cleanup (100 repos)

13. Monitoring & Alerting

13.1 Key Metrics

metrics = {
    'total_repos': 400,
    'analyzed': 150,
    'in_progress': 50,
    'pending': 200,
    'failed': 0,
    'progress_percent': 37.5,
    
    'active_agents': 45,
    'agent_utilization': 0.90,
    
    'tokens_used': 1500000,
    'tokens_remaining': 3500000,
    'estimated_cost': 3.00,
    
    'last_checkpoint': '2026-03-01T14:00:00Z',
    'checkpoint_age_minutes': 15,
}

13.2 Alerting Rules

alerts:
  - name: high_failure_rate
    condition: "failed_count / total_count > 0.05"
    severity: warning
    action: notify_human

  - name: stalled_progress
    condition: "no_progress_for_minutes > 60"
    severity: warning
    action: notify_human

  - name: agent_down
    condition: "agent_heartbeat_age_minutes > 10"
    severity: critical
    action: notify_human + restart_agent

  - name: checkpoint_age
    condition: "last_checkpoint_age_minutes > 30"
    severity: warning
    action: force_checkpoint

  - name: token_budget
    condition: "tokens_remaining < 500000"
    severity: warning
    action: notify_human

14. Appendix

14.1 Glossary

TermDefinition
OpenClawMain orchestrator (LLM-based)
Sub-AgentTemporary worker agent (spawned by OpenClaw)
CheckpointSaved state for recovery
Mono-RepoSingle repository containing all code
RD-OSResearch & Development Operating System

14.2 References


Low-Level Design: Large-scale Agentic Engineering
Version 1.0 | 2026-03-01