Remove Rate-Topics

This commit is contained in:
binwiederhier
2024-03-07 10:07:42 -05:00
parent 1d2b759dc0
commit aecf0a5f25
12 changed files with 1839 additions and 807 deletions

View File

@@ -743,8 +743,8 @@ func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, e
return nil, e.With(t)
}
if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
// UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see
// Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
// UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting.
// The 5xx response is because some app servers (in particular Mastodon) will remove
// the subscription as invalid if any 400-499 code (except 429/408) is returned.
// See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
@@ -1182,7 +1182,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
if err != nil {
return err
}
poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
poll, since, scheduled, filters, err := parseSubscribeParams(r)
if err != nil {
return err
}
@@ -1212,7 +1212,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
}
return nil
}
if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
return err
}
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
@@ -1278,7 +1278,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
if err != nil {
return err
}
poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
poll, since, scheduled, filters, err := parseSubscribeParams(r)
if err != nil {
return err
}
@@ -1364,7 +1364,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
}
return conn.WriteJSON(msg)
}
if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
return err
}
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
@@ -1397,7 +1397,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
return err
}
func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, rateTopics []string, err error) {
func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, err error) {
poll = readBoolParam(r, false, "x-poll", "poll", "po")
scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
since, err = parseSince(r, poll)
@@ -1408,7 +1408,6 @@ func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, schedu
if err != nil {
return
}
rateTopics = readCommaSeparatedParam(r, "x-rate-topics", "rate-topics")
return
}
@@ -1420,9 +1419,8 @@ func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, schedu
// - or the topic is reserved, and v.user is the owner
// - or the topic is not reserved, and v.user has write access
//
// Note: This TEMPORARILY also registers all topics starting with "up" (= UnifiedPush). This is to ease the transition
// until the Android app will send the "Rate-Topics" header.
func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic, rateTopics []string) error {
// This only applies to UnifiedPush topics ("up...").
func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic) error {
// Bail out if not enabled
if !s.config.VisitorSubscriberRateLimiting {
return nil
@@ -1431,7 +1429,7 @@ func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*top
// Make a list of topics that we'll actually set the RateVisitor on
eligibleRateTopics := make([]*topic, 0)
for _, t := range topics {
if (strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength) || util.Contains(rateTopics, t.ID) {
if strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength {
eligibleRateTopics = append(eligibleRateTopics, t)
}
}

View File

@@ -277,15 +277,14 @@
# Rate limiting: Enable subscriber-based rate limiting (mostly used for UnifiedPush)
#
# If enabled, subscribers may opt to have published messages counted against their own rate limits, as opposed
# to the publisher's rate limits. This is especially useful to increase the amount of messages that high-volume
# publishers (e.g. Matrix/Mastodon servers) are allowed to send.
# If subscriber-based rate limiting is enabled, messages published on UnifiedPush topics** (topics starting with "up")
# will be counted towards the "rate visitor" of the topic. A "rate visitor" is the first subscriber to the topic.
#
# Once enabled, a client may send a "Rate-Topics: <topic1>,<topic2>,..." header when subscribing to topics via
# HTTP stream, or websockets, thereby registering itself as the "rate visitor", i.e. the visitor whose rate limits
# to use when publishing on this topic. Note: Setting the rate visitor requires READ-WRITE permission on the topic.
# Once enabled, a client subscribing to UnifiedPush topics via HTTP stream, or websockets, will be automatically registered as
# a "rate visitor", i.e. the visitor whose rate limits will be used when publishing on this topic. Note that setting the rate visitor
# requires **read-write permission** on the topic.
#
# UnifiedPush only: If this setting is enabled, publishing to UnifiedPush topics will lead to a HTTP 507 response if
# If this setting is enabled, publishing to UnifiedPush topics will lead to a HTTP 507 response if
# no "rate visitor" has been previously registered. This is to avoid burning the publisher's "visitor-message-daily-limit".
#
# visitor-subscriber-rate-limiting: false

View File

@@ -1346,9 +1346,7 @@ func TestServer_PublishUnifiedPushBinary_AndPoll(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
// Register a UnifiedPush subscriber
response := request(t, s, "GET", "/up123456789012/json?poll=1", "", map[string]string{
"Rate-Topics": "up123456789012",
})
response := request(t, s, "GET", "/up123456789012/json?poll=1", "", nil)
require.Equal(t, 200, response.Code)
// Publish message to topic
@@ -1379,9 +1377,7 @@ func TestServer_PublishUnifiedPushBinary_Truncated(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
// Register a UnifiedPush subscriber
response := request(t, s, "GET", "/mytopic/json?poll=1", "", map[string]string{
"Rate-Topics": "mytopic",
})
response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
require.Equal(t, 200, response.Code)
// Publish message to topic
@@ -1400,9 +1396,7 @@ func TestServer_PublishUnifiedPushText(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
// Register a UnifiedPush subscriber
response := request(t, s, "GET", "/mytopic/json?poll=1", "", map[string]string{
"Rate-Topics": "mytopic",
})
response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
require.Equal(t, 200, response.Code)
// Publish UnifiedPush text message
@@ -1434,9 +1428,7 @@ func TestServer_MatrixGateway_Discovery_Failure_Unconfigured(t *testing.T) {
func TestServer_MatrixGateway_Push_Success(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "GET", "/mytopic/json?poll=1", "", map[string]string{
"Rate-Topics": "mytopic", // Register first!
})
response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
require.Equal(t, 200, response.Code)
notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}`
@@ -2266,16 +2258,14 @@ func TestServer_SubscriberRateLimiting_Success(t *testing.T) {
c.VisitorSubscriberRateLimiting = true
s := newTestServer(t, c)
// "Register" visitor 1.2.3.4 to topic "subscriber1topic" as a rate limit visitor
// "Register" visitor 1.2.3.4 to topic "upAAAAAAAAAAAA" as a rate limit visitor
subscriber1Fn := func(r *http.Request) {
r.RemoteAddr = "1.2.3.4"
}
rr := request(t, s, "GET", "/subscriber1topic/json?poll=1", "", map[string]string{
"Rate-Topics": "subscriber1topic",
}, subscriber1Fn)
rr := request(t, s, "GET", "/upAAAAAAAAAAAA/json?poll=1", "", nil, subscriber1Fn)
require.Equal(t, 200, rr.Code)
require.Equal(t, "", rr.Body.String())
require.Equal(t, "1.2.3.4", s.topics["subscriber1topic"].rateVisitor.ip.String())
require.Equal(t, "1.2.3.4", s.topics["upAAAAAAAAAAAA"].rateVisitor.ip.String())
// "Register" visitor 8.7.7.1 to topic "up012345678912" as a rate limit visitor (implicitly via topic name)
subscriber2Fn := func(r *http.Request) {
@@ -2289,10 +2279,10 @@ func TestServer_SubscriberRateLimiting_Success(t *testing.T) {
// Publish 2 messages to "subscriber1topic" as visitor 9.9.9.9. It'd be 3 normally, but the
// GET request before is also counted towards the request limiter.
for i := 0; i < 2; i++ {
rr := request(t, s, "PUT", "/subscriber1topic", "some message", nil)
rr := request(t, s, "PUT", "/upAAAAAAAAAAAA", "some message", nil)
require.Equal(t, 200, rr.Code)
}
rr = request(t, s, "PUT", "/subscriber1topic", "some message", nil)
rr = request(t, s, "PUT", "/upAAAAAAAAAAAA", "some message", nil)
require.Equal(t, 429, rr.Code)
// Publish another 2 messages to "up012345678912" as visitor 9.9.9.9
@@ -2325,14 +2315,12 @@ func TestServer_SubscriberRateLimiting_NotEnabled_Failed(t *testing.T) {
// Subscriber rate limiting is disabled!
// Registering visitor 1.2.3.4 to topic has no effect
rr := request(t, s, "GET", "/subscriber1topic/json?poll=1", "", map[string]string{
"Rate-Topics": "subscriber1topic",
}, func(r *http.Request) {
rr := request(t, s, "GET", "/upAAAAAAAAAAAA/json?poll=1", "", nil, func(r *http.Request) {
r.RemoteAddr = "1.2.3.4"
})
require.Equal(t, 200, rr.Code)
require.Equal(t, "", rr.Body.String())
require.Nil(t, s.topics["subscriber1topic"].rateVisitor)
require.Nil(t, s.topics["upAAAAAAAAAAAA"].rateVisitor)
// Registering visitor 8.7.7.1 to topic has no effect
rr = request(t, s, "GET", "/up012345678912/json?poll=1", "", nil, func(r *http.Request) {
@@ -2342,7 +2330,7 @@ func TestServer_SubscriberRateLimiting_NotEnabled_Failed(t *testing.T) {
require.Equal(t, "", rr.Body.String())
require.Nil(t, s.topics["up012345678912"].rateVisitor)
// Publish 3 messages to "subscriber1topic" as visitor 9.9.9.9
// Publish 3 messages to "upAAAAAAAAAAAA" as visitor 9.9.9.9
for i := 0; i < 3; i++ {
rr := request(t, s, "PUT", "/subscriber1topic", "some message", nil)
require.Equal(t, 200, rr.Code)
@@ -2415,80 +2403,30 @@ func TestServer_SubscriberRateLimiting_VisitorExpiration(t *testing.T) {
subscriberFn := func(r *http.Request) {
r.RemoteAddr = "1.2.3.4"
}
rr := request(t, s, "GET", "/mytopic/json?poll=1", "", map[string]string{
"rate-topics": "mytopic",
}, subscriberFn)
rr := request(t, s, "GET", "/upAAAAAAAAAAAA/json?poll=1", "", nil, subscriberFn)
require.Equal(t, 200, rr.Code)
require.Equal(t, "1.2.3.4", s.topics["mytopic"].rateVisitor.ip.String())
require.Equal(t, s.visitors["ip:1.2.3.4"], s.topics["mytopic"].rateVisitor)
require.Equal(t, "1.2.3.4", s.topics["upAAAAAAAAAAAA"].rateVisitor.ip.String())
require.Equal(t, s.visitors["ip:1.2.3.4"], s.topics["upAAAAAAAAAAAA"].rateVisitor)
// Publish message, observe rate visitor tokens being decreased
response := request(t, s, "POST", "/mytopic", "some message", nil)
response := request(t, s, "POST", "/upAAAAAAAAAAAA", "some message", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, int64(0), s.visitors["ip:9.9.9.9"].messagesLimiter.Value())
require.Equal(t, int64(1), s.topics["mytopic"].rateVisitor.messagesLimiter.Value())
require.Equal(t, s.visitors["ip:1.2.3.4"], s.topics["mytopic"].rateVisitor)
require.Equal(t, int64(1), s.topics["upAAAAAAAAAAAA"].rateVisitor.messagesLimiter.Value())
require.Equal(t, s.visitors["ip:1.2.3.4"], s.topics["upAAAAAAAAAAAA"].rateVisitor)
// Expire visitor
s.visitors["ip:1.2.3.4"].seen = time.Now().Add(-1 * 25 * time.Hour)
s.pruneVisitors()
// Publish message again, observe that rateVisitor is not used anymore and is reset
response = request(t, s, "POST", "/mytopic", "some message", nil)
response = request(t, s, "POST", "/upAAAAAAAAAAAA", "some message", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, int64(1), s.visitors["ip:9.9.9.9"].messagesLimiter.Value())
require.Nil(t, s.topics["mytopic"].rateVisitor)
require.Nil(t, s.topics["upAAAAAAAAAAAA"].rateVisitor)
require.Nil(t, s.visitors["ip:1.2.3.4"])
}
func TestServer_SubscriberRateLimiting_ProtectedTopics(t *testing.T) {
c := newTestConfigWithAuthFile(t)
c.AuthDefault = user.PermissionDenyAll
c.VisitorSubscriberRateLimiting = true
s := newTestServer(t, c)
// Create some ACLs
require.Nil(t, s.userManager.AddTier(&user.Tier{
Code: "test",
MessageLimit: 5,
}))
require.Nil(t, s.userManager.AddUser("ben", "ben", user.RoleUser))
require.Nil(t, s.userManager.ChangeTier("ben", "test"))
require.Nil(t, s.userManager.AllowAccess("ben", "announcements", user.PermissionReadWrite))
require.Nil(t, s.userManager.AllowAccess(user.Everyone, "announcements", user.PermissionRead))
require.Nil(t, s.userManager.AllowAccess(user.Everyone, "public_topic", user.PermissionReadWrite))
require.Nil(t, s.userManager.AddUser("phil", "phil", user.RoleUser))
require.Nil(t, s.userManager.ChangeTier("phil", "test"))
require.Nil(t, s.userManager.AddReservation("phil", "reserved-for-phil", user.PermissionReadWrite))
// Set rate visitor as user "phil" on topic
// - "reserved-for-phil": Allowed, because I am the owner
// - "public_topic": Allowed, because it has read-write permissions for everyone
// - "announcements": NOT allowed, because it has read-only permissions for everyone
rr := request(t, s, "GET", "/reserved-for-phil,public_topic,announcements/json?poll=1", "", map[string]string{
"Authorization": util.BasicAuth("phil", "phil"),
"Rate-Topics": "reserved-for-phil,public_topic,announcements",
})
require.Equal(t, 200, rr.Code)
require.Equal(t, "phil", s.topics["reserved-for-phil"].rateVisitor.user.Name)
require.Equal(t, "phil", s.topics["public_topic"].rateVisitor.user.Name)
require.Nil(t, s.topics["announcements"].rateVisitor)
// Set rate visitor as user "ben" on topic
// - "reserved-for-phil": NOT allowed, because I am not the owner
// - "public_topic": Allowed, because it has read-write permissions for everyone
// - "announcements": Allowed, because I have read-write permissions
rr = request(t, s, "GET", "/reserved-for-phil,public_topic,announcements/json?poll=1", "", map[string]string{
"Authorization": util.BasicAuth("ben", "ben"),
"Rate-Topics": "reserved-for-phil,public_topic,announcements",
})
require.Equal(t, 200, rr.Code)
require.Equal(t, "phil", s.topics["reserved-for-phil"].rateVisitor.user.Name)
require.Equal(t, "ben", s.topics["public_topic"].rateVisitor.user.Name)
require.Equal(t, "ben", s.topics["announcements"].rateVisitor.user.Name)
}
func TestServer_SubscriberRateLimiting_ProtectedTopics_WithDefaultReadWrite(t *testing.T) {
c := newTestConfigWithAuthFile(t)
c.AuthDefault = user.PermissionReadWrite