feat: add subscribtion scheduler with ai, pagination

This commit is contained in:
Faynot
2026-03-29 11:25:31 +03:00
parent 1db524f757
commit 164acd6307
16 changed files with 688 additions and 358 deletions

345
kwork.py
View File

@@ -2,12 +2,12 @@ import asyncio
import json
import re
import random
from typing import Awaitable, Callable, Optional
from typing import Awaitable, Callable, Optional, Tuple
import time
from playwright.async_api import Locator, Page, async_playwright
from playwright_stealth import Stealth
# --- Константы ---
BASE_URL = "https://kwork.ru"
PROJECTS_URL = f"{BASE_URL}/projects?c=11"
@@ -18,50 +18,101 @@ USER_AGENT = (
VIEWPORT = {"width": 1920, "height": 1080}
# Задержка между запросами к страницам (в секундах), чтобы избежать блокировок
PAGE_LOAD_DELAY = (3, 6)
# Настройки задержек (мин, макс) в секундах
DELAY_PAGE = (150, 240) # 2.5 - 4 минуты для страниц списка
DELAY_PROJECT = (20, 45) # 20 - 45 секунд для детальных страниц (приоритетные)
Project = dict[str, str]
Extractor = Callable[[Page, Locator], Awaitable[Optional[Project]]]
# --- Вспомогательные функции ---
class RequestThrottler:
def __init__(self):
self._queue = []
self._last_call_time = 0
self._new_item_event = asyncio.Event()
self._worker_task = None
#Умный фоновый воркер, умеющий прерывать долгое ожидание ради приоритетных задач
async def _worker(self):
while True:
if not self._queue:
self._new_item_event.clear()
await self._new_item_event.wait()
continue
# Сортируем: сначала проекты (priority=0), потом страницы (priority=1), затем по времени
self._queue.sort(key=lambda x: (x["priority"], x["time"]))
current_task = self._queue[0]
if "required_delay" not in current_task:
min_d, max_d = current_task["delay"]
current_task["required_delay"] = random.uniform(min_d, max_d)
required_delay = current_task["required_delay"]
now = time.time()
elapsed = now - self._last_call_time
wait_time = required_delay - elapsed
if wait_time > 0:
type_str = "ПРОЕКТ" if current_task["priority"] == 0 else "СПИСОК"
print(f"[Throttler] Тип:{type_str}. Ждем {wait_time:.2f} сек... (в очереди: {len(self._queue)})")
self._new_item_event.clear()
try:
await asyncio.wait_for(self._new_item_event.wait(), timeout=wait_time)
continue
except asyncio.TimeoutError:
pass
self._queue.remove(current_task)
self._last_call_time = time.time()
if not current_task["fut"].done():
current_task["fut"].set_result(True)
#Встает в очередь на выполнение запроса.
#priority: 0 для высокого (проекты), 1 для низкого (страницы).
async def wait(self, priority: int, delay_range: Tuple[float, float]):
if self._worker_task is None:
self._worker_task = asyncio.create_task(self._worker())
loop = asyncio.get_running_loop()
fut = loop.create_future()
item = {
"priority": priority,
"delay": delay_range,
"fut": fut,
"time": time.time()
}
self._queue.append(item)
self._new_item_event.set()
await fut
throttler = RequestThrottler()
def normalize_text(text: str) -> str:
"""Очищает текст от лишних пробельных символов и переносов строк."""
return re.sub(r"\s+", " ", text).strip()
def first_line(text: str) -> str:
"""Извлекает только первую строку из переданного текста."""
text = text.strip()
if not text:
return ""
return text.splitlines()[0].strip()
return text.splitlines()[0].strip() if text else ""
def normalize_url(href: str) -> str:
"""Превращает относительную ссылку (напр. /projects/1) в полную (https://kwork.ru/...)."""
return f"{BASE_URL}{href}" if href.startswith("/") else href
async def safe_inner_text(locator: Locator, default: str = "") -> str:
"""
Безопасно извлекает внутренний текст элемента.
Если элемент не найден или произошла ошибка, возвращает значение по умолчанию.
"""
try:
text = await locator.inner_text(timeout=1500)
text = text.replace("\xa0", " ")
return text.strip()
return text.replace("\xa0", " ").strip()
except Exception:
return default
async def first_text(root: Locator, selectors: list[str], default: str = "") -> str:
"""
Пробует по очереди список CSS-селекторов внутри корневого элемента.
Возвращает очищенный текст первого найденного элемента.
"""
for selector in selectors:
try:
loc = root.locator(selector).first
@@ -73,253 +124,138 @@ async def first_text(root: Locator, selectors: list[str], default: str = "") ->
continue
return default
async def get_card_root(page: Page, href: str) -> Locator:
"""
Находит родительский контейнер (карточку проекта) на основе ссылки на проект.
Использует поиск по XPath 'ancestor', чтобы подняться вверх по DOM-дереву.
"""
card = page.locator(
f'xpath=//a[@href="{href}"]/ancestor::div[contains(@class, "wants-card__top")][1]'
)
if await card.count() > 0:
return card.first
card = page.locator(
f'xpath=//a[@href="{href}"]/ancestor::div[contains(@class, "wants-card")][1]'
)
if await card.count() > 0:
return card.first
card = page.locator(f'xpath=//a[@href="{href}"]/ancestor::div[contains(@class, "wants-card__top")][1]')
if await card.count() > 0: return card.first
card = page.locator(f'xpath=//a[@href="{href}"]/ancestor::div[contains(@class, "wants-card")][1]')
if await card.count() > 0: return card.first
return page.locator(f'xpath=//a[@href="{href}"]/ancestor::div[1]').first
async def extract_price(card: Locator) -> str:
"""
Извлекает стоимость проекта из карточки.
Сначала ищет текст по ключевым фразам (бюджет), затем по CSS-классам цен.
"""
card_text = await safe_inner_text(card, "")
card_text = normalize_text(card_text)
patterns = [
r"(Желаемый бюджет:\s*до\s*[\d\s]+₽)",
r"(Цена до:\s*[\d\s]+₽)",
r"(Допустимый:\s*до\s*[\d\s]+₽)",
]
found: list[str] = []
card_text = normalize_text(await safe_inner_text(card, ""))
patterns = [r"(Желаемый бюджет:\s*до\s*[\d\s]+₽)", r"(Цена до:\s*[\d\s]+₽)", r"(Допустимый:\s*до\s*[\d\s]+₽)"]
found = []
for pattern in patterns:
match = re.search(pattern, card_text, flags=re.IGNORECASE)
if match:
value = normalize_text(match.group(1))
if value and value not in found:
found.append(value)
val = normalize_text(match.group(1))
if val not in found: found.append(val)
if found:
return " | ".join(found)
primary = await first_text(
card,
[
".wants-card__price",
".wants-card__header-right-block .wants-card__price",
"[class*='wants-card__price']",
"[class*='price']",
],
"",
)
higher = await first_text(
card,
[
".wants-card__description-higher-price",
"[class*='description-higher-price']",
],
"",
)
parts = [part for part in [primary, higher] if part]
if parts:
return " | ".join(parts)
return "По договоренности"
if found: return " | ".join(found)
primary = await first_text(card, [".wants-card__price", "[class*='wants-card__price']", "[class*='price']"])
higher = await first_text(card, [".wants-card__description-higher-price", "[class*='description-higher-price']"])
parts = [p for p in [primary, higher] if p]
return " | ".join(parts) if parts else "По договоренности"
async def extract_description(card: Locator) -> str:
"""
Извлекает описание проекта из карточки, удаляя лишние элементы управления
('Показать полностью', 'Скрыть').
"""
description = await first_text(
card,
[
".wants-card__description-text .overflow-hidden .d-inline",
".wants-card__description-text .overflow-hidden",
".wants-card__description-text",
"[class*='description-text']",
],
"",
)
description = description.replace("Показать полностью", "")
description = description.replace("Скрыть", "")
description = description.replace("\xa0", " ")
description = normalize_text(description)
return first_line(description)
description = await first_text(card, [
".wants-card__description-text .overflow-hidden .d-inline",
".wants-card__description-text .overflow-hidden",
".wants-card__description-text",
"[class*='description-text']"
])
for word in ["Показать полностью", "Скрыть"]:
description = description.replace(word, "")
return first_line(normalize_text(description.replace("\xa0", " ")))
async def extract_kwork_project(page: Page, title_block: Locator) -> Optional[Project]:
"""
Функция-экстрактор: собирает все поля (заголовок, цена, ссылка, описание)
для одного конкретного блока заголовка.
"""
link = title_block.locator("a").first
if await link.count() == 0:
return None
if await link.count() == 0: return None
title_text = normalize_text(await safe_inner_text(link, ""))
href = await link.get_attribute("href")
if not href:
return None
if not href: return None
card = await get_card_root(page, href)
price = await extract_price(card)
description = await extract_description(card)
return {
"title": title_text,
"price": price,
"price": await extract_price(card),
"url": normalize_url(href),
"description": description,
"description": await extract_description(card),
}
# --- Основная логика скрапинга ---
async def get_kwork_projects(max_pages: int = 1) -> list[Project]:
"""
Основная функция запуска браузера и обхода страниц.
:param max_pages: Сколько страниц нужно просмотреть.
"""
#Парсинг списка (НИЗКИЙ приоритет, ДЛИННАЯ задержка)
async def get_kwork_projects(start_page: int = 1, end_page: int = 1) -> list[Project]:
all_results: list[Project] = []
if start_page > end_page: start_page = end_page
async with Stealth().use_async(async_playwright()) as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(
user_agent=USER_AGENT,
viewport=VIEWPORT,
)
context = await browser.new_context(user_agent=USER_AGENT, viewport=VIEWPORT)
page = await context.new_page()
try:
for current_page in range(1, max_pages + 1):
# Формируем URL с учетом номера страницы
for current_page in range(start_page, end_page + 1):
# Ожидание в очереди (Приоритет 1 = Низкий)
await throttler.wait(priority=1, delay_range=DELAY_PAGE)
url = f"{PROJECTS_URL}&page={current_page}"
print(f"Загрузка страницы {current_page}: {url}")
print(f"[Листинг] Загрузка страницы {current_page}...")
await page.goto(url, wait_until="networkidle")
# Дополнительная задержка для рендеринга JS элементов
await asyncio.sleep(2)
items = await page.locator(".wants-card__header-title").all()
print(f"Найдено проектов на странице: {len(items)}")
for item in items:
try:
data = await extract_kwork_project(page, item)
if data:
all_results.append(data)
except Exception as e:
print(f"Ошибка при парсинге карточки: {e}")
continue
# Ограничение частоты запросов (Rate Limiting)
if current_page < max_pages:
delay = random.uniform(*PAGE_LOAD_DELAY)
print(f"Ожидаем {delay:.2f} сек. перед следующей страницей...")
await asyncio.sleep(delay)
if data: all_results.append(data)
except: continue
return all_results
except Exception as e:
print(f"Произошла критическая ошибка: {e}")
print(f"Ошибка скрапинга списка: {e}")
return all_results
finally:
await browser.close()
def clean(text: str) -> str:
"""Очищает текст от мусора, неразрывных пробелов и лишних пустот."""
if not text: return ""
text = text.replace("\xa0", " ")
return re.sub(r"\s+", " ", text).strip()
return re.sub(r"\s+", " ", text.replace("\xa0", " ")).strip()
async def get_text(page: Page, selector: str) -> str:
"""Извлекает текст из элемента, если он существует."""
try:
element = page.locator(selector).first
if await element.count() > 0:
return await element.inner_text(timeout=3000)
return ""
except:
return ""
return await element.inner_text(timeout=3000) if await element.count() > 0 else ""
except: return ""
#Парсинг деталей вакансии (ВЫСОКИЙ приоритет, КОРОТКАЯ задержка)
async def get_project_details(url: str) -> Optional[dict]:
"""
Открывает страницу проекта, парсит HTML и возвращает структурированные данные.
"""
await throttler.wait(priority=0, delay_range=DELAY_PROJECT)
async with Stealth().use_async(async_playwright()) as p:
# Запускаем браузер
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(user_agent=USER_AGENT)
page = await context.new_page()
try:
print(f"Парсим проект: {url}")
print(f"[Проект] Парсим детали: {url}")
await page.goto(url, wait_until="networkidle")
# Ждем немного, чтобы JS отработал до конца
await asyncio.sleep(2)
# 1. Извлекаем заголовок
title = await get_text(page, "h1.wants-card__header-title")
# 2. Извлекаем описание (сохраняем структуру)
description_raw = await get_text(page, ".wants-card__description-text")
# 3. Бюджет (Желаемый и Допустимый)
# Извлекаем только цифры через регулярку
price_desired_raw = await get_text(page, ".wants-card__price")
price_max_raw = await get_text(page, ".wants-card__description-higher-price")
# 4. Информация о заказчике
buyer_block = await get_text(page, ".want-payer-statistic")
buyer_name = await get_text(page, ".want-payer-statistic a")
# 5. Статистика проекта (предложения и время)
informers_block = await get_text(page, ".want-card__informers")
# --- Парсинг данных через регулярные выражения для точности ---
# Чистим цену: оставляем только цифры
def extract_digits(text):
digits = "".join(re.findall(r'\d', text))
return f"{digits}" if digits else "По договоренности"
# Вырезаем статистику из текста блоков
total_projects = re.search(r"Размещено проектов на бирже: (\d+)", buyer_block)
hired_percent = re.search(r"Нанято: (\d+%)", buyer_block)
offers_count = re.search(r"Предложений:\s*(\d+)", informers_block)
time_left = re.search(r"Осталось:\s*(.*?)(?:\n|$)", informers_block)
# Формируем итоговый объект
data = {
return {
"url": url,
"title": clean(title),
"description": description_raw.strip(),
@@ -337,29 +273,8 @@ async def get_project_details(url: str) -> Optional[dict]:
"time_left": clean(time_left.group(1)) if time_left else "н/д"
}
}
return data
except Exception as e:
print(f"Ошибка при парсинге {url}: {e}")
print(f"Ошибка парсинга проекта {url}: {e}")
return None
finally:
await browser.close()
if __name__ == "__main__":
#pages_to_scan = 2
#data = asyncio.run(get_kwork_projects(max_pages=pages_to_scan))
target_url = "https://kwork.ru/projects/3134701"
# Запуск парсера для одной ссылки
result = asyncio.run(get_project_details(target_url))
if result:
print("\n--- Данные проекта в формате JSON ---")
print(json.dumps(result, ensure_ascii=False, indent=4))
#print(f"\nВсего собрано проектов: {len(data)}")
#print(json.dumps(data, ensure_ascii=False, indent=4))