import asyncio
import json
import random
import re
import time
from typing import Awaitable, Callable, Optional, Tuple

from playwright.async_api import Locator, Page, async_playwright
from playwright_stealth import Stealth

BASE_URL = "https://kwork.ru"
PROJECTS_URL = f"{BASE_URL}/projects?c=11"
USER_AGENT = (
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
)
VIEWPORT = {"width": 1920, "height": 1080}

# Delay ranges as (min, max), in seconds.
DELAY_PAGE = (150, 240)  # 2.5 - 4 minutes between listing pages
DELAY_PROJECT = (20, 45)  # 20 - 45 seconds between project detail pages (prioritised)

Project = dict[str, str]


class RequestThrottler:
    """Release queued requests one at a time with randomised, priority-aware delays.

    Callers ``await wait(priority, delay_range)`` before issuing a request.
    A single background worker picks the queued entry with the lowest
    ``priority`` number (ties broken by enqueue time), waits until a delay
    drawn uniformly from that entry's ``delay_range`` has elapsed since the
    previously released request, and then resolves the caller's future.
    """

    def __init__(self):
        # Pending entries: dicts with "priority", "delay", "fut", "time" and,
        # once drawn by the worker, "required_delay".
        self._queue = []
        # Monotonic timestamp of the last released request; None until the
        # first release so that the very first request is never delayed.
        self._last_call_time = None
        # Set whenever the queue changes so the worker can re-evaluate.
        self._new_item_event = asyncio.Event()
        self._worker_task = None

    async def _worker(self):
        """Background loop that releases queued waiters in priority order.

        A long wait for a low-priority entry is interrupted as soon as the
        queue changes, so a newly queued high-priority entry is considered
        immediately instead of after the current sleep.
        """
        while True:
            # Forget waiters that were cancelled while queued; otherwise they
            # would consume a delay slot without any request being made.
            self._queue[:] = [
                entry for entry in self._queue if not entry["fut"].done()
            ]

            if not self._queue:
                self._new_item_event.clear()
                await self._new_item_event.wait()
                continue

            # Projects first (priority=0), then listing pages (priority=1),
            # then by enqueue time.
            self._queue.sort(key=lambda x: (x["priority"], x["time"]))
            current_task = self._queue[0]

            # Draw the delay once per entry so that being interrupted and
            # re-selected does not re-roll it.
            if "required_delay" not in current_task:
                min_d, max_d = current_task["delay"]
                current_task["required_delay"] = random.uniform(min_d, max_d)
            required_delay = current_task["required_delay"]

            if self._last_call_time is None:
                wait_time = 0.0
            else:
                elapsed = time.monotonic() - self._last_call_time
                wait_time = required_delay - elapsed

            if wait_time > 0:
                type_str = "ПРОЕКТ" if current_task["priority"] == 0 else "СПИСОК"
                print(
                    f"[Throttler] Тип:{type_str}. Ждем {wait_time:.2f} сек... "
                    f"(в очереди: {len(self._queue)})"
                )
                self._new_item_event.clear()
                try:
                    await asyncio.wait_for(
                        self._new_item_event.wait(), timeout=wait_time
                    )
                    # The queue changed while waiting: re-sort and re-pick.
                    continue
                except asyncio.TimeoutError:
                    pass

            self._queue.remove(current_task)
            if current_task["fut"].done():
                # The waiter went away during the sleep; do not count a
                # request that will never be made.
                continue
            self._last_call_time = time.monotonic()
            current_task["fut"].set_result(True)

    async def wait(self, priority: int, delay_range: Tuple[float, float]):
        """Queue up for permission to perform a request and wait for it.

        Args:
            priority: 0 for high priority (projects), 1 for low priority
                (listing pages). Lower numbers are released first.
            delay_range: ``(min, max)`` seconds; the delay since the previous
                released request is drawn uniformly from this range.
        """
        if self._worker_task is None or self._worker_task.done():
            # (Re)start the worker. A finished worker typically means its
            # event loop was shut down, so the event is recreated as well:
            # asyncio primitives are tied to the loop they were used in.
            self._new_item_event = asyncio.Event()
            self._worker_task = asyncio.create_task(self._worker())

        fut = asyncio.get_running_loop().create_future()
        item = {
            "priority": priority,
            "delay": delay_range,
            "fut": fut,
            "time": time.monotonic(),
        }
        self._queue.append(item)
        self._new_item_event.set()

        try:
            await fut
        except asyncio.CancelledError:
            # Leave the queue promptly so the worker does not sleep for us.
            if item in self._queue:
                self._queue.remove(item)
                self._new_item_event.set()
            raise


