Асинхронный API драйвера как замена для неправильного использования IN и Batch statements

На практике бывают ситуации когда нужно выполнить большое количество однотипных запросов к Cassandra, как правило это приводит к проблеме производительности. Разработчики довольно часто пытаются решить эту проблему с помощью средств для этого не предназначенных, таких как выражение IN и Batch statements, что в свою очередь приводит к другим нехорошим ситуациям.

В данной статье расскажу об этих проблемах и почему не стоит злоупотреблять IN и батчами, и почему нужно использовать асинхронные запросы.

Задача

Итак, для примера представим что у нас есть система для хранения результатов автоматических тестов, также эта система позволяет помечать упавшие тесты багами и предоставлять отчет о том сколько тестов прошло, сколько упало и сколько из них помечено багами, с точки зрения базы данных она выглядит примерно так:

Таблица для хранения тестов:

CREATE TABLE tests (
id timeuuid PRIMARY KEY,
result boolean
)

Таблица для хранения багов:

CREATE TABLE bugs (
name text,
id timeuuid,
status text,
title text,
PRIMARY KEY (name, id)
) WITH CLUSTERING ORDER BY (id DESC)

name — уникальное имя-ключ. Для одного name может быть несколько версий бага, скажем с разным статусом, но актуальная версия всегда будет последней.

Таблица для поиска багов по тестам, необходима чтобы пользователь мог видеть какие тесты каким дефектом отмечены:

CREATE TABLE bugs_by_tests (
test_id timeuuid,
bug_id timeuuid,
linker text,
PRIMARY KEY (test_id, bug_id)
)

Допустим, у нас сохранены тысяча другая тестов и  какой-нибудь баг в нашей базе данных. Теперь  рассмотрим случай, когда пользователь через пользовательский интерфейс выбрал некоторое количество тестов  и хочет отметить их этим багом.

Блокирующие запросы

В коде приложения нам необходимо добавить ссылки тестов на этот дефект в таблице bugs_by_tests, самый простой вариант — это выполнить запросы в цикле:

public void linkBugToTests(UUID bugId, Collection<UUID> testIds, String user) {
    for (UUID testId : testIds) {
        session.execute("INSERT INTO bugs_by_tests (test_id, bug_id) VALUES (?,?,?);",
                testId, bugId, user);
    }
}

Какие есть проблемы с этим кодом?

  1. Долгое время выполнения кода. Чем больше тестов пользователь захочет отметить, тем дольше ему придется ждать результата. Это действительно может стать большой проблемой, если вам необходимо выполнить больше десятка запросов. Причем большую часть времени ваше приложение будет проводить просто в ожидании ответов от базы данных и ничего не делать.
  2. Иногда может существовать требование атомарности, когда обязательно должны быть выполнены все запросы, или ни одного. В случае же WriteTimeoutException на одном из запросов часть оставшихся запросов просто не будет выполнена. В этой ситуации пользователь может остаться недоволен тем, что он хотел отметить 100 тестов багом, а в результате отметилось, скажем только 42, потому что на 42 запросе мы получили исключение, которое, кстати вовсе не означает, что данные не были сохранены для 42 запроса.  Но сам факт наличия исключения не дал исполниться оставшимся запросам.

Теперь рассмотрим способы борьбы с низкой производительностью этого кода.

IN clause

Итак, первый из вариантов — это использование выражения IN в попытке уменьшить количество запросов. Предыдущий код можно заменить на следующий с выражением с IN:

public void linkBugToTests(UUID bugId, Collection<UUID> testIds, String user) {

    Statement updateStatement = QueryBuilder.update("bugs_by_tests")
            .with(set("linker", user))
            .where(in("test_id", testIds))
            .and(eq("bug_id", bugId));

    session.execute(updateStatement);
}

Теперь будет только один запрос, скорость выполнения кода возрастет, но:

  1. Максимальное количество значений которое можно использовать для оператора IN равно 65535.
  2. Чем больше значений в IN, тем выше вероятность получить WriteTimeoutException,  так как в нашей таблице поле test_id — это partition ключ, то есть для выполнения этого запроса координатору нужно запросить большое количество реплик, и обработать большое количество ответов, что существенно повышает нагрузку на координатор. На моей практике такие запросы  приводили к тому, что ноды внезапно начинали уходить в частую сборку мусора и просто переставали обрабатывать запросы на несколько минут. Собственно, IN по partition ключам — очень не рекомендуемая практика и известный антипеттерн.

