Files
qwork/kwork.py

281 lines
11 KiB
Python

import asyncio
import json
import re
import random
from typing import Awaitable, Callable, Optional, Tuple
import time
from playwright.async_api import Locator, Page, async_playwright
from playwright_stealth import Stealth
BASE_URL = "https://kwork.ru"
PROJECTS_URL = f"{BASE_URL}/projects?c=11"
USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
)
VIEWPORT = {"width": 1920, "height": 1080}
# Настройки задержек (мин, макс) в секундах
DELAY_PAGE = (150, 240) # 2.5 - 4 минуты для страниц списка
DELAY_PROJECT = (20, 45) # 20 - 45 секунд для детальных страниц (приоритетные)
Project = dict[str, str]
class RequestThrottler:
def __init__(self):
self._queue = []
self._last_call_time = 0
self._new_item_event = asyncio.Event()
self._worker_task = None
#Умный фоновый воркер, умеющий прерывать долгое ожидание ради приоритетных задач
async def _worker(self):
while True:
if not self._queue:
self._new_item_event.clear()
await self._new_item_event.wait()
continue
# Сортируем: сначала проекты (priority=0), потом страницы (priority=1), затем по времени
self._queue.sort(key=lambda x: (x["priority"], x["time"]))
current_task = self._queue[0]
if "required_delay" not in current_task:
min_d, max_d = current_task["delay"]
current_task["required_delay"] = random.uniform(min_d, max_d)
required_delay = current_task["required_delay"]
now = time.time()
elapsed = now - self._last_call_time
wait_time = required_delay - elapsed
if wait_time > 0:
type_str = "ПРОЕКТ" if current_task["priority"] == 0 else "СПИСОК"
print(f"[Throttler] Тип:{type_str}. Ждем {wait_time:.2f} сек... (в очереди: {len(self._queue)})")
self._new_item_event.clear()
try:
await asyncio.wait_for(self._new_item_event.wait(), timeout=wait_time)
continue
except asyncio.TimeoutError:
pass
self._queue.remove(current_task)
self._last_call_time = time.time()
if not current_task["fut"].done():
current_task["fut"].set_result(True)
#Встает в очередь на выполнение запроса.
#priority: 0 для высокого (проекты), 1 для низкого (страницы).
async def wait(self, priority: int, delay_range: Tuple[float, float]):
if self._worker_task is None:
self._worker_task = asyncio.create_task(self._worker())
loop = asyncio.get_running_loop()
fut = loop.create_future()
item = {
"priority": priority,
"delay": delay_range,
"fut": fut,
"time": time.time()
}
self._queue.append(item)
self._new_item_event.set()
await fut
throttler = RequestThrottler()
def normalize_text(text: str) -> str:
return re.sub(r"\s+", " ", text).strip()
def first_line(text: str) -> str:
text = text.strip()
return text.splitlines()[0].strip() if text else ""
def normalize_url(href: str) -> str:
return f"{BASE_URL}{href}" if href.startswith("/") else href
async def safe_inner_text(locator: Locator, default: str = "") -> str:
try:
text = await locator.inner_text(timeout=1500)
return text.replace("\xa0", " ").strip()
except Exception:
return default
async def first_text(root: Locator, selectors: list[str], default: str = "") -> str:
for selector in selectors:
try:
loc = root.locator(selector).first
text = await safe_inner_text(loc, "")
text = normalize_text(text)
if text:
return text
except Exception:
continue
return default
async def get_card_root(page: Page, href: str) -> Locator:
card = page.locator(f'xpath=//a[@href="{href}"]/ancestor::div[contains(@class, "wants-card__top")][1]')
if await card.count() > 0: return card.first
card = page.locator(f'xpath=//a[@href="{href}"]/ancestor::div[contains(@class, "wants-card")][1]')
if await card.count() > 0: return card.first
return page.locator(f'xpath=//a[@href="{href}"]/ancestor::div[1]').first
async def extract_price(card: Locator) -> str:
card_text = normalize_text(await safe_inner_text(card, ""))
patterns = [r"(Желаемый бюджет:\s*до\s*[\d\s]+₽)", r"(Цена до:\s*[\d\s]+₽)", r"(Допустимый:\s*до\s*[\d\s]+₽)"]
found = []
for pattern in patterns:
match = re.search(pattern, card_text, flags=re.IGNORECASE)
if match:
val = normalize_text(match.group(1))
if val not in found: found.append(val)
if found: return " | ".join(found)
primary = await first_text(card, [".wants-card__price", "[class*='wants-card__price']", "[class*='price']"])
higher = await first_text(card, [".wants-card__description-higher-price", "[class*='description-higher-price']"])
parts = [p for p in [primary, higher] if p]
return " | ".join(parts) if parts else "По договоренности"
async def extract_description(card: Locator) -> str:
description = await first_text(card, [
".wants-card__description-text .overflow-hidden .d-inline",
".wants-card__description-text .overflow-hidden",
".wants-card__description-text",
"[class*='description-text']"
])
for word in ["Показать полностью", "Скрыть"]:
description = description.replace(word, "")
return first_line(normalize_text(description.replace("\xa0", " ")))
async def extract_kwork_project(page: Page, title_block: Locator) -> Optional[Project]:
link = title_block.locator("a").first
if await link.count() == 0: return None
title_text = normalize_text(await safe_inner_text(link, ""))
href = await link.get_attribute("href")
if not href: return None
card = await get_card_root(page, href)
return {
"title": title_text,
"price": await extract_price(card),
"url": normalize_url(href),
"description": await extract_description(card),
}
#Парсинг списка (НИЗКИЙ приоритет, ДЛИННАЯ задержка)
async def get_kwork_projects(start_page: int = 1, end_page: int = 1) -> list[Project]:
all_results: list[Project] = []
if start_page > end_page: start_page = end_page
async with Stealth().use_async(async_playwright()) as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(user_agent=USER_AGENT, viewport=VIEWPORT)
page = await context.new_page()
try:
for current_page in range(start_page, end_page + 1):
# Ожидание в очереди (Приоритет 1 = Низкий)
await throttler.wait(priority=1, delay_range=DELAY_PAGE)
url = f"{PROJECTS_URL}&page={current_page}"
print(f"[Листинг] Загрузка страницы {current_page}...")
await page.goto(url, wait_until="networkidle")
items = await page.locator(".wants-card__header-title").all()
for item in items:
try:
data = await extract_kwork_project(page, item)
if data: all_results.append(data)
except: continue
return all_results
except Exception as e:
print(f"Ошибка скрапинга списка: {e}")
return all_results
finally:
await browser.close()
def clean(text: str) -> str:
if not text: return ""
return re.sub(r"\s+", " ", text.replace("\xa0", " ")).strip()
async def get_text(page: Page, selector: str) -> str:
try:
element = page.locator(selector).first
return await element.inner_text(timeout=3000) if await element.count() > 0 else ""
except: return ""
#Парсинг деталей вакансии (ВЫСОКИЙ приоритет, КОРОТКАЯ задержка)
async def get_project_details(url: str) -> Optional[dict]:
await throttler.wait(priority=0, delay_range=DELAY_PROJECT)
async with Stealth().use_async(async_playwright()) as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(user_agent=USER_AGENT)
page = await context.new_page()
try:
print(f"[Проект] Парсим детали: {url}")
await page.goto(url, wait_until="networkidle")
await asyncio.sleep(2)
title = await get_text(page, "h1.wants-card__header-title")
description_raw = await get_text(page, ".wants-card__description-text")
price_desired_raw = await get_text(page, ".wants-card__price")
price_max_raw = await get_text(page, ".wants-card__description-higher-price")
buyer_block = await get_text(page, ".want-payer-statistic")
buyer_name = await get_text(page, ".want-payer-statistic a")
informers_block = await get_text(page, ".want-card__informers")
def extract_digits(text):
digits = "".join(re.findall(r'\d', text))
return f"{digits}" if digits else "По договоренности"
total_projects = re.search(r"Размещено проектов на бирже: (\d+)", buyer_block)
hired_percent = re.search(r"Нанято: (\d+%)", buyer_block)
offers_count = re.search(r"Предложений:\s*(\d+)", informers_block)
time_left = re.search(r"Осталось:\s*(.*?)(?:\n|$)", informers_block)
return {
"url": url,
"title": clean(title),
"description": description_raw.strip(),
"budget": {
"desired": extract_digits(price_desired_raw),
"maximum": extract_digits(price_max_raw)
},
"buyer": {
"name": clean(buyer_name),
"total_projects": total_projects.group(1) if total_projects else "0",
"hired_percent": hired_percent.group(1) if hired_percent else "н/д"
},
"stats": {
"offers": offers_count.group(1) if offers_count else "0",
"time_left": clean(time_left.group(1)) if time_left else "н/д"
}
}
except Exception as e:
print(f"Ошибка парсинга проекта {url}: {e}")
return None
finally:
await browser.close()