воскресенье, 14 июня 2015 г.

Организация асинхронного ввода/вывода на основе boost::asio и boost::context

Введение


Данная статья является результатом исследования на тему, которую можно сформулировать следующим образом: "Повышение читаемости, простоты поддержки и стабильности асинхронного кода при одновременном сохранении его производительности на прежнем уровне". К результатам исследования также относится библиотека, исходные коды которой можно найти на GitHub.

Предполагается, что читатель знаком с асинхронным программированием, C++ и библиотекой boost.

Проблемы асинхронного кода


Рассмотрим, каким образом выглядит асинхронный код при использовании библиотеки boost::asio. Здесь и дальше я буду использовать один и тот же пример - чтение данных из сокета:

socket.async_read_some(boost::asio::buffer(data, size),
  boost::bind(&MyClass::readHandler1, this,
    boost::asio::placeholders::error,
    boost::asio::placeholders::bytes_transferred));

// somewhere far from here ....

void MyClass::readHandler1(
  const boost::system::error_code& error, 
  size_t bytesTransferred)
{
  if (error) {
    // handle error
  }
  // handle result
}

Метод async_read_some принимает два параметра - буфер, куда нужно записать данные, и функцию-обработчик (далее - обработчик), который нужно вызвать после завершения операции чтения. async_read_some возвращает управление сразу же после инициализации асинхронной операции, не дожидаясь её завершения.

Приведенный код имеет целый ряд недостатков. Они все взаимосвязаны между собой, но, тем не менее, я постараюсь их классифицировать.

Объемный код

Несмотря на то, что этот код выполняет простейшую операцию, он имеет очень большой объём. Большая часть кода ничего не значит и написана исключительно для компилятора. Это значительно затрудняет понимание кода и увеличивает время, необходимое на его написание и сопровождение.

Разрозненный код

Этот код разделен на две части (вызов async_read_some и обработчик), которые могут располагаться далеко друг от друга (и с ростом объёма кодовой базы эта вероятность возрастает). Поскольку код разнесен в пространстве, то ориентироваться в нем крайне сложно. Зачастую приходится перемещаться на сотни строк в пределах одного файла и между файлами для того, чтобы проследить нить выполнения кода.

Необходимость именования обработчиков

Для каждой асинхронной операции необходимо создавать и, соответственно, именовать отдельный обработчик. Поэтому программист вынужден придумывать названия функций и методов, которые, с одной стороны, достаточно точно описывали бы контекст использования обработчика, и, с другой стороны, были бы достаточно короткими. 
В качестве обработчиков возможно использовать лямбда выражения, что решает проблему именования Но, решая одни проблемы, лямбда выражения привносят другие. Например - рост кода в ширину:

socket.async_read_some(boost::asio::buffer(data, size), 
  [=](const boost::system::error_code& error, 
    size_t bytesTransferred)
  {
    ....
    socket.async_read_some(boost::asio::buffer(data, size), 
      [=](const boost::system::error_code& error, 
        size_t bytesTransferred)
      {
        // and so on
      }
    );
  }
);

Обработка ошибок через коды ошибок

Первый параметр любого обработчика - это error_code. error_code состоит из кода ошибки (например, EOF или operation_canceled) и категории ошибки (например, системная ошибка или ошибка Вашей программы). На этом - всё. Никакой дополнительной информации получить из этого объекта нельзя. Такой подход к обработке ошибок является примитивным по сравнению с мощным механизмом исключений, который имеется в C++. Более того, обработка ошибок через коды ошибок не позволяет использовать RAII для простого управления ресурсами программ.

Отсутствие контекста выполнения

Поскольку между вызовами обработчиков стек практически полностью очищается, то программист вынужден хранить контекст выполнения где-то в куче и передавать его из обработчика в обработчик. Что, согласитесь, весьма неудобно по сравнению с хранением контекста выполнения в стеке. И куда менее безопасно. Легко можно получить утечку памяти или двойное удаление.

Сложность отладки

Разрозненный код и отсутствие контекста выполнения создают проблемы не только в процессе написания программы, но и при её отладке. Опять же, ориентироваться в коде весьма трудно, особенно, если отладчик консольный.

Корутины


Перед тем, как решать перечисленные проблемы нужно немного освоится с корутинами. Для тех, кто с ними знаком - можно переходить к следующему разделу. 

