fix: show real project runtime agents

This commit is contained in:
Yoilun
2026-05-25 23:40:52 +08:00
parent 4262191462
commit b6648f384d
6 changed files with 371 additions and 26 deletions

View File

@@ -11,12 +11,19 @@ type Snapshot struct {
type SourceMap map[string]SourceEvidence
type Thread struct {
ID string `json:"id"`
Role string `json:"role"`
Status string `json:"status"`
CreatedAt string `json:"createdAt"`
UpdatedAt string `json:"updatedAt"`
Source SourceEvidence `json:"source"`
ID string `json:"id"`
Role string `json:"role"`
Status string `json:"status"`
CreatedAt string `json:"createdAt"`
UpdatedAt string `json:"updatedAt"`
CWD string `json:"cwd"`
Title string `json:"title"`
AgentNickname string `json:"agentNickname,omitempty"`
AgentRole string `json:"agentRole,omitempty"`
AgentPath string `json:"agentPath,omitempty"`
ThreadSource string `json:"threadSource,omitempty"`
Preview string `json:"preview,omitempty"`
Source SourceEvidence `json:"source"`
}
type SpawnEdge struct {

View File

@@ -121,7 +121,20 @@ func readThreads(db *sql.DB, sourcePath string, sources SourceMap) ([]Thread, er
sources["threads"] = tableSource("sqlite_schema_drift", sourcePath, "threads table is missing required id column.")
return []Thread{}, nil
}
query := `SELECT ` + textColumn(columns, "id") + `, ` + textColumn(columns, "role") + `, ` + textColumn(columns, "status") + `, ` + textColumn(columns, "created_at") + `, ` + textColumn(columns, "updated_at") + ` FROM threads ORDER BY ` + orderBy(columns, "created_at", "id")
query := `SELECT ` +
textColumn(columns, "id") + `, ` +
firstTextColumn(columns, "agent_role", "role") + `, ` +
textColumn(columns, "status") + `, ` +
firstTextColumn(columns, "created_at_ms", "created_at") + `, ` +
firstTextColumn(columns, "updated_at_ms", "updated_at") + `, ` +
textColumn(columns, "cwd") + `, ` +
textColumn(columns, "title") + `, ` +
textColumn(columns, "agent_nickname") + `, ` +
textColumn(columns, "agent_role") + `, ` +
textColumn(columns, "agent_path") + `, ` +
textColumn(columns, "thread_source") + `, ` +
textColumn(columns, "preview") +
` FROM threads ORDER BY ` + orderBy(columns, "created_at_ms", "created_at", "id")
rows, err := db.Query(query)
if err != nil {
if isMissingTable(err) {
@@ -133,7 +146,20 @@ func readThreads(db *sql.DB, sourcePath string, sources SourceMap) ([]Thread, er
var threads []Thread
for rows.Next() {
var item Thread
if err := rows.Scan(&item.ID, &item.Role, &item.Status, &item.CreatedAt, &item.UpdatedAt); err != nil {
if err := rows.Scan(
&item.ID,
&item.Role,
&item.Status,
&item.CreatedAt,
&item.UpdatedAt,
&item.CWD,
&item.Title,
&item.AgentNickname,
&item.AgentRole,
&item.AgentPath,
&item.ThreadSource,
&item.Preview,
); err != nil {
return nil, err
}
item.Source = SourceEvidence{Kind: "sqlite_table", Path: sourcePath, Confidence: "high"}
@@ -152,11 +178,16 @@ func readSpawnEdges(db *sql.DB, sourcePath string, sources SourceMap) ([]SpawnEd
sources["thread_spawn_edges"] = tableSource("sqlite_missing_table", sourcePath, "thread_spawn_edges table was not found.")
return []SpawnEdge{}, nil
}
if !columns["from_thread_id"] || !columns["to_thread_id"] {
if !(columns["from_thread_id"] && columns["to_thread_id"]) && !(columns["parent_thread_id"] && columns["child_thread_id"]) {
sources["thread_spawn_edges"] = tableSource("sqlite_schema_drift", sourcePath, "thread_spawn_edges table is missing required endpoint columns.")
return []SpawnEdge{}, nil
}
query := `SELECT ` + textColumn(columns, "from_thread_id") + `, ` + textColumn(columns, "to_thread_id") + `, ` + textColumn(columns, "reason") + `, ` + textColumn(columns, "created_at") + ` FROM thread_spawn_edges ORDER BY ` + orderBy(columns, "created_at", "from_thread_id", "to_thread_id")
query := `SELECT ` +
firstTextColumn(columns, "from_thread_id", "parent_thread_id") + `, ` +
firstTextColumn(columns, "to_thread_id", "child_thread_id") + `, ` +
firstTextColumn(columns, "reason", "status") + `, ` +
textColumn(columns, "created_at") +
` FROM thread_spawn_edges ORDER BY ` + orderBy(columns, "created_at", "from_thread_id", "parent_thread_id", "to_thread_id", "child_thread_id")
rows, err := db.Query(query)
if err != nil {
if isMissingTable(err) {
@@ -191,7 +222,12 @@ func readGoals(db *sql.DB, sourcePath string, sources SourceMap) ([]Goal, error)
sources["thread_goals"] = tableSource("sqlite_schema_drift", sourcePath, "thread_goals table is missing required thread_id column.")
return []Goal{}, nil
}
query := `SELECT ` + textColumn(columns, "thread_id") + `, ` + textColumn(columns, "goal") + `, ` + textColumn(columns, "status") + `, ` + textColumn(columns, "updated_at") + ` FROM thread_goals ORDER BY ` + orderBy(columns, "updated_at", "thread_id")
query := `SELECT ` +
textColumn(columns, "thread_id") + `, ` +
firstTextColumn(columns, "goal", "objective") + `, ` +
textColumn(columns, "status") + `, ` +
firstTextColumn(columns, "updated_at_ms", "updated_at") +
` FROM thread_goals ORDER BY ` + orderBy(columns, "updated_at_ms", "updated_at", "thread_id")
rows, err := db.Query(query)
if err != nil {
if isMissingTable(err) {
@@ -244,6 +280,15 @@ func textColumn(columns map[string]bool, name string) string {
return "COALESCE(CAST(" + name + " AS TEXT), '')"
}
func firstTextColumn(columns map[string]bool, names ...string) string {
for _, name := range names {
if columns[name] {
return textColumn(columns, name)
}
}
return "''"
}
func orderBy(columns map[string]bool, names ...string) string {
var order []string
for _, name := range names {

View File

@@ -66,6 +66,100 @@ func TestStoreReadsThreadsEdgesAndGoalsFromReadonlySQLite(t *testing.T) {
}
}
func TestStoreReadsCurrentCodexRuntimeSchema(t *testing.T) {
root := t.TempDir()
stateDB := openWritableSQLite(t, filepath.Join(root, "state_5.sqlite"))
defer stateDB.Close()
execSQL(t, stateDB, `CREATE TABLE threads (
id TEXT PRIMARY KEY,
rollout_path TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
source TEXT NOT NULL,
model_provider TEXT NOT NULL,
cwd TEXT NOT NULL,
title TEXT NOT NULL,
sandbox_policy TEXT NOT NULL,
approval_mode TEXT NOT NULL,
tokens_used INTEGER NOT NULL DEFAULT 0,
has_user_event INTEGER NOT NULL DEFAULT 0,
archived INTEGER NOT NULL DEFAULT 0,
archived_at INTEGER,
git_sha TEXT,
git_branch TEXT,
git_origin_url TEXT,
cli_version TEXT NOT NULL DEFAULT '',
first_user_message TEXT NOT NULL DEFAULT '',
agent_nickname TEXT,
agent_role TEXT,
memory_mode TEXT NOT NULL DEFAULT 'enabled',
model TEXT,
reasoning_effort TEXT,
agent_path TEXT,
created_at_ms INTEGER,
updated_at_ms INTEGER,
thread_source TEXT,
preview TEXT NOT NULL DEFAULT ''
)`)
execSQL(t, stateDB, `CREATE TABLE thread_spawn_edges (
parent_thread_id TEXT NOT NULL,
child_thread_id TEXT NOT NULL PRIMARY KEY,
status TEXT NOT NULL
)`)
execSQL(t, stateDB, `INSERT INTO threads (
id, rollout_path, created_at, updated_at, source, model_provider, cwd, title, sandbox_policy, approval_mode,
cli_version, agent_nickname, agent_role, agent_path, created_at_ms, updated_at_ms, thread_source, preview
) VALUES
('thread-a', '/tmp/a.jsonl', 1, 2, 'codex', 'openai', '/repo/a', '主线程', 'workspace-write', 'on-request',
'0.0.1', '主控', '智能体编排者', '/agents/orchestrator.toml', 1000, 2000, 'cli', '监管项目流程'),
('thread-b', '/tmp/b.jsonl', 3, 4, 'codex', 'openai', '/repo/b', '审查线程', 'workspace-write', 'on-request',
'0.0.1', '审查员', '代码审查员', '/agents/reviewer.toml', 3000, 4000, 'cli', '审查实现')`)
execSQL(t, stateDB, `INSERT INTO thread_spawn_edges (parent_thread_id, child_thread_id, status) VALUES
('thread-a', 'thread-b', 'spawned')`)
goalsDB := openWritableSQLite(t, filepath.Join(root, "goals_1.sqlite"))
defer goalsDB.Close()
execSQL(t, goalsDB, `CREATE TABLE thread_goals (
thread_id TEXT PRIMARY KEY NOT NULL,
goal_id TEXT NOT NULL,
objective TEXT NOT NULL,
status TEXT NOT NULL,
token_budget INTEGER,
tokens_used INTEGER NOT NULL DEFAULT 0,
time_used_seconds INTEGER NOT NULL DEFAULT 0,
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL
)`)
execSQL(t, goalsDB, `INSERT INTO thread_goals (
thread_id, goal_id, objective, status, token_budget, tokens_used, time_used_seconds, created_at_ms, updated_at_ms
) VALUES
('thread-a', 'goal-a', '管理当前项目的智能体流程', 'active', 10000, 1200, 60, 1000, 2000)`)
snapshot, err := Store{CodexHome: root}.Snapshot()
if err != nil {
t.Fatalf("Snapshot returned error: %v", err)
}
if len(snapshot.Threads) != 2 {
t.Fatalf("threads = %#v", snapshot.Threads)
}
first := snapshot.Threads[0]
if first.ID != "thread-a" || first.CWD != "/repo/a" || first.Title != "主线程" {
t.Fatalf("unexpected first thread identity: %#v", first)
}
if first.AgentNickname != "主控" || first.AgentRole != "智能体编排者" || first.Role != "智能体编排者" {
t.Fatalf("unexpected first thread agent fields: %#v", first)
}
if first.AgentPath != "/agents/orchestrator.toml" || first.Preview != "监管项目流程" || first.CreatedAt != "1000" || first.UpdatedAt != "2000" {
t.Fatalf("unexpected first thread metadata: %#v", first)
}
if len(snapshot.SpawnEdges) != 1 || snapshot.SpawnEdges[0].FromThreadID != "thread-a" || snapshot.SpawnEdges[0].ToThreadID != "thread-b" || snapshot.SpawnEdges[0].Reason != "spawned" {
t.Fatalf("unexpected current-schema edges: %#v", snapshot.SpawnEdges)
}
if len(snapshot.Goals) != 1 || snapshot.Goals[0].ThreadID != "thread-a" || snapshot.Goals[0].Goal != "管理当前项目的智能体流程" || snapshot.Goals[0].Status != "active" || snapshot.Goals[0].UpdatedAt != "2000" {
t.Fatalf("unexpected current-schema goals: %#v", snapshot.Goals)
}
}
func TestStoreMarksStateMissingWhenOnlyGoalsSQLiteExists(t *testing.T) {
root := t.TempDir()
createRuntimeSQLite(t, root)

View File

@@ -30,6 +30,7 @@ const STATUS_LABELS = {
failed: '失败',
idle: '空闲',
invalid: '无效',
active: '运行中',
pending: '待处理',
recent: '最近活跃',
running: '运行中',
@@ -164,18 +165,31 @@ export function normalizeAgents(payload = {}) {
export function normalizeRuntime(payload = {}) {
const threads = Array.isArray(payload.items) ? payload.items : []
const goals = Array.isArray(payload.goals) ? payload.goals : []
const edges = Array.isArray(payload.edges) ? payload.edges : []
const goalsByThread = groupBy(goals, (goal) => goal.threadId)
const spawnStatusByThread = new Map(
edges
.filter((edge) => edge.toThreadId)
.map((edge) => [edge.toThreadId, normalizeSpawnStatus(edge.reason || edge.status)]),
)
const agents = threads.map((thread) => {
const source = normalizeSource(thread.source, payload.source)
const threadGoals = goalsByThread.get(thread.id) ?? []
const status = thread.status || spawnStatusByThread.get(thread.id) || threadGoals.find((goal) => goal.status)?.status || 'unknown'
const goalText = threadGoals
.map((goal) => goal.goal || goal.objective)
.filter(Boolean)
.join('')
return {
id: thread.id,
name: thread.role || thread.id || '未命名线程',
role: thread.id,
status: thread.status || 'unknown',
statusZh: formatStatus(thread.status),
goal: threadGoals.map((goal) => goal.goal).filter(Boolean).join('') || '没有目标记录',
process: 'SQLite 只读快照',
name: thread.agentNickname || thread.agentRole || thread.title || thread.role || thread.id || '未命名线程',
role: thread.agentRole || thread.role || thread.agentPath || thread.id || '未记录角色',
projectPath: thread.cwd || '',
projectHints: [thread.title, thread.preview, thread.agentPath].filter(Boolean),
status,
statusZh: formatStatus(status),
goal: goalText || '没有目标记录',
process: thread.title || thread.preview || 'SQLite 只读快照',
source: source.label,
confidence: source.confidenceLabel,
lastActivity: thread.updatedAt || thread.createdAt || '没有时间记录',
@@ -187,7 +201,7 @@ export function normalizeRuntime(payload = {}) {
agents,
threads,
goals,
edges: Array.isArray(payload.edges) ? payload.edges : [],
edges,
source,
isEmpty: threads.length === 0,
emptyTitle: '没有运行线程',
@@ -195,6 +209,28 @@ export function normalizeRuntime(payload = {}) {
}
}
export function filterRuntimeByProject(runtime = normalizeRuntime(), projectPath = '') {
const targetPath = normalizePath(projectPath)
const agents = runtime.agents.filter((agent) => belongsToProject(agent, targetPath))
const threadIds = new Set(agents.map((agent) => agent.id))
const threads = runtime.threads.filter((thread) => threadIds.has(thread.id))
const goals = runtime.goals.filter((goal) => threadIds.has(goal.threadId))
const edges = runtime.edges.filter((edge) => threadIds.has(edge.fromThreadId) && threadIds.has(edge.toThreadId))
return {
...runtime,
agents,
threads,
goals,
edges,
isEmpty: agents.length === 0,
emptyTitle: '这个项目没有运行线程',
emptyText: projectPath
? '当前选中的项目没有匹配到 Codex 运行线程;这里不会显示其他项目的智能体状态。'
: '请先选择一个项目。',
}
}
export function normalizeWorkflow(payload = {}) {
const source = normalizeSource(payload.source)
const phases = Array.isArray(payload.phases)
@@ -327,6 +363,47 @@ function basename(path) {
return String(path).split('/').filter(Boolean).at(-1) ?? String(path)
}
function normalizePath(path) {
return String(path || '').replace(/\/+$/, '')
}
function belongsToProject(agent, targetPath) {
if (!targetPath) {
return false
}
const agentPath = normalizePath(agent.projectPath)
if (agentPath === targetPath) {
return true
}
if (!agentPath || !targetPath.startsWith(`${agentPath}/`)) {
return false
}
return [agent.process, ...(agent.projectHints || [])].some((value) => containsPathToken(value, targetPath))
}
function normalizeSpawnStatus(status) {
if (status === 'open') {
return 'running'
}
if (status === 'closed') {
return 'complete'
}
return status || ''
}
function containsPathToken(value, targetPath) {
const text = String(value || '')
let index = text.indexOf(targetPath)
while (index !== -1) {
const after = text[index + targetPath.length] || ''
if (!after || !/[A-Za-z0-9._/-]/.test(after)) {
return true
}
index = text.indexOf(targetPath, index + targetPath.length)
}
return false
}
function formatDateTime(value) {
if (!value) {
return '没有时间记录'

View File

@@ -9,6 +9,7 @@ import {
formatStatus,
normalizeAgent,
normalizeDraftWriteback,
filterRuntimeByProject,
normalizeProject,
normalizeRuntime,
normalizeValidationResult,
@@ -134,6 +135,126 @@ test('normalizes empty runtime without falling back to fake real data', () => {
assert.deepEqual(runtime.agents, [])
})
test('filters runtime to selected project and displays agent names from Codex metadata', () => {
const runtime = normalizeRuntime({
items: [
{
id: 'thread-a',
cwd: '/repo/a',
title: '主线程',
agentNickname: '主控',
agentRole: '智能体编排者',
status: '',
updatedAt: '2000',
source: { kind: 'sqlite_table', confidence: 'high' },
},
{
id: 'thread-b',
cwd: '/repo/b',
title: '审查线程',
agentNickname: '审查员',
agentRole: '代码审查员',
status: 'running',
source: { kind: 'sqlite_table', confidence: 'high' },
},
],
goals: [
{ threadId: 'thread-a', goal: '管理当前项目的智能体流程', status: 'active' },
],
edges: [{ fromThreadId: 'thread-a', toThreadId: 'thread-b', reason: 'spawned' }],
source: { kind: 'sqlite_readonly', confidence: 'high' },
})
const projectRuntime = filterRuntimeByProject(runtime, '/repo/a')
assert.equal(projectRuntime.isEmpty, false)
assert.equal(projectRuntime.agents.length, 1)
assert.equal(projectRuntime.threads.length, 1)
assert.equal(projectRuntime.agents[0].name, '主控')
assert.equal(projectRuntime.agents[0].role, '智能体编排者')
assert.equal(projectRuntime.agents[0].projectPath, '/repo/a')
assert.equal(projectRuntime.agents[0].goal, '管理当前项目的智能体流程')
assert.equal(projectRuntime.agents[0].status, 'active')
assert.equal(projectRuntime.agents[0].statusZh, '运行中')
})
test('keeps project threads when Codex cwd is an ancestor but metadata names the project path', () => {
const runtime = normalizeRuntime({
items: [
{
id: 'thread-a',
cwd: '/Users/yoilun',
title: '处理 /Users/yoilun/Code/codex-agent-manager 的项目状态',
agentNickname: 'Nash',
agentRole: '代码审查员',
source: { kind: 'sqlite_table', confidence: 'high' },
},
{
id: 'thread-b',
cwd: '/Users/yoilun',
title: '处理其他项目',
agentNickname: '其他线程',
source: { kind: 'sqlite_table', confidence: 'high' },
},
],
source: { kind: 'sqlite_readonly', confidence: 'high' },
})
const projectRuntime = filterRuntimeByProject(runtime, '/Users/yoilun/Code/codex-agent-manager')
assert.equal(projectRuntime.agents.length, 1)
assert.equal(projectRuntime.agents[0].id, 'thread-a')
assert.equal(projectRuntime.agents[0].projectPath, '/Users/yoilun')
})
test('does not match sibling projects that only share a path prefix', () => {
const runtime = normalizeRuntime({
items: [
{
id: 'sibling',
cwd: '/Users/yoilun',
title: '处理 /Users/yoilun/Code/codex-agent-manager-old 的项目状态',
agentNickname: '同名前缀项目',
source: { kind: 'sqlite_table', confidence: 'high' },
},
{
id: 'target',
cwd: '/Users/yoilun',
title: '处理 /Users/yoilun/Code/codex-agent-manager 当前工作树',
agentNickname: '目标项目',
source: { kind: 'sqlite_table', confidence: 'high' },
},
],
source: { kind: 'sqlite_readonly', confidence: 'high' },
})
const projectRuntime = filterRuntimeByProject(runtime, '/Users/yoilun/Code/codex-agent-manager')
assert.deepEqual(projectRuntime.agents.map((agent) => agent.id), ['target'])
})
test('infers runtime status from Codex spawn edge state when thread status is absent', () => {
const runtime = normalizeRuntime({
items: [
{ id: 'parent', cwd: '/repo/a', agentNickname: '主控', source: { kind: 'sqlite_table', confidence: 'high' } },
{ id: 'child-open', cwd: '/repo/a', agentNickname: '运行审查员', source: { kind: 'sqlite_table', confidence: 'high' } },
{ id: 'child-closed', cwd: '/repo/a', agentNickname: '已完成审查员', source: { kind: 'sqlite_table', confidence: 'high' } },
],
edges: [
{ fromThreadId: 'parent', toThreadId: 'child-open', reason: 'open' },
{ fromThreadId: 'parent', toThreadId: 'child-closed', reason: 'closed' },
],
source: { kind: 'sqlite_readonly', confidence: 'high' },
})
const byId = new Map(runtime.agents.map((agent) => [agent.id, agent]))
assert.equal(byId.get('child-open').status, 'running')
assert.equal(byId.get('child-open').statusZh, '运行中')
assert.equal(byId.get('child-closed').status, 'complete')
assert.equal(byId.get('child-closed').statusZh, '已完成')
})
test('normalizes empty workflow with source evidence and no sample edges', () => {
const workflow = normalizeWorkflow({
items: [],

View File

@@ -2,7 +2,7 @@
import { computed, onMounted, ref } from 'vue'
import StatusBadge from '../components/StatusBadge.vue'
import { apiClient } from '../api/client'
import { normalizeProjects, normalizeRuntime } from '../api/normalizers'
import { filterRuntimeByProject, normalizeProjects, normalizeRuntime } from '../api/normalizers'
import { agentMatrix as sampleAgentMatrix, projects as sampleProjects } from '../data'
const loading = ref(true)
@@ -12,7 +12,8 @@ const runtimeState = ref(normalizeRuntime())
const selectedProjectId = ref('')
const selectedProject = computed(() => projectState.value.projects.find((project) => project.id === selectedProjectId.value))
const selectedAgent = computed(() => runtimeState.value.agents[0])
const projectRuntime = computed(() => filterRuntimeByProject(runtimeState.value, selectedProject.value?.path || ''))
const selectedAgent = computed(() => projectRuntime.value.agents[0])
onMounted(loadReadonlyData)
@@ -105,13 +106,13 @@ async function loadReadonlyData() {
<strong>连接失败</strong>
<p>无法读取 `/api/runtime/threads`下面仅显示明确标注的示例状态</p>
</div>
<div v-else-if="runtimeState.isEmpty" class="empty-state compact">
<strong>{{ runtimeState.emptyTitle }}</strong>
<p>{{ runtimeState.emptyText }}</p>
<div v-else-if="projectRuntime.isEmpty" class="empty-state compact">
<strong>{{ projectRuntime.emptyTitle }}</strong>
<p>{{ projectRuntime.emptyText }}</p>
<StatusBadge label="空数据" status="unknown" :source="runtimeState.source.label" :confidence="runtimeState.source.confidenceLabel" />
</div>
<div v-if="!loading && !error && !runtimeState.isEmpty" class="matrix-table" role="table" aria-label="智能体状态矩阵">
<div v-if="!loading && !error && !projectRuntime.isEmpty" class="matrix-table" role="table" aria-label="智能体状态矩阵">
<div class="matrix-row head" role="row">
<span role="columnheader">智能体</span>
<span role="columnheader">状态</span>
@@ -119,7 +120,7 @@ async function loadReadonlyData() {
<span role="columnheader">进程</span>
<span role="columnheader">最近活动</span>
</div>
<div v-for="agent in runtimeState.agents" :key="agent.id" class="matrix-row" role="row">
<div v-for="agent in projectRuntime.agents" :key="agent.id" class="matrix-row" role="row">
<span role="cell">
<strong>{{ agent.name }}</strong>
<small>{{ agent.role }}</small>
@@ -177,7 +178,7 @@ async function loadReadonlyData() {
</div>
<div class="detail-grid">
<span>项目数</span><strong>{{ projectState.projects.length }}</strong>
<span>线程数</span><strong>{{ runtimeState.threads.length }}</strong>
<span>当前项目线程数</span><strong>{{ projectRuntime.threads.length }}</strong>
<span>接口</span><strong>{{ error ? '连接失败' : '只读连接' }}</strong>
</div>
</aside>