This commit is contained in:
Анатолий Богомолов 2023-12-26 01:32:27 +10:00
parent 32c1617f7d
commit 9a76b96e9d
17 changed files with 516 additions and 404 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
venv venv
.vscode .vscode
__pycache__ __pycache__
config.ini

25
main.py
View File

@ -1,14 +1,17 @@
import json import sys
import logging
from stolichki.parser import StolichkiParser from loguru import logger
from stolichki.parsers.city import CityParser
from stolichki.types.city import City
@logger.catch
def main():
    """Parse one hard-coded city with ``CityParser`` and print the result."""
    target_city = City(111, "Бутово", 1, [])
    parser = CityParser(target_city)
    print(parser.parse())
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig( logger.add(sys.stderr, level="DEBUG", backtrace=True, enqueue=True) #type: ignore
level=logging.INFO main()
)
result = StolichkiParser().run()
with open("data.json", "w") as f:
json.dump(result, f, indent=4, ensure_ascii=False)

View File

@ -1,20 +1,70 @@
2captcha-python==1.2.2 argcomplete==2.0.0
attrs==23.1.0 beautifulsoup4==4.12.2
certifi==2023.11.17 blivet==3.8.2
charset-normalizer==3.3.2 blivet-gui==2.4.2
h11==0.14.0 Brlapi==0.8.5
idna==3.6 cffi==1.15.1
outcome==1.3.0.post0 charset-normalizer==3.2.0
packaging==23.2 click==8.1.3
cupshelpers==1.0
dasbus==1.7
dbus-python==1.3.2
distro==1.8.0
dnf==4.18.2
fb-re2==1.0.7
fedora-third-party==0.10
file-magic==0.4.0
fros==1.1
gpg==1.20.0
humanize==3.13.1
idna==3.4
langtable==0.0.64
libcomps==0.1.20
libdnf==0.72.0
libvirt-python==9.7.0
lxml==4.9.3
mercurial==6.5.3
nftables==0.1
olefile==0.46
packaging==23.1
pexpect==4.8.0
pid==2.2.3
Pillow==10.1.0
ply==3.11
podman-compose==1.0.6
productmd==1.38
ptyprocess==0.7.0
pwquality==1.4.5
pycairo==1.25.1
pycparser==2.20
pycups==2.0.1
pycurl==7.45.2
pyenchant==3.2.2
PyGObject==3.46.0
pykickstart==3.48
pyparted==3.13.0
PySocks==1.7.1 PySocks==1.7.1
python-dotenv==1.0.0 python-augeas==1.1.0
requests==2.31.0 python-dateutil==2.8.2
selenium==4.16.0 python-dotenv==0.21.1
selenium-stealth==1.0.6 python-meh==0.51
sniffio==1.3.0 pyudev==0.24.1
sortedcontainers==2.4.0 pyxdg==0.27
trio==0.23.2 PyYAML==6.0.1
trio-websocket==0.11.1 regex==2023.10.3
urllib3==2.1.0 requests==2.28.2
webdriver-manager==4.0.1 requests-file==1.5.1
wsproto==1.2.0 requests-ftp==0.3.1
rpm==4.19.1
selinux @ file:///builddir/build/BUILD/libselinux-3.5/src
sepolicy @ file:///builddir/build/BUILD/selinux-3.5/python/sepolicy
setools==4.4.3
setuptools==67.7.2
simpleaudio==1.0.4
simpleline==1.9.0
six==1.16.0
sos==4.6.0
soupsieve==2.5
systemd-python==235
urllib3==1.26.18
zombie-imp==0.0.2

View File

@ -1,170 +0,0 @@
import time
import logging
import uuid
import os
import json
import re
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import (
ElementNotVisibleException,
NoSuchElementException,
)
from webdriver_manager.chrome import ChromeDriverManager
from selenium_stealth import stealth
from twocaptcha import TwoCaptcha
class CaptchaSolverError(Exception):
    """Error raised when the captcha attempts are exhausted."""

    def __init__(self, *args: object) -> None:
        # No extra state: simply forward all arguments to ``Exception``.
        super().__init__(*args)
