Skip to content

Commit d9fccd3

Browse files
authored
refactor(community-manager): introduce batch processing for user validation (#223)
2 parents 415c162 + 7cf1d17 commit d9fccd3

4 files changed

Lines changed: 98 additions & 5 deletions

File tree

backend/community_manager/actions/chat.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,6 +1124,32 @@ def __init__(self, db_session: Session):
11241124
self.authorization_action = AuthorizationAction(db_session)
11251125
# self.bot_api_service = TelegramBotApiService()
11261126

1127+
async def check_chat_members_compliance(self, chat_id: int) -> int:
1128+
"""
1129+
Iterates over all members of a chat in batches and kicks ineligible members.
1130+
1131+
:param chat_id: The ID of the chat to check.
1132+
:return: The total number of members processed.
1133+
"""
1134+
logger.info(f"Starting to check chat members for chat {chat_id=!r}.")
1135+
1136+
total_processed = 0
1137+
for chat_members_chunk in self.telegram_chat_user_service.yield_all_for_chat(
1138+
chat_id=chat_id,
1139+
batch_size=100,
1140+
):
1141+
await self.kick_ineligible_chat_members(chat_members=chat_members_chunk)
1142+
total_processed += len(chat_members_chunk)
1143+
logger.info(
1144+
f"Processed chunk of {len(chat_members_chunk)} users for chat {chat_id=!r}. "
1145+
f"Total processed: {total_processed}"
1146+
)
1147+
1148+
logger.info(
1149+
f"Finished checking members for chat {chat_id=!r}. Total: {total_processed}"
1150+
)
1151+
return total_processed
1152+
11271153
async def kick_chat_member(self, chat_member: TelegramChatUser) -> None:
11281154
"""
11291155
Kicks a specified chat member from the chat. It ensures that the bot

backend/community_manager/tasks/chat.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,7 @@ async def check_target_chat_members(chat_id: int) -> None:
4444
with DBService().db_session() as db_session:
4545
# BotAPI does not need a telethon client
4646
action = CommunityManagerUserChatAction(db_session)
47-
chat_members = action.telegram_chat_user_service.get_all(
48-
chat_ids=[chat_id], with_wallet_details=True
49-
)
50-
logger.info(f"Found {len(chat_members)} chat members for chat {chat_id=!r}.")
51-
await action.kick_ineligible_chat_members(chat_members=chat_members)
47+
await action.check_chat_members_compliance(chat_id=chat_id)
5248

5349

5450
@app.task(

backend/core/src/core/services/chat/user.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,37 @@ def get_all(
170170

171171
return query.all()
172172

173+
def yield_all_for_chat(
174+
self, chat_id: int, batch_size: int = 100
175+
) -> Iterable[list[TelegramChatUser]]:
176+
"""
177+
Yields all users for a given chat in batches, using keyset pagination.
178+
This is useful for processing large chats without loading all users into memory.
179+
"""
180+
last_seen_user_id = 0
181+
while True:
182+
stmt = (
183+
select(TelegramChatUser)
184+
.where(
185+
TelegramChatUser.chat_id == chat_id,
186+
TelegramChatUser.user_id > last_seen_user_id,
187+
)
188+
.order_by(TelegramChatUser.user_id.asc())
189+
.limit(batch_size)
190+
.options(
191+
joinedload(TelegramChatUser.wallet_link).options(
192+
joinedload(TelegramChatUserWallet.wallet),
193+
)
194+
)
195+
)
196+
users = self.db_session.execute(stmt).scalars().unique().all()
197+
198+
if not users:
199+
break
200+
201+
yield users
202+
last_seen_user_id = users[-1].user_id
203+
173204
def get_all_by_linked_wallet(self, addresses: list[str]) -> list[TelegramChatUser]:
174205
query = self.db_session.query(TelegramChatUser)
175206
query = query.join(
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import pytest
2+
from sqlalchemy.orm import Session
3+
4+
from core.models.chat import TelegramChatUser
5+
from core.services.chat.user import TelegramChatUserService
6+
from tests.factories import TelegramChatFactory, TelegramChatUserFactory, UserFactory
7+
8+
9+
@pytest.mark.asyncio
10+
async def test_yield_all_for_chat_batching(db_session: Session) -> None:
11+
# Setup
12+
chat = TelegramChatFactory.with_session(db_session).create()
13+
service = TelegramChatUserService(db_session)
14+
15+
# Create 25 users
16+
users = []
17+
for i in range(25):
18+
user = UserFactory.with_session(db_session).create(telegram_id=1000 + i)
19+
chat_user = TelegramChatUserFactory.with_session(db_session).create(
20+
chat=chat, user=user, is_admin=False, is_managed=True
21+
)
22+
users.append(chat_user)
23+
24+
# Test
25+
batches: list[list[TelegramChatUser]] = []
26+
for batch in service.yield_all_for_chat(chat.id, batch_size=10):
27+
batches.append(batch)
28+
29+
# Verify
30+
assert len(batches) == 3
31+
assert len(batches[0]) == 10
32+
assert len(batches[1]) == 10
33+
assert len(batches[2]) == 5
34+
35+
all_yielded_users = [u for batch in batches for u in batch]
36+
assert len(all_yielded_users) == 25
37+
38+
# Verify order
39+
user_ids = [u.user_id for u in all_yielded_users]
40+
assert user_ids == sorted(user_ids)

0 commit comments

Comments
 (0)