Треды (threads) предоставляют программисту вытесняющую многозадачность. Раз в N миллисекунд происходит аппаратное прерывание, процессор переходит в нулевое кольцо, операционная система решает, какой тред будет выполнятся следующим, и возвращает процессор в третье кольцо. При этом состояние старого треда (значение регистров процессора) сохраняется в RAM, а состояние нового треда загружется в CPU.

Почти то же самое программист может сделать без помощи ОС - создать несколько "тредов" и переключатся между ними. Для того, чтобы создать и запустить псевдотред, нужно выполнить следующие действия:
  1. Выделить область памяти - стек для нового псевдотреда.
  2. Заменить одни значения регистров процессора на другие. В частности, в регистр Stack Pointer поместить адрес свежесозданного стека, а в регистр Instruction Pointer - адрес следующей инструкции для выполнения процессором.

Такой псевдопоток будем называть корутной. В чем состоит отличие корутин от тредов?

  • Корутины предоставляют программисту коопреативную многозадачность. Программист сам решает, когда переключать одну корутину на другую.
  • Переключение корутин осуществляется значительно быстрее переключения тредов, поскольку при этом не происходит переключения режима работы процессора.
  • Треды могут работать действительно параллельно - выполняясь на разных ядрах процессора. Корутины же работают в пределах своего треда.

Таким образом, тред можно представить как некую абстракцию ОС, которая служит для предоставления программе процессорного времени. Тред поставляется с корутиной по умолчанию (состояние регистров + стек). При необходимости программист может создать дополнительные корутины и переключатся между ними.
 
Для работы с корутинами я буду использовать следующий класс, основанный на boost::context. 

class Coro {
public:
  static Coro* current();

  Coro(std::function<void()> routine);

  void resume();
  void yield();

  ....
};

Метод resume производит вход в корутину (меняет текущий стек на стек корутины). Этот метод должен вызываться только извне корутины. 
Метод yield производит выход из корутины (меняет текущий стек на стек, из которого был произведен вход в корутину). Этот метод должен вызываться только внутри корутины.  

Классический "hello world" для корутин:

Coro coro([] {
  printf("2");
  Coro::current()->yield();
  printf("4");
});

printf("1");
coro.resume();
printf("3");
coro.resume();
printf("5");

Вывод программы будет "12345". При завершении тела корутины метод yield вызывается автоматически.
  

А зачем они нужны?


Идея заключается в следующем. Бизнес-логику приложения будем писать внутри корутин. За бортом останется только boost::asio, работающий в стеке треда. Когда потребуется выполнить асинхронную операцию - мы выйдем из корутины, дождёмся завершения операции и войдём обратно, продолжив с того места, на котором остановились. Следующий пример демонстрирует, как это можно провернуть:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
boost::asio::io_service ioService;

void DoSomeAsyncOperation() {
  auto coro = Coro::current();
  ioService.post([=] {
    coro->resume(); 
  });
  coro->yield();
}

void main() {
  Coro coro([&] {
    printf("please wait .... ");
    DoSomeAsyncOperation();
    printf("done\n");
  });

  ioService.post([&] {
    coro.resume();
  })

  ioService.run();
}

Что здесь происходит?

  • Выполняется ioService.post (строка 18). Метод io_service::post планирует выполнение функции, но НЕ выполняет её. Функция будет выполнена потом, т.е. асинхронно.
  • Выполняется ioService.run. Метод io_service::run - это по сути цикл, в котором ожидается завершение асинхронных операций и вызываются их обработчики. В данном случае - выполняется запланированная функция.
  • Выполняется coro.resume() (строка 19). Происходит вход в корутину, прыгаем на строку 13.
  • Выполняется ioService.post (строка 5).
  • Вызываеся coro->yield (строка 8). Происходит выход из корутины, прыгаем обратно на строку 19.
  • ioService.run вызывает следующую запланированную функцию.
  • Выполняется coro->resume (уже строка 6). Происходит вход в корутину, прыгаем на строку 8.
  • Корутина завершается, автоматически вызывается coro.yield. Происходит выход из корутины, прыгаем на строку 6.
  • ioService.run завершается, поскольку работы больше не осталось.

Заметьте, что  внешне функция DoSomeAsyncOperation выглядит синхронной, хотя внутри выполняется асинхронная операция.

Асинхронный сокет


Создадим класс асинхронного сокета, который будет иметь следующий интерфейс:

template <typename Handle>
class Socket {
public:
  template <typename Buffer>
  size_t writeSome(const Buffer& buffer);
  template <typename Buffer>
  size_t readSome(const Buffer& buffer);

  ....
};

И следующую реализацию:

template <typename Handle, typename Buffer>
size_t Socket<Handle>::readSome(const Buffer& buffer)
{
  error_code errorCode;
  size_t bytesTransferred;
  auto coro = Coro::current();

  _handle.async_read_some(buffer,
    [&](const error_code& e, size_t b) {
      errorCode = e;
      bytesTransferred = b;
      coro->resume();
    }
  );

  coro->yield();

  if (errorCode) {
    throw system_error(errorCode);
  }

  return bytesTransferred;
}

Все очень просто - мы инициализируем операцию чтения из сокета и выходим из корутины. Внутри обработчика копируем его аргументы из стека треда в стек корутины. И затем входим в корутину обратно. Ловкость рук и никакого мошенничества.

Внешне такой асинхронный сокет ничем не отличается от синхронного:

try {
  socket.readSome(buffer);
}
catch (const std::exception& error) {
  ....
}

Асинхронное использование STL алгоритмов, boost::spirit, etc


STL алгоритмы можно применять к тем данным, которые ещё не прочитаны из сокета. Этот трюк возможен как при синхронном, так и при асинхронном вводе/выводе. Идея заключается в том, чтобы спрятать чтение данных внутри итератора:

template <typename Socket, typename Buffer>
class SocketIterator {
  ....
private:
  typename Buffer::reference dereference() const {
    while (_offset >= _buffer->size()) {
      Buffer chunk(chunkSize);
      auto bytesTransfered = _socket->readSome(
        boost::asio::buffer(&chunk[0], chunk.size()))
      );
      chunk.resize(bytesTransfered);
      _buffer->insert(_buffer->end(), chunk.begin(), chunk.end());
    }
    return (*_buffer)[_offset];
  }

  Socket* _socket;
  Buffer* _buffer;
  size_t _offset;
};

Пока итератор перемещается - изменяется только переменная _offset. Как только итератор разыменовывается - недостающие данные дочитываются в конец буфера. А вот пример использования:

TcpSocket socket = ....
std::string buffer;
StreamIterator<TcpSocket, std::string> begin(socket, buffer)
  , end;
auto it = std::find(begin, end, '\n');

Потоковый итератор имеет особое значение при использовании boost::spirit. Дело в том, что функция boost::spirit::qi::parse имеет весьма неприятный недостаток. Она не делает различия между ситуацией, когда входную последовательность разобрать не удалось (невалидные данные), и ситуацией, когда входная последовательность слишком короткая (нужно больше данных). При использовании потокового итератора второй ситуации никогда не возникнет: либо данные будут получены, либо полетит исключение.

Отмена асинхронных операций


С вероятностью 100% в любой более-менее сложной программе потребуется отмена операций. Рассмотрим такой пример: одна корутина выполняет длительную (а лучше - бесконечную) операцию, а вторая ждет сигнала Ctrl+C, чтобы прикончить первую:

Coro someHugeTask([] {
  Acceptor acceptor(tcp::endpoint(tcp::v4(), 8080));
  while (true) {
    auto socket = acceptor.accept();
    ....
  }
});

Coro waitForCtrlC([] {
  SignalSet signals(SIGINT);
  signals.wait();
  // need to kill someHugeTask - but how?
});

Здесь Acceptor - это простая обёртка над boost::asio::ip::tcp::acceptor, a SignalSet - над boost::asio::signal_set.

Первая мысль, которая приходит в голову - использовать метод boost::asio::ip::tcp::acceptor::cancel. Но это плохой вариант по двум причинам. Во-первых, если Acceptor НЕ находится в состоянии accept, то cancel ничего и не сделает. И следующий accept пройдёт успешно. Во-вторых, давайте внимательно почитаем документацию к методу cancel:

This function causes all outstanding asynchronous connect, send and receive operations to finish immediately, and the handlers for cancelled operations will be passed the boost::asio::error::operation_aborted error.

Обратите внимание на слово "outstanding". Это означает, что, если операция была завершена, но обработчик ещё не был вызван, то cancel, опять же, ни к чему не приведёт. Будте осторожны и не попадайтесь в ловушки :)

