import asyncio
import datetime
import time

from celery import shared_task
from loguru import logger
from telethon.tl.types import UserStatusOnline, UserStatusOffline, UserStatusRecently

from app.models.user import User
from paper.parser import PaperParser
from app.models.task import Task
from app.extensions import db
from .handlers import run_state, failure_state, success_state

# A user whose last-seen timestamp is within this window counts as active.
_ACTIVE_WINDOW = datetime.timedelta(days=5)

# Pause (seconds) after each stored user while parsing participants.
_PARSE_DELAY_SECONDS = 5


async def add_to_group(session, task, task_self):
    """Invite the users of ``task.collection`` through a ``PaperParser``.

    The parser is opened for ``session.name`` and receives the collection's
    users, ``task.url`` and ``task_self``.
    """
    async with PaperParser(session.name) as parser:
        await parser.invite_users(task.collection.users, task.url, task_self)


async def sending_message(session, task, task_self):
    """Send ``task.message`` / ``task.file`` to the users of ``task.collection``.

    The parser is opened for ``session.name``; ``task_self`` is forwarded to
    ``PaperParser.send_messages`` unchanged.
    """
    async with PaperParser(session.name) as parser:
        await parser.send_messages(
            task.collection.users, task.message, task.file, task_self
        )


async def parse_users(session, task, db, collection, task_self):
    """Store participants of ``task.url`` that are not yet in ``collection``.

    Participants are filtered with ``filter_users``. For every participant
    that ``User.exist`` does not report as already stored, a ``User`` row is
    added and committed. The bio is fetched only when ``task.file`` is truthy.
    The loop stops early as soon as ``task_self.is_aborted()`` returns true.

    Args:
        session: object whose ``name`` selects the parser session.
        task: task record providing ``url`` and ``file``.
        db: database handle exposing ``session.add`` / ``session.commit``.
        collection: collection the new ``User`` rows are attached to.
        task_self: task object polled through ``is_aborted()``.
    """
    parse_bio = bool(task.file)
    async with PaperParser(session.name) as parser:
        users = await parser.get_participants(task.url)
        for user in filter(filter_users, users):
            if task_self.is_aborted():
                return
            if User.exist(user.username, collection):
                continue

            bio = None
            if parse_bio:
                full = await parser.get_full_info(user)
                bio = full.full_user.about if full.full_user else None
                logger.info(f"Got bio for {user.username}. \nBio: { bio }")

            db.session.add(
                User(
                    first_name=user.first_name,
                    last_name=user.last_name,
                    username=user.username,
                    phone=user.phone,
                    description=bio,
                    collection=collection,
                )
            )
            db.session.commit()
            # NOTE(review): the original indentation was lost; the pause is
            # assumed to follow each stored user - confirm against history.
            await asyncio.sleep(_PARSE_DELAY_SECONDS)


def filter_users(user):
    """Return True for non-bot users with a username that look active.

    A user is active when the status is ``UserStatusOnline`` or
    ``UserStatusRecently``, or when it is ``UserStatusOffline`` and the
    last-seen time is no older than five days. Any other status is inactive.
    """
    if not user.username or user.bot:
        return False

    status = user.status
    if isinstance(status, (UserStatusOnline, UserStatusRecently)):
        return True

    if isinstance(status, UserStatusOffline):
        was_online = status.was_online.replace(tzinfo=datetime.timezone.utc)
        # Take the current instant in UTC. Stamping the naive local time from
        # ``today()`` as UTC would shift the cutoff by the host's UTC offset.
        now = datetime.datetime.now(datetime.timezone.utc)
        return was_online + _ACTIVE_WINDOW >= now

    return False


@shared_task(bind=True)
def add_to_group_task(self, task_id: int):
    """Celery entry point: run ``add_to_group`` for the task ``task_id``.

    Marks the task as running, executes the coroutine to completion and then
    records success; any exception is logged and recorded as a failure.
    """
    try:
        task: Task = Task.query.get(task_id)
        run_state(self.request.id, task_id)
        session = task.session
        asyncio.run(add_to_group(session, task, self))
    except Exception as e:
        logger.exception(e)
        failure_state(task_id, e)
    else:
        success_state(task_id)


@shared_task(bind=True)
def parse_users_task(self, task_id: int):
    """Celery entry point: run ``parse_users`` for the task ``task_id``.

    Marks the task as running, executes the coroutine to completion and then
    records success; any exception is logged and recorded as a failure.
    """
    # NOTE(review): parse_users calls ``self.is_aborted()``, which a plain
    # bound task does not define - presumably the app sets an abortable base
    # task class; confirm.
    try:
        task: Task = Task.query.get_or_404(task_id)
        session = task.session
        collection = task.collection
        run_state(self.request.id, task_id)
        asyncio.run(parse_users(session, task, db, collection, self))
    except Exception as e:
        logger.exception(e)
        failure_state(task_id, e)
    else:
        success_state(task_id)


@shared_task(bind=True)
def send_messages_task(self, task_id: int):
    task: Task = Task.query.get(task_id)
    session = task.session
    users = session.users