Наши партнеры

UnixForum





Библиотека сайта rus-linux.net

Riak и Erlang/OTP

Глава 15 из книги "Архитектура приложений с открытым исходным кодом", том 1.

Оригинал: "Riak and Erlang/OTP", глава из книги "The Architecture of Open Source Applications"
Авторы: Francesco Cesarini, Andy Gross and Justin Sheehy
Перевод: Н.Ромоданов

Creative Commons: Перевод был сделан в соответствие с лицензией Creative Commons. С русским вариантом лицензии можно ознакомиться здесь.

15.3. Поведения OTP

Основной костях команды разработчиков Riak разбросан почти по десятку различных географических точек. Без очень жесткой координации и шаблонов, по которым должна строиться работа, результат мог бы представлять собой разнообразные клиент/серверные реализации, в которых не обрабатывались специальные пограничные случаи и которые бы не справились с ошибками, касающихся параллельной обработки. Вероятно, нет единого способа работы с клиент/серверными сбоями, и нет гарантии, что ответ на запрос, действительно является ответом, а не просто некоторым сообщением, не противоречащим внутреннему протоколу передачи сообщений.

OTP представляет собой набор библиотек Erlang и принципов проектирования, предлагаемых в виде набора готовых к применению инструментальных средств, предназначенных для разработки надежных систем. Многие из этих образцов и библиотек предоставлены в виде «поведений» («behaviors»).

Поведения OTP решают эти вопросы путем предоставления библиотечных модулей, реализующих наиболее распространенные шаблонов параллельного проектирования. В самой библиотеке в фоновом режиме реализуется, причем так, что программист об этом может не знать, постоянная обработка ошибок и специальных случаев. В результате, поведения OTP предоставляют набор стандартных блоков, используемых при проектировании и создании систем промышленного класса.

15.3.1. Введение

Поведения OTP представлены в виде библиотечных модулей в приложении stdlib, которое поставляется как часть дистрибутива Erlang/OTP. Конкретный код, написанный программистом, помещается в отдельный модуль и вызывается через набор стандартных предопределенных функций обратного вызова, которые стандартизированы для каждого поведения. Такой модуль обратного вызова будет содержать весь конкретный код, необходимый для достижения требуемой функциональности.

Поведения OTP включают в себя рабочие процессы (worker process), которые выполняют фактическую обработку, и супервизоры (supervisor), задачей которых является наблюдение за рабочими процессами и другими супервизорами. Поведения рабочих процессов, часто обозначаемые в диаграммах в виде кружков, включают в себя серверы, обработчики событий и конечные автоматы. Супервизоры, обозначаемые на иллюстрациях в виде прямоугольников, следят за дочерними элементами, причем как за рабочими процессами, так и за другими супервизорами, создавая на то, что называется деревом мониторинга.

Рис.15.1: Дерево мониторинга OTP Riak

Деревья мониторинга упакованы в поведение, которое называется приложением. Приложения OTP не только являются строительными блоками систем Erlang, но также и способом упаковки повторно используемых компонентов. Системы промышленного уровня, такие как Riak, состоят из множества слабо связанных, возможно, распределенных приложений. Некоторые из этих приложений являются частью стандартного дистрибутива Erlang, а некоторые являются теми частями, в которых реализована конкретная функциональность Riak.

К числу примеров приложений OTP относятся CORBA ORB или агент Simple Network Management Protocol (SNMP). Приложение OTP является повторно используемым компонентом, в котором упакованы библиотечные модули вместе с супервизорами и рабочими процессами. Теперь, когда мы говорим о приложении, мы будем подразумевать приложение OTP.

Модули поведения содержат весь общий код для каждого конкретного типа поведения. Хотя можно реализовать свой собственный модуль поведения, делается это редко, поскольку те модули, которые приходят с дистрибутивом Erlang/OTP, обычно содержат большую часть проектных шаблонов, которые вам может потребоваться использовать в своем коде. К числу общих функциональных возможностей, которые предоставлены в модуле поведения, относятся следующие операции:

  • порождение и, возможно, регистрация процесса;
  • отправка и получение клиентских сообщений, как с помощью синхронных, так и асинхронных вызовов, в том числе определяющих внутренний протокол обмена сообщениями;
  • запоминание данных цикла и управления циклом процесса, и
  • остановка процесса.

Данные цикла являются переменной, в которой поведение будет хранить данные, необходимые ему между вызовами. После вызова будет возвращаться обновленный вариант данных цикла. Эти обновленные данные цикла, которые часто называются новыми данными цикла, передаются в качестве аргумента в следующий вызов. Данные цикла также часто называются состоянием поведения.

