Наши партнеры

UnixForum





Библиотека сайта rus-linux.net

Система непрерывной интеграции

Оригинал: A Continuous Integration System
Автор: Malini Das
Дата публикации: July 12, 2016
Перевод: Н.Ромоданов
Дата перевода: октябрь 2016 г.

Предыдущая часть статьи

Диспетчер (dispatcher.py)

Диспетчер — это отдельный сервис, который используется для делегирования заданий на тестирование. Он прослушивает порт для обнаружения запросов, которые могут поступить от средства запуска тестов и от наблюдателя за репозитарием. Он позволяет средствам запуска тестов самостоятельно регистрироваться, а когда от наблюдателя за репозитарием поступает ID коммита, он направляет средство запуска тестов на обработку поступившего коммита. Он также корректно обрабатывает любые проблемы, возникающие со средствами запуска тестов и перераспределяет ID коммита на другое средство запуска тестов в случае, если что-нибудь пойдет не так.

Когда выполняется dispatch.py, то вызывается функция serve. Сначала она анализирует аргументы, с помощью которых вы можете указать хост диспетчера и порт:

def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, by default it uses 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()

При этом запускается сервер диспетчера, а также два других потока. Один поток запускает функцию runner_checker, а другой запускает функцию redistribute.

 server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print `serving on %s:%s` % (args.host, int(args.port))

    ...

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # Активация сервера; он будет работать до тех пор, пока 
        # вы не прервете работу программы с помощью Ctrl+C или Cmd+C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # Если возникло какое-нибудь исключение, то уничтожаем поток
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()

Функция runner_checker периодически пингует каждое зарегистрированное средство запуска тестов с тем, чтобы быть уверенным, что оно по-прежнему выполняется. Если ответа не последует, то такое средство запуска тестов будет удалено из пула, а его ID коммита будет передан другому доступному средству запуска тестов. Функция будет регистрировать ID коммита в переменной pending_commits.

    def runner_checker(server):
        def manage_commit_lists(runner):
            for commit, assigned_runner in server.dispatched_commits.iteritems():
                if assigned_runner == runner:
                    del server.dispatched_commits[commit]
                    server.pending_commits.append(commit)
                    break
            server.runners.remove(runner)
        while not server.dead:
            time.sleep(1)
            for runner in server.runners:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    response = helpers.communicate(runner["host"],
                                                   int(runner["port"]),
                                                   "ping")
                    if response != "pong":
                        print "removing runner %s" % runner
                        manage_commit_lists(runner)
                except socket.error as e:
                    manage_commit_lists(runner)

Функция redistribute используется для диспетчеризации идентификаторов коммитов, зарегистрированных в переменной pending_commits. Когда ьзапускается функция redistribute, она проверяет, есть ли в переменной pending_commits какие-нибудь идентификаторы коммитов. Если есть, то она вызывает функцию dispatch_tests с ID коммита.

    def redistribute(server):
        while not server.dead:
            for commit in server.pending_commits:
                print "running redistribute"
                print server.pending_commits
                dispatch_tests(server, commit)
                time.sleep(5)

Функция dispatch_tests используется для выборки из пула доступного средства запуска тестов. Если такое средство есть в наличии, то эта функция пошлет ему сообщение о запуске теста с ID коммита. Если в настоящее время нет ни одного из таких средств, то функция подождет две секунды и повторит этот процесс. После того, как сообщение будет отправлено, в переменной dispatched_commits будет записано, какой ID тестируется и с помощью какого средства запуска тестов. Если ID коммита находится в переменной pending_commits, то функция dispatch_tests удалит его отуда, поскольку он уже успешно повторно перераспределен.

def dispatch_tests(server, commit_id):
    # ЗАМЕЧАНИЕ: обычно мы эту часть никогда не запускаем
    while True:
        print "trying to dispatch to runners"
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                                           int(runner["port"]),
                                           "runtest:%s" % commit_id)
            if response == "OK":
                print "adding id %s" % commit_id
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(2)

Сервер диспетчера использует модуль SocketServer, являющийся очень простым сервером и представляющий собой часть стандартной библиотеки. В модуле SocketServer есть четыре основных типа серверов: TCP, UDP, UnixStreamServer и UnixDatagramServer. Мы будем использовать сервер сокетов на основе TCP и, тем самым, сможем обеспечить непрерывное, упорядоченную передачу потоков данных между серверами, а протокол UDP этого не гарантирует.

