Наши партнеры

UnixForum



Библиотека сайта rus-linux.net

Кластеризация согласно консенсусу

Оригинал: Clustering by Consensus
Автор: Dustin J. Mitchell
Дата публикации: July 12, 2016
Перевод: Н.Ромоданов
Дата перевода: январь 2017 г.

Введение в библиотеку Cluster

В библиотеке Cluster, о которой рассказывается в этой главе, реализуется простую форму алгоритма Multi-Paxos. Она разработана в качестве библиотеки, позволяющей в большом приложении создать сервис консенсуса.

Пользователям важно, чтобы эта библиотека работала корректно, поэтому важно структурировать код так, чтобы мы могли видеть и протестировать ее соответствие спецификациям. В сложных протоколах могут возникать сложные отказы, поэтому мы будем создавать средства поддержки для воспроизведения и отладки редких отказов.

Главной целью этой главы является код и доказательство, что он соответствует нашим концепциям: достаточно лишь продемонстрировать, что в ядре библиотеки наша концепция реализована на практике; причем без всех обычных дополнений, необходимых в процессе эксплуатации программного обеспечения. Структура кода такова, что подобные дополнения можно будет добавить позже, причем с минимальными изменениями в реализации ядра библиотеки.

Давайте начнем.

Типы и константы

В протоколе Cluster используется пятнадцать различных типов сообщений, каждый из которых определяется как именованный кортеж namedtuple языка Python.

    Accepted = namedtuple('Accepted', ['slot', 'ballot_num'])
    Accept = namedtuple('Accept', ['slot', 'ballot_num', 'proposal'])
    Decision = namedtuple('Decision', ['slot', 'proposal'])
    Invoked = namedtuple('Invoked', ['client_id', 'output'])
    Invoke = namedtuple('Invoke', ['caller', 'client_id', 'input_value'])
    Join = namedtuple('Join', [])
    Active = namedtuple('Active', [])
    Prepare = namedtuple('Prepare', ['ballot_num'])
    Promise = namedtuple('Promise', ['ballot_num', 'accepted_proposals'])
    Propose = namedtuple('Propose', ['slot', 'proposal'])
    Welcome = namedtuple('Welcome', ['state', 'slot', 'decisions'])
    Decided = namedtuple('Decided', ['slot'])
    Preempted = namedtuple('Preempted', ['slot', 'preempted_by'])
    Adopted = namedtuple('Adopted', ['ballot_num', 'accepted_proposals'])
    Accepting = namedtuple('Accepting', ['leader'])

Использование именованных кортежей для описания каждого типа сообщений позволяет содержать код в чистоте и избегать некоторых простых ошибок. В случае, если правильные атрибуты не заданы явно, конструктор именованного кортежа возбуждает исключение, что указывает на ошибку правильности ввода кода. Работа с кортежами хорошо отображается в журнале сообщений и, как дополнительный бонус, на кортежи не тратится столько памяти, как в случае использования словаря.

Создание сообщения выглядит естественным образом:

    msg = Accepted(slot=10, ballot_num=30)

И для доступа к полям этого сообщения нужно минимум дополнительного кода:

    got_ballot_num = msg.ballot_num

Ниже мы увидим, что означают эти сообщения. В коде также используется несколько констант, большинство из которых определяют время ожидания различных сообщений:

    JOIN_RETRANSMIT = 0.7
    CATCHUP_INTERVAL = 0.6
    ACCEPT_RETRANSMIT = 1.0
    PREPARE_RETRANSMIT = 1.0
    INVOKE_RETRANSMIT = 0.5
    LEADER_TIMEOUT = 1.0
    NULL_BALLOT = Ballot(-1, -1)  # sorts before all real ballots
    NOOP_PROPOSAL = Proposal(None, None, None)  # no-op to fill otherwise empty slots

