first commit

This commit is contained in:
Анатолий Богомолов 2023-12-12 16:13:58 +10:00
commit 5ee90bca3d
7 changed files with 343 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
venv
.vscode
__pycache__

14
main.py Normal file
View File

@ -0,0 +1,14 @@
import json
import logging
from stolichki.parser import StolichkiParser
if __name__ == "__main__":
    # Script entry point: run the parser once and dump the result to data.json.
    logging.basicConfig(level=logging.INFO)
    result = StolichkiParser().run()
    # ensure_ascii=False writes Cyrillic characters verbatim, so the file must
    # be opened as UTF-8 explicitly instead of relying on the locale default
    # (which is not UTF-8 on every platform and would fail or garble the text).
    with open("data.json", "w", encoding="utf-8") as f:
        json.dump(result, f, indent=4, ensure_ascii=False)

18
requirements.txt Normal file
View File

@ -0,0 +1,18 @@
attrs==23.1.0
certifi==2023.7.22
charset-normalizer==3.2.0
exceptiongroup==1.1.3
h11==0.14.0
idna==3.4
outcome==1.2.0
PySocks==1.7.1
requests==2.31.0
selenium==4.12.0
selenium-stealth==1.0.6
sniffio==1.3.0
sortedcontainers==2.4.0
trio==0.22.2
trio-websocket==0.10.4
urllib3==2.0.4
wsproto==1.2.0
2captcha-python==1.2.1

0
stolichki/__init__.py Normal file
View File

118
stolichki/browser.py Normal file
View File

@ -0,0 +1,118 @@
import logging
import os
import time
import uuid

from selenium import webdriver
from selenium.common.exceptions import (
    ElementNotVisibleException,
    NoSuchElementException,
    TimeoutException,
)
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium_stealth import stealth
from twocaptcha import TwoCaptcha
class CaptchaSolverError(Exception):
    """Raised when a captcha could not be solved.

    Accepts the same positional arguments as ``Exception``. When raised
    without arguments, a default message is used so that logs and tracebacks
    never show an empty error.
    """

    default_message = "Failed to solve the captcha"

    def __init__(self, *args: object) -> None:
        # Fall back to the default message only when the caller gave none.
        super().__init__(*(args or (self.default_message,)))
