PaperParser/app/blueprints/api/tasks/tasks.py

119 lines
3.5 KiB
Python

import datetime
import time
from celery import shared_task
import asyncio
from loguru import logger
from telethon.tl.types import UserStatusOnline, UserStatusOffline, UserStatusRecently
from app.models.user import User
from paper.parser import PaperParser
from app.models.task import Task
from app.extensions import db
from .handlers import run_state, failure_state, success_state
async def add_to_group(session, task, task_self):
    """Run ``invite_users`` on a parser opened for ``session.name``.

    Args:
        session: Object whose ``name`` is handed to ``PaperParser``.
        task: Task record supplying ``collection.users`` and ``url``.
        task_self: Bound task object, forwarded unchanged to the parser
            (presumably so the parser can poll for abortion - confirm).
    """
    async with PaperParser(session.name) as client:
        invitees = task.collection.users
        destination = task.url
        await client.invite_users(invitees, destination, task_self)
async def sending_message(session, task, task_self):
    """Run ``send_messages`` on a parser opened for ``session.name``.

    Args:
        session: Object whose ``name`` is handed to ``PaperParser``.
        task: Task record supplying ``collection.users``, ``message`` and
            ``file``.
        task_self: Bound task object, forwarded unchanged to the parser
            (presumably so the parser can poll for abortion - confirm).
    """
    async with PaperParser(session.name) as client:
        recipients = task.collection.users
        text = task.message
        attachment = task.file
        await client.send_messages(recipients, text, attachment, task_self)
async def parse_users(session, task, db, collection, task_self) -> None:
    """Fetch the participants of ``task.url`` and store the new ones in ``collection``.

    NOTE(review): this block reached review with its indentation stripped; the
    nesting below is reconstructed from statement order. Confirm that the
    add / commit / sleep steps belong inside the ``if not User.exist(...)``
    branch.

    Args:
        session: Object whose ``name`` is passed to ``PaperParser``.
        task: Task record; ``url`` is passed to ``get_participants`` and a
            truthy ``file`` switches on the per-user bio lookup.
        db: Database handle; rows are added and committed through
            ``db.session``. Shadows the module-level ``db`` import inside
            this function.
        collection: Collection each created ``User`` row is attached to.
        task_self: Bound task object, polled through ``is_aborted()``.
    """
    # A truthy ``task.file`` enables the extra full-info request per user.
    parse_bio = True if task.file else False
    async with PaperParser(session.name) as parser:
        users = await parser.get_participants(task.url)
        # Lazily keep only the participants accepted by ``filter_users``.
        filtered_users = filter(filter_users, users)
        for user in filtered_users:
            # Stop iterating as soon as the task reports it was aborted.
            if task_self.is_aborted():
                return
            # presumably ``User.exist`` reports whether this username is
            # already stored for the collection - verify against the model
            if not User.exist(user.username, collection):
                bio = None
                if parse_bio:
                    full = await parser.get_full_info(user)
                    # ``full_user`` may be falsy, in which case no bio is kept.
                    bio = full.full_user.about if full.full_user else None
                    logger.info(f"Got bio for {user.username}. Bio: { bio }")
                db.session.add(
                    User(
                        first_name=user.first_name,
                        last_name=user.last_name,
                        username=user.username,
                        phone=user.phone,
                        description=bio,
                        collection=collection,
                    )
                )
                # Commit per user, so each row is persisted before the next one.
                db.session.commit()
                # NOTE(review): 5 s pause after each stored user - presumably
                # throttling against Telegram rate limits; confirm.
                await asyncio.sleep(5)
def filter_users(user):
    """Return ``True`` for a non-bot user with a username who was active recently.

    A user is considered active when its ``status`` is ``UserStatusOnline`` or
    ``UserStatusRecently``, or when it is ``UserStatusOffline`` with a
    ``was_online`` timestamp no older than five days. Any other status
    (including ``None``) yields ``False``.

    Args:
        user: Object exposing ``username``, ``bot`` and ``status``.

    Returns:
        bool: ``True`` if the user should be kept, ``False`` otherwise.
    """
    # Users without a username and bots are never kept.
    if not user.username or user.bot:
        return False

    status = user.status
    if isinstance(status, (UserStatusOnline, UserStatusRecently)):
        return True

    if isinstance(status, UserStatusOffline):
        last_seen = status.was_online
        if last_seen.tzinfo is None:
            # A naive timestamp is treated as UTC, as the original code did.
            last_seen = last_seen.replace(tzinfo=datetime.timezone.utc)
        # Take "now" as an aware UTC instant. Stamping the naive local time
        # from ``today()`` with UTC would skew the five-day window by the
        # host's UTC offset.
        now = datetime.datetime.now(datetime.timezone.utc)
        return last_seen + datetime.timedelta(days=5) >= now

    return False
@shared_task(bind=True)
def add_to_group_task(self, task_id: int):
    """Celery entry point that drives ``add_to_group`` for one stored task.

    Loads the ``Task`` row, calls ``run_state`` with this Celery request id,
    then runs the coroutine to completion. Any ``Exception`` is logged and
    passed to ``failure_state``; otherwise ``success_state`` is called.

    Args:
        task_id: Primary key of the ``Task`` row to process.
    """
    try:
        job: Task = Task.query.get(task_id)
        run_state(self.request.id, task_id)
        tg_session = job.session
        coroutine = add_to_group(tg_session, job, self)
        asyncio.run(coroutine)
    except Exception as exc:
        logger.exception(exc)
        failure_state(task_id, exc)
        return
    # Reached only when the body above finished without raising.
    success_state(task_id)
@shared_task(bind=True)
def parse_users_task(self, task_id: int):
    """Celery entry point that drives ``parse_users`` for one stored task.

    Loads the ``Task`` row with ``get_or_404``, reads its session and
    collection, calls ``run_state`` with this Celery request id, then runs the
    coroutine to completion. Any ``Exception`` is logged and passed to
    ``failure_state``; otherwise ``success_state`` is called.

    Args:
        task_id: Primary key of the ``Task`` row to process.
    """
    try:
        job: Task = Task.query.get_or_404(task_id)
        # Read both relations before ``run_state`` to keep the original order.
        tg_session = job.session
        target_collection = job.collection
        run_state(self.request.id, task_id)
        coroutine = parse_users(tg_session, job, db, target_collection, self)
        asyncio.run(coroutine)
    except Exception as exc:
        logger.exception(exc)
        failure_state(task_id, exc)
        return
    # Reached only when the body above finished without raising.
    success_state(task_id)
@shared_task(bind=True)
def send_messages_task(self, task_id: int):
task: Task = Task.query.get(task_id)
session = task.session
users = session.users