В приложении В представлены данные, характеризующие производительность программы 10.5, в которой используются функции управления очередью. Приведенные ниже замечания по поводу различных факторов, которые могут оказывать влияние на производительность, основываются на этих данных. Программные коды упоминаемых ниже альтернативных вариантов реализации находятся на Web-сайте книги.
• В данной реализации используется широковещательная модель ("вручную сбрасываемое событие/PulseEvent"), обеспечивающая поддержку общего случая, когда один поток может запрашивать или создавать несколько сообщений. Если такая общность не требуется, можно использовать сигнальную модель ("автоматически сбрасываемое событие/SetEvent"), которая, к тому же, обеспечит значительно более высокую производительность, поскольку для тестирования предиката будет освобождаться только один поток. На Web-сайте находится файл QueueObj_Sig.с, содержащий исходный код, в котором вместо широковещательной модели используется сигнальная модель.
• Использование для защиты объекта очереди объекта CRITICAL_SECTION вместо мьютекса также может привести к повышению производительности. Однако в этом случае вместо функции SignalObjectAndWait следует использовать функцию EnterCriticalSection с последующим ожиданием события. Этот альтернативный подход иллюстрируется двумя файлами — QueueObjCS.с и QueueObjCS_Sig.с, находящимися на Web-сайте книги.
• На Web-сайте находятся два других файла с исходными кодами — QueueObj_noSOAW.с и QueueObjSig_noSOAW.с, в которых функция SignalObjectAndWait не используется и которые обеспечивают выполнение программы под управлением Windows 9x.
• Результаты, приведенные в приложении В, свидетельствуют о нелинейном поведении производительности при большом количестве потоков, состязающихся за доступ к очереди. Проекты для каждой из альтернативных стратегий содержатся на Web-сайте книги; эти проекты соответствуют различным вариантам конвейерной системы ThreeStage, описанной в следующих разделах.
•Резюмируя, следует подчеркнуть, что свойства очередей могут быть расширены таким образом, чтобы очередь могла совместно использоваться несколькими процессами и обеспечивать отправку или получение сразу нескольких сообщений за одну операцию. В то же время, некоторого выигрыша в производительности можно добиться за счет использования сигнальной модели, объектов CRITICAL_SECTIONS или функции SignalObjectAndWait. Соответствующие результаты представлены в приложении В.
Пример: использование очередей в многоступенчатом конвейере
Модель "хозяин/рабочий", во всех ее вариациях, является одной из наиболее популярных моделей многопоточного программирования, а программа 8.2 представляет простую модель "производитель/потребитель", являющуюся частным случаем более общей конвейерной модели (pipeline model).
В другом важном частном случае имеется один главный поток, который производит единичные рабочие задания (work units) для ограниченного количества рабочих потоков и помещает их в очередь. Такая методика может оказаться полезной при создании масштабируемого сервера с большим количеством клиентов (число которых может достигать тысячи и более), когда возможность выделения независимого рабочего потока для каждого клиента весьма сомнительна. В главе 14 задача создания масштабируемого сервера обсуждается в контексте портов завершения ввода/вывода.
В конвейерной модели каждый поток или группа потоков определенным образом обрабатывает единичные задания, например, сообщения, и передает их другим потокам для дальнейшей обработки. Аналогом многопоточного конвейера может служить производственная сборочная линия. Идеальным механизмом реализации конвейера являются очереди.
В программе 10.5 (ThreeStage.c) предусмотрено создание нескольких этапов производства и потребления, на каждой из которых поддерживается очередь рабочих заданий, подлежащих обработке. Каждая очередь имеет ограниченную, конечную длину. Всего существует три конвейерных ступени, соединяющих четыре этапа обработки. Программа имеет следующую структуру:
• Производители (producers) периодически создают единичные сообщения, дополненные контрольными суммами, используя для этого ту же функцию, что и в программе 8.2, если не считать того, что в каждом сообщении содержится дополнительное поле адресата, указывающее поток потребителя (consumer), для которой предназначено это сообщение, причем каждый производитель связывается только с одним потребителем. Количество пар "производитель/потребитель" задается в виде параметра командной строки. Далее производитель посылает одиночное сообщение передающему потоку (transmitter), помещая его в очередь передачи сообщений. Если очередь заполнена, производитель ждет, пока ее состояние не изменится.
• Передающий поток объединяет имеющиеся единичные сообщения (но не более пяти за один раз) и создает одно передаваемое сообщение, которое содержит заголовок и ряд единичных сообщений. Затем передающий поток помещает каждое передаваемое сообщение в очередь приема сообщений (receiver), блокируясь, если очередь заполнена. В общем случае передатчик и приемник могут связываться между собой через сетевое соединение. Произвольно выбранное здесь значение коэффициента блокирования (blocking factor), равное 5:1, легко поддается регулировке.
• Принимающий поток обрабатывает единичные сообщения, входящие в состав каждого передаваемого сообщения, и помещает каждое из них в соответствующую очередь потребителя, если она не заполнена.
• Каждый поток потребителя получает одиночные сообщения по мере их поступления и записывает сообщение в файл журнала регистрации.
Блок-схема системы представлена на рис. 10.1. Обратите внимание, что эта система моделирует сетевое соединение, в котором сообщения, относящиеся к различным парам "отправитель/получатель" объединяются и передаются по общему каналу связи.
Рис. 10.1. Многоступенчатый конвейер
В программе 10.5 предложен вариант реализации, в котором используются функции очереди из программы 10.4. Функции генерации и отображения сообщений здесь не представлены, но они взяты из программы 8.1. При этом, наряду с контрольными суммами и данными, в блоки сообщений введены поля производителя и адресата.
Программа 10.5. ThreeStage.с: многоступенчатыйконвейер
/* Глава 10. ThreeStage.с */
/* Трехступенчатая система производитель/потребитель. */
/* Использование: ThreeStage npc goal. */
/* Запустить "npc" пар потоков производителя и потребителя. */
/* Каждый производитель должен сгенерировать в общей сложности */
/* "goal" сообщений, каждое из которых снабжается меткой, указывающей */
/* потребителя, для которого оно предназначено. */
/* Сообщения отправляются "передающему потоку", который, прежде чем */
/* отправить группу сообщений "принимающему потоку", выполняет некоторую*/
/* дополнительную обработку. Наконец, принимающий поток отправляет сообщения потокам потребителя. */
#include "EvryThng.h"
#include "SynchObj.h"
#include "messages.h"
#include <time.h>
#define DELAY_COUNT 1000
#define MAX_THREADS 1024
/* Размеры и коэффициенты блокирования очередей. Эти величины являются */
/* произвольными и могут регулироваться для обеспечения оптимальной */
/* производительности. Текущие значения не являются сбалансированными. */
#define TBLOCK_SIZE 5 /*Передающий поток формирует группы из 5 сообщений.*/
#define TBLOCK_TIMEOUT 50 /*Интервал ожидания сообщений передающим потоком.*/
#define P2T_QLEN 10 /* Размер очереди "производитель/передающий поток". */
#define T2R_QLEN 4 /*Размер очереди "передающий поток/принимающий поток".*/
#define R2C_QLEN 4 /* Размер очереди "принимающий поток/потребитель" -- */
/* для каждого потребителя существует только одна очередь.*/
DWORD WINAPI producer(PVOID);
DWORD WINAPI consumer(PVOID);
DWORD WINAPI transmitter(PVOID);
DWORD WINAPI receiver(PVOID);
typedef struct _THARG {
volatile DWORD thread_number;
volatile DWORD work_goal; /* Используется потоками производителей. */
volatile DWORD work_done; /* Используется потоками производителей и потребителей. */ '
char future[8];
} THARG;
/* Сгруппированные сообщения, посылаемые передающим потоком потребителю.*/
typedef struct t2r_msg_tag {
volatile DWORD num_msgs; /* Количество содержащихся сообщений. */
msg_block_t messages[TBLOCK_SIZE];
} t2r_msg_t;
queue_t p2tq, t2rq, *r2cq_array;
static volatile DWORD ShutDown = 0;
static DWORD EventTimeout = 50;
DWORD _tmain(DWORD argc, LPTSTR * argv[]) {
DWORD tstatus, nthread, ithread, goal, thid;
HANDLE *producer_th, *consumer_th, transmitter_th, receiver_th;
THARG *producer_arg, *consumer_arg;
nthread = atoi(argv[1]);
goal = atoi(argv[2]);
producer_th = malloc(nthread * sizeof(HANDLE));
producer_arg = calloc(nthread, sizeof(THARG));
consumer_th = malloc(nthread * sizeof(HANDLE));
consumer_arg = calloc(nthread, sizeof(THARG));
q_initialize(&p2tq, sizeof(msg_block_t), P2T_QLEN);
q_initialize(&t2rq, sizeof(t2r_msg_t), T2R_QLEN);
/* Распределить ресурсы, инициализировать очереди "принимающий поток/потребитель" для каждого потребителя. */
r2cq_array = calloc(nthread, sizeof(queue_t));
for (ithread = 0; ithread < nthread; ithread++) {
/* Инициализировать очередь r2с для потока данного потребителя. */
q_initialize(&r2cq_array[ithread], sizeof(msg_block_t), R2C_QLEN);
/* Заполнить аргументы потока. */
consumer_arg[ithread].thread_number = ithread;
consumer_arg[ithread].work_goal = goal;
consumer_arg[ithread].work_done = 0;
consumer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, consumer, (PVOID)&consumer_arg[ithread], 0, &thid);
producer_arg[ithread].thread_number = ithread;
producer_arg[ithread].work_goal = goal;
producer_arg[ithread].work_done = 0;
producer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, producer, (PVOID)&producer_arg[ithread], 0, &thid);
}
transraitter_th = (HANDLE)_beginthreadex(NULL, 0, transmitter, NULL, 0, &thid);
receiver_th = (HANDLE)_beginthreadex (NULL, 0, receiver, NULL, 0, &thid);
_tprintf(_T("ХОЗЯИН: Выполняются все потоки\n"));
/* Ждать завершения потоков производителя. */
for (ithread = 0; ithread < nthread; ithread++) {
WaitForSingleObject(producer_th[ithread], INFINITE);
_tprintf(_T("ХОЗЯИН: производитель %d выработал %d единичных сообщений\n"), ithread, producer_arg[ithread].work_done);
}
/* Производители завершили работу. */
_tprintf(_T("ХОЗЯИН: Все потоки производителя выполнили свою работу.\n"));
/* Ждать завершения потоков потребителя. */
for (ithread = 0; ithread < nthread; ithread++) {
WaitForSingleObject(consumer_th[ithread], INFINITE);
_tprintf(_T("ХОЗЯИН: потребитель %d принял %d одиночных сообщений\n"), ithread, consumer_arg[ithread].work_done);
}
_tprintf(_T("ХОЗЯИН: Все потоки потребителя выполнили свою работу.\n"));
ShutDown = 1; /* Установить флаг завершения работы. */
/* Завершить выполнение и перейти в состояние ожидания передающих и принимающих потоков. */
/* Эта процедура завершения работает нормально, поскольку и передающий,*/
/* и принимающий потоки не владеют иными ресурсами, кроме мьютекса, */
/* которые они могли бы покинуть по завершении выполнения, не уступив прав владения ими. Можете ли вы улучшить эту процедуру? */
TerminateThread(transmitter_th, 0);
TerminateThread(receiver_th, 0);
WaitForSingleObject(transmitter_th, INFINITE);
WaitForSingleObject(receiver_th, INFINITE);
q_destroy(&p2tq);
q_destroy(&t2rq);
for (ithread = 0; ithread < nthread; ithread++) q_destroy(&r2cq_array [ithread]);
free(r2cq_array);
free(producer_th);
free(consumer_th);
free(producer_arg);
free(consumer_arg);
_tprintf(_T("Система завершила работу. Останов системы\n"));
return 0;
}
DWORD WINAPI producer(PVOID arg) {
THARG * parg;
DWORD ithread, tstatus;
msg_block_t msg;
parg = (THARG *)arg;
ithread = parg->thread_number;
while (parg->work_done < parg->work_goal) {
/* Вырабатывать единичные сообщения, пока их общее количество */
/* не станет равным "goal". */
/* Сообщения снабжаются адресами отправителя и адресата, которые в */
/* нашем примере одинаковы для всех сообщений, но в общем случае */
/* могут быть различными. */
delay_cpu(DELAY_COUNT * rand() / RAND_MAX);
message_fill(&msg, ithread, ithread, parg->work_done);
/* Поместить сообщение в очередь. */
tstatus = q_put(&p2tq, &msg, sizeof(msg), INFINITE);
parg->work_done++;
}
return 0;
}
DWORD WINAPI transmitter(PVOID arg) {
/* Получись несколько сообщений от производителя, объединяя их в одно*/
/* составное сообщение, предназначенное для принимающего потока. */
DWORD tstatus, im;
t2r_msg_t t2r_msg = {0};
msg_block_t p2t_msg;
while (!ShutDown) {
t2r_msg.num_msgs = 0;
/* Упаковать сообщения для передачи принимающему потоку. */
for (im = 0; im < TBLOCK_SIZE; im++) {
tstatus = q_get(&p2tq, &p2t_msg, sizeof(p2t_msg), INFINITE);
if (tstatus!= 0) break;
memcpy(&t2r_msg.messages[im], &p2t_msg, sizeof(p2t_msg));
t2r_rasg.num_msgs++;
}
tstatus = q_put(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);
if (tstatus!= 0) return tstatus;
}
return 0;
}
DWORD WINAPI receiver(PVOID arg) {
/* Получить составные сообщения от передающего потока; распаковать */
/* их и передать соответствующему потребителю. */
DWORD tstatus, im, ic;
t2r_msg_t t2r_msg;
msg_block_t r2c_msg;
while (!ShutDown) {
tstatus = q_get(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);
if (tstatus!= 0) return tstatus;
/* Распределить сообщения между соответствующими потребителями. */
for (im = 0; im < t2r_msg.num_msgs; im++) {
memcpy(&r2c_msg, &t2r_msg.messages[im], sizeof(r2c_msg));
ic = r2c_msg.destination; /* Конечный потребитель. */
tstatus = q_put(&r2cq_array[ic], &r2c_msg, sizeof(r2c_msg), INFINITE);
if (tstatus!= 0) return tstatus;
}
}
return 0;
}
DWORD WINAPI consumer(PVOID arg) {
THARG * carg;
DWORD tstatus, ithread;
msg_block_t msg;
queue_t *pr2cq;
carg = (THARG *)arg;
ithread = carg->thread_number;
carg = (THARG *)arg;
pr2cq = &r2cq_array[ithread];
while (carg->work_done < carg->work_goal) {
/* Получить и отобразить (необязательно — не показано) сообщения. */
tstatus = q_get(pr2cq, &msg, sizeof(msg), INFINITE);
if (tstatus!= 0) return tstatus;
carg->work_done++;
}
return 0;
}