class StolichkiDriver(webdriver.Chrome):
    """Chrome driver with stealth settings, load retries and captcha solving.

    ``get`` is overridden so that every navigation waits for the site logo to
    appear, tries to solve a captcha when it does not, and retries the load a
    limited number of times before giving up.
    """

    # XPath of the site logo; its presence is used as the "page loaded" marker.
    _LOGO_XPATH = '//img[@alt="Логотип"]'
    # Directory that receives screenshots / page sources of failed loads.
    _ERRORS_DIR = "errors"
    # The correctly spelled name is preferred; the misspelled name the code
    # historically read is still honoured so existing setups keep working.
    _API_KEY_ENV_VARS = ("TWOCAPTCHA_KEY", "TWOCAPTCA_KEY")

    def __init__(
        self, options: Options = None, service: Service = None, keep_alive: bool = True
    ) -> None:
        """Start Chrome with anti-automation tweaks and a 2captcha solver.

        Args:
            options: Chrome options to extend; a fresh ``ChromeOptions`` is
                created when omitted.
            service: Chrome service passed through to ``webdriver.Chrome``.
            keep_alive: Passed through to ``webdriver.Chrome``.

        Raises:
            RuntimeError: If no 2captcha API key is set in the environment.
        """
        api_key = next(
            (
                os.environ[name]
                for name in self._API_KEY_ENV_VARS
                if os.environ.get(name)
            ),
            None,
        )
        # An explicit raise instead of ``assert``: asserts vanish under ``-O``.
        if api_key is None:
            raise RuntimeError("Can't find environment variable TWOCAPTCHA_KEY")

        if options is None:
            options = webdriver.ChromeOptions()

        os.makedirs(self._ERRORS_DIR, exist_ok=True)

        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option("useAutomationExtension", False)
        options.page_load_strategy = "eager"

        self.__solver = TwoCaptcha(api_key)

        super().__init__(options, service, keep_alive)

        stealth(
            self,
            languages=["en-US", "en"],
            vendor="Google Inc.",
            platform="Win32",
            webgl_vendor="Intel Inc.",
            renderer="Intel Iris OpenGL Engine",
            fix_hairline=True,
        )

    def get(self, url: str) -> None:
        """Open ``url`` and wait until the site logo is present.

        Up to five attempts are made. On each failed attempt a captcha is
        handled (if one is shown), loading is stopped and the page refreshed.

        Raises:
            TimeoutError: If the logo never appeared after all attempts; a
                screenshot and the page source are saved to the errors
                directory first.
            CaptchaSolverError: If a captcha was shown but could not be solved.
        """
        super().get(url)
        logging.info(f"Loading {url}")

        for attempt in range(5):
            logging.debug(f"Attempt: {attempt + 1} for {url}")

            # Wait up to 60 seconds for the logo to appear.
            if self.__wait_for_presence(self._LOGO_XPATH):
                return

            # No logo: possibly a captcha page. Handle it, then reload.
            self.__handle_captcha()
            self.execute_script("window.stop();")
            time.sleep(1)
            self.refresh()

        # The page never loaded: keep a screenshot, the link and the source.
        logging.critical(f"Can't reach to {url}.")
        self.__dump_failure(url)
        raise TimeoutError("Can't reach website. Check your connection or query.")

    def __dump_failure(self, url: str) -> None:
        """Save a screenshot and the page source of a page that failed to load."""
        dump_id = uuid.uuid4()
        # A raw URL contains "://" and "/", which are not usable in a file
        # name, so every non-alphanumeric character is replaced.
        safe_url = "".join(ch if ch.isalnum() else "_" for ch in url)[:100]
        base = os.path.join(self._ERRORS_DIR, f"{safe_url}-{dump_id}")
        try:
            os.makedirs(self._ERRORS_DIR, exist_ok=True)
            self.get_screenshot_as_file(f"{base}.png")
            with open(f"{base}.html", "w", encoding="utf-8") as f:
                # The file name is sanitised, so keep the exact link inside.
                f.write(f"<!-- {url} -->\n")
                f.write(self.page_source)
        except OSError:
            # Diagnostics are best-effort and must not mask the real failure.
            logging.exception("Can't save error dump")

    def __wait_for_presence(self, xpath: str, delay: int = 60):
        """Return True if an element matching ``xpath`` appears within ``delay`` seconds."""
        try:
            WebDriverWait(self, delay).until(
                EC.presence_of_element_located((By.XPATH, xpath))
            )
        except (TimeoutException, NoSuchElementException, ElementNotVisibleException):
            # WebDriverWait signals an expired wait with TimeoutException.
            return False
        logging.info("Loading element was founded")
        return True

    def __handle_captcha(self) -> None:
        """Solve the captcha on the current page, if there is one.

        Returns silently when no captcha image is found or once the logo
        appears after submitting a solution.

        Raises:
            CaptchaSolverError: If five solutions in a row did not lead to
                the logo appearing.
        """
        for attempt in range(5):
            logging.info(f"Trying to solve captcha {attempt + 1}/5")
            try:
                captcha_image = self.find_element(By.ID, "captcha_image")
            except NoSuchElementException:
                logging.info("Can't find captcha image")
                return None

            solution = self.__solver.normal(captcha_image.screenshot_as_base64)
            # 2captcha-python returns a dict ({"captchaId": ..., "code": ...});
            # only the recognised text must be typed into the input.
            captcha_text = solution["code"] if isinstance(solution, dict) else solution

            self.find_element(By.ID, "captcha_input").send_keys(captcha_text)
            self.find_element(By.ID, "submit_button").click()

            if self.__wait_for_presence(self._LOGO_XPATH, 60):
                return None

        raise CaptchaSolverError("Captcha was not solved after 5 attempts")

