From 0ea5fc66924303d1bf73ba283a383e2aadee02f2 Mon Sep 17 00:00:00 2001 From: neodarz Date: Sat, 11 Aug 2018 20:21:34 +0200 Subject: Initial commit --- .../x-cvsweb-markup&sortby=date/index.html | 1083 ++++++++++++++++++++ 1 file changed, 1083 insertions(+) create mode 100644 cvs/cvsweb.cgi/code/nel/src/net/buf_server.cpp?rev=1.32&content-type=text/x-cvsweb-markup&sortby=date/index.html (limited to 'cvs/cvsweb.cgi/code/nel/src/net/buf_server.cpp?rev=1.32&content-type=text/x-cvsweb-markup&sortby=date/index.html') diff --git a/cvs/cvsweb.cgi/code/nel/src/net/buf_server.cpp?rev=1.32&content-type=text/x-cvsweb-markup&sortby=date/index.html b/cvs/cvsweb.cgi/code/nel/src/net/buf_server.cpp?rev=1.32&content-type=text/x-cvsweb-markup&sortby=date/index.html new file mode 100644 index 00000000..1fd0056b --- /dev/null +++ b/cvs/cvsweb.cgi/code/nel/src/net/buf_server.cpp?rev=1.32&content-type=text/x-cvsweb-markup&sortby=date/index.html @@ -0,0 +1,1083 @@ + + + +code/nel/src/net/buf_server.cpp - view - 1.32 + +
[BACK] Return to buf_server.cpp + CVS log [TXT][DIR] Up to Nevrax / code / nel / src / net

File: Nevrax / code / nel / src / net / buf_server.cpp (download)
+Revision 1.32, Tue Jul 2 15:56:58 2002 UTC (3 weeks, 5 days ago) by lecroart +
Branch: MAIN +
CVS Tags: HEAD
Changes since 1.31: +2 -2 + lines
+CHANGED: commented some nldebug for performance
+
+

