Перейти к содержанию

Глава 14: Работа с базой данных

Аудитория: DevOps, платформенные инженеры

Postgres в production

Cat-Scan использует Cloud SQL (Postgres 15) в качестве единственной операционной базы данных. API подключается через sidecar-контейнер Cloud SQL Auth Proxy на localhost:5432.

Основные таблицы и масштаб

Таблица Примерное кол-во строк Что хранит
rtb_daily ~84 миллиона Ежедневные показатели RTB по байеру, креативу, гео и т. д.
rtb_bidstream ~21 миллион Разбивка потока ставок по паблишеру, гео
rtb_quality варьируется Метрики качества (видимость, безопасность бренда)
rtb_bid_filtering ~188 тысяч Причины и объёмы фильтрации ставок
pretargeting_configs мало Снимки конфигурации претаргетинга
creatives мало Метаданные креативов и миниатюры
import_history мало Записи об импорте CSV
users, permissions, audit_log мало Данные аутентификации и администрирования

Критические индексы

Наиболее важный для производительности шаблон индекса:

CREATE INDEX idx_<table>_buyer_metric_date_desc
    ON <table> (buyer_account_id, metric_date DESC);

Такой индекс существует на таблицах rtb_daily, rtb_bidstream, rtb_quality и rtb_bid_filtering. Он обеспечивает работу запросов свежести данных и аналитики по байерам.

Другие важные индексы: - (metric_date, buyer_account_id): для фильтров по диапазону дат + байеру - (metric_date, billing_id): для запросов с привязкой к billing - (row_hash) UNIQUE: дедупликация при импорте

Дедупликация

Каждая импортированная строка хешируется (столбец row_hash). Уникальное ограничение на row_hash предотвращает дублирование при вставке, что делает повторный импорт безопасным.

Модель подключения

API использует подключение на каждый запрос (без пула соединений). Каждый запрос создаёт новое подключение через psycopg.connect(), обёрнутое в run_in_executor для совместимости с async.

async def pg_query(sql, params=()):
    loop = asyncio.get_event_loop()
    def _execute():
        with _get_connection() as conn:
            cursor = conn.execute(sql, params)
            return [dict(row) for row in cursor.fetchall()]
    return await loop.run_in_executor(None, _execute)

Для production-нагрузок рекомендуется добавить psycopg_pool, если накладные расходы на подключение станут узким местом.

Таймауты запросов

Для ресурсоёмких запросов (например, свежесть данных по большим таблицам) API использует pg_query_with_timeout:

conn.execute(f"SET LOCAL statement_timeout = {timeout_ms}")
cursor = conn.execute(sql, params)

Ключевые детали: - SET LOCAL ограничивает таймаут текущей транзакцией и автоматически сбрасывается при её завершении (commit или rollback). - Таймаут свежести данных по умолчанию: 30 секунд. - Настраивается через переменную окружения UPLOADS_DATA_FRESHNESS_QUERY_TIMEOUT_MS (минимум 1000 мс). - SET LOCAL позволяет избежать проблемы прерванной транзакции, которая возникает при использовании SET + RESET в блоке try/finally (если запрос прерван по таймауту, транзакция переходит в состояние aborted, и RESET завершается с ошибкой).

Шаблон запроса свежести данных

Эндпоинту свежести данных необходимо знать, за какие даты есть данные для каждого типа отчёта. Оптимальный шаблон использует generate_series + EXISTS:

SELECT d::date AS metric_date, 'bidsinauction' AS csv_type, 1 AS row_count
FROM generate_series(%s::date, CURRENT_DATE - 1, '1 day'::interval) AS d
WHERE EXISTS (
    SELECT 1 FROM rtb_daily
    WHERE metric_date = d::date AND buyer_account_id = %s
    LIMIT 1
)

Это выполняет N обращений по индексу (одно на каждый день в окне) вместо сканирования миллионов строк. Для 14-дневного окна: 14 обращений по ~0,1 мс каждое вместо полного параллельного последовательного сканирования, которое занимает 160+ секунд.

Почему GROUP BY здесь не работает: Даже с 1 AS row_count (без COUNT) планировщик выбирает последовательное сканирование, когда результирующий набор GROUP BY велик относительно таблицы. Индекс (buyer_account_id, metric_date DESC) существует, но планировщик оценивает, что сканирование 84 млн строк дешевле, чем 4,4 млн обращений по индексу.

Роль BigQuery

BigQuery хранит сырые гранулярные данные и выполняет пакетные аналитические задачи. Он не используется для запросов API в реальном времени. Схема работы:

  1. Сырые CSV-данные загружаются в таблицы BigQuery.
  2. Пакетные задачи агрегируют данные.
  3. Предварительно агрегированные результаты записываются в Postgres.
  4. API обслуживает запросы из Postgres.

Хранение данных

Настраивается в /settings/retention. Определяет, как долго исторические данные хранятся в Postgres до удаления.

Связанные разделы