Наконец, в кластере Cluster используются два типа данных с именами, соответствующими описанию протокола:

    Proposal = namedtuple('Proposal', ['caller', 'client_id', 'input'])
    Ballot = namedtuple('Ballot', ['n', 'leader'])

Компонентная модель

Люди ограничены тем, что могут удерживать в своей активной памяти лишь немного данных. Мы не можем сразу рассуждать о всей реализации кластера Cluster - она просто слишком большая, так что легко пропустить детали. По тем же причинам, трудно тестировать большие монолитные куски кода: тесты должны манипулировать со многими взаимодействующими кусочками кода и поэтому эти тесты очень хрупкие и могут перестать работать практически при любом изменении в коде.

Чтобы улучшить тестируемость кода и сохранить его удобочитаемость, мы разбиваем Cluster на горстку классов, соответствующих ролям, описанным в протоколе. Каждый из них является подклассом класса Role.

class Role(object):

    def __init__(self, node):
        self.node = node
        self.node.register(self)
        self.running = True
        self.logger = node.logger.getChild(type(self).__name__)

    def set_timer(self, seconds, callback):
        return self.node.network.set_timer(self.node.address, seconds,
                                           lambda: self.running and callback())

    def stop(self):
        self.running = False
        self.node.unregister(self)

Роли, которые есть у узла кластера, объединятся вместе с помощью класса Node, представляющего собой отдельный узел сети. По мере того, как работа системы продолжается, в узел добавляются роли или удаляются из узла. Сообщения, которые поступают на узел, передаются во все активные роли и вызывают метод, в имени которого кроме имени типа сообщения есть еще префикс do_. Эти методы с префиксом do_ получают атрибуты сообщений, представленных а для более простого к ним доступа в виде аргументов ключевых слов. В классе Node для удобства также есть метод send, а для передачи некоторых аргументов в же самые методы класса Network используется functools.partial.

class Node(object):
    unique_ids = itertools.count()

    def __init__(self, network, address):
        self.network = network
        self.address = address or 'N%d' % self.unique_ids.next()
        self.logger = SimTimeLogger(
            logging.getLogger(self.address), {'network': self.network})
        self.logger.info('starting')
        self.roles = []
        self.send = functools.partial(self.network.send, self)

    def register(self, roles):
        self.roles.append(roles)

    def unregister(self, roles):
        self.roles.remove(roles)

    def receive(self, sender, message):
        handler_name = 'do_%s' % type(message).__name__

        for comp in self.roles[:]:
            if not hasattr(comp, handler_name):
                continue
            comp.logger.debug("received %s from %s", message, sender)
            fn = getattr(comp, handler_name)
            fn(sender=sender, **message._asdict())

Интерфейс приложения

Приложение для каждого члена кластера создает и запускает объект Member, реализующий машину состояний конкретного приложения и список пиров (peers). Объект члена кластера добавляет в узел роль bootstrap (развертывание) в случае, если узел подключается к уже существующему кластеру, или роль seed (начало создания) в случае, если узел создает новый кластер. Затем в отдельном потоке запускается протокол (с помощью Network.run).

Приложение взаимодействует с кластером с помощью метода invoke, который инициирует посылку предложения о переходе между состояниями. После того, как предложение будет принято, будет запущена машина состояний, и метод invoke вернет выходной результат машины состояний. Для ожидания результата из потока протокола метод использует простая синхронную очередь Queue.

class Member(object):

    def __init__(self, state_machine, network, peers, seed=None,
                 seed_cls=Seed, bootstrap_cls=Bootstrap):
        self.network = network
        self.node = network.new_node()
        if seed is not None:
            self.startup_role = seed_cls(self.node, initial_state=seed, peers=peers,
                                      execute_fn=state_machine)
        else:
            self.startup_role = bootstrap_cls(self.node,
                                      execute_fn=state_machine, peers=peers)
        self.requester = None

    def start(self):
        self.startup_role.start()
        self.thread = threading.Thread(target=self.network.run)
        self.thread.start()

    def invoke(self, input_value, request_cls=Requester):
        assert self.requester is None
        q = Queue.Queue()
        self.requester = request_cls(self.node, input_value, q.put)
        self.requester.start()
        output = q.get()
        self.requester = None
        return output

