PaperParser/app/blueprints/api/tasks/tasks.py

95 lines
2.8 KiB
Python

import datetime
import time
from celery import shared_task
import asyncio
from loguru import logger
from telethon.tl.types import UserStatusOnline, UserStatusOffline, UserStatusRecently
from app.models.user import User
from paper.parser import PaperParser
from app.models.task import Task
from app.extensions import db
from .handlers import run_state, failure_state, success_state
async def add_to_group(session, task, task_self):
async with PaperParser(session.name) as parser:
await parser.invite_users(task.collection.users, task.url, task_self)
async def sending_message(session, task, task_self):
async with PaperParser(session.name) as parser:
await parser.send_messages(task.collection.users, task.message, task.file, task_self)
async def parse_users(session, task):
async with PaperParser(session.name) as parser:
return await parser.get_participants(task.url)
@shared_task(bind=True)
def add_to_group_task(self, task_id: int):
try:
task: Task = Task.query.get(task_id)
run_state(self.request.id, task_id)
session = task.session
time.sleep(10)
asyncio.run(
add_to_group(session, task, self)
)
except Exception as e:
logger.exception(e)
failure_state(task_id, e)
else:
success_state(task_id)
finally:
db.session.commit()
@shared_task(bind=True)
def parse_users_task(self, task_id: int):
task: Task = Task.query.get_or_404(task_id)
session = task.session
collection = task.collection
users = asyncio.run(
parse_users(session, task)
)
for user in users:
if not user.username or user.bot:
continue
is_active = False
if isinstance(user.status, UserStatusOffline):
was_online = user.status.was_online.replace(tzinfo=datetime.timezone.utc)
today = datetime.datetime.today().replace(tzinfo=datetime.timezone.utc)
is_active = was_online + datetime.timedelta(days=5) >= today
if isinstance(user.status, UserStatusOnline) or isinstance(user.status, UserStatusRecently):
is_active = True
if not is_active:
continue
if not User.exist(user.username, collection):
db.session.add(
User(
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
phone=user.phone,
collection=collection,
)
)
db.session.commit()
@shared_task(bind=True)
def send_messages_task(self, task_id: int):
task: Task = Task.query.get(task_id)
session = task.session
users = session.users