Наши партнеры

UnixForum





Библиотека сайта rus-linux.net

Система непрерывной интеграции

Оригинал: A Continuous Integration System
Автор: Malini Das
Дата публикации: July 12, 2016
Перевод: Н.Ромоданов
Дата перевода: октябрь 2016 г.

Предыдущая часть статьи

Средство запуска тестов (test_runner.py)

Средство запуска тестов отвечает за выполнение тестов для ID заданного коммита и представления результатов. Он взаимодействие только с сервером диспетчера, ответственного за предоставление ему идентификаторов коммитов, для которых будут выполняться тесты и для которых будут получены результаты тестирования.

Когда вызывается файл test_runner.py, он вызывает функцию, которая запускает сервер средств запуска тестов, а также запускает поток для чтобы запуска функцию dispatcher_checker. Поскольку этот процесс запуска очень похож на те, которые описаны для repo_observer.py и dispatcher.py, мы здесь опустим его описание.

Функция dispatcher_checker пингует сервер диспетчера каждые пять секунд для того, чтобы убедиться, что он поднят и работает. Это важно для управления ресурсами. Если диспетчер остановится, то будет остановлена работа средства запуска тестов, поскольку оно не сможет выполнять какую-либо значимую работу из-за того, что диспетчер не сможет ничего передать на тестирование.

    def dispatcher_checker(server):
        while not server.dead:
            time.sleep(5)
            if (time.time() - server.last_communication) > 10:
                try:
                    response = helpers.communicate(
                                       server.dispatcher_server["host"],
                                       int(server.dispatcher_server["port"]),
                                       "status")
                    if response != "OK":
                        print "Dispatcher is no longer functional"
                        server.shutdown()
                        return
                except socket.error as e:
                    print "Can't communicate with dispatcher: %s" % e
                    server.shutdown()
                    return

Средство запуска тестов представляет собой сервер ThreadingTCPServer, который похож на сервер диспетчера. Ему требуются отдельные потоки выполнения, поскольку он будет не только указывать какой ID коммита будет использоваться для запуска, но также будет пока выполняются тесты периодически пинговать средство запуска тестов с тем, чтобы убедиться, что оно все еще доступно.

class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    dispatcher_server = None # Хранит информацию о хосте/порте сервера диспетчера
    last_communication = None # Хранит информацию о последней связи с диспетчером
    busy = False # Флаг состояния
    dead = False # Флаг состояния

Взаимодействие начинается с того, что диспетчер обращается к средству запуска тестов с тем, чтобы последний принял ID коммита для запуска тестов. Если средство запуска тестов готово выполнить задание, то оно отвечает подтверждением серверу диспетчер, который затем закрывает соединение. Для того, чтобы сервер, на котором запускаются тесты, мог выполнять тесты и одновременно принимать еще запросы от диспетчера, он запускает запрошенное задание на тестирование в новом потоке.

Это означает, что, когда сервер диспетчера делает запрос (пинг, в данном случае) и ожидает ответа, то это будет происходить в отдельном потоке тогда, как средство запуска тестов будет занято выполнением тестов в своем собственном потоке. Это позволяет серверу, на котором запускаются тесты, обрабатывать несколько задач одновременно. Можно вместо такой многопоточной архитектуры сделать так, чтобы сервер диспетчера был постоянно подключенным к каждому экземпляру средства запуска тестов, но это приведет к увеличенному потреблению памяти сервера диспетчера и архитектура станет уязвимым к проблемам с сетью в случаях, когда соединения будут случайно пропадать.

Сервер, на котором запускаются тесты, отвечает на два варианта сообщений, поступающих от диспетчера. Первый вариант — это сообщение ping, которое используется сервером диспетчера для того, чтобы убедиться, что средство запуска тестов все еще активно.

class TestHandler(SocketServer.BaseRequestHandler):
    ...

    def handle(self):
        ....
        if command == "ping":
            print "pinged"
            self.server.last_communication = time.time()
            self.request.sendall("pong")

Второй вариант — это сообщение runtest, которое имеет вид runtest:<ID коммита> и используется для инициализации запуска тестов для заданного коммита. Когда будет получено сообщнение runtest, средство запуска тестов проверит можно ли запустить этот тест, и если этого сделать нельзя, то диспетчеру будет возвращено сообщение BUSY (Занято). Если это сделать будет можно, то в ответ на сервер будет послано сообщение OK, будет установлен статус занятости и будет запущена функция run_tests.

        elif command == "runtest":
            print "got runtest command: am I busy? %s" % self.server.busy
            if self.server.busy:
                self.request.sendall("BUSY")
            else:
                self.request.sendall("OK")
                print "running"
                commit_id = command_groups.group(2)[1:]
                self.server.busy = True
                self.run_tests(commit_id,
                               self.server.repo_folder)
                self.server.busy = False

Эта функция вызывает скрипт командной оболочки

test_runner_script.sh

, который обновляет репозитарий для заданного ID коммита. После того, как скрипт вернет управление, то в случае, если обновление репозитария будет успешным, мы выполним тесты с использованием функции unittest и в файле соберем результаты. Когда средство запуска тестов завершит свою работу, оно считает результаты тестирования из файла и отправит их диспетчеру в результирующем сообщении.

    def run_tests(self, commit_id, repo_folder):
        # обновляем репозитарий
        output = subprocess.check_output(["./test_runner_script.sh",
                                        repo_folder, commit_id])
        print output
        # запускаем тесты
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        result_file = open("results", "w")
        unittest.TextTestRunner(result_file).run(suite)
        result_file.close()
        result_file = open("results", "r")
        # отправляем диспетчеру результаты
        output = result_file.read()
        helpers.communicate(self.server.dispatcher_server["host"],
                            int(self.server.dispatcher_server["port"]),
                            "results:%s:%s:%s" % (commit_id, len(output), output))

Ниже приведен текст скрипта test_runner_script.sh:

#!/bin/bash
REPO=$1
COMMIT=$2
source run_or_fail.sh
run_or_fail "Repository folder not found" pushd "$REPO" 1> /dev/null
run_or_fail "Could not clean repository" git clean -d -f -x
run_or_fail "Could not call git pull" git pull
run_or_fail "Could not update to given commit hash" git reset --hard "$COMMIT"

Для того чтобы запустить test_runner.py, вы должны указать ему клон репозитария, предназначенного для выполнения тестов. Для этого вы можете в качестве аргумента использовать ранее созданный клон /path/to/test_repo test_repo_clone_runner. По умолчанию test_runner.py запустит свой собственный сервер на localhost, используя порт в диапазоне 8900-9000, а затем будет пытаться подключиться к серверу диспетчера на localhost:8888. Вы можете воспользоваться необязательными параметрами для того, чтобы изменить эти значения. Воспользуйтесь параметрами --host и --port для указания конкретного адреса для запуска сервера, на котором будут выполняться тесты, и параметром --dispatcher-server для указания адреса диспетчера.

Перейти к следующей части статьи.