class StolichkiDriver(webdriver.Chrome):
    """Chrome driver for stolichki.ru with stealth settings and load retries.

    ``get`` waits for the site logo after each navigation, refreshing the page
    up to five times before saving a failure snapshot and raising
    ``TimeoutError``.
    """

    # Element whose presence is treated as "the page has loaded".
    _LOGO_XPATH = '//img[@alt="Логотип"]'
    # Directory that receives screenshots/HTML of pages that failed to load.
    _ERRORS_DIR = "errors"

    def __init__(
        self,
        options: Options | None = None,
        service: Service | None = None,
        keep_alive: bool = True,
    ) -> None:
        """Configure Chrome options, start the browser and apply stealth.

        Args:
            options: Chrome options to extend; a fresh set is created if None.
            service: Chrome service to use; one backed by
                ``ChromeDriverManager`` is created if None.
            keep_alive: Forwarded to ``webdriver.Chrome``.
        """
        self.last_resp_index = 0
        self.last_resp_url = ""

        # assert os.environ.get("TWOCAPTCA_KEY") is not None, "Can't fins environment variable TWOCAPTCHA_KEY"

        if options is None:
            options = webdriver.ChromeOptions()

        os.makedirs(self._ERRORS_DIR, exist_ok=True)

        # Respect a caller-supplied service instead of always overwriting it.
        if service is None:
            service = webdriver.ChromeService(ChromeDriverManager().install())

        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option("useAutomationExtension", False)
        options.page_load_strategy = "eager"
        # Performance logs are what get_network_response() reads later.
        options.capabilities["goog:loggingPrefs"] = {"performance": "ALL"}

        # NOTE(review): while the solver setup below stays disabled,
        # self.__solver is never assigned and __handle_captcha cannot work.
        # self.__solver = TwoCaptcha(os.environ.get("TWOCAPTCA_KEY"))

        super().__init__(options, service, keep_alive)

        stealth(
            self,
            languages=["en-US", "en"],
            vendor="Google Inc.",
            platform="Win32",
            webgl_vendor="Intel Inc.",
            renderer="Intel Iris OpenGL Engine",
            fix_hairline=True,
        )

    def set_city(self, id: int):
        """Open the home page, set the ``cityId`` cookie and reload."""
        self.get("https://stolichki.ru/")
        self.__edit_cookie("cityId", id)
        self.refresh()

    def get(self, url: str) -> None:
        """Load ``url`` and wait until the site logo is present.

        Each attempt waits up to 60 seconds for the logo; if it does not
        appear, loading is stopped and the page refreshed. After five failed
        attempts a screenshot and the page source are saved.

        Raises:
            TimeoutError: The logo never appeared.
        """
        super().get(url)
        logging.info(f"Loading {url}")

        for attempt in range(5):
            logging.debug(f"Attempt: {attempt + 1} for {url}")
            if self.__wait_for_presence(self._LOGO_XPATH):
                return
            # self.__handle_captcha()
            self.execute_script("window.stop();")
            time.sleep(1)
            self.refresh()

        # The page never loaded: keep a screenshot and the source for debugging.
        logging.critical(f"Can't reach to {url}.")
        self.__save_failure_snapshot(url)
        raise TimeoutError("Can't reach website. Check your connection or query.")

    def __save_failure_snapshot(self, url: str) -> None:
        """Write a screenshot and the page source of a failed load to disk."""
        snapshot_id = uuid.uuid4()
        # A raw URL contains "/" and ":" and cannot be used as a file name.
        safe_name = re.sub(r"[^\w.-]+", "_", url)
        base_path = os.path.join(self._ERRORS_DIR, f"{safe_name}-{snapshot_id}")
        self.get_screenshot_as_file(f"{base_path}.png")
        with open(f"{base_path}.html", "w", encoding="utf-8") as f:
            f.write(self.page_source)

    def __wait_for_presence(self, xpath: str, delay: int = 60):
        """Return True if an element matching ``xpath`` appears within ``delay`` s."""
        # Local import: the module-level import list does not provide it.
        from selenium.common.exceptions import TimeoutException

        try:
            WebDriverWait(self, delay).until(
                EC.presence_of_element_located((By.XPATH, xpath))
            )
        except (TimeoutException, NoSuchElementException, ElementNotVisibleException):
            # WebDriverWait.until reports an expired wait as TimeoutException.
            return False
        logging.info("Loading element was founded")
        return True

    def __edit_cookie(self, name: str, value):
        """Replace the value of an existing cookie; do nothing if it is absent."""
        cookie = self.get_cookie(name)
        if cookie:
            self.delete_cookie(name)
            new_cookie = cookie.copy()
            new_cookie["value"] = str(value)
            self.add_cookie(new_cookie)

    def get_network_response(self, url_mask):
        """Return the JSON body of the last logged response matching ``url_mask``.

        Args:
            url_mask: Regular expression the full response URL must match.

        Returns:
            The decoded body of the last matching response, or None.
        """
        logs = self.get_log("performance")
        pattern = re.compile(url_mask)

        body = None
        for log in filter(self.__filter_logs, logs):
            message = json.loads(log.get("message"))["message"]
            request_id = message["params"]["requestId"]
            resp_url = message["params"]["response"]["url"]
            if pattern.fullmatch(resp_url):
                raw = self.execute_cdp_cmd(
                    "Network.getResponseBody", {"requestId": request_id}
                )
                body = json.loads(raw["body"])
        return body

    def __filter_logs(self, log):
        """Keep only ``Network.responseReceived`` entries with a JSON mime type."""
        message = json.loads(log.get("message"))["message"]
        return (
            message.get("method") == "Network.responseReceived"
            and "json" in message["params"]["response"]["mimeType"]
        )

    def __handle_captcha(self) -> None:
        """Try up to five times to solve the captcha shown on the page.

        Returns as soon as no captcha image is found or the logo appears after
        submitting an answer.

        Raises:
            CaptchaSolverError: All five attempts failed.
        """
        for attempt in range(5):
            logging.info(f"Trying to solve captcha {attempt + 1}/5")
            try:
                captcha_image = self.find_element(By.ID, "captcha_image")
            except NoSuchElementException:
                logging.info("Can't find captcha image")
                return None

            captcha_base64 = captcha_image.screenshot_as_base64
            captcha_text = self.__solver.normal(captcha_base64)
            self.find_element(By.ID, "captcha_input").send_keys(captcha_text)
            self.find_element(By.ID, "submit_button").click()

            # Success must leave the loop; otherwise every path ends in the raise.
            if self.__wait_for_presence(self._LOGO_XPATH, 60):
                return None

        raise CaptchaSolverError()

