Compare commits
8 Commits
main
...
postgres-w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b567b4e904 | ||
|
|
82e15d84bd | ||
|
|
4e5f95ba0c | ||
|
|
869b972a50 | ||
|
|
bdd20197b3 | ||
|
|
a8dcecdb6d | ||
|
|
5331437664 | ||
|
|
e432bf2886 |
@@ -39,6 +39,7 @@ var flagsServe = append(
|
|||||||
altsrc.NewStringFlag(&cli.StringFlag{Name: "key-file", Aliases: []string{"key_file", "K"}, EnvVars: []string{"NTFY_KEY_FILE"}, Usage: "private key file, if listen-https is set"}),
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "key-file", Aliases: []string{"key_file", "K"}, EnvVars: []string{"NTFY_KEY_FILE"}, Usage: "private key file, if listen-https is set"}),
|
||||||
altsrc.NewStringFlag(&cli.StringFlag{Name: "cert-file", Aliases: []string{"cert_file", "E"}, EnvVars: []string{"NTFY_CERT_FILE"}, Usage: "certificate file, if listen-https is set"}),
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "cert-file", Aliases: []string{"cert_file", "E"}, EnvVars: []string{"NTFY_CERT_FILE"}, Usage: "certificate file, if listen-https is set"}),
|
||||||
altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"firebase_key_file", "F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}),
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"firebase_key_file", "F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}),
|
||||||
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "database-url", Aliases: []string{"database_url"}, EnvVars: []string{"NTFY_DATABASE_URL"}, Usage: "PostgreSQL connection string for database-backed stores (e.g. postgres://user:pass@host:5432/ntfy)"}),
|
||||||
altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"cache_file", "C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}),
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"cache_file", "C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}),
|
||||||
altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-duration", Aliases: []string{"cache_duration", "b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: util.FormatDuration(server.DefaultCacheDuration), Usage: "buffer messages for this time to allow `since` requests"}),
|
altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-duration", Aliases: []string{"cache_duration", "b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: util.FormatDuration(server.DefaultCacheDuration), Usage: "buffer messages for this time to allow `since` requests"}),
|
||||||
altsrc.NewIntFlag(&cli.IntFlag{Name: "cache-batch-size", Aliases: []string{"cache_batch_size"}, EnvVars: []string{"NTFY_BATCH_SIZE"}, Usage: "max size of messages to batch together when writing to message cache (if zero, writes are synchronous)"}),
|
altsrc.NewIntFlag(&cli.IntFlag{Name: "cache-batch-size", Aliases: []string{"cache_batch_size"}, EnvVars: []string{"NTFY_BATCH_SIZE"}, Usage: "max size of messages to batch together when writing to message cache (if zero, writes are synchronous)"}),
|
||||||
@@ -143,6 +144,7 @@ func execServe(c *cli.Context) error {
|
|||||||
keyFile := c.String("key-file")
|
keyFile := c.String("key-file")
|
||||||
certFile := c.String("cert-file")
|
certFile := c.String("cert-file")
|
||||||
firebaseKeyFile := c.String("firebase-key-file")
|
firebaseKeyFile := c.String("firebase-key-file")
|
||||||
|
databaseURL := c.String("database-url")
|
||||||
webPushPrivateKey := c.String("web-push-private-key")
|
webPushPrivateKey := c.String("web-push-private-key")
|
||||||
webPushPublicKey := c.String("web-push-public-key")
|
webPushPublicKey := c.String("web-push-public-key")
|
||||||
webPushFile := c.String("web-push-file")
|
webPushFile := c.String("web-push-file")
|
||||||
@@ -284,8 +286,8 @@ func execServe(c *cli.Context) error {
|
|||||||
return errors.New("if set, FCM key file must exist")
|
return errors.New("if set, FCM key file must exist")
|
||||||
} else if firebaseKeyFile != "" && !server.FirebaseAvailable {
|
} else if firebaseKeyFile != "" && !server.FirebaseAvailable {
|
||||||
return errors.New("cannot set firebase-key-file, support for Firebase is not available (nofirebase)")
|
return errors.New("cannot set firebase-key-file, support for Firebase is not available (nofirebase)")
|
||||||
} else if webPushPublicKey != "" && (webPushPrivateKey == "" || webPushFile == "" || webPushEmailAddress == "" || baseURL == "") {
|
} else if webPushPublicKey != "" && (webPushPrivateKey == "" || (webPushFile == "" && databaseURL == "") || webPushEmailAddress == "" || baseURL == "") {
|
||||||
return errors.New("if web push is enabled, web-push-private-key, web-push-public-key, web-push-file, web-push-email-address, and base-url should be set. run 'ntfy webpush keys' to generate keys")
|
return errors.New("if web push is enabled, web-push-private-key, web-push-public-key, web-push-file (or database-url), web-push-email-address, and base-url should be set. run 'ntfy webpush keys' to generate keys")
|
||||||
} else if keepaliveInterval < 5*time.Second {
|
} else if keepaliveInterval < 5*time.Second {
|
||||||
return errors.New("keepalive interval cannot be lower than five seconds")
|
return errors.New("keepalive interval cannot be lower than five seconds")
|
||||||
} else if managerInterval < 5*time.Second {
|
} else if managerInterval < 5*time.Second {
|
||||||
@@ -494,6 +496,7 @@ func execServe(c *cli.Context) error {
|
|||||||
conf.EnableMetrics = enableMetrics
|
conf.EnableMetrics = enableMetrics
|
||||||
conf.MetricsListenHTTP = metricsListenHTTP
|
conf.MetricsListenHTTP = metricsListenHTTP
|
||||||
conf.ProfileListenHTTP = profileListenHTTP
|
conf.ProfileListenHTTP = profileListenHTTP
|
||||||
|
conf.DatabaseURL = databaseURL
|
||||||
conf.WebPushPrivateKey = webPushPrivateKey
|
conf.WebPushPrivateKey = webPushPrivateKey
|
||||||
conf.WebPushPublicKey = webPushPublicKey
|
conf.WebPushPublicKey = webPushPublicKey
|
||||||
conf.WebPushFile = webPushFile
|
conf.WebPushFile = webPushFile
|
||||||
|
|||||||
@@ -144,6 +144,20 @@ the message to the subscribers.
|
|||||||
Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#poll-for-messages), as well as the
|
Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#poll-for-messages), as well as the
|
||||||
[`since=` parameter](subscribe/api.md#fetch-cached-messages).
|
[`since=` parameter](subscribe/api.md#fetch-cached-messages).
|
||||||
|
|
||||||
|
## PostgreSQL database
|
||||||
|
By default, ntfy uses SQLite for all database-backed stores. As an alternative, you can configure ntfy to use PostgreSQL
|
||||||
|
by setting the `database-url` option to a PostgreSQL connection string:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
database-url: "postgres://user:pass@host:5432/ntfy"
|
||||||
|
```
|
||||||
|
|
||||||
|
When `database-url` is set, ntfy will use PostgreSQL for the web push subscription store instead of SQLite. The
|
||||||
|
`web-push-file` option is not required in this case. Support for PostgreSQL for the message cache and user manager
|
||||||
|
will be added in future releases.
|
||||||
|
|
||||||
|
You can also set this via the environment variable `NTFY_DATABASE_URL` or the command line flag `--database-url`.
|
||||||
|
|
||||||
## Attachments
|
## Attachments
|
||||||
If desired, you may allow users to upload and [attach files to notifications](publish.md#attachments). To enable
|
If desired, you may allow users to upload and [attach files to notifications](publish.md#attachments). To enable
|
||||||
this feature, you have to simply configure an attachment cache directory and a base URL (`attachment-cache-dir`, `base-url`).
|
this feature, you have to simply configure an attachment cache directory and a base URL (`attachment-cache-dir`, `base-url`).
|
||||||
@@ -1141,12 +1155,15 @@ a database to keep track of the browser's subscriptions, and an admin email addr
|
|||||||
|
|
||||||
- `web-push-public-key` is the generated VAPID public key, e.g. AA1234BBCCddvveekaabcdfqwertyuiopasdfghjklzxcvbnm1234567890
|
- `web-push-public-key` is the generated VAPID public key, e.g. AA1234BBCCddvveekaabcdfqwertyuiopasdfghjklzxcvbnm1234567890
|
||||||
- `web-push-private-key` is the generated VAPID private key, e.g. AA2BB1234567890abcdefzxcvbnm1234567890
|
- `web-push-private-key` is the generated VAPID private key, e.g. AA2BB1234567890abcdefzxcvbnm1234567890
|
||||||
- `web-push-file` is a database file to keep track of browser subscription endpoints, e.g. `/var/cache/ntfy/webpush.db`
|
- `web-push-file` is a database file to keep track of browser subscription endpoints, e.g. `/var/cache/ntfy/webpush.db` (not required if `database-url` is set)
|
||||||
- `web-push-email-address` is the admin email address send to the push provider, e.g. `sysadmin@example.com`
|
- `web-push-email-address` is the admin email address send to the push provider, e.g. `sysadmin@example.com`
|
||||||
- `web-push-startup-queries` is an optional list of queries to run on startup`
|
- `web-push-startup-queries` is an optional list of queries to run on startup`
|
||||||
- `web-push-expiry-warning-duration` defines the duration after which unused subscriptions are sent a warning (default is `55d`)
|
- `web-push-expiry-warning-duration` defines the duration after which unused subscriptions are sent a warning (default is `55d`)
|
||||||
- `web-push-expiry-duration` defines the duration after which unused subscriptions will expire (default is `60d`)
|
- `web-push-expiry-duration` defines the duration after which unused subscriptions will expire (default is `60d`)
|
||||||
|
|
||||||
|
Alternatively, you can use PostgreSQL instead of SQLite for the web push subscription store by setting `database-url`
|
||||||
|
(see [PostgreSQL database](#postgresql-database)).
|
||||||
|
|
||||||
Limitations:
|
Limitations:
|
||||||
|
|
||||||
- Like foreground browser notifications, background push notifications require the web app to be served over HTTPS. A _valid_
|
- Like foreground browser notifications, background push notifications require the web app to be served over HTTPS. A _valid_
|
||||||
@@ -1172,9 +1189,10 @@ web-push-file: /var/cache/ntfy/webpush.db
|
|||||||
web-push-email-address: sysadmin@example.com
|
web-push-email-address: sysadmin@example.com
|
||||||
```
|
```
|
||||||
|
|
||||||
The `web-push-file` is used to store the push subscriptions. Unused subscriptions will send out a warning after 55 days,
|
The `web-push-file` is used to store the push subscriptions in a local SQLite database. Alternatively, if `database-url`
|
||||||
and will automatically expire after 60 days (default). If the gateway returns an error (e.g. 410 Gone when a user has unsubscribed),
|
is set, subscriptions are stored in PostgreSQL and `web-push-file` is not required. Unused subscriptions will send out
|
||||||
subscriptions are also removed automatically.
|
a warning after 55 days, and will automatically expire after 60 days (default). If the gateway returns an error
|
||||||
|
(e.g. 410 Gone when a user has unsubscribed), subscriptions are also removed automatically.
|
||||||
|
|
||||||
The web app refreshes subscriptions on start and regularly on an interval, but this file should be persisted across restarts. If the subscription
|
The web app refreshes subscriptions on start and regularly on an interval, but this file should be persisted across restarts. If the subscription
|
||||||
file is deleted or lost, any web apps that aren't open will not receive new web push notifications until you open then.
|
file is deleted or lost, any web apps that aren't open will not receive new web push notifications until you open then.
|
||||||
@@ -1755,6 +1773,7 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`).
|
|||||||
| `key-file` | `NTFY_KEY_FILE` | *filename* | - | HTTPS/TLS private key file, only used if `listen-https` is set. |
|
| `key-file` | `NTFY_KEY_FILE` | *filename* | - | HTTPS/TLS private key file, only used if `listen-https` is set. |
|
||||||
| `cert-file` | `NTFY_CERT_FILE` | *filename* | - | HTTPS/TLS certificate file, only used if `listen-https` is set. |
|
| `cert-file` | `NTFY_CERT_FILE` | *filename* | - | HTTPS/TLS certificate file, only used if `listen-https` is set. |
|
||||||
| `firebase-key-file` | `NTFY_FIREBASE_KEY_FILE` | *filename* | - | If set, also publish messages to a Firebase Cloud Messaging (FCM) topic for your app. This is optional and only required to save battery when using the Android app. See [Firebase (FCM)](#firebase-fcm). |
|
| `firebase-key-file` | `NTFY_FIREBASE_KEY_FILE` | *filename* | - | If set, also publish messages to a Firebase Cloud Messaging (FCM) topic for your app. This is optional and only required to save battery when using the Android app. See [Firebase (FCM)](#firebase-fcm). |
|
||||||
|
| `database-url` | `NTFY_DATABASE_URL` | *string (connection URL)* | - | PostgreSQL connection string (e.g. `postgres://user:pass@host:5432/ntfy`). If set, uses PostgreSQL for database-backed stores instead of SQLite. Currently applies to the web push store. See [PostgreSQL database](#postgresql-database). |
|
||||||
| `cache-file` | `NTFY_CACHE_FILE` | *filename* | - | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache). |
|
| `cache-file` | `NTFY_CACHE_FILE` | *filename* | - | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache). |
|
||||||
| `cache-duration` | `NTFY_CACHE_DURATION` | *duration* | 12h | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely. |
|
| `cache-duration` | `NTFY_CACHE_DURATION` | *duration* | 12h | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely. |
|
||||||
| `cache-startup-queries` | `NTFY_CACHE_STARTUP_QUERIES` | *string (SQL queries)* | - | SQL queries to run during database startup; this is useful for tuning and [enabling WAL mode](#message-cache) |
|
| `cache-startup-queries` | `NTFY_CACHE_STARTUP_QUERIES` | *string (SQL queries)* | - | SQL queries to run during database startup; this is useful for tuning and [enabling WAL mode](#message-cache) |
|
||||||
|
|||||||
@@ -1714,3 +1714,10 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
|
|||||||
* Fix crash when default server URL is missing scheme by auto-prepending `https://` ([#1582](https://github.com/binwiederhier/ntfy/issues/1582), thanks to [@hard-zero1](https://github.com/hard-zero1))
|
* Fix crash when default server URL is missing scheme by auto-prepending `https://` ([#1582](https://github.com/binwiederhier/ntfy/issues/1582), thanks to [@hard-zero1](https://github.com/hard-zero1))
|
||||||
* Fix notification timestamp to use original send time instead of receive time ([#1112](https://github.com/binwiederhier/ntfy/issues/1112), thanks to [@voruti](https://github.com/voruti) for reporting)
|
* Fix notification timestamp to use original send time instead of receive time ([#1112](https://github.com/binwiederhier/ntfy/issues/1112), thanks to [@voruti](https://github.com/voruti) for reporting)
|
||||||
* Fix notifications being missed after service restart by using persisted lastNotificationId ([#1591](https://github.com/binwiederhier/ntfy/issues/1591), thanks to @Epifeny for reporting)
|
* Fix notifications being missed after service restart by using persisted lastNotificationId ([#1591](https://github.com/binwiederhier/ntfy/issues/1591), thanks to @Epifeny for reporting)
|
||||||
|
* Fix crash in settings when fragment is detached during backup/restore or log operations
|
||||||
|
|
||||||
|
### ntfy server v2.12.x (UNRELEASED)
|
||||||
|
|
||||||
|
**Features:**
|
||||||
|
|
||||||
|
* Add PostgreSQL as an alternative database backend for the web push subscription store via `database-url` config option
|
||||||
|
|||||||
4
go.mod
4
go.mod
@@ -30,6 +30,7 @@ require github.com/pkg/errors v0.9.1 // indirect
|
|||||||
require (
|
require (
|
||||||
firebase.google.com/go/v4 v4.19.0
|
firebase.google.com/go/v4 v4.19.0
|
||||||
github.com/SherClockHolmes/webpush-go v1.4.0
|
github.com/SherClockHolmes/webpush-go v1.4.0
|
||||||
|
github.com/jackc/pgx/v5 v5.8.0
|
||||||
github.com/microcosm-cc/bluemonday v1.0.27
|
github.com/microcosm-cc/bluemonday v1.0.27
|
||||||
github.com/prometheus/client_golang v1.23.2
|
github.com/prometheus/client_golang v1.23.2
|
||||||
github.com/stripe/stripe-go/v74 v74.30.0
|
github.com/stripe/stripe-go/v74 v74.30.0
|
||||||
@@ -71,6 +72,9 @@ require (
|
|||||||
github.com/googleapis/enterprise-certificate-proxy v0.3.11 // indirect
|
github.com/googleapis/enterprise-certificate-proxy v0.3.11 // indirect
|
||||||
github.com/googleapis/gax-go/v2 v2.17.0 // indirect
|
github.com/googleapis/gax-go/v2 v2.17.0 // indirect
|
||||||
github.com/gorilla/css v1.0.1 // indirect
|
github.com/gorilla/css v1.0.1 // indirect
|
||||||
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||||
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
|
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||||
|
|||||||
9
go.sum
9
go.sum
@@ -104,6 +104,14 @@ github.com/gorilla/css v1.0.1 h1:ntNaBIghp6JmvWnxbZKANoLyuXTPZ4cAMlo6RyhlbO8=
|
|||||||
github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0=
|
github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0=
|
||||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||||
|
github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
|
||||||
|
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
|
||||||
|
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||||
|
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||||
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||||
@@ -144,6 +152,7 @@ github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xI
|
|||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
||||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||||
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
|
|||||||
@@ -88,6 +88,7 @@ var (
|
|||||||
// Config is the main config struct for the application. Use New to instantiate a default config struct.
|
// Config is the main config struct for the application. Use New to instantiate a default config struct.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
File string // Config file, only used for testing
|
File string // Config file, only used for testing
|
||||||
|
DatabaseURL string // PostgreSQL connection string (e.g. "postgres://user:pass@host:5432/ntfy")
|
||||||
BaseURL string
|
BaseURL string
|
||||||
ListenHTTP string
|
ListenHTTP string
|
||||||
ListenHTTPS string
|
ListenHTTPS string
|
||||||
@@ -192,6 +193,7 @@ type Config struct {
|
|||||||
func NewConfig() *Config {
|
func NewConfig() *Config {
|
||||||
return &Config{
|
return &Config{
|
||||||
File: DefaultConfigFile, // Only used for testing
|
File: DefaultConfigFile, // Only used for testing
|
||||||
|
DatabaseURL: "",
|
||||||
BaseURL: "",
|
BaseURL: "",
|
||||||
ListenHTTP: DefaultListenHTTP,
|
ListenHTTP: DefaultListenHTTP,
|
||||||
ListenHTTPS: "",
|
ListenHTTPS: "",
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ import (
|
|||||||
"heckel.io/ntfy/v2/user"
|
"heckel.io/ntfy/v2/user"
|
||||||
"heckel.io/ntfy/v2/util"
|
"heckel.io/ntfy/v2/util"
|
||||||
"heckel.io/ntfy/v2/util/sprig"
|
"heckel.io/ntfy/v2/util/sprig"
|
||||||
|
"heckel.io/ntfy/v2/webpush"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Server is the main server, providing the UI and API for ntfy
|
// Server is the main server, providing the UI and API for ntfy
|
||||||
@@ -57,7 +58,7 @@ type Server struct {
|
|||||||
messagesHistory []int64 // Last n values of the messages counter, used to determine rate
|
messagesHistory []int64 // Last n values of the messages counter, used to determine rate
|
||||||
userManager *user.Manager // Might be nil!
|
userManager *user.Manager // Might be nil!
|
||||||
messageCache *messageCache // Database that stores the messages
|
messageCache *messageCache // Database that stores the messages
|
||||||
webPush *webPushStore // Database that stores web push subscriptions
|
webPush webpush.Store // Database that stores web push subscriptions
|
||||||
fileCache *fileCache // File system based cache that stores attachments
|
fileCache *fileCache // File system based cache that stores attachments
|
||||||
stripe stripeAPI // Stripe API, can be replaced with a mock
|
stripe stripeAPI // Stripe API, can be replaced with a mock
|
||||||
priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
|
priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
|
||||||
@@ -176,9 +177,13 @@ func New(conf *Config) (*Server, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var webPush *webPushStore
|
var wp webpush.Store
|
||||||
if conf.WebPushPublicKey != "" {
|
if conf.WebPushPublicKey != "" {
|
||||||
webPush, err = newWebPushStore(conf.WebPushFile, conf.WebPushStartupQueries)
|
if conf.DatabaseURL != "" {
|
||||||
|
wp, err = webpush.NewPostgresStore(conf.DatabaseURL)
|
||||||
|
} else {
|
||||||
|
wp, err = webpush.NewSQLiteStore(conf.WebPushFile, conf.WebPushStartupQueries)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -233,7 +238,7 @@ func New(conf *Config) (*Server, error) {
|
|||||||
s := &Server{
|
s := &Server{
|
||||||
config: conf,
|
config: conf,
|
||||||
messageCache: messageCache,
|
messageCache: messageCache,
|
||||||
webPush: webPush,
|
webPush: wp,
|
||||||
fileCache: fileCache,
|
fileCache: fileCache,
|
||||||
firebaseClient: firebaseClient,
|
firebaseClient: firebaseClient,
|
||||||
smtpSender: mailer,
|
smtpSender: mailer,
|
||||||
|
|||||||
@@ -38,6 +38,12 @@
|
|||||||
#
|
#
|
||||||
# firebase-key-file: <filename>
|
# firebase-key-file: <filename>
|
||||||
|
|
||||||
|
# If "database-url" is set, ntfy will use PostgreSQL for database-backed stores instead of SQLite.
|
||||||
|
# Currently this applies to the web push subscription store. Message cache and user manager support
|
||||||
|
# will be added in future releases. When set, the "web-push-file" option is not required.
|
||||||
|
#
|
||||||
|
# database-url: "postgres://user:pass@host:5432/ntfy"
|
||||||
|
|
||||||
# If "cache-file" is set, messages are cached in a local SQLite database instead of only in-memory.
|
# If "cache-file" is set, messages are cached in a local SQLite database instead of only in-memory.
|
||||||
# This allows for service restarts without losing messages in support of the since= parameter.
|
# This allows for service restarts without losing messages in support of the since= parameter.
|
||||||
#
|
#
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/SherClockHolmes/webpush-go"
|
"github.com/SherClockHolmes/webpush-go"
|
||||||
"heckel.io/ntfy/v2/log"
|
"heckel.io/ntfy/v2/log"
|
||||||
"heckel.io/ntfy/v2/user"
|
"heckel.io/ntfy/v2/user"
|
||||||
|
wpush "heckel.io/ntfy/v2/webpush"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -128,7 +129,7 @@ func (s *Server) pruneAndNotifyWebPushSubscriptionsInternal() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
warningSent := make([]*webPushSubscription, 0)
|
warningSent := make([]*wpush.Subscription, 0)
|
||||||
for _, subscription := range subscriptions {
|
for _, subscription := range subscriptions {
|
||||||
if err := s.sendWebPushNotification(subscription, payload); err != nil {
|
if err := s.sendWebPushNotification(subscription, payload); err != nil {
|
||||||
log.Tag(tagWebPush).Err(err).With(subscription).Warn("Unable to publish expiry imminent warning")
|
log.Tag(tagWebPush).Err(err).With(subscription).Warn("Unable to publish expiry imminent warning")
|
||||||
@@ -143,7 +144,7 @@ func (s *Server) pruneAndNotifyWebPushSubscriptionsInternal() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) sendWebPushNotification(sub *webPushSubscription, message []byte, contexters ...log.Contexter) error {
|
func (s *Server) sendWebPushNotification(sub *wpush.Subscription, message []byte, contexters ...log.Contexter) error {
|
||||||
log.Tag(tagWebPush).With(sub).With(contexters...).Debug("Sending web push message")
|
log.Tag(tagWebPush).With(sub).With(contexters...).Debug("Sending web push message")
|
||||||
payload := &webpush.Subscription{
|
payload := &webpush.Subscription{
|
||||||
Endpoint: sub.Endpoint,
|
Endpoint: sub.Endpoint,
|
||||||
|
|||||||
@@ -5,10 +5,6 @@ package server
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/SherClockHolmes/webpush-go"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"heckel.io/ntfy/v2/user"
|
|
||||||
"heckel.io/ntfy/v2/util"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
@@ -18,6 +14,11 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/SherClockHolmes/webpush-go"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"heckel.io/ntfy/v2/user"
|
||||||
|
"heckel.io/ntfy/v2/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -235,11 +236,11 @@ func TestServer_WebPush_Expiry(t *testing.T) {
|
|||||||
}))
|
}))
|
||||||
defer pushService.Close()
|
defer pushService.Close()
|
||||||
|
|
||||||
addSubscription(t, s, pushService.URL+"/push-receive", "test-topic")
|
endpoint := pushService.URL + "/push-receive"
|
||||||
|
addSubscription(t, s, endpoint, "test-topic")
|
||||||
requireSubscriptionCount(t, s, "test-topic", 1)
|
requireSubscriptionCount(t, s, "test-topic", 1)
|
||||||
|
|
||||||
_, err := s.webPush.db.Exec("UPDATE subscription SET updated_at = ?", time.Now().Add(-55*24*time.Hour).Unix())
|
require.Nil(t, s.webPush.SetSubscriptionUpdatedAt(endpoint, time.Now().Add(-55*24*time.Hour).Unix()))
|
||||||
require.Nil(t, err)
|
|
||||||
|
|
||||||
s.pruneAndNotifyWebPushSubscriptions()
|
s.pruneAndNotifyWebPushSubscriptions()
|
||||||
requireSubscriptionCount(t, s, "test-topic", 1)
|
requireSubscriptionCount(t, s, "test-topic", 1)
|
||||||
@@ -248,8 +249,7 @@ func TestServer_WebPush_Expiry(t *testing.T) {
|
|||||||
return received.Load()
|
return received.Load()
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err = s.webPush.db.Exec("UPDATE subscription SET updated_at = ?", time.Now().Add(-60*24*time.Hour).Unix())
|
require.Nil(t, s.webPush.SetSubscriptionUpdatedAt(endpoint, time.Now().Add(-60*24*time.Hour).Unix()))
|
||||||
require.Nil(t, err)
|
|
||||||
|
|
||||||
s.pruneAndNotifyWebPushSubscriptions()
|
s.pruneAndNotifyWebPushSubscriptions()
|
||||||
waitFor(t, func() bool {
|
waitFor(t, func() bool {
|
||||||
|
|||||||
@@ -593,22 +593,6 @@ func newWebPushSubscriptionExpiringPayload() *webPushControlMessagePayload {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type webPushSubscription struct {
|
|
||||||
ID string
|
|
||||||
Endpoint string
|
|
||||||
Auth string
|
|
||||||
P256dh string
|
|
||||||
UserID string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *webPushSubscription) Context() log.Context {
|
|
||||||
return map[string]any{
|
|
||||||
"web_push_subscription_id": w.ID,
|
|
||||||
"web_push_subscription_user_id": w.UserID,
|
|
||||||
"web_push_subscription_endpoint": w.Endpoint,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// https://developer.mozilla.org/en-US/docs/Web/Manifest
|
// https://developer.mozilla.org/en-US/docs/Web/Manifest
|
||||||
type webManifestResponse struct {
|
type webManifestResponse struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
|
|||||||
@@ -1,285 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"database/sql"
|
|
||||||
"errors"
|
|
||||||
"heckel.io/ntfy/v2/util"
|
|
||||||
"net/netip"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
_ "github.com/mattn/go-sqlite3" // SQLite driver
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
subscriptionIDPrefix = "wps_"
|
|
||||||
subscriptionIDLength = 10
|
|
||||||
subscriptionEndpointLimitPerSubscriberIP = 10
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
errWebPushNoRows = errors.New("no rows found")
|
|
||||||
errWebPushTooManySubscriptions = errors.New("too many subscriptions")
|
|
||||||
errWebPushUserIDCannotBeEmpty = errors.New("user ID cannot be empty")
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
createWebPushSubscriptionsTableQuery = `
|
|
||||||
BEGIN;
|
|
||||||
CREATE TABLE IF NOT EXISTS subscription (
|
|
||||||
id TEXT PRIMARY KEY,
|
|
||||||
endpoint TEXT NOT NULL,
|
|
||||||
key_auth TEXT NOT NULL,
|
|
||||||
key_p256dh TEXT NOT NULL,
|
|
||||||
user_id TEXT NOT NULL,
|
|
||||||
subscriber_ip TEXT NOT NULL,
|
|
||||||
updated_at INT NOT NULL,
|
|
||||||
warned_at INT NOT NULL DEFAULT 0
|
|
||||||
);
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_endpoint ON subscription (endpoint);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_subscriber_ip ON subscription (subscriber_ip);
|
|
||||||
CREATE TABLE IF NOT EXISTS subscription_topic (
|
|
||||||
subscription_id TEXT NOT NULL,
|
|
||||||
topic TEXT NOT NULL,
|
|
||||||
PRIMARY KEY (subscription_id, topic),
|
|
||||||
FOREIGN KEY (subscription_id) REFERENCES subscription (id) ON DELETE CASCADE
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_topic ON subscription_topic (topic);
|
|
||||||
CREATE TABLE IF NOT EXISTS schemaVersion (
|
|
||||||
id INT PRIMARY KEY,
|
|
||||||
version INT NOT NULL
|
|
||||||
);
|
|
||||||
COMMIT;
|
|
||||||
`
|
|
||||||
builtinStartupQueries = `
|
|
||||||
PRAGMA foreign_keys = ON;
|
|
||||||
`
|
|
||||||
|
|
||||||
selectWebPushSubscriptionIDByEndpoint = `SELECT id FROM subscription WHERE endpoint = ?`
|
|
||||||
selectWebPushSubscriptionCountBySubscriberIP = `SELECT COUNT(*) FROM subscription WHERE subscriber_ip = ?`
|
|
||||||
selectWebPushSubscriptionsForTopicQuery = `
|
|
||||||
SELECT id, endpoint, key_auth, key_p256dh, user_id
|
|
||||||
FROM subscription_topic st
|
|
||||||
JOIN subscription s ON s.id = st.subscription_id
|
|
||||||
WHERE st.topic = ?
|
|
||||||
ORDER BY endpoint
|
|
||||||
`
|
|
||||||
selectWebPushSubscriptionsExpiringSoonQuery = `
|
|
||||||
SELECT id, endpoint, key_auth, key_p256dh, user_id
|
|
||||||
FROM subscription
|
|
||||||
WHERE warned_at = 0 AND updated_at <= ?
|
|
||||||
`
|
|
||||||
insertWebPushSubscriptionQuery = `
|
|
||||||
INSERT INTO subscription (id, endpoint, key_auth, key_p256dh, user_id, subscriber_ip, updated_at, warned_at)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
||||||
ON CONFLICT (endpoint)
|
|
||||||
DO UPDATE SET key_auth = excluded.key_auth, key_p256dh = excluded.key_p256dh, user_id = excluded.user_id, subscriber_ip = excluded.subscriber_ip, updated_at = excluded.updated_at, warned_at = excluded.warned_at
|
|
||||||
`
|
|
||||||
updateWebPushSubscriptionWarningSentQuery = `UPDATE subscription SET warned_at = ? WHERE id = ?`
|
|
||||||
deleteWebPushSubscriptionByEndpointQuery = `DELETE FROM subscription WHERE endpoint = ?`
|
|
||||||
deleteWebPushSubscriptionByUserIDQuery = `DELETE FROM subscription WHERE user_id = ?`
|
|
||||||
deleteWebPushSubscriptionByAgeQuery = `DELETE FROM subscription WHERE updated_at <= ?` // Full table scan!
|
|
||||||
|
|
||||||
insertWebPushSubscriptionTopicQuery = `INSERT INTO subscription_topic (subscription_id, topic) VALUES (?, ?)`
|
|
||||||
deleteWebPushSubscriptionTopicAllQuery = `DELETE FROM subscription_topic WHERE subscription_id = ?`
|
|
||||||
deleteWebPushSubscriptionTopicWithoutSubscription = `DELETE FROM subscription_topic WHERE subscription_id NOT IN (SELECT id FROM subscription)`
|
|
||||||
)
|
|
||||||
|
|
||||||
// Schema management queries
|
|
||||||
const (
|
|
||||||
currentWebPushSchemaVersion = 1
|
|
||||||
insertWebPushSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
|
|
||||||
selectWebPushSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
|
|
||||||
)
|
|
||||||
|
|
||||||
type webPushStore struct {
|
|
||||||
db *sql.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
func newWebPushStore(filename, startupQueries string) (*webPushStore, error) {
|
|
||||||
db, err := sql.Open("sqlite3", filename)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := setupWebPushDB(db); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := runWebPushStartupQueries(db, startupQueries); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &webPushStore{
|
|
||||||
db: db,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupWebPushDB(db *sql.DB) error {
|
|
||||||
// If 'schemaVersion' table does not exist, this must be a new database
|
|
||||||
rows, err := db.Query(selectWebPushSchemaVersionQuery)
|
|
||||||
if err != nil {
|
|
||||||
return setupNewWebPushDB(db)
|
|
||||||
}
|
|
||||||
return rows.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupNewWebPushDB(db *sql.DB) error {
|
|
||||||
if _, err := db.Exec(createWebPushSubscriptionsTableQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(insertWebPushSchemaVersion, currentWebPushSchemaVersion); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func runWebPushStartupQueries(db *sql.DB, startupQueries string) error {
|
|
||||||
if _, err := db.Exec(startupQueries); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := db.Exec(builtinStartupQueries); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpsertSubscription adds or updates Web Push subscriptions for the given topics and user ID. It always first deletes all
|
|
||||||
// existing entries for a given endpoint.
|
|
||||||
func (c *webPushStore) UpsertSubscription(endpoint string, auth, p256dh, userID string, subscriberIP netip.Addr, topics []string) error {
|
|
||||||
tx, err := c.db.Begin()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer tx.Rollback()
|
|
||||||
// Read number of subscriptions for subscriber IP address
|
|
||||||
rowsCount, err := tx.Query(selectWebPushSubscriptionCountBySubscriberIP, subscriberIP.String())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer rowsCount.Close()
|
|
||||||
var subscriptionCount int
|
|
||||||
if !rowsCount.Next() {
|
|
||||||
return errWebPushNoRows
|
|
||||||
}
|
|
||||||
if err := rowsCount.Scan(&subscriptionCount); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := rowsCount.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Read existing subscription ID for endpoint (or create new ID)
|
|
||||||
rows, err := tx.Query(selectWebPushSubscriptionIDByEndpoint, endpoint)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
var subscriptionID string
|
|
||||||
if rows.Next() {
|
|
||||||
if err := rows.Scan(&subscriptionID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if subscriptionCount >= subscriptionEndpointLimitPerSubscriberIP {
|
|
||||||
return errWebPushTooManySubscriptions
|
|
||||||
}
|
|
||||||
subscriptionID = util.RandomStringPrefix(subscriptionIDPrefix, subscriptionIDLength)
|
|
||||||
}
|
|
||||||
if err := rows.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Insert or update subscription
|
|
||||||
updatedAt, warnedAt := time.Now().Unix(), 0
|
|
||||||
if _, err = tx.Exec(insertWebPushSubscriptionQuery, subscriptionID, endpoint, auth, p256dh, userID, subscriberIP.String(), updatedAt, warnedAt); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Replace all subscription topics
|
|
||||||
if _, err := tx.Exec(deleteWebPushSubscriptionTopicAllQuery, subscriptionID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, topic := range topics {
|
|
||||||
if _, err = tx.Exec(insertWebPushSubscriptionTopicQuery, subscriptionID, topic); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return tx.Commit()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubscriptionsForTopic returns all subscriptions for the given topic
|
|
||||||
func (c *webPushStore) SubscriptionsForTopic(topic string) ([]*webPushSubscription, error) {
|
|
||||||
rows, err := c.db.Query(selectWebPushSubscriptionsForTopicQuery, topic)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
return c.subscriptionsFromRows(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubscriptionsExpiring returns all subscriptions that have not been updated for a given time period
|
|
||||||
func (c *webPushStore) SubscriptionsExpiring(warnAfter time.Duration) ([]*webPushSubscription, error) {
|
|
||||||
rows, err := c.db.Query(selectWebPushSubscriptionsExpiringSoonQuery, time.Now().Add(-warnAfter).Unix())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
return c.subscriptionsFromRows(rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
// MarkExpiryWarningSent marks the given subscriptions as having received a warning about expiring soon
|
|
||||||
func (c *webPushStore) MarkExpiryWarningSent(subscriptions []*webPushSubscription) error {
|
|
||||||
tx, err := c.db.Begin()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer tx.Rollback()
|
|
||||||
for _, subscription := range subscriptions {
|
|
||||||
if _, err := tx.Exec(updateWebPushSubscriptionWarningSentQuery, time.Now().Unix(), subscription.ID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return tx.Commit()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *webPushStore) subscriptionsFromRows(rows *sql.Rows) ([]*webPushSubscription, error) {
|
|
||||||
subscriptions := make([]*webPushSubscription, 0)
|
|
||||||
for rows.Next() {
|
|
||||||
var id, endpoint, auth, p256dh, userID string
|
|
||||||
if err := rows.Scan(&id, &endpoint, &auth, &p256dh, &userID); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
subscriptions = append(subscriptions, &webPushSubscription{
|
|
||||||
ID: id,
|
|
||||||
Endpoint: endpoint,
|
|
||||||
Auth: auth,
|
|
||||||
P256dh: p256dh,
|
|
||||||
UserID: userID,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return subscriptions, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveSubscriptionsByEndpoint removes the subscription for the given endpoint
|
|
||||||
func (c *webPushStore) RemoveSubscriptionsByEndpoint(endpoint string) error {
|
|
||||||
_, err := c.db.Exec(deleteWebPushSubscriptionByEndpointQuery, endpoint)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveSubscriptionsByUserID removes all subscriptions for the given user ID
|
|
||||||
func (c *webPushStore) RemoveSubscriptionsByUserID(userID string) error {
|
|
||||||
if userID == "" {
|
|
||||||
return errWebPushUserIDCannotBeEmpty
|
|
||||||
}
|
|
||||||
_, err := c.db.Exec(deleteWebPushSubscriptionByUserIDQuery, userID)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveExpiredSubscriptions removes all subscriptions that have not been updated for a given time period
|
|
||||||
func (c *webPushStore) RemoveExpiredSubscriptions(expireAfter time.Duration) error {
|
|
||||||
_, err := c.db.Exec(deleteWebPushSubscriptionByAgeQuery, time.Now().Add(-expireAfter).Unix())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = c.db.Exec(deleteWebPushSubscriptionTopicWithoutSubscription)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the underlying database connection
|
|
||||||
func (c *webPushStore) Close() error {
|
|
||||||
return c.db.Close()
|
|
||||||
}
|
|
||||||
@@ -1,199 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"net/netip"
|
|
||||||
"path/filepath"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestWebPushStore_UpsertSubscription_SubscriptionsForTopic(t *testing.T) {
|
|
||||||
webPush := newTestWebPushStore(t)
|
|
||||||
defer webPush.Close()
|
|
||||||
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"test-topic", "mytopic"}))
|
|
||||||
|
|
||||||
subs, err := webPush.SubscriptionsForTopic("test-topic")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 1)
|
|
||||||
require.Equal(t, subs[0].Endpoint, testWebPushEndpoint)
|
|
||||||
require.Equal(t, subs[0].P256dh, "p256dh-key")
|
|
||||||
require.Equal(t, subs[0].Auth, "auth-key")
|
|
||||||
require.Equal(t, subs[0].UserID, "u_1234")
|
|
||||||
|
|
||||||
subs2, err := webPush.SubscriptionsForTopic("mytopic")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs2, 1)
|
|
||||||
require.Equal(t, subs[0].Endpoint, subs2[0].Endpoint)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebPushStore_UpsertSubscription_SubscriberIPLimitReached(t *testing.T) {
|
|
||||||
webPush := newTestWebPushStore(t)
|
|
||||||
defer webPush.Close()
|
|
||||||
|
|
||||||
// Insert 10 subscriptions with the same IP address
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
endpoint := fmt.Sprintf(testWebPushEndpoint+"%d", i)
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(endpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"test-topic", "mytopic"}))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Another one for the same endpoint should be fine
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint+"0", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"test-topic", "mytopic"}))
|
|
||||||
|
|
||||||
// But with a different endpoint it should fail
|
|
||||||
require.Equal(t, errWebPushTooManySubscriptions, webPush.UpsertSubscription(testWebPushEndpoint+"11", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"test-topic", "mytopic"}))
|
|
||||||
|
|
||||||
// But with a different IP address it should be fine again
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint+"99", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("9.9.9.9"), []string{"test-topic", "mytopic"}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebPushStore_UpsertSubscription_UpdateTopics(t *testing.T) {
|
|
||||||
webPush := newTestWebPushStore(t)
|
|
||||||
defer webPush.Close()
|
|
||||||
|
|
||||||
// Insert subscription with two topics, and another with one topic
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint+"0", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint+"1", "auth-key", "p256dh-key", "", netip.MustParseAddr("9.9.9.9"), []string{"topic1"}))
|
|
||||||
|
|
||||||
subs, err := webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 2)
|
|
||||||
require.Equal(t, testWebPushEndpoint+"0", subs[0].Endpoint)
|
|
||||||
require.Equal(t, testWebPushEndpoint+"1", subs[1].Endpoint)
|
|
||||||
|
|
||||||
subs, err = webPush.SubscriptionsForTopic("topic2")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 1)
|
|
||||||
require.Equal(t, testWebPushEndpoint+"0", subs[0].Endpoint)
|
|
||||||
|
|
||||||
// Update the first subscription to have only one topic
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint+"0", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1"}))
|
|
||||||
|
|
||||||
subs, err = webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 2)
|
|
||||||
require.Equal(t, testWebPushEndpoint+"0", subs[0].Endpoint)
|
|
||||||
|
|
||||||
subs, err = webPush.SubscriptionsForTopic("topic2")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebPushStore_RemoveSubscriptionsByEndpoint(t *testing.T) {
|
|
||||||
webPush := newTestWebPushStore(t)
|
|
||||||
defer webPush.Close()
|
|
||||||
|
|
||||||
// Insert subscription with two topics
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
|
||||||
subs, err := webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 1)
|
|
||||||
|
|
||||||
// And remove it again
|
|
||||||
require.Nil(t, webPush.RemoveSubscriptionsByEndpoint(testWebPushEndpoint))
|
|
||||||
subs, err = webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebPushStore_RemoveSubscriptionsByUserID(t *testing.T) {
|
|
||||||
webPush := newTestWebPushStore(t)
|
|
||||||
defer webPush.Close()
|
|
||||||
|
|
||||||
// Insert subscription with two topics
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
|
||||||
subs, err := webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 1)
|
|
||||||
|
|
||||||
// And remove it again
|
|
||||||
require.Nil(t, webPush.RemoveSubscriptionsByUserID("u_1234"))
|
|
||||||
subs, err = webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebPushStore_RemoveSubscriptionsByUserID_Empty(t *testing.T) {
|
|
||||||
webPush := newTestWebPushStore(t)
|
|
||||||
defer webPush.Close()
|
|
||||||
require.Equal(t, errWebPushUserIDCannotBeEmpty, webPush.RemoveSubscriptionsByUserID(""))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebPushStore_MarkExpiryWarningSent(t *testing.T) {
|
|
||||||
webPush := newTestWebPushStore(t)
|
|
||||||
defer webPush.Close()
|
|
||||||
|
|
||||||
// Insert subscription with two topics
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
|
||||||
subs, err := webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 1)
|
|
||||||
|
|
||||||
// Mark them as warning sent
|
|
||||||
require.Nil(t, webPush.MarkExpiryWarningSent(subs))
|
|
||||||
|
|
||||||
rows, err := webPush.db.Query("SELECT endpoint FROM subscription WHERE warned_at > 0")
|
|
||||||
require.Nil(t, err)
|
|
||||||
defer rows.Close()
|
|
||||||
var endpoint string
|
|
||||||
require.True(t, rows.Next())
|
|
||||||
require.Nil(t, rows.Scan(&endpoint))
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Equal(t, testWebPushEndpoint, endpoint)
|
|
||||||
require.False(t, rows.Next())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebPushStore_SubscriptionsExpiring(t *testing.T) {
|
|
||||||
webPush := newTestWebPushStore(t)
|
|
||||||
defer webPush.Close()
|
|
||||||
|
|
||||||
// Insert subscription with two topics
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
|
||||||
subs, err := webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 1)
|
|
||||||
|
|
||||||
// Fake-mark them as soon-to-expire
|
|
||||||
_, err = webPush.db.Exec("UPDATE subscription SET updated_at = ? WHERE endpoint = ?", time.Now().Add(-8*24*time.Hour).Unix(), testWebPushEndpoint)
|
|
||||||
require.Nil(t, err)
|
|
||||||
|
|
||||||
// Should not be cleaned up yet
|
|
||||||
require.Nil(t, webPush.RemoveExpiredSubscriptions(9*24*time.Hour))
|
|
||||||
|
|
||||||
// Run expiration
|
|
||||||
subs, err = webPush.SubscriptionsExpiring(7 * 24 * time.Hour)
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 1)
|
|
||||||
require.Equal(t, testWebPushEndpoint, subs[0].Endpoint)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebPushStore_RemoveExpiredSubscriptions(t *testing.T) {
|
|
||||||
webPush := newTestWebPushStore(t)
|
|
||||||
defer webPush.Close()
|
|
||||||
|
|
||||||
// Insert subscription with two topics
|
|
||||||
require.Nil(t, webPush.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
|
||||||
subs, err := webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 1)
|
|
||||||
|
|
||||||
// Fake-mark them as expired
|
|
||||||
_, err = webPush.db.Exec("UPDATE subscription SET updated_at = ? WHERE endpoint = ?", time.Now().Add(-10*24*time.Hour).Unix(), testWebPushEndpoint)
|
|
||||||
require.Nil(t, err)
|
|
||||||
|
|
||||||
// Run expiration
|
|
||||||
require.Nil(t, webPush.RemoveExpiredSubscriptions(9*24*time.Hour))
|
|
||||||
|
|
||||||
// List again, should be 0
|
|
||||||
subs, err = webPush.SubscriptionsForTopic("topic1")
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Len(t, subs, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTestWebPushStore(t *testing.T) *webPushStore {
|
|
||||||
webPush, err := newWebPushStore(filepath.Join(t.TempDir(), "webpush.db"), "")
|
|
||||||
require.Nil(t, err)
|
|
||||||
return webPush
|
|
||||||
}
|
|
||||||
188
webpush/store.go
Normal file
188
webpush/store.go
Normal file
@@ -0,0 +1,188 @@
|
|||||||
|
package webpush
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"net/netip"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"heckel.io/ntfy/v2/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
subscriptionIDPrefix = "wps_"
|
||||||
|
subscriptionIDLength = 10
|
||||||
|
subscriptionEndpointLimitPerSubscriberIP = 10
|
||||||
|
)
|
||||||
|
|
||||||
|
// Errors returned by the store
|
||||||
|
var (
|
||||||
|
ErrWebPushTooManySubscriptions = errors.New("too many subscriptions")
|
||||||
|
ErrWebPushUserIDCannotBeEmpty = errors.New("user ID cannot be empty")
|
||||||
|
)
|
||||||
|
|
||||||
|
// Store is the interface for a web push subscription store.
|
||||||
|
type Store interface {
|
||||||
|
UpsertSubscription(endpoint, auth, p256dh, userID string, subscriberIP netip.Addr, topics []string) error
|
||||||
|
SubscriptionsForTopic(topic string) ([]*Subscription, error)
|
||||||
|
SubscriptionsExpiring(warnAfter time.Duration) ([]*Subscription, error)
|
||||||
|
MarkExpiryWarningSent(subscriptions []*Subscription) error
|
||||||
|
RemoveSubscriptionsByEndpoint(endpoint string) error
|
||||||
|
RemoveSubscriptionsByUserID(userID string) error
|
||||||
|
RemoveExpiredSubscriptions(expireAfter time.Duration) error
|
||||||
|
SetSubscriptionUpdatedAt(endpoint string, updatedAt int64) error
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// storeQueries holds the database-specific SQL queries.
|
||||||
|
type storeQueries struct {
|
||||||
|
selectSubscriptionIDByEndpoint string
|
||||||
|
selectSubscriptionCountBySubscriberIP string
|
||||||
|
selectSubscriptionsForTopic string
|
||||||
|
selectSubscriptionsExpiringSoon string
|
||||||
|
insertSubscription string
|
||||||
|
updateSubscriptionWarningSent string
|
||||||
|
updateSubscriptionUpdatedAt string
|
||||||
|
deleteSubscriptionByEndpoint string
|
||||||
|
deleteSubscriptionByUserID string
|
||||||
|
deleteSubscriptionByAge string
|
||||||
|
insertSubscriptionTopic string
|
||||||
|
deleteSubscriptionTopicAll string
|
||||||
|
deleteSubscriptionTopicWithoutSubscription string
|
||||||
|
}
|
||||||
|
|
||||||
|
// commonStore implements store operations that are identical across database backends.
|
||||||
|
type commonStore struct {
|
||||||
|
db *sql.DB
|
||||||
|
queries storeQueries
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpsertSubscription adds or updates Web Push subscriptions for the given topics and user ID.
|
||||||
|
func (s *commonStore) UpsertSubscription(endpoint string, auth, p256dh, userID string, subscriberIP netip.Addr, topics []string) error {
|
||||||
|
tx, err := s.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
// Read number of subscriptions for subscriber IP address
|
||||||
|
var subscriptionCount int
|
||||||
|
if err := tx.QueryRow(s.queries.selectSubscriptionCountBySubscriberIP, subscriberIP.String()).Scan(&subscriptionCount); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Read existing subscription ID for endpoint (or create new ID)
|
||||||
|
var subscriptionID string
|
||||||
|
err = tx.QueryRow(s.queries.selectSubscriptionIDByEndpoint, endpoint).Scan(&subscriptionID)
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
if subscriptionCount >= subscriptionEndpointLimitPerSubscriberIP {
|
||||||
|
return ErrWebPushTooManySubscriptions
|
||||||
|
}
|
||||||
|
subscriptionID = util.RandomStringPrefix(subscriptionIDPrefix, subscriptionIDLength)
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Insert or update subscription
|
||||||
|
updatedAt, warnedAt := time.Now().Unix(), 0
|
||||||
|
if _, err = tx.Exec(s.queries.insertSubscription, subscriptionID, endpoint, auth, p256dh, userID, subscriberIP.String(), updatedAt, warnedAt); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Replace all subscription topics
|
||||||
|
if _, err := tx.Exec(s.queries.deleteSubscriptionTopicAll, subscriptionID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, topic := range topics {
|
||||||
|
if _, err = tx.Exec(s.queries.insertSubscriptionTopic, subscriptionID, topic); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscriptionsForTopic returns all subscriptions for the given topic.
|
||||||
|
func (s *commonStore) SubscriptionsForTopic(topic string) ([]*Subscription, error) {
|
||||||
|
rows, err := s.db.Query(s.queries.selectSubscriptionsForTopic, topic)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
return subscriptionsFromRows(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscriptionsExpiring returns all subscriptions that have not been updated for a given time period.
|
||||||
|
func (s *commonStore) SubscriptionsExpiring(warnAfter time.Duration) ([]*Subscription, error) {
|
||||||
|
rows, err := s.db.Query(s.queries.selectSubscriptionsExpiringSoon, time.Now().Add(-warnAfter).Unix())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
return subscriptionsFromRows(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkExpiryWarningSent marks the given subscriptions as having received a warning about expiring soon.
|
||||||
|
func (s *commonStore) MarkExpiryWarningSent(subscriptions []*Subscription) error {
|
||||||
|
tx, err := s.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
for _, subscription := range subscriptions {
|
||||||
|
if _, err := tx.Exec(s.queries.updateSubscriptionWarningSent, time.Now().Unix(), subscription.ID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveSubscriptionsByEndpoint removes the subscription for the given endpoint.
|
||||||
|
func (s *commonStore) RemoveSubscriptionsByEndpoint(endpoint string) error {
|
||||||
|
_, err := s.db.Exec(s.queries.deleteSubscriptionByEndpoint, endpoint)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveSubscriptionsByUserID removes all subscriptions for the given user ID.
|
||||||
|
func (s *commonStore) RemoveSubscriptionsByUserID(userID string) error {
|
||||||
|
if userID == "" {
|
||||||
|
return ErrWebPushUserIDCannotBeEmpty
|
||||||
|
}
|
||||||
|
_, err := s.db.Exec(s.queries.deleteSubscriptionByUserID, userID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveExpiredSubscriptions removes all subscriptions that have not been updated for a given time period.
|
||||||
|
func (s *commonStore) RemoveExpiredSubscriptions(expireAfter time.Duration) error {
|
||||||
|
_, err := s.db.Exec(s.queries.deleteSubscriptionByAge, time.Now().Add(-expireAfter).Unix())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = s.db.Exec(s.queries.deleteSubscriptionTopicWithoutSubscription)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetSubscriptionUpdatedAt updates the updated_at timestamp for a subscription by endpoint. This is
|
||||||
|
// exported for testing purposes and is not part of the Store interface.
|
||||||
|
func (s *commonStore) SetSubscriptionUpdatedAt(endpoint string, updatedAt int64) error {
|
||||||
|
_, err := s.db.Exec(s.queries.updateSubscriptionUpdatedAt, updatedAt, endpoint)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the underlying database connection.
|
||||||
|
func (s *commonStore) Close() error {
|
||||||
|
return s.db.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func subscriptionsFromRows(rows *sql.Rows) ([]*Subscription, error) {
|
||||||
|
subscriptions := make([]*Subscription, 0)
|
||||||
|
for rows.Next() {
|
||||||
|
var id, endpoint, auth, p256dh, userID string
|
||||||
|
if err := rows.Scan(&id, &endpoint, &auth, &p256dh, &userID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
subscriptions = append(subscriptions, &Subscription{
|
||||||
|
ID: id,
|
||||||
|
Endpoint: endpoint,
|
||||||
|
Auth: auth,
|
||||||
|
P256dh: p256dh,
|
||||||
|
UserID: userID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return subscriptions, nil
|
||||||
|
}
|
||||||
126
webpush/store_postgres.go
Normal file
126
webpush/store_postgres.go
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
package webpush
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
_ "github.com/jackc/pgx/v5/stdlib" // PostgreSQL driver
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
pgCreateTablesQuery = `
|
||||||
|
CREATE TABLE IF NOT EXISTS webpush_subscription (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
endpoint TEXT NOT NULL UNIQUE,
|
||||||
|
key_auth TEXT NOT NULL,
|
||||||
|
key_p256dh TEXT NOT NULL,
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
subscriber_ip TEXT NOT NULL,
|
||||||
|
updated_at BIGINT NOT NULL,
|
||||||
|
warned_at BIGINT NOT NULL DEFAULT 0
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_webpush_subscriber_ip ON webpush_subscription (subscriber_ip);
|
||||||
|
CREATE TABLE IF NOT EXISTS webpush_subscription_topic (
|
||||||
|
subscription_id TEXT NOT NULL REFERENCES webpush_subscription (id) ON DELETE CASCADE,
|
||||||
|
topic TEXT NOT NULL,
|
||||||
|
PRIMARY KEY (subscription_id, topic)
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_webpush_topic ON webpush_subscription_topic (topic);
|
||||||
|
CREATE TABLE IF NOT EXISTS webpush_schema_version (
|
||||||
|
id INT PRIMARY KEY,
|
||||||
|
version INT NOT NULL
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
pgSelectSubscriptionIDByEndpoint = `SELECT id FROM webpush_subscription WHERE endpoint = $1`
|
||||||
|
pgSelectSubscriptionCountBySubscriberIP = `SELECT COUNT(*) FROM webpush_subscription WHERE subscriber_ip = $1`
|
||||||
|
pgSelectSubscriptionsForTopicQuery = `
|
||||||
|
SELECT s.id, s.endpoint, s.key_auth, s.key_p256dh, s.user_id
|
||||||
|
FROM webpush_subscription_topic st
|
||||||
|
JOIN webpush_subscription s ON s.id = st.subscription_id
|
||||||
|
WHERE st.topic = $1
|
||||||
|
ORDER BY s.endpoint
|
||||||
|
`
|
||||||
|
pgSelectSubscriptionsExpiringSoonQuery = `
|
||||||
|
SELECT id, endpoint, key_auth, key_p256dh, user_id
|
||||||
|
FROM webpush_subscription
|
||||||
|
WHERE warned_at = 0 AND updated_at <= $1
|
||||||
|
`
|
||||||
|
pgInsertSubscriptionQuery = `
|
||||||
|
INSERT INTO webpush_subscription (id, endpoint, key_auth, key_p256dh, user_id, subscriber_ip, updated_at, warned_at)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
|
ON CONFLICT (endpoint)
|
||||||
|
DO UPDATE SET key_auth = excluded.key_auth, key_p256dh = excluded.key_p256dh, user_id = excluded.user_id, subscriber_ip = excluded.subscriber_ip, updated_at = excluded.updated_at, warned_at = excluded.warned_at
|
||||||
|
`
|
||||||
|
pgUpdateSubscriptionWarningSentQuery = `UPDATE webpush_subscription SET warned_at = $1 WHERE id = $2`
|
||||||
|
pgUpdateSubscriptionUpdatedAtQuery = `UPDATE webpush_subscription SET updated_at = $1 WHERE endpoint = $2`
|
||||||
|
pgDeleteSubscriptionByEndpointQuery = `DELETE FROM webpush_subscription WHERE endpoint = $1`
|
||||||
|
pgDeleteSubscriptionByUserIDQuery = `DELETE FROM webpush_subscription WHERE user_id = $1`
|
||||||
|
pgDeleteSubscriptionByAgeQuery = `DELETE FROM webpush_subscription WHERE updated_at <= $1`
|
||||||
|
|
||||||
|
pgInsertSubscriptionTopicQuery = `INSERT INTO webpush_subscription_topic (subscription_id, topic) VALUES ($1, $2)`
|
||||||
|
pgDeleteSubscriptionTopicAllQuery = `DELETE FROM webpush_subscription_topic WHERE subscription_id = $1`
|
||||||
|
pgDeleteSubscriptionTopicWithoutSubscription = `DELETE FROM webpush_subscription_topic WHERE subscription_id NOT IN (SELECT id FROM webpush_subscription)`
|
||||||
|
)
|
||||||
|
|
||||||
|
// PostgreSQL schema management queries
|
||||||
|
const (
|
||||||
|
pgCurrentSchemaVersion = 1
|
||||||
|
pgInsertSchemaVersion = `INSERT INTO webpush_schema_version VALUES (1, $1)`
|
||||||
|
pgSelectSchemaVersionQuery = `SELECT version FROM webpush_schema_version WHERE id = 1`
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewPostgresStore creates a new PostgreSQL-backed web push store.
|
||||||
|
func NewPostgresStore(dsn string) (Store, error) {
|
||||||
|
db, err := sql.Open("pgx", dsn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := db.Ping(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := setupPostgresDB(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &commonStore{
|
||||||
|
db: db,
|
||||||
|
queries: storeQueries{
|
||||||
|
selectSubscriptionIDByEndpoint: pgSelectSubscriptionIDByEndpoint,
|
||||||
|
selectSubscriptionCountBySubscriberIP: pgSelectSubscriptionCountBySubscriberIP,
|
||||||
|
selectSubscriptionsForTopic: pgSelectSubscriptionsForTopicQuery,
|
||||||
|
selectSubscriptionsExpiringSoon: pgSelectSubscriptionsExpiringSoonQuery,
|
||||||
|
insertSubscription: pgInsertSubscriptionQuery,
|
||||||
|
updateSubscriptionWarningSent: pgUpdateSubscriptionWarningSentQuery,
|
||||||
|
updateSubscriptionUpdatedAt: pgUpdateSubscriptionUpdatedAtQuery,
|
||||||
|
deleteSubscriptionByEndpoint: pgDeleteSubscriptionByEndpointQuery,
|
||||||
|
deleteSubscriptionByUserID: pgDeleteSubscriptionByUserIDQuery,
|
||||||
|
deleteSubscriptionByAge: pgDeleteSubscriptionByAgeQuery,
|
||||||
|
insertSubscriptionTopic: pgInsertSubscriptionTopicQuery,
|
||||||
|
deleteSubscriptionTopicAll: pgDeleteSubscriptionTopicAllQuery,
|
||||||
|
deleteSubscriptionTopicWithoutSubscription: pgDeleteSubscriptionTopicWithoutSubscription,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupPostgresDB(db *sql.DB) error {
|
||||||
|
// If 'webpush_schema_version' table does not exist, this must be a new database
|
||||||
|
rows, err := db.Query(pgSelectSchemaVersionQuery)
|
||||||
|
if err != nil {
|
||||||
|
return setupNewPostgresDB(db)
|
||||||
|
}
|
||||||
|
return rows.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupNewPostgresDB(db *sql.DB) error {
|
||||||
|
tx, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
if _, err := tx.Exec(pgCreateTablesQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(pgInsertSchemaVersion, pgCurrentSchemaVersion); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
91
webpush/store_postgres_test.go
Normal file
91
webpush/store_postgres_test.go
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
package webpush_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"heckel.io/ntfy/v2/util"
|
||||||
|
"heckel.io/ntfy/v2/webpush"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTestPostgresStore(t *testing.T) webpush.Store {
|
||||||
|
dsn := os.Getenv("NTFY_TEST_DATABASE_URL")
|
||||||
|
if dsn == "" {
|
||||||
|
t.Skip("NTFY_TEST_DATABASE_URL not set, skipping PostgreSQL tests")
|
||||||
|
}
|
||||||
|
// Create a unique schema for this test
|
||||||
|
schema := fmt.Sprintf("test_%s", util.RandomString(10))
|
||||||
|
setupDB, err := sql.Open("pgx", dsn)
|
||||||
|
require.Nil(t, err)
|
||||||
|
_, err = setupDB.Exec(fmt.Sprintf("CREATE SCHEMA %s", schema))
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Nil(t, setupDB.Close())
|
||||||
|
// Open store with search_path set to the new schema
|
||||||
|
u, err := url.Parse(dsn)
|
||||||
|
require.Nil(t, err)
|
||||||
|
q := u.Query()
|
||||||
|
q.Set("search_path", schema)
|
||||||
|
u.RawQuery = q.Encode()
|
||||||
|
store, err := webpush.NewPostgresStore(u.String())
|
||||||
|
require.Nil(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
store.Close()
|
||||||
|
cleanDB, err := sql.Open("pgx", dsn)
|
||||||
|
if err == nil {
|
||||||
|
cleanDB.Exec(fmt.Sprintf("DROP SCHEMA %s CASCADE", schema))
|
||||||
|
cleanDB.Close()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return store
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreUpsertSubscriptionSubscriptionsForTopic(t *testing.T) {
|
||||||
|
testStoreUpsertSubscriptionSubscriptionsForTopic(t, newTestPostgresStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreUpsertSubscriptionSubscriberIPLimitReached(t *testing.T) {
|
||||||
|
testStoreUpsertSubscriptionSubscriberIPLimitReached(t, newTestPostgresStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreUpsertSubscriptionUpdateTopics(t *testing.T) {
|
||||||
|
testStoreUpsertSubscriptionUpdateTopics(t, newTestPostgresStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreUpsertSubscriptionUpdateFields(t *testing.T) {
|
||||||
|
testStoreUpsertSubscriptionUpdateFields(t, newTestPostgresStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreRemoveByUserIDMultiple(t *testing.T) {
|
||||||
|
testStoreRemoveByUserIDMultiple(t, newTestPostgresStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreRemoveByEndpoint(t *testing.T) {
|
||||||
|
testStoreRemoveByEndpoint(t, newTestPostgresStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreRemoveByUserID(t *testing.T) {
|
||||||
|
testStoreRemoveByUserID(t, newTestPostgresStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreRemoveByUserIDEmpty(t *testing.T) {
|
||||||
|
testStoreRemoveByUserIDEmpty(t, newTestPostgresStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreExpiryWarningSent(t *testing.T) {
|
||||||
|
store := newTestPostgresStore(t)
|
||||||
|
testStoreExpiryWarningSent(t, store, store.SetSubscriptionUpdatedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreExpiring(t *testing.T) {
|
||||||
|
store := newTestPostgresStore(t)
|
||||||
|
testStoreExpiring(t, store, store.SetSubscriptionUpdatedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPostgresStoreRemoveExpired(t *testing.T) {
|
||||||
|
store := newTestPostgresStore(t)
|
||||||
|
testStoreRemoveExpired(t, store, store.SetSubscriptionUpdatedAt)
|
||||||
|
}
|
||||||
138
webpush/store_sqlite.go
Normal file
138
webpush/store_sqlite.go
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
package webpush
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
_ "github.com/mattn/go-sqlite3" // SQLite driver
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
sqliteCreateWebPushSubscriptionsTableQuery = `
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE IF NOT EXISTS subscription (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
endpoint TEXT NOT NULL,
|
||||||
|
key_auth TEXT NOT NULL,
|
||||||
|
key_p256dh TEXT NOT NULL,
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
subscriber_ip TEXT NOT NULL,
|
||||||
|
updated_at INT NOT NULL,
|
||||||
|
warned_at INT NOT NULL DEFAULT 0
|
||||||
|
);
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_endpoint ON subscription (endpoint);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_subscriber_ip ON subscription (subscriber_ip);
|
||||||
|
CREATE TABLE IF NOT EXISTS subscription_topic (
|
||||||
|
subscription_id TEXT NOT NULL,
|
||||||
|
topic TEXT NOT NULL,
|
||||||
|
PRIMARY KEY (subscription_id, topic),
|
||||||
|
FOREIGN KEY (subscription_id) REFERENCES subscription (id) ON DELETE CASCADE
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_topic ON subscription_topic (topic);
|
||||||
|
CREATE TABLE IF NOT EXISTS schemaVersion (
|
||||||
|
id INT PRIMARY KEY,
|
||||||
|
version INT NOT NULL
|
||||||
|
);
|
||||||
|
COMMIT;
|
||||||
|
`
|
||||||
|
sqliteBuiltinStartupQueries = `
|
||||||
|
PRAGMA foreign_keys = ON;
|
||||||
|
`
|
||||||
|
|
||||||
|
sqliteSelectWebPushSubscriptionIDByEndpoint = `SELECT id FROM subscription WHERE endpoint = ?`
|
||||||
|
sqliteSelectWebPushSubscriptionCountBySubscriberIP = `SELECT COUNT(*) FROM subscription WHERE subscriber_ip = ?`
|
||||||
|
sqliteSelectWebPushSubscriptionsForTopicQuery = `
|
||||||
|
SELECT id, endpoint, key_auth, key_p256dh, user_id
|
||||||
|
FROM subscription_topic st
|
||||||
|
JOIN subscription s ON s.id = st.subscription_id
|
||||||
|
WHERE st.topic = ?
|
||||||
|
ORDER BY endpoint
|
||||||
|
`
|
||||||
|
sqliteSelectWebPushSubscriptionsExpiringSoonQuery = `
|
||||||
|
SELECT id, endpoint, key_auth, key_p256dh, user_id
|
||||||
|
FROM subscription
|
||||||
|
WHERE warned_at = 0 AND updated_at <= ?
|
||||||
|
`
|
||||||
|
sqliteInsertWebPushSubscriptionQuery = `
|
||||||
|
INSERT INTO subscription (id, endpoint, key_auth, key_p256dh, user_id, subscriber_ip, updated_at, warned_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (endpoint)
|
||||||
|
DO UPDATE SET key_auth = excluded.key_auth, key_p256dh = excluded.key_p256dh, user_id = excluded.user_id, subscriber_ip = excluded.subscriber_ip, updated_at = excluded.updated_at, warned_at = excluded.warned_at
|
||||||
|
`
|
||||||
|
sqliteUpdateWebPushSubscriptionWarningSentQuery = `UPDATE subscription SET warned_at = ? WHERE id = ?`
|
||||||
|
sqliteUpdateWebPushSubscriptionUpdatedAtQuery = `UPDATE subscription SET updated_at = ? WHERE endpoint = ?`
|
||||||
|
sqliteDeleteWebPushSubscriptionByEndpointQuery = `DELETE FROM subscription WHERE endpoint = ?`
|
||||||
|
sqliteDeleteWebPushSubscriptionByUserIDQuery = `DELETE FROM subscription WHERE user_id = ?`
|
||||||
|
sqliteDeleteWebPushSubscriptionByAgeQuery = `DELETE FROM subscription WHERE updated_at <= ?` // Full table scan!
|
||||||
|
|
||||||
|
sqliteInsertWebPushSubscriptionTopicQuery = `INSERT INTO subscription_topic (subscription_id, topic) VALUES (?, ?)`
|
||||||
|
sqliteDeleteWebPushSubscriptionTopicAllQuery = `DELETE FROM subscription_topic WHERE subscription_id = ?`
|
||||||
|
sqliteDeleteWebPushSubscriptionTopicWithoutSubscription = `DELETE FROM subscription_topic WHERE subscription_id NOT IN (SELECT id FROM subscription)`
|
||||||
|
)
|
||||||
|
|
||||||
|
// SQLite schema management queries
|
||||||
|
const (
|
||||||
|
sqliteCurrentWebPushSchemaVersion = 1
|
||||||
|
sqliteInsertWebPushSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
|
||||||
|
sqliteSelectWebPushSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewSQLiteStore creates a new SQLite-backed web push store.
|
||||||
|
func NewSQLiteStore(filename, startupQueries string) (Store, error) {
|
||||||
|
db, err := sql.Open("sqlite3", filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := setupSQLiteWebPushDB(db); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := runSQLiteWebPushStartupQueries(db, startupQueries); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &commonStore{
|
||||||
|
db: db,
|
||||||
|
queries: storeQueries{
|
||||||
|
selectSubscriptionIDByEndpoint: sqliteSelectWebPushSubscriptionIDByEndpoint,
|
||||||
|
selectSubscriptionCountBySubscriberIP: sqliteSelectWebPushSubscriptionCountBySubscriberIP,
|
||||||
|
selectSubscriptionsForTopic: sqliteSelectWebPushSubscriptionsForTopicQuery,
|
||||||
|
selectSubscriptionsExpiringSoon: sqliteSelectWebPushSubscriptionsExpiringSoonQuery,
|
||||||
|
insertSubscription: sqliteInsertWebPushSubscriptionQuery,
|
||||||
|
updateSubscriptionWarningSent: sqliteUpdateWebPushSubscriptionWarningSentQuery,
|
||||||
|
updateSubscriptionUpdatedAt: sqliteUpdateWebPushSubscriptionUpdatedAtQuery,
|
||||||
|
deleteSubscriptionByEndpoint: sqliteDeleteWebPushSubscriptionByEndpointQuery,
|
||||||
|
deleteSubscriptionByUserID: sqliteDeleteWebPushSubscriptionByUserIDQuery,
|
||||||
|
deleteSubscriptionByAge: sqliteDeleteWebPushSubscriptionByAgeQuery,
|
||||||
|
insertSubscriptionTopic: sqliteInsertWebPushSubscriptionTopicQuery,
|
||||||
|
deleteSubscriptionTopicAll: sqliteDeleteWebPushSubscriptionTopicAllQuery,
|
||||||
|
deleteSubscriptionTopicWithoutSubscription: sqliteDeleteWebPushSubscriptionTopicWithoutSubscription,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupSQLiteWebPushDB(db *sql.DB) error {
|
||||||
|
// If 'schemaVersion' table does not exist, this must be a new database
|
||||||
|
rows, err := db.Query(sqliteSelectWebPushSchemaVersionQuery)
|
||||||
|
if err != nil {
|
||||||
|
return setupNewSQLiteWebPushDB(db)
|
||||||
|
}
|
||||||
|
return rows.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupNewSQLiteWebPushDB(db *sql.DB) error {
|
||||||
|
if _, err := db.Exec(sqliteCreateWebPushSubscriptionsTableQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(sqliteInsertWebPushSchemaVersion, sqliteCurrentWebPushSchemaVersion); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func runSQLiteWebPushStartupQueries(db *sql.DB, startupQueries string) error {
|
||||||
|
if _, err := db.Exec(startupQueries); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := db.Exec(sqliteBuiltinStartupQueries); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
63
webpush/store_sqlite_test.go
Normal file
63
webpush/store_sqlite_test.go
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
package webpush_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"heckel.io/ntfy/v2/webpush"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTestSQLiteStore(t *testing.T) webpush.Store {
|
||||||
|
store, err := webpush.NewSQLiteStore(filepath.Join(t.TempDir(), "webpush.db"), "")
|
||||||
|
require.Nil(t, err)
|
||||||
|
t.Cleanup(func() { store.Close() })
|
||||||
|
return store
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreUpsertSubscriptionSubscriptionsForTopic(t *testing.T) {
|
||||||
|
testStoreUpsertSubscriptionSubscriptionsForTopic(t, newTestSQLiteStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreUpsertSubscriptionSubscriberIPLimitReached(t *testing.T) {
|
||||||
|
testStoreUpsertSubscriptionSubscriberIPLimitReached(t, newTestSQLiteStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreUpsertSubscriptionUpdateTopics(t *testing.T) {
|
||||||
|
testStoreUpsertSubscriptionUpdateTopics(t, newTestSQLiteStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreUpsertSubscriptionUpdateFields(t *testing.T) {
|
||||||
|
testStoreUpsertSubscriptionUpdateFields(t, newTestSQLiteStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreRemoveByUserIDMultiple(t *testing.T) {
|
||||||
|
testStoreRemoveByUserIDMultiple(t, newTestSQLiteStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreRemoveByEndpoint(t *testing.T) {
|
||||||
|
testStoreRemoveByEndpoint(t, newTestSQLiteStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreRemoveByUserID(t *testing.T) {
|
||||||
|
testStoreRemoveByUserID(t, newTestSQLiteStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreRemoveByUserIDEmpty(t *testing.T) {
|
||||||
|
testStoreRemoveByUserIDEmpty(t, newTestSQLiteStore(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreExpiryWarningSent(t *testing.T) {
|
||||||
|
store := newTestSQLiteStore(t)
|
||||||
|
testStoreExpiryWarningSent(t, store, store.SetSubscriptionUpdatedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreExpiring(t *testing.T) {
|
||||||
|
store := newTestSQLiteStore(t)
|
||||||
|
testStoreExpiring(t, store, store.SetSubscriptionUpdatedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSQLiteStoreRemoveExpired(t *testing.T) {
|
||||||
|
store := newTestSQLiteStore(t)
|
||||||
|
testStoreRemoveExpired(t, store, store.SetSubscriptionUpdatedAt)
|
||||||
|
}
|
||||||
213
webpush/store_test.go
Normal file
213
webpush/store_test.go
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
package webpush_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/netip"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"heckel.io/ntfy/v2/webpush"
|
||||||
|
)
|
||||||
|
|
||||||
|
const testWebPushEndpoint = "https://updates.push.services.mozilla.com/wpush/v1/AAABBCCCDDEEEFFF"
|
||||||
|
|
||||||
|
func testStoreUpsertSubscriptionSubscriptionsForTopic(t *testing.T, store webpush.Store) {
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"test-topic", "mytopic"}))
|
||||||
|
|
||||||
|
subs, err := store.SubscriptionsForTopic("test-topic")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
require.Equal(t, subs[0].Endpoint, testWebPushEndpoint)
|
||||||
|
require.Equal(t, subs[0].P256dh, "p256dh-key")
|
||||||
|
require.Equal(t, subs[0].Auth, "auth-key")
|
||||||
|
require.Equal(t, subs[0].UserID, "u_1234")
|
||||||
|
|
||||||
|
subs2, err := store.SubscriptionsForTopic("mytopic")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs2, 1)
|
||||||
|
require.Equal(t, subs[0].Endpoint, subs2[0].Endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreUpsertSubscriptionSubscriberIPLimitReached(t *testing.T, store webpush.Store) {
|
||||||
|
// Insert 10 subscriptions with the same IP address
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
endpoint := fmt.Sprintf(testWebPushEndpoint+"%d", i)
|
||||||
|
require.Nil(t, store.UpsertSubscription(endpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"test-topic", "mytopic"}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Another one for the same endpoint should be fine
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint+"0", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"test-topic", "mytopic"}))
|
||||||
|
|
||||||
|
// But with a different endpoint it should fail
|
||||||
|
require.Equal(t, webpush.ErrWebPushTooManySubscriptions, store.UpsertSubscription(testWebPushEndpoint+"11", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"test-topic", "mytopic"}))
|
||||||
|
|
||||||
|
// But with a different IP address it should be fine again
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint+"99", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("9.9.9.9"), []string{"test-topic", "mytopic"}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreUpsertSubscriptionUpdateTopics(t *testing.T, store webpush.Store) {
|
||||||
|
// Insert subscription with two topics, and another with one topic
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint+"0", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint+"1", "auth-key", "p256dh-key", "", netip.MustParseAddr("9.9.9.9"), []string{"topic1"}))
|
||||||
|
|
||||||
|
subs, err := store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 2)
|
||||||
|
require.Equal(t, testWebPushEndpoint+"0", subs[0].Endpoint)
|
||||||
|
require.Equal(t, testWebPushEndpoint+"1", subs[1].Endpoint)
|
||||||
|
|
||||||
|
subs, err = store.SubscriptionsForTopic("topic2")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
require.Equal(t, testWebPushEndpoint+"0", subs[0].Endpoint)
|
||||||
|
|
||||||
|
// Update the first subscription to have only one topic
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint+"0", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1"}))
|
||||||
|
|
||||||
|
subs, err = store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 2)
|
||||||
|
require.Equal(t, testWebPushEndpoint+"0", subs[0].Endpoint)
|
||||||
|
|
||||||
|
subs, err = store.SubscriptionsForTopic("topic2")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreUpsertSubscriptionUpdateFields(t *testing.T, store webpush.Store) {
|
||||||
|
// Insert a subscription
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1"}))
|
||||||
|
|
||||||
|
subs, err := store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
require.Equal(t, "auth-key", subs[0].Auth)
|
||||||
|
require.Equal(t, "p256dh-key", subs[0].P256dh)
|
||||||
|
require.Equal(t, "u_1234", subs[0].UserID)
|
||||||
|
|
||||||
|
// Re-upsert the same endpoint with different auth, p256dh, and userID
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint, "new-auth", "new-p256dh", "u_5678", netip.MustParseAddr("1.2.3.4"), []string{"topic1"}))
|
||||||
|
|
||||||
|
subs, err = store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
require.Equal(t, testWebPushEndpoint, subs[0].Endpoint)
|
||||||
|
require.Equal(t, "new-auth", subs[0].Auth)
|
||||||
|
require.Equal(t, "new-p256dh", subs[0].P256dh)
|
||||||
|
require.Equal(t, "u_5678", subs[0].UserID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreRemoveByUserIDMultiple(t *testing.T, store webpush.Store) {
|
||||||
|
// Insert two subscriptions for u_1234 and one for u_5678
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint+"0", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1"}))
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint+"1", "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1"}))
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint+"2", "auth-key", "p256dh-key", "u_5678", netip.MustParseAddr("9.9.9.9"), []string{"topic1"}))
|
||||||
|
|
||||||
|
subs, err := store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 3)
|
||||||
|
|
||||||
|
// Remove all subscriptions for u_1234
|
||||||
|
require.Nil(t, store.RemoveSubscriptionsByUserID("u_1234"))
|
||||||
|
|
||||||
|
// Only u_5678's subscription should remain
|
||||||
|
subs, err = store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
require.Equal(t, testWebPushEndpoint+"2", subs[0].Endpoint)
|
||||||
|
require.Equal(t, "u_5678", subs[0].UserID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreRemoveByEndpoint(t *testing.T, store webpush.Store) {
|
||||||
|
// Insert subscription with two topics
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
||||||
|
subs, err := store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
|
||||||
|
// And remove it again
|
||||||
|
require.Nil(t, store.RemoveSubscriptionsByEndpoint(testWebPushEndpoint))
|
||||||
|
subs, err = store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreRemoveByUserID(t *testing.T, store webpush.Store) {
|
||||||
|
// Insert subscription with two topics
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
||||||
|
subs, err := store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
|
||||||
|
// And remove it again
|
||||||
|
require.Nil(t, store.RemoveSubscriptionsByUserID("u_1234"))
|
||||||
|
subs, err = store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreRemoveByUserIDEmpty(t *testing.T, store webpush.Store) {
|
||||||
|
require.Equal(t, webpush.ErrWebPushUserIDCannotBeEmpty, store.RemoveSubscriptionsByUserID(""))
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreExpiryWarningSent(t *testing.T, store webpush.Store, setUpdatedAt func(endpoint string, updatedAt int64) error) {
|
||||||
|
// Insert subscription with two topics
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
||||||
|
|
||||||
|
// Set updated_at to the past so it shows up as expiring
|
||||||
|
require.Nil(t, setUpdatedAt(testWebPushEndpoint, time.Now().Add(-8*24*time.Hour).Unix()))
|
||||||
|
|
||||||
|
// Verify subscription appears in expiring list (warned_at == 0)
|
||||||
|
subs, err := store.SubscriptionsExpiring(7 * 24 * time.Hour)
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
require.Equal(t, testWebPushEndpoint, subs[0].Endpoint)
|
||||||
|
|
||||||
|
// Mark them as warning sent
|
||||||
|
require.Nil(t, store.MarkExpiryWarningSent(subs))
|
||||||
|
|
||||||
|
// Verify subscription no longer appears in expiring list (warned_at > 0)
|
||||||
|
subs, err = store.SubscriptionsExpiring(7 * 24 * time.Hour)
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreExpiring(t *testing.T, store webpush.Store, setUpdatedAt func(endpoint string, updatedAt int64) error) {
|
||||||
|
// Insert subscription with two topics
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
||||||
|
subs, err := store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
|
||||||
|
// Fake-mark them as soon-to-expire
|
||||||
|
require.Nil(t, setUpdatedAt(testWebPushEndpoint, time.Now().Add(-8*24*time.Hour).Unix()))
|
||||||
|
|
||||||
|
// Should not be cleaned up yet
|
||||||
|
require.Nil(t, store.RemoveExpiredSubscriptions(9*24*time.Hour))
|
||||||
|
|
||||||
|
// Run expiration
|
||||||
|
subs, err = store.SubscriptionsExpiring(7 * 24 * time.Hour)
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
require.Equal(t, testWebPushEndpoint, subs[0].Endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStoreRemoveExpired(t *testing.T, store webpush.Store, setUpdatedAt func(endpoint string, updatedAt int64) error) {
|
||||||
|
// Insert subscription with two topics
|
||||||
|
require.Nil(t, store.UpsertSubscription(testWebPushEndpoint, "auth-key", "p256dh-key", "u_1234", netip.MustParseAddr("1.2.3.4"), []string{"topic1", "topic2"}))
|
||||||
|
subs, err := store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 1)
|
||||||
|
|
||||||
|
// Fake-mark them as expired
|
||||||
|
require.Nil(t, setUpdatedAt(testWebPushEndpoint, time.Now().Add(-10*24*time.Hour).Unix()))
|
||||||
|
|
||||||
|
// Run expiration
|
||||||
|
require.Nil(t, store.RemoveExpiredSubscriptions(9*24*time.Hour))
|
||||||
|
|
||||||
|
// List again, should be 0
|
||||||
|
subs, err = store.SubscriptionsForTopic("topic1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Len(t, subs, 0)
|
||||||
|
}
|
||||||
21
webpush/types.go
Normal file
21
webpush/types.go
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
package webpush
|
||||||
|
|
||||||
|
import "heckel.io/ntfy/v2/log"
|
||||||
|
|
||||||
|
// Subscription represents a web push subscription.
|
||||||
|
type Subscription struct {
|
||||||
|
ID string
|
||||||
|
Endpoint string
|
||||||
|
Auth string
|
||||||
|
P256dh string
|
||||||
|
UserID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Context returns the logging context for the subscription.
|
||||||
|
func (w *Subscription) Context() log.Context {
|
||||||
|
return map[string]any{
|
||||||
|
"web_push_subscription_id": w.ID,
|
||||||
|
"web_push_subscription_user_id": w.UserID,
|
||||||
|
"web_push_subscription_endpoint": w.Endpoint,
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user