import asyncio import configparser import random from loguru import logger import aiohttp import backoff from crawler.utils.exceptions import ConfigError from crawler.types import City, Proxy from crawler.utils.classes import Singleton log = logger class CrawlerAPI(metaclass=Singleton): api_baseurl = "https://q.asburo.ru/ch/" api_version = "v1" cities = [] proxies = [] def __init__(self) -> None: log.info("Initializing crawler API class") self.api_url = self.api_baseurl + self.api_version config = configparser.ConfigParser() try: config.read("config.ini") log.debug("Reading config file") self.config = config["crawler"] self.rival_tag = self.config["tag"] self.auth = aiohttp.BasicAuth( self.config["username"], self.config["password"] ) log.info("Successfully parsed config file.") log.debug(f"Your auth: {self.auth}") except: raise ConfigError("Can't read settings for crawler api. Check your config.ini.") timeout = aiohttp.ClientTimeout(12000) self.session = aiohttp.ClientSession(timeout=timeout) log.info("Crawler API initialized") @backoff.on_exception(backoff.expo, (aiohttp.ClientError, aiohttp.ServerConnectionError), max_time=60) async def get_cities(self) -> list[City]: if len(self.cities) <= 0: url = f"{self.api_url}/cities/{self.rival_tag}" response = await self.session.get(url, auth=self.auth) if response.status >= 500: raise aiohttp.ServerConnectionError() log.debug(f"Response status: {response.status} for {url}") if response.status == 200: json_response = await response.json() self.cities = [City(**city) for city in json_response.get("city_list")] return self.cities @backoff.on_exception(backoff.expo, aiohttp.ClientError, max_time=60) async def get_proxies(self) -> list[Proxy]: if len(self.proxies) <= 0: url = f"{self.api_url}/proxies/" response = await self.session.get(url, auth=self.auth) log.debug(f"Response status: {response.status} for {url}") if response.status == 200: json_response = await response.json() self.proxies = [Proxy(**proxy) for proxy in json_response.get("proxy_list")] return self.proxies async def get_random_proxy(self) -> Proxy: proxies = await self.get_proxies() return proxies[random.randint(0, len(proxies) - 1)] async def remove_proxy(self, proxy: Proxy) -> Proxy: self.proxies.remove(proxy) return await self.get_random_proxy() @backoff.on_exception(backoff.expo, (aiohttp.ClientError, aiohttp.ServerConnectionError), max_tries=15, logger=log) async def send_products(self, results: list): log.info("Sending data...") url = f"{self.api_url}/prices/{self.rival_tag}" data = { "rows": results } response = await self.session.post(url, json=data, auth=self.auth) status, response_text = response.status, await response.text() log.debug(f"{data} was sended. Status: {status}. Response: {response_text}") if status >= 500: await asyncio.sleep(15) raise aiohttp.ServerConnectionError(response_text) async def __aenter__(self): return self async def __aexit__(self, *args, **kwargs): await self.close() async def close(self): if not self.session.closed: await self.session.close()