Наши партнеры

UnixForum





Библиотека сайта rus-linux.net

На главную -> MyLDP -> Электронные книги по ОС Linux
Цилюрик О.И. Модули ядра Linux
Назад Внешние интерфейсы модуля Вперед

Неблокирующий ввод-вывод и мультиплексирование

Здесь я имею в виду реализацию в модуле (драйвере) поддержки операций мультиплексированного ожидания возможности выполнения операций ввода-вывода: select() и poll(). Примеры этого раздела будут много объёмнее и сложнее, чем все предыдущие, в примерах будут использованы механизмы ядра, которые мы ещё не затрагивали, и которые будут рассмотрены далее... Но сложность эта обусловлена тем, что здесь мы начинаем вторгаться в обширную и сложную область: неблокирующие и асинхронные операции ввода-вывода. При первом прочтении этот раздел можно пропустить — на него никак не опирается всё последующее изложение.

Примечание: Наилучшую (наилучшую из известных мне) классификаций типов операций ввода-вывода дал У.Р. Стивенс [19], он выделяет 5 категорий, которые принципиально различаются:

  • блокируемый ввод-вывод;
  • неблокируемый ввод-вывод;
  • мультиплексирование ввода-вывода (функции select() и poll());
  • ввод-вывод, управляемый сигналом (сигнал SIGIO);
  • асинхронный ввод-вывод (функции POSIX.1 aio_*()).

Примеры использования их обстоятельнейшим образом описаны в книге того же автора [20], на которое мы будем, без излишних объяснений, опираться в своих примерах.

Сложность описания подобных механизмов и написания демонстрирующих их примеров состоит в том, чтобы придумать модель-задачу, которая: а). достаточно адекватно использует рассматриваемый механизм и б). была бы до примитивного простой, чтобы её код был не громоздким, легко анализировался и мог использоваться для дальнейшего развития. В данном разделе мы реализуем драйвер (архив poll.tgz) устройства (и тестовое окружение к нему), которое функционирует следующим образом:

  • устройство допускает неблокирующие операции записи (в буфер) — в любом количестве, последовательности и в любое время; операция записи обновляет содержимое буфера устройства и устанавливает указатель чтения в начало нового содержимого;
  • устройство чтения может запрашивать любое число байт в последовательных операциях (от 1 до 32767), последовательные чтения приводят к ситуации EOF (буфер вычитан до конца), после чего следующие операции read() или poll() будут блокироваться до обновления данных операцией write();
  • может выполняться операция read() в неблокирующем режиме, при исчерпании данных буфера она будет возвращать признак «данные не готовы».

К модулю мы изготовим тесты записи (pecho — подобие echo) и чтения (pcat — подобие cat), но позволяющие варьировать режимы ввода-вывода... И, конечно, с этим модулем должны работать и объяснимо себя вести наши неизменные POSIX-тесты echo и cat. Для согласованного поведения всех составляющих эксперимента, общие их части вынесены в два файла *.h :

poll.h :

	#define DEVNAME "poll" 
	#define LEN_MSG 160 

	#ifndef __KERNEL__         // only user space applications 
	#include <stdio.h> 
	#include <stdlib.h> 
	#include <string.h> 
	#include <unistd.h> 
	#include <fcntl.h> 
	#include <poll.h> 
	#include <errno.h> 
	#include "user.h" 

	#else                      // for kernel space module 
	#include <linux/module.h> 
	#include <linux/miscdevice.h>
	#include <linux/poll.h> 
	#include <linux/sched.h> 
	#endif 

Второй файл (user.h) используют только тесты пространства пользователя, мы их посмотрим позже, а пока — сам модуль устройства:

