108 lines
3.1 KiB
Python
108 lines
3.1 KiB
Python
import datetime
|
|
import time
|
|
|
|
from celery import shared_task
|
|
import asyncio
|
|
from loguru import logger
|
|
from telethon.tl.types import UserStatusOnline, UserStatusOffline, UserStatusRecently
|
|
|
|
from app.models.user import User
|
|
from paper.parser import PaperParser
|
|
from app.models.task import Task
|
|
from app.extensions import db
|
|
from .handlers import run_state, failure_state, success_state
|
|
|
|
async def add_to_group(session, task, task_self):
    """Invite the users of the task's collection through a ``PaperParser``.

    Opens ``PaperParser(session.name)`` as an async context manager and awaits
    its ``invite_users`` coroutine with ``task.collection.users``,
    ``task.url`` and ``task_self`` (forwarded unchanged).
    """
    async with PaperParser(session.name) as client:
        invitees = task.collection.users
        await client.invite_users(invitees, task.url, task_self)
|
|
|
|
async def sending_message(session, task, task_self):
    """Send the task's message to the users of the task's collection.

    Opens ``PaperParser(session.name)`` as an async context manager and awaits
    its ``send_messages`` coroutine with ``task.collection.users``,
    ``task.message``, ``task.file`` and ``task_self`` (forwarded unchanged).
    """
    async with PaperParser(session.name) as client:
        recipients = task.collection.users
        await client.send_messages(
            recipients,
            task.message,
            task.file,
            task_self,
        )
|
|
|
|
async def parse_users(session, task, db, collection):
    """Fetch participants for ``task.url`` and store the new ones.

    Participants returned by ``PaperParser.get_participants`` are screened
    with ``filter_users``. Every remaining participant for which
    ``User.exist(username, collection)`` is false is added to ``db.session``
    as a ``User`` row attached to ``collection``. The session is committed
    after the loop.

    Args:
        session: Object whose ``name`` is handed to ``PaperParser``.
        task: Object providing the ``url`` to read participants from.
        db: Database handle exposing ``session.add`` / ``session.commit``.
        collection: Collection the created ``User`` rows are attached to.
    """
    async with PaperParser(session.name) as client:
        participants = await client.get_participants(task.url)

        for member in participants:
            # Skip participants rejected by the activity/username filter.
            if not filter_users(member):
                continue
            # Skip participants already stored for this collection.
            if User.exist(member.username, collection):
                continue

            # NOTE(review): get_full_info() is called without identifying the
            # participant -- confirm against PaperParser that the returned
            # profile really belongs to `member`.
            profile = await client.get_full_info()
            about = profile.full_user.about if profile.full_user else None

            record = User(
                first_name=member.first_name,
                last_name=member.last_name,
                username=member.username,
                phone=member.phone,
                description=about,
                collection=collection,
            )
            db.session.add(record)

        db.session.commit()
|
|
|
|
def filter_users(user, active_within_days=5):
    """Return True when *user* has a username, is not a bot and is active.

    A user counts as active when their status is ``UserStatusOnline`` or
    ``UserStatusRecently``, or when it is ``UserStatusOffline`` and
    ``was_online`` lies within the last ``active_within_days`` days.
    Any other status is treated as inactive.

    Args:
        user: Object exposing ``username``, ``bot`` and ``status``.
        active_within_days: Size of the "recently seen" window in days for
            offline users. Defaults to 5.

    Returns:
        bool: Whether the user passes the filter.
    """
    if not user.username or user.bot:
        return False

    status = user.status
    if isinstance(status, (UserStatusOnline, UserStatusRecently)):
        return True

    if isinstance(status, UserStatusOffline):
        last_seen = status.was_online
        # Only stamp UTC onto a naive timestamp; an aware one already
        # compares correctly and must not have its zone overwritten.
        if last_seen.tzinfo is None:
            last_seen = last_seen.replace(tzinfo=datetime.timezone.utc)
        # Use the real current UTC instant. Tagging local wall-clock time as
        # UTC would shift the window by the host's UTC offset.
        now = datetime.datetime.now(datetime.timezone.utc)
        return last_seen + datetime.timedelta(days=active_within_days) >= now

    return False
|
|
|
|
@shared_task(bind=True)
def add_to_group_task(self, task_id: int):
    """Celery task that runs ``add_to_group`` for the stored task *task_id*.

    Looks the task up, reports the run through ``run_state`` with this Celery
    request's id, and drives the ``add_to_group`` coroutine to completion with
    ``asyncio.run``. Any exception is logged and passed to ``failure_state``;
    on a clean run ``success_state`` is called.

    Args:
        task_id: Primary key of the ``Task`` row to process.
    """
    try:
        task = Task.query.get(task_id)
        if task is None:
            # Query.get yields None for an unknown id; fail here with a clear
            # message instead of an AttributeError on `task.session` later.
            raise LookupError(f"Task {task_id} not found")

        run_state(self.request.id, task_id)

        asyncio.run(add_to_group(task.session, task, self))

    except Exception as e:
        # Task boundary: record every failure rather than letting it escape.
        logger.exception(e)
        failure_state(task_id, e)

    else:
        success_state(task_id)
|
|
|
|
|
|
@shared_task(bind=True)
def parse_users_task(self, task_id: int):
    """Celery task that runs ``parse_users`` for the stored task *task_id*.

    Loads the task with ``get_or_404``, reads its ``session`` and
    ``collection``, reports the run through ``run_state`` with this Celery
    request's id, and drives the ``parse_users`` coroutine with
    ``asyncio.run``. Any exception is logged and passed to ``failure_state``;
    otherwise ``success_state`` is called.

    Args:
        task_id: Primary key of the ``Task`` row to process.
    """
    try:
        job: Task = Task.query.get_or_404(task_id)
        tg_session, target = job.session, job.collection

        run_state(self.request.id, task_id)

        asyncio.run(parse_users(tg_session, job, db, target))
    except Exception as exc:
        # Task boundary: record the failure and stop before the success path.
        logger.exception(exc)
        failure_state(task_id, exc)
        return

    success_state(task_id)
|
|
|
|
|
|
@shared_task(bind=True)
|
|
def send_messages_task(self, task_id: int):
|
|
task: Task = Task.query.get(task_id)
|
|
|
|
session = task.session
|
|
users = session.users |