109
stolichki/parser.py Normal file
View File

@ -0,0 +1,109 @@
import logging
from multiprocessing import Pool
from selenium import webdriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.common.by import By
from .browser import StolichkiDriver
from .product import Product
class StolichkiParser:
    """Walks the stolichki.ru catalogue and collects data for every product."""

    # Default city, used when none is passed to the constructor.
    city = {
        "id": 1,
        "name": "Москва",
    }
    # Chromedriver location used when the caller does not provide one.
    DEFAULT_CHROMEDRIVER_PATH = "/home/winet/.local/bin/chromedriver"
    # How many consecutive load failures of one catalogue page are tolerated.
    MAX_PAGE_RETRIES = 3

    def __init__(
        self, city: dict | None = None, chromedriver_path: str | None = None
    ) -> None:
        """Create the parser and start the browser.

        Args:
            city: Mapping with ``id`` and ``name`` keys; the class default is
                used when omitted.
            chromedriver_path: Path to the chromedriver binary; falls back to
                ``DEFAULT_CHROMEDRIVER_PATH`` when omitted.
        """
        if city is not None:
            self.city = city

        service = webdriver.ChromeService(
            chromedriver_path or self.DEFAULT_CHROMEDRIVER_PATH
        )
        self.driver = StolichkiDriver(service=service)

        logging.info(f"Parser initialize complete! City: {self.city.get('name')}")

    def run(self) -> dict:
        """Parse every catalogue category.

        Returns:
            A dict with the ``city`` (``id`` and ``name``) and the list of
            parsed product dicts under ``items``.
        """
        logging.info(f"Parser started. City: {self.city.get('name')}")
        self.driver.get("https://stolichki.ru/catalog")

        categories_lists = self.driver.find_elements(
            By.CLASS_NAME, "categoryList__item"
        )

        links = []
        for category_list in categories_lists:
            link_tags = category_list.find_elements(
                By.CLASS_NAME, "catalogPreview__caption"
            )
            for link_tag in link_tags:
                # Read the attribute once; every read is a browser round trip.
                href = link_tag.get_attribute("href")
                if href is not None:
                    links.append(href)

        logging.info(f"Finished parsing categories: Links: {links}")

        items = []
        for link in links:
            items.extend(self.__get_items(link))

        return {
            "city": {
                "id": self.city.get("id"),
                "name": self.city.get("name"),
            },
            "items": items,
        }

    def __get_items(self, url: str) -> list:
        """Collect products from all pages of one category.

        Pages are requested one by one until a page without products is
        reached. A page that keeps timing out is retried at most
        ``MAX_PAGE_RETRIES`` times, after which the category is abandoned and
        the items collected so far are returned.
        """
        items_list = []
        page = 1
        failures = 0

        while True:
            try:
                self.driver.get(f"{url}?page={page}")
            except TimeoutError:
                failures += 1
                if failures >= self.MAX_PAGE_RETRIES:
                    # Without a limit an unreachable page would loop forever.
                    logging.error(
                        f"Giving up on {url}?page={page} after {failures} attempts"
                    )
                    break
                continue
            failures = 0

            # find_elements returns an empty list instead of raising, so a
            # page without a catalogue simply ends the pagination.
            catalogs = self.driver.find_elements(By.ID, "catalog-list")
            product_items = (
                catalogs[0].find_elements(By.CLASS_NAME, "product-item")
                if catalogs
                else []
            )
            if not product_items:
                break

            items_list.extend(self.__parse_list(product_items))
            page += 1

        return items_list

    def __parse_list(self, product_items: list[WebElement]) -> list:
        """Parse every product of one catalogue page.

        Products that fail to parse are logged and skipped.
        """
        # All links are collected up front: parsing a product navigates the
        # shared driver away from the catalogue page.
        product_links: list[str] = [
            product_item.find_element(
                By.XPATH, './/p[contains(@class,"product-title")]/a'
            ).get_attribute("href")
            for product_item in product_items
        ]

        logging.info(f"Links in product list parsed. Links: {product_links}")

        data = []
        for product_link in product_links:
            try:
                product = Product(self.driver, product_link).get_dict()
            except Exception:
                # Best effort: one broken product must not stop the run, but
                # KeyboardInterrupt / SystemExit still propagate.
                logging.exception(f"Can't parse product {product_link}")
                continue
            logging.info(f"{product} was parsed.")
            data.append(product)
        return data

    def set_city(self, id: int):
        # TODO: implement switching the city by replacing the value in the
        # browser cookies.
        pass

    def close(self) -> None:
        """Shut the browser down and release the chromedriver process."""
        self.driver.quit()