poll.c :

	#include "poll.h" 

	MODULE_LICENSE( "GPL" );
	MODULE_AUTHOR( "Oleg Tsiliuric <olej@front.ru>" ); 
	MODULE_VERSION( "5.2" );

	static int pause = 100;       // задержка на операции poll, мсек.
	module_param( pause, int, S_IRUGO ); 

	static struct private {       // блок данных устройства 
	   atomic_t roff;             // смещение для чтения 
	   char buf[ LEN_MSG + 2 ];   // буфер данных 
	} devblock = {   // статическая инициализация того, что динамически делается в open() 
	   .roff = ATOMIC_INIT( 0 ), 
	   .buf = "not initialized yet!\n", 
	}; 
	static struct private *dev = &devblock; 

	static DECLARE_WAIT_QUEUE_HEAD( qwait ); 
	
	static ssize_t read( struct file *file, char *buf, size_t count, loff_t *ppos ) { 
	   int len = 0; 
	   int off = atomic_read( &dev->roff ); 
	   if( off > strlen( dev->buf ) ) {         // нет доступных данных 
	      if( file->f_flags & O_NONBLOCK ) 
	         return -EAGAIN; 
	      else interruptible_sleep_on( &qwait ); 
	   } 
	   off = atomic_read( &dev->roff );         // повторное обновление 
	   if( off == strlen( dev->buf ) ) { 
	      atomic_set( &dev->roff, off + 1 ); 
	      return 0;                            // EOF 
	   } 
	   len = strlen( dev->buf ) - off;          // данные есть (появились?) 
	   len = count < len ? count : len; 
	   if( copy_to_user( buf, dev->buf + off, len ) ) 
	      return -EFAULT; 
	   atomic_set( &dev->roff, off + len ); 
	   return len; 
	} 
	
	static ssize_t write( struct file *file, const char *buf, size_t count, loff_t *ppos ) { 
	   int res, len = count < LEN_MSG ? count : LEN_MSG; 
	   res = copy_from_user( dev->buf, (void*)buf, len ); 
	   dev->buf[ len ] = '\0';                 // восстановить завершение строки 
	   if( '\n' != dev->buf[ len - 1 ] ) strcat( dev->buf, "\n" ); 
	   atomic_set( &dev->roff, 0 );             // разрешить следующее чтение 
	   wake_up_interruptible( &qwait ); 
	   return len; 
	} 

	unsigned int poll( struct file *file, struct poll_table_struct *poll ) { 
	   int flag = POLLOUT | POLLWRNORM; 
	   poll_wait( file, &qwait, poll ); 
	   sleep_on_timeout( &qwait, pause ); 
	   if( atomic_read( &dev->roff ) <= strlen( dev->buf ) ) 
	      flag |= ( POLLIN | POLLRDNORM ); 
	   return flag; 
	}; 

	static const struct file_operations fops = { 
	   .owner = THIS_MODULE, 
	   .read  = read, 
	   .write = write, 
	   .poll  = poll, 
	}; 
	
	static struct miscdevice pool_dev = { 
	   MISC_DYNAMIC_MINOR, DEVNAME, &fops 
	}; 
	
	static int __init init( void ) { 
	   int ret = misc_register( &pool_dev ); 
	   if( ret ) printk( KERN_ERR "unable to register device\n" ); 
	   return ret; 
	} 
	module_init( init ); 
	
	static void __exit exit( void ) { 
	   misc_deregister( &pool_dev ); 
	} 
	module_exit( exit ); 

По большей части здесь использованы элементы уже рассмотренных ранее примеров, принципиально новые вещи относятся к реализации операции poll() и блокирования:

  • Операции poll() вызывает (всегда) poll_wait() для одной (в нашем случае это qwait), или нескольких (часто одна очередь для чтения и одна для записи);
  • Далее производится анализ доступности условий для выполнения операций записи и чтения, и на основе этого анализа и возвращается флаг результата (биты тех операций, которые могут быть выполнены вослед без блокирования);
  • В операции read() может быть указан неблокирующий режим операции: бит O_NONBLOCK в поле f_flags переданной параметром struct file ...
  • Если же затребована блокирующая операция чтения, а данные для её выполнения недоступны, вызывающий процесс блокируется;
  • Разблокирован читающий процесс будет при выполнении более поздней операции записи (в условиях теста — с другого терминала).

Теперь относительно процессов пространства пользователя. Вот обещанный общий включаемый файл:

user.h :

	#define ERR(...) fprintf( stderr, "\7" __VA_ARGS__ ), exit( EXIT_FAILURE ) 

	struct parm { 
	   int blk, vis, mlt; 
	}; 
	struct parm parms( int argc, char *argv[], int par ) { 
	   int c; 
	   struct parm p = { 0, 0, 0 }; 
	   while( ( c = getopt( argc, argv, "bvm" ) ) != EOF ) 
	      switch( c ) { 
	         case 'b': p.blk = 1; break; 
	         case 'm': p.mlt = 1; break; 
	         case 'v': p.vis++; break;
	         default: goto err; 
	   } 
	   if( par > 0 && ( argc - optind ) < par ) goto err; 
	   return p; 
	err: 
	   ERR( "usage: %s [-b][-m][-v] %s\n", argv[ 0 ], par < 0 ? 
	        "[<read block size>]" : "<write string>" ); 
	} 

	int opendev( void ) { 
	   char name[ 40 ] = "/dev/";
	   int dfd;                             // дескриптор устройства 
	   strcat( name, DEVNAME ); 
	   if( ( dfd = open( name, O_RDWR ) ) < 0 ) 
	      ERR( "open device error: %m\n" ); 
	   return dfd; 
	} 
	
	void nonblock( int dfd ) {              // операции в режиме O_NONBLOCK 
	   int cur_flg = fcntl( dfd, F_GETFL ); 
	   if( -1 == fcntl( dfd, F_SETFL, cur_flg | O_NONBLOCK ) ) 
	      ERR( "fcntl device error: %m\n" ); 
	} 
	
	const char *interval( struct timeval b, struct timeval a ) { 
	   static char res[ 40 ]; 
	   long msec = ( a.tv_sec - b.tv_sec ) * 1000 + ( a.tv_usec - b.tv_usec ) / 1000; 
	   if( ( a.tv_usec - b.tv_usec ) % 1000 >= 500 ) msec++; 
	   sprintf( res, "%02d:%03d", msec / 1000, msec % 1000 ); 
	   return res; 
	}; 

Тест записи

pecho.c :

	#include "poll.h" 
	int main( int argc, char *argv[] ) { 
	   struct parm p = parms( argc, argv, 1 ); 
	   const char *sout = argv[ optind ]; 
	   if( p.vis > 0 ) 
	      fprintf( stdout, "nonblocked: %s, multiplexed: %s, string for output: %s\n",
	         ( 0 == p.blk ? "yes" : "no" ), 
	         ( 0 == p.mlt ? "yes" : "no" ), 
	         argv[ optind ] ); 
	   int dfd = opendev();                      // дескриптор устройства 
	   if( 0 == p.blk ) nonblock( dfd ); 
	   struct pollfd client[ 1 ] = { 
	      { .fd = dfd, 
	        .events = POLLOUT | POLLWRNORM, 
	      } 
	   }; 
	   struct timeval t1, t2; 
	   gettimeofday( &t1, NULL ); 
	   int res; 
	   if( 0 == p.mlt ) res = poll( client, 1, -1 ); 
	   res = write( dfd, sout, strlen( sout ) ); // запись 
	   gettimeofday( &t2, NULL ); 
	   fprintf( stdout, "interval %s write %d bytes: ", interval( t1, t2 ), res ); 
	   if( res < 0 ) ERR( "write error: %m\n" ); 
	   else if( 0 == res ) { 
	      if( errno == EAGAIN ) 
	         fprintf( stdout, "device NOT READY!\n" ); 
	   } 
	   else fprintf( stdout, "%s\n", sout ); 
	   close( dfd ); 
	   return EXIT_SUCCESS; 
	}; 

Формат запуска этой программы (но если вы ошибётесь с опциями и параметрами, то оба из тестов выругаются и подскажут правильный синтаксис):

$ ./pecho

usage: ./pecho [-b][-m][-v] <write string>

где:

-b — установить блокирующий режим операции (по умолчанию неблокирующий);

-m — не использовать ожидание на poll() (по умолчанию используется);

-v — увеличить степень детализации вывода (для отладки);

Параметром задана строка, которая будет записана в устройство /dev/poll, если строка содержит пробелы или другие спецсимволы, то она, естественно, должна быть заключена в кавычки.

Тест чтения (главное действующее лицо всего эксперимента, из-за чего всё делалось):

pcat.c :

	#include "poll.h" 
	int main( int argc, char *argv[] ) { 
	   struct parm p = parms( argc, argv, -1 ); 
	   int blk = LEN_MSG; 
	   if( optind < argc && atoi( argv[ optind ] ) > 0 ) 
	      blk = atoi( argv[ optind ] ); 
	   if( p.vis > 0 ) 
	      fprintf( stdout, "nonblocked: %s, multiplexed: %s, read block size: %s bytes\n",
	         ( 0 == p.blk ? "yes" : "no" ), 
	         ( 0 == p.mlt ? "yes" : "no" ), 
	         argv[ optind ] ); 
	   int dfd = opendev();                // дескриптор устройства 
	   if( 0 == p.blk ) nonblock( dfd ); 
	   struct pollfd client[ 1 ] = { 
	      { .fd = dfd, 
	        .events = POLLIN | POLLRDNORM, 
	      } 
	   }; 
	   while( 1 ) { 
	      char buf[ LEN_MSG + 2 ];         // буфер данных 
	      struct timeval t1, t2; 
	      int res; 
	      gettimeofday( &t1, NULL ); 
	      if( 0 == p.mlt ) res = poll( client, 1, -1 ); 
	      res = read( dfd, buf, blk );      // чтение 
	      gettimeofday( &t2, NULL ); 
	      fprintf( stdout, "interval %s read %d bytes: ", interval( t1, t2 ), res ); 
	      fflush( stdout ); 
	      if( res < 0 ) { 
	         if( errno == EAGAIN ) { 
	            fprintf( stdout, "device NOT READY\n" ); 
	            if( p.mlt != 0 ) sleep( 3 ); 
	         } 
	         else 
	            ERR( "read error: %m\n" ); 
	      } 
	      else if( 0 == res ) { 
	         fprintf( stdout, "read EOF\n" ); 
	         break; 
	      } 
	      else { 
	         buf[ res ] = '\0'; 
	         fprintf( stdout, "%s\n", buf ); 
	      } 
	   } 
	   close( dfd ); 
	   return EXIT_SUCCESS; 
	}; 