Классы ролей

Давайте последовательно один за другим рассмотрим каждый класс ролей, имеющихся в библиотеке.

Класс Acceptor

Класс Acceptor реализует в протоколе роль получателя сообщений, поэтому он должен запоминать номер бюллетеня с самым последним предложением, а также для каждого слота множество принятых предложений. А затем согласно протоколу отвечать сообщениями Prepare и Accept. Результатом будет небольшой класс, который легко сопоставить с протоколом.

Для получателей алгоритм Multi-Paxos выглядит почти так, как простой алгоритм Simple Paxos, но с добавлением к сообщениям номеров слотов.

class Acceptor(Role):

    def __init__(self, node):
        super(Acceptor, self).__init__(node)
        self.ballot_num = NULL_BALLOT
        self.accepted_proposals = {}  # {slot: (ballot_num, proposal)}

    def do_Prepare(self, sender, ballot_num):
        if ballot_num > self.ballot_num:
            self.ballot_num = ballot_num
            # мы получили сообщение от класса scout, который может быть следующим лидером
            self.node.send([self.node.address], Accepting(leader=sender))

        self.node.send([sender], Promise(
            ballot_num=self.ballot_num, 
            accepted_proposals=self.accepted_proposals
        ))

    def do_Accept(self, sender, ballot_num, slot, proposal):
        if ballot_num >= self.ballot_num:
            self.ballot_num = ballot_num
            acc = self.accepted_proposals
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

        self.node.send([sender], Accepted(
            slot=slot, ballot_num=self.ballot_num))

Класс Replica

Класс Replica является классом самой сложной роли, поэтому у нее есть несколько тесно связанных друг с другом обязанностей:

  • Создание новых предложений;
  • Вызов локальной машины состояний в случае, когда принято предложение;
  • Отслеживание текущего лидера; а также
  • Добавление к кластеру новых только что запущенных узлов.

Класс Replica в ответ на сообщения Invoke, поступающие от клиентов, создает новые предложения, выбирая те, которые, как он считает, относятся к неиспользуемому слоту, и отправляя сообщение Propose узлу, который в настоящий момент является лидером (рис.3.2). Более того, если при принятии консенсуса для выбранного есть другое предложение, то класс Replica должен повторно послать другое предложение с указанием нового слота.

Рис.3.2. Последовательность выполнения действий для роли Replica

В сообщения Decision указывается слот, для которого кластер должен прийти к консенсусу. Здесь, роли Replica запоминают новое решение, а затем запускают машину состояний, которая будет работать пока не будет достигнут слот, для которое соглашение не принято. Роли Replica различают слоты (decided), для которых соглашение в кластере принято, от слотов (committed), которые локальная машина состояний уже обработала. Если порядок принятия соглашений для слотов нарушается, локальная обработка решений может отставать и может потребоваться подождать, когда предложение будет принято для следующего слота. Когда слот обработан, роль Replica отправляет сообщение Invoked с результатом операции обратно тому, кто сделал зарос.

Возможно, что в некоторых случаях, для слота не будет никаких активных предложений и никаких решений. Машина конечных состояний обрабатывает слоты по порядку один за другим, и для того, чтобы на кластере достичь консенсуса по какому-нибудь вопросу, слот нужно и заполнить. Для защиты от проблем в этой ситуации роль Replica всякий раз, когда начинает работу со слотом, создает предложение "no-op" (отсутствие операции). Если в конце концов такое предложение останется в силе, то для этого слота машина состояний ничего выполнять не будет.