/** \file buf_server.cpp
+ * Network engine, layer 1, server
+ *
+ * $Id: buf_server.cpp,v 1.32 2002/07/02 15:56:58 lecroart Exp $
+ */
+
+/* Copyright, 2001 Nevrax Ltd.
+ *
+ * This file is part of NEVRAX NEL.
+ * NEVRAX NEL is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+
+ * NEVRAX NEL is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+
+ * You should have received a copy of the GNU General Public License
+ * along with NEVRAX NEL; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+ * MA 02111-1307, USA.
+ */
+
+#include "stdnet.h"
+
+#include "nel/misc/hierarchical_timer.h"
+
+#include "nel/net/buf_server.h"
+
+#ifdef NL_OS_WINDOWS
+#include <winsock2.h>
+//typedef sint socklen_t;
+
+#elif defined NL_OS_UNIX
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#endif
+
+
+using namespace NLMISC;
+using namespace std;
+
+namespace NLNET {
+
+
+/***************************************************************************************************
+ * User main thread (initialization)
+ **************************************************************************************************/
+
+
+/*
+ * Constructor
+ */
+CBufServer::CBufServer( TThreadStategy strategy,
+        uint16 max_threads, uint16 max_sockets_per_thread, bool nodelay, bool replaymode ) :
+        CBufNetBase(),
+        _ThreadStrategy( strategy ),
+        _NoDelay( nodelay ),
+        _MaxThreads( max_threads ),
+        _MaxSocketsPerThread( max_sockets_per_thread ),
+        _ConnectionCallback( NULL ),
+        _ConnectionCbArg( NULL ),
+        _BytesPushedOut( 0 ),
+        _BytesPoppedIn( 0 ),
+        _PrevBytesPoppedIn( 0 ),
+        _PrevBytesPushedOut( 0 ),
+        _ReplayMode( replaymode ),
+        _ListenTask( NULL ),
+        _ListenThread( NULL ),
+        _NbConnections (0),
+        _ThreadPool("CBufServer::_ThreadPool")
+{
+        nlnettrace( "CBufServer::CBufServer" );
+        if ( ! _ReplayMode )
+        {
+                _ListenTask = new CListenTask( this );
+                _ListenThread = IThread::create( _ListenTask );
+        }
+        /*{
+                CSynchronized<uint32>::CAccessor syncbpi ( &_BytesPushedIn );
+                syncbpi.value() = 0;
+        }*/
+}
+
+
+/*
+ * Listens on the specified port
+ */
+void CBufServer::init( uint16 port )
+{
+        nlnettrace( "CBufServer::init" );
+        if ( ! _ReplayMode )
+        {
+                _ListenTask->init( port );
+                _ListenThread->start();
+        }
+        else
+        {
+                nldebug( "LNETL0: Binding listen socket to any address, port %hu", port );
+        }
+}
+
+
+/*
+ * Begins to listen on the specified port (call before running thread)
+ */
+void CListenTask::init( uint16 port )
+{
+        nlnettrace( "CListenTask::init" );
+        _ListenSock.init( port );
+}
+
+
+/***************************************************************************************************
+ * User main thread (running)
+ **************************************************************************************************/
+
+
+/*
+ * Constructor
+ */
+CServerTask::CServerTask() : _ExitRequired(false)
+{
+#ifdef NL_OS_UNIX
+        pipe( _WakeUpPipeHandle );
+#endif
+}
+
+
+
+#ifdef NL_OS_UNIX
+/*
+ * Wake the thread up, when blocked in select (Unix only)
+ */
+void CServerTask::wakeUp()
+{
+        uint8 b;
+        if ( write( _WakeUpPipeHandle[PipeWrite], &b, 1 ) == -1 )
+        {
+                nldebug( "LNETL1: In CServerTask::wakeUp(): write() failed" );
+        }
+}
+#endif
+
+
+/*
+ * Destructor
+ */
+CServerTask::~CServerTask()
+{
+#ifdef NL_OS_UNIX
+        close( _WakeUpPipeHandle[PipeRead] );
+        close( _WakeUpPipeHandle[PipeWrite] );
+#endif
+}
+
+
+/*
+ * Destructor
+ */
+CBufServer::~CBufServer()
+{
+        nlnettrace( "CBufServer::~CBufServer" );
+
+        // Clean listen thread exit
+        if ( ! _ReplayMode )
+        {
+                ((CListenTask*)(_ListenThread->getRunnable()))->requireExit();
+                ((CListenTask*)(_ListenThread->getRunnable()))->close();
+#ifdef NL_OS_UNIX
+                _ListenTask->wakeUp();
+#endif
+                _ListenThread->wait();
+                delete _ListenThread;
+                delete _ListenTask;
+
+                // Clean receive thread exits
+                CThreadPool::iterator ipt;
+                {
+                        nldebug( "LNETL1: Waiting for end of threads..." );
+                        CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+                        for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+                        {
+                                // Tell the threads to exit and wake them up
+                                CServerReceiveTask *task = receiveTask(ipt);
+                                nlnettrace( "Requiring exit" );
+                                task->requireExit();
+
+                                // Wake the threads up
+        #ifdef NL_OS_UNIX
+                                task->wakeUp();
+        #else
+                                CConnections::iterator ipb;
+                                nlnettrace( "Closing sockets (Win32)" );
+                                {
+                                        CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+                                        for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+                                        {
+                                                (*ipb)->Sock->close();
+                                        }
+                                }
+        #endif
+
+                        }
+                        
+                        nlnettrace( "Waiting" );
+                        for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+                        {
+                                // Wait until the threads have exited
+                                (*ipt)->wait();
+                        }
+
+                        nldebug( "LNETL1: Deleting sockets, tasks and threads..." );
+                        for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+                        {
+                                // Delete the socket objects
+                                CServerReceiveTask *task = receiveTask(ipt);
+                                CConnections::iterator ipb;
+                                {
+                                        CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+                                        for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+                                        {
+                                                delete (*ipb);
+                                        }
+                                }
+
+        #ifdef NL_OS_UNIX
+                                // Under Unix, close the sockets now
+                                nlnettrace( "Closing sockets (Unix)" );
+                                {
+                                        CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+                                        for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+                                        {
+                                                (*ipb)->Sock->close();
+                                        }
+                                }
+        #endif
+                                
+                                // Delete the task objects
+                                delete task;
+
+                                // Delete the thread objects
+                                delete (*ipt);
+                        }
+                }
+        }
+
+        nlnettrace( "Exiting CBufServer::~CBufServer" );
+}
+
+
+/*
+ * Disconnect the specified host
+ * Set hostid to NULL to disconnect all connections.
+ * If hostid is not null and the socket is not connected, the method does nothing.
+ * If quick is true, any pending data will not be sent before disconnecting.
+ */
+void CBufServer::disconnect( TSockId hostid, bool quick )
+{
+        nlnettrace( "CBufServer::disconnect" );
+        if ( hostid != InvalidSockId )
+        {
+                // Disconnect only if physically connected
+                if ( hostid->Sock->connected() )
+                {
+                        if ( ! quick )
+                        {
+                                hostid->flush();
+                        }
+                        hostid->Sock->disconnect(); // the connection will be removed by the next call of update()
+                }
+        }
+        else
+        {
+                // Disconnect all
+                CThreadPool::iterator ipt;
+                {
+                        CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+                        for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+                        {
+                                CServerReceiveTask *task = receiveTask(ipt);
+                                CConnections::iterator ipb;
+                                {
+                                        CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+                                        for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+                                        {
+                                                if ( (*ipb)->Sock->connected() )
+                                                {
+                                                        if ( ! quick )
+                                                        {
+                                                                (*ipb)->flush();
+                                                        }
+                                                        (*ipb)->Sock->disconnect();
+                                                }
+                                        }
+                                }
+                        }
+                }
+        }
+}
+
+
+/*
+ * Send a message to the specified host
+ */
+void CBufServer::send( const CMemStream& buffer, TSockId hostid )
+{
+        nlnettrace( "CBufServer::send" );
+        nlassert( buffer.length() > 0);
+        nlassert( buffer.length() <= maxSentBlockSize() );
+
+        H_AUTO (CBufServer_send);
+
+        if ( hostid != InvalidSockId )
+        {
+                // debug features, we number all packet to be sure that they are all sent and received
+                // \todo remove this debug feature when ok
+//              nldebug ("send message number %u", hostid->SendNextValue);
+#ifdef NL_BIG_ENDIAN
+                uint32 val = NLMISC_BSWAP32(hostid->SendNextValue);
+#else
+                uint32 val = hostid->SendNextValue;
+#endif
+
+                *(uint32*)buffer.buffer() = val;
+                hostid->SendNextValue++;
+
+                pushBufferToHost( buffer, hostid );
+        }
+        else
+        {
+                // Push into all send queues
+                CThreadPool::iterator ipt;
+                {
+                        CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+                        for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+                        {
+                                CServerReceiveTask *task = receiveTask(ipt);
+                                CConnections::iterator ipb;
+                                {
+                                        CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+                                        for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+                                        {
+                                                // Send only if the socket is logically connected
+                                                if ( (*ipb)->connectedState() ) 
+                                                {
+                                                        // debug features, we number all packet to be sure that they are all sent and received
+                                                        // \todo remove this debug feature when ok
+//                                                      nldebug ("send message number %u", (*ipb)->SendNextValue);
+#ifdef NL_BIG_ENDIAN
+                                                        uint32 val = NLMISC_BSWAP32((*ipb)->SendNextValue);
+#else
+                                                        uint32 val = (*ipb)->SendNextValue;
+#endif
+                                                        *(uint32*)buffer.buffer() = val;
+                                                        (*ipb)->SendNextValue++;
+
+                                                        pushBufferToHost( buffer, *ipb );
+                                                }
+                                        }
+                                }
+                        }
+                }
+        }
+}
+
+
+/*
+ * Checks if there are some data to receive
+ */
+bool CBufServer::dataAvailable()
+{
+        H_AUTO (CBufServer_dataAvailable);
+        {
+                CFifoAccessor recvfifo( &receiveQueue() );
+                do
+                {
+                        // Check if the receive queue is empty
+                        if ( recvfifo.value().empty() )
+                        {
+                                return false;
+                        }
+                        else
+                        {
+                          /*sint32 mbsize = recvfifo.value().size() / 1048576;
+                          if ( mbsize > 0 )
+                            {
+                              nlwarning( "The receive queue size exceeds %d MB", mbsize );
+                            }*/
+
+                                uint8 val = recvfifo.value().frontLast();
+                                
+                                /*vector<uint8> buffer;
+                                recvfifo.value().front( buffer );*/
+
+                                // Test if it the next block is a system event
+                                //switch ( buffer[buffer.size()-1] )
+                                switch ( val )
+                                {
+                                        
+                                // Normal message available
+                                case CBufNetBase::User:
+                                        return true; // return immediatly, do not extract the message
+
+                                // Process disconnection event
+                                case CBufNetBase::Disconnection:
+                                {
+                                        vector<uint8> buffer;
+                                        recvfifo.value().front( buffer );
+
+                                        TSockId sockid = *((TSockId*)(&*buffer.begin()));
+                                        nldebug( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());
+
+                                        sockid->setConnectedState( false );
+
+                                        // Call callback if needed
+                                        if ( disconnectionCallback() != NULL )
+                                        {
+                                                disconnectionCallback()( sockid, argOfDisconnectionCallback() );
+                                        }
+
+                                        // Add socket object into the synchronized remove list
+                                        nldebug( "LNETL1: Adding the connection to the remove list" );
+                                        nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
+                                        ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
+                                        break;
+                                }
+                                // Process connection event
+                                case CBufNetBase::Connection:
+                                {
+                                        vector<uint8> buffer;
+                                        recvfifo.value().front( buffer );
+
+                                        TSockId sockid = *((TSockId*)(&*buffer.begin()));
+                                        nldebug( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());
+
+                                        sockid->setConnectedState( true );
+                                        
+                                        // Call callback if needed
+                                        if ( connectionCallback() != NULL )
+                                        {
+                                                connectionCallback()( sockid, argOfConnectionCallback() );
+                                        }
+                                        break;
+                                }
+                                default:
+                                        vector<uint8> buffer;
+                                        recvfifo.value().front( buffer );
+
+                                        nlinfo( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
+                                        nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
+                                        nlinfo( "LNETL1: Receive queue:" );
+                                        recvfifo.value().display();
+                                        nlerror( "LNETL1: Invalid system event type in server receive queue" );
+
+                                }
+
+                                // Extract system event
+                                recvfifo.value().pop();
+                        }
+                }
+                while ( true );
+        }
+}
+
+ 
+/*
+ * Receives next block of data in the specified. The length and hostid are output arguments.
+ * Precond: dataAvailable() has returned true, phostid not null
+ */
+void CBufServer::receive( CMemStream& buffer, TSockId* phostid )
+{
+        nlnettrace( "CBufServer::receive" );
+        //nlassert( dataAvailable() );
+        nlassert( phostid != NULL );
+        {
+                CFifoAccessor recvfifo( &receiveQueue() );
+                nlassert( ! recvfifo.value().empty() );
+                recvfifo.value().front( buffer );
+                recvfifo.value().pop();
+        }
+
+        // Extract hostid (and event type)
+        *phostid = *((TSockId*)&(buffer.buffer()[buffer.length()-sizeof(TSockId)-1]));
+        nlassert( buffer.buffer()[buffer.length()-1] == CBufNetBase::User );
+
+        // debug features, we number all packet to be sure that they are all sent and received
+        // \todo remove this debug feature when ok
+#ifdef NL_BIG_ENDIAN
+        uint32 val = NLMISC_BSWAP32(*(uint32*)buffer.buffer());
+#else
+        uint32 val = *(uint32*)buffer.buffer();
+#endif
+
+        //        nldebug ("receive message number %u", val);
+        if ((*phostid)->ReceiveNextValue != val)
+        {
+                nlstopex (("LNETL1: !!!LOST A MESSAGE!!! I received the message number %u but I'm waiting the message number %u (cnx %s), warn lecroart@nevrax.com with the log now please", val, (*phostid)->ReceiveNextValue, (*phostid)->asString().c_str()));
+                // resync the message number
+                (*phostid)->ReceiveNextValue = val;
+        }
+
+        (*phostid)->ReceiveNextValue++;
+
+        buffer.resize( buffer.length()-sizeof(TSockId)-1 );
+
+        // TODO OPTIM remove the nldebug for speed
+        //commented for optimisation nldebug( "LNETL1: Read buffer (%d+%d B) from %s", buffer.length(), sizeof(TSockId)+1, /*stringFromVector(buffer).c_str(), */(*phostid)->asString().c_str() );
+
+        // Statistics
+        _BytesPoppedIn += buffer.length() + sizeof(TBlockSize);
+}
+
+
+/*
+ * Update the network (call this method evenly)
+ */
+void CBufServer::update()
+{
+        //nlnettrace( "CBufServer::update-BEGIN" );
+
+        _NbConnections = 0;
+
+        // For each thread
+        CThreadPool::iterator ipt;
+        {
+          //nldebug( "UPD: Acquiring the Thread Pool" );
+                CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+                //nldebug( "UPD: Acquired." );
+                for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+                {
+                        // For each thread of the pool
+                        CServerReceiveTask *task = receiveTask(ipt);
+                        CConnections::iterator ipb;
+                        {
+                                CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+                                for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+                                {
+                                    // For each socket of the thread, update sending
+                                    if ( ! ((*ipb)->Sock->connected() && (*ipb)->update()) )
+                                    {
+                                                // Update did not work or the socket is not connected anymore
+                                        nldebug( "LNETL1: Socket %s is disconnected", (*ipb)->asString().c_str() );
+                                                // Disconnection event if disconnected (known either from flush (in update) or when receiving data)
+                                                (*ipb)->advertiseDisconnection( this, *ipb );
+                                        
+                                                /*if ( (*ipb)->advertiseDisconnection( this, *ipb ) )
+                                                {
+                                                        // Now the connection removal is in dataAvailable()
+                                                        // POLL6
+                                                }*/
+                                    }
+                                    else
+                                    {
+                                                _NbConnections++;
+                                    }
+                                }
+                        }
+                }
+        }
+
+        //nlnettrace( "CBufServer::update-END" );
+}
+
+uint32 CBufServer::getSendQueueSize( TSockId destid )
+{
+        if ( destid != InvalidSockId )
+        {
+                return destid->SendFifo.size();
+        }
+        else
+        {
+                // add all client buffers
+
+                uint32 total = 0;
+
+                // For each thread
+                CThreadPool::iterator ipt;
+                {
+                        CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+                        for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+                        {
+                                // For each thread of the pool
+                                CServerReceiveTask *task = receiveTask(ipt);
+                                CConnections::iterator ipb;
+                                {
+                                        CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+                                        for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+                                        {
+                                                // For each socket of the thread, update sending
+                                                total = (*ipb)->SendFifo.size ();
+                                        }
+                                }
+                        }
+                }
+                return total;
+        }
+}
+
+
+/*
+ * Returns the number of bytes received since the previous call to this method
+ */
+uint64 CBufServer::newBytesReceived()
+{
+        uint64 b = bytesReceived();
+        uint64 nbrecvd = b - _PrevBytesPoppedIn;
+        //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbrecvd );
+        _PrevBytesPoppedIn = b;
+        return nbrecvd;
+}
+
+/*
+ * Returns the number of bytes sent since the previous call to this method
+ */
+uint64 CBufServer::newBytesSent()
+{
+        uint64 b = bytesSent();
+        uint64 nbsent = b - _PrevBytesPushedOut;
+        //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbsent );
+        _PrevBytesPushedOut = b;
+        return nbsent;
+}
+
+
+/***************************************************************************************************
+ * Listen thread
+ **************************************************************************************************/
+
+
+/*
+ * Code of listening thread
+ */
+void CListenTask::run()
+{
+        nlnettrace( "CListenTask::run" );
+
+#ifdef NL_OS_UNIX
+        SOCKET descmax;
+        fd_set readers;
+        timeval tv;
+        descmax = _ListenSock.descriptor()>_WakeUpPipeHandle[PipeRead]?_ListenSock.descriptor():_WakeUpPipeHandle[PipeRead];
+#endif
+
+        // Accept connections
+        while ( ! exitRequired() )
+        {
+                try
+                {
+                        // Get and setup the new socket
+#ifdef NL_OS_UNIX
+                        FD_ZERO( &readers );
+                        FD_SET( _ListenSock.descriptor(), &readers );
+                        FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
+                        tv.tv_sec = 60; /// \todo ace: we perhaps could put NULL to never wake up the select (look at the select man page)
+                        tv.tv_usec = 0;
+                        int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
+
+                        switch ( res )
+                        {
+                        case  0 : continue; // time-out expired, no results
+                        case -1 :
+                                // we'll ignore message (Interrupted system call) caused by a CTRL-C
+                                if (CSock::getLastError() == 4)
+                                {
+                                        nldebug ("LNETL1: Select failed (in listen thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
+                                        continue;
+                                }
+                                nlerror( "LNETL1: Select failed (in listen thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
+                        }
+
+                        if ( FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers ) )
+                        {
+                                uint8 b;
+                                if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
+                                {
+                                        nldebug( "LNETL1: In CListenTask::run(): read() failed" );
+                                }
+                                nldebug( "LNETL1: listen thread select woken-up" );
+                                continue;
+                        }
+#endif
+                        nldebug( "LNETL1: Incoming connection..." );
+                        CServerBufSock *bufsock = new CServerBufSock( _ListenSock.accept() );
+                        nldebug( "New connection : %s", bufsock->asString().c_str() );
+                        bufsock->Sock->setNonBlockingMode( true );
+                        if ( _Server->noDelay() )
+                        {
+                                bufsock->Sock->setNoDelay( true );
+                        }
+
+                        // Notify the new connection
+                        bufsock->advertiseConnection( _Server );
+
+                        // Dispatch the socket into the thread pool
+                        _Server->dispatchNewSocket( bufsock );
+                }
+                catch ( ESocket& e )
+                {
+                        nlinfo( "Exception in listen thread: %s", e.what() ); // It can occur in normal behavior (e.g. when exiting)
+                        // It can also occur when too many sockets are open (e.g. 885 connections)
+                }
+        }
+
+        nlnettrace( "Exiting CListenTask::run" );
+}
+
+
+/*
+ * Binds a new socket and send buffer to an existing or a new thread
+ * Note: this method is called in the listening thread.
+ */
+void CBufServer::dispatchNewSocket( CServerBufSock *bufsock )
+{
+        nlnettrace( "CBufServer::dispatchNewSocket" );
+
+        CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+        if ( _ThreadStrategy == SpreadSockets )        
+        {
+                // Find the thread with the smallest number of connections and check if all
+                // threads do not have the same number of connections
+                uint min = 0xFFFFFFFF;
+                uint max = 0;
+                CThreadPool::iterator ipt, iptmin, iptmax;
+                for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+                {
+                        uint noc = receiveTask(ipt)->numberOfConnections();
+                        if ( noc < min )
+                        {
+                                min = noc;
+                                iptmin = ipt;
+                        }
+                        if ( noc > max )
+                        {
+                                max = noc;
+                                iptmax = ipt;
+                        }
+                }
+
+                // Check if we make the pool of threads grow (if we have not found vacant room
+                // and if it is allowed to)
+                if ( (poolsync.value().empty()) ||
+                         ((min == max) && (poolsync.value().size() < _MaxThreads)) )
+                {
+                        addNewThread( poolsync.value(), bufsock );
+                }
+                else
+                {
+                        // Dispatch socket to an existing thread of the pool
+                        CServerReceiveTask *task = receiveTask(iptmin);
+                        bufsock->setOwnerTask( task );
+                        task->addNewSocket( bufsock );
+#ifdef NL_OS_UNIX
+                        task->wakeUp();
+#endif                  
+                        
+                        if ( min >= (uint)_MaxSocketsPerThread )
+                        {
+                                nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
+                        }
+                        nldebug( "LNETL1: New socket dispatched to thread %d", iptmin-poolsync.value().begin() );
+                }
+
+        }
+        else // _ThreadStrategy == FillThreads
+        {
+                CThreadPool::iterator ipt;
+                for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+                {
+                        uint noc = receiveTask(ipt)->numberOfConnections();
+                        if ( noc < _MaxSocketsPerThread )
+                        {
+                                break;
+                        }
+                }
+
+                // Check if we have to make the thread pool grow (if we have not found vacant room)
+                if ( ipt == poolsync.value().end() )
+                {
+                        if ( poolsync.value().size() == _MaxThreads )
+                        {
+                                nlwarning( "LNETL1: Exceeding the maximum number of threads" );
+                        }
+                        addNewThread( poolsync.value(), bufsock );
+                }
+                else
+                {
+                        // Dispatch socket to an existing thread of the pool
+                        CServerReceiveTask *task = receiveTask(ipt);
+                        bufsock->setOwnerTask( task );
+                        task->addNewSocket( bufsock );
+#ifdef NL_OS_UNIX
+                        task->wakeUp();
+#endif                  
+                        nldebug( "LNETL1: New socket dispatched to thread %d", ipt-poolsync.value().begin() );
+                }
+        }
+}
+
+
+/*
+ * Creates a new task and run a new thread for it
+ * Precond: bufsock not null
+ */
+void CBufServer::addNewThread( CThreadPool& threadpool, CServerBufSock *bufsock )
+{
+        nlnettrace( "CBufServer::addNewThread" );
+        nlassert( bufsock != NULL );
+
+        // Create new task and dispatch the socket to it
+        CServerReceiveTask *task = new CServerReceiveTask( this );
+        bufsock->setOwnerTask( task );
+        task->addNewSocket( bufsock );
+
+        // Add a new thread to the pool, with this task
+        IThread *thr = IThread::create( task );
+        {
+                threadpool.push_back( thr );
+                thr->start();
+                nldebug( "LNETL1: Added a new thread; pool size is %d", threadpool.size() );
+                nldebug( "LNETL1: New socket dispatched to thread %d", threadpool.size()-1 );
+        }
+}
+
+
+/***************************************************************************************************
+ * Receive threads
+ **************************************************************************************************/
+
+
+/*
+ * Code of receiving threads for servers
+ */
+void CServerReceiveTask::run()
+{
+        nlnettrace( "CServerReceiveTask::run" );
+
+        SOCKET descmax;
+        fd_set readers;
+
+        // Time-out value for select (it can be long because we do not do any thing else in this thread)
+        timeval tv;
+#if defined NL_OS_UNIX
+        // POLL7
+        nice( 2 );
+#endif // NL_OS_UNIX
+        
+        // Copy of _Connections
+        vector<TSockId>        connections_copy;        
+
+        while ( ! exitRequired() )
+        {
+                // 1. Remove closed connections
+                clearClosedConnections();
+
+                // POLL8
+
+                // 2-SELECT-VERSION : select() on the sockets handled in the present thread
+
+                descmax = 0;
+                FD_ZERO( &readers );
+                bool skip;
+                bool alldisconnected = true;
+                CConnections::iterator ipb;
+                {
+                        // Lock _Connections
+                        CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
+
+                        // Prepare to avoid select if there is no connection
+                        skip = connectionssync.value().empty();
+
+                        // Fill the select array and copy _Connections
+                        connections_copy.clear();
+                        for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+                        {
+                                if ( (*ipb)->Sock->connected() ) // exclude disconnected sockets that are not deleted
+                                                                                                 // Note: there is a mutex in there !
+                                {
+                                        alldisconnected = false;
+                                        // Copy _Connections element
+                                        connections_copy.push_back( *ipb );
+
+                                        // Add socket descriptor to the select array
+                                        FD_SET( (*ipb)->Sock->descriptor(), &readers );
+
+                                        // Calculate descmax for select
+                                        if ( (*ipb)->Sock->descriptor() > descmax )
+                                        {
+                                                descmax = (*ipb)->Sock->descriptor();
+                                        }
+                                }
+                        }
+
+#ifdef NL_OS_UNIX
+                        // Add the wake-up pipe into the select array
+                        FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
+                        if ( _WakeUpPipeHandle[PipeRead]>descmax )
+                        {
+                                descmax = _WakeUpPipeHandle[PipeRead];
+                        }
+#endif
+                        
+                        // Unlock _Connections, use connections_copy instead
+                }
+
+#ifndef NL_OS_UNIX
+                // Avoid select if there is no connection (Windows only)
+                if ( skip || alldisconnected )
+                {
+                        nlSleep( 1 ); // nice
+                        continue;
+                }
+#endif
+
+#ifdef NL_OS_WINDOWS
+                tv.tv_sec = 0; // short time because the newly added connections can't be added to the select fd_set
+                tv.tv_usec = 10000; // NEW: set to 500ms because otherwise new connections handling are too slow
+#elif defined NL_OS_UNIX
+                // POLL7
+                tv.tv_sec = 3600;                // 1 hour (=> 1 select every 3.6 second for 1000 connections)
+                tv.tv_usec = 0;
+#endif // NL_OS_WINDOWS
+
+                // Call select
+                int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
+
+                // POLL9
+
+                // 3. Test the result
+                switch ( res )
+                {
+                        case  0 : continue; // time-out expired, no results
+
+                        /// \todo cado: the error code is not properly retrieved
+                        case -1 :
+                                // we'll ignore message (Interrupted system call) caused by a CTRL-C
+                                /*if (CSock::getLastError() == 4)
+                                {
+                                        nldebug ("LNETL1: Select failed (in receive thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
+                                        continue;
+                                }*/
+                                //nlerror( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
+                                nldebug( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
+                                return;
+                }
+
+                // 4. Get results
+
+                vector<TSockId>::iterator ic;
+                for ( ic=connections_copy.begin(); ic!=connections_copy.end(); ++ic )
+                {
+                        if ( FD_ISSET( (*ic)->Sock->descriptor(), &readers ) != 0 )
+                        {
+                                CServerBufSock *serverbufsock = static_cast<CServerBufSock*>(static_cast<CBufSock*>(*ic));
+                                try
+                                {
+                                        // 4. Receive data
+                                        if ( serverbufsock->receivePart() )
+                                        {
+                                                // Copy sockid
+                                                vector<uint8> hidvec;
+                                                hidvec.resize( sizeof(TSockId)+1 );
+                                                memcpy( &*hidvec.begin(), &(*ic), sizeof(TSockId) );
+
+                                                // Add event type to hidvec
+                                                hidvec[sizeof(TSockId)] = (uint8)CBufNetBase::User;
+
+                                                // Push message into receive queue
+                                                //uint32 bufsize;
+                                                //sint32 mbsize;
+                                                {
+                                                        //nldebug( "RCV: Acquiring the receive queue... ");
+                                                        CFifoAccessor recvfifo( &_Server->receiveQueue() );
+                                                        //nldebug( "RCV: Acquired, pushing the received buffer... ");
+                                                        recvfifo.value().push( serverbufsock->receivedBuffer(), hidvec );
+                                                        //nldebug( "RCV: Pushed, releasing the receive queue..." );
+                                                        //recvfifo.value().display();
+                                                        //bufsize = serverbufsock->receivedBuffer().size();
+                                                        //mbsize = recvfifo.value().size() / 1048576;
+                                                }
+                                                //nldebug( "RCV: Released." );
+                                                /*if ( mbsize > 1 )
+                                                {
+                                                        nlwarning( "The receive queue size exceeds %d MB", mbsize );
+                                                }*/
+                                                /*
+                                                // Statistics
+                                                {
+                                                        CSynchronized<uint32>::CAccessor syncbpi ( &_Server->syncBytesPushedIn() );
+                                                        syncbpi.value() += bufsize;
+                                                }
+                                                */
+                                        }
+                                }
+                                catch ( ESocketConnectionClosed& )
+                                {
+                                        nldebug( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() );
+                                }
+                                catch ( ESocket& )
+                                {
+                                        nldebug( "LNETL1: Connection %s broken", serverbufsock->asString().c_str() );
+                                        (*ic)->Sock->disconnect();
+                                }
+/*
+#ifdef NL_OS_UNIX
+                                skip = true; // don't check _WakeUpPipeHandle (yes, check it to read any written byte)
+#endif
+
+*/
+                        }
+
+                }
+
+#ifdef NL_OS_UNIX
+                // Test wake-up pipe
+                if ( (!skip) && (FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers )) )
+                {
+                        uint8 b;
+                        if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
+                        {
+                                nldebug( "LNETL1: In CServerReceiveTask::run(): read() failed" );
+                        }
+                        nldebug( "LNETL1: Receive thread select woken-up" );
+                }
+#endif
+                
+        }
+        nlnettrace( "Exiting CServerReceiveTask::run" );
+}
+
+
+/*
+ * Delete all connections referenced in the remove list (double-mutexed)
+ */
+
+void CServerReceiveTask::clearClosedConnections()
+{
+        CConnections::iterator ic;
+        {
+                NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
+                {
+                        if ( ! removesetsync.value().empty() )
+                        {
+                                // Delete closed connections
+                                NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
+                                for ( ic=removesetsync.value().begin(); ic!=removesetsync.value().end(); ++ic )
+                                {
+                                        nldebug( "LNETL1: Removing a connection" );
+
+                                        TSockId sid = (*ic);
+
+                                        // Remove from the connection list
+                                        connectionssync.value().erase( *ic );
+
+                                        // Delete the socket object
+                                        delete sid;
+                                }
+                                // Clear remove list
+                                removesetsync.value().clear();
+                        }
+                }
+        }
+}
+
+
+} // NLNET
+
\ No newline at end of file -- cgit v1.2.1