К числу функциональных возможностей, которые должны быть в модуле обратного вызова для общего сервера приложений с тем, чтобы в нем реализовать требуемое поведение, относятся следующие:

  • Инициализация данных цикла процесса, и, если процесс зарегистрирован, имени процесса.
  • Обработка конкретных клиентских запросов и, если есть синхронизация, отсылка ответов обратно клиенту.
  • Обработка и обновление данных цикла процесса, осуществляемые между запросами процесса.
  • Очистка данных цикла процесса по окончанию.

15.3.2 Основные серверы

Основные серверы, в которых реализовано поведение клиент/сервер, определены в поведении gen_server, которое поставляется как часть стандартного библиотечного приложения. При рассмотрении основных серверов, мы будем пользоваться модулем riak_core_node_watcher.erl из приложения riak_core. Это сервер, который отслеживает и сообщает о том, какие суб-сервисы и узлы имеются в кластере Riak. Заголовки и директивы модуля выглядят следующим образом:

-module(riak_core_node_watcher).
-behavior(gen_server).
%% API
-export([start_link/0,service_up/2,service_down/1,node_up/0,node_down/0,services/0,
         services/1,nodes/1,avsn/0]).
%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2, code_change/3]).

-record(state, {status=up, services=[], peers=[], avsn=0, bcast_tref,
                bcast_mod={gen_server, abcast}}).

Мы можем легко отличить общий сервер по директиве -behavior(gen_server). Эта директива используется компилятором для того, чтобы правильно экспортировать все необходимые функции обратного вызова. В данных цикла сервера используется информация о статусе записи.

15.3.3. Запуск вашего сервера

Когда используется поведение gen_server, вы вместо встроенных функций spawn и spawn_link будете пользоваться функциями gen_server:start и gen_server:start_link. Основным различием между spawn и start является синхронизированная природа выполнения вызова. Использование start вместо of spawn делает запуск рабочего процесса более детерминированным и предотвращает возникновение непредвиденных состояний гонки (race conditions), поскольку вызов не вернет идентификатор PID рабочего процесса до тех пор, пока процесс не будет инициализирован. Вы вызываете функции одним из следующих способов:

gen_server:start_link(ServerName, CallbackModule, Arguments, Options)
gen_server:start_link(CallbackModule, Arguments, Options)

ServerName является кортежем вида {local, Name} или {global, Name}, в котором указывается локальное или глобальное имя Name алиаса процесса для случая, если имя должно быть зарегистрировано. Глобальные имена позволяют серверам прозрачно обращаться в кластеры распределенных узлов Erlang. Если вы не хотите регистрировать процесс и вместо этого ссылаетесь на него по его идентификатору PID, вы не указываете имя и вместо этого пользуетесь функцией start_link/3 или start/3. CallbackModule является именем модуля, в котором размещены конкретные функции обратного вызова, Arguments является допустимым термином языка Erlang, который передается в функцию обратного вызова init/1, а Options является списком, с помощью которого вы сможете устанавливать флаги fullsweep_after и heapsize, а также другие флаги, используемые при трассировке и отладке.

В нашем примере, мы вызываем start_link/4 и с помощью макровызова ?MODULE, регистрируя процесс с тем же именем, что и у модуля обратного вызова. В этом макросе указывается имя модуля, которое будет определено препроцессором во время компиляции кода. Рекомендуется всегда задавать имя вашего поведения точно таким же, как и имя модуля обратного вызова, в котором оно реализовано. Мы не передаем никаких аргументов, и, в результате, просто отправляем пустой список. Список параметров остается пустым:

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

Очевидным различием между функциями start_link и start является то, что start_link скомпонована со своим родителем, которым чаще всего оказывается супервизором, тогда как для start этого не происходит. Об этом необходимо особо упомянуть, поскольку поведение OTP само должно ссылаться на супервизор. Функции start часто используются при тестировании поведения внутри командной оболочки, т. к. ошибки ввода, из-за которых возникает сбой работы командной оболочки, не оказывают влияние на поведение. Все варианты функций start и start_link возвращают {ok, Pid}.