Конечно, всегда можно влепить в Acceptor флаг, который будет запрещать инициализацию новых асинхронных операций. Но есть способ получше.

Метод Coro::resume(Exception exception)


Попробуем взглянуть на проблему под другим углом. На самом деле в данной ситуации не важно, какую операцию отменять, всё что требуется - это уничтожить someHugeTask целиком. Просто удалить объект корутины нельзя. Потому что, в этом случае, для тех объектов, которые находятся в стеке, не будут вызваны деструторы. А значит, не будут корректно освобождены ресурсы программы.

Модифицируем класс Coro:

class Coro {
public:
  ....

  template <typename Exception>
  void resume(Exception exception) {
    _exception = std::make_exception_ptr(exception);
    resume();
  }

  void yield() {
    ....
    if (_exception) {
      auto exception = _exception;
      _exception = nullptr;
      std::rethrow_exception(exception);
    }
  }

private:
  std::exception_ptr _exception = nullptr;
};

Это небольшое изменение позволяет бросать исключения внутрь корутины. Внутри корутины исключение вылетит из метода yield, но из какого именно вызова yield - неизвестно. Да и неважно с какого места раскрутка стека начнётся, важно, на каком месте она закончится.  А закончится она должна на самом дне стека. Теперь можно отменить корутину следующим образом:

struct CancelError {};

Coro waitForCtrlC([&] {
  SignalSet signals(SIGINT);
  signals.wait();
  someHugeTask.resume(CancelError());
});

Я намеренно не использовал std::exception или производный от него класс исключения. Вместо этого я использую отдельный класс CancelError. Его можно поймать только двумя способами:
  1. С помощью конструкции catch (...) - что является дурным тоном
  2. С помощью конструкции catch (const CancelError& error) - в этом случае, очевидно, программист знает, что он делает.
Таким образом, CancelError (наверняка) раскрутит стек полностью.

Теперь необходимо поправить класс Socket для того, чтобы он корректно обрабатывал исключения, брошенные внутрь корутины:

template <typename Handle, typename Buffer>
size_t Socket<Handle>::readSome(const Buffer& buffer)
{
  bool handleCanceled = false;
  error_code errorCode;
  size_t bytesTransferred;
  auto coro = Coro::current();

  _handle.async_read_some(buffer,
    [&](const error_code& e, size_t b) {
      if (handleCanceled) {
        return;
      }
      errorCode = e;
      bytesTransferred = b;
      coro->resume();
    }
  );

  try {
    coro->yield();
  }
  catch (...) {
    handleCanceled = true;
    _handle.cancel();
    throw;
  }

  if (errorCode) {
    throw system_error(errorCode);
  }

  return bytesTransferred;
}

Я добавил отмену запланированной операции с помощью cancel. Там, где есть cancel, там есть и подвох. Конечно, этот код некорректный. Вспоминаем ситуацию: операция уже завершилась, но обработчик ещё не был вызван. В этом случае cancel ни к чему не приведёт, и обработчик вызовется, когда стек (а, возможно, и сама корутина) будет разрушен исключением. В итоге обработчик попортит память.

Метод Coro::yieldNoThrow


Есть два варианта решения этой проблемы.

Вариант первый - хранить временные переменные в куче с помощью shared_ptr. Решение жизнеспособное, но слишком ресурсозатратное. На каждое обращение к сокету выделять память и использовать атомарные операции (всё же они не бесплатные) - непозволительная роскошь.

Вариант второй - не допустить изменений в стеке до тех пор, пока обработчик не будет вызван. Как? Можно опять выйти из корутины. Это предотвратит разрушение стека, и обработчик сможет отработать корректно. Но здесь есть загвоздка. Даже две.

Загвоздка первая: нельзя в блоке catch вызывать методы yield и resume. Это досадное ограничение boost::context. Но оно довольно просто обходится с помощью следующей уродливой конструкции:

std::exception_ptr exception;
try {
  throw "catch me";
}
catch (...) {
  exception = std::curret_exception();
}
if (exception) {
  // здесь можно вызывать yield/resume
}

Раз речь зашла об ограничениях, то я упомяну и о совершенно отвратительном баге компилятора Visual C++, на который я наткнулся во время отладки. Суть бага: вызов std::current_exception из вложенного блока catch всегда вернет нулевой указатель:

try {
  throw 1;
}
catch (...) {
  try {
    throw 2;
  }
  catch (...) {
    assert(std::curret_exception() == nullptr);
  }
}

В блоке catch. Функция std::current_exception возвращает нулевой указатель. Всегда. И мягкотелые не собираются это исправлять. Ну, в общем, Вы поняли к чему я клоню. Сохраните свои нервы, не пользуйтесь проприетарным ПО.

Идем дальше.

Загвоздка вторая - а если во время ожидания вызова обработчика в корутину будет брошено ещё одно или несколько исключений? Слудующее решение сначала может показаться безумным, но давайте рассмотрим его поближе:

std::deque<std::exception_ptr> exceptions;
while (true) {
  try {
    coro->yield();
    break;
  }
  catch (...) {
    exceptions.push_back(std::curret_exception());
  }
}
coro->throwExceptionsLater(exceptions);

Да, да, просто берем и аккуратно складываем исключения в стопочку. А обработаем их когда-нибудь потом. 

Зададимся вопросом - не противоречит ли это ожиданиям программиста, который бросил исключение? Вероятно, он ожидает, что исключение будет брошено, поймано и обработано немедленно. Из своего опыта могу с уверенностью утверждать, что нет, не противоречит. Ведь программист не может делать каких-либо предположений о том, ГДЕ в корутине вылетит исключение. На фоне этого делать какие-либо предположения о том, КОГДА оно вылетит - бессмысленно.

Для того, чтобы избежать лишних манипуляций с исключениями и лишних входов/выходов из корутин, добавим метод yieldNoThrow:

class Coro {
public:
  ....

  template <typename Exception>
  void resume(Exception exception) {
    _exceptions.push_back(std::make_exception_ptr(exception));
    if (!_yieldNoThrowMode) {
      resume();
    }
  }

  void yieldNoThrow() {
    _yieldNoThrowMode = true;
    yield();
    _yieldNoThrowMode = false;
  }

  void yield() {
    ....
    if (_exceptions.size()) {
      auto exception = _exceptions.front();
      _exceptions.pop_front();
      std::rethrow_exception(exception);
    }
  }

private:
  std::deque<std::exception_ptr> _exceptions;
  bool _yieldNoThrowMode = false;
};

После вызова yieldNoThrow войти в корутину обратно можно только с помощью resume, который без аргументов. Теперь модифицируем сокет:

template <typename Handle, typename Buffer>
size_t Socket::readSome(const Buffer& buffer)
{
  error_code errorCode;
  size_t bytesTransferred;
  auto coro = Coro::current();

  _handle.async_read_some(buffer,
    [&](const error_code& e, size_t b) {
      errorCode = e;
      bytesTransferred = b;
      coro->resume();
    }
  );

  try {
    coro->yield();
  }
  catch (...) {
    _handle.cancel();
    coro->yieldNoThrow();
    throw;
  }

  if (errorCode) {
    throw system_error(errorCode);
  }

  return bytesTransferred;
}

Фуф, почти всё. Остался ещё один скользкий момент и сокет будет готов к использованию.

Асинхронный мьюткес


Есть у boost::asio одна интересная особенность. Вне инициализируемые операции инициализируются сразу же. Т.е., если сделать два вызова async_write_some (или async_read_some) подряд на одном и том же сокете, то неизвестно, какая из операций выполнится первой. Другими словами, если необходимо отправить несколько массивов данных, то порядок действий должен быть таким:
  • инициализируем операцию записи
  • дожидаемся вызова обработчика
  • инициализируем следующую операцию записи
  • дожидаемся вызова обработчика
  • ....
В противном случае результат будет не определен. Для udp сокета это не критично, но для tcp сокета это фатально - отправляемые данные перемешаются.

Итак, возьмём в качестве примера наш класс многострадального сокета и его метод readSome. Необходимо, прежде чем вызывать async_read_some, дождаться обработчика предыдущей операции. Это равнозначно тому, запускать корутины внутрь readSome строго по одному. Т.е. на входе в readSome нужно поставить мьютекс. Асинхронный мьютекс:

class Mutex {
public:
  void lock() {
    if (_isLocked) {
      Finally cleanup([&] {
        _coros.remove(Coro::current());
      });
      _coros.push_back(Coro::current());
      _coros.back()->yield();
    }
    _isLocked = true;
  }

