Глава 14: Работа с базой данных¶
Аудитория: DevOps, платформенные инженеры
Postgres в production¶
Cat-Scan использует Cloud SQL (Postgres 15) в качестве единственной операционной базы данных. API подключается через sidecar-контейнер Cloud SQL Auth Proxy на localhost:5432.
Основные таблицы и масштаб¶
| Таблица | Примерное кол-во строк | Что хранит |
|---|---|---|
rtb_daily |
~84 миллиона | Ежедневные показатели RTB по байеру, креативу, гео и т. д. |
rtb_bidstream |
~21 миллион | Разбивка потока ставок по паблишеру, гео |
rtb_quality |
варьируется | Метрики качества (видимость, безопасность бренда) |
rtb_bid_filtering |
~188 тысяч | Причины и объёмы фильтрации ставок |
pretargeting_configs |
мало | Снимки конфигурации претаргетинга |
creatives |
мало | Метаданные креативов и миниатюры |
import_history |
мало | Записи об импорте CSV |
users, permissions, audit_log |
мало | Данные аутентификации и администрирования |
Критические индексы¶
Наиболее важный для производительности шаблон индекса:
CREATE INDEX idx_<table>_buyer_metric_date_desc
ON <table> (buyer_account_id, metric_date DESC);
Такой индекс существует на таблицах rtb_daily, rtb_bidstream, rtb_quality и rtb_bid_filtering. Он обеспечивает работу запросов свежести данных и аналитики по байерам.
Другие важные индексы:
- (metric_date, buyer_account_id): для фильтров по диапазону дат + байеру
- (metric_date, billing_id): для запросов с привязкой к billing
- (row_hash) UNIQUE: дедупликация при импорте
Дедупликация¶
Каждая импортированная строка хешируется (столбец row_hash). Уникальное ограничение на row_hash предотвращает дублирование при вставке, что делает повторный импорт безопасным.
Модель подключения¶
API использует подключение на каждый запрос (без пула соединений). Каждый запрос создаёт новое подключение через psycopg.connect(), обёрнутое в run_in_executor для совместимости с async.
async def pg_query(sql, params=()):
loop = asyncio.get_event_loop()
def _execute():
with _get_connection() as conn:
cursor = conn.execute(sql, params)
return [dict(row) for row in cursor.fetchall()]
return await loop.run_in_executor(None, _execute)
Для production-нагрузок рекомендуется добавить psycopg_pool, если накладные расходы на подключение станут узким местом.
Таймауты запросов¶
Для ресурсоёмких запросов (например, свежесть данных по большим таблицам) API использует pg_query_with_timeout:
conn.execute(f"SET LOCAL statement_timeout = {timeout_ms}")
cursor = conn.execute(sql, params)
Ключевые детали:
- SET LOCAL ограничивает таймаут текущей транзакцией и автоматически сбрасывается при её завершении (commit или rollback).
- Таймаут свежести данных по умолчанию: 30 секунд.
- Настраивается через переменную окружения UPLOADS_DATA_FRESHNESS_QUERY_TIMEOUT_MS (минимум 1000 мс).
- SET LOCAL позволяет избежать проблемы прерванной транзакции, которая возникает при использовании SET + RESET в блоке try/finally (если запрос прерван по таймауту, транзакция переходит в состояние aborted, и RESET завершается с ошибкой).
Шаблон запроса свежести данных¶
Эндпоинту свежести данных необходимо знать, за какие даты есть данные для каждого типа отчёта. Оптимальный шаблон использует generate_series + EXISTS:
SELECT d::date AS metric_date, 'bidsinauction' AS csv_type, 1 AS row_count
FROM generate_series(%s::date, CURRENT_DATE - 1, '1 day'::interval) AS d
WHERE EXISTS (
SELECT 1 FROM rtb_daily
WHERE metric_date = d::date AND buyer_account_id = %s
LIMIT 1
)
Это выполняет N обращений по индексу (одно на каждый день в окне) вместо сканирования миллионов строк. Для 14-дневного окна: 14 обращений по ~0,1 мс каждое вместо полного параллельного последовательного сканирования, которое занимает 160+ секунд.
Почему GROUP BY здесь не работает: Даже с 1 AS row_count (без COUNT) планировщик выбирает последовательное сканирование, когда результирующий набор GROUP BY велик относительно таблицы. Индекс (buyer_account_id, metric_date DESC) существует, но планировщик оценивает, что сканирование 84 млн строк дешевле, чем 4,4 млн обращений по индексу.
Роль BigQuery¶
BigQuery хранит сырые гранулярные данные и выполняет пакетные аналитические задачи. Он не используется для запросов API в реальном времени. Схема работы:
- Сырые CSV-данные загружаются в таблицы BigQuery.
- Пакетные задачи агрегируют данные.
- Предварительно агрегированные результаты записываются в Postgres.
- API обслуживает запросы из Postgres.
Хранение данных¶
Настраивается в /settings/retention. Определяет, как долго исторические данные хранятся в Postgres до удаления.
Связанные разделы¶
- Обзор архитектуры: место базы данных в архитектуре
- Устранение неполадок: типичные проблемы с базой данных
- Для медиабайеров: Импорт данных описывает таблицу свежести данных с пользовательской точки зрения.