.


:




:

































 

 

 

 





, 10.5, . , , . Web- .

(" /PulseEvent"), , . , (" /SetEvent"), , , , . Web- QueueObj_Sig., , .

CRITICAL_SECTION . SignalObjectAndWait EnterCriticalSection . QueueObjCS. QueueObjCS_Sig., Web- .

Web- QueueObj_noSOAW. QueueObjSig_noSOAW., SignalObjectAndWait Windows 9x.

, , , . Web- ; ThreeStage, .

, , , . , , CRITICAL_SECTIONS SignalObjectAndWait. .

:

"/", , , 8.2 "/", (pipeline model).

, (work units) . ( ), . 14 /.

, , , . . .

10.5 (ThreeStage.c) , , . , . , . :

(producers) , , , 8.2, , , (consumer), , . "/" . (transmitter), . , , .

( ) , . (receiver), , . . (blocking factor), 5:1, .

, , , .

.

- . 10.1. , , , "/" .

. 10.1.

10.5 , 10.4. , 8.1. , , .

Program 10.5. ThreeStage.c:

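/* Chapter 10. ThreeStage.c */

/* Three-stage producer/consumer system. */

/* Usage: ThreeStage npc goal. */

/* Starts "npc" paired producer and consumer threads. */

/* Each producer must produce a total of */

/* "goal" messages, where each message is tagged */

/* with the consumer that should receive it. */

/* Messages are sent to a "transmitter thread", which */

/* groups them into blocks for the "receiver thread", */

/* which passes each message to its consumer's queue. */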

 

#include "EvryThng.h"

#include "SynchObj.h"

#include "messages.h"

#include <time.h>

 

/* Scale factor for the random CPU delay a producer inserts before each message. */
#define DELAY_COUNT 1000

/* NOTE(review): presumably the maximum supported thread count - confirm intended use. */
#define MAX_THREADS 1024



/* Queue lengths and blocking factors. */

/* These are arbitrary tuning values for the pipeline stages; */

/* NOTE(review): the best values depend on the workload - confirm by measurement. */

#define TBLOCK_SIZE 5 /* Number of messages the transmitter packs into one block (5:1 blocking factor). */

#define TBLOCK_TIMEOUT 50 /* NOTE(review): presumably the wait, in ms, for a block to fill - confirm. */

#define P2T_QLEN 10 /* Capacity of the producer-to-transmitter queue. */

#define T2R_QLEN 4 /* Capacity of the transmitter-to-receiver queue. */

#define R2C_QLEN 4 /* Capacity of each receiver-to-consumer queue -- */

/* there is one such queue per consumer. */

 

/* Thread entry points for the pipeline stages; all are defined later in this file. */
/* producer and consumer receive a THARG pointer; transmitter and receiver receive NULL. */
DWORD WINAPI producer(PVOID);

DWORD WINAPI consumer(PVOID);

DWORD WINAPI transmitter(PVOID);

DWORD WINAPI receiver(PVOID);

 

/* Per-thread argument block passed to each producer and consumer thread. */
/* _tmain fills in thread_number, work_goal and work_done before starting */
/* the thread and reads work_done back after the thread has finished. */
typedef struct _THARG {
    volatile DWORD thread_number; /* Index of this thread (0 .. nthread-1). */
    volatile DWORD work_goal;     /* Number of messages this thread must produce or consume. */
    volatile DWORD work_done;     /* Messages handled so far; incremented by the owning thread. */
    char future[8];               /* Not referenced in this file; NOTE(review): presumably reserved space. */
} THARG;

 

/* Block of messages that the transmitter sends to the receiver in one queue */
/* operation: up to TBLOCK_SIZE individual messages plus a count of how many */
/* slots are actually filled. */
struct t2r_msg_tag {
    volatile DWORD num_msgs;             /* Number of valid entries in messages[]. */
    msg_block_t messages[TBLOCK_SIZE];   /* The grouped messages themselves. */
};

typedef struct t2r_msg_tag t2r_msg_t;

 

/* Pipeline queues, all initialized by _tmain: */
/*   p2tq       - producers -> transmitter (elements are msg_block_t), */
/*   t2rq       - transmitter -> receiver (elements are t2r_msg_t), */
/*   r2cq_array - receiver -> consumers, one queue per consumer, allocated at run time. */
queue_t p2tq, t2rq, *r2cq_array;



/* Set to 1 by _tmain once all consumers have finished; polled by transmitter and receiver. */
static volatile DWORD ShutDown = 0;

/* Not referenced in this listing. NOTE(review): presumably an event wait period - confirm or remove. */
static DWORD EventTimeout = 50;

 

/*
 * Program entry point. Usage: ThreeStage npc goal
 *
 * Starts "npc" producer/consumer thread pairs plus one transmitter and one
 * receiver thread, waits for every producer and then every consumer to reach
 * its goal, shuts the two middle stages down and releases all resources.
 *
 * Returns 0 on success, or a nonzero value if the command line is invalid
 * or memory cannot be allocated.
 */
DWORD _tmain(DWORD argc, LPTSTR argv[]) {
    DWORD nthread, ithread, goal;
    unsigned thid; /* _beginthreadex stores the thread id through an unsigned pointer. */
    HANDLE *producer_th, *consumer_th, transmitter_th, receiver_th;
    THARG *producer_arg, *consumer_arg;

    /* Both arguments are required; the original read argv[1]/argv[2] unchecked. */
    if (argc < 3) {
        _tprintf(_T("Usage: ThreeStage npc goal\n"));
        return 1;
    }

    /* _ttoi is the TCHAR-aware form of atoi, matching the LPTSTR arguments. */
    nthread = (DWORD)_ttoi(argv[1]);
    goal = (DWORD)_ttoi(argv[2]);
    if (nthread == 0 || nthread > MAX_THREADS) {
        _tprintf(_T("npc must be between 1 and %d\n"), MAX_THREADS);
        return 2;
    }

    producer_th = malloc(nthread * sizeof(HANDLE));
    producer_arg = calloc(nthread, sizeof(THARG));
    consumer_th = malloc(nthread * sizeof(HANDLE));
    consumer_arg = calloc(nthread, sizeof(THARG));
    /* One receiver-to-consumer queue per consumer thread. */
    r2cq_array = calloc(nthread, sizeof(queue_t));
    if (producer_th == NULL || producer_arg == NULL || consumer_th == NULL ||
        consumer_arg == NULL || r2cq_array == NULL) {
        _tprintf(_T("Out of memory\n"));
        free(r2cq_array);
        r2cq_array = NULL;
        free(producer_th);
        free(consumer_th);
        free(producer_arg);
        free(consumer_arg);
        return 3;
    }

    q_initialize(&p2tq, sizeof(msg_block_t), P2T_QLEN);
    q_initialize(&t2rq, sizeof(t2r_msg_t), T2R_QLEN);

    for (ithread = 0; ithread < nthread; ithread++) {
        /* Initialize this consumer's receive queue before the consumer starts. */
        q_initialize(&r2cq_array[ithread], sizeof(msg_block_t), R2C_QLEN);

        /* Fill in the consumer's argument block and start the consumer. */
        consumer_arg[ithread].thread_number = ithread;
        consumer_arg[ithread].work_goal = goal;
        consumer_arg[ithread].work_done = 0;
        consumer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, consumer, (PVOID)&consumer_arg[ithread], 0, &thid);

        /* Likewise for the paired producer. */
        producer_arg[ithread].thread_number = ithread;
        producer_arg[ithread].work_goal = goal;
        producer_arg[ithread].work_done = 0;
        producer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, producer, (PVOID)&producer_arg[ithread], 0, &thid);
    }

    /* Start the two middle stages of the pipeline. */
    transmitter_th = (HANDLE)_beginthreadex(NULL, 0, transmitter, NULL, 0, &thid);
    receiver_th = (HANDLE)_beginthreadex(NULL, 0, receiver, NULL, 0, &thid);

    _tprintf(_T(": \n"));

    /* Wait for every producer to finish and report how much it produced. */
    for (ithread = 0; ithread < nthread; ithread++) {
        WaitForSingleObject(producer_th[ithread], INFINITE);
        _tprintf(_T(": %d %d \n"), ithread, producer_arg[ithread].work_done);
    }

    _tprintf(_T(": .\n"));

    /* Wait for every consumer to finish and report how much it consumed. */
    for (ithread = 0; ithread < nthread; ithread++) {
        WaitForSingleObject(consumer_th[ithread], INFINITE);
        _tprintf(_T(": %d %d \n"), ithread, consumer_arg[ithread].work_done);
    }

    _tprintf(_T(": .\n"));

    ShutDown = 1; /* Tell the transmitter and receiver loops to stop. */

    /* The transmitter and receiver may be blocked in a queue wait and so may */
    /* never observe ShutDown; they are therefore terminated forcibly. */
    /* NOTE(review): TerminateThread gives the thread no chance to clean up; */
    /* a cooperative shutdown (e.g. a final sentinel message) would be safer. */
    TerminateThread(transmitter_th, 0);
    TerminateThread(receiver_th, 0);
    WaitForSingleObject(transmitter_th, INFINITE);
    WaitForSingleObject(receiver_th, INFINITE);

    /* Release the thread handles; the original leaked them. */
    CloseHandle(transmitter_th);
    CloseHandle(receiver_th);
    for (ithread = 0; ithread < nthread; ithread++) {
        CloseHandle(producer_th[ithread]);
        CloseHandle(consumer_th[ithread]);
    }

    q_destroy(&p2tq);
    q_destroy(&t2rq);
    for (ithread = 0; ithread < nthread; ithread++) {
        q_destroy(&r2cq_array[ithread]);
    }

    free(r2cq_array);
    r2cq_array = NULL;
    free(producer_th);
    free(consumer_th);
    free(producer_arg);
    free(consumer_arg);

    _tprintf(_T(" . \n"));
    return 0;
}

 

/*
 * Producer thread: generates work_goal messages and puts each one on the
 * producer-to-transmitter queue.
 *
 * arg points to this thread's THARG. Returns 0 once the goal is reached,
 * or the nonzero q_put status if a message could not be queued.
 */
DWORD WINAPI producer(PVOID arg) {
    THARG *parg = (THARG *)arg;
    DWORD ithread = parg->thread_number;
    DWORD tstatus;
    msg_block_t msg;

    while (parg->work_done < parg->work_goal) {
        /* Burn a random amount of CPU time (scaled by DELAY_COUNT) to */
        /* simulate the work of producing a message. */
        delay_cpu(DELAY_COUNT * rand() / RAND_MAX);

        /* Both thread-index arguments are this producer's own index. */
        /* NOTE(review): presumably source and destination, so the paired */
        /* consumer receives the message - confirm against messages.h. */
        message_fill(&msg, ithread, ithread, parg->work_done);

        /* Hand the message to the transmitter stage. */
        tstatus = q_put(&p2tq, &msg, sizeof(msg), INFINITE);

        /* Do not count a message that was never queued; stop and report */
        /* the failure, as the other pipeline stages do. */
        if (tstatus != 0) {
            return tstatus;
        }
        parg->work_done++;
    }
    return 0;
}

 

/*
 * Transmitter thread: collects messages from the producer-to-transmitter
 * queue, packs up to TBLOCK_SIZE of them into one t2r_msg_t block and puts
 * the block on the transmitter-to-receiver queue.
 *
 * arg is unused. Returns 0 when ShutDown is observed, or the nonzero q_put
 * status if a block could not be queued.
 */
DWORD WINAPI transmitter(PVOID arg) {
    DWORD tstatus, im, max_wait;
    t2r_msg_t t2r_msg = {0};
    msg_block_t p2t_msg;

    (void)arg;

    while (!ShutDown) {
        t2r_msg.num_msgs = 0;

        /* Assemble one block. Wait indefinitely for the first message so no */
        /* empty blocks are sent while idle, but only TBLOCK_TIMEOUT for each */
        /* further one. With every wait INFINITE, a final partial block */
        /* (total message count not a multiple of TBLOCK_SIZE) was never */
        /* forwarded and the consumers could not finish. */
        /* NOTE(review): assumes q_get returns nonzero when the wait times */
        /* out - confirm against SynchObj.h. */
        for (im = 0; im < TBLOCK_SIZE; im++) {
            max_wait = (im == 0) ? INFINITE : TBLOCK_TIMEOUT;
            tstatus = q_get(&p2tq, &p2t_msg, sizeof(p2t_msg), max_wait);
            if (tstatus != 0) {
                break; /* Send whatever has been collected so far. */
            }
            memcpy(&t2r_msg.messages[im], &p2t_msg, sizeof(p2t_msg));
            t2r_msg.num_msgs++;
        }

        /* Forward the (possibly partial) block to the receiver. */
        tstatus = q_put(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);
        if (tstatus != 0) {
            return tstatus;
        }
    }
    return 0;
}

 

/*
 * Receiver thread: takes message blocks off the transmitter-to-receiver
 * queue, unpacks them and puts each message on the queue of the consumer
 * named by the message's destination field.
 *
 * arg is unused. Returns 0 when ShutDown is observed, or the nonzero
 * status of the queue operation that failed.
 */
DWORD WINAPI receiver(PVOID arg) {
    t2r_msg_t block;
    msg_block_t single;
    DWORD rc, slot, target;

    for (;;) {
        if (ShutDown) {
            break;
        }

        /* Fetch the next block from the transmitter. */
        rc = q_get(&t2rq, &block, sizeof(block), INFINITE);
        if (rc != 0) {
            return rc;
        }

        /* Distribute every message in the block to its consumer. */
        slot = 0;
        while (slot < block.num_msgs) {
            memcpy(&single, &block.messages[slot], sizeof(single));
            target = single.destination; /* Index of the consumer's queue. */
            rc = q_put(&r2cq_array[target], &single, sizeof(single), INFINITE);
            if (rc != 0) {
                return rc;
            }
            slot++;
        }
    }
    return 0;
}

 

/*
 * Consumer thread: takes messages off its own receiver-to-consumer queue
 * until work_goal messages have been received.
 *
 * arg points to this thread's THARG. Returns 0 once the goal is reached,
 * or the nonzero q_get status if a receive failed.
 */
DWORD WINAPI consumer(PVOID arg) {
    THARG *self = (THARG *)arg;
    /* Each consumer reads only the queue at its own thread index. */
    queue_t *inbox = &r2cq_array[self->thread_number];
    msg_block_t received;
    DWORD rc;

    for (;;) {
        if (self->work_done >= self->work_goal) {
            break;
        }

        /* Receive the next message; its content is not examined here. */
        rc = q_get(inbox, &received, sizeof(received), INFINITE);
        if (rc != 0) {
            return rc;
        }
        self->work_done++;
    }
    return 0;
}





:


: 2015-09-20; !; : 475 |


:

:

! . .
==> ...

1913 - | 1702 -


© 2015-2024 lektsii.org - -

: 0.078 .