Функции start и start_link порождают новый процесс, в котором будет вызвана функция обратного вызова init(Arguments), находящаяся в модуле CallbackModule, с аргументами Arguments. Функция init должна инициализировать данные цикла сервера LoopData и должен возвращаться кортеж вида {ok, LoopData}. В данных LoopData содержится первый экземпляр цикла данных, которые будут передаваться между функциями обратного вызова. Если вы хотите сохранить некоторые из аргументов, передаваемые вами в функцию init, вы их также должны сохранять в переменной LoopData. Данные LoopData в сервере-наблюдателе узла Riak являются результатом работы функции schedule_broadcast/1, вызываемой с записью типа state, в которой значения, используемые по умолчанию, устанавливаются в полях этой записи следующим образом:

init([]) ->

    %% Watch for node up/down events
    net_kernel:monitor_nodes(true),

    %% Setup ETS table to track node status
    ets:new(?MODULE, [protected, named_table]),

    {ok, schedule_broadcast(#state{})}.

Хотя процесс супервизора может вызвать функция start_link/4, другой процесс вызывает функцию обратного вызова init/1: это тот процесс, который был только что создан. Поскольку назначение этого сервера обнаруживать, записывать и передавать всем сообщения о наличии внутри Riak подсервисов, во время инициализации делается запрос среде времени исполнения языка Erlang обнаруживать такие события и настраивается таблица, в которой запоминается эта информация. Это нужно делать во время инициализации, поскольку в случае, если этой структуры еще нет, любые обращения к серверу окончатся неудачей. В вашей функции init делайте только то, что необходимо, и минимизируйте количество операций, поскольку вызов функции init является синхронным вызовом, который не позволит запустить какой-нибудь другой процесс сериализации до тех пор, пока управление не будет возвращено из этой функции.

15.3.4. Передача сообщений

Если вы хотите отправить синхронное сообщение на ваш сервер, вы используете функцию gen_server:call/2. Асинхронные вызовы выполняются с помощью функции gen_server:cast/2. Давайте начнем рассмотрение с этих двух функций API, относящихся к сервисам приложения Riak; остальной код мы рассмотрим позже. Эти функции вызываются клиентским процессом и результатом является синхронное сообщение, посылаемое серверному процессу, зарегистрированному с тем же самым именем, что и модуль обратного вызова. Обратите внимание, что проверка данных, передаваемых на сервер, должна происходить на клиентской стороне. Если клиент посылает неверную информацию, сервер должен завершить свою работу.

service_up(Id, Pid) ->
    gen_server:call(?MODULE, {service_up, Id, Pid}).

service_down(Id) ->
    gen_server:call(?MODULE, {service_down, Id}).

После получения сообщения, процесс gen_server вызывает функцию обратного вызова handle_call/3, получающую сообщения в том же самом порядке, в котором они были отправлены:

handle_call({service_up, Id, Pid}, _From, State) ->
    %% Update the set of active services locally
    Services = ordsets:add_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Setup a monitor for the Pid representing this service
    Mref = erlang:monitor(process, Pid),
    erlang:put(Mref, Id),
    erlang:put(Id, Mref),

    %% Update our local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};

handle_call({service_down, Id}, _From, State) ->
    %% Update the set of active services locally
    Services = ordsets:del_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Update local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};

Обратите внимание на значение, возвращаемое функцией обратного вызова. В кортеже содержится управляющий атом reply, сообщающий общему коду gen_server о том, что второй элемент кортежа (который в обоих наших случаях является атомом ok) является ответом, отправляемым обратно к клиенту. Третий элемент кортежа является новым состоянием new State, которое, в новой итерации сервера, передается в качестве третьего аргумента в функцию handle_call/3; в обоих случаях оно здесь обновляется с целью отобразить новое состояние имеющихся сервисов. Аргумент _From является кортежем, содержащим уникальную ссылку на сообщение и идентификатор клиентского процесса. Кортеж, как целое, используется в библиотечных функциях, которые мы в этой главе рассматривать не будем. В большинстве случаев, вам он не понадобится.

Библиотечный модуль gen_server имеет ряд встроенных механизмов и средств защиты, действующими за кулисами. Если ваш клиент посылает синхронное сообщение на ваш сервер, и вы не получаете ответ в течение пяти секунд, то процесс выполнения функции call/2 завершается. Вы можете изменить это, использовав для этого gen_server:call(Name, Message, Timeout), где Timeout является значением, указываемым в миллисекундах, или атомом бесконечности infinity.

