diff --git a/tools/pgimport/README.md b/tools/pgimport/README.md new file mode 100644 index 00000000..2c645405 --- /dev/null +++ b/tools/pgimport/README.md @@ -0,0 +1,35 @@ +# pgimport + +Migrates ntfy data from SQLite to PostgreSQL. + +## Build + +```bash +go build -o pgimport ./tools/pgimport/ +``` + +## Usage + +```bash +# Using CLI flags +pgimport \ + --database-url "postgres://user:pass@host:5432/ntfy?sslmode=require" \ + --cache-file /var/cache/ntfy/cache.db \ + --auth-file /var/lib/ntfy/user.db \ + --web-push-file /var/lib/ntfy/webpush.db + +# Using server.yml (flags override config values) +pgimport --config /etc/ntfy/server.yml +``` + +## Prerequisites + +- PostgreSQL schema must already be set up (run ntfy with `database-url` once) +- ntfy must not be running during the import +- All three SQLite files are optional; only the ones specified will be imported + +## Notes + +- The tool is idempotent and safe to re-run +- After importing, row counts and content are verified against the SQLite sources +- Invalid UTF-8 in messages is replaced with the Unicode replacement character diff --git a/tools/pgimport/main.go b/tools/pgimport/main.go index fb91da27..9637d33d 100644 --- a/tools/pgimport/main.go +++ b/tools/pgimport/main.go @@ -658,10 +658,35 @@ func verifyUsers(sqliteFile string, pgDB *sql.DB, failed *bool) error { defer sqlDB.Close() verifyCount(sqlDB, pgDB, "tier", `SELECT COUNT(*) FROM tier`, `SELECT COUNT(*) FROM tier`, failed) + verifyContent(sqlDB, pgDB, "tier", + `SELECT id, code, name FROM tier ORDER BY id`, + `SELECT id, code, name FROM tier ORDER BY id COLLATE "C"`, + failed) + verifyCount(sqlDB, pgDB, "user", `SELECT COUNT(*) FROM user`, `SELECT COUNT(*) FROM "user"`, failed) + verifyContent(sqlDB, pgDB, "user", + `SELECT id, user, role, sync_topic FROM user ORDER BY id`, + `SELECT id, user_name, role, sync_topic FROM "user" ORDER BY id COLLATE "C"`, + failed) + verifyCount(sqlDB, pgDB, "user_access", `SELECT COUNT(*) FROM user_access a JOIN user u ON u.id = a.user_id`, `SELECT COUNT(*) FROM user_access`, failed) + verifyContent(sqlDB, pgDB, "user_access", + `SELECT a.user_id, a.topic FROM user_access a JOIN user u ON u.id = a.user_id ORDER BY a.user_id, a.topic`, + `SELECT user_id, topic FROM user_access ORDER BY user_id COLLATE "C", topic COLLATE "C"`, + failed) + verifyCount(sqlDB, pgDB, "user_token", `SELECT COUNT(*) FROM user_token t JOIN user u ON u.id = t.user_id`, `SELECT COUNT(*) FROM user_token`, failed) + verifyContent(sqlDB, pgDB, "user_token", + `SELECT t.user_id, t.token, t.label FROM user_token t JOIN user u ON u.id = t.user_id ORDER BY t.user_id, t.token`, + `SELECT user_id, token, label FROM user_token ORDER BY user_id COLLATE "C", token COLLATE "C"`, + failed) + verifyCount(sqlDB, pgDB, "user_phone", `SELECT COUNT(*) FROM user_phone p JOIN user u ON u.id = p.user_id`, `SELECT COUNT(*) FROM user_phone`, failed) + verifyContent(sqlDB, pgDB, "user_phone", + `SELECT p.user_id, p.phone_number FROM user_phone p JOIN user u ON u.id = p.user_id ORDER BY p.user_id, p.phone_number`, + `SELECT user_id, phone_number FROM user_phone ORDER BY user_id COLLATE "C", phone_number COLLATE "C"`, + failed) + return nil } @@ -673,6 +698,7 @@ func verifyMessages(sqliteFile string, pgDB *sql.DB, failed *bool) error { defer sqlDB.Close() verifyCount(sqlDB, pgDB, "messages", `SELECT COUNT(*) FROM messages`, `SELECT COUNT(*) FROM message`, failed) + verifySampledMessages(sqlDB, pgDB, failed) return nil } @@ -684,26 +710,179 @@ func verifyWebPush(sqliteFile string, pgDB *sql.DB, failed *bool) error { defer sqlDB.Close() verifyCount(sqlDB, pgDB, "subscription", `SELECT COUNT(*) FROM subscription`, `SELECT COUNT(*) FROM webpush_subscription`, failed) + verifyContent(sqlDB, pgDB, "subscription", + `SELECT id, endpoint, key_auth, key_p256dh, user_id FROM subscription ORDER BY id`, + `SELECT id, endpoint, key_auth, key_p256dh, user_id FROM webpush_subscription ORDER BY id COLLATE "C"`, + failed) + verifyCount(sqlDB, pgDB, "subscription_topic", `SELECT COUNT(*) FROM subscription_topic`, `SELECT COUNT(*) FROM webpush_subscription_topic`, failed) + verifyContent(sqlDB, pgDB, "subscription_topic", + `SELECT subscription_id, topic FROM subscription_topic ORDER BY subscription_id, topic`, + `SELECT subscription_id, topic FROM webpush_subscription_topic ORDER BY subscription_id COLLATE "C", topic COLLATE "C"`, + failed) + return nil } func verifyCount(sqlDB, pgDB *sql.DB, table, sqliteQuery, pgQuery string, failed *bool) { var sqliteCount, pgCount int64 if err := sqlDB.QueryRow(sqliteQuery).Scan(&sqliteCount); err != nil { - fmt.Printf(" %-20s ERROR reading SQLite: %s\n", table, err) + fmt.Printf(" %-25s count ERROR reading SQLite: %s\n", table, err) *failed = true return } if err := pgDB.QueryRow(pgQuery).Scan(&pgCount); err != nil { - fmt.Printf(" %-20s ERROR reading PostgreSQL: %s\n", table, err) + fmt.Printf(" %-25s count ERROR reading PostgreSQL: %s\n", table, err) *failed = true return } if sqliteCount == pgCount { - fmt.Printf(" %-20s OK (%d rows)\n", table, pgCount) + fmt.Printf(" %-25s count OK (%d rows)\n", table, pgCount) } else { - fmt.Printf(" %-20s MISMATCH: SQLite=%d, PostgreSQL=%d\n", table, sqliteCount, pgCount) + fmt.Printf(" %-25s count MISMATCH: SQLite=%d, PostgreSQL=%d\n", table, sqliteCount, pgCount) *failed = true } } + +func verifyContent(sqlDB, pgDB *sql.DB, table, sqliteQuery, pgQuery string, failed *bool) { + sqliteRows, err := sqlDB.Query(sqliteQuery) + if err != nil { + fmt.Printf(" %-25s content ERROR reading SQLite: %s\n", table, err) + *failed = true + return + } + defer sqliteRows.Close() + + pgRows, err := pgDB.Query(pgQuery) + if err != nil { + fmt.Printf(" %-25s content ERROR reading PostgreSQL: %s\n", table, err) + *failed = true + return + } + defer pgRows.Close() + + cols, err := sqliteRows.Columns() + if err != nil { + fmt.Printf(" %-25s content ERROR reading columns: %s\n", table, err) + *failed = true + return + } + numCols := len(cols) + + rowNum := 0 + mismatches := 0 + for sqliteRows.Next() { + rowNum++ + if !pgRows.Next() { + fmt.Printf(" %-25s content MISMATCH: PostgreSQL has fewer rows (at row %d)\n", table, rowNum) + *failed = true + return + } + sqliteVals := makeStringSlice(numCols) + pgVals := makeStringSlice(numCols) + if err := sqliteRows.Scan(sqliteVals...); err != nil { + fmt.Printf(" %-25s content ERROR scanning SQLite row %d: %s\n", table, rowNum, err) + *failed = true + return + } + if err := pgRows.Scan(pgVals...); err != nil { + fmt.Printf(" %-25s content ERROR scanning PostgreSQL row %d: %s\n", table, rowNum, err) + *failed = true + return + } + for i := 0; i < numCols; i++ { + sv := *(sqliteVals[i].(*sql.NullString)) + pv := *(pgVals[i].(*sql.NullString)) + if sv != pv { + mismatches++ + if mismatches <= 3 { + fmt.Printf(" %-25s content MISMATCH at row %d, col %s: SQLite=%q, PostgreSQL=%q\n", table, rowNum, cols[i], sv.String, pv.String) + } + } + } + } + if pgRows.Next() { + fmt.Printf(" %-25s content MISMATCH: PostgreSQL has more rows than SQLite\n", table) + *failed = true + return + } + if mismatches > 0 { + if mismatches > 3 { + fmt.Printf(" %-25s content ... and %d more mismatches\n", table, mismatches-3) + } + *failed = true + } else { + fmt.Printf(" %-25s content OK\n", table) + } +} + +func verifySampledMessages(sqlDB, pgDB *sql.DB, failed *bool) { + rows, err := sqlDB.Query(`SELECT mid, topic, time, message, title, tags, priority FROM messages ORDER BY mid`) + if err != nil { + fmt.Printf(" %-25s content ERROR reading SQLite: %s\n", "messages (sampled)", err) + *failed = true + return + } + defer rows.Close() + + rowNum := 0 + checked := 0 + mismatches := 0 + for rows.Next() { + rowNum++ + var mid, topic, message, title, tags string + var msgTime int64 + var priority int + if err := rows.Scan(&mid, &topic, &msgTime, &message, &title, &tags, &priority); err != nil { + fmt.Printf(" %-25s content ERROR scanning SQLite row %d: %s\n", "messages (sampled)", rowNum, err) + *failed = true + return + } + if rowNum%100 != 1 { + continue + } + checked++ + var pgTopic, pgMessage, pgTitle, pgTags string + var pgTime int64 + var pgPriority int + err := pgDB.QueryRow(`SELECT topic, time, message, title, tags, priority FROM message WHERE mid = $1`, mid). + Scan(&pgTopic, &pgTime, &pgMessage, &pgTitle, &pgTags, &pgPriority) + if err == sql.ErrNoRows { + mismatches++ + if mismatches <= 3 { + fmt.Printf(" %-25s content MISMATCH: mid=%s not found in PostgreSQL\n", "messages (sampled)", mid) + } + continue + } else if err != nil { + fmt.Printf(" %-25s content ERROR querying PostgreSQL for mid=%s: %s\n", "messages (sampled)", mid, err) + *failed = true + return + } + topic = toUTF8(topic) + message = toUTF8(message) + title = toUTF8(title) + tags = toUTF8(tags) + if topic != pgTopic || msgTime != pgTime || message != pgMessage || title != pgTitle || tags != pgTags || priority != pgPriority { + mismatches++ + if mismatches <= 3 { + fmt.Printf(" %-25s content MISMATCH at mid=%s\n", "messages (sampled)", mid) + } + } + } + if mismatches > 0 { + if mismatches > 3 { + fmt.Printf(" %-25s content ... and %d more mismatches\n", "messages (sampled)", mismatches-3) + } + *failed = true + } else { + fmt.Printf(" %-25s content OK (%d samples checked)\n", "messages (sampled)", checked) + } +} + +func makeStringSlice(n int) []any { + vals := make([]any, n) + for i := range vals { + vals[i] = &sql.NullString{} + } + return vals +}