По умолчанию сервер TCPServer, который реализован с помощью SocketServer, может обрабатывать только один запрос за один раз, поэтому он не может обрабатывать случай, когда диспетчер работает с одним соединением, скажем со средством запуска тестов, а вэто время поступает запрос на новое соединение, скажем от наблюдателя за репозитарием. Если это происходит, то наблюдателю за репозитарием придется ждать, пока первое соединение не будет полностью обработано и не будет отключено прежде, чем будет выполнено подключение наблюдателя. Это не годится для нашего случая, так как сервер диспетчера должен иметь возможность непосредственно и быстро подключаться ко всем средствам запуска тестов и наблюдателю за репозитарием.

Для того, чтобы сервер диспетчера обрабатывал несколько соединений одновременно, он использует специальный класс ThreadingTCPServer, который позволяет серверу SocketServer, используемому по умолчанию, работать с несколькими потоками. Это означает, что диспетчер может в любое время получить запрос на соединение и именно для нового подключения может запустить новый процесс. Это позволяет диспетчеру обрабатывать несколько запросов одновременно.

class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    runners = [] # Хранит состояние пула средства запуска тестов
    dead = False # Указывает другим потокам на то, что мы больше не работаем
    dispatched_commits = {} # Хранит состояние коммитов, которые уже распределены на тестирование
    pending_commits = [] # Хранит состояние коммитов, которые мы должны еще распределить на тестирование

Сервер диспетчер работает при помощи создания специального обработчика для каждого запроса. Это определяется классом DispatcherHandler, который наследует от обработчика BaseRequestHandler сервера SocketServer. В этом базовом классе нам просто нужно определить функцию обработчика, который будет вызываться всякий раз, когда будет запрошено соединение. Функция обработчика, определяемая в DispatcherHandler, является нашим специальным обработчиком, который будет вызываться для каждого соединения. Он ищет запрос на соединение (информация о запросе содержится в self.request) и выполняет анализ команды, которая соединение запрашивает.

class DispatcherHandler(SocketServer.BaseRequestHandler):
    """
    Класс RequestHandler нашего диспетчера. Когда будет получен запрос
    на обработку коммита, он обратится к средству запуска тестов
    и обработает результаты его работы и результаты выполненных тестов
    """
    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024
    def handle(self):
        self.data = self.request.recv(self.BUF_SIZE).strip()
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)

Он обрабатывает четыре команды: status, register, dispatch и results. Команда status используется для проверки запущен и работает ли сервер диспетчера.

        if command == "status":
            print "in status"
            self.request.sendall("OK")

Для того, чтобы диспетчер делал что-нибудь полезное, у него должен быть зарегистрировано по крайней мере одно средство запуска тестов. Когда для пары хост:порт выполняется команда register, диспетчер сохраняет в списке информацию о средстве запуска тестов (объект средства запуска тестов подключен к объекту ThreadingTCPServer) с тем, чтобы позже, когда будет получен ID коммита, он мог обратиться к этому средства запуска тестов.

        elif command == "register":
            # Добавляет в наш пул это средство запуска тестов
            print "register"
            address = command_groups.group(2)
            host, port = re.findall(r":(\w*)", address)
            runner = {"host": host, "port":port}
            self.server.runners.append(runner)
            self.request.sendall("OK")

Команда dispatch используется наблюдателем за репозитарием для того, запускать тесты с некоторым коммитом. Формат этой команды следующий: dispatch:<ID коммита>. Диспетчеру извлекает ID коммита из этого сообщения и отправляет его средству запуска тестов.

elif command == "dispatch":
            print "going to dispatch"
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall("No runners are registered")
            else:
                # Выполняется диспетчеризация запусков тестов
                self.request.sendall("OK")
                dispatch_tests(self.server, commit_id)

Команда results используется средством запуска тестов для того, чтобы сообщить о результатах завершения выполнения тестов. Формат этой команды следующий: results:<ID коммита>:<длина данных результата в байтах>:<разультат>. <ID коммита> используется для определения того, какой ID коммита будет использоваться для выполнения тестов. <длина данных результата в байтах> используется для того, чтобы указать, какого размера должен быть буфер для данных полученного результата. И, наконец, <разультат> содержит фактический полученные результаты.

        elif command == "results":
            print "got test results"
            results = command_groups.group(2)[1:]
            results = results.split(":")
            commit_id = results[0]
            length_msg = int(results[1])
            # 3 – это номер элемента ":" в команде sent
            remaining_buffer = self.BUF_SIZE - \
                (len(command) + len(commit_id) + len(results[1]) + 3)
            if length_msg > remaining_buffer:
                self.data += self.request.recv(length_msg - remaining_buffer).strip()
            del self.server.dispatched_commits[commit_id]
            if not os.path.exists("test_results"):
                os.makedirs("test_results")
            with open("test_results/%s" % commit_id, "w") as f:
                data = self.data.split(":")[3:]
                data = "\n".join(data)
                f.write(data)
            self.request.sendall("OK")

Перейти к следующей части статьи.