diff --git a/src/acl.c b/src/acl.c index 177077d45f5..ec8f5e1eca7 100644 --- a/src/acl.c +++ b/src/acl.c @@ -523,6 +523,17 @@ void ACLCopyUser(user *dst, user *src) { } } +/* Set the user that client 'c' is authenticated as, performing any necessary + * bookkeeping for the switch. In particular, any pending BCAST tracking + * invalidations are flushed under the client's current ACL identity before + * c->user changes, so they are not re-filtered by the new user's key + * permissions in beforeSleep. */ +void clientSetUser(client *c, user *new_user) { + if (c->user != new_user) + trackingBroadcastFlushClientPrefixes(c); + c->user = new_user; +} + /* Given a command ID, this function set by reference 'word' and 'bit' * so that user->allowed_commands[word] will address the right word * where the corresponding bit for the provided ID is stored, and @@ -1497,7 +1508,7 @@ void addAuthErrReply(client *c, robj *err) { int checkPasswordBasedAuth(client *c, robj *username, robj *password) { if (ACLCheckUserCredentials(username,password) == C_OK) { c->authenticated = 1; - c->user = ACLGetUserByName(username->ptr,sdslen(username->ptr)); + clientSetUser(c, ACLGetUserByName(username->ptr,sdslen(username->ptr))); moduleNotifyUserChanged(c); return AUTH_OK; } else { @@ -2147,6 +2158,12 @@ sds ACLStringSetUser(user *u, sds username, sds *argv, int argc) { * disconnected if (some of) their channel permissions were revoked. */ if (u) { ACLKillPubsubClientsIfNeeded(tempu, u); + /* Deliver pending BCAST tracking invalidations under the user's + * current permissions before overwriting them in place below. + * Otherwise beforeSleep would re-filter the already accumulated keys + * by the new (possibly stricter) permissions and drop invalidations + * for keys the client could previously read. */ + trackingBroadcastInvalidationMessages(u); } /* Overwrite the user with the temporary user we modified above. */ @@ -2439,6 +2456,14 @@ sds ACLLoadFromFile(const char *filename) { /* Check if we found errors and react accordingly. */ if (sdslen(errors) == 0) { + /* Deliver pending BCAST tracking invalidations under the pre-load ACL + * identities before mutating any user. In particular DefaultUser is + * overwritten in place below, which would otherwise cause its pending + * invalidations to be re-filtered by the new permissions in + * beforeSleep. A whole-table flush is appropriate here since the load + * may change many users at once. */ + trackingBroadcastInvalidationMessages(NULL); + /* The default user pointer is referenced in different places: instead * of replacing such occurrences it is much simpler to copy the new * default user configuration in the old one. */ @@ -2481,7 +2506,7 @@ sds ACLLoadFromFile(const char *filename) { deauthenticateAndCloseClient(c); continue; } - c->user = new; + clientSetUser(c, new); } if (user_channels) @@ -3241,7 +3266,7 @@ static void internalAuth(client *c) { c->authenticated = 1; /* Set the user to the unrestricted user, if it is not already set (default). */ if (c->user != NULL) { - c->user = NULL; + clientSetUser(c, NULL); moduleNotifyUserChanged(c); } addReply(c, shared.ok); diff --git a/src/module.c b/src/module.c index 50a594987a0..6221816b446 100644 --- a/src/module.c +++ b/src/module.c @@ -10809,8 +10809,8 @@ static int authenticateClientWithUser(RedisModuleCtx *ctx, user *user, RedisModu moduleNotifyUserChanged(ctx->client); - ctx->client->user = user; ctx->client->authenticated = 1; + clientSetUser(ctx->client, user); if (clientHasModuleAuthInProgress(ctx->client)) { ctx->client->flags |= CLIENT_MODULE_AUTH_HAS_RESULT; diff --git a/src/networking.c b/src/networking.c index 2f5384c3b99..ebac816abde 100644 --- a/src/networking.c +++ b/src/networking.c @@ -103,7 +103,7 @@ void linkClient(client *c) { static void clientSetDefaultAuth(client *c) { /* If the default user does not require authentication, the user is * directly authenticated. */ - c->user = DefaultUser; + clientSetUser(c, DefaultUser); c->authenticated = (c->user->flags & USER_FLAG_NOPASS) && !(c->user->flags & USER_FLAG_DISABLED); } @@ -193,6 +193,7 @@ client *createClient(connection *conn) { c->ctime = c->lastinteraction = server.unixtime; c->io_lastinteraction = 0; c->duration = 0; + c->user = DefaultUser; /* Set a safe default value: clientSetDefaultAuth reads c->user. */ clientSetDefaultAuth(c); c->replstate = REPL_STATE_NONE; c->repl_start_cmd_stream_on_ack = 0; @@ -1614,8 +1615,8 @@ void clientAcceptHandler(connection *conn) { if (username != NULL) { user *u = ACLGetUserByName(username, sdslen(username)); if (u && !(u->flags & USER_FLAG_DISABLED)) { - c->user = u; c->authenticated = 1; + clientSetUser(c, u); moduleNotifyUserChanged(c); serverLog(LL_VERBOSE, "TLS: Auto-authenticated client as %s", server.hide_user_data_from_log ? "*redacted*" : u->name); @@ -2073,6 +2074,7 @@ void clearClientConnectionState(client *c) { } void deauthenticateAndCloseClient(client *c) { + disableTracking(c); c->user = DefaultUser; c->authenticated = 0; /* We will write replies to this client later, so we can't diff --git a/src/server.c b/src/server.c index df660175e09..ec5ab758821 100644 --- a/src/server.c +++ b/src/server.c @@ -2008,7 +2008,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Send the invalidation messages to clients participating to the * client side caching protocol in broadcasting (BCAST) mode. */ - trackingBroadcastInvalidationMessages(); + trackingBroadcastInvalidationMessages(NULL); /* Record time consumption of AOF writing. */ monotime aof_start_time = getMonotonicUs(); diff --git a/src/server.h b/src/server.h index 9318eec686d..6f58e256bf6 100644 --- a/src/server.h +++ b/src/server.h @@ -3360,7 +3360,9 @@ void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalKeys(void); uint64_t trackingGetTotalPrefixes(void); -void trackingBroadcastInvalidationMessages(void); +void trackingBroadcastInvalidationMessages(user *u); +void trackingBroadcastFlushClientPrefixes(client *c); +void clientSetUser(client *c, user *new_user); int checkPrefixCollisionsOrReply(client *c, robj **prefix, size_t numprefix); /* List data type */ diff --git a/src/tracking.c b/src/tracking.c index c235d5812a3..d1cd4c4438d 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -551,32 +551,32 @@ void trackingLimitUsedSlots(void) { timeout_counter++; } -/* Generate Redis protocol for an array containing all the key names - * in the 'keys' radix tree. If the client is not NULL, the list will not - * include keys that were modified the last time by this client, in order - * to implement the NOLOOP option. +/* Build the RESP array of invalidated key names in 'keys', filtered by: + * - ACL key permissions of user 'u' (NULL means all keys are permitted). + * - NOLOOP: if 'noloop_client' is non-NULL, keys last modified by + * that client are excluded. * * If the resulting array would be empty, NULL is returned instead. */ -sds trackingBuildBroadcastReply(client *c, rax *keys) { +sds trackingBuildBroadcastReply(user *u, client *noloop_client, rax *keys) { raxIterator ri; - uint64_t count; + uint64_t count = 0; - if (c == NULL) { - count = raxSize(keys); - } else { - count = 0; - raxStart(&ri,keys); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - if (ri.data != c) count++; - } - raxStop(&ri); - - if (count == 0) return NULL; + raxStart(&ri,keys); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (noloop_client && ri.data == noloop_client) + continue; + if (ACLUserCheckKeyPerm(u, (char *)ri.key, ri.key_len, + CMD_KEY_ACCESS) != ACL_OK) + continue; + count++; } + raxStop(&ri); + + if (count == 0) return NULL; /* Create the array reply with the list of keys once, then send - * it to all the clients subscribed to this prefix. */ + * it to the receiving client. */ char buf[32]; size_t len = ll2string(buf,sizeof(buf),count); sds proto = sdsempty(); @@ -587,7 +587,11 @@ sds trackingBuildBroadcastReply(client *c, rax *keys) { raxStart(&ri,keys); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { - if (c && ri.data == c) continue; + if (noloop_client && ri.data == noloop_client) + continue; + if (ACLUserCheckKeyPerm(u, (char *)ri.key, ri.key_len, + CMD_KEY_ACCESS) != ACL_OK) + continue; len = ll2string(buf,sizeof(buf),ri.key_len); proto = sdscatlen(proto,"$",1); proto = sdscatlen(proto,buf,len); @@ -599,11 +603,125 @@ sds trackingBuildBroadcastReply(client *c, rax *keys) { return proto; } +/* Send the pending BCAST invalidation messages accumulated in a single + * prefix's bcastState to every client subscribed to that prefix, then reset + * bs->keys so only keys accumulated from now on are tracked. + * + * For non-NOLOOP clients the invalidation proto is cached per distinct + * ACL user pointer so that ACLUserCheckKeyPerm is called O(U*K) times + * instead of O(C*K) (U = distinct users, C = clients, K = keys). */ +static void trackingBcastInvalidationsForPrefix(bcastState *bs) { + if (raxSize(bs->keys) == 0) return; + + raxIterator ri; + + /* Per-user proto cache. Key: user * pointer (identity), + * value: sds proto (may be NULL for users whose keys are all + * filtered out by ACL). */ + dictType dt = { .hashFunction = dictPtrHash }; + dict *user_cache = dictCreate(&dt); + + /* Send this array of keys to every client in the list. */ + raxStart(&ri,bs->clients); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + client *c; + memcpy(&c,ri.key,sizeof(c)); + + if (c->flags & CLIENT_TRACKING_NOLOOP) { + sds proto = trackingBuildBroadcastReply(c->user, c, bs->keys); + if (proto) { + sendTrackingMessage(c,proto,sdslen(proto),1); + sdsfree(proto); + } + } else { + dictEntry *existing; + dictEntry *de = dictAddRaw(user_cache, c->user, &existing); + if (de != NULL) { + sds proto = trackingBuildBroadcastReply(c->user, NULL, + bs->keys); + dictSetVal(user_cache, de, proto); + } else { + de = existing; + } + void *cached = dictGetVal(de); + if (cached) + sendTrackingMessage(c,(char*)cached,sdslen((sds)cached),1); + } + } + raxStop(&ri); + + /* Free all cached protos. */ + dictIterator *cache_di = dictGetIterator(user_cache); + dictEntry *de; + while ((de = dictNext(cache_di)) != NULL) { + sds proto = dictGetVal(de); + if (proto) sdsfree(proto); + } + dictReleaseIterator(cache_di); + dictRelease(user_cache); + + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + raxFree(bs->keys); + bs->keys = raxNew(); +} + +/* Return 1 if at least one client subscribed to 'bs' is authenticated as + * user 'u', 0 otherwise. */ +static int bcastStateHasUser(bcastState *bs, user *u) { + raxIterator ri; + raxStart(&ri,bs->clients); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + client *c; + memcpy(&c,ri.key,sizeof(c)); + if (c->user == u) { + raxStop(&ri); + return 1; + } + } + raxStop(&ri); + return 0; +} + +/* Flush the pending BCAST invalidation messages for every prefix that client + * 'c' subscribes to, so the keys accumulated so far are delivered under c's + * CURRENT ACL identity. + * + * This must be called BEFORE c->user is changed (e.g. on re-AUTH). Otherwise + * beforeSleep would re-filter the already-accumulated keys by the new + * (possibly stricter) permissions and drop invalidations for keys the client + * could previously read. No-op if 'c' is not a BCAST tracking client. */ +void trackingBroadcastFlushClientPrefixes(client *c) { + if (!(c->flags & CLIENT_TRACKING_BCAST)) return; + if (c->client_tracking_prefixes == NULL) return; + if (TrackingTable == NULL || !server.tracking_clients) return; + + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + void *result; + int found = raxFind(PrefixTable,ri.key,ri.key_len,&result); + serverAssert(found); + trackingBcastInvalidationsForPrefix(result); + } + raxStop(&ri); +} + /* This function will run the prefixes of clients in BCAST mode and * keys that were modified about each prefix, and will send the - * notifications to each client in each prefix. */ -void trackingBroadcastInvalidationMessages(void) { - raxIterator ri, ri2; + * notifications to each client in each prefix. + * + * If 'u' is non-NULL, only prefixes that have at least one client + * authenticated as 'u' are flushed. This is used to deliver pending + * invalidations under the old identity before an in-place ACL change to 'u' + * would otherwise cause beforeSleep to re-filter them by the new permissions. + * Passing NULL flushes every prefix. */ +void trackingBroadcastInvalidationMessages(user *u) { + raxIterator ri; /* Return ASAP if there is nothing to do here. */ if (TrackingTable == NULL || !server.tracking_clients) return; @@ -614,38 +732,8 @@ void trackingBroadcastInvalidationMessages(void) { /* For each prefix... */ while(raxNext(&ri)) { bcastState *bs = ri.data; - - if (raxSize(bs->keys)) { - /* Generate the common protocol for all the clients that are - * not using the NOLOOP option. */ - sds proto = trackingBuildBroadcastReply(NULL,bs->keys); - - /* Send this array of keys to every client in the list. */ - raxStart(&ri2,bs->clients); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - client *c; - memcpy(&c,ri2.key,sizeof(c)); - if (c->flags & CLIENT_TRACKING_NOLOOP) { - /* This client may have certain keys excluded. */ - sds adhoc = trackingBuildBroadcastReply(c,bs->keys); - if (adhoc) { - sendTrackingMessage(c,adhoc,sdslen(adhoc),1); - sdsfree(adhoc); - } - } else { - sendTrackingMessage(c,proto,sdslen(proto),1); - } - } - raxStop(&ri2); - - /* Clean up: we can remove everything from this state, because we - * want to only track the new keys that will be accumulated starting - * from now. */ - sdsfree(proto); - } - raxFree(bs->keys); - bs->keys = raxNew(); + if (u == NULL || bcastStateHasUser(bs, u)) + trackingBcastInvalidationsForPrefix(bs); } raxStop(&ri); } diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 174575eee98..1f91c671dd1 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -883,6 +883,182 @@ start_server {tags {"tracking network logreqres:skip"}} { assert_equal {PONG} [$rd read] } + test {BCAST ACL filtering - two clients same user see only permitted keys} { + clean_all + + r ACL SETUSER shareduser on >pass123 ~public:* +@all + set c1 [redis_deferring_client] + set c2 [redis_deferring_client] + + $c1 AUTH shareduser pass123 + $c1 read + + $c2 AUTH shareduser pass123 + $c2 read + + $c1 HELLO 3 + $c1 read + $c2 HELLO 3 + $c2 read + + $c1 CLIENT TRACKING on BCAST PREFIX public: PREFIX admin: + assert_match {*OK*} [$c1 read] + $c2 CLIENT TRACKING on BCAST PREFIX public: PREFIX admin: + assert_match {*OK*} [$c2 read] + + $rd_sg MSET public:a{t} 1 admin:b{t} 2 + + # Both clients should receive exactly {public:a{t}} for the + # public: prefix, and nothing for admin: (filtered out by ACL). + set c1_keys {} + set c2_keys {} + # Read invalidation messages: there are two prefixes, but only + # public: should have data for shareduser. + after 100 + # $rd_sg is synchronous, so modified keys are already recorded + # on the server by the time we send PING. BCAST invalidations + # are flushed in beforeSleep before PONG, so they precede it + # on the wire. Drain all push messages until we hit the PONG. + $c1 PING + while 1 { + set resp [$c1 read] + if {[lindex $resp 0] eq "invalidate"} { + lappend c1_keys {*}[lindex $resp 1] + } else { + break + } + } + $c2 PING + while 1 { + set resp [$c2 read] + if {[lindex $resp 0] eq "invalidate"} { + lappend c2_keys {*}[lindex $resp 1] + } else { + break + } + } + + assert_equal [lsort $c1_keys] [list public:a{t}] + assert_equal [lsort $c2_keys] [list public:a{t}] + + $c1 CLIENT TRACKING off + $c1 read + $c2 CLIENT TRACKING off + $c2 read + $c1 close + $c2 close + r ACL DELUSER shareduser + } + + test {BCAST re-AUTH re-buckets correctly with ACL filtering} { + clean_all + + r ACL SETUSER usr_a on >passA ~a:* +@all + r ACL SETUSER usr_b on >passB ~b:* +@all + + set tc [redis_deferring_client] + $tc AUTH usr_a passA + $tc read + + $tc HELLO 3 + $tc read + + $tc CLIENT TRACKING on BCAST PREFIX a: PREFIX b: + assert_match {*OK*} [$tc read] + + # Write keys matching both prefixes. + $rd_sg SET a:1{t} val1 + $rd_sg SET b:1{t} val1 + + # Under usr_a, only a:* is visible. + # $rd_sg is synchronous, so modified keys are already recorded + # on the server by the time we send PING. BCAST invalidations + # are flushed in beforeSleep before PONG, so they precede it + # on the wire. Drain all push messages until we hit the PONG. + after 100 + $tc PING + set keys {} + while 1 { + set resp [$tc read] + if {[lindex $resp 0] eq "invalidate"} { + lappend keys {*}[lindex $resp 1] + } else { + break + } + } + assert_equal $keys [list a:1{t}] + + # Re-AUTH as usr_b. + $tc AUTH usr_b passB + $tc read + + # Write again. + $rd_sg SET a:2{t} val2 + $rd_sg SET b:2{t} val2 + + after 100 + $tc PING + set keys {} + while 1 { + set resp [$tc read] + if {[lindex $resp 0] eq "invalidate"} { + lappend keys {*}[lindex $resp 1] + } else { + break + } + } + assert_equal $keys [list b:2{t}] + + $tc CLIENT TRACKING off + $tc read + $tc close + r ACL DELUSER usr_a + r ACL DELUSER usr_b + } + + test {BCAST in-place ACL SETUSER flushes pending invalidations under old perms} { + clean_all + + r ACL SETUSER setu on >setpass ~p:* +@all + set tc [redis_deferring_client] + $tc AUTH setu setpass + $tc read + + $tc HELLO 3 + $tc read + + $tc CLIENT TRACKING on BCAST PREFIX p: + assert_match {*OK*} [$tc read] + + # Modify a key the user can currently read AND revoke that access in + # the same event-loop iteration (MULTI/EXEC), so the invalidation is + # still pending when the user's permissions are overwritten in place. + # Without flushing under the old identity, the pending key would be + # re-filtered by the new (stricter) perms in beforeSleep and dropped. + $rd_sg MULTI + $rd_sg SET p:x{t} 1 + $rd_sg ACL SETUSER setu resetkeys ~q:* + $rd_sg EXEC + + after 100 + $tc PING + set keys {} + while 1 { + set resp [$tc read] + if {[lindex $resp 0] eq "invalidate"} { + lappend keys {*}[lindex $resp 1] + } else { + break + } + } + assert_equal $keys [list p:x{t}] + + $tc CLIENT TRACKING off + $tc read + $tc close + r ACL DELUSER setu + } + $rd_redirection close $rd_sg close $rd close @@ -912,3 +1088,53 @@ start_server {tags {"tracking network"}} { r CLIENT GETREDIR } {-1} } + +# ACL LOAD rewrites the default user in place, so a default-user BCAST client +# must still get invalidations accumulated under the pre-load permissions. +set server_path [tmpdir "tracking.acl"] +set fd [open $server_path/tracking.acl w] +puts $fd "user default on nopass ~p:* &* +@all" +close $fd +start_server [list overrides [list "dir" $server_path "aclfile" "tracking.acl"] tags [list "tracking" "network" "logreqres:skip" "external:skip"]] { + test {BCAST ACL LOAD on default user flushes pending invalidations under old perms} { + set tc [redis_deferring_client] + $tc HELLO 3 + $tc read + + $tc CLIENT TRACKING on BCAST PREFIX p: + assert_match {*OK*} [$tc read] + + # Rewrite the ACL file so the next ACL LOAD revokes default's access + # to p:* keys. + set fd [open $server_path/tracking.acl w] + puts $fd "user default on nopass ~q:* &* +@all" + close $fd + + # Modify a p:* key and reload the (now restrictive) ACL in the same + # event-loop iteration, so the invalidation is still pending when the + # default user is overwritten in place. Without flushing under the old + # identity, the pending key would be re-filtered by the new perms in + # beforeSleep and dropped. + r MULTI + r SET p:x{t} 1 + r ACL LOAD + r EXEC + + after 100 + $tc PING + set keys {} + while 1 { + set resp [$tc read] + if {[lindex $resp 0] eq "invalidate"} { + lappend keys {*}[lindex $resp 1] + } else { + break + } + } + assert_equal $keys [list p:x{t}] + + $tc CLIENT TRACKING off + $tc read + $tc close + } +}