Наши партнеры

UnixForum





Библиотека сайта rus-linux.net

Веб бот, использующий сопрограммы asyncio (2 часть)

Предварительная публикация главы из сборника "500 строк или меньше", новой книги серии «The Architecture of Open Source Applications»

Оригинал: "A Web Crawler With asyncio Coroutines"
Автор: A. Jesse Jiryu Davis and Guido van Rossum
Дата публикации: 15 September 2015
Перевод: Н.Ромоданов
Дата перевода: январь 2016 г.

Это продолжение статьи. Начало смотрите здесь

Как работают генераторы языка Python

Прежде, чем разбираться с генераторами языка Python, вы должны понимать, как в нем работают обычные функции. Обычно, когда функция на языке Python вызывает подпрограмму, подпрограмма осуществляет управление до тех пор, пока не произойдет явный возврат управления или пока не возникнет исключение. Затем управление возвращается туда, откуда произошел вызов:

>>> def foo():
...     bar()
...
>>> def bar():
...     pass

Стандартный интерпретатор языка Python написан на С. Функция С, с помощью которой исполняется функция Python, называется PyEval_EvalFrameEx. Она берет объект, представляющий собой некоторый фрагмент стека языка Python, и выполняет интерпретацию байт-кода Python в контексте взятого фрагмента. Ниже приведен байт-код для функции foo:

>>> import dis
>>> dis.dis(foo)
  2           0 LOAD_GLOBAL              0 (bar)
              3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)
              6 POP_TOP
              7 LOAD_CONST               0 (None)
             10 RETURN_VALUE

Функция foo загружает функцию bar в этот стек и вызывает ее, а затем выталкивает из стека возвращаемое значение, загружает None в стек и возвращает None.

Когда функция PyEval_EvalFrameEx встретит байт-код CALL_FUNCTION, она создает новый фрагмент стека Python и использует рекурсию: то есть, она рекурсивно вызывает PyEval_EvalFrameEx с новым фрагментом стека, который используется для выполнения функции bar.

Рис.9.1: Вызовы функций

Очень важно понимать, что фрагменты стека Python размещаются в динамической памяти типа «куча»! Интерпретатор языка Python является обычной программой на языке С, поэтому фрагменты его стека являются обычными фрагментами стека памяти. Но фрагменты стека языка Python, с которыми происходит вся манипуляция, находятся в памяти типа «куча». Кроме других сюрпризов, это также означает, что фрагмент стека языка может пережить вызов функции. Чтобы убедиться в этом в непосредственно, сохраните текущий фрагмент стека, используемый в bar:

>>> import inspect
>>> frame = None
>>> def foo():
...     bar()
...
>>> def bar():
...     global frame
...     frame = inspect.currentframe()
...
>>> foo()
>>> # The frame was executing the code for 'bar'.
>>> frame.f_code.co_name
'bar'
>>> # Its back pointer refers to the frame for 'foo'.
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'

Теперь настал момент разобраться с генераторами языка Python, в которых используются те же самые строительные блоки — объекты с кодом и фрагменты стека — и это приводит к чудесному эффекту.

Ниже показана функция - генератор:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
... 

Когда Python компилирует функцию gen_fn в байт-код, он видит инструкцию yield и знает, что функция gen_fn является функцией - генератором, а не обычной функцией. Он устанавливает флаг для того, чтобы запомнить этот факт:

>>> # The generator flag is bit position 5.
>>> generator_bit = 1 < 5
>>> bool(gen_fn.__code__.co_flags & generator_bit)
True

Когда вы вызываете функцию - генератор, то Python видит флаг, указывающий, что генератор, и, на самом деле, не запускает эту функцию. Вместо этого, он создает генератор:

>>> gen = gen_fn()
>>> type(gen)
<class 'generator'>

В генераторе Python инкапсулируется фрагмент стека плюс ссылка на некоторый код - на тело функции gen_fn:

>>> gen.gi_code.co_name
'gen_fn'

Все генераторы, осуществляющие вызов gen_fn, указывают на один и тот же код. Но у каждого из них есть свой собственный фрагмент стека. Этот фрагмент стека не является каким-либо реально существующим стеком, он находится в динамической памяти типа «куча» и ожидает момента, когда он будет использован:

Рис.9.2. Генераторы

