fix: tolerate runtime schema drift

This commit is contained in:
Yoilun
2026-05-25 18:43:46 +08:00
parent bb8b8fe732
commit 69e1af7a17
8 changed files with 313 additions and 25 deletions

View File

@@ -44,11 +44,11 @@ func (s Store) Snapshot() (Snapshot, error) {
return Snapshot{}, err
}
defer db.Close()
snapshot.Threads, err = readThreads(db, statePath)
snapshot.Threads, err = readThreads(db, statePath, snapshot.Sources)
if err != nil {
return Snapshot{}, err
}
snapshot.SpawnEdges, err = readSpawnEdges(db, statePath)
snapshot.SpawnEdges, err = readSpawnEdges(db, statePath, snapshot.Sources)
if err != nil {
return Snapshot{}, err
}
@@ -59,7 +59,7 @@ func (s Store) Snapshot() (Snapshot, error) {
return Snapshot{}, err
}
defer db.Close()
snapshot.Goals, err = readGoals(db, goalsPath)
snapshot.Goals, err = readGoals(db, goalsPath, snapshot.Sources)
if err != nil {
return Snapshot{}, err
}
@@ -94,7 +94,7 @@ func aggregateSource(sources SourceMap) SourceEvidence {
}
return SourceEvidence{
Kind: "sqlite_partial",
Confidence: "partial",
Confidence: "medium",
Message: "One Codex SQLite source is missing; available data was read from existing sources only.",
}
}
@@ -108,8 +108,21 @@ func openReadonlySQLite(path string) (*sql.DB, error) {
return sql.Open("sqlite", uri.String())
}
func readThreads(db *sql.DB, sourcePath string) ([]Thread, error) {
rows, err := db.Query(`SELECT id, role, status, created_at, updated_at FROM threads ORDER BY created_at, id`)
func readThreads(db *sql.DB, sourcePath string, sources SourceMap) ([]Thread, error) {
columns, err := tableColumns(db, "threads")
if err != nil {
return nil, err
}
if len(columns) == 0 {
sources["threads"] = tableSource("sqlite_missing_table", sourcePath, "threads table was not found.")
return []Thread{}, nil
}
if !columns["id"] {
sources["threads"] = tableSource("sqlite_schema_drift", sourcePath, "threads table is missing required id column.")
return []Thread{}, nil
}
query := `SELECT ` + textColumn(columns, "id") + `, ` + textColumn(columns, "role") + `, ` + textColumn(columns, "status") + `, ` + textColumn(columns, "created_at") + `, ` + textColumn(columns, "updated_at") + ` FROM threads ORDER BY ` + orderBy(columns, "created_at", "id")
rows, err := db.Query(query)
if err != nil {
if isMissingTable(err) {
return []Thread{}, nil
@@ -126,11 +139,25 @@ func readThreads(db *sql.DB, sourcePath string) ([]Thread, error) {
item.Source = SourceEvidence{Kind: "sqlite_table", Path: sourcePath, Confidence: "high"}
threads = append(threads, item)
}
sources["threads"] = SourceEvidence{Kind: "sqlite_table", Path: sourcePath, Confidence: "high"}
return threads, rows.Err()
}
func readSpawnEdges(db *sql.DB, sourcePath string) ([]SpawnEdge, error) {
rows, err := db.Query(`SELECT from_thread_id, to_thread_id, reason, created_at FROM thread_spawn_edges ORDER BY created_at, from_thread_id, to_thread_id`)
func readSpawnEdges(db *sql.DB, sourcePath string, sources SourceMap) ([]SpawnEdge, error) {
columns, err := tableColumns(db, "thread_spawn_edges")
if err != nil {
return nil, err
}
if len(columns) == 0 {
sources["thread_spawn_edges"] = tableSource("sqlite_missing_table", sourcePath, "thread_spawn_edges table was not found.")
return []SpawnEdge{}, nil
}
if !columns["from_thread_id"] || !columns["to_thread_id"] {
sources["thread_spawn_edges"] = tableSource("sqlite_schema_drift", sourcePath, "thread_spawn_edges table is missing required endpoint columns.")
return []SpawnEdge{}, nil
}
query := `SELECT ` + textColumn(columns, "from_thread_id") + `, ` + textColumn(columns, "to_thread_id") + `, ` + textColumn(columns, "reason") + `, ` + textColumn(columns, "created_at") + ` FROM thread_spawn_edges ORDER BY ` + orderBy(columns, "created_at", "from_thread_id", "to_thread_id")
rows, err := db.Query(query)
if err != nil {
if isMissingTable(err) {
return []SpawnEdge{}, nil
@@ -147,11 +174,25 @@ func readSpawnEdges(db *sql.DB, sourcePath string) ([]SpawnEdge, error) {
item.Source = SourceEvidence{Kind: "sqlite_table", Path: sourcePath, Confidence: "high"}
edges = append(edges, item)
}
sources["thread_spawn_edges"] = SourceEvidence{Kind: "sqlite_table", Path: sourcePath, Confidence: "high"}
return edges, rows.Err()
}
func readGoals(db *sql.DB, sourcePath string) ([]Goal, error) {
rows, err := db.Query(`SELECT thread_id, goal, status, updated_at FROM thread_goals ORDER BY updated_at, thread_id`)
func readGoals(db *sql.DB, sourcePath string, sources SourceMap) ([]Goal, error) {
columns, err := tableColumns(db, "thread_goals")
if err != nil {
return nil, err
}
if len(columns) == 0 {
sources["thread_goals"] = tableSource("sqlite_missing_table", sourcePath, "thread_goals table was not found.")
return []Goal{}, nil
}
if !columns["thread_id"] {
sources["thread_goals"] = tableSource("sqlite_schema_drift", sourcePath, "thread_goals table is missing required thread_id column.")
return []Goal{}, nil
}
query := `SELECT ` + textColumn(columns, "thread_id") + `, ` + textColumn(columns, "goal") + `, ` + textColumn(columns, "status") + `, ` + textColumn(columns, "updated_at") + ` FROM thread_goals ORDER BY ` + orderBy(columns, "updated_at", "thread_id")
rows, err := db.Query(query)
if err != nil {
if isMissingTable(err) {
return []Goal{}, nil
@@ -168,9 +209,58 @@ func readGoals(db *sql.DB, sourcePath string) ([]Goal, error) {
item.Source = SourceEvidence{Kind: "sqlite_table", Path: sourcePath, Confidence: "high"}
goals = append(goals, item)
}
sources["thread_goals"] = SourceEvidence{Kind: "sqlite_table", Path: sourcePath, Confidence: "high"}
return goals, rows.Err()
}
func tableColumns(db *sql.DB, table string) (map[string]bool, error) {
rows, err := db.Query(`PRAGMA table_info(` + table + `)`)
if err != nil {
return nil, err
}
defer rows.Close()
columns := map[string]bool{}
for rows.Next() {
var cid sql.NullInt64
var name sql.NullString
var typ sql.NullString
var notNull sql.NullInt64
var defaultValue sql.NullString
var pk sql.NullInt64
if err := rows.Scan(&cid, &name, &typ, &notNull, &defaultValue, &pk); err != nil {
return nil, err
}
if name.Valid {
columns[name.String] = true
}
}
return columns, rows.Err()
}
func textColumn(columns map[string]bool, name string) string {
if !columns[name] {
return "''"
}
return "COALESCE(CAST(" + name + " AS TEXT), '')"
}
func orderBy(columns map[string]bool, names ...string) string {
var order []string
for _, name := range names {
if columns[name] {
order = append(order, name)
}
}
if len(order) == 0 {
return "rowid"
}
return strings.Join(order, ", ")
}
func tableSource(kind string, sourcePath string, message string) SourceEvidence {
return SourceEvidence{Kind: kind, Path: sourcePath, Confidence: "low", Message: message}
}
func fileExists(path string) bool {
info, err := os.Stat(path)
return err == nil && !info.IsDir()

View File

@@ -10,7 +10,8 @@ import (
)
func TestStoreMissingSQLiteReturnsEmptySnapshot(t *testing.T) {
snapshot, err := Store{CodexHome: t.TempDir()}.Snapshot()
root := t.TempDir()
snapshot, err := Store{CodexHome: root}.Snapshot()
if err != nil {
t.Fatalf("Snapshot returned error: %v", err)
}
@@ -26,6 +27,12 @@ func TestStoreMissingSQLiteReturnsEmptySnapshot(t *testing.T) {
if source := snapshot.Sources["goals"]; source.Kind != "sqlite_missing" || source.Confidence != "low" {
t.Fatalf("unexpected goals source evidence: %#v", source)
}
if _, err := os.Stat(filepath.Join(root, "state_5.sqlite")); !os.IsNotExist(err) {
t.Fatalf("state sqlite was created or stat failed unexpectedly: %v", err)
}
if _, err := os.Stat(filepath.Join(root, "goals_1.sqlite")); !os.IsNotExist(err) {
t.Fatalf("goals sqlite was created or stat failed unexpectedly: %v", err)
}
}
func TestStoreReadsThreadsEdgesAndGoalsFromReadonlySQLite(t *testing.T) {
@@ -82,7 +89,7 @@ func TestStoreMarksStateMissingWhenOnlyGoalsSQLiteExists(t *testing.T) {
if source := snapshot.Sources["goals"]; source.Kind != "sqlite_readonly" || source.Confidence != "high" {
t.Fatalf("unexpected goals source evidence: %#v", source)
}
if snapshot.Source.Confidence != "partial" || snapshot.Source.Kind != "sqlite_partial" {
if snapshot.Source.Confidence != "medium" || snapshot.Source.Kind != "sqlite_partial" {
t.Fatalf("unexpected aggregate source evidence: %#v", snapshot.Source)
}
}
@@ -110,19 +117,167 @@ func TestStoreMarksGoalsMissingWhenOnlyStateSQLiteExists(t *testing.T) {
if source := snapshot.Sources["goals"]; source.Kind != "sqlite_missing" || source.Confidence != "low" {
t.Fatalf("unexpected goals source evidence: %#v", source)
}
if snapshot.Source.Confidence != "partial" || snapshot.Source.Kind != "sqlite_partial" {
if snapshot.Source.Confidence != "medium" || snapshot.Source.Kind != "sqlite_partial" {
t.Fatalf("unexpected aggregate source evidence: %#v", snapshot.Source)
}
}
func TestStoreToleratesMissingOptionalRuntimeColumns(t *testing.T) {
root := t.TempDir()
stateDB := openWritableSQLite(t, filepath.Join(root, "state_5.sqlite"))
defer stateDB.Close()
execSQL(t, stateDB, `CREATE TABLE threads (
id TEXT PRIMARY KEY,
role TEXT,
status TEXT,
created_at TEXT
)`)
execSQL(t, stateDB, `CREATE TABLE thread_spawn_edges (
from_thread_id TEXT,
to_thread_id TEXT,
created_at TEXT
)`)
execSQL(t, stateDB, `INSERT INTO threads (id, role, status, created_at) VALUES
('thread-a', 'analyst', 'done', '2026-05-25T01:00:00Z')`)
execSQL(t, stateDB, `INSERT INTO thread_spawn_edges (from_thread_id, to_thread_id, created_at) VALUES
('thread-a', 'thread-b', '2026-05-25T01:06:00Z')`)
snapshot, err := Store{CodexHome: root}.Snapshot()
if err != nil {
t.Fatalf("Snapshot returned error: %v", err)
}
if len(snapshot.Threads) != 1 || snapshot.Threads[0].UpdatedAt != "" {
t.Fatalf("unexpected threads for missing updated_at: %#v", snapshot.Threads)
}
if len(snapshot.SpawnEdges) != 1 || snapshot.SpawnEdges[0].Reason != "" {
t.Fatalf("unexpected edges for missing reason: %#v", snapshot.SpawnEdges)
}
}
func TestStoreToleratesNullRuntimeValues(t *testing.T) {
root := t.TempDir()
stateDB := openWritableSQLite(t, filepath.Join(root, "state_5.sqlite"))
defer stateDB.Close()
execSQL(t, stateDB, `CREATE TABLE threads (
id TEXT PRIMARY KEY,
role TEXT,
status TEXT,
created_at TEXT,
updated_at TEXT
)`)
execSQL(t, stateDB, `CREATE TABLE thread_spawn_edges (
from_thread_id TEXT,
to_thread_id TEXT,
reason TEXT,
created_at TEXT
)`)
execSQL(t, stateDB, `INSERT INTO threads (id, role, status, created_at, updated_at) VALUES
('thread-a', NULL, NULL, NULL, NULL)`)
execSQL(t, stateDB, `INSERT INTO thread_spawn_edges (from_thread_id, to_thread_id, reason, created_at) VALUES
('thread-a', 'thread-b', NULL, NULL)`)
goalsDB := openWritableSQLite(t, filepath.Join(root, "goals_1.sqlite"))
defer goalsDB.Close()
execSQL(t, goalsDB, `CREATE TABLE thread_goals (
thread_id TEXT,
goal TEXT,
status TEXT,
updated_at TEXT
)`)
execSQL(t, goalsDB, `INSERT INTO thread_goals (thread_id, goal, status, updated_at) VALUES
('thread-a', NULL, NULL, NULL)`)
snapshot, err := Store{CodexHome: root}.Snapshot()
if err != nil {
t.Fatalf("Snapshot returned error: %v", err)
}
if snapshot.Threads[0].Role != "" || snapshot.Threads[0].Status != "" || snapshot.Threads[0].CreatedAt != "" || snapshot.Threads[0].UpdatedAt != "" {
t.Fatalf("NULL thread fields were not converted to empty strings: %#v", snapshot.Threads[0])
}
if snapshot.SpawnEdges[0].Reason != "" || snapshot.SpawnEdges[0].CreatedAt != "" {
t.Fatalf("NULL edge fields were not converted to empty strings: %#v", snapshot.SpawnEdges[0])
}
if snapshot.Goals[0].Goal != "" || snapshot.Goals[0].Status != "" || snapshot.Goals[0].UpdatedAt != "" {
t.Fatalf("NULL goal fields were not converted to empty strings: %#v", snapshot.Goals[0])
}
}
func TestStoreCastsNumericRuntimeValuesToText(t *testing.T) {
root := t.TempDir()
stateDB := openWritableSQLite(t, filepath.Join(root, "state_5.sqlite"))
defer stateDB.Close()
execSQL(t, stateDB, `CREATE TABLE threads (
id INTEGER PRIMARY KEY,
role INTEGER,
status INTEGER,
created_at INTEGER,
updated_at INTEGER
)`)
execSQL(t, stateDB, `CREATE TABLE thread_spawn_edges (
from_thread_id INTEGER,
to_thread_id INTEGER,
reason INTEGER,
created_at INTEGER
)`)
execSQL(t, stateDB, `INSERT INTO threads (id, role, status, created_at, updated_at) VALUES
(7, 8, 9, 10, 11)`)
execSQL(t, stateDB, `INSERT INTO thread_spawn_edges (from_thread_id, to_thread_id, reason, created_at) VALUES
(7, 12, 13, 14)`)
goalsDB := openWritableSQLite(t, filepath.Join(root, "goals_1.sqlite"))
defer goalsDB.Close()
execSQL(t, goalsDB, `CREATE TABLE thread_goals (
thread_id INTEGER,
goal INTEGER,
status INTEGER,
updated_at INTEGER
)`)
execSQL(t, goalsDB, `INSERT INTO thread_goals (thread_id, goal, status, updated_at) VALUES
(7, 15, 16, 17)`)
snapshot, err := Store{CodexHome: root}.Snapshot()
if err != nil {
t.Fatalf("Snapshot returned error: %v", err)
}
if snapshot.Threads[0].ID != "7" || snapshot.Threads[0].Role != "8" || snapshot.Threads[0].UpdatedAt != "11" {
t.Fatalf("numeric thread fields were not cast to text: %#v", snapshot.Threads[0])
}
if snapshot.SpawnEdges[0].FromThreadID != "7" || snapshot.SpawnEdges[0].Reason != "13" {
t.Fatalf("numeric edge fields were not cast to text: %#v", snapshot.SpawnEdges[0])
}
if snapshot.Goals[0].ThreadID != "7" || snapshot.Goals[0].Goal != "15" {
t.Fatalf("numeric goal fields were not cast to text: %#v", snapshot.Goals[0])
}
}
func TestStoreReturnsEmptyTableEvidenceWhenCriticalColumnMissing(t *testing.T) {
root := t.TempDir()
stateDB := openWritableSQLite(t, filepath.Join(root, "state_5.sqlite"))
defer stateDB.Close()
execSQL(t, stateDB, `CREATE TABLE threads (
role TEXT,
status TEXT,
created_at TEXT,
updated_at TEXT
)`)
execSQL(t, stateDB, `INSERT INTO threads (role, status, created_at, updated_at) VALUES
('analyst', 'done', '2026-05-25T01:00:00Z', '2026-05-25T01:05:00Z')`)
snapshot, err := Store{CodexHome: root}.Snapshot()
if err != nil {
t.Fatalf("Snapshot returned error: %v", err)
}
if len(snapshot.Threads) != 0 {
t.Fatalf("threads = %#v, want empty when critical id column is missing", snapshot.Threads)
}
if source := snapshot.Sources["threads"]; source.Kind != "sqlite_schema_drift" || source.Confidence != "low" {
t.Fatalf("unexpected threads schema source: %#v", source)
}
}
func createRuntimeSQLite(t *testing.T, root string) {
t.Helper()
statePath := filepath.Join(root, "state_5.sqlite")
goalsPath := filepath.Join(root, "goals_1.sqlite")
stateDB, err := sql.Open("sqlite", statePath)
if err != nil {
t.Fatal(err)
}
stateDB := openWritableSQLite(t, statePath)
defer stateDB.Close()
execSQL(t, stateDB, `CREATE TABLE threads (
id TEXT PRIMARY KEY,
@@ -143,10 +298,7 @@ func createRuntimeSQLite(t *testing.T, root string) {
execSQL(t, stateDB, `INSERT INTO thread_spawn_edges (from_thread_id, to_thread_id, reason, created_at) VALUES
('thread-a', 'thread-b', 'handoff', '2026-05-25T01:06:00Z')`)
goalsDB, err := sql.Open("sqlite", goalsPath)
if err != nil {
t.Fatal(err)
}
goalsDB := openWritableSQLite(t, goalsPath)
defer goalsDB.Close()
execSQL(t, goalsDB, `CREATE TABLE thread_goals (
thread_id TEXT,
@@ -158,6 +310,15 @@ func createRuntimeSQLite(t *testing.T, root string) {
('thread-b', 'ship phase 3', 'in_progress', '2026-05-25T01:08:00Z')`)
}
func openWritableSQLite(t *testing.T, path string) *sql.DB {
t.Helper()
db, err := sql.Open("sqlite", path)
if err != nil {
t.Fatal(err)
}
return db
}
func execSQL(t *testing.T, db *sql.DB, query string) {
t.Helper()
if _, err := db.Exec(query); err != nil {