-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
103 lines (91 loc) · 3.86 KB
/
app.py
File metadata and controls
103 lines (91 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import asyncio
import logging
import os

from aiogram import Bot, Dispatcher, types
from aiogram import F
from aiogram.enums import ChatType
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
from aiogram.filters import Command
from aiogram.utils.keyboard import InlineKeyboardBuilder
from dotenv import load_dotenv

from keep_alive import keep_alive
# Invoke the project-local keep_alive helper before the bot is configured.
# NOTE(review): its implementation lives in the keep_alive module and is not
# visible here -- presumably it starts a background web server; confirm.
keep_alive()

# Load environment variables from .env file
load_dotenv()

# Fail fast with an explicit message when a required setting is missing,
# instead of an opaque TypeError from int(None) or handing None to Bot().
API_TOKEN = os.getenv("BOT_TOKEN")
if not API_TOKEN:
    raise RuntimeError("BOT_TOKEN environment variable is not set.")

_admin_id_raw = os.getenv("ADMIN_ID")
if not _admin_id_raw:
    raise RuntimeError("ADMIN_ID environment variable is not set.")
# A non-numeric value still raises ValueError here, as it did before.
ADMIN_ID = int(_admin_id_raw)

logging.basicConfig(level=logging.INFO)

# Shared bot client and dispatcher used by every handler below.
bot = Bot(token=API_TOKEN)
dp = Dispatcher()
@dp.message(Command(commands=["start"]))
async def cmd_start(message: types.Message):
    """Answer the /start command with a greeting.

    A sender whose ID equals ADMIN_ID receives instructions on how to reply
    to users; any other sender receives a short description of the bot.
    """
    if message.from_user.id != ADMIN_ID:
        # Regular user: explain what this bot is for.
        await message.answer(
            "Hello!\n"
            "This is admin contact bot of @gmailbazarbot.\n"
            "Send messages in this bot , admin will response soon."
        )
        return

    # Admin: explain the reply workflow.
    await message.answer(
        "Hello Admin!\n"
        "You will receive messages from users here.\n"
        "To reply to a user, just reply to their message."
    )
# Handler for incoming messages from users (except admin)
@dp.message(F.chat.type == ChatType.PRIVATE, ~F.from_user.id.in_([ADMIN_ID]))
async def user_message_to_admin(message: types.Message):
    """Relay a private message from a non-admin user to the admin.

    Two messages are sent to the admin chat: an info header identifying the
    sender, then a copy of the user's message sent as a reply to that header.
    The header's "From user:" prefix and its "UserID:" line are what
    admin_reply_to_user parses, so their format must stay in sync with it.

    If Telegram rejects either send, the failure is logged and the user is
    told that the message could not be forwarded.
    """
    user = message.from_user

    # Construct info header. A Telegram account may have no username, so
    # avoid rendering the literal text "None" in that case.
    user_info = (
        f"From user:{user.username or '(no username)'}\n"
        f"Name: {user.full_name}\n"
        f"UserID: {user.id}\n\n"
    )

    try:
        # Send user info first
        info_msg = await bot.send_message(ADMIN_ID, user_info)
        # Copy the user's message to admin, threaded under the info header
        await message.copy_to(ADMIN_ID, reply_to_message_id=info_msg.message_id)
    except (TelegramBadRequest, TelegramForbiddenError) as err:
        # TelegramForbiddenError (e.g. the admin blocked or never started the
        # bot) is a sibling of TelegramBadRequest, not a subclass, so it has
        # to be listed explicitly. Log before replying so the record survives
        # even if the reply itself fails.
        logging.error("%s: %s", type(err).__name__, err)
        await message.reply("Failed to forward your message to admin.")
def _extract_user_id(info_text: str):
    """Return the integer following "UserID:" in an info header, or None if absent.

    Raises:
        ValueError: the "UserID:" line exists but its value is not an integer.
    """
    prefix = "UserID:"
    for line in info_text.splitlines():
        if line.startswith(prefix):
            return int(line[len(prefix):].strip())
    return None


# Handler for messages from admin, reply to forwarded message to send back to user
@dp.message(F.from_user.id == ADMIN_ID)
async def admin_reply_to_user(message: types.Message):
    """Deliver the admin's message to the user named in the replied-to info header.

    The admin must reply to a message whose text starts with "From user:";
    the target user ID is read from that message's "UserID:" line. Text
    messages are re-sent as plain text, any other content is copied. Every
    failure path answers the admin with an explanatory message.

    NOTE(review): the header is recognised purely by its text. Since
    user_message_to_admin copies arbitrary user content into the admin chat,
    a user could send text that imitates the header; a reply to that copy
    would be routed to the ID written in it. Closing this needs stored state
    (e.g. a map of header message IDs to user IDs) -- confirm and track.
    """
    replied = message.reply_to_message
    if not replied:
        await message.reply(
            "To reply to a user, please reply to the forwarded user message "
            "(the message containing user info)."
        )
        return

    if not (replied.text and replied.text.startswith("From user:")):
        await message.reply(
            "Please reply to the user's info message "
            "to send message back to the user."
        )
        return

    # Get user id from the forwarded info message text
    try:
        user_id = _extract_user_id(replied.text)
    except ValueError:
        await message.reply("Could not parse user ID from the replied message.")
        return
    if user_id is None:
        await message.reply("Could not find user ID in the replied message.")
        return

    # Send admin's message to the user.
    try:
        if message.text:
            await bot.send_message(user_id, f" {message.text}")
        else:
            # Anything without text (photo, document, sticker, ...) is copied
            # as-is. The former "unsupported type" branch could never run:
            # aiogram reports content_type "text" only when text is set.
            await message.copy_to(user_id)
    except (TelegramBadRequest, TelegramForbiddenError):
        # A user who blocked the bot yields TelegramForbiddenError, which is
        # not a subclass of TelegramBadRequest, so both are caught here.
        await message.reply(
            "Failed to send message to the user. "
            "Maybe blocked or deleted bot."
        )
async def main():
    """Run long polling on the dispatcher, then close the bot's session."""
    try:
        await dp.start_polling(bot)
    finally:
        # Runs whether polling returned normally or raised.
        session = bot.session
        await session.close()


if __name__ == "__main__":
    # Script entry point: run main() to completion with asyncio.
    asyncio.run(main())