Для теста чтения опции гораздо важнее, чем для предыдущего, но они почти те же:

$ ./pcat -w

	./pcat: invalid option -- 'w' 
	usage: ./pcat [-b][-m][-v] [<read block size>] 

- отличие только в необязательном параметре, который на этот раз несёт смысл: размер блока (в байтах), который читать за одну операцию чтения (если он не указан то читается максимальный размер буфера).

И окончательно наблюдаем как это всё работает...

Примечание: У этого набора тестов множество степеней свободы (набором опций), позволяющих наблюдать самые различные операции: блокирующие и нет, с ожиданием на poll() и нет, и др. Ниже показывается только самый характерный набор результатов.

$ sudo insmod poll.ko

$ ls -l /dev/po*

	crw-rw---- 1 root root 10, 54 Июн 30 11:57 /dev/poll 
	crw-r----- 1 root kmem  1,  4 Июн 30 09:52 /dev/port 

Запись производим сколько угодно раз последовательно:

$ echo qwerqr > /dev/poll

$ echo qwerqr > /dev/poll

$ echo qwerqr > /dev/poll

А вот чтение можем произвести только один раз:

$ cat /dev/poll

qwerqr

При повторной операции чтения:

$ cat /dev/poll

...

12346456

- операция блокируется и ожидает (там, где нарисованы: ...), до тех пор, пока с другого терминала на произведена операция:

$ echo 12346456 > /dev/poll

И, как легко можно видеть, заблокированная операция cat после разблокирования выводит уже новое, обновлённое значение буфера устройства (а не то, которое было в момент запуска cat).

Теперь посмотрим что говорят наши, более детализированные тесты... Вот итог повторного (блокирующегося) чтения, в режиме блокировки на pool() и циклическим чтением по 3 байта:

$ ./pcat -v 3

	nonblocked: yes, multiplexed: yes, read block size: 3 bytes 
	interval 43:271 read 3 bytes: xxx 
	interval 00:100 read 3 bytes: xx 
	interval 00:100 read 3 bytes: yyy 
	interval 00:100 read 3 bytes: yyy 
	interval 00:100 read 3 bytes:  zz 
	interval 00:100 read 3 bytes: zzz 
	interval 00:100 read 3 bytes:  tt 
	interval 00:100 read 1 bytes: 
	interval 00:100 read 0 bytes: read EOF 

Выполнение команды блокировалось (на этот раз на pool()) до выполнения (>43 секунд) в другом терминале:

$ ./pecho 'xxxxx yyyyyy zzzzz tt'

interval 00:099 write 21 bytes: xxxxx yyyyyy zzzzz tt

А вот как выглядит неблокирующая операция чтения не ожидающая на pool() (несколько первых строк с интервалом 3 сек. показывают неготовность до обновления данных):

$ ./pcat -v 3 -m

	nonblocked: yes, multiplexed: no, read block size: 3 bytes 
	interval 00:000 read -1 bytes: device NOT READY 
	interval 00:000 read -1 bytes: device NOT READY 
	interval 00:000 read -1 bytes: device NOT READY 
	interval 00:000 read -1 bytes: device NOT READY 
	interval 00:000 read 3 bytes: 123 
	interval 00:000 read 3 bytes: 45 
	interval 00:000 read 3 bytes: 678 
	interval 00:000 read 3 bytes: 90 
	interval 00:000 read 0 bytes: read EOF 

Опять же, делающая доступными данные операция с другого терминала:

$ ./pecho '12345 67890'

interval 00:099 write 11 bytes: 12345 67890


Предыдущий раздел: Оглавление Следующий раздел:
Счётчик ссылок использования модуля   Блочные устройства