150
stolichki/driver.py Normal file
View File

@ -0,0 +1,150 @@
import re
import json
import time
import configparser
from loguru import logger
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import (
ElementNotVisibleException,
NoSuchElementException,
)
import undetected_chromedriver as uc
from webdriver_manager.chrome import ChromeDriverManager
from twocaptcha import TwoCaptcha
from stolichki.errors import CaptchaError, ConfigError, LoadingError
class StolichkiDriver(uc.Chrome):
    """undetected-chromedriver Chrome with load retries and captcha solving.

    The 2captcha key is read from the ``[driver]`` section of ``config.ini``.
    """

    def __init__(self, **kwargs):
        """Load the config, create the captcha solver and start Chrome.

        Args:
            **kwargs: Forwarded to ``uc.Chrome``.

        Raises:
            ConfigError: ``config.ini`` is missing the required settings.
        """
        self.__load_config()
        self.captcha_solver = TwoCaptcha(self.config["2captcha_key"])

        options = Options()
        options.page_load_strategy = "eager"
        # Performance logs are what get_response() reads later.
        options.capabilities["goog:loggingPrefs"] = {"performance": "ALL"}  # type: ignore

        driver_exec = ChromeDriverManager().install()
        super().__init__(options=options, driver_executable_path=driver_exec, **kwargs)

    @logger.catch
    def get(self, url, **kwargs):
        """Navigate to ``url`` and wait for the page via ``handle_loading``.

        ``**kwargs`` are passed on to ``handle_loading``.
        """
        super().get(url)
        self.handle_loading(**kwargs)

    def refresh(self, **kwargs) -> None:
        """Reload the current page; ``**kwargs`` are accepted but unused."""
        super().refresh()

    def edit_cookie(self, name: str, value: str):
        """Replace the value of an existing cookie; do nothing if it is absent."""
        cookie = self.get_cookie(name)
        if cookie:
            self.delete_cookie(name)
            cookie["value"] = value
            self.add_cookie(cookie.copy())

    def set_city(self, id: int):
        """Store ``id`` in the ``cityId`` cookie and reload the page."""
        self.edit_cookie("cityId", str(id))
        self.refresh()

    def set_proxy(self):
        """Placeholder; not implemented."""
        ...

    def get_response(self, url_re: re.Pattern[str]) -> None | dict:
        """Return the JSON body of the last logged response matching ``url_re``.

        Returns:
            The decoded body of the last matching response, or None.
        """
        logs = self.get_log("performance")

        body = None
        for log in filter(self.__filter_logs, logs):
            message = json.loads(log.get("message"))["message"]
            request_id = message["params"]["requestId"]
            resp_url = message["params"]["response"]["url"]
            if re.fullmatch(url_re, resp_url):
                raw = self.execute_cdp_cmd(
                    "Network.getResponseBody", {"requestId": request_id}
                )
                body = json.loads(raw["body"])
        return body

    def __filter_logs(self, log):
        """Keep only ``Network.responseReceived`` entries with a JSON mime type."""
        message = json.loads(log.get("message"))["message"]
        return (
            message.get("method") == "Network.responseReceived"
            and "json" in message["params"]["response"]["mimeType"]
        )

    def handle_loading(self, **kwargs):
        """Wait for the page to load, retrying up to ten times.

        On a failed wait the captcha handler runs; if it reports no solved
        captcha, loading is stopped and the page refreshed.

        Args:
            **kwargs: Forwarded to ``wait_for_presence``.

        Returns:
            Whatever ``wait_for_presence`` returns on the first success.

        Raises:
            LoadingError: Every attempt failed.
        """
        for _ in range(10):
            try:
                return self.wait_for_presence(**kwargs)
            except Exception:
                # ``Exception`` (not a bare except) so Ctrl+C still stops the run.
                if not self.__handle_captcha():
                    self.execute_script("window.stop();")
                    time.sleep(1)
                    self.refresh()

        raise LoadingError("For some reason can't load page. Check logs")

    def __handle_captcha(self):
        """Try up to ten times to solve the captcha on the current page.

        Returns:
            True if the awaited element appeared after submitting an answer;
            False if no captcha image exists or the wait raised.

        Raises:
            CaptchaError: Ten answers were submitted without success.
        """
        for _ in range(10):
            try:
                captcha_image = self.find_element(By.ID, "captcha_image")
            except NoSuchElementException:
                return False

            captcha_base64 = captcha_image.screenshot_as_base64
            captcha_text = self.captcha_solver.normal(captcha_base64)["code"]

            self.find_element(By.ID, "captcha_input").send_keys(captcha_text)
            self.find_element(By.ID, "submit_button").click()

            try:
                if self.wait_for_presence():
                    return True
            except Exception:
                return False

        raise CaptchaError("Can't find or solve captcha")

    def wait_for_presence(self, by: str = By.XPATH, value: str = '//img[@alt="Логотип"]', delay: int = 30):
        """Wait up to ``delay`` seconds for an element and return it.

        Args:
            by: Locator strategy.
            value: Locator value; defaults to the site logo.
            delay: Maximum wait in seconds.

        Returns:
            The located element, or False for the two exceptions caught below.
        """
        # NOTE(review): WebDriverWait.until signals an expired wait with
        # TimeoutException, which is not caught here; callers in this project
        # wrap the call in try/except and rely on that propagation.
        try:
            return WebDriverWait(self, delay).until(
                EC.presence_of_element_located((by, value))
            )
        except (NoSuchElementException, ElementNotVisibleException):
            return False

    def __load_config(self):
        """Read the ``[driver]`` section of ``config.ini`` into ``self.config``.

        Raises:
            ConfigError: The file, section or ``2captcha_key`` option is
                missing or unreadable; the original error is chained.
        """
        config = configparser.ConfigParser()

        logger.info("Loading config for driver")

        try:
            config.read("config.ini")
            self.config = config["driver"]
            # Fail early if a required option is absent.
            _ = self.config["2captcha_key"]
        except Exception as err:
            raise ConfigError(
                "Can't read settings for parser. Check your config.ini."
            ) from err

        logger.info("Config was loaded successfully")