Точно так же, вполне возможно, что одно и тоже предложение будет принято за решение дважды. Для любых таких дублирующих предложений роль Replica пропускает обращение к машине состояний и для этого слота никаких переходов выполняться не будет.

Роли Replica для того, чтобы отправить сообщение Propose активному лидеру, должны знать, какой узел является активным лидером. Как мы позже увидим, для того, чтобы это знать правильно, нужно соблюсти удивительное количество всяких частностей. Каждая роль Replica отслеживает активного лидера по трем источникам информации.

Когда роль лидера становится активной, то происходит отсылка сообщения Adopted (одобрено) роли Replica того же самого узла (рис.3.3.).

Рис.3.3: Одобрено

Когда роль, принявшая сообщение, отправляет сообщение Promise новому лидера, она посылает сообщение Accepting (на одобрение) своей локальной роли Replica (рис.3.4.)

Рис.4.4. На одобрение

Активный лидер периодически отправляет сообщения Active в качестве постоянного подтверждения своей активности (рис.3.5.). Если такое сообщение не приходит до того момента, как истечет время LEADER_TIMEOUT, роль Replica будет предполагать, что лидер мертв, и перейдет к следующему лидеру. В этом случае очень важно, чтобы все роли Replica выбирать одного и того же нового лидер, чего можно достичь путем сортировки членов и выбором следующего члена в списке.

Рис.3.5. Подтверждение активности активного лидера

Наконец, когда узел присоединяется к сети, роль bootstrap посылает сообщение Join (подключение)(рис.3.6.) Роль Replica реагирует на это приветственным сообщением Welcome, в котором содержится последнее состояние, что позволяет новому узлу подключиться быстро.

Рис.3.6. Развертывание в роли Bootstrap

class Replica(Role):

    def __init__(self, node, execute_fn, state, slot, decisions, peers):
        super(Replica, self).__init__(node)
        self.execute_fn = execute_fn
        self.state = state
        self.slot = slot
        self.decisions = decisions
        self.peers = peers
        self.proposals = {}
        # next slot num for a proposal (may lead slot)
        self.next_slot = slot
        self.latest_leader = None
        self.latest_leader_timeout = None

    # making proposals

    def do_Invoke(self, sender, caller, client_id, input_value):
        proposal = Proposal(caller, client_id, input_value)
        slot = next((s for s, p in self.proposals.iteritems() if p == proposal), None)
        # propose, or re-propose if this proposal already has a slot
        self.propose(proposal, slot)

    def propose(self, proposal, slot=None):
        """Send (or resend, if slot is specified) a proposal to the leader"""
        if not slot:
            slot, self.next_slot = self.next_slot, self.next_slot + 1
        self.proposals[slot] = proposal
        # find a leader we think is working - either the latest we know of, or
        # ourselves (which may trigger a scout to make us the leader)
        leader = self.latest_leader or self.node.address
        self.logger.info(
            "proposing %s at slot %d to leader %s" % (proposal, slot, leader))
        self.node.send([leader], Propose(slot=slot, proposal=proposal))

    # handling decided proposals

    def do_Decision(self, sender, slot, proposal):
        assert not self.decisions.get(self.slot, None), \
                "next slot to commit is already decided"
        if slot in self.decisions:
            assert self.decisions[slot] == proposal, \
                "slot %d already decided with %r!" % (slot, self.decisions[slot])
            return
        self.decisions[slot] = proposal
        self.next_slot = max(self.next_slot, slot + 1)

        # re-propose our proposal in a new slot if it lost its slot and wasn't a no-op
        our_proposal = self.proposals.get(slot)
        if (our_proposal is not None and 
            our_proposal != proposal and our_proposal.caller):
            self.propose(our_proposal)

        # execute any pending, decided proposals
        while True:
            commit_proposal = self.decisions.get(self.slot)
            if not commit_proposal:
                break  # not decided yet
            commit_slot, self.slot = self.slot, self.slot + 1

            self.commit(commit_slot, commit_proposal)

    def commit(self, slot, proposal):
        """Actually commit a proposal that is decided and in sequence"""
        decided_proposals = [p for s, p in self.decisions.iteritems() if s < slot]
        if proposal in decided_proposals:
            self.logger.info(
                "not committing duplicate proposal %r, slot %d", proposal, slot)
            return  # duplicate

        self.logger.info("committing %r at slot %d" % (proposal, slot))
        if proposal.caller is not None:
            # perform a client operation
            self.state, output = self.execute_fn(self.state, proposal.input)
            self.node.send([proposal.caller], 
                Invoked(client_id=proposal.client_id, output=output))

    # tracking the leader

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.latest_leader = self.node.address
        self.leader_alive()

    def do_Accepting(self, sender, leader):
        self.latest_leader = leader
        self.leader_alive()

    def do_Active(self, sender):
        if sender != self.latest_leader:
            return
        self.leader_alive()

    def leader_alive(self):
        if self.latest_leader_timeout:
            self.latest_leader_timeout.cancel()

        def reset_leader():
            idx = self.peers.index(self.latest_leader)
            self.latest_leader = self.peers[(idx + 1) % len(self.peers)]
            self.logger.debug("leader timed out; tring the next one, %s", 
                self.latest_leader)
        self.latest_leader_timeout = self.set_timer(LEADER_TIMEOUT, reset_leader)

    # adding new cluster members

    def do_Join(self, sender):
        if sender in self.peers:
            self.node.send([sender], Welcome(
                state=self.state, slot=self.slot, decisions=self.decisions))

Классы Leader, Scout и Commander

Основная задача лидера - получать сообщения Propose (Предложение) и принимать решения. Лидер "активен", когда он успешно реализует часть протокола Prepare/Promise (Предложение/Обещание). Активный лидер может ответ на сообщение Propose немедленно отправить сообщение Accept.

В соответствии с моделью "отдельный класс для каждой роли", лидер делегирует выполнение каждой части протокола ролям scout и commander.

class Leader(Role):

    def __init__(self, node, peers, commander_cls=Commander, scout_cls=Scout):
        super(Leader, self).__init__(node)
        self.ballot_num = Ballot(0, node.address)
        self.active = False
        self.proposals = {}
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls
        self.scouting = False
        self.peers = peers

    def start(self):
        # reminder others we're active before LEADER_TIMEOUT expires
        def active():
            if self.active:
                self.node.send(self.peers, Active())
            self.set_timer(LEADER_TIMEOUT / 2.0, active)
        active()

    def spawn_scout(self):
        assert not self.scouting
        self.scouting = True
        self.scout_cls(self.node, self.ballot_num, self.peers).start()

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.scouting = False
        self.proposals.update(accepted_proposals)
        # note that we don't re-spawn commanders here; if there are undecided
        # proposals, the replicas will re-propose
        self.logger.info("leader becoming active")
        self.active = True

    def spawn_commander(self, ballot_num, slot):
        proposal = self.proposals[slot]
        self.commander_cls(self.node, ballot_num, slot, proposal, self.peers).start()

    def do_Preempted(self, sender, slot, preempted_by):
        if not slot:  # from the scout
            self.scouting = False
        self.logger.info("leader preempted by %s", preempted_by.leader)
        self.active = False
        self.ballot_num = Ballot((preempted_by or self.ballot_num).n + 1, 
                                 self.ballot_num.leader)

    def do_Propose(self, sender, slot, proposal):
        if slot not in self.proposals:
            if self.active:
                self.proposals[slot] = proposal
                self.logger.info("spawning commander for slot %d" % (slot,))
                self.spawn_commander(self.ballot_num, slot)
            else:
                if not self.scouting:
                    self.logger.info("got PROPOSE when not active - scouting")
                    self.spawn_scout()
                else:
                    self.logger.info("got PROPOSE while scouting; ignored")
        else:
            self.logger.info("got PROPOSE for a slot already being proposed")

Лидер, когда он неактивен и хочет стать активными, создает роль scout в ответ на прием сообщения Propose (рис.3.7.) Роль scout отправляет (и, в случае необходимости, отправляет повторно) сообщение Prepare, а также собирает ответы Promise до тех пор, пока он не получит ответы от большинства респондентов, или до тех пор, пока его никто не заменит. Он связывается обратно с лидером с помощью сообщений Adopted или Preempted, соответственно.

Рис.3.7. Роль Scout

class Scout(Role):

    def __init__(self, node, ballot_num, peers):
        super(Scout, self).__init__(node)
        self.ballot_num = ballot_num
        self.accepted_proposals = {}
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1
        self.retransmit_timer = None

    def start(self):
        self.logger.info("scout starting")
        self.send_prepare()

    def send_prepare(self):
        self.node.send(self.peers, Prepare(ballot_num=self.ballot_num))
        self.retransmit_timer = self.set_timer(PREPARE_RETRANSMIT, self.send_prepare)

    def update_accepted(self, accepted_proposals):
        acc = self.accepted_proposals
        for slot, (ballot_num, proposal) in accepted_proposals.iteritems():
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

	def do_Promise(self, sender, ballot_num, accepted_proposals):
        if ballot_num == self.ballot_num:
            self.logger.info("got matching promise; need %d" % self.quorum)
            self.update_accepted(accepted_proposals)
            self.acceptors.add(sender)
            if len(self.acceptors) >= self.quorum:
                # strip the ballot numbers from self.accepted_proposals, now that it
                # represents a majority
                accepted_proposals = \ 
                    dict((s, p) for s, (b, p) in self.accepted_proposals.iteritems())
                # We're adopted; note that this does *not* mean that no other
                # leader is active.  # Any such conflicts will be handled by the
                # commanders.
                self.node.send([self.node.address],
                    Adopted(ballot_num=ballot_num, 
                            accepted_proposals=accepted_proposals))
                self.stop()
        else:
            # this acceptor has promised another leader a higher ballot number,
            # so we've lost
            self.node.send([self.node.address], 
                Preempted(slot=None, preempted_by=ballot_num))
            self.stop()

Лидер создает роль commander для каждого слота, для которого у него есть активное предложение (рис.3.8). Точно также, как и роль scout, роль commander отправляет и повторно отправляет сообщения Accept и ожидает до тех пор, пока большинство принимающих не ответят Accepted, или до тех пор, пока этот процесс не будет прерван. Если предложение будет принято, то роль commander передает всем узлам широковещательное сообщение Decision. Он отвечает лидеру сообщениями Decided или Preempted.

Рис.3.8. Роль Commander

class Commander(Role):

    def __init__(self, node, ballot_num, slot, proposal, peers):
        super(Commander, self).__init__(node)
        self.ballot_num = ballot_num
        self.slot = slot
        self.proposal = proposal
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1

    def start(self):
        self.node.send(set(self.peers) - self.acceptors, Accept(
            slot=self.slot, ballot_num=self.ballot_num, proposal=self.proposal))
        self.set_timer(ACCEPT_RETRANSMIT, self.start)

    def finished(self, ballot_num, preempted):
        if preempted:
            self.node.send([self.node.address], 
                           Preempted(slot=self.slot, preempted_by=ballot_num))
        else:
            self.node.send([self.node.address], 
                           Decided(slot=self.slot))
        self.stop()

    def do_Accepted(self, sender, slot, ballot_num):
        if slot != self.slot:
            return
        if ballot_num == self.ballot_num:
            self.acceptors.add(sender)
            if len(self.acceptors) < self.quorum:
                return
            self.node.send(self.peers, Decision(
                           slot=self.slot, proposal=self.proposal))
            self.finished(ballot_num, False)
        else:
            self.finished(ballot_num, True)