  void unlock() {
    _isLocked = false;
    if (!_coros.empty()) {
      _coros.front()->resume();
    }
  }

private:
  bool _isLocked = false;
  std::list<Coro*> _coros;
};

По сути корутины выстраиваются в очередь. Сами себя добавляют и сами себя удаляют от туда. А когда корутина освобождает мьютекс, то она будит ту корутину, которая первая стоит в очереди.

Класс Finally просто вызывает в деструкторе функцию, переданную ему в конструкторе. Он нужен для корректной обработки исключений: если из yield вылетит исключение, то корутина сама себя удалит из очереди.

Стек вызовов корутин


Обратите внимание: в мьютексе осуществляется прямой переход из одной корутины в другую (в строке _coros.front()->resume()). А затем обратно (поскольку yield переходит в тот стек, из которого был вызван resume). Таким образом получается стек вызовов корутин.

Может возникнуть ситуация, когда корутина №1 входит в корутину №2. А корутина №2 пытается войти в корутину №1. Вообще - это корректная ситуация (почему бы и нет), но тут нужен другой дизайн корутин и другая концепция их использования. Та реализация корутин, о которой идет речь, не допускает их циклического вызова.

Избежать циклического вызова корутин очень просто: нужно отложить переход в корутину с помощью io_service::post 

Coro* coro = ....;

ioService.post([=] {
  coro->resume();
});

Асинхронная очередь


Вполне вероятно, что канал связи медленный и корутина не может ждать, пока данные отправятся или примутся из сокета. В этом случае можно создать отдельную корутину, которя будет заниматься отправкой (или приёмом) данных. И передавать данные между корутинами с помощью очереди. Асинхронной очереди:

template <typename T>
class Queue {
public:
  T pop() {
    if (_data.empty()) {
      Finally cleanup([&] {
        _coros.remove(Coro::current());
      });
      _coros.push_back(Coro::current());
      _coros.back()->yield();
    }

    T t = std::move(_data.front());
    _data.pop();
    return t;
  }

  template <typename U>
  void push(U&& u) {
    _data.push(std::forward<U>(u));

    if (!_coros.empty()) {
      _coros.front()->resume();
    }
  }

private:
  std::queue<T> _data;
  std::list<Coro*> _coros;
};

Использовать очередь можно так:

Coro writer([=] {
  while (true) {
    socket.write(queue.pop());
  }
});

Асинхронные таймауты


Обычно таймауты делают по остаточному принципу, поскольку они бывают сложны в реализации. Но с этим классом Вы будете втыкать таймауты повсюду, потому что пользоваться им одно удовольствие:

class Timeout {
public:
  template <typename Duration>
  Timeout(Duration duration): _timer(*IoService::current()) {
    _timer.expires_from_now(duration);
    _timer.async_wait([=](const error_code& errorCode) {
      _callbackExecuted = true;
      if (_timerCanceled) {
        return _coro->resume();
      }
      if (errorCode) {
        return _coro->resume(system_error(errorCode));
      }
      _coro->resume(TimeoutError());
    });
  }

  ~Timeout() {
    if (!_callbackExecuted) {
      _timerCanceled = true;
      _timer.cancel();
      _coro->yieldNoThrow();
    }
  }

private:
  boost::asio::steady_timer _timer;
  Coro* _coro = Coro::current();
  bool _timerCanceled = false, _callbackExecuted = false;
};

Таймер включается в конструторе и отключается в деструкторе. Соответственно, если объект этого класса живёт дольше времени, переданного как параметр конструтора, то в текущую корутину (корутину, в которой был создан timeout) бросается исключение.

Пример использования:

try {
  Timeout outer(15s);
  for (auto i = 0; i < 10; ++i)
  {
    Timeout inner(3s);
    socket.read(buffer);
    socket.write(buffer);
  }
}
catch (const TimeoutError& error) {

}

Иерархическое управление корутинами


Для управления корутинами я использую класс CoroPool. Он имеет очень простой интерфейс:

class CoroPool {
public:
  ....
  Coro* exec(std::function<void()> routine);
  void waitAll();
  void killAll();
};

Метод exec создаёт и запускает новую корутину. Метод waitAll ждет завершения и уничтожения (удаления объекта класса Coro) всех корутин. Метод killAll бросает во все корутины пула исключение CancelError и вызывает waitAll. Метод killAll вызывается автоматичеки в деструкторе пула.