throttler = RequestThrottler()


def normalize_text(text: str) -> str:
    """Collapse every whitespace run in *text* to one space and strip the ends."""
    return re.sub(r"\s+", " ", text).strip()


def first_line(text: str) -> str:
    """Return the stripped first line of *text*, or "" for blank input."""
    text = text.strip()
    return text.splitlines()[0].strip() if text else ""


def normalize_url(href: str) -> str:
    """Prefix site-relative links (starting with "/") with ``BASE_URL``."""
    return f"{BASE_URL}{href}" if href.startswith("/") else href


async def safe_inner_text(locator: Locator, default: str = "") -> str:
    """Return the locator's inner text with NBSPs replaced, or *default* on failure."""
    try:
        text = await locator.inner_text(timeout=1500)
        return text.replace("\xa0", " ").strip()
    except Exception:
        return default


async def first_text(root: Locator, selectors: list[str], default: str = "") -> str:
    """Return the first non-empty normalised text found under *root*.

    Selectors are tried in order; only the first match of each selector is
    read. *default* is returned when none of them yields text.
    """
    for selector in selectors:
        try:
            loc = root.locator(selector).first
            text = normalize_text(await safe_inner_text(loc, ""))
            if text:
                return text
        except Exception:
            continue
    return default


async def get_card_root(page: Page, href: str) -> Locator:
    """Locate the card element that encloses the link with the given *href*.

    Tries the nearest ``wants-card__top`` ancestor div, then the nearest
    ``wants-card`` ancestor div, and finally falls back to the link's
    closest ancestor div.
    """
    card = page.locator(
        f'xpath=//a[@href="{href}"]/ancestor::div[contains(@class, "wants-card__top")][1]'
    )
    if await card.count() > 0:
        return card.first

    card = page.locator(
        f'xpath=//a[@href="{href}"]/ancestor::div[contains(@class, "wants-card")][1]'
    )
    if await card.count() > 0:
        return card.first

    return page.locator(f'xpath=//a[@href="{href}"]/ancestor::div[1]').first


async def extract_price(card: Locator) -> str:
    """Build a price string for a listing card.

    Price phrases matched by regex in the card's text take precedence and
    are joined with " | ". Otherwise the price-related elements are read
    directly; if nothing is found, "По договоренности" is returned.
    """
    card_text = normalize_text(await safe_inner_text(card, ""))
    patterns = [
        r"(Желаемый бюджет:\s*до\s*[\d\s]+₽)",
        r"(Цена до:\s*[\d\s]+₽)",
        r"(Допустимый:\s*до\s*[\d\s]+₽)",
    ]

    found = []
    for pattern in patterns:
        match = re.search(pattern, card_text, flags=re.IGNORECASE)
        if match:
            val = normalize_text(match.group(1))
            if val not in found:
                found.append(val)
    if found:
        return " | ".join(found)

    primary = await first_text(
        card,
        [".wants-card__price", "[class*='wants-card__price']", "[class*='price']"],
    )
    higher = await first_text(
        card,
        [".wants-card__description-higher-price", "[class*='description-higher-price']"],
    )
    parts = [p for p in [primary, higher] if p]
    return " | ".join(parts) if parts else "По договоренности"


async def extract_description(card: Locator) -> str:
    """Return the card's description text without the expand/collapse labels."""
    description = await first_text(
        card,
        [
            ".wants-card__description-text .overflow-hidden .d-inline",
            ".wants-card__description-text .overflow-hidden",
            ".wants-card__description-text",
            "[class*='description-text']",
        ],
    )
    for word in ["Показать полностью", "Скрыть"]:
        description = description.replace(word, "")
    # NOTE: the text is whitespace-normalised (newlines included) before
    # first_line() sees it, so the whole description is returned.
    return first_line(normalize_text(description.replace("\xa0", " ")))


async def extract_kwork_project(page: Page, title_block: Locator) -> Optional[Project]:
    """Extract title, price, URL and description for one listing entry.

    Returns ``None`` when the title block has no link or the link has no
    ``href`` attribute.
    """
    link = title_block.locator("a").first
    if await link.count() == 0:
        return None

    title_text = normalize_text(await safe_inner_text(link, ""))
    href = await link.get_attribute("href")
    if not href:
        return None

    card = await get_card_root(page, href)
    return {
        "title": title_text,
        "price": await extract_price(card),
        "url": normalize_url(href),
        "description": await extract_description(card),
    }