11
stolichki/errors.py Normal file
View File

@ -0,0 +1,11 @@
class CaptchaError(Exception):
    """Signals that a captcha could not be found or solved."""

    def __init__(self, *args: object) -> None:
        # No extra state: simply forward all arguments to ``Exception``.
        super().__init__(*args)
class LoadingError(Exception):
    """Signals that a page could not be loaded."""

    def __init__(self, *args: object) -> None:
        # No extra state: simply forward all arguments to ``Exception``.
        super().__init__(*args)
class ConfigError(Exception):
    """Signals that the settings in ``config.ini`` could not be read."""

    def __init__(self, *args: object) -> None:
        # No extra state: simply forward all arguments to ``Exception``.
        super().__init__(*args)

View File

@ -1,104 +0,0 @@
import logging
from multiprocessing import Pool
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.common.by import By
from .browser import StolichkiDriver
from .product import Product
class StolichkiParser:
    """Walks the stolichki.ru catalog for one city and collects its products."""

    # City used when none is passed to the constructor.
    city = {
        "id": 77,
        "name": "Москва",
    }

    # How many timeouts in a row are tolerated for one catalog page.
    MAX_PAGE_TIMEOUTS = 3

    def __init__(self, city: dict | None = None) -> None:
        """Start the driver and switch the site to ``city``.

        Args:
            city: Mapping with ``id`` and ``name`` keys; the class default is
                used when None.
        """
        if city is not None:
            self.city = city

        self.driver = StolichkiDriver()
        self.driver.set_city(self.city.get("id"))

        logging.info(f"Parser initialize complete! City: {self.city.get('name')}")

    def run(self):
        """Parse every category of the catalog.

        Returns:
            A dict with the ``city`` (id and name) and the list of ``items``.
        """
        logging.info(f"Parser started. City: {self.city.get('name')}")

        self.driver.get("https://stolichki.ru/catalog")

        categories_lists = self.driver.find_elements(
            By.CLASS_NAME, "categoryList__item"
        )

        links = []
        for category_list in categories_lists:
            link_tags = category_list.find_elements(
                By.CLASS_NAME, "catalogPreview__caption"
            )
            # Walrus: read each href once instead of twice per element.
            links.extend(
                href
                for link in link_tags
                if (href := link.get_attribute("href")) is not None
            )

        logging.info(f"Finished parsing categories: Links: {links}")

        items = []
        for link in links:
            items.extend(self.__get_items(link))

        return {
            "city": {
                "id": self.city.get("id"),
                "name": self.city.get("name"),
            },
            "items": items,
        }

    def __get_items(self, url: str):
        """Collect products from every page of the category at ``url``.

        Paging stops at the first page without product items, or when the same
        page times out ``MAX_PAGE_TIMEOUTS`` times in a row.
        """
        items_list = []
        page = 1
        timeouts = 0

        while True:
            try:
                self.driver.get(f"{url}?page={page}")
            except TimeoutError:
                # Bounded retry: an unreachable page must not loop forever.
                timeouts += 1
                if timeouts >= self.MAX_PAGE_TIMEOUTS:
                    logging.error(f"Giving up on {url}?page={page} after {timeouts} timeouts")
                    break
                continue
            timeouts = 0

            catalog_list = self.driver.find_element(By.ID, "catalog-list")
            product_items = catalog_list.find_elements(By.CLASS_NAME, "product-item")
            if not product_items:
                break

            items_list.extend(self.__parse_list(product_items))
            page += 1

        return items_list

    def __parse_list(self, product_items: list[WebElement]):
        """Open each product of a catalog page and return the parsed dicts.

        Products that fail to parse are logged and skipped.
        """
        # Collect the links first: opening a product navigates away from the list.
        product_links: list[str] = [
            product_item.find_element(
                By.XPATH, './/p[contains(@class,"product-title")]/a'
            ).get_attribute("href")
            for product_item in product_items
        ]

        logging.info(f"Links in product list parsed. Links: {product_links}")

        data = []
        for product_link in product_links:
            try:
                product = Product(self.driver, product_link).get_dict()
            except Exception:
                logging.exception(f"Can't parse product {product_link}")
                continue
            logging.info(f"{product} was parsed.")
            data.append(product)

        return data

