107 lines
4.1 KiB
Python
107 lines
4.1 KiB
Python
from loguru import logger
|
|
|
|
from opentele.tl import TelegramClient
|
|
from opentele.api import API
|
|
|
|
from telethon.tl.functions.channels import InviteToChannelRequest, JoinChannelRequest
|
|
from telethon.tl.functions.messages import AddChatUserRequest
|
|
|
|
# Types
|
|
from telethon.hints import (Entity, EntityLike, MessageLike)
|
|
from telethon.types import Channel, Chat, User
|
|
from telethon.sessions.abstract import Session
|
|
|
|
from paper.errors import *
|
|
from telethon.errors.rpcerrorlist import (
|
|
ChatAdminRequiredError,
|
|
UserPrivacyRestrictedError,
|
|
UserNotMutualContactError,
|
|
ChannelPrivateError,
|
|
ChatWriteForbiddenError,
|
|
PeerFloodError,
|
|
FloodWaitError,
|
|
UsersTooMuchError,
|
|
UserChannelsTooMuchError,
|
|
UserIsBlockedError,
|
|
YouBlockedUserError,
|
|
UsernameInvalidError,
|
|
)
|
|
|
|
class PaperClient(TelegramClient):
    """Telegram client that resolves loose entity references and re-raises
    selected Telethon RPC errors as the project's own exception types.

    The exception classes raised here (``UserPrivacyException``,
    ``AdminPrivilegesExceptions``, ``FloodException``, ``IgnoreException``)
    are expected to come from ``paper.errors``.
    """

    def __init__(self, session: str | Session) -> None:
        """Create the client with API data generated for the "paper" id.

        Args:
            session: A session name, which is expanded to
                ``sessions/<name>.session``, or an already constructed
                ``Session`` object, which is passed to the base client as is.
        """
        api = API.TelegramAndroid.Generate("paper")

        # Only names are turned into a path; formatting a Session object into
        # the f-string would produce a path built from its repr.
        if not isinstance(session, Session):
            session = f"sessions/{session}.session"

        # NOTE(review): a threshold of 0 presumably disables Telethon's
        # automatic sleeping so flood waits surface as errors - confirm
        # against the Telethon client documentation.
        super().__init__(session, api, flood_sleep_threshold=0)

    async def invite_self(self, group: Entity | EntityLike):
        """Join ``group`` with the current account when it is a channel.

        Targets that do not resolve to a ``Channel`` are skipped with a
        warning; nothing is raised for them.

        Args:
            group: The channel to join, as an entity or anything
                ``__cast_to_entity`` can resolve.
        """
        group = await self.__cast_to_entity(group)

        if not isinstance(group, Channel):
            logger.warning(f"Can't join {group}: it is not a channel. Skipping...")
            return

        await self(JoinChannelRequest(group))
        logger.info(f"Added self to a channel {group.title}")

    @logger.catch(reraise=True)
    async def invite_user(self, user: Entity | EntityLike, group: Entity | EntityLike):
        """Add ``user`` to ``group``.

        Channels are handled with ``InviteToChannelRequest`` and basic chats
        with ``AddChatUserRequest``. Any other combination of resolved types
        is logged and skipped.

        Args:
            user: The user to add.
            group: The channel or chat to add the user to.

        Raises:
            UserPrivacyException: The user's privacy settings forbid the invite.
            AdminPrivilegesExceptions: The account lacks rights in the group.
            FloodException: Telegram answered with a flood error.
            IgnoreException: The invite cannot succeed for this user/group
                (limits reached, blocked, or an invalid username).
        """
        user = await self.__cast_to_entity(user)
        group = await self.__cast_to_entity(group)

        if isinstance(group, Channel) and isinstance(user, User):
            await self.__translate_errors(
                self(InviteToChannelRequest(group, [user]))  # type: ignore
            )
            logger.info(f"User {user} was added to channel {group.title}")

        elif isinstance(group, Chat) and isinstance(user, User):
            await self.__translate_errors(
                self(AddChatUserRequest(group, user, 30))  # type: ignore
            )
            logger.info(f"User {user} was added to chat {group.title}")

        else:
            logger.warning(f"Can't determine group type for {group}. Skipping...")

    @logger.catch(reraise=True)
    async def send_message(
        self,
        entity: Entity | EntityLike,
        message: str = "",
        file: str | None = None,
        **kwargs,
    ):
        """Send a message through the base client with translated errors.

        ``message`` and ``file`` are optional and extra keyword arguments are
        forwarded, so the override accepts the same calls as the base method.

        Args:
            entity: The recipient.
            message: The text to send.
            file: Optional file to attach.
            **kwargs: Forwarded unchanged to the base ``send_message``.

        Returns:
            Whatever the base ``send_message`` returns.

        Raises:
            UserPrivacyException, AdminPrivilegesExceptions, FloodException,
            IgnoreException: See ``invite_user``.
        """
        entity = await self.__cast_to_entity(entity)

        return await self.__translate_errors(
            super().send_message(entity, message=message, file=file, **kwargs)  # type: ignore
        )

    @logger.catch(reraise=True)
    async def get_participants(self, group: Entity | EntityLike, *args, **kwargs):
        """Resolve ``group`` and delegate to the base ``get_participants``.

        Args:
            group: The group whose participants are requested.
            *args: Forwarded unchanged to the base method.
            **kwargs: Forwarded unchanged to the base method.

        Returns:
            Whatever the base ``get_participants`` returns.
        """
        group = await self.__cast_to_entity(group)

        return await super().get_participants(group, *args, **kwargs)

    async def __translate_errors(self, awaitable):
        """Await ``awaitable`` and map Telethon RPC errors to project errors.

        The original error is kept as the single constructor argument and as
        the explicit ``__cause__`` of the raised exception.
        """
        try:
            return await awaitable

        except (UserPrivacyRestrictedError, UserNotMutualContactError) as e:
            raise UserPrivacyException(e) from e

        except (ChatWriteForbiddenError, ChannelPrivateError, ChatAdminRequiredError) as e:
            raise AdminPrivilegesExceptions(e) from e

        except (PeerFloodError, FloodWaitError) as e:
            raise FloodException(e) from e

        except (
            UserChannelsTooMuchError,
            UsersTooMuchError,
            UserIsBlockedError,
            YouBlockedUserError,
        ) as e:
            raise IgnoreException(e) from e

    async def __cast_to_entity(self, entity: Entity | EntityLike) -> Entity:
        """Return ``entity`` as a resolved ``User``, ``Chat`` or ``Channel``.

        Already resolved entities are returned untouched. Anything else is
        looked up with ``get_entity``, using the object's ``username``
        attribute when it has a non-empty one and the object itself otherwise.

        Raises:
            IgnoreException: Telegram reported the username as invalid.
        """
        # Check the concrete classes rather than the ``Entity`` typing alias:
        # isinstance() on a typing.Union is interpreter-version dependent.
        if isinstance(entity, (User, Chat, Channel)):
            return entity

        # A missing or empty username cannot be looked up, so fall back to
        # resolving the object itself.
        username = getattr(entity, "username", None)
        target = username if username else entity

        try:
            return await self.get_entity(target)  # type: ignore
        except UsernameInvalidError as e:
            raise IgnoreException(e) from e
|