async def get_kwork_projects(start_page: int = 1, end_page: int = 1) -> list[Project]:
    """Scrape listing pages ``start_page..end_page`` (LOW priority, LONG delay).

    Each page load first waits in the shared throttler with priority 1 and
    ``DELAY_PAGE``. If ``start_page`` exceeds ``end_page`` only ``end_page``
    is scraped. On a scraping error the projects collected so far are
    returned.
    """
    all_results: list[Project] = []
    if start_page > end_page:
        start_page = end_page

    async with Stealth().use_async(async_playwright()) as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(user_agent=USER_AGENT, viewport=VIEWPORT)
        page = await context.new_page()
        try:
            for current_page in range(start_page, end_page + 1):
                # Wait in the queue (priority 1 = low).
                await throttler.wait(priority=1, delay_range=DELAY_PAGE)

                url = f"{PROJECTS_URL}&page={current_page}"
                print(f"[Листинг] Загрузка страницы {current_page}...")
                await page.goto(url, wait_until="networkidle")

                items = await page.locator(".wants-card__header-title").all()
                for item in items:
                    try:
                        data = await extract_kwork_project(page, item)
                    except Exception:
                        # Best effort: skip a card that fails to parse.
                        # Deliberately not a bare except, so task
                        # cancellation still propagates.
                        continue
                    if data:
                        all_results.append(data)
            return all_results
        except Exception as e:
            print(f"Ошибка скрапинга списка: {e}")
            return all_results
        finally:
            await browser.close()


def clean(text: str) -> str:
    """Replace NBSPs, collapse whitespace and strip; falsy input gives ""."""
    if not text:
        return ""
    return re.sub(r"\s+", " ", text.replace("\xa0", " ")).strip()


async def get_text(page: Page, selector: str) -> str:
    """Return the inner text of the first match of *selector*, or "" if absent.

    Any lookup error is treated as "no text". ``Exception`` is caught rather
    than everything so that task cancellation is not swallowed.
    """
    try:
        element = page.locator(selector).first
        return await element.inner_text(timeout=3000) if await element.count() > 0 else ""
    except Exception:
        return ""


async def get_project_details(url: str) -> Optional[dict]:
    """Scrape one project's detail page (HIGH priority, SHORT delay).

    Waits in the shared throttler with priority 0 and ``DELAY_PROJECT``
    before opening a browser. Returns a dict with ``url``, ``title``,
    ``description``, ``budget``, ``buyer`` and ``stats``, or ``None`` when
    scraping fails.
    """
    await throttler.wait(priority=0, delay_range=DELAY_PROJECT)

    async with Stealth().use_async(async_playwright()) as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(user_agent=USER_AGENT)
        page = await context.new_page()
        try:
            print(f"[Проект] Парсим детали: {url}")
            await page.goto(url, wait_until="networkidle")
            await asyncio.sleep(2)

            title = await get_text(page, "h1.wants-card__header-title")
            description_raw = await get_text(page, ".wants-card__description-text")
            price_desired_raw = await get_text(page, ".wants-card__price")
            price_max_raw = await get_text(page, ".wants-card__description-higher-price")
            buyer_block = await get_text(page, ".want-payer-statistic")
            buyer_name = await get_text(page, ".want-payer-statistic a")
            informers_block = await get_text(page, ".want-card__informers")

            def extract_digits(text):
                # Concatenate every digit in the text into one amount.
                digits = "".join(re.findall(r'\d', text))
                return f"{digits} ₽" if digits else "По договоренности"

            total_projects = re.search(r"Размещено проектов на бирже: (\d+)", buyer_block)
            hired_percent = re.search(r"Нанято: (\d+%)", buyer_block)
            offers_count = re.search(r"Предложений:\s*(\d+)", informers_block)
            time_left = re.search(r"Осталось:\s*(.*?)(?:\n|$)", informers_block)

            return {
                "url": url,
                "title": clean(title),
                "description": description_raw.strip(),
                "budget": {
                    "desired": extract_digits(price_desired_raw),
                    "maximum": extract_digits(price_max_raw),
                },
                "buyer": {
                    "name": clean(buyer_name),
                    "total_projects": total_projects.group(1) if total_projects else "0",
                    "hired_percent": hired_percent.group(1) if hired_percent else "н/д",
                },
                "stats": {
                    "offers": offers_count.group(1) if offers_count else "0",
                    "time_left": clean(time_left.group(1)) if time_left else "н/д",
                },
            }
        except Exception as e:
            print(f"Ошибка парсинга проекта {url}: {e}")
            return None
        finally:
            await browser.close()