#include "testMsgLoop.h"
using std::cout;
using std::flush;
static DWORD ServerCommThreadId = 0;
static DWORD gle = 0;
static HANDLE rpipe = 0;
static HANDLE wpipe = 0;
//--------------------------------------------
//-- some helper functions
static UINT CreateTimer(int clientid)
{
UINT timerid = SetTimer(
NULL, // handle of window for timer messages
0, // timer identifier
clientdelay+clientid, // time-out value
NULL); // address of timer procedure
_ASSERTE(timerid != 0 && "client: SetTimer before loop failed");
return timerid;
}
static void RegisterWithParent(int clientid)
{
//register with the parent telling it out threadid
BOOL brc = PostThreadMessage(
ServerCommThreadId, // thread identifier
WM_REGISTER, // message to post
(WPARAM)clientid, // first message parameter
(LPARAM)GetCurrentThreadId()); // second message parameter)
_ASSERTE(brc && "client: postthreadmessage on register failed");
}
static void SendMessage(int clientid, char* buf)
{
if (!wpipe) return;
DWORD len = strlen(buf) + 1;
DWORD byteswritten = 0;
BOOL brc = WriteFile(
wpipe, // handle to file to write to
buf, // pointer to data to write to file
len, // number of bytes to write
&byteswritten, // pointer to number of bytes written
0); // pointer to structure for overlapped I/O
gle = GetLastError();
_ASSERTE(brc && "client: writefile failed");
_ASSERTE(len == byteswritten && "client: number of bytes written != size of buffer");
cout << "client: sent outmsg id=" << clientid << " len=" << len << " '" << buf << "'\n" << flush;
brc = PostThreadMessage(
ServerCommThreadId, // thread identifier
WM_OUTMSG, // message to post
(WPARAM) clientid, // first message parameter
(LPARAM) len); // second message parameter)
_ASSERTE(brc && "client: PostThreadMessage wm_outmsg failed");
}
static void ReadIncomingMessage(int len, char* buf, int maxlen)
{
_ASSERTE(rpipe && "client: inmsg came in but rpipe is not set");
if (!rpipe) return;
_ASSERTE(len > 0 && len < maxlen && "client: wm_inmsg buflen is invalid");
DWORD bytesread = 0;
BOOL brc = ReadFile(
rpipe, // handle of file to read
buf, // pointer to buffer that receives data
len, // number of bytes to read
&bytesread, // pointer to number of bytes read
0); // pointer to structure for data
buf[maxlen - 1] = 0;
_ASSERTE(bytesread == len && "client: bytesread != len");
gle = GetLastError();
_ASSERTE(brc && "client: readfile: failed");
_ASSERTE(buf[len-1] == 0 && "client: expected a null terminated string!");
buf[len-1] = 0;
}
void main(int argc, char** argv)
{
//_ASSERTE(0);
_ASSERTE(argc == 3 && "client: requires 3 parms");
//the client id is passed on the command line
int clientid = atoi(argv[1]);
_ASSERTE(clientid >= 1 && clientid <= 32 && "client: invalid clientid");
//the commthread id is passed on the command line
ServerCommThreadId = atol(argv[2]);
_ASSERTE(ServerCommThreadId && "client: invalid commthread id");
//force the queue creation
MSG msg;
PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
cout << "client: strt id=" << clientid << " tid=" << GetCurrentThreadId() << "\n" << flush;
RegisterWithParent(clientid);
UINT timerid = CreateTimer(clientid); //start a timer so we can send messages back
//start the loop going...
while (GetMessage(&msg, 0, 0, 0))
{
switch (msg.message)
{
//--------------------------
//-- we're dead, pop out of the loop
case WM_PIPEHANDLES:
{
rpipe = (HANDLE) msg.wParam;
wpipe = (HANDLE) msg.lParam;
_ASSERTE(rpipe && "client: rpipe is null");
_ASSERTE(wpipe && "client: wpipe is null");
cout << "client: recv pipehandles id=" << clientid << "\n" << flush;
}
break;
//--------------------------
//-- we're dead, pop out of the loop
case WM_CLOSE:
cout << "client: recv wm_close id=" << clientid << "\n" << flush;
KillTimer(NULL, timerid);
PostQuitMessage(0);
break;
//-----------------------
//timer popped: send a message to the parent
//this simulates an asynchronous message generated from the client
case WM_TIMER:
{
char buf[100];
static int num = 0;
sprintf(buf, "client: id=%ld hi there (msg# %d)", (long) clientid, num++);
SendMessage(clientid, buf);
}
break;
//--------------------------
//parent sent us a message asynchronously
case WM_INMSG:
{
_ASSERTE(msg.lParam == 0); //lParam is unused
char buf[512];
ReadIncomingMessage(msg.wParam, buf, sizeof(buf));
cout << "client: recv inmsg id=" << clientid << " len="<< msg.wParam << " '" << buf << "'\n" << flush;
}
break;
}
}
cout << "client: exit id=" << clientid << "\n" << flush;
CloseHandle(rpipe);
CloseHandle(wpipe);
}
|
#include "testMsgLoop.h"
using std::cout;
using std::flush;
static DWORD __stdcall CommThread(void* parms);
static HANDLE hCommThread; //handle of comm thread
static DWORD dwCommThread; //id of comm thread
static DWORD dwMainThread; //id of main thread
//array holds info about all of the child processes
struct _clientarray
{
DWORD tid;
bool registered;
UINT timerid;
HANDLE rpipe; //read from this handle
HANDLE wpipe; //write to this handle
HANDLE childrpipe; //temp. holder for child handles
HANDLE childwpipe; //temp. holder for child handles
HANDLE hProcess; //child process handle
} client[numclients+1]; //the zeroth element is not used
DWORD gle;
static void CreateChild(int id)
{
PROCESS_INFORMATION pi;
STARTUPINFO si;
memset(&si, 0x00, sizeof(si));
si.cb = sizeof(si);
char buf[100];
sprintf(buf, "%s\\client.exe %d %ld", cExecutablesDir, id, (long) dwCommThread);
BOOL brc = CreateProcess(
0, // pointer to name of executable module
buf, // pointer to command line string
NULL, // pointer to process security attributes
NULL, // pointer to thread security attributes
TRUE, // handle inheritance flag
0, // creation flags
NULL, // pointer to new environment block
cExecutablesDir, // pointer to current directory name
&si, // pointer to STARTUPINFO
&pi); // pointer to PROCESS_INFORMATION
gle = GetLastError();
_ASSERTE(brc && "server: createprocess failed");
//wait for the process to come up and the msg loop to be created
WaitForInputIdle(
pi.hProcess, // handle to process
INFINITE); // time-out interval in milliseconds
client[id].hProcess = pi.hProcess;
CloseHandle(pi.hThread);
}
//---------------------------------------------------------------------------------
static void StartClients()
{
BOOL brc;
SECURITY_ATTRIBUTES sa;
memset(&sa, 0x00, sizeof(sa));
sa.nLength = sizeof(sa);
sa.lpSecurityDescriptor = NULL;
sa.bInheritHandle = TRUE;
HANDLE hReadTmp;
HANDLE hWriteTmp;
for(int id=1; id <= numclients; id++)
{
_ASSERTE(!client[id].registered && "server: client is already registered before created?");
//create the necessary pipes, we'll send the handles to the child when it registers with us
brc = CreatePipe(
&hReadTmp, // pointer to read handle
&hWriteTmp, // pointer to write handle
&sa, // pointer to security attributes
0); // pipe size
_ASSERTE(brc && "server: createpipe 1 failed");
client[id].childwpipe = hWriteTmp;
client[id].rpipe = hReadTmp;
//create the pipe
brc = CreatePipe(
&hReadTmp, // pointer to read handle
&hWriteTmp, // pointer to write handle
&sa, // pointer to security attributes
0); // pipe size
_ASSERTE(brc && "server: createpipe 2 failed");
client[id].wpipe = hWriteTmp;
client[id].childrpipe = hReadTmp;
CreateChild(id);
}
}
//------------------------------------------------
//-- some helper functions
void SendShutdown(DWORD id)
{
//tell each child to shutdown
BOOL brc = PostThreadMessage(
id, // thread identifier
WM_CLOSE, // message to post
0, // first message parameter
0); // second message parameter)
}
void KillAllTimers()
{
for(int i = 1; i <= numclients; i++)
{
if (client[i].registered)
{
//kill the associated timer
KillTimer(NULL, client[i].timerid);
}
}
}
void ShutdownAllClients()
{
for(int i = 1; i <= numclients; i++)
{
if (!client[i].registered) continue;
SendShutdown(client[i].tid);
}
}
void WaitForAllToDie()
{
for(int i = 1; i <= numclients; i++)
{
if (!client[i].registered) continue;
//wait for the child process to end
WaitForSingleObject(client[i].hProcess, INFINITE);
//close all handles
CloseHandle(client[i].hProcess);
CloseHandle(client[i].rpipe);
CloseHandle(client[i].wpipe);
}
}
static void EndClients()
{
KillAllTimers(); //first stop sending messages... the client will stop responding.
ShutdownAllClients(); //now that the clients are stable and idle, tell them to quit
WaitForAllToDie(); //now wait for the processes to end
}
static void StartCommunicationThread()
{
hCommThread = CreateThread(
NULL, // pointer to thread security attributes
0, // initial thread stack size, in bytes
CommThread, // pointer to thread function
(void*) 0, // argument for new thread
0, // creation flags
&dwCommThread); // pointer to returned thread identifier
}
static void EndCommunicationThread()
{
//all clean, finally tell the comm thread to quit
PostThreadMessage(dwCommThread,WM_QUIT,0,0);
//wait for it to exit
WaitForSingleObject(hCommThread, INFINITE);
//all done
CloseHandle(hCommThread);
}
static void ReadIncomingMessage(int clientid, int len, char* buf, int maxlen)
{
_ASSERTE(clientid >= 1 && clientid <= numclients && "server: wm_outmsg clientid is invalid");
_ASSERTE(client[clientid].registered && "server: client not registered");
_ASSERTE(len > 0 && len < maxlen && "server: wm_outmsg buflen is invalid");
DWORD bytesread = 0;
BOOL brc = ReadFile(
client[clientid].rpipe, // handle of file to read
buf, // pointer to buffer that receives data
len, // number of bytes to read
&bytesread, // pointer to number of bytes read
0); // pointer to structure for data
buf[maxlen - 1] = 0;
gle = GetLastError();
_ASSERTE(buf[len-1] == 0 && "server: expected a null terminated string!");
buf[len - 1] = 0;
}
static void WriteMessage(int i, char* buf)
{
DWORD len = strlen(buf) + 1; //one for the null!
DWORD byteswritten = 0;
BOOL brc = WriteFile(
client[i].wpipe, // handle to file to write to
buf, // pointer to data to write to file
len, // number of bytes to write
&byteswritten, // pointer to number of bytes written
0); // pointer to structure for overlapped I/O
gle = GetLastError();
_ASSERTE(brc && "server: writefile failed");
_ASSERTE(len == byteswritten && "server: number of bytes written != size of buffer");
//tell the child that we've written to the pipe
brc = PostThreadMessage(
client[i].tid, // thread identifier
WM_INMSG, // message to post
(WPARAM) len, // first message parameter
0); // second message parameter)
_ASSERTE(brc && "server: PostThreadMessage wm_inmsg failed");
}
static int FindClientFromTimerId(UINT timerid)
{
//search for timerid in the array, send a msg to the client appl
int i;
for(i = 1; i <= numclients; i++)
{
if (client[i].timerid == timerid)
break;
}
_ASSERTE(i >= 1 && i <= numclients && "server: could not find timerid in the array");
return i;
}
//----------------------
void main()
{
//ensure everything is 0
memset(&client, 0x00, sizeof(client));
//save the current threadid so we can be shutdown
dwMainThread = GetCurrentThreadId();
//start everything up
StartCommunicationThread();
StartClients();
//wait for the world to finish, handle incoming messages
MSG msg;
while (GetMessage(&msg, 0, 0, 0))
{
DispatchMessage(&msg);
}
//the comm. thread has finished and sent a wm_quit to us, so cleanup
EndClients();
EndCommunicationThread();
}
//-------------------------------------------------------
DWORD __stdcall CommThread(void* parms)
{
MSG msg;
UINT timerid;
BOOL brc;
//force the queue creation
PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
while (GetMessage(&msg, 0, 0, 0))
{
switch (msg.message)
{
//----------------------
//-- child process sends a register immediately after starting
case WM_REGISTER :
{
int clientid = msg.wParam;
_ASSERTE(clientid >= 1 && clientid <= numclients && "server: register clientid is invalid");
_ASSERTE(!client[clientid].registered && "server: already registered");
client[clientid].registered = true;
client[clientid].tid = msg.lParam;
cout << "server: rgst client id=" << clientid << " tid=" << msg.lParam << "\n" << flush;
//duplicate the handles so we can close our side
HANDLE rpipe = 0;
HANDLE wpipe = 0;
brc = DuplicateHandle(
GetCurrentProcess(), // handle to the source process
client[clientid].childrpipe, // handle to duplicate
client[clientid].hProcess, // handle to process to duplicate to
&rpipe, // pointer to duplicate handle
GENERIC_READ, // access for duplicate handle
FALSE, // handle inheritance flag
0); // optional actions
gle = GetLastError();
_ASSERTE(brc && "server: duplicatehandle rpipe");
CloseHandle(client[clientid].childrpipe);
brc = DuplicateHandle(
GetCurrentProcess(), // handle to the source process
client[clientid].childwpipe, // handle to duplicate
client[clientid].hProcess, // handle to process to duplicate to
&wpipe, // pointer to duplicate handle
GENERIC_WRITE, // access for duplicate handle
FALSE, // handle inheritance flag
0); // optional actions
gle = GetLastError();
_ASSERTE(brc && "server: duplicatehandle wpipe");
CloseHandle(client[clientid].childwpipe);
brc = PostThreadMessage(
client[clientid].tid, // thread identifier
WM_PIPEHANDLES, // message to post
(WPARAM) rpipe, // first message parameter
(LPARAM) wpipe); // second message parameter)
//start the ball rolling
timerid = SetTimer(
NULL, // handle of window for timer messages
0, // timer identifier
serverdelay+clientid, // time-out value
NULL); // address of timer procedure
_ASSERTE(timerid != 0 && "server: SetTimer in register failed");
client[clientid].timerid = timerid;
}
break;
case WM_TIMER:
{
static int num = 0;
if (num > (msgsperclient * numclients))
{
PostThreadMessage(dwMainThread,WM_QUIT,0,0);
break;
}
int i = FindClientFromTimerId(msg.wParam);
char buf[100];
sprintf(buf, "hi there (msg# %d) the server", num++);
WriteMessage(i, buf);
cout << "server: sent inmsg id=" << i << " len="<< strlen(buf) + 1 << " '" << buf << "'\n" << flush;
}
break;
case WM_OUTMSG:
{
char buf[512];
int clientid = msg.wParam;
ReadIncomingMessage(clientid, msg.lParam, buf, sizeof(buf));
cout << "server: recv outmsg id=" << clientid << " len=" << msg.lParam << " '" << buf << "'\n" << flush;
}
break;
default:
cout << "server: Unknown message type: " << msg.message << "\n";
break;
}
}
return 0;
}
|