View File

View File

@ -0,0 +1,139 @@
import re
import time
from loguru import logger
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from stolichki.driver import StolichkiDriver
from stolichki.types import Product
from stolichki.types.city import City
class BaseCategoryParser:
    """Pages through one catalog category and collects its products.

    Subclasses implement ``get_product``; the base version returns None.
    """

    category_link: str
    page: int
    driver: StolichkiDriver
    products: list[Product]

    def __init__(self, driver: StolichkiDriver, category_link: str) -> None:
        """Bind the parser to a driver and a category URL."""
        self.driver = driver
        self.category_link = category_link
        self.page = 1
        # Per-instance list: a class-level ``[]`` would be shared by every
        # parser, so each category would also return all earlier products.
        self.products = []

    def parse(self):
        """Parse every page of the category.

        Returns:
            The list of products collected by this parser.
        """
        while True:
            self.driver.get(f"{self.category_link}?page={self.page}")

            links = self.get_products_links()

            # No product links means the end of the category was reached,
            # so leave the loop and hand the products back to the caller.
            if not links:
                break

            for link in links:
                try:
                    product = self.get_product(link)
                    logger.debug(f"Product parsed: {product}")
                except Exception:
                    continue

                if product:
                    self.products.append(product)

            self.page += 1

        return self.products

    def get_products_links(self) -> list[str]:
        """Return the product links found on the current page.

        Errors while reading the page are logged; links gathered before the
        error are still returned.
        """
        products_links: list[str] = []

        try:
            catalog_list = self.driver.find_element(By.ID, "catalog-list")
            product_items = catalog_list.find_elements(By.CLASS_NAME, "product-item")
            for item in product_items:
                link = item.find_element(
                    By.XPATH, './/p[contains(@class,"product-title")]/a'
                ).get_attribute("href")
                if link:
                    products_links.append(link)
        except Exception as e:
            logger.exception(e)

        return products_links

    @logger.catch
    def get_product(self, link: str) -> Product | None:
        """Parse the product at ``link``; overridden by subclasses."""
        pass