Batch statements

Второй из вариантов — это использование Batch statements, опять же в попытке уменьшить количество запросов от нашего кода  к базе данных:

public void linkBugToTests(UUID bugId, Collection<UUID> testIds, String user) {
    
    BatchStatement batchStatement = new BatchStatement();
    for (UUID testId : testIds) {
        batchStatement.add(new SimpleStatement("INSERT INTO bugs_by_tests (test_id, bug_id) VALUES (?,?,?);",
                testId, bugId, user));
    }

    session.execute(batchStatement);
}

Как и в случае с IN, количество запросов от нашего кода к базе данных уменьшилось до одного, но, точно также, теперь мы создали проблемы для координатора и получили следующие проблемы:

  1. Максимальное количество запросов внутри одного батча не более 65535
  2. Вся нагрузка на выполнение запросов внутри батча ложиться на координатор, что  имеет точно такие же последствия как и с оператором IN: запрос большого количества узлов, перегрузка координатора, больше сборки мусора, больше TimeoutExceptions.

Использование батчей для оптимизации производительности также является антипаттерном, потому как батчи предназначены для того чтобы обеспечивать целостность данных в таблицах, а не для оптимизации количества взаимодействий между клиентом и базой данных.

Asynchronous driver API

В нашем случае самый оптимальный вариант для того чтобы обеспечить высокую производительность клиентского кода  — это использование асинхронного API драйвера, которое, собственно, и предназначено для таких случаев и представлено методом executeAsync. Перепишем код линковки следующим образом используем немного Guava:

public void linkBugToTests(UUID bugId, Collection<UUID> testIds, String user) {

    List<ResultSetFuture> futures = new ArrayList<>(testIds.size());
    for (UUID testId : testIds) {
        futures.add(session.executeAsync("INSERT INTO bugs_by_tests (test_id, bug_id) VALUES (?,?,?);",
                testId, bugId, user));
    }
    try {
        Futures.successfulAsList(futures).get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

Метод executeAsync выполняет запрос асинхронно и возвращает ResultSetFuture, при этом наш код не ждет пока драйвер получит ответ от базы, а продолжает выполнение.

Когда же нам понадобится дождаться выполнения запроса мы можем воспользоваться методом ResultSetFuture.getUninterruptibly(), чтоб дождаться ответа от базы и получить результат. В данном же коде  используем Futures.successfulAsList из библиотеки Guava, чтобы дождаться выполнения всех запросов за раз и затем выйти из метода.

Что в результате?

Во первых, теперь этот метод будет выполнятся намного быстрее чем первый вариант, благодаря тому, что каждый следующий запрос не ждет завершения предыдущего. Для коллекции из сотни или тысячи элементов это дает колоссальный прирост скорости работы метода.

Во-вторых, WriteTimeoutException для одного запроса никак не влияет на выполнение других запросов, в итоге, даже при наличии таких исключений, вероятность того, что наши данные будут полностью и правильно записаны очень и очень высока.

«Небольшое отступление по поводу WriteTimeoutException. Это исключение  вовсе не означает, что данные не были сохранены, а говорит лишь о том, что координатор не дождался ответа от реплик, и даже в этом случае координатор запишет hint, для того чтобы повторить попытку записи. Так что операция записи в конечном итоге будет выполнена.»

В-третьих, не перегружаем координатор, так как в этом случае координатор обрабатывает один ключ за один запрос клиента. Более того, благодаря TokenAwarePolicy драйвер сразу выбирает координатором один из узлов-реплик с данными соответствующими ключу.

Заключение

Иногда бывает необходимо выполнить большое количество однотипных запросов, использование классических блокирующих запросов может существенно сказаться на производительности приложения, а также стать причиной несогласованности данных или просто их неполного обновления.

Использование Batch statements и запросов с IN может улучшить производительность, но создать проблемы с доступностью узлов кластера, и в большинстве случаев является плохой идеей.

Наиболее подходящим способом повышения производительности и стабильности является использование асинхронного API драйвера, которое позволяет сократить время обработки большого количества данных при этом не обрушивая всю нагрузку на один единственный узел Cassandra кластера.

Полезные ссылки по теме


Комментарии: