testMsgLoop : Inter-process communication using pipes and message loops

Download testmsgloop.zip

Synopsis:

client.cpp
server.cpp
testMsgLoop.h


client.cpp

Synopsis
#include "testMsgLoop.h"

using std::cout;
using std::flush;

static DWORD ServerCommThreadId = 0;
static DWORD gle = 0;
static HANDLE rpipe = 0;
static HANDLE wpipe = 0;

//--------------------------------------------
//-- some helper functions
static UINT CreateTimer(int clientid)
  {
  UINT timerid = SetTimer(
    NULL,            // handle of window for timer messages
    0,               // timer identifier
    clientdelay+clientid,  // time-out value
    NULL);           // address of timer procedure
  _ASSERTE(timerid != 0 && "client: SetTimer before loop failed");
  return timerid;
  }

static void RegisterWithParent(int clientid)
  {
  //register with the parent telling it out threadid
  BOOL brc = PostThreadMessage(
    ServerCommThreadId,                    // thread identifier
    WM_REGISTER,                     // message to post
    (WPARAM)clientid,                 // first message parameter
    (LPARAM)GetCurrentThreadId());   // second message parameter)
  _ASSERTE(brc && "client: postthreadmessage on register failed");
  }

static void SendMessage(int clientid, char* buf)
  {
  if (!wpipe) return;
  DWORD len = strlen(buf) + 1;
  DWORD byteswritten = 0;
  
  BOOL brc = WriteFile(
    wpipe,                   // handle to file to write to
    buf,                     // pointer to data to write to file
    len,                     // number of bytes to write
    &byteswritten,           // pointer to number of bytes written
    0);                      // pointer to structure for overlapped I/O
  gle = GetLastError();
  _ASSERTE(brc && "client: writefile failed");
  _ASSERTE(len == byteswritten && "client: number of bytes written != size of buffer");
  
  cout << "client: sent outmsg id=" << clientid << " len=" << len << " '" << buf << "'\n" << flush;
  
  brc = PostThreadMessage(
    ServerCommThreadId,      // thread identifier
    WM_OUTMSG,         // message to post
    (WPARAM) clientid,  // first message parameter
    (LPARAM) len);     // second message parameter)
  
  _ASSERTE(brc && "client: PostThreadMessage wm_outmsg failed");
  }

static void ReadIncomingMessage(int len, char* buf, int maxlen)
  {
  _ASSERTE(rpipe && "client: inmsg came in but rpipe is not set");
  if (!rpipe) return;
  
  _ASSERTE(len > 0 && len < maxlen && "client: wm_inmsg buflen is invalid");
  
  DWORD bytesread = 0;
  
  BOOL brc = ReadFile(
    rpipe,                   // handle of file to read
    buf,                     // pointer to buffer that receives data
    len,                     // number of bytes to read
    &bytesread,              // pointer to number of bytes read
    0);                      // pointer to structure for data
  buf[maxlen - 1] = 0;
  
  _ASSERTE(bytesread == len && "client: bytesread != len");
  gle = GetLastError();
  _ASSERTE(brc && "client: readfile: failed");
  _ASSERTE(buf[len-1] == 0 && "client: expected a null terminated string!");
  buf[len-1] = 0;
  }


void main(int argc, char** argv)
  {
  //_ASSERTE(0);
  _ASSERTE(argc == 3 && "client: requires 3 parms");
  
  //the client id is passed on the command line
  int clientid = atoi(argv[1]);
  _ASSERTE(clientid >= 1 && clientid <= 32 && "client: invalid clientid");
  
  //the commthread id is passed on the command line
  ServerCommThreadId = atol(argv[2]);
  _ASSERTE(ServerCommThreadId && "client: invalid commthread id");
  
  //force the queue creation
  MSG msg;
  PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  
  cout << "client: strt id=" << clientid << " tid=" << GetCurrentThreadId() << "\n" << flush;
  
  RegisterWithParent(clientid);
  UINT timerid = CreateTimer(clientid); //start a timer so we can send messages back
  
  //start the loop going...
  while (GetMessage(&msg, 0, 0, 0))
    {
    switch (msg.message)
      {
      //--------------------------
      //-- we're dead, pop out of the loop
      case WM_PIPEHANDLES:
        {
        rpipe = (HANDLE) msg.wParam;
        wpipe = (HANDLE) msg.lParam;
        _ASSERTE(rpipe && "client: rpipe is null");
        _ASSERTE(wpipe && "client: wpipe is null");
        cout << "client: recv pipehandles id=" << clientid << "\n" << flush;
        }
        break;
        
        //--------------------------
        //-- we're dead, pop out of the loop
      case WM_CLOSE:
        cout << "client: recv wm_close id=" << clientid << "\n" << flush;
        KillTimer(NULL, timerid);
        PostQuitMessage(0);
        break;
        
        //-----------------------
        //timer popped: send a message to the parent
        //this simulates an asynchronous message generated from the client
      case WM_TIMER:
        {
        char buf[100];
        static int num = 0;
        
        sprintf(buf, "client: id=%ld hi there (msg# %d)", (long) clientid, num++);
        SendMessage(clientid, buf);
        }
        break;
        
        //--------------------------
        //parent sent us a message asynchronously
      case WM_INMSG:
        {
        _ASSERTE(msg.lParam == 0); //lParam is unused
        char buf[512];
        ReadIncomingMessage(msg.wParam, buf, sizeof(buf));
        cout << "client: recv inmsg  id=" << clientid << " len="<< msg.wParam <<  " '" << buf << "'\n" << flush;
        }
        break;
      }
    }
  
  cout << "client: exit id=" << clientid << "\n" << flush;
  
  CloseHandle(rpipe);
  CloseHandle(wpipe);
  }


server.cpp

Synopsis
#include "testMsgLoop.h"

using std::cout;
using std::flush;

static DWORD __stdcall CommThread(void* parms);
static HANDLE hCommThread;   //handle of comm thread
static DWORD  dwCommThread;  //id of comm thread
static DWORD  dwMainThread;  //id of main thread

//array holds info about all of the child processes
struct  _clientarray
  {
  DWORD   tid;
  bool    registered;
  UINT    timerid;
  HANDLE  rpipe;      //read from this handle
  HANDLE  wpipe;      //write to this handle
  HANDLE  childrpipe; //temp. holder for child handles
  HANDLE  childwpipe; //temp. holder for child handles
  HANDLE  hProcess;   //child process handle
  } client[numclients+1];  //the zeroth element is not used
DWORD gle;

static void CreateChild(int id)
  {
  PROCESS_INFORMATION pi;
  
  STARTUPINFO si;
  memset(&si, 0x00, sizeof(si));
  si.cb = sizeof(si);
  
  char buf[100];
  sprintf(buf, "%s\\client.exe %d %ld", cExecutablesDir, id, (long) dwCommThread);
  BOOL brc = CreateProcess(
    0,                  // pointer to name of executable module
    buf,                // pointer to command line string
    NULL,               // pointer to process security attributes
    NULL,               // pointer to thread security attributes
    TRUE,               // handle inheritance flag
    0,                  // creation flags
    NULL,               // pointer to new environment block
    cExecutablesDir, // pointer to current directory name
    &si,                // pointer to STARTUPINFO
    &pi);               // pointer to PROCESS_INFORMATION
  
  gle = GetLastError();
  _ASSERTE(brc && "server: createprocess failed");
  
  //wait for the process to come up and the msg loop to be created
  WaitForInputIdle(
    pi.hProcess,   // handle to process
    INFINITE);     // time-out interval in milliseconds
  
  client[id].hProcess = pi.hProcess;
  CloseHandle(pi.hThread);
  }

//---------------------------------------------------------------------------------
static void StartClients()
  {
  BOOL brc;
  
  SECURITY_ATTRIBUTES sa;
  memset(&sa, 0x00, sizeof(sa));
  sa.nLength = sizeof(sa);
  sa.lpSecurityDescriptor = NULL;
  sa.bInheritHandle = TRUE;
  
  HANDLE hReadTmp;
  HANDLE hWriteTmp;
  
  for(int id=1; id <= numclients; id++)
    {
    _ASSERTE(!client[id].registered && "server: client is already registered before created?");
    
    //create the necessary pipes, we'll send the handles to the child when it registers with us
    brc = CreatePipe(
      &hReadTmp,                       // pointer to read handle
      &hWriteTmp,                      // pointer to write handle
      &sa,                             // pointer to security attributes
      0);                              // pipe size
    _ASSERTE(brc && "server: createpipe 1 failed");
    client[id].childwpipe = hWriteTmp;
    client[id].rpipe = hReadTmp;
    
    //create the pipe
    brc = CreatePipe(
      &hReadTmp,                        // pointer to read handle
      &hWriteTmp,                       // pointer to write handle
      &sa,                              // pointer to security attributes
      0);                               // pipe size
    _ASSERTE(brc && "server: createpipe 2 failed");
    
    client[id].wpipe = hWriteTmp;
    client[id].childrpipe = hReadTmp;
    
    CreateChild(id);
    }
  }


