diff --git a/internal/runtime/model.go b/internal/runtime/model.go index 7ea3d2f..4b4d104 100644 --- a/internal/runtime/model.go +++ b/internal/runtime/model.go @@ -11,12 +11,19 @@ type Snapshot struct { type SourceMap map[string]SourceEvidence type Thread struct { - ID string `json:"id"` - Role string `json:"role"` - Status string `json:"status"` - CreatedAt string `json:"createdAt"` - UpdatedAt string `json:"updatedAt"` - Source SourceEvidence `json:"source"` + ID string `json:"id"` + Role string `json:"role"` + Status string `json:"status"` + CreatedAt string `json:"createdAt"` + UpdatedAt string `json:"updatedAt"` + CWD string `json:"cwd"` + Title string `json:"title"` + AgentNickname string `json:"agentNickname,omitempty"` + AgentRole string `json:"agentRole,omitempty"` + AgentPath string `json:"agentPath,omitempty"` + ThreadSource string `json:"threadSource,omitempty"` + Preview string `json:"preview,omitempty"` + Source SourceEvidence `json:"source"` } type SpawnEdge struct { diff --git a/internal/runtime/store.go b/internal/runtime/store.go index 9714173..3dd5dc0 100644 --- a/internal/runtime/store.go +++ b/internal/runtime/store.go @@ -121,7 +121,20 @@ func readThreads(db *sql.DB, sourcePath string, sources SourceMap) ([]Thread, er sources["threads"] = tableSource("sqlite_schema_drift", sourcePath, "threads table is missing required id column.") return []Thread{}, nil } - query := `SELECT ` + textColumn(columns, "id") + `, ` + textColumn(columns, "role") + `, ` + textColumn(columns, "status") + `, ` + textColumn(columns, "created_at") + `, ` + textColumn(columns, "updated_at") + ` FROM threads ORDER BY ` + orderBy(columns, "created_at", "id") + query := `SELECT ` + + textColumn(columns, "id") + `, ` + + firstTextColumn(columns, "agent_role", "role") + `, ` + + textColumn(columns, "status") + `, ` + + firstTextColumn(columns, "created_at_ms", "created_at") + `, ` + + firstTextColumn(columns, "updated_at_ms", "updated_at") + `, ` + + textColumn(columns, "cwd") + `, ` + + textColumn(columns, "title") + `, ` + + textColumn(columns, "agent_nickname") + `, ` + + textColumn(columns, "agent_role") + `, ` + + textColumn(columns, "agent_path") + `, ` + + textColumn(columns, "thread_source") + `, ` + + textColumn(columns, "preview") + + ` FROM threads ORDER BY ` + orderBy(columns, "created_at_ms", "created_at", "id") rows, err := db.Query(query) if err != nil { if isMissingTable(err) { @@ -133,7 +146,20 @@ func readThreads(db *sql.DB, sourcePath string, sources SourceMap) ([]Thread, er var threads []Thread for rows.Next() { var item Thread - if err := rows.Scan(&item.ID, &item.Role, &item.Status, &item.CreatedAt, &item.UpdatedAt); err != nil { + if err := rows.Scan( + &item.ID, + &item.Role, + &item.Status, + &item.CreatedAt, + &item.UpdatedAt, + &item.CWD, + &item.Title, + &item.AgentNickname, + &item.AgentRole, + &item.AgentPath, + &item.ThreadSource, + &item.Preview, + ); err != nil { return nil, err } item.Source = SourceEvidence{Kind: "sqlite_table", Path: sourcePath, Confidence: "high"} @@ -152,11 +178,16 @@ func readSpawnEdges(db *sql.DB, sourcePath string, sources SourceMap) ([]SpawnEd sources["thread_spawn_edges"] = tableSource("sqlite_missing_table", sourcePath, "thread_spawn_edges table was not found.") return []SpawnEdge{}, nil } - if !columns["from_thread_id"] || !columns["to_thread_id"] { + if !(columns["from_thread_id"] && columns["to_thread_id"]) && !(columns["parent_thread_id"] && columns["child_thread_id"]) { sources["thread_spawn_edges"] = tableSource("sqlite_schema_drift", sourcePath, "thread_spawn_edges table is missing required endpoint columns.") return []SpawnEdge{}, nil } - query := `SELECT ` + textColumn(columns, "from_thread_id") + `, ` + textColumn(columns, "to_thread_id") + `, ` + textColumn(columns, "reason") + `, ` + textColumn(columns, "created_at") + ` FROM thread_spawn_edges ORDER BY ` + orderBy(columns, "created_at", "from_thread_id", "to_thread_id") + query := `SELECT ` + + firstTextColumn(columns, "from_thread_id", "parent_thread_id") + `, ` + + firstTextColumn(columns, "to_thread_id", "child_thread_id") + `, ` + + firstTextColumn(columns, "reason", "status") + `, ` + + textColumn(columns, "created_at") + + ` FROM thread_spawn_edges ORDER BY ` + orderBy(columns, "created_at", "from_thread_id", "parent_thread_id", "to_thread_id", "child_thread_id") rows, err := db.Query(query) if err != nil { if isMissingTable(err) { @@ -191,7 +222,12 @@ func readGoals(db *sql.DB, sourcePath string, sources SourceMap) ([]Goal, error) sources["thread_goals"] = tableSource("sqlite_schema_drift", sourcePath, "thread_goals table is missing required thread_id column.") return []Goal{}, nil } - query := `SELECT ` + textColumn(columns, "thread_id") + `, ` + textColumn(columns, "goal") + `, ` + textColumn(columns, "status") + `, ` + textColumn(columns, "updated_at") + ` FROM thread_goals ORDER BY ` + orderBy(columns, "updated_at", "thread_id") + query := `SELECT ` + + textColumn(columns, "thread_id") + `, ` + + firstTextColumn(columns, "goal", "objective") + `, ` + + textColumn(columns, "status") + `, ` + + firstTextColumn(columns, "updated_at_ms", "updated_at") + + ` FROM thread_goals ORDER BY ` + orderBy(columns, "updated_at_ms", "updated_at", "thread_id") rows, err := db.Query(query) if err != nil { if isMissingTable(err) { @@ -244,6 +280,15 @@ func textColumn(columns map[string]bool, name string) string { return "COALESCE(CAST(" + name + " AS TEXT), '')" } +func firstTextColumn(columns map[string]bool, names ...string) string { + for _, name := range names { + if columns[name] { + return textColumn(columns, name) + } + } + return "''" +} + func orderBy(columns map[string]bool, names ...string) string { var order []string for _, name := range names { diff --git a/internal/runtime/store_test.go b/internal/runtime/store_test.go index fc588c5..65dd601 100644 --- a/internal/runtime/store_test.go +++ b/internal/runtime/store_test.go @@ -66,6 +66,100 @@ func TestStoreReadsThreadsEdgesAndGoalsFromReadonlySQLite(t *testing.T) { } } +func TestStoreReadsCurrentCodexRuntimeSchema(t *testing.T) { + root := t.TempDir() + stateDB := openWritableSQLite(t, filepath.Join(root, "state_5.sqlite")) + defer stateDB.Close() + execSQL(t, stateDB, `CREATE TABLE threads ( + id TEXT PRIMARY KEY, + rollout_path TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + source TEXT NOT NULL, + model_provider TEXT NOT NULL, + cwd TEXT NOT NULL, + title TEXT NOT NULL, + sandbox_policy TEXT NOT NULL, + approval_mode TEXT NOT NULL, + tokens_used INTEGER NOT NULL DEFAULT 0, + has_user_event INTEGER NOT NULL DEFAULT 0, + archived INTEGER NOT NULL DEFAULT 0, + archived_at INTEGER, + git_sha TEXT, + git_branch TEXT, + git_origin_url TEXT, + cli_version TEXT NOT NULL DEFAULT '', + first_user_message TEXT NOT NULL DEFAULT '', + agent_nickname TEXT, + agent_role TEXT, + memory_mode TEXT NOT NULL DEFAULT 'enabled', + model TEXT, + reasoning_effort TEXT, + agent_path TEXT, + created_at_ms INTEGER, + updated_at_ms INTEGER, + thread_source TEXT, + preview TEXT NOT NULL DEFAULT '' + )`) + execSQL(t, stateDB, `CREATE TABLE thread_spawn_edges ( + parent_thread_id TEXT NOT NULL, + child_thread_id TEXT NOT NULL PRIMARY KEY, + status TEXT NOT NULL + )`) + execSQL(t, stateDB, `INSERT INTO threads ( + id, rollout_path, created_at, updated_at, source, model_provider, cwd, title, sandbox_policy, approval_mode, + cli_version, agent_nickname, agent_role, agent_path, created_at_ms, updated_at_ms, thread_source, preview + ) VALUES + ('thread-a', '/tmp/a.jsonl', 1, 2, 'codex', 'openai', '/repo/a', '主线程', 'workspace-write', 'on-request', + '0.0.1', '主控', '智能体编排者', '/agents/orchestrator.toml', 1000, 2000, 'cli', '监管项目流程'), + ('thread-b', '/tmp/b.jsonl', 3, 4, 'codex', 'openai', '/repo/b', '审查线程', 'workspace-write', 'on-request', + '0.0.1', '审查员', '代码审查员', '/agents/reviewer.toml', 3000, 4000, 'cli', '审查实现')`) + execSQL(t, stateDB, `INSERT INTO thread_spawn_edges (parent_thread_id, child_thread_id, status) VALUES + ('thread-a', 'thread-b', 'spawned')`) + + goalsDB := openWritableSQLite(t, filepath.Join(root, "goals_1.sqlite")) + defer goalsDB.Close() + execSQL(t, goalsDB, `CREATE TABLE thread_goals ( + thread_id TEXT PRIMARY KEY NOT NULL, + goal_id TEXT NOT NULL, + objective TEXT NOT NULL, + status TEXT NOT NULL, + token_budget INTEGER, + tokens_used INTEGER NOT NULL DEFAULT 0, + time_used_seconds INTEGER NOT NULL DEFAULT 0, + created_at_ms INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL + )`) + execSQL(t, goalsDB, `INSERT INTO thread_goals ( + thread_id, goal_id, objective, status, token_budget, tokens_used, time_used_seconds, created_at_ms, updated_at_ms + ) VALUES + ('thread-a', 'goal-a', '管理当前项目的智能体流程', 'active', 10000, 1200, 60, 1000, 2000)`) + + snapshot, err := Store{CodexHome: root}.Snapshot() + if err != nil { + t.Fatalf("Snapshot returned error: %v", err) + } + if len(snapshot.Threads) != 2 { + t.Fatalf("threads = %#v", snapshot.Threads) + } + first := snapshot.Threads[0] + if first.ID != "thread-a" || first.CWD != "/repo/a" || first.Title != "主线程" { + t.Fatalf("unexpected first thread identity: %#v", first) + } + if first.AgentNickname != "主控" || first.AgentRole != "智能体编排者" || first.Role != "智能体编排者" { + t.Fatalf("unexpected first thread agent fields: %#v", first) + } + if first.AgentPath != "/agents/orchestrator.toml" || first.Preview != "监管项目流程" || first.CreatedAt != "1000" || first.UpdatedAt != "2000" { + t.Fatalf("unexpected first thread metadata: %#v", first) + } + if len(snapshot.SpawnEdges) != 1 || snapshot.SpawnEdges[0].FromThreadID != "thread-a" || snapshot.SpawnEdges[0].ToThreadID != "thread-b" || snapshot.SpawnEdges[0].Reason != "spawned" { + t.Fatalf("unexpected current-schema edges: %#v", snapshot.SpawnEdges) + } + if len(snapshot.Goals) != 1 || snapshot.Goals[0].ThreadID != "thread-a" || snapshot.Goals[0].Goal != "管理当前项目的智能体流程" || snapshot.Goals[0].Status != "active" || snapshot.Goals[0].UpdatedAt != "2000" { + t.Fatalf("unexpected current-schema goals: %#v", snapshot.Goals) + } +} + func TestStoreMarksStateMissingWhenOnlyGoalsSQLiteExists(t *testing.T) { root := t.TempDir() createRuntimeSQLite(t, root) diff --git a/web/src/api/normalizers.js b/web/src/api/normalizers.js index e3fb5eb..fd1906a 100644 --- a/web/src/api/normalizers.js +++ b/web/src/api/normalizers.js @@ -30,6 +30,7 @@ const STATUS_LABELS = { failed: '失败', idle: '空闲', invalid: '无效', + active: '运行中', pending: '待处理', recent: '最近活跃', running: '运行中', @@ -164,18 +165,31 @@ export function normalizeAgents(payload = {}) { export function normalizeRuntime(payload = {}) { const threads = Array.isArray(payload.items) ? payload.items : [] const goals = Array.isArray(payload.goals) ? payload.goals : [] + const edges = Array.isArray(payload.edges) ? payload.edges : [] const goalsByThread = groupBy(goals, (goal) => goal.threadId) + const spawnStatusByThread = new Map( + edges + .filter((edge) => edge.toThreadId) + .map((edge) => [edge.toThreadId, normalizeSpawnStatus(edge.reason || edge.status)]), + ) const agents = threads.map((thread) => { const source = normalizeSource(thread.source, payload.source) const threadGoals = goalsByThread.get(thread.id) ?? [] + const status = thread.status || spawnStatusByThread.get(thread.id) || threadGoals.find((goal) => goal.status)?.status || 'unknown' + const goalText = threadGoals + .map((goal) => goal.goal || goal.objective) + .filter(Boolean) + .join(';') return { id: thread.id, - name: thread.role || thread.id || '未命名线程', - role: thread.id, - status: thread.status || 'unknown', - statusZh: formatStatus(thread.status), - goal: threadGoals.map((goal) => goal.goal).filter(Boolean).join(';') || '没有目标记录', - process: 'SQLite 只读快照', + name: thread.agentNickname || thread.agentRole || thread.title || thread.role || thread.id || '未命名线程', + role: thread.agentRole || thread.role || thread.agentPath || thread.id || '未记录角色', + projectPath: thread.cwd || '', + projectHints: [thread.title, thread.preview, thread.agentPath].filter(Boolean), + status, + statusZh: formatStatus(status), + goal: goalText || '没有目标记录', + process: thread.title || thread.preview || 'SQLite 只读快照', source: source.label, confidence: source.confidenceLabel, lastActivity: thread.updatedAt || thread.createdAt || '没有时间记录', @@ -187,7 +201,7 @@ export function normalizeRuntime(payload = {}) { agents, threads, goals, - edges: Array.isArray(payload.edges) ? payload.edges : [], + edges, source, isEmpty: threads.length === 0, emptyTitle: '没有运行线程', @@ -195,6 +209,28 @@ export function normalizeRuntime(payload = {}) { } } +export function filterRuntimeByProject(runtime = normalizeRuntime(), projectPath = '') { + const targetPath = normalizePath(projectPath) + const agents = runtime.agents.filter((agent) => belongsToProject(agent, targetPath)) + const threadIds = new Set(agents.map((agent) => agent.id)) + const threads = runtime.threads.filter((thread) => threadIds.has(thread.id)) + const goals = runtime.goals.filter((goal) => threadIds.has(goal.threadId)) + const edges = runtime.edges.filter((edge) => threadIds.has(edge.fromThreadId) && threadIds.has(edge.toThreadId)) + + return { + ...runtime, + agents, + threads, + goals, + edges, + isEmpty: agents.length === 0, + emptyTitle: '这个项目没有运行线程', + emptyText: projectPath + ? '当前选中的项目没有匹配到 Codex 运行线程;这里不会显示其他项目的智能体状态。' + : '请先选择一个项目。', + } +} + export function normalizeWorkflow(payload = {}) { const source = normalizeSource(payload.source) const phases = Array.isArray(payload.phases) @@ -327,6 +363,47 @@ function basename(path) { return String(path).split('/').filter(Boolean).at(-1) ?? String(path) } +function normalizePath(path) { + return String(path || '').replace(/\/+$/, '') +} + +function belongsToProject(agent, targetPath) { + if (!targetPath) { + return false + } + const agentPath = normalizePath(agent.projectPath) + if (agentPath === targetPath) { + return true + } + if (!agentPath || !targetPath.startsWith(`${agentPath}/`)) { + return false + } + return [agent.process, ...(agent.projectHints || [])].some((value) => containsPathToken(value, targetPath)) +} + +function normalizeSpawnStatus(status) { + if (status === 'open') { + return 'running' + } + if (status === 'closed') { + return 'complete' + } + return status || '' +} + +function containsPathToken(value, targetPath) { + const text = String(value || '') + let index = text.indexOf(targetPath) + while (index !== -1) { + const after = text[index + targetPath.length] || '' + if (!after || !/[A-Za-z0-9._/-]/.test(after)) { + return true + } + index = text.indexOf(targetPath, index + targetPath.length) + } + return false +} + function formatDateTime(value) { if (!value) { return '没有时间记录' diff --git a/web/src/api/normalizers.test.mjs b/web/src/api/normalizers.test.mjs index f0b35a8..f5cb0e3 100644 --- a/web/src/api/normalizers.test.mjs +++ b/web/src/api/normalizers.test.mjs @@ -9,6 +9,7 @@ import { formatStatus, normalizeAgent, normalizeDraftWriteback, + filterRuntimeByProject, normalizeProject, normalizeRuntime, normalizeValidationResult, @@ -134,6 +135,126 @@ test('normalizes empty runtime without falling back to fake real data', () => { assert.deepEqual(runtime.agents, []) }) +test('filters runtime to selected project and displays agent names from Codex metadata', () => { + const runtime = normalizeRuntime({ + items: [ + { + id: 'thread-a', + cwd: '/repo/a', + title: '主线程', + agentNickname: '主控', + agentRole: '智能体编排者', + status: '', + updatedAt: '2000', + source: { kind: 'sqlite_table', confidence: 'high' }, + }, + { + id: 'thread-b', + cwd: '/repo/b', + title: '审查线程', + agentNickname: '审查员', + agentRole: '代码审查员', + status: 'running', + source: { kind: 'sqlite_table', confidence: 'high' }, + }, + ], + goals: [ + { threadId: 'thread-a', goal: '管理当前项目的智能体流程', status: 'active' }, + ], + edges: [{ fromThreadId: 'thread-a', toThreadId: 'thread-b', reason: 'spawned' }], + source: { kind: 'sqlite_readonly', confidence: 'high' }, + }) + + const projectRuntime = filterRuntimeByProject(runtime, '/repo/a') + + assert.equal(projectRuntime.isEmpty, false) + assert.equal(projectRuntime.agents.length, 1) + assert.equal(projectRuntime.threads.length, 1) + assert.equal(projectRuntime.agents[0].name, '主控') + assert.equal(projectRuntime.agents[0].role, '智能体编排者') + assert.equal(projectRuntime.agents[0].projectPath, '/repo/a') + assert.equal(projectRuntime.agents[0].goal, '管理当前项目的智能体流程') + assert.equal(projectRuntime.agents[0].status, 'active') + assert.equal(projectRuntime.agents[0].statusZh, '运行中') +}) + +test('keeps project threads when Codex cwd is an ancestor but metadata names the project path', () => { + const runtime = normalizeRuntime({ + items: [ + { + id: 'thread-a', + cwd: '/Users/yoilun', + title: '处理 /Users/yoilun/Code/codex-agent-manager 的项目状态', + agentNickname: 'Nash', + agentRole: '代码审查员', + source: { kind: 'sqlite_table', confidence: 'high' }, + }, + { + id: 'thread-b', + cwd: '/Users/yoilun', + title: '处理其他项目', + agentNickname: '其他线程', + source: { kind: 'sqlite_table', confidence: 'high' }, + }, + ], + source: { kind: 'sqlite_readonly', confidence: 'high' }, + }) + + const projectRuntime = filterRuntimeByProject(runtime, '/Users/yoilun/Code/codex-agent-manager') + + assert.equal(projectRuntime.agents.length, 1) + assert.equal(projectRuntime.agents[0].id, 'thread-a') + assert.equal(projectRuntime.agents[0].projectPath, '/Users/yoilun') +}) + +test('does not match sibling projects that only share a path prefix', () => { + const runtime = normalizeRuntime({ + items: [ + { + id: 'sibling', + cwd: '/Users/yoilun', + title: '处理 /Users/yoilun/Code/codex-agent-manager-old 的项目状态', + agentNickname: '同名前缀项目', + source: { kind: 'sqlite_table', confidence: 'high' }, + }, + { + id: 'target', + cwd: '/Users/yoilun', + title: '处理 /Users/yoilun/Code/codex-agent-manager 当前工作树', + agentNickname: '目标项目', + source: { kind: 'sqlite_table', confidence: 'high' }, + }, + ], + source: { kind: 'sqlite_readonly', confidence: 'high' }, + }) + + const projectRuntime = filterRuntimeByProject(runtime, '/Users/yoilun/Code/codex-agent-manager') + + assert.deepEqual(projectRuntime.agents.map((agent) => agent.id), ['target']) +}) + +test('infers runtime status from Codex spawn edge state when thread status is absent', () => { + const runtime = normalizeRuntime({ + items: [ + { id: 'parent', cwd: '/repo/a', agentNickname: '主控', source: { kind: 'sqlite_table', confidence: 'high' } }, + { id: 'child-open', cwd: '/repo/a', agentNickname: '运行审查员', source: { kind: 'sqlite_table', confidence: 'high' } }, + { id: 'child-closed', cwd: '/repo/a', agentNickname: '已完成审查员', source: { kind: 'sqlite_table', confidence: 'high' } }, + ], + edges: [ + { fromThreadId: 'parent', toThreadId: 'child-open', reason: 'open' }, + { fromThreadId: 'parent', toThreadId: 'child-closed', reason: 'closed' }, + ], + source: { kind: 'sqlite_readonly', confidence: 'high' }, + }) + + const byId = new Map(runtime.agents.map((agent) => [agent.id, agent])) + + assert.equal(byId.get('child-open').status, 'running') + assert.equal(byId.get('child-open').statusZh, '运行中') + assert.equal(byId.get('child-closed').status, 'complete') + assert.equal(byId.get('child-closed').statusZh, '已完成') +}) + test('normalizes empty workflow with source evidence and no sample edges', () => { const workflow = normalizeWorkflow({ items: [], diff --git a/web/src/views/ProjectView.vue b/web/src/views/ProjectView.vue index dab2525..3f66bf8 100644 --- a/web/src/views/ProjectView.vue +++ b/web/src/views/ProjectView.vue @@ -2,7 +2,7 @@ import { computed, onMounted, ref } from 'vue' import StatusBadge from '../components/StatusBadge.vue' import { apiClient } from '../api/client' -import { normalizeProjects, normalizeRuntime } from '../api/normalizers' +import { filterRuntimeByProject, normalizeProjects, normalizeRuntime } from '../api/normalizers' import { agentMatrix as sampleAgentMatrix, projects as sampleProjects } from '../data' const loading = ref(true) @@ -12,7 +12,8 @@ const runtimeState = ref(normalizeRuntime()) const selectedProjectId = ref('') const selectedProject = computed(() => projectState.value.projects.find((project) => project.id === selectedProjectId.value)) -const selectedAgent = computed(() => runtimeState.value.agents[0]) +const projectRuntime = computed(() => filterRuntimeByProject(runtimeState.value, selectedProject.value?.path || '')) +const selectedAgent = computed(() => projectRuntime.value.agents[0]) onMounted(loadReadonlyData) @@ -105,13 +106,13 @@ async function loadReadonlyData() { 连接失败

无法读取 `/api/runtime/threads`,下面仅显示明确标注的示例状态。

-
- {{ runtimeState.emptyTitle }} -

{{ runtimeState.emptyText }}

+
+ {{ projectRuntime.emptyTitle }} +

{{ projectRuntime.emptyText }}

-
+
智能体 状态 @@ -119,7 +120,7 @@ async function loadReadonlyData() { 进程 最近活动
-
+
{{ agent.name }} {{ agent.role }} @@ -177,7 +178,7 @@ async function loadReadonlyData() {
项目数{{ projectState.projects.length }} - 线程数{{ runtimeState.threads.length }} + 当前项目线程数{{ projectRuntime.threads.length }} 接口{{ error ? '连接失败' : '只读连接' }}