Поскольку killAll вызывается в деструкторе пула, то корутина не может завершится, пока не завершатся все её дочерние корутины (я создаю пулы только в стеке). Таким образом программа представляет собой дерево корутин. Можно сказать - дерево стеков, каждый из которых выполняется, и в каждом из которых может возникнуть исключение.

Что касается исключений: если корутина завершилась, так и не обработав какое-то исключение, то это исключение бросается в родительскую корутину (кроме CancelError). Чтобы продемонстрировать, насколько это удобно, рассмотрим следующий пример. 

Представим, что мы пишем, ни много ни мало, HTTP2 сервер. Я выбрал в качестве примера его потому, что в нем четко прослеживается иерархическая структура. Сервер состоит из соединений. Соединение состоит из двух процессов: один на запись фреймов в сокет, второй - на чтение. Процесс на чтение ко всему прочему имеет N дочерних процессов, по одному на HTTP2 поток (поскольку HTTP2 поддерживает мультиплексирование).

Вот так это выглядит в коде:

void main() {
  Acceptor acceptor(endpoint);
  CoroPool coroPool;
  while (true) {
    auto socket = acceptor.accept();
    coroPool.exec([&] {
      try {
        HandleConnection(std::move(socket));
      }
      catch (const std::exception& error) {
        ....
      }
    });
  }
}

void HandleConnection(Http2Connection connection) {
  connection.doHandshake();
  CoroPool coroPool;
  coroPool.exec([&] { ReadLoop(connection) });
  coroPool.exec([&] { WriteLoop(connection) });
  Coro::current()->yield();
});

void ReadLoop(Http2Connection& connection) {
  CoroPool coroPool;
  while (true) {
    auto stream = connection.accept();
    coroPool.exec([&] {
      HandleStream(std::move(stream));
    });
  }
}

void HandleStream(Http2Stream stream) {
  stream.prepare();
  // do something with stream
}

Представим, что в каком-то потоке какого-то соединения, внезапно, произошла ошибка. Ошибка ввода/вывода, логическая ошибка - неважно. В любом случае полетит исключение. Оно раскрутит стек HandleStream и освободит все ресурсы потока. 

Если это исключение никак не будет обработано, то оно благополучно перейдёт в родительскую корутину.  

Начнет раскручиватся стек ReadLoop. Вызовется деструктор coroPool. Раскрутятся стеки всех дочерних корутин.

Потом исключение продолжит подниматься ....

К чему я веду? Я хочу обратить внимание на то, что в этом коде нигде явно не присутствует алгоритм тех действий, которые нужно предпринять при возникновении чрезвычайной ситуации. Мы получили его совершенно бесплатно пока писали бизнес логику. Ошибка, возникшая в ЛЮБОМ месте, в ЛЮБОЙ момент времени приведет к корректному освобождению ресурсов, к корректной последовательности действий.

Сравним этот подход с подходом, который используется в асинхронном коде.

При возникновении ошибки в асинхронном коде нельзя двигаться назад по стеку. Потому что стек уже почти пуст. Можно двигаться только вперёд. Т.е. вызывать обработчик ошибки.

Какой именно? Тот обработчик, который специально написан для ошибок, возникающих в данном конкретном месте. Почти на каждую ошибку - свой рукописный обработчик.

Этот обработчик должен проанализировать текущий контекст выполнения и, возможно (!), освободить какие-то ресурсы. А возможно ресурсы уже освобождены.

И самое неприятное - это передача ошибок наверх. Для этого в низкоуровневом коде создаются сигналы, которые соединяются со слотами более высокоуровневого кода. И их нужно соединять и разъединять вручную.

При таком подходе, закодировать полностью корректный алгоритм обработки ошибок практически невозможно .

Сравнение с synca


Многие идеи были взяты из библиотеки, разработанной в недрах компании Яндекс, ровно как и многие из этих идей были отвергнуты. Исходный код можно найти на BitBucket, подробное описание - на Habrahabr и YouTube.

Synca не использует иерархической организации корутин. Корутины создаются и выполняются где-то там. И сами себя удаляют. Соответственно, исключения не покидают пределов своей родной корутины.

Synca позволяет корутинам работать на пуле потоков. Задача по выполнению корутин равномерно распределяется по всем потокам (т.е. поток хватает первую попавшуюся корутину и выполняет её, пока та не сделает yield, потом хватает следующую без разбора и т.д.). Подход имеет свои плюсы и минусы.

