Sample
This commit is contained in:
35
tools/pgimport/README.md
Normal file
35
tools/pgimport/README.md
Normal file
@@ -0,0 +1,35 @@
|
||||
# pgimport
|
||||
|
||||
Migrates ntfy data from SQLite to PostgreSQL.
|
||||
|
||||
## Build
|
||||
|
||||
```bash
|
||||
go build -o pgimport ./tools/pgimport/
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
# Using CLI flags
|
||||
pgimport \
|
||||
--database-url "postgres://user:pass@host:5432/ntfy?sslmode=require" \
|
||||
--cache-file /var/cache/ntfy/cache.db \
|
||||
--auth-file /var/lib/ntfy/user.db \
|
||||
--web-push-file /var/lib/ntfy/webpush.db
|
||||
|
||||
# Using server.yml (flags override config values)
|
||||
pgimport --config /etc/ntfy/server.yml
|
||||
```
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- PostgreSQL schema must already be set up (run ntfy with `database-url` once)
|
||||
- ntfy must not be running during the import
|
||||
- All three SQLite files are optional; only the ones specified will be imported
|
||||
|
||||
## Notes
|
||||
|
||||
- The tool is idempotent and safe to re-run
|
||||
- After importing, row counts and content are verified against the SQLite sources
|
||||
- Invalid UTF-8 in messages is replaced with the Unicode replacement character
|
||||
@@ -658,10 +658,35 @@ func verifyUsers(sqliteFile string, pgDB *sql.DB, failed *bool) error {
|
||||
defer sqlDB.Close()
|
||||
|
||||
verifyCount(sqlDB, pgDB, "tier", `SELECT COUNT(*) FROM tier`, `SELECT COUNT(*) FROM tier`, failed)
|
||||
verifyContent(sqlDB, pgDB, "tier",
|
||||
`SELECT id, code, name FROM tier ORDER BY id`,
|
||||
`SELECT id, code, name FROM tier ORDER BY id COLLATE "C"`,
|
||||
failed)
|
||||
|
||||
verifyCount(sqlDB, pgDB, "user", `SELECT COUNT(*) FROM user`, `SELECT COUNT(*) FROM "user"`, failed)
|
||||
verifyContent(sqlDB, pgDB, "user",
|
||||
`SELECT id, user, role, sync_topic FROM user ORDER BY id`,
|
||||
`SELECT id, user_name, role, sync_topic FROM "user" ORDER BY id COLLATE "C"`,
|
||||
failed)
|
||||
|
||||
verifyCount(sqlDB, pgDB, "user_access", `SELECT COUNT(*) FROM user_access a JOIN user u ON u.id = a.user_id`, `SELECT COUNT(*) FROM user_access`, failed)
|
||||
verifyContent(sqlDB, pgDB, "user_access",
|
||||
`SELECT a.user_id, a.topic FROM user_access a JOIN user u ON u.id = a.user_id ORDER BY a.user_id, a.topic`,
|
||||
`SELECT user_id, topic FROM user_access ORDER BY user_id COLLATE "C", topic COLLATE "C"`,
|
||||
failed)
|
||||
|
||||
verifyCount(sqlDB, pgDB, "user_token", `SELECT COUNT(*) FROM user_token t JOIN user u ON u.id = t.user_id`, `SELECT COUNT(*) FROM user_token`, failed)
|
||||
verifyContent(sqlDB, pgDB, "user_token",
|
||||
`SELECT t.user_id, t.token, t.label FROM user_token t JOIN user u ON u.id = t.user_id ORDER BY t.user_id, t.token`,
|
||||
`SELECT user_id, token, label FROM user_token ORDER BY user_id COLLATE "C", token COLLATE "C"`,
|
||||
failed)
|
||||
|
||||
verifyCount(sqlDB, pgDB, "user_phone", `SELECT COUNT(*) FROM user_phone p JOIN user u ON u.id = p.user_id`, `SELECT COUNT(*) FROM user_phone`, failed)
|
||||
verifyContent(sqlDB, pgDB, "user_phone",
|
||||
`SELECT p.user_id, p.phone_number FROM user_phone p JOIN user u ON u.id = p.user_id ORDER BY p.user_id, p.phone_number`,
|
||||
`SELECT user_id, phone_number FROM user_phone ORDER BY user_id COLLATE "C", phone_number COLLATE "C"`,
|
||||
failed)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -673,6 +698,7 @@ func verifyMessages(sqliteFile string, pgDB *sql.DB, failed *bool) error {
|
||||
defer sqlDB.Close()
|
||||
|
||||
verifyCount(sqlDB, pgDB, "messages", `SELECT COUNT(*) FROM messages`, `SELECT COUNT(*) FROM message`, failed)
|
||||
verifySampledMessages(sqlDB, pgDB, failed)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -684,26 +710,179 @@ func verifyWebPush(sqliteFile string, pgDB *sql.DB, failed *bool) error {
|
||||
defer sqlDB.Close()
|
||||
|
||||
verifyCount(sqlDB, pgDB, "subscription", `SELECT COUNT(*) FROM subscription`, `SELECT COUNT(*) FROM webpush_subscription`, failed)
|
||||
verifyContent(sqlDB, pgDB, "subscription",
|
||||
`SELECT id, endpoint, key_auth, key_p256dh, user_id FROM subscription ORDER BY id`,
|
||||
`SELECT id, endpoint, key_auth, key_p256dh, user_id FROM webpush_subscription ORDER BY id COLLATE "C"`,
|
||||
failed)
|
||||
|
||||
verifyCount(sqlDB, pgDB, "subscription_topic", `SELECT COUNT(*) FROM subscription_topic`, `SELECT COUNT(*) FROM webpush_subscription_topic`, failed)
|
||||
verifyContent(sqlDB, pgDB, "subscription_topic",
|
||||
`SELECT subscription_id, topic FROM subscription_topic ORDER BY subscription_id, topic`,
|
||||
`SELECT subscription_id, topic FROM webpush_subscription_topic ORDER BY subscription_id COLLATE "C", topic COLLATE "C"`,
|
||||
failed)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func verifyCount(sqlDB, pgDB *sql.DB, table, sqliteQuery, pgQuery string, failed *bool) {
|
||||
var sqliteCount, pgCount int64
|
||||
if err := sqlDB.QueryRow(sqliteQuery).Scan(&sqliteCount); err != nil {
|
||||
fmt.Printf(" %-20s ERROR reading SQLite: %s\n", table, err)
|
||||
fmt.Printf(" %-25s count ERROR reading SQLite: %s\n", table, err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
if err := pgDB.QueryRow(pgQuery).Scan(&pgCount); err != nil {
|
||||
fmt.Printf(" %-20s ERROR reading PostgreSQL: %s\n", table, err)
|
||||
fmt.Printf(" %-25s count ERROR reading PostgreSQL: %s\n", table, err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
if sqliteCount == pgCount {
|
||||
fmt.Printf(" %-20s OK (%d rows)\n", table, pgCount)
|
||||
fmt.Printf(" %-25s count OK (%d rows)\n", table, pgCount)
|
||||
} else {
|
||||
fmt.Printf(" %-20s MISMATCH: SQLite=%d, PostgreSQL=%d\n", table, sqliteCount, pgCount)
|
||||
fmt.Printf(" %-25s count MISMATCH: SQLite=%d, PostgreSQL=%d\n", table, sqliteCount, pgCount)
|
||||
*failed = true
|
||||
}
|
||||
}
|
||||
|
||||
func verifyContent(sqlDB, pgDB *sql.DB, table, sqliteQuery, pgQuery string, failed *bool) {
|
||||
sqliteRows, err := sqlDB.Query(sqliteQuery)
|
||||
if err != nil {
|
||||
fmt.Printf(" %-25s content ERROR reading SQLite: %s\n", table, err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
defer sqliteRows.Close()
|
||||
|
||||
pgRows, err := pgDB.Query(pgQuery)
|
||||
if err != nil {
|
||||
fmt.Printf(" %-25s content ERROR reading PostgreSQL: %s\n", table, err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
defer pgRows.Close()
|
||||
|
||||
cols, err := sqliteRows.Columns()
|
||||
if err != nil {
|
||||
fmt.Printf(" %-25s content ERROR reading columns: %s\n", table, err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
numCols := len(cols)
|
||||
|
||||
rowNum := 0
|
||||
mismatches := 0
|
||||
for sqliteRows.Next() {
|
||||
rowNum++
|
||||
if !pgRows.Next() {
|
||||
fmt.Printf(" %-25s content MISMATCH: PostgreSQL has fewer rows (at row %d)\n", table, rowNum)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
sqliteVals := makeStringSlice(numCols)
|
||||
pgVals := makeStringSlice(numCols)
|
||||
if err := sqliteRows.Scan(sqliteVals...); err != nil {
|
||||
fmt.Printf(" %-25s content ERROR scanning SQLite row %d: %s\n", table, rowNum, err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
if err := pgRows.Scan(pgVals...); err != nil {
|
||||
fmt.Printf(" %-25s content ERROR scanning PostgreSQL row %d: %s\n", table, rowNum, err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
for i := 0; i < numCols; i++ {
|
||||
sv := *(sqliteVals[i].(*sql.NullString))
|
||||
pv := *(pgVals[i].(*sql.NullString))
|
||||
if sv != pv {
|
||||
mismatches++
|
||||
if mismatches <= 3 {
|
||||
fmt.Printf(" %-25s content MISMATCH at row %d, col %s: SQLite=%q, PostgreSQL=%q\n", table, rowNum, cols[i], sv.String, pv.String)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if pgRows.Next() {
|
||||
fmt.Printf(" %-25s content MISMATCH: PostgreSQL has more rows than SQLite\n", table)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
if mismatches > 0 {
|
||||
if mismatches > 3 {
|
||||
fmt.Printf(" %-25s content ... and %d more mismatches\n", table, mismatches-3)
|
||||
}
|
||||
*failed = true
|
||||
} else {
|
||||
fmt.Printf(" %-25s content OK\n", table)
|
||||
}
|
||||
}
|
||||
|
||||
func verifySampledMessages(sqlDB, pgDB *sql.DB, failed *bool) {
|
||||
rows, err := sqlDB.Query(`SELECT mid, topic, time, message, title, tags, priority FROM messages ORDER BY mid`)
|
||||
if err != nil {
|
||||
fmt.Printf(" %-25s content ERROR reading SQLite: %s\n", "messages (sampled)", err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
rowNum := 0
|
||||
checked := 0
|
||||
mismatches := 0
|
||||
for rows.Next() {
|
||||
rowNum++
|
||||
var mid, topic, message, title, tags string
|
||||
var msgTime int64
|
||||
var priority int
|
||||
if err := rows.Scan(&mid, &topic, &msgTime, &message, &title, &tags, &priority); err != nil {
|
||||
fmt.Printf(" %-25s content ERROR scanning SQLite row %d: %s\n", "messages (sampled)", rowNum, err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
if rowNum%100 != 1 {
|
||||
continue
|
||||
}
|
||||
checked++
|
||||
var pgTopic, pgMessage, pgTitle, pgTags string
|
||||
var pgTime int64
|
||||
var pgPriority int
|
||||
err := pgDB.QueryRow(`SELECT topic, time, message, title, tags, priority FROM message WHERE mid = $1`, mid).
|
||||
Scan(&pgTopic, &pgTime, &pgMessage, &pgTitle, &pgTags, &pgPriority)
|
||||
if err == sql.ErrNoRows {
|
||||
mismatches++
|
||||
if mismatches <= 3 {
|
||||
fmt.Printf(" %-25s content MISMATCH: mid=%s not found in PostgreSQL\n", "messages (sampled)", mid)
|
||||
}
|
||||
continue
|
||||
} else if err != nil {
|
||||
fmt.Printf(" %-25s content ERROR querying PostgreSQL for mid=%s: %s\n", "messages (sampled)", mid, err)
|
||||
*failed = true
|
||||
return
|
||||
}
|
||||
topic = toUTF8(topic)
|
||||
message = toUTF8(message)
|
||||
title = toUTF8(title)
|
||||
tags = toUTF8(tags)
|
||||
if topic != pgTopic || msgTime != pgTime || message != pgMessage || title != pgTitle || tags != pgTags || priority != pgPriority {
|
||||
mismatches++
|
||||
if mismatches <= 3 {
|
||||
fmt.Printf(" %-25s content MISMATCH at mid=%s\n", "messages (sampled)", mid)
|
||||
}
|
||||
}
|
||||
}
|
||||
if mismatches > 0 {
|
||||
if mismatches > 3 {
|
||||
fmt.Printf(" %-25s content ... and %d more mismatches\n", "messages (sampled)", mismatches-3)
|
||||
}
|
||||
*failed = true
|
||||
} else {
|
||||
fmt.Printf(" %-25s content OK (%d samples checked)\n", "messages (sampled)", checked)
|
||||
}
|
||||
}
|
||||
|
||||
func makeStringSlice(n int) []any {
|
||||
vals := make([]any, n)
|
||||
for i := range vals {
|
||||
vals[i] = &sql.NullString{}
|
||||
}
|
||||
return vals
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user