Здесь в процессе разработки в качестве побочного эффекта проявилась удивительно тонкая ошибка. В некоторый момент времени эмулятор сети имитирует потерю пакетов даже в сообщениях внутри узла. Когда были потеряны все сообщения Decision, то работа протокола становится невозможной. Роль Replica продолжает повторно передавать сообщения Propose, но лидер игнорирует их, т. к. для этого слота уже было предложение. Процесс, восстанавливающий состояние роль Replica, не может найти результат, поскольку ни одна из ролей Replica не слышала о решении. Решением должна быть гарантия того, что локальные сообщения всегда доставляются так, как это имеет место в реальных сетевых стеках.

Развертывание узла - роль Bootstrap

Когда узел присоединяется к кластеру, то прежде, чем он сможет принять участие в работе кластера, он должен определить текущее состояние кластера. Роль Bootstrap(развертывание узла) обрабатывает эту ситуацию при помощи отправки сообщений Join (Присоединение) каждому партнеру до тех пор, пока от него не получит ответное сообщение Welcome (Приглашение). Диаграмма коммуникации роли Bootstrap показана выше в диаграмме для роли Replica.

В первоначальной версии реализации каждый узел запускался с полным набором ролей (replica, leader и acceptor), каждая из которых начиналась с фазы "startup" ("запуске"), ожидающей сообщения Welcome. В результате логика инициализации каждой роли расширялась, что потребовало отдельного тестирования каждой из них. В окончательной версии есть роль bootstrap, добавляемая к другим ролям в узел при его запуске, и как только запуск завершается передающая начальное состояние в конструкторы других ролей.

class Bootstrap(Role):

    def __init__(self, node, peers, execute_fn,
                 replica_cls=Replica, acceptor_cls=Acceptor, leader_cls=Leader,
                 commander_cls=Commander, scout_cls=Scout):
        super(Bootstrap, self).__init__(node)
        self.execute_fn = execute_fn
        self.peers = peers
        self.peers_cycle = itertools.cycle(peers)
        self.replica_cls = replica_cls
        self.acceptor_cls = acceptor_cls
        self.leader_cls = leader_cls
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls

    def start(self):
     	self.join()

    def join(self):
        self.node.send([next(self.peers_cycle)], Join())
        self.set_timer(JOIN_RETRANSMIT, self.join)

    def do_Welcome(self, sender, state, slot, decisions):
        self.acceptor_cls(self.node)
        self.replica_cls(self.node, execute_fn=self.execute_fn, peers=self.peers,
                         state=state, slot=slot, decisions=decisions)
        self.leader_cls(self.node, peers=self.peers, commander_cls=self.commander_cls,
                        scout_cls=self.scout_cls).start()
        self.stop()

Создание нового кластера — роль Seed

При нормальной работе, когда узел присоединяется к кластеру, он ожидает обнаружить, что кластер уже работает, по крайней мере, с одним узлом, готовым ответить на сообщение Join. Но как кластеру начать работу? Один из вариантов — это после попыток связаться с любым другим узлом определить в роли bootstrap, что данный узел является первым в кластере. Но здесь возникают следующие две проблемы. Во-первых, для большого кластера это означает, что потребуется долго ждать, пока для каждого сообщения Join не наступит таймаут. И, что еще более важно, в случае разделения сети, новый узел, возможно, не сможет связаться с каким-либо другим узлом и запустить новый кластер.

Фрагментация сети является наиболее сложным случаем отказа для кластерных приложений. Все члены кластера могут в отдельном фрагменте сети оставаться живыми, но взаимодействие между некоторыми из них может обрываться. Например, если сетевое соединение, подключения к кластеру с узлами в Берлине и Тайбэйе обрывается, то происходит фрагментация сети. Если обе части кластера продолжают после этого работать, то их присоединение друг к другу после того, как сетевое соединение восстанавливается, может оказаться сложной задачей. В случае использования алгоритма Multi-Paxos в восстановленной сети будут два кластера с различными решениями для одних и тех же номеров слотов.

