Обработка 100 000 отзывов: Асинхронный пайплайн на Python

26.06.2026 17:00

Моя первая попытка проанализировать тональность 100 000 клиентских отзывов закончилась провалом. Я написал примитивный синхронный цикл, вставил внутрь вызов requests.post к языковой модели и ушел за кофе. Вернувшись, я смотрел на красный стек трейс: скрипт упал на 5432-м запросе с ошибкой HTTP 429 Too Many Requests. Все промежуточные данные пропали в оперативной памяти.

При работе с API LLM-провайдеров синхронный код выживает только на тестовых выборках. На реальных объемах сеть рвет соединения, серверы отбивают запросы лимитами, а пропускная способность упирается в пинг.

Для обработки 100 000 записей со средним временем ответа сервера в одну секунду однопоточному скрипту потребуется 27 часов непрерывной и безошибочной работы. Провайдеры жестко квотируют количество запросов (RPM) и токенов (TPM) в минуту. Наивный обход ограничений через try-except и жесткий time.sleep превращает код в неуправляемое месиво. Нам требовался надежный асинхронный пайплайн, способный балансировать нагрузку и утилизировать квоты через пакетную обработку (batch processing) в RouterAPI.

Иллюзия простоты asyncio

Первая реакция разработчика на медленный I/O — завернуть все в asyncio и отправить запросы через aiohttp. Многие собирают список из десятков тысяч корутин и отдают их в asyncio.gather.

Результат предсказуем. Скрипт мгновенно вываливает в сеть 100 000 HTTP-запросов. Операционная система блокирует процесс из-за исчерпания лимита открытых файловых дескрипторов (file descriptors). Если локальная машина выдержит, фаервол провайдера немедленно заблокирует IP-адрес за аномальный всплеск трафика.

Асинхронность требует контроля конкурентности. В Python инструментом сдерживания выступает asyncio.Semaphore.

Управление потоком через семафоры

Семафор ограничивает количество корутин, одновременно выполняющих определенный участок кода. Он работает как шлюз: задает жесткий лимит на количество активных сетевых соединений.

import asyncio
import aiohttp

async def fetch_analysis(session: aiohttp.ClientSession, review: str, sem: asyncio.Semaphore):
 async with sem:
 payload = {
 "model": "gpt-4o-mini",
 "messages": [{"role": "user", "content": f"Extract entities: {review}"}]
 }
 async with session.post("https://api.RouterAPI.host/v1/chat/completions", json=payload) as response:
 response.raise_for_status
 return await response.json

Конструкция sem = asyncio.Semaphore(50) гарантирует, что мы поддерживаем ровно 50 открытых соединений. Как только один HTTP-запрос завершается, семафор пропускает следующую корутину. Мы решили проблему локальных ресурсов, но провайдер по-прежнему может вернуть 429 ошибку из-за превышения квоты по токенам (TPM).

Защита от сети: Экспоненциальная задержка

Сеть агрессивна. Соединения отваливаются по таймауту, балансировщики провайдеров перезагружаются. Вместо написания велосипедов с циклами while True, я внедрил библиотеку tenacity. Она выносит логику повторных попыток в декораторы.

from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

class RateLimitError(Exception):
 pass

@retry(
 wait=wait_exponential(multiplier=1, min=2, max=60),
 stop=stop_after_attempt(5),
 retry=retry_if_exception_type((RateLimitError, aiohttp.ServerDisconnectedError))
)
async def fetch_with_retry(session, payload, headers, url, sem):
 async with sem:
 async with session.post(url, json=payload, headers=headers) as response:
 if response.status == 429:
 raise RateLimitError("Too Many Requests")
 response.raise_for_status
 return await response.json

При получении статуса 429 корутина приостанавливает выполнение на 2 секунды. При повторной ошибке время ожидания удваивается: 4, 8, 16 секунд. Экспоненциальный бэкофф сглаживает пики нагрузки и позволяет API-провайдеру обнулить счетчик лимитов.

Пакетная обработка через RouterAPI

Семафоры и ретраи стабилизируют работу, но отправка 100 000 индивидуальных запросов остается чудовищно неэффективной. Накладные расходы на TLS-рукопожатия, сериализацию JSON и передачу HTTP-заголовков съедают ресурсы. Многие платформы жестко лимитируют именно запросы в минуту (RPM). Мы упирались в потолок RPM, хотя лимит по токенам оставался свободным.