С одной стороны - удобно, программа распараллеливается автоматически. Собственно, из плюсов всё.

С другой стороны - каждую секунду, каждую строчку нужно быть готовым, чтобы получить кинжал в спину со стороны многопоточности. Вы никогда не напишете простой и изящный класс для асинхронной очереди или мьютекса. Многопоточные варианты будут напичканы средствами синхронизации или lock-free уловками. И, будучи покрыт тестами, такой код все равно когда-нибудь да выстрелит ошибкой, которую никто никогда не найдет. Оно Вам надо?

Не лучше ли распределять работу между потоками на более высоком уровне, на уровне бизнес логики. Это позволит локализовать те места, где потоки взаимодействуют между собой. Меньше проблемных мест - меньше ошибок и меньше синхронизации, а значит больше производительность.

Ну и самое главное. Код Synca, несмотря на свой маленький объем, крайне запутан. Вникнуть в то, что происходит внутри библиотеки совершенно невозможно. Может быть, автору стоит потратить время на глубокий рефакторинг библиотеки.

Заключение


После перевода большого проекта на корутины я могу отметить следующее.

Код стал понятнее и короче. Отсутствие callback'ов крайне положительно сказывается на читаемости кода. Ошибки, возникающие время от времени, связанные со скользкими местами boost::asio исчезли полностью. Аварийный код стал полностью предсказуемым. Я могу с уверенностью сказать что и в какой последовательности будет выполняться при возникновении ошибки в любом месте. И ещё одно: время, необходимое на внедрение фич и исправление багов уменьшилось в разы. 

Из недостатков отмечу высокий порог вхождения. Программисты с неохотой переходят на сложные технологии. Я говорю даже не о корутинах, а об асинхронном программировании. Ведь куда проще наплодить потоков и наставить мьютексов.

Может быть со временем, я допишу сюда ещё что-нибудь.

Спасибо, что дочитали :)

4 комментария:

  1. Если я правильно понял, этот код допускает только единственный io_service, причем внутри него возможен единственный работающий поток.

    В Synca у меня, при этом, возможно создание нескольких пулов потоков с несколькими потоками в каждом пуле, что позволяет это использовать для большего класса задач: вычислительных, работа с блокирующими примитивами, разделение части задач на разные тредпулы с целью, например, чтобы вычислительная часть не интерферировала с сетевой частью.

    Как следствие, возможно использование порталов и телепортаций, что в случае одного потока делать по меньшей мере бессмысленно.

    Вообще интересный лаконичный подход, но не масштабируемый. Если немного усложнить, добавив потоки, то результат получится уже не такой простой.

    ОтветитьУдалить
    Ответы
    1. Вы слишком категоричны в утверждениях. Что значит "не масштабируемый"? Возьмём, к примеру, nodejs. Абсолютно немасштабируемый внутри, но зато прекрасно масштабируемый снаружи. Так же и здесь - многопоточность можно добавить на более высоком уровне.

      Для меня была гораздо важнее обработка ошибок. Вы обратили внимание на раздел "Иерархическое управление корутинами"? Я могу бросить исключение в любом месте программы. Путь этого исключения строго определен - вверх по дереву корутин. Либо оно будет явно обработано, либо программа аккуратно сложится как карточный домик. И эта фича для программиста бесплатна.

      Удалить
    2. Хорошо, скажу по-другому. При использовании пулов потоков возможно использование всех ресурсов процессора, если это необходимо. Таким образом, решение получается более масштабируемым.

      В вашем случае не очень понятно, как задействовать все ядра процессора при необходимости. Также непонятно, что делать, если прикодится обращаться к блокирующим вызовам, например, при работе с диском или специфическим API, допускающим только синхронную обработку.

      Про дерево корутин я, конечно же, прочитал. Но пока не очень понятно, где это могло бы пригодиться. Опять же, сделать это не представляет особого труда.

      В целом я заметил, что часть вещей взята из synca практически без изменений, а часть - существенно доработана или изменена/улучшена. Что, в общем, не может не радовать. Всегда интересно посмотреть на альтернативный и конструктивный подход.

      Удалить
  2. youtube.com and youtube.com Videos | Videodl.cc
    YouTube's videos are as follows: Videos of the latest youtube to mp3 fight taking place in Las Vegas, Nevada, at Wynn Las Vegas,

    ОтветитьУдалить