# PaperParser/paper/client.py
#
# 104 lines
# 3.9 KiB
# Python

from loguru import logger
from opentele.tl import TelegramClient
from opentele.api import API
from telethon.tl.functions.channels import InviteToChannelRequest, JoinChannelRequest
from telethon.tl.functions.messages import AddChatUserRequest
# Types
from telethon.hints import (Entity, EntityLike, MessageLike)
from telethon.types import Channel, Chat, User
from telethon.sessions.abstract import Session
from paper.errors import *
from telethon.errors.rpcerrorlist import (
ChatAdminRequiredError,
UserPrivacyRestrictedError,
UserNotMutualContactError,
ChannelPrivateError,
ChatWriteForbiddenError,
PeerFloodError,
FloodWaitError,
UsersTooMuchError,
UserChannelsTooMuchError,
UserIsBlockedError,
YouBlockedUserError
)
class PaperClient(TelegramClient):
    """Telegram client wrapper used by the paper package.

    Compared to the base client it:

    * resolves loosely-typed entity references to ``User`` / ``Chat`` /
      ``Channel`` objects before using them;
    * translates a fixed set of Telegram RPC errors raised while inviting
      users or sending messages into the exceptions from ``paper.errors``.
    """

    def __init__(self, session: str | Session) -> None:
        """Create a client bound to *session*.

        Args:
            session: Either a session name, which is mapped to the file
                ``sessions/<name>.session``, or a ready-made ``Session``
                object, which is handed to the base client unchanged.
        """
        api = API.TelegramAndroid.Generate("paper")
        # Formatting a Session object into the path would embed its repr in
        # the file name, so only plain names are turned into a path.
        if isinstance(session, Session):
            session_ref = session
        else:
            session_ref = f"sessions/{session}.session"
        super().__init__(session_ref, api, flood_sleep_threshold=120)

    async def invite_self(self, group: Entity | EntityLike):
        """Join *group* with the current account.

        Only channels are joined (via ``JoinChannelRequest``); any other
        kind of entity is skipped with a warning.
        """
        group = await self.__cast_to_entity(group)
        if not isinstance(group, Channel):
            # Previously this case was skipped without any trace in the log.
            logger.warning(f"Can't join {group}: not a channel. Skipping...")
            return
        await self(JoinChannelRequest(group))
        logger.info(f"Added self to a channel {group.title}")

    @logger.catch(reraise=True)
    async def invite_user(self, user: Entity | EntityLike, group: Entity | EntityLike):
        """Add *user* to *group*.

        Channels are handled with ``InviteToChannelRequest`` and chats with
        ``AddChatUserRequest``. If *user* does not resolve to a ``User`` or
        *group* resolves to neither a ``Channel`` nor a ``Chat``, a warning
        is logged and nothing is sent.

        Raises:
            UserPrivacyException, AdminPrivilegesExceptions, FloodException,
            IgnoreException: translated from the corresponding Telegram RPC
                errors raised by the request.
        """
        user = await self.__cast_to_entity(user)
        group = await self.__cast_to_entity(group)

        # NOTE: this branch is also taken when *user* is not a User; the log
        # text is kept as-is because it is part of the runtime output.
        if not isinstance(user, User) or not isinstance(group, (Channel, Chat)):
            logger.warning(f"Can't determine group type for {group}. Skipping...")
            return

        if isinstance(group, Channel):
            request = InviteToChannelRequest(group, [user])  # type: ignore
            await self.__run_translated(self(request))
            logger.info(f"User {user} was added to channel {group.title}")
        else:
            # Third positional argument is fwd_limit in the Telethon request.
            request = AddChatUserRequest(group, user, 30)  # type: ignore
            await self.__run_translated(self(request))
            logger.info(f"User {user} was added to chat {group.title}")

    @logger.catch(reraise=True)
    async def send_message(
        self,
        entity: Entity | EntityLike,
        message: str = "",
        file: str | None = None,
        **kwargs,
    ):
        """Send *message* (optionally with *file*) to *entity*.

        The entity is resolved first, then the call is delegated to the base
        client's ``send_message``. Extra keyword arguments are forwarded
        unchanged so the override stays call-compatible with the base method.

        Returns:
            Whatever the base ``send_message`` returns.

        Raises:
            UserPrivacyException, AdminPrivilegesExceptions, FloodException,
            IgnoreException: translated from the corresponding Telegram RPC
                errors raised while sending.
        """
        entity = await self.__cast_to_entity(entity)
        pending = super().send_message(entity, message=message, file=file, **kwargs)  # type: ignore
        return await self.__run_translated(pending)

    @logger.catch(reraise=True)
    async def get_participants(self, group: Entity | EntityLike, *args, **kwargs):
        """Resolve *group* and return the base client's ``get_participants`` result."""
        group = await self.__cast_to_entity(group)
        return await super().get_participants(group, *args, **kwargs)

    async def __run_translated(self, awaitable):
        """Await *awaitable*, mapping known Telegram errors to paper errors.

        The original Telegram error is passed to the new exception and also
        attached as its explicit cause. Errors outside the lists below
        propagate unchanged.
        """
        try:
            return await awaitable
        except (UserPrivacyRestrictedError, UserNotMutualContactError) as e:
            raise UserPrivacyException(e) from e
        except (ChatWriteForbiddenError, ChannelPrivateError, ChatAdminRequiredError) as e:
            raise AdminPrivilegesExceptions(e) from e
        except (PeerFloodError, FloodWaitError) as e:
            raise FloodException(e) from e
        except (
            UserChannelsTooMuchError,
            UsersTooMuchError,
            UserIsBlockedError,
            YouBlockedUserError,
        ) as e:
            raise IgnoreException(e) from e

    async def __cast_to_entity(self, entity: Entity | EntityLike) -> Entity:
        """Return *entity* as a ``User``, ``Chat`` or ``Channel`` object.

        Already-resolved entities are returned as-is. Otherwise the lookup
        goes through ``get_entity``, using the object's ``username``
        attribute when it has a non-empty one and the object itself
        otherwise.
        """
        # Checking against the concrete classes avoids relying on runtime
        # isinstance support for the typing.Union alias behind ``Entity``.
        if isinstance(entity, (User, Chat, Channel)):
            return entity
        username = getattr(entity, "username", None)
        # An object may carry ``username = None``; looking that up would fail,
        # so fall back to resolving the object itself.
        lookup = username if username else entity
        return await self.get_entity(lookup)  # type: ignore