class NormalCategoryParser(BaseCategoryParser):
    """Category parser that builds products from the product-info response only."""

    def __init__(self, driver: StolichkiDriver, category_link: str) -> None:
        super().__init__(driver, category_link)

    @logger.catch
    def get_product(self, link: str) -> Product | None:
        """Open ``link`` and build a ``Product`` from the captured response.

        Returns:
            The product, or None when no response with status ``"ok"`` was
            captured.
        """
        self.driver.get(link)

        info_pattern = re.compile(r"https://stolichki.ru/drugs/\d{1,}/get")
        payload = self.driver.get_response(info_pattern)

        # Guard clause: bail out unless a successful payload was captured.
        if not payload or payload.get("status") != "ok":
            return None
        return Product(payload["drug"])
class ByfarmCategoryParser(BaseCategoryParser):
    """Category parser that also collects the per-store data of each product."""

    def __init__(self, driver: StolichkiDriver, category_link: str) -> None:
        super().__init__(driver, category_link)

    @logger.catch
    def get_product(self, link: str) -> Product | None:
        """Open ``link`` and build a ``Product`` including its stores.

        Makes up to ten attempts to trigger the stores block and capture both
        network responses.

        Returns:
            The product, or None if the two responses were never both captured
            with equal ``status`` values.
        """
        self.driver.get(link)

        # Compiled once; the patterns do not change between attempts.
        product_info_re = re.compile(r"https://stolichki.ru/drugs/\d{1,}/get")
        product_farms_re = re.compile(r"https://stolichki\.ru/drugs/\d{1,}/stores\?cityId=\d{1,}&no-captcha-token=.{1,}")

        for _ in range(10):
            self.farms_loading_handler(self.driver)

            product_info = self.driver.get_response(product_info_re)
            product_farms = self.driver.get_response(product_farms_re)

            if (product_info and product_farms) and (product_farms.get("status") == product_info.get("status")):
                return Product(product_info["drug"], product_farms["stores"])

        return None

    def farms_loading_handler(self, driver: StolichkiDriver):
        """Click the stores button and wait for the stores table to appear.

        Returns:
            The located ``tr-start-store`` element on success. On failure the
            captcha handler is run and, if it reports no solved captcha, the
            page is stopped and refreshed; None is returned in that case.
        """
        try:
            store_stock_button = self.driver.wait_for_presence(By.CLASS_NAME, "stores-stock")
            if store_stock_button:
                store_stock_button.click()

            # Workaround for components that only start working after a scroll.
            logger.debug("Scrolling up to 50")
            ActionChains(self.driver).scroll_by_amount(0, -50).perform()
            time.sleep(1)
            logger.debug("Scrolling down to 50")
            ActionChains(self.driver).scroll_by_amount(0, 50).perform()
            time.sleep(1)

            return self.driver.wait_for_presence(by=By.CLASS_NAME, value="tr-start-store", delay=60)
        except Exception:
            pass

        # ``driver.__handle_captcha`` written inside this class would be
        # name-mangled to ``_ByfarmCategoryParser__handle_captcha`` and raise
        # AttributeError, so the driver's private method is addressed by its
        # mangled name explicitly.
        if not driver._StolichkiDriver__handle_captcha():
            # ``window.stop`` without parentheses is a no-op expression.
            self.driver.execute_script("window.stop();")
            self.driver.refresh()
        return None
def get_category_parser(city: City):
    """Return the category parser class to use for ``city``.

    ``ByfarmCategoryParser`` is chosen when ``city.is_byapt`` is truthy,
    ``NormalCategoryParser`` otherwise.
    """
    if bool(city.is_byapt):
        return ByfarmCategoryParser  # type: ignore
    return NormalCategoryParser  # type: ignore

48
stolichki/parsers/city.py Normal file
View File

@ -0,0 +1,48 @@
from selenium.webdriver.common.by import By
from stolichki.driver import StolichkiDriver
from stolichki.parsers.category import get_category_parser
from stolichki.types.city import City
class CityParser:
    """Parses the whole catalog of one city."""

    def __init__(self, city: City) -> None:
        """Start a driver, open the home page and select ``city``."""
        self.driver = StolichkiDriver()
        self.city = city
        # Defined up front so the attribute exists before the first lookup.
        self.links: list[str] = []

        self.driver.get("https://stolichki.ru/")
        self.driver.set_city(self.city.id)

    def parse(self):
        """Parse every category of the catalog.

        Returns:
            A new ``City`` carrying the collected products, or None when no
            category links were found.
        """
        self.driver.get("https://stolichki.ru/catalog")
        self.get_categories_links()

        if not self.links:
            return None

        category_parser = get_category_parser(self.city)

        products = []
        for link in self.links:
            products.extend(category_parser(self.driver, link).parse())

        return City(self.city.id, self.city.city, self.city.is_byapt, products)

    def get_categories_links(self):
        """Collect category links from the current page into ``self.links``."""
        categories_lists = self.driver.find_elements(
            By.CLASS_NAME, "categoryList__item"
        )

        self.links = []
        for category_list in categories_lists:
            link_tags = category_list.find_elements(
                By.CLASS_NAME, "catalogPreview__caption"
            )
            # Walrus: read each href once instead of twice per element.
            self.links.extend(
                href
                for link in link_tags
                if (href := link.get_attribute("href")) is not None
            )

View File

@ -1,89 +0,0 @@
import logging
import dataclasses
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from .browser import StolichkiDriver
from .types import Store
class Product:
    """A product page: its id, name and the stores that stock it."""

    id: int = 0
    name: str = ""
    available: bool = False
    stores: list

    def __init__(self, driver: StolichkiDriver, url: str) -> None:
        """Open ``url`` with ``driver`` and parse the page immediately."""
        self.driver = driver
        self.url = url
        # Per-instance list: a class-level ``[]`` would be shared by all products.
        self.stores = []

        self.driver.get(self.url)
        self.__parse_page()

    def get_dict(self):
        """Return the parsed fields as a new dict."""
        return {
            "id": self.id,
            "name": self.name,
            "available": self.available,
            "stores": self.stores,
        }

    def __parse_page(self):
        """Fill ``name``, ``id``, ``stores`` and ``available`` from the page."""
        self.name = (
            self.driver.find_element(By.XPATH, '//h1[@itemprop="name"]')
            .text.removeprefix("Купить")
            .strip()
        )
        self.id = int(self.url.removeprefix("https://stolichki.ru/drugs/"))

        try:
            stores = self.__parse_stores()
        except Exception:
            logging.critical("Can't get info about stores", exc_info=True)
            return

        if stores:
            self.available = True
            self.stores = stores

    def __parse_stores(self):
        """Return the list of ``Store`` objects offering this product.

        An empty list is returned when the page shows both "not found"
        markers or when no stores response was captured.
        """
        try:
            self.driver.find_element(By.CSS_SELECTOR, "p.badge-class.product-not-found")
            self.driver.find_element(By.CSS_SELECTOR, "a.stores-stock.stores-order.package")
            return []
        except Exception:
            # A lookup failed, so the markers are not both present: go on.
            pass

        self.driver.find_element(By.CLASS_NAME, "stores-stock").click()

        wait = WebDriverWait(self.driver, 30)
        wait.until(EC.presence_of_element_located((By.CLASS_NAME, "tr-start-store")))

        reg_stores = r"https://stolichki\.ru/drugs/\d{1,}/stores\?cityId=\d{1,}&no-captcha-token=.{1,}"
        response = self.driver.get_network_response(reg_stores)
        if not response:
            return []

        stores = []
        for store in response.get("stores") or []:
            parts = store.get("parts")
            if not parts:
                continue

            prices = parts[0]
            store_normal = Store(
                id=store.get("id"),
                name=store.get("name"),
                address=store.get("address"),
                price=prices.get("priceStore"),
                price_order=prices.get("priceOnline"),
            )
            stores.append(store_normal)

            if bool(prices.get("bad")):
                # A second, discounted entry is added for the same store.
                discounts = prices.get("discounts")
                discount = discounts[0].get("value") if discounts else 0
                store_special = dataclasses.replace(
                    store_normal,
                    name=store_normal.name + " СП",
                    price=store_normal.price - (store_normal.price * (discount / 100)),
                    price_order=store_normal.price_order - (store_normal.price_order * (discount / 100)),
                )
                stores.append(store_special)

        # The caller checks the returned list, so it must actually be returned.
        return stores