81
stolichki/product.py Normal file
View File

@ -0,0 +1,81 @@
import logging
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from .browser import StolichkiDriver
class Product:
    """A single product page: its id, name, availability and store stock.

    Creating an instance opens ``url`` in the given driver and parses the
    page immediately.
    """

    id: int = 0
    name: str = ""
    available: bool = False
    # Set per instance in __init__; a class-level list would be shared by
    # every product.
    stores: list

    def __init__(self, driver: "StolichkiDriver", url: str) -> None:
        """Open the product page and parse it.

        Args:
            driver: Browser used to load and inspect the page.
            url: Product page URL; the part after
                ``https://stolichki.ru/drugs/`` must be the numeric id.

        Raises:
            ValueError: If the id cannot be parsed from ``url``.
        """
        self.driver = driver
        self.url = url
        self.stores = []

        self.driver.get(self.url)
        self.__parse_page()

    def get_dict(self):
        """Return the parsed product as a plain dict."""
        return {
            "id": self.id,
            "name": self.name,
            "available": self.available,
            "stores": self.stores,
        }

    def __parse_page(self):
        """Fill ``name``, ``id``, ``available`` and ``stores`` from the page."""
        self.name = (
            self.driver.find_element(By.XPATH, '//h1[@itemprop="name"]')
            .text.removeprefix("Купить")
            .strip()
        )
        self.id = int(self.url.removeprefix("https://stolichki.ru/drugs/"))

        try:
            stores = self.__parse_stores()
        except Exception:
            # Store info is best-effort: the product is still returned, but
            # the traceback is logged so the failure can be diagnosed.
            logging.critical("Can't get info about stores", exc_info=True)
            return

        if stores:
            self.available = True
            self.stores = stores

    def __parse_stores(self):
        """Return a list of ``{"name", "quantity"}`` dicts, one per store.

        Returns an empty list when the page carries one of the two markers
        checked below (presumably "out of stock" and "order only" - confirm
        against the site markup).
        """
        # find_elements returns an empty list when nothing matches, so no
        # exception handling is needed for the "marker absent" case.
        if self.driver.find_elements(
            By.CSS_SELECTOR, "p.badge-class.product-not-found"
        ):
            return []
        if self.driver.find_elements(
            By.CSS_SELECTOR, "a.stores-stock.stores-order.package"
        ):
            return []

        # Open the store list and wait for its rows to appear.
        self.driver.find_element(By.CLASS_NAME, "stores-stock").click()
        WebDriverWait(self.driver, 30).until(
            EC.presence_of_element_located((By.CLASS_NAME, "tr-start-store"))
        )

        stores_list = []
        for store in self.driver.find_elements(By.CLASS_NAME, "tr-start-store"):
            try:
                store_name = store.find_element(By.CLASS_NAME, "store-link").text
                number_of_product = int(
                    store.find_element(By.CLASS_NAME, "part-quantity").text
                )
            except Exception:
                # Skip rows without a name or with a non-numeric quantity.
                logging.debug("Skipping store row that can't be parsed", exc_info=True)
                continue
            stores_list.append({"name": store_name, "quantity": number_of_product})

        return stores_list