Механизм тайм-аута сначала был добавлен для того, чтобы предотвратить взаимной блокировки и гарантировать, что работа серверов, которые случайно вызовут друг друга, будет завершена после тайм-аута, заданного по умолчанию. Сообщение об аварийной ситуации будет зарегистрировано, и, можно надеяться, приведет к тому, что ошибка будет найдена и исправлена. При тайм-ауте в пять секунд большинство приложений будут работать так, как это необходимо, но при очень больших нагрузках, вам, возможно, придется более точно задавать это значение или, возможно, даже использовать значение бесконечности infinity; этот выбор зависит от приложения. Во всех фрагментах с критическим кодом в Erlang/OTP используется infinity. В различных местах в Riak используются различные значения тайм-аута: infinity обычно используется в случае связанных между собой частей кода, а значения Timeout устанавливаются с учетом значения параметра, передаваемого пользователем, там, где в клиентском коде в Riak передается информация о том, что у операции должна быть возможность использовать тайм-аут.

Другие защитные механизмы, применяемые в случае использования функции gen_server:call/2, требуются в случае отправки сообщения на несуществующий сервер и в случае, когда сбой сервера происходит раньше, чем он отправит свой ответ. В обоих случаях, вызывающий процесс будет завершен. В чистом языке Erlang отправка сообщения, для которого в принимающем предложении никогда не происходит совпадения с образцом, рассматривается как ошибка, которая может привести к утечке памяти. Чтобы смягчить эту ситуацию, в Riak применяются две различные стратегии, использующих предложения сравнения, в которых сравнение осуществляется «со всем». В местах, где сообщение может быть инициировано пользователем, сообщение, которое не прошло сравнение, может быть просто отброшено. В местах, где такое сообщение может поступать только изнутри Riak, оно представляет собой ошибку, и поэтому будет выдан рапорт о внутреннем сбое, вызванном ошибкой, и рабочий процесс, который получил это сообщение, будет перезапущен.

Отправка асинхронных сообщений работает аналогичным образом. Сообщения отправляются асинхронно в общий сервер и обрабатываются с помощью функция обратного вызова handle_cast/2. Функция должна возвращать кортеж вида {reply, NewState}. Асинхронные вызовы используются, когда нас не интересует запрос сервера и мы не беспокоимся о том, что отравляем больше сообщений, чем сервер может принять. В случаях, когда нас не интересует ответ, но мы хотим перед тем, как отослать следующее сообщение, подождать, пока не будет обработано первое сообщение, нам нужно использовать gen_server:call/2, которое в ответе возвратит атом ok. Представьте себе процесс создания записей базы данных, который происходит быстрее, чем может принять Riak. Если мы используем асинхронные вызовы, мы рискуем заполнить почтовый ящик процесса и создать в узле состояние нехватки памяти. Riak для регулировки нагрузки пользуется возможностью сериализации сообщений синхронных вызовов gen_server, при котором обработка следующего запроса происходит только после того, как был обработан предыдущий запрос. Такой подход устраняет необходимость в более сложных схемах управления кодом: в добавок тому, что процессы gen_server позволяют осуществлять параллельную обработку, их также можно использовать для выполнения сериализации.

15.3.5. Остановка сервера

Как остановить сервер? Вы можете в функциях обратного вызова вместо возврата кортежа {reply, Reply, NewState} или {noreply, NewState}, возвратить кортеж {stop, Reason, Reply, NewState} или {stop, Reason, NewState}, соответственно. Что-то должно быть причиной возврат такого значения — часто это сообщение об остановке, отправляемое на сервер. После получения кортежа, сообщающего об остановке и указывающего причину Reason и состояние State, общий код выполнит функцию обратного вызова terminate(Reason, State).

Функция terminate является обычно тем местом, куда вставляется код, необходимый для очистки состояния сервера State и любых других постоянно хранящихся данных, используемых системой. В нашем примере мы посылаем последнее сообщение всем нашим процессам, наблюдающим за этим узлом, с тем, чтобы они знали, что этот узел будет остановлен. В этом примере в переменной State содержится запись с полями состояний status и наблюдателей peers:

terminate(_Reason, State) ->
    %% Let our peers know that we are shutting down
    broadcast(State#state.peers, State#state { status = down }).

Использование функций обратного вызова поведения в качестве библиотечных функций и их вызов из других частей программы является очень плохим практическим примером. Например, для того, чтобы получить исходные данные цикла, вы никогда не должны вызывать riak_core_node_watcher:init(Args) из другого модуля. Такое получение данных должно осуществляется с помощью синхронного обращения к серверу. Обращения к функциям обратного вызова поведения должно происходить из библиотечных модулей только в случае событий, возникающих в самой системе, а не напрямую пользователем.


Продолжение статьи: Другие поведения рабочих процессов.