View File

@ -1 +1,4 @@
from .store import * from stolichki.types.city import *
from stolichki.types.product import *
from stolichki.types.farm import *
from stolichki.types.price import *

10
stolichki/types/city.py Normal file
View File

@ -0,0 +1,10 @@
from dataclasses import dataclass
from stolichki.types.product import Product
@dataclass
class City:
    """A city together with the products collected for it."""

    # Identifier of the city.
    id: int
    # Name of the city.
    city: str
    # Flag stored as an int and evaluated for truthiness by its consumers.
    is_byapt: int
    # Products belonging to this city.
    products: list[Product]

26
stolichki/types/farm.py Normal file
View File

@ -0,0 +1,26 @@
from dataclasses import dataclass
from stolichki.types.price import Price
@dataclass(init=False)
class Farm:
    """A store entry built from one raw ``farm`` dict."""

    id: int
    name: str
    address: str
    special: bool
    price: Price

    def __init__(self, farm: dict) -> None:
        """Populate the fields from ``farm``.

        Only the first element of ``farm["parts"]`` is used. The discount is
        the ``value`` of its first ``discounts`` entry, or 0 when there is none.
        """
        self.id = int(farm["id"])
        self.name = str(farm["name"])
        self.address = str(farm["address"])

        first_part = farm["parts"][0]
        self.special = bool(first_part["bad"])

        discount_value = (
            first_part["discounts"][0]["value"] if first_part.get("discounts") else 0
        )

        self.price = Price(
            first_part["priceStore"],
            first_part["priceStoreWithDisc"],
            first_part["priceOnline"],
            first_part["priceOnlineWithDisc"],
            discount_value,
        )

10
stolichki/types/price.py Normal file
View File

@ -0,0 +1,10 @@
from dataclasses import dataclass
@dataclass
class Price:
    """Store and online prices, each with and without discount."""

    # Price in the store.
    store: float
    # Price in the store with the discount applied.
    store_disc: float
    # Price for an online order.
    online: float
    # Price for an online order with the discount applied.
    online_disc: float
    # Discount value.
    discount: int

View File

@ -0,0 +1,33 @@
from dataclasses import dataclass
from loguru import logger
from stolichki.types.farm import Farm
from stolichki.types.price import Price
@dataclass(init=False)
class Product:
    """A product built from the raw ``drug`` dict and optional store dicts."""

    id: int
    name: str
    price: Price
    farms: list[Farm]

    # No ``@logger.catch`` here: swallowing an error inside ``__init__`` would
    # hand the caller a half-initialised object. Errors propagate to the
    # ``get_product`` callers, which are themselves wrapped in ``logger.catch``.
    def __init__(self, product: dict, farms: list[dict] | None = None) -> None:
        """Populate the fields from the raw dicts.

        Args:
            product: Raw product dict; must carry a truthy ``id``.
            farms: Raw store dicts; entries without ``parts`` are skipped.

        Raises:
            ValueError: ``product`` has no ``id``.
        """
        # Explicit raise instead of ``assert``, which is stripped under ``-O``.
        if not product.get("id"):
            raise ValueError("Can't find product information")

        self.id = int(product["id"])
        self.name = str(product["name"])

        self.farms = [Farm(farm) for farm in farms or [] if farm.get("parts")]

        prices: dict = product["prices"]

        discount = 0
        if product.get("discounts"):
            discount = product["discounts"][0]["value"]

        self.price = Price(
            prices["store"],
            prices["storeWithDisc"],
            prices["online"],
            prices["onlineWithDisc"],
            discount,
        )

View File

@ -1,9 +0,0 @@
from dataclasses import dataclass
@dataclass
class Store:
    """A store record with its regular price and its order price."""

    id: int
    name: str
    address: str
    # Both prices default to zero when not supplied.
    price: float = 0.0
    price_order: float = 0.0