, 10.5, . , , . Web- .
(" /PulseEvent"), , . , (" /SetEvent"), , , , . Web- QueueObj_Sig., , .
CRITICAL_SECTION . SignalObjectAndWait EnterCriticalSection . QueueObjCS. QueueObjCS_Sig., Web- .
Web- QueueObj_noSOAW. QueueObjSig_noSOAW., SignalObjectAndWait Windows 9x.
, , , . Web- ; ThreeStage, .
, , , . , , CRITICAL_SECTIONS SignalObjectAndWait. .
:
"/", , , 8.2 "/", (pipeline model).
, (work units) . ( ), . 14 /.
|
|
, , , . . .
10.5 (ThreeStage.c) , , . , . , . :
(producers) , , , 8.2, , , (consumer), , . "/" . (transmitter), . , , .
( ) , . (receiver), , . . (blocking factor), 5:1, .
, , , .
.
- . 10.1. , , , "/" .
. 10.1.
10.5 , 10.4. , 8.1. , , .
Program 10.5. ThreeStage.c: a multistage pipeline
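/* Chapter 10. ThreeStage.c */
/* Three-stage producer/consumer system. */
/* Usage: ThreeStage npc goal. */
/* "npc" is the number of producer threads; the same number of */
/* consumer threads is started. */
/* "goal" is the number of messages each producer sends and the */
/* number of messages each consumer receives. */
/* A "transmitter" thread gathers producer messages into blocks, and */
/* a "receiver" thread unpacks each block and routes every message */
/* to the queue of the consumer named as its destination. */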
#include "EvryThng.h"
#include "SynchObj.h"
#include "messages.h"
#include <time.h>
#define DELAY_COUNT 1000 /* Scale factor for the producer's random CPU delay. */
#define MAX_THREADS 1024 /* Presumably the maximum thread count -- TODO confirm where it is enforced. */
/* Queue lengths and the transmitter's blocking factor. */
/* NOTE(review): these look like tuning values chosen for the example */
/* rather than derived limits -- confirm before changing them. */
#define TBLOCK_SIZE 5 /* Messages the transmitter packs into one block. */
#define TBLOCK_TIMEOUT 50 /* Presumably a block-fill timeout; units not shown here -- TODO confirm. */
#define P2T_QLEN 10 /* Length argument for the producer-to-transmitter queue. */
#define T2R_QLEN 4 /* Length argument for the transmitter-to-receiver queue. */
#define R2C_QLEN 4 /* Length argument for each receiver-to-consumer queue -- */
/* one such queue is created per consumer. */
/* Thread entry points for the four pipeline stages. */
DWORD WINAPI producer(PVOID);
DWORD WINAPI consumer(PVOID);
DWORD WINAPI transmitter(PVOID);
DWORD WINAPI receiver(PVOID);
/* Argument block handed to each producer and consumer thread. */
/* The main thread fills it in before starting the thread and reads */
/* work_done back after the thread has finished. */
typedef struct _THARG {
    volatile DWORD thread_number; /* Index of this thread; consumers use it to pick their queue. */
    volatile DWORD work_goal;     /* Number of messages the thread must send or receive. */
    volatile DWORD work_done;     /* Number of messages handled so far. */
    char future[8];               /* Not referenced in this file; presumably reserved -- TODO confirm. */
} THARG;
/* Block of messages passed as a single queue element from the */
/* transmitter to the receiver. */
typedef struct t2r_msg_tag {
volatile DWORD num_msgs; /* Number of valid entries in messages[]. */
msg_block_t messages[TBLOCK_SIZE]; /* Payload; only the first num_msgs entries are read by the receiver. */
} t2r_msg_t;
/* Pipeline queues: producers -> transmitter (p2tq), transmitter -> */
/* receiver (t2rq), and an array of receiver -> consumer queues that */
/* is allocated in _tmain with one entry per consumer (r2cq_array). */
queue_t p2tq, t2rq, *r2cq_array;
/* Set to 1 by the main thread after all consumers have finished; */
/* tested by the transmitter and receiver loop conditions. */
static volatile DWORD ShutDown = 0;
/* Not referenced in the code visible here. */
/* NOTE(review): confirm whether this is still needed. */
static DWORD EventTimeout = 50;
/*
 * Program entry point: builds the three-stage pipeline, starts all
 * threads, waits for the producers and consumers to finish, and then
 * shuts the transmitter and receiver down.
 *
 * Usage: ThreeStage npc goal
 *   argv[1] (npc)  - number of producer threads; the same number of
 *                    consumer threads is created.
 *   argv[2] (goal) - number of messages each producer sends and each
 *                    consumer receives.
 *
 * Returns 0 on success, 1 on invalid arguments, 2 on allocation failure.
 */
DWORD _tmain(DWORD argc, LPTSTR argv[]) {
    DWORD nthread, ithread, goal;
    unsigned thid; /* _beginthreadex reports the thread id through an unsigned pointer. */
    int npc_arg, goal_arg;
    HANDLE *producer_th, *consumer_th, transmitter_th, receiver_th;
    THARG *producer_arg, *consumer_arg;

    /* Both arguments are required; reading argv[1]/argv[2] without them is invalid. */
    if (argc < 3) {
        _tprintf(_T("Usage: ThreeStage npc goal\n"));
        return 1;
    }

    /* NOTE(review): atoi assumes a narrow-character (non-UNICODE) build; */
    /* a TCHAR-aware conversion would be needed otherwise. */
    npc_arg = atoi(argv[1]);
    goal_arg = atoi(argv[2]);
    /* Reject non-numeric, zero or negative counts before they are turned */
    /* into huge unsigned allocation sizes. */
    if (npc_arg < 1 || goal_arg < 0) {
        _tprintf(_T("Usage: ThreeStage npc goal\n"));
        return 1;
    }
    nthread = (DWORD)npc_arg;
    goal = (DWORD)goal_arg;

    producer_th = malloc(nthread * sizeof(HANDLE));
    producer_arg = calloc(nthread, sizeof(THARG));
    consumer_th = malloc(nthread * sizeof(HANDLE));
    consumer_arg = calloc(nthread, sizeof(THARG));
    /* One receiver-to-consumer queue per consumer. */
    r2cq_array = calloc(nthread, sizeof(queue_t));
    if (producer_th == NULL || producer_arg == NULL || consumer_th == NULL ||
        consumer_arg == NULL || r2cq_array == NULL) {
        _tprintf(_T("ThreeStage: out of memory\n"));
        free(r2cq_array);
        r2cq_array = NULL;
        free(producer_th);
        free(consumer_th);
        free(producer_arg);
        free(consumer_arg);
        return 2;
    }

    q_initialize(&p2tq, sizeof(msg_block_t), P2T_QLEN);
    q_initialize(&t2rq, sizeof(t2r_msg_t), T2R_QLEN);

    for (ithread = 0; ithread < nthread; ithread++) {
        /* Each consumer's queue must exist before the consumer starts. */
        q_initialize(&r2cq_array[ithread], sizeof(msg_block_t), R2C_QLEN);

        /* Fill in the consumer's argument block and start it. */
        consumer_arg[ithread].thread_number = ithread;
        consumer_arg[ithread].work_goal = goal;
        consumer_arg[ithread].work_done = 0;
        consumer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, consumer, (PVOID)&consumer_arg[ithread], 0, &thid);

        /* Same for the matching producer. */
        producer_arg[ithread].thread_number = ithread;
        producer_arg[ithread].work_goal = goal;
        producer_arg[ithread].work_done = 0;
        producer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, producer, (PVOID)&producer_arg[ithread], 0, &thid);
    }

    /* The single transmitter and receiver take no argument block. */
    transmitter_th = (HANDLE)_beginthreadex(NULL, 0, transmitter, NULL, 0, &thid);
    receiver_th = (HANDLE)_beginthreadex(NULL, 0, receiver, NULL, 0, &thid);

    /* NOTE(review): the progress message texts below are reproduced as */
    /* found; they appear to be missing their wording. */
    _tprintf(_T(": \n"));

    /* Wait for every producer and report how much it sent. */
    for (ithread = 0; ithread < nthread; ithread++) {
        WaitForSingleObject(producer_th[ithread], INFINITE);
        _tprintf(_T(": %d %d \n"), ithread, producer_arg[ithread].work_done);
    }
    _tprintf(_T(": .\n"));

    /* Wait for every consumer and report how much it received. */
    for (ithread = 0; ithread < nthread; ithread++) {
        WaitForSingleObject(consumer_th[ithread], INFINITE);
        _tprintf(_T(": %d %d \n"), ithread, consumer_arg[ithread].work_done);
    }
    _tprintf(_T(": .\n"));

    ShutDown = 1; /* Ask the transmitter and receiver loops to stop. */

    /* The transmitter and receiver call q_get/q_put with INFINITE */
    /* timeouts, so setting the flag alone cannot wake a blocked thread; */
    /* they are therefore terminated forcibly before being waited on. */
    /* NOTE(review): TerminateThread gives the target no chance to */
    /* release what it holds; a cooperative shutdown (finite timeouts or */
    /* a sentinel message) would be preferable. */
    TerminateThread(transmitter_th, 0);
    TerminateThread(receiver_th, 0);
    WaitForSingleObject(transmitter_th, INFINITE);
    WaitForSingleObject(receiver_th, INFINITE);

    q_destroy(&p2tq);
    q_destroy(&t2rq);
    for (ithread = 0; ithread < nthread; ithread++) {
        q_destroy(&r2cq_array[ithread]);
    }
    free(r2cq_array);
    free(producer_th);
    free(consumer_th);
    free(producer_arg);
    free(consumer_arg);
    _tprintf(_T(" . \n"));
    return 0;
}
/*
 * Producer thread: sends work_goal messages to the transmitter queue.
 *
 * arg points to this thread's THARG. work_done is incremented once per
 * message that was actually queued, so the main thread can report it.
 *
 * Returns 0 after reaching the goal, or the non-zero q_put status if a
 * message could not be queued.
 */
DWORD WINAPI producer(PVOID arg) {
    THARG *parg = (THARG *)arg;
    DWORD ithread = parg->thread_number;
    DWORD tstatus;
    msg_block_t msg;

    while (parg->work_done < parg->work_goal) {
        /* Burn a random amount of CPU time before producing the next message. */
        delay_cpu(DELAY_COUNT * rand() / RAND_MAX);

        /* The thread number is passed twice and work_done once; presumably */
        /* source, destination and sequence number -- TODO confirm against */
        /* message_fill. */
        message_fill(&msg, ithread, ithread, parg->work_done);

        /* Hand the message to the transmitter; blocks while the queue is full. */
        tstatus = q_put(&p2tq, &msg, sizeof(msg), INFINITE);
        /* Do not count a message that was never queued; stop like the */
        /* other stages do when a queue operation fails. */
        if (tstatus != 0) return tstatus;

        parg->work_done++;
    }
    return 0;
}
/*
 * Transmitter thread: collects messages from the producer queue into
 * blocks of up to TBLOCK_SIZE and forwards each block to the receiver
 * queue.
 *
 * arg is unused. Runs until ShutDown becomes non-zero.
 *
 * Returns 0 when the loop ends, or the non-zero q_put status if a block
 * could not be queued.
 */
DWORD WINAPI transmitter(PVOID arg) {
    DWORD tstatus, im;
    t2r_msg_t t2r_msg = {0};
    msg_block_t p2t_msg;

    while (!ShutDown) {
        t2r_msg.num_msgs = 0;

        /* Fill one block. A failed q_get ends the block early, and */
        /* whatever was gathered so far is still sent below. */
        /* NOTE(review): q_get waits with INFINITE, so a final partial */
        /* block is held back until TBLOCK_SIZE messages have arrived. */
        for (im = 0; im < TBLOCK_SIZE; im++) {
            tstatus = q_get(&p2tq, &p2t_msg, sizeof(p2t_msg), INFINITE);
            if (tstatus != 0) break;
            memcpy(&t2r_msg.messages[im], &p2t_msg, sizeof(p2t_msg));
            t2r_msg.num_msgs++;
        }

        /* Pass the block to the receiver; blocks while the queue is full. */
        tstatus = q_put(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);
        if (tstatus != 0) return tstatus;
    }
    return 0;
}
/*
 * Receiver thread: takes message blocks from the transmitter queue and
 * delivers each contained message to its destination consumer's queue.
 *
 * arg is unused. Runs until ShutDown becomes non-zero.
 *
 * Returns 0 when the loop ends, or the non-zero status of the first
 * failing queue operation.
 */
DWORD WINAPI receiver(PVOID arg) {
    DWORD tstatus, im, ic;
    t2r_msg_t t2r_msg;
    msg_block_t r2c_msg;

    while (!ShutDown) {
        tstatus = q_get(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);
        if (tstatus != 0) return tstatus;

        /* Unpack the block; only the first num_msgs entries are valid. */
        for (im = 0; im < t2r_msg.num_msgs; im++) {
            memcpy(&r2c_msg, &t2r_msg.messages[im], sizeof(r2c_msg));

            /* The destination field selects the consumer queue. */
            /* NOTE(review): it is used as an array index without a range */
            /* check; it is trusted to be a valid consumer number. */
            ic = r2c_msg.destination;
            tstatus = q_put(&r2cq_array[ic], &r2c_msg, sizeof(r2c_msg), INFINITE);
            if (tstatus != 0) return tstatus;
        }
    }
    return 0;
}
/*
 * Consumer thread: receives messages from its own receiver-to-consumer
 * queue until work_goal messages have been taken.
 *
 * arg points to this thread's THARG; thread_number selects the queue in
 * r2cq_array and work_done is incremented once per received message.
 *
 * Returns 0 after reaching the goal, or the non-zero q_get status on
 * failure.
 */
DWORD WINAPI consumer(PVOID arg) {
    THARG *self = (THARG *)arg;
    queue_t *inbox = &r2cq_array[self->thread_number];
    msg_block_t received;
    DWORD rc;

    for (;;) {
        /* Stop as soon as the quota has been met. */
        if (self->work_done >= self->work_goal) {
            break;
        }

        /* Blocks until the receiver has routed a message to this consumer. */
        rc = q_get(inbox, &received, sizeof(received), INFINITE);
        if (rc != 0) {
            return rc;
        }

        self->work_done++;
    }
    return 0;
}