From 5ee90bca3d5a16d406a35f467bc0597507dc21cb Mon Sep 17 00:00:00 2001
From: Anatoly Bogomolov
Date: Tue, 12 Dec 2023 16:13:58 +1000
Subject: [PATCH] first commit

---
 .gitignore            |   3 +
 main.py               |  19 ++++++
 requirements.txt      |  18 ++++++
 stolichki/__init__.py |   0
 stolichki/browser.py  | 159 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 stolichki/parser.py   | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++
 stolichki/product.py  |  91 ++++++++++++++++++++++++++++++
 7 files changed, 440 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 main.py
 create mode 100644 requirements.txt
 create mode 100644 stolichki/__init__.py
 create mode 100644 stolichki/browser.py
 create mode 100644 stolichki/parser.py
 create mode 100644 stolichki/product.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c60f1c7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+venv
+.vscode
+__pycache__
\ No newline at end of file
diff --git a/main.py b/main.py
new file mode 100644
--- /dev/null
+++ b/main.py
@@ -0,0 +1,19 @@
+import json
+import logging
+
+from stolichki.parser import StolichkiParser
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+
+    parser = StolichkiParser()
+    try:
+        result = parser.run()
+    finally:
+        # Always shut the browser down, even when parsing fails midway.
+        parser.driver.quit()
+
+    # ensure_ascii=False writes raw Cyrillic, so the encoding is pinned
+    # instead of depending on the platform default.
+    with open("data.json", "w", encoding="utf-8") as f:
+        json.dump(result, f, indent=4, ensure_ascii=False)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..82aeb36
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,18 @@
+attrs==23.1.0
+certifi==2023.7.22
+charset-normalizer==3.2.0
+exceptiongroup==1.1.3
+h11==0.14.0
+idna==3.4
+outcome==1.2.0
+PySocks==1.7.1
+requests==2.31.0
+selenium==4.12.0
+selenium-stealth==1.0.6
+sniffio==1.3.0
+sortedcontainers==2.4.0
+trio==0.22.2
+trio-websocket==0.10.4
+urllib3==2.0.4
+wsproto==1.2.0
+2captcha-python==1.2.1
\ No newline at end of file
diff --git a/stolichki/__init__.py b/stolichki/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/stolichki/browser.py b/stolichki/browser.py
new file mode 100644
--- /dev/null
+++ b/stolichki/browser.py
@@ -0,0 +1,159 @@
+"""Chrome driver for stolichki.ru: waits for page load and solves captchas."""
+
+import time
+import logging
+import uuid
+import os
+
+from selenium import webdriver
+from selenium.webdriver.chrome.options import Options
+from selenium.webdriver.chrome.service import Service
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.common.by import By
+from selenium.common.exceptions import (
+    ElementNotVisibleException,
+    NoSuchElementException,
+    TimeoutException,
+)
+
+from selenium_stealth import stealth
+
+from twocaptcha import TwoCaptcha
+
+# XPath of the site logo; its presence is the "page has loaded" marker.
+LOGO_XPATH = '//img[@alt="Логотип"]'
+# Directory for the screenshot and HTML dump of pages that failed to load.
+ERRORS_DIR = "errors"
+# Page-load attempts in get() and solving attempts in the captcha handler.
+MAX_ATTEMPTS = 5
+
+
+class CaptchaSolverError(Exception):
+    """Raised when the captcha is still shown after every solving attempt."""
+
+
+class StolichkiDriver(webdriver.Chrome):
+    """Chrome WebDriver with stealth settings, load retries and captcha solving."""
+
+    def __init__(
+        self, options: Options = None, service: Service = None, keep_alive: bool = True
+    ) -> None:
+        """Start Chrome and create the 2captcha client.
+
+        The API key is read from the TWOCAPTCHA_KEY environment variable; the
+        misspelled TWOCAPTCA_KEY is also accepted so existing setups keep working.
+
+        Raises:
+            RuntimeError: if neither environment variable is set.
+        """
+        api_key = os.environ.get("TWOCAPTCHA_KEY") or os.environ.get("TWOCAPTCA_KEY")
+        # Explicit raise instead of assert, which is stripped under "python -O".
+        if not api_key:
+            raise RuntimeError("Can't find environment variable TWOCAPTCHA_KEY")
+
+        if options is None:
+            options = webdriver.ChromeOptions()
+
+        os.makedirs(ERRORS_DIR, exist_ok=True)
+
+        options.add_experimental_option("excludeSwitches", ["enable-automation"])
+        options.add_experimental_option("useAutomationExtension", False)
+        options.page_load_strategy = "eager"
+
+        self.__solver = TwoCaptcha(api_key)
+
+        super().__init__(options, service, keep_alive)
+
+        stealth(
+            self,
+            languages=["en-US", "en"],
+            vendor="Google Inc.",
+            platform="Win32",
+            webgl_vendor="Intel Inc.",
+            renderer="Intel Iris OpenGL Engine",
+            fix_hairline=True,
+        )
+
+    def get(self, url: str) -> None:
+        """Open *url* and block until the site logo is present.
+
+        Each attempt waits up to 60 seconds for the logo. If it does not appear,
+        the captcha handler runs, the page is refreshed and the wait starts
+        again, up to MAX_ATTEMPTS times.
+
+        Raises:
+            CaptchaSolverError: if a captcha could not be solved.
+            TimeoutError: if the logo never appeared; a screenshot and the page
+                source are saved under ERRORS_DIR first.
+        """
+        super().get(url)
+        logging.info(f"Loading {url}")
+        for attempt in range(MAX_ATTEMPTS):
+            logging.debug(f"Attempt: {attempt + 1} for {url}")
+
+            if self.__wait_for_presence(LOGO_XPATH):
+                return
+
+            self.__handle_captcha()
+            self.execute_script("window.stop();")
+            time.sleep(1)
+            self.refresh()
+
+        self.__dump_failed_page(url)
+        raise TimeoutError("Can't reach website. Check your connection or query.")
+
+    def __dump_failed_page(self, url: str) -> None:
+        """Save a screenshot and the HTML source of the page that failed to load."""
+        # A URL contains "/" and ":" and cannot be used in a file name, so the
+        # files are named by a random id that is logged together with the URL.
+        dump_id = uuid.uuid4()
+        logging.critical(f"Can't reach to {url}. Dump id: {dump_id}")
+        self.get_screenshot_as_file(os.path.join(ERRORS_DIR, f"{dump_id}.png"))
+        # Mode "w" is required: the default read mode cannot create the dump.
+        with open(
+            os.path.join(ERRORS_DIR, f"{dump_id}.html"), "w", encoding="utf-8"
+        ) as f:
+            f.write(self.page_source)
+
+    def __wait_for_presence(self, xpath: str, delay: int = 60):
+        """Return True once an element matching *xpath* is present, else False."""
+        wait = WebDriverWait(self, delay)
+        try:
+            wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
+        except (TimeoutException, NoSuchElementException, ElementNotVisibleException):
+            # WebDriverWait.until() signals an expired wait with TimeoutException.
+            return False
+
+        logging.info("Loading element was founded")
+        return True
+
+    def __handle_captcha(self) -> None:
+        """Solve the captcha on the current page, if there is one.
+
+        Returns when no captcha image is found, or as soon as the logo appears
+        after an answer was submitted.
+
+        Raises:
+            CaptchaSolverError: if the logo did not appear after MAX_ATTEMPTS answers.
+        """
+        for attempt in range(MAX_ATTEMPTS):
+            logging.info(f"Trying to solve captcha {attempt + 1}/{MAX_ATTEMPTS}")
+            try:
+                captcha_image = self.find_element(By.ID, "captcha_image")
+            except NoSuchElementException:
+                logging.info("Can't find captcha image")
+                return
+
+            solved = self.__solver.normal(captcha_image.screenshot_as_base64)
+            # 2captcha-python returns {"captchaId": ..., "code": ...}; only the
+            # recognised text may be typed into the form.
+            captcha_text = solved["code"] if isinstance(solved, dict) else str(solved)
+
+            self.find_element(By.ID, "captcha_input").send_keys(captcha_text)
+            self.find_element(By.ID, "submit_button").click()
+
+            if self.__wait_for_presence(LOGO_XPATH, 60):
+                return
+
+        raise CaptchaSolverError()
diff --git a/stolichki/parser.py b/stolichki/parser.py
new file mode 100644
--- /dev/null
+++ b/stolichki/parser.py
@@ -0,0 +1,150 @@
+"""Catalog crawler for stolichki.ru."""
+
+import logging
+import os
+from multiprocessing import Pool
+
+from selenium import webdriver
+from selenium.common.exceptions import NoSuchElementException
+from selenium.webdriver.remote.webelement import WebElement
+from selenium.webdriver.common.by import By
+
+from .browser import StolichkiDriver
+from .product import Product
+
+# Chromedriver location used when neither the constructor argument nor the
+# CHROMEDRIVER_PATH environment variable provides one.
+DEFAULT_CHROMEDRIVER_PATH = "/home/winet/.local/bin/chromedriver"
+# How many load timeouts in a row are tolerated for one catalog page.
+MAX_PAGE_RETRIES = 3
+
+
+class StolichkiParser:
+    """Walks the catalog categories and collects the products listed in them."""
+
+    city = {
+        "id": 1,
+        "name": "Москва",
+    }
+
+    def __init__(
+        self, city: dict | None = None, chromedriver_path: str | None = None
+    ) -> None:
+        """Create the browser session.
+
+        Args:
+            city: dict with "id" and "name" keys; defaults to the class-level city.
+            chromedriver_path: path to the chromedriver binary. Falls back to the
+                CHROMEDRIVER_PATH environment variable, then to
+                DEFAULT_CHROMEDRIVER_PATH.
+        """
+        if city is not None:
+            self.city = city
+
+        if chromedriver_path is None:
+            chromedriver_path = os.environ.get(
+                "CHROMEDRIVER_PATH", DEFAULT_CHROMEDRIVER_PATH
+            )
+
+        service = webdriver.ChromeService(chromedriver_path)
+        self.driver = StolichkiDriver(service=service)
+        logging.info(f"Parser initialize complete! City: {self.city.get('name')}")
+
+    def run(self):
+        """Parse the whole catalog and return {"city": {...}, "items": [...]}."""
+        logging.info(f"Parser started. City: {self.city.get('name')}")
+        self.driver.get("https://stolichki.ru/catalog")
+
+        categories_lists = self.driver.find_elements(
+            By.CLASS_NAME, "categoryList__item"
+        )
+
+        links = []
+        for category_list in categories_lists:
+            link_tags = category_list.find_elements(
+                By.CLASS_NAME, "catalogPreview__caption"
+            )
+            # Read every href once; elements without one are skipped.
+            hrefs = (link.get_attribute("href") for link in link_tags)
+            links.extend(href for href in hrefs if href is not None)
+
+        logging.info(f"Finished parsing categories: Links: {links}")
+        items = []
+        for link in links:
+            items.extend(self.__get_items(link))
+
+        return {
+            "city": {
+                "id": self.city.get("id"),
+                "name": self.city.get("name"),
+            },
+            "items": items,
+        }
+
+    def __get_items(self, url: str):
+        """Collect the products from every page of the category at *url*.
+
+        Paging stops at the first page without products, or when one page times
+        out MAX_PAGE_RETRIES times in a row.
+        """
+        items_list = []
+        page = 1
+        failures = 0
+        while True:
+            try:
+                self.driver.get(f"{url}?page={page}")
+            except TimeoutError:
+                # Retry the same page, but not forever: an unbounded retry hangs
+                # the whole run on a page that never loads.
+                failures += 1
+                if failures >= MAX_PAGE_RETRIES:
+                    logging.error(f"Giving up on {url}?page={page}")
+                    break
+                continue
+            failures = 0
+
+            try:
+                catalog_list = self.driver.find_element(By.ID, "catalog-list")
+            except NoSuchElementException:
+                # No product list on this page: nothing more to read here.
+                break
+            product_items = catalog_list.find_elements(By.CLASS_NAME, "product-item")
+
+            if not product_items:
+                break
+
+            items_list.extend(self.__parse_list(product_items))
+            page += 1
+
+        return items_list
+
+    def __parse_list(self, product_items: list[WebElement]):
+        """Open every product of a catalog page and return the parsed dicts."""
+        # Collect the hrefs before the driver navigates away from the catalog page.
+        product_links: list[str] = [
+            product_item.find_element(
+                By.XPATH, './/p[contains(@class,"product-title")]/a'
+            ).get_attribute("href")
+            for product_item in product_items
+        ]
+        logging.info(f"Links in product list parsed. Links: {product_links}")
+
+        data = []
+        for product_link in product_links:
+            try:
+                product = Product(self.driver, product_link).get_dict()
+            except Exception:
+                # Best effort: one broken product page must not abort the crawl,
+                # but the failure is logged instead of being silently dropped.
+                logging.exception(f"Can't parse product {product_link}")
+                continue
+
+            logging.info(f"{product} was parsed.")
+            data.append(product)
+
+        return data
+
+    def set_city(self, id: int):
+        """Switch the city. Not implemented yet."""
+        # TODO: implement the city switch by replacing the value in the browser cookie
+        pass
diff --git a/stolichki/product.py b/stolichki/product.py
new file mode 100644
--- /dev/null
+++ b/stolichki/product.py
@@ -0,0 +1,91 @@
+"""Parser for a single stolichki.ru product page."""
+
+import logging
+
+from selenium.common.exceptions import WebDriverException
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+
+from .browser import StolichkiDriver
+
+
+class Product:
+    """One product page: id, name and the stock per store."""
+
+    id: int = 0
+    name: str = ""
+    available: bool = False
+    stores: list
+
+    def __init__(self, driver: StolichkiDriver, url: str) -> None:
+        """Load *url* in *driver* and parse the page right away."""
+        self.driver = driver
+        self.url = url
+        # Per-instance list: a class-level [] would be shared by every product.
+        self.stores = []
+
+        self.driver.get(self.url)
+        self.__parse_page()
+
+    def get_dict(self):
+        """Return the parsed fields as a plain dict."""
+        return {
+            "id": self.id,
+            "name": self.name,
+            "available": self.available,
+            "stores": self.stores,
+        }
+
+    def __parse_page(self):
+        """Fill name, id, available and stores from the loaded page."""
+        self.name = (
+            self.driver.find_element(By.XPATH, '//h1[@itemprop="name"]')
+            .text.removeprefix("Купить")
+            .strip()
+        )
+
+        self.id = int(self.url.removeprefix("https://stolichki.ru/drugs/"))
+
+        try:
+            stores = self.__parse_stores()
+        except Exception:
+            # Store data is optional: keep the product, but log why it is missing.
+            logging.critical("Can't get info about stores", exc_info=True)
+            return
+
+        if stores:
+            self.available = True
+            self.stores = stores
+
+    def __parse_stores(self):
+        """Return [{"name": ..., "quantity": ...}] for every store row on the page.
+
+        An empty list is returned when the page contains the
+        "p.badge-class.product-not-found" or the
+        "a.stores-stock.stores-order.package" element.
+        """
+        for selector in (
+            "p.badge-class.product-not-found",
+            "a.stores-stock.stores-order.package",
+        ):
+            # find_elements returns an empty list instead of raising.
+            if self.driver.find_elements(By.CSS_SELECTOR, selector):
+                return []
+
+        self.driver.find_element(By.CLASS_NAME, "stores-stock").click()
+        wait = WebDriverWait(self.driver, 30)
+        wait.until(EC.presence_of_element_located((By.CLASS_NAME, "tr-start-store")))
+
+        stores_list = []
+        for store in self.driver.find_elements(By.CLASS_NAME, "tr-start-store"):
+            try:
+                store_name = store.find_element(By.CLASS_NAME, "store-link").text
+                quantity = int(store.find_element(By.CLASS_NAME, "part-quantity").text)
+            except (WebDriverException, ValueError):
+                # Rows without a readable store link or numeric quantity are skipped.
+                continue
+
+            stores_list.append({"name": store_name, "quantity": quantity})
+
+        return stores_list