Monday, August 22, 2005

Using ACE_SPIPE_Acceptor with ACE_Reactor

Using ACE_SPIPE_Acceptor with ACE_Reactor is not very straightforward. There are three things to be noticed:
1. Use ACE_WFMO_Reactor. ACE_Select_Reactor does not work with Windows named pipes: a named pipe handle is not a socket, so you cannot use select() on it. ACE_WFMO_Reactor works because it uses WaitForMultipleObjects().
2. ACE_SPIPE_Acceptor should be registered with the ACE_Reactor::register_handler (ACE_Event_Handler *, ACE_HANDLE) method, since the handle to demultiplex on is an event handle.
3. handle_signal() will be called instead of handle_input() and handle_output(). WaitForMultipleObjects() cannot tell whether a read or a write condition occurred, so ACE_WFMO_Reactor has to call WSAEventSelect() again to check, which means handle_input()/handle_output() are only dispatched for sockets.

As you can see, even though they work together, it is not exactly the same "Reactor" model we are used to when programming sockets (there is no handle_input() or handle_output()). The Proactor model works better with Windows named pipes.

Here is an example:


class ClientAcceptor : public ACE_Event_Handler
{
    public:
    ClientAcceptor(ACE_Reactor& reactor)
    {
        this->reactor(&reactor);
    }
    
    int open (const ACE_SPIPE_Addr &listen_addr);
    
    int handle_signal(int signum, siginfo_t* = 0, ucontext_t* = 0);
    
    protected:
    ACE_SPIPE_Acceptor acceptor_;
};

int ClientAcceptor::open (const ACE_SPIPE_Addr &listen_addr)
{
    if (this->acceptor_.open (listen_addr, 1) == -1)
    {
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
        ACE_TEXT ("acceptor.open")),
        -1);
    }
    
    //register the event handle
    if( this->reactor()->register_handler(this, this->acceptor_.get_handle()) <
    0)
    {
        ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
        ACE_TEXT ("register_handler")),
        -1);
    }
    
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("ClientAcceptor::open\n")));
    return 0;
}

int ClientAcceptor::handle_signal(int signum, siginfo_t*, ucontext_t*)
{
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("ClientAcceptor::handle_signal\n")));
    
    ACE_SPIPE_Stream stm;
    if (this->acceptor_.accept (stm) == -1)
    {
        ACE_ERROR_RETURN ((LM_ERROR,
        ACE_TEXT ("(%P|%t) %p\n"),
        ACE_TEXT ("Failed to accept ")
        ACE_TEXT ("client connection")),
        -1);
    }
    
    //If you have only one client to deal with, now you can simply play with
    stm.
    //
    //To handle multiple clients at the same time, you can either:
    //a. create a new thread for every connection
    //b. use asynchronous IO (shift to Proactor model)
    
    return 0;
}

int ACE_TMAIN(int argc, ACE_TCHAR*argv[])
{
    ACE_WFMO_Reactor wfmo_reactor;
    ACE_Reactor reactor (&wfmo_reactor);
    
    ACE_SPIPE_Addr addr("mynamedpipe");
    
    ClientAcceptor acceptor(reactor);
    if (acceptor.open(addr) == -1)
    {
        ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("Can not open to listen:
        %m\n")), -1);
    }
    
    //Run event loop
    ACE_DEBUG((LM_NOTICE, ACE_TEXT("Server started\n")));
    reactor.run_reactor_event_loop();
    return 0;
}


0 Comments:

Post a Comment

<< Home