//------------------------------------------------
//-- some helper functions
void SendShutdown(DWORD id)
  {
  //tell each child to shutdown
  BOOL brc = PostThreadMessage(
    id,        // thread identifier
    WM_CLOSE,  // message to post
    0,         // first message parameter
    0);        // second message parameter)
  }

void KillAllTimers()
  {
  for(int i = 1; i <= numclients; i++)
    {
    if (client[i].registered)
      {
      //kill the associated timer
      KillTimer(NULL, client[i].timerid);
      }
    }
  }

void ShutdownAllClients()
  {
  for(int i = 1; i <= numclients; i++)
    {
    if (!client[i].registered) continue;
    SendShutdown(client[i].tid);
    }
  }

void WaitForAllToDie()
  {
  for(int i = 1; i <= numclients; i++)
    {
    if (!client[i].registered) continue;
    //wait for the child process to end
    WaitForSingleObject(client[i].hProcess, INFINITE);
    
    //close all handles
    CloseHandle(client[i].hProcess);
    CloseHandle(client[i].rpipe);
    CloseHandle(client[i].wpipe);
    }
  }

static void EndClients()
  {
  KillAllTimers(); //first stop sending messages... the client will stop responding.
  ShutdownAllClients(); //now that the clients are stable and idle, tell them to quit
  WaitForAllToDie(); //now wait for the processes to end
  }

static void StartCommunicationThread()
  {
  hCommThread = CreateThread( 
    NULL,                 // pointer to thread security attributes
    0,                    // initial thread stack size, in bytes
    CommThread,           // pointer to thread function
    (void*) 0,            // argument for new thread
    0,                    // creation flags
    &dwCommThread);       // pointer to returned thread identifier
  }

static void EndCommunicationThread()
  {
  //all clean, finally tell the comm thread to quit
  PostThreadMessage(dwCommThread,WM_QUIT,0,0);
  
  //wait for it to exit
  WaitForSingleObject(hCommThread, INFINITE);
  
  //all done
  CloseHandle(hCommThread);
  }

static void ReadIncomingMessage(int clientid, int len, char* buf, int maxlen)
  {
  _ASSERTE(clientid >= 1 && clientid <= numclients && "server: wm_outmsg clientid is invalid");
  _ASSERTE(client[clientid].registered && "server: client not registered");
  
  _ASSERTE(len > 0 && len < maxlen && "server: wm_outmsg buflen is invalid");
  DWORD bytesread = 0;
  
  BOOL brc = ReadFile(
    client[clientid].rpipe,    // handle of file to read
    buf,                     // pointer to buffer that receives data
    len,                     // number of bytes to read
    &bytesread,              // pointer to number of bytes read
    0);                      // pointer to structure for data
  buf[maxlen - 1] = 0;
  gle = GetLastError();
  _ASSERTE(buf[len-1] == 0 && "server: expected a null terminated string!");
  buf[len - 1] = 0;
  }

static void WriteMessage(int i, char* buf)
  {
  DWORD len = strlen(buf) + 1; //one for the null!
  DWORD byteswritten = 0;
  BOOL brc = WriteFile(
    client[i].wpipe,          // handle to file to write to
    buf,                     // pointer to data to write to file
    len,                     // number of bytes to write
    &byteswritten,           // pointer to number of bytes written
    0);                      // pointer to structure for overlapped I/O
  gle = GetLastError();
  _ASSERTE(brc && "server: writefile failed");
  _ASSERTE(len == byteswritten && "server: number of bytes written != size of buffer");
  
  //tell the child that we've written to the pipe
  brc = PostThreadMessage(
    client[i].tid,   // thread identifier
    WM_INMSG,       // message to post
    (WPARAM) len,   // first message parameter
    0);             // second message parameter)
  
  _ASSERTE(brc && "server: PostThreadMessage wm_inmsg failed");
  }

static int FindClientFromTimerId(UINT timerid)
  {
  //search for timerid in the array, send a msg to the client appl
  int i;
  for(i = 1; i <= numclients; i++)
    {
    if (client[i].timerid == timerid)
      break;
    }
  _ASSERTE(i >= 1 && i <= numclients && "server: could not find timerid in the array");
  return i;
  }

//----------------------
void main()
  {
  //ensure everything is 0
  memset(&client, 0x00, sizeof(client));
  
  //save the current threadid so we can be shutdown
  dwMainThread = GetCurrentThreadId();
  
  //start everything up
  StartCommunicationThread();
  StartClients();
  
  //wait for the world to finish, handle incoming messages
  MSG msg;
  while (GetMessage(&msg, 0, 0, 0))
    {
    DispatchMessage(&msg);
    }
  
  //the comm. thread has finished and sent a wm_quit to us, so cleanup
  EndClients();
  EndCommunicationThread();
  }

//-------------------------------------------------------
DWORD __stdcall CommThread(void* parms)
  {
  MSG msg;
  UINT timerid;
  BOOL brc;
  
  //force the queue creation
  PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
  
  while (GetMessage(&msg, 0, 0, 0))
    {
    switch (msg.message)
      {
      //----------------------
      //-- child process sends a register immediately after starting
      case WM_REGISTER :
        {
        int clientid = msg.wParam; 
        _ASSERTE(clientid >= 1 && clientid <= numclients && "server: register clientid is invalid");
        _ASSERTE(!client[clientid].registered && "server: already registered");
        client[clientid].registered = true;
        client[clientid].tid = msg.lParam;
        cout << "server: rgst client id=" << clientid << " tid=" << msg.lParam << "\n" << flush;
        
        //duplicate the handles so we can close our side
        HANDLE rpipe = 0;
        HANDLE wpipe = 0;
        brc = DuplicateHandle(
          GetCurrentProcess(),         // handle to the source process
          client[clientid].childrpipe,   // handle to duplicate
          client[clientid].hProcess,     // handle to process to duplicate to
          &rpipe,                      // pointer to duplicate handle
          GENERIC_READ,    // access for duplicate handle
          FALSE,                       // handle inheritance flag
          0);                          // optional actions
        gle = GetLastError();
        _ASSERTE(brc && "server: duplicatehandle rpipe");
        CloseHandle(client[clientid].childrpipe);
        
        brc = DuplicateHandle(
          GetCurrentProcess(),         // handle to the source process
          client[clientid].childwpipe,   // handle to duplicate
          client[clientid].hProcess,     // handle to process to duplicate to
          &wpipe,                      // pointer to duplicate handle
          GENERIC_WRITE,    // access for duplicate handle
          FALSE,                       // handle inheritance flag
          0);                          // optional actions
        gle = GetLastError();
        _ASSERTE(brc && "server: duplicatehandle wpipe");
        CloseHandle(client[clientid].childwpipe);
        
        brc = PostThreadMessage(
          client[clientid].tid,   // thread identifier
          WM_PIPEHANDLES,       // message to post
          (WPARAM) rpipe,       // first message parameter
          (LPARAM) wpipe);      // second message parameter)
        
        //start the ball rolling
        timerid = SetTimer(
          NULL,           // handle of window for timer messages
          0,              // timer identifier
          serverdelay+clientid,   // time-out value
          NULL);          // address of timer procedure
        _ASSERTE(timerid != 0 && "server: SetTimer in register failed");
        client[clientid].timerid = timerid;
        }
        break;
        
      case WM_TIMER:
        {
        static int num = 0;
        if (num > (msgsperclient * numclients))
          {
          PostThreadMessage(dwMainThread,WM_QUIT,0,0);
          break;
          }
        
        int i = FindClientFromTimerId(msg.wParam);
        char buf[100];
        sprintf(buf, "hi there (msg# %d) the server", num++);
        WriteMessage(i, buf);
        cout << "server: sent inmsg  id=" << i << " len="<< strlen(buf) + 1 << " '" << buf << "'\n" << flush;
        }
        break;
        
      case WM_OUTMSG:
        {
        char buf[512];
        int clientid = msg.wParam;
        ReadIncomingMessage(clientid, msg.lParam, buf, sizeof(buf));
        cout << "server: recv outmsg id=" << clientid << " len=" << msg.lParam << " '" << buf << "'\n" << flush;
        }
        break;
        
      default:
        cout << "server: Unknown message type: " << msg.message << "\n";
        break;
      }
    }
  
  return 0;
  }
  

testMsgLoop.h

Synopsis
#pragma once

#include <windows.h>
#include <crtdbg.h>
#include <process.h>
#include <iostream>
#include <iomanip>
#include <stdio.h>

const UINT WM_REGISTER     = WM_USER+1;
const UINT WM_OUTMSG       = WM_USER+2;
const UINT WM_INMSG        = WM_USER+3;
const UINT WM_PIPEHANDLES  = WM_USER+4;
const UINT WM_PARENTPID    = WM_USER+5;

#ifdef _DEBUG
const char * const cExecutablesDir = "d:\\projects\\debug\\testMsgLoop";
#else
const char * const cExecutablesDir = "d:\\projects\\release\\testMsgLoop";
#endif
const int   clientdelay = 100;
const int   numclients = 3;  //number of clients to start
const int   msgsperclient = 3;  //number of clients to start
const long  serverdelay = 100;







Contact me about content on this page using john_web-at-arrizza-dot-com
For Web Master or site problems contact: webadmin-at-arrizza-dot-com
Copyright John Arrizza (c) 2001-2010