Мы перестроили пайплайн на пакетную обработку (batch processing). Платформа RouterAPI (шлюз) поддерживает обработку объемных контекстов. Вместо одиночных вызовов мы склеиваем отзывы в блоки и отправляем их единым пулом.

Архитектура батч-пайплайна

Пайплайн делится на четыре этапа:

  1. Чанкование (Chunking). Дробим исходный массив из 100 000 записей на блоки по 50 штук.
  2. Формирование контекста. Упаковываем 50 отзывов в один JSON-объект, где ключами служат уникальные идентификаторы записей.
  3. Асинхронная отправка. Применяем семафор к обработке батчей, а не отдельных отзывов.
  4. Потоковая запись на диск. Сохраняем результаты работы каждой корутины в файл JSON Lines сразу после получения ответа.

Реализация чанкования:

def chunk_iterable(iterable, size):
 iterator = iter(iterable)
 for first in iterator:
 yield [first] + [x for _, x in zip(range(size - 1), iterator)]

Сам вызов обрабатывает весь батч целиком:

import aiofiles
import json

async def process_batch(session: aiohttp.ClientSession, chunk: list, sem: asyncio.Semaphore):
 prompt_data = {item['id']: item['text'] for item in chunk}
 
 payload = {
 "model": "anthropic/claude-3.5-sonnet", # RouterAPI автоматически балансирует апстримы
 "messages": [
 {
 "role": "system", 
 "content": "Analyze the JSON of reviews. Return a JSON object with identical keys containing sentiment and tags."
 },
 {"role": "user", "content": json.dumps(prompt_data)}
 ],
 "response_format": {"type": "json_object"}
 }
 
 headers = {
 "Authorization": "Bearer $RouterAPI_KEY",
 "Content-Type": "application/json"
 }
 
 result = await fetch_with_retry(session, payload, headers, "https://api.RouterAPI.host/v1/chat/completions", sem)
 
 # Потоковое сохранение исключает потерю данных
 async with aiofiles.open("results.jsonl", mode="a") as f:
 await f.write(json.dumps({"data": result['choices'][0]['message']['content']}) + "\n")

Оркестратор запускает процесс:

async def main:
 dataset = load_reviews("reviews.json")
 chunks = list(chunk_iterable(dataset, size=50))
 
 # 20 корутин обрабатывают по 50 отзывов. Итого 1000 отзывов в полете.
 sem = asyncio.Semaphore(20) 
 
 timeout = aiohttp.ClientTimeout(total=180) # Для батчей требуется больший таймаут
 async with aiohttp.ClientSession(timeout=timeout) as session:
 tasks = [asyncio.create_task(process_batch(session, chunk, sem)) for chunk in chunks]
 await asyncio.gather(*tasks)

Узкие места батч-архитектуры

При переходе на пакетную генерацию всплывают новые инженерные сложности.

Во-первых, LLM склонны к галлюцинациям в форматировании. Обрабатывая 50 записей, модель может пропустить один идентификатор или нарушить синтаксис JSON. Параметр response_format: {"type": "json_object"} в RouterAPI минимизирует эту вероятность, заставляя парсер шлюза и апстрима жестко валидировать структуру на выходе. Если скрипт ловит json.JSONDecodeError, проблемный батч отправляется в отдельную очередь для одиночной обработки или разбивается пополам.

Во-вторых, aiohttp агрессивно переиспользует TCP-соединения (Keep-Alive). При длительной обработке батчей балансировщики провайдера принудительно закрывают старые соединения. Мы регулярно получали ServerDisconnectedError. Добавление этого исключения в декоратор tenacity полностью закрыло проблему срыва пайплайна.

В-третьих, маршрутизация и фоллбэки. RouterAPI избавляет клиентский код от сложной логики выбора провайдеров. Если OpenAI отдает 502 Bad Gateway или исчерпывает свои лимиты, шлюз на основе конфигурации маршрутизацию автоматически перебрасывает запрос на резервный апстрим (например, Azure или другой узел). Клиентский скрипт просто ждет ответ, не занимаясь микроменеджментом конечных точек.

Синхронный подход падал через час и уничтожал результаты. Асинхронный пайплайн с пакетной обработкой, семафорами и экспоненциальными ретраями обрабатывает 100 000 отзывов за 12 минут. Данные стабильно оседают на жестком диске. Система утилизирует лимиты TPM, минимизирует затраты на HTTP-рукопожатия и полностью игнорирует сетевые аномалии. Инженерия данных начинается там, где заканчиваются наивные циклы и начинается контроль над конкурентностью.

Теги

Ещё по теме