Чтобы избежать такого результата, создание нового кластера является операция, которую инициирует пользователь. В кластере ровно один узел запускает роль seed, а все другие узлы, как и обычно, работают с ролью bootstrap. Роль seed ждет, пока она не получил сообщения Join от большинства членов кластера, а затем посылает сообщения Welcome с начальным состоянием машины состояний и пустым множеством решений. Затем роль seed самостоятельно останавливается и запускается роль bootstrap для того, чтобы присоединиться к только что созданному кластеру.

Роль seed эмулирует часть Join/Welcome взаимодействия ролей bootstrap/replica, так что ее диаграмма взаимодействия точно такая же, как и для роли replica.

class Seed(Role):

    def __init__(self, node, initial_state, execute_fn, peers, 
                 bootstrap_cls=Bootstrap):
        super(Seed, self).__init__(node)
        self.initial_state = initial_state
        self.execute_fn = execute_fn
        self.peers = peers
        self.bootstrap_cls = bootstrap_cls
        self.seen_peers = set([])
        self.exit_timer = None

    def do_Join(self, sender):
        self.seen_peers.add(sender)
        if len(self.seen_peers) <= len(self.peers) / 2:
            return

        # cluster is ready - welcome everyone
        self.node.send(list(self.seen_peers), Welcome(
            state=self.initial_state, slot=1, decisions={}))

        # stick around for long enough that we don't hear any new JOINs from
        # the newly formed cluster
        if self.exit_timer:
            self.exit_timer.cancel()
        self.exit_timer = self.set_timer(JOIN_RETRANSMIT * 2, self.finish)

    def finish(self):
        # bootstrap this node into the cluster we just seeded
        bs = self.bootstrap_cls(self.node, 
                                peers=self.peers, execute_fn=self.execute_fn)
        bs.start()
        self.stop()

Роль Requester

Роль Requester управляет запросами к распределенной машины состояний. Класс role просто посылает сообщения Invoke локальной роли replica до тех пор, пока он не получит соответствующее сообщение Invoked. Диаграмму взаимодействия для этой роли смотрите выше в разделе, где описывается роль Replica.

class Requester(Role):

    client_ids = itertools.count(start=100000)

    def __init__(self, node, n, callback):
        super(Requester, self).__init__(node)
        self.client_id = self.client_ids.next()
        self.n = n
        self.output = None
        self.callback = callback

    def start(self):
        self.node.send([self.node.address], 
                       Invoke(caller=self.node.address, 
                              client_id=self.client_id, input_value=self.n))
        self.invoke_timer = self.set_timer(INVOKE_RETRANSMIT, self.start)

    def do_Invoked(self, sender, client_id, output):
        if client_id != self.client_id:
            return
        self.logger.debug("received output %r" % (output,))
        self.invoke_timer.cancel()
        self.callback(output)
        self.stop()

Подведем итог

Еще раз напомним кластерные роли:

  • Acceptor — рассылает сообщения с обещаниями promise и принимать сообщения с предложениями proposal
  • Replica -- управлять распределенной машиной состояния: внесение предложений, принятие решений и реакция на запросы
  • Leader – управление алгоритмом Multi-Paxos
  • Scout – выполняет для лидера часть Prepare/Promise алгоритма Multi-Paxos
  • Commander -- выполняет для лидера часть Accept/Accepted алгоритма Multi-Paxos
  • Bootstrap – добавляет новый узел в существующий кластер
  • Seed – создает новый кластер
  • Requester – запрашивает выполнение операции в распределенной машине состояний

Есть еще одна составляющая, которая нужна для запуска кластера Cluster: сеть, посредством которой общаются все узлы.

Перейти к следующей части статьи.