Во фрагменте стека есть указатель на "последнюю команду", инструкцию, которая будет выполняться последней. Первоначально значение указателя на последнюю инструкцию равно -1, что означает, что генератор не начал работать:

>>> gen.gi_frame.f_lasti
-1

Когда мы вызываем функцию send, генератор сначала берет из нее инструкцию yield и переходит в паузу. Значение, возвращаемое send, равно 1 поскольку это именно то, что gen передает в выражение yield:

>>> gen.send(None)
1

В указателе инструкций генератора теперь находятся начальные 3 байт-кода, т. е. часть из 56 байтов, откомпилированных с помощью Python:

>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56

Работа генератор может быть возобновлена в любое время, из любой функции, поскольку его фрагмент стека на самом деле находится не в стеке: это память типа «куча». Его положение в иерархии вызовов не зафиксировано, и он не должен подчиняться правилу выполнения «первый пришел — последним ушел», которому подчиняются обычные функции. Он свободен от этого и просто плавает наподобие облака.

Мы можем послать в генератор значение "hello", и оно становится результатом вычисления выражения yield, а генератор продолжит выполняться до тех пор, пока не возвратит значение 2:

>>> gen.send('hello')
result of yield: hello
2

Теперь во фрагменте его стека несть локальная переменная result:

>>> gen.gi_frame.f_locals
{'result': 'hello'}

В других генераторах, которые создают gen_fn, будут свои собственные фрагменты стека и свои собственные локальные переменные.

Когда мы снова вызовем функцию send, генератор продолжает выполнять со второго значения yield и выполнение завершится возникновением специального исключительного состояния StopIteration:

>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):
  File "<input>", line 1, in <module>
StopIteration: done

Для этого исключительного состояния есть значение, которое возвращает значение генератора: строку "done" ("выполнено").

Создание сопрограмм при помощи генераторов

Итак, генератор может перейти в паузу, может возобновить работу с использованием некоторого значения и может вернуть значение. Похоже, что это хороший примитив, на котором строится модель асинхронного программирования без спагетти из обратных вызовов! Мы хотим построить "сопрограмму": процедуру, работа которой будет регулярно планироваться вместе с другими процедурами, которые есть в программе. Наши сопрограммы будут упрощенной версией тех, что имеются в стандартной библиотек "asyncio" языка Python. Как и в asyncio, мы будем использовать генераторы, фьючерсы и инструкцию "yield from".

Во-первых нам нужен некоторый способ представления некоторого будущего результата — фьючерса (future), который мы ждем от сопрограммы. Сокращенный вариант имеет следующий вид:

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

Первоначально фьючерс имеет состояние "pending" ("ожидание"). Он будет переведен в состояние "resolved" ("решено") при помощи вызова set_result [9].

Давайте адаптируем наш сборщик ссылок для использования фьючерсов и сопрограмм. Посмотрим, как мы написали fetch с использованием обратного вызова:

class Fetcher:
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

    def connected(self, key, mask):
        print('connected!')
        # И так далее....

Метод fetch начинает подключение к сокету, а затем регистрирует обратный вызов connected, который должен быть выполнен, когда сокет будет готов к использованию. Теперь мы можем объединить эти два шага в одной сопрограмме:

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        yield f
        selector.unregister(sock.fileno())
        print('connected!')

Теперь fetch является функцией — генератором, а не обычной функцией, поскольку в ней есть инструкция yield. Мы создаем ожидающий фьючерс, а затем переводим fetch в режим паузы до тех пор, пока сокет не будет готов. Внутренняя функция on_connected выступает в роли фьючерса.

Но как генератор возобновит работу, когда наступит время действия фьючерса? Нам нужен драйвер сопрограмм. Назовем его "task":

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
Task(fetcher.fetch())

loop()

Задача task запускает генератор fetch при помощи отсылки ему значения None. Затем fetch будет работать до тех пор, пока не будет получено значение фьючерса, которое task получит как next_future. Когда подключение к сокету произойдет, то цикл событий запустит обратный вызов on_connected, который будет представлять собой фьючерс и который вызовет метод step, в результате чего сборщик fetch возобновит свою работу.

Факторизация сопрограмм с помощью yield from

Как только будет выполнено подключение к сокету, мы отправляем запрос HTTP GET и читаем ответ сервера. Теперь эти действия не должны больше быть разбросаны по различным обратным вызовам; мы собрали их в одной и той же функции - генераторе:

    def fetch(self):
        # ... connection logic from above, then:
        sock.send(request.encode('ascii'))

        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(),
                              EVENT_READ,
                              on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                # Done reading.
                break

Оказывается, что обычно используется такой код, который читает все сообщение из сокета. Как мы можем переместить его из метода fetch в сопрограмму? Теперь, когда есть Python 3, эту операцию выполняет инструкция yield from. Она позволяет одному генератору делегировать действие другому генератору.

Чтобы увидеть, как это происходит, давайте вернемся к нашему простому примеру генератора:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
... 

Чтобы вызвать такой генератор из другого генератора, делегируем часть действий второму генератору:

>>> # Generator function:
>>> def caller_fn():
...     gen = gen_fn()
...     rv = yield from gen
...     print('return value of yield-from: {}'
...           .format(rv))
...
>>> # Make a generator from the
>>> # generator function.
>>> caller = caller_fn()

Генератор caller действует точно также, как если бы он был генератором gen, т. е. Генератором, которому делегировано следующее:

>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti  # Hasn't advanced.
15
>>> caller.send('goodbye')
result of 2nd yield: goodbye
return value of yield-from: done
Traceback (most recent call last):
  File "%lt;input>", line 1, in <module>
StopIteration

Хотя caller получает управление от gen, переход дальше в caller не происходит. Обратите внимание, что его указатель команд остается на позиции 15, на месте, где находится его инструкция yield from, несмотря даже на то, что внутренний генератор gen переходит от одной инструкции yield к следующей [10]. Если наблюдать за этим извне caller, мы не можем сказать, получены ли значения из caller или от генератора, которому были делегированы действия. А изнутри gen, мы не можем сказать, были ли значения посланы из caller или извне caller. Инструкция yield from является каналом, через который значение идут в и из gen до тех пор, пока gen не завершит свою работу.

Сопрограмма может делегировать работу суб-собпрограмме с помощью инструкции yield from и получить от нее результат работы. Заметьте, что выше caller выдает сообщение "return value of yield-from: done" ("возврат значения из yield: выполнен"). Когда gen завершит работу, то он вернет значение, которое становится значением инструкции yield from в caller:

rv = yield from gen

Раньше, когда мы критиковали асинхронное программирование на основе обратных вызовов, наша самая главная претензия касалась "исчезновение состояния стека": когда в функции обратного вызова возникает исключение, трассировка стека, как правило, оказывается бесполезной. Она только показывает, что в цикле обработки событий выполнялся обратный вызов, но не указывает, почему возникло исключение. А какова ситуация у сопрограмм?

>>> def gen_fn():
...     raise Exception('my error')
>>> caller = caller_fn()
>>> caller.send(None)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "<input>", line 3, in caller_fn
  File "<input>", line 2, in gen_fn
Exception: my error

Здесь трассировка значительно полезнее! Она показывает, что caller_fn делегировал свои действия в gen_fn в тот момент, когда возникла ошибка. И более удобно то, что мы можем заключить вызов суб-собпрограммы внутрь обработчика исключений точно также, как и в случае использования обычных сопрограмм:

>>> def gen_fn():
...     yield 1
...     raise Exception('uh oh')
...
>>> def caller_fn():
...     try:
...         yield from gen_fn()
...     except Exception as exc:
...         print('caught {}'.format(exc))
...
>>> caller = caller_fn()
>>> caller.send(None)
1
>>> caller.send('hello')
caught uh oh

Поэтому логика использования суб-сопрограмм точно такая же, как и обычных подпрограммам. Давайте выделим из нашего сборщика несколько полезных суб-сопрограмм. Мы напишем сопрограмму read, которая будет считывать один фрагмент данных:

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield f  # Read one chunk.
    selector.unregister(sock.fileno())
    return chunk

Мы на основе read создаем сопрограмму read_all, которая будет получать все сообщение:

def read_all(sock):
    response = []
    # Read whole response.
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)

    return b''.join(response)

Если вы не будете приглядываться и обращать внимание на инструкции yield from, то все выглядит, как если бы использовались обычные функции с блокирующим вводом/выводом. Но на самом деле, read и read_all являются сопрограммами. read_all переуступает свою работу read и уходит в паузу до тех пор, пока не завершится ввод/вывод. Пока read_all находится в паузе, цикл событий asyncio выполняет другую работу и ожидает других событий ввода/вывода; как только событие произойдет, read_all на следующем такте цикла получит результат от read и возобновит свою работу.

На самом дне стека fetch вызывает read_all:

class Fetcher:
    def fetch(self):
         # ... connection logic from above, then:
        sock.send(request.encode('ascii'))
        self.response = yield from read_all(sock)

Замечательно то, что класс Task не нужно модифицировать. Это ведет к тому, что внешняя сопрограмма fetch будет выглядетьл точно так, как и раньше:

Task(fetcher.fetch())
loop()

Когда read получает фьючерс, то задача получает его через канал инструкции yield from точно также, как если бы фьючерс был бы получен непосредственно fetch. Когда фьючерс будет получен в цикле, то задача отправит результат в fetch, и значение, которое получает read, будет точно таким же, как если бы задача читала его прямо из read:

Рис.9.3. Механизм Yield From

Чтобы улучшить реализацию наших сопрограмм, мы устраним еще одну помеху: в нашем коде используется yield, когда он ожидает фьючерс, и используется yield from в случае, когда действие делегируется в суб-сопрограмму. Было бы лучше, если бы мы всякий раз, когда сопрограмма переходит в паузу, использовали инструкцию yield from. Тогда сопрограмме не потребуется разбираться с тем, какой тип данных она ожидает.

Мы воспользоваться соответствием, которое есть в языке Python между генераторами и итераторами. Возможности в генераторе с точки зрения реализации вызова, точно такие же, как и у итератора. Поэтому наш класс Future будет итерируемым благодаря реализации специального метода:

    # Method on Future class.
    def __iter__(self):
        # Tell Task to resume me here.
        yield self
        return self.result

Метод __iter__, реализуемый фьючерсом, является сопрограммой, через которую можно будет получить этот фьючерс. Теперь, когда мы заменяем следующий код:

# f is a Future.
yield f

… на код:

# f is a Future.
yield from f

... мы получаем тот же самый результат! Работающий код Task получает фьючерс из своего вызова self.coro.send(result) и, когда фьючерс будет известен, посылает новый результат обратно в сопрограмму.

В чем преимущество обращения отовсюду к yield from? Почему это лучше, чем ожидать фьючерс с помощью yield и применять делегирование в под-сопрограмму с использованием yield from? Это лучше, потому что сейчас можно свободно менять реализацию метода и это не сказывается на вызывающей части: это может быть обычный метод, который возвращает фьючерс, с помощью которого получают значение, либо это может быть сопрограмма, в которой есть инструкции yield from и которая возвращает значение. В любом случае, в вызывающем коде для того, чтобы ожидать результат, потребуется только метод yield from.

Уважаемый читатель, мы подошли к концу нашего приятного рассказа о сопрограммах в asyncio. Мы взглянули на методику использования и нарисовали общую схему реализации фьючерсов и задач. Мы рассказали, как в asyncio достигается лучшее из обоих миров: параллельный ввод/вывод, который более эффективен, чем потоки, и более понятен, чем обратные вызовы. Конечно, в настоящей библиотеке asyncio все гораздо сложнее, чем в нашем наброске. В реальном фреймворке используются ввод/вывод без копирования (zero-copy I/O), планирование специального вида (fair scheduling), обработка исключений и обилие других возможностей.

Для пользователя asyncio кодирование с использованием сопрограмм будет более простым, чем то, что вы здесь видели. В коде, который приведен выше, мы реализовывали сопрограммы с использованием первоначальных подходов, и поэтому вы видели обратные вызовы, задачи и фьючерсы. Вы даже видели неблокирующие сокеты и обращение к инструкции select. Но когда подошло время строить приложение с использованием asyncio, в коде ничего из этого не появляется. Как мы и обещали, вы можете помещать адрес URL в список с помощью следующего достаточно гладкого кода:

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

Довольные полученным результатом, мы возвращаемся к нашей первоначальной назначение: написать асинхронный веб-сканер с использованием asyncio.

Продолжение статьи