NLNET::CServerReceiveTask Class Reference

#include <buf_server.h>

Inheritance diagram for NLNET::CServerReceiveTask:

NLMISC::IRunnable NLNET::CServerTask

Detailed Description

Code of receiving threads for servers. Note: the methods locations in the classes do not correspond to the threads where they are executed, but to the data they use.

Definition at line 410 of file buf_server.h.

Public Member Functions

void addNewSocket (TSockId sockid)
 Add a new connection into this thread (mutexed on _Connections).

void addToRemoveSet (TSockId sockid)
void clearClosedConnections ()
 Delete all connections referenced in the remove list (mutexed on _RemoveSet and on _Connections).

 CServerReceiveTask (CBufServer *server)
 Constructor.

virtual void getName (std::string &result) const
uint numberOfConnections ()
 Returns the number of connections handled by the thread (mutexed on _Connections).

void requireExit ()
 Tells the task to exit.

virtual void run ()
 Run.

CBufServerserver ()
 Access to the server.


Data Fields

uint32 NbLoop

Protected Member Functions

bool exitRequired () const
 Returns true if the requireExit() has been called.


Private Attributes

NLMISC::CSynchronized< CConnections_Connections
NLMISC::CSynchronized< CConnections_RemoveSet
CBufServer_Server

Friends

class CBufServer


Constructor & Destructor Documentation

NLNET::CServerReceiveTask::CServerReceiveTask CBufServer server  )  [inline]
 

Constructor.

Definition at line 415 of file buf_server.h.

References _Connections, and _RemoveSet.

00415 : CServerTask(), _Server(server), _Connections("CServerReceiveTask::_Connections"), _RemoveSet("CServerReceiveTask::_RemoveSet") {}


Member Function Documentation

void NLNET::CServerReceiveTask::addNewSocket TSockId  sockid  )  [inline]
 

Add a new connection into this thread (mutexed on _Connections).

Definition at line 432 of file buf_server.h.

References _Connections, nlassert, and NLNET::TSockId.

Referenced by NLNET::CBufServer::addNewThread(), and NLNET::CBufServer::dispatchNewSocket().

00433         {
00434                 //nlnettrace( "CServerReceiveTask::addNewSocket" );
00435                 nlassert( sockid != InvalidSockId );
00436                 {
00437                         NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00438                         connectionssync.value().insert( sockid );
00439                 }
00440                 // POLL3
00441         }

void NLNET::CServerReceiveTask::addToRemoveSet TSockId  sockid  )  [inline]
 

Add connection to the remove set (mutexed on _RemoveSet) Note: you must not call this method within a mutual exclusion on _Connections, or there will be a deadlock (see clearClosedConnection())

Definition at line 449 of file buf_server.h.

References _RemoveSet, nlassert, nlnettrace, and NLNET::TSockId.

00450         {
00451                 nlnettrace( "CServerReceiveTask::addToRemoveSet" );
00452                 nlassert( sockid != InvalidSockId );
00453                 {
00454                         // Three possibilities :
00455                         // - The value is inserted into the set.
00456                         // - The value is already present in the set.
00457                         // - The set is locked by a receive thread which is removing the closed connections.
00458                         //   When the set gets unlocked, it is empty so the value is inserted. It means the
00459                         //   value could be already in the set before it was cleared.
00460                         //   Note: with a fonction such as tryAcquire(), we could avoid to enter the mutex
00461                         //   when it is already locked
00462                         // See clearClosedConnections().
00463                         NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
00464                         removesetsync.value().insert( sockid );
00465                         //nldebug( "LNETL1: ic: %p - RemoveSet.size(): %d", ic, removesetsync.value().size() );
00466                 }
00467 #ifdef NL_OS_UNIX
00468                 wakeUp();
00469 #endif
00470         }

void NLNET::CServerReceiveTask::clearClosedConnections  ) 
 

Delete all connections referenced in the remove list (mutexed on _RemoveSet and on _Connections).

Definition at line 1191 of file buf_server.cpp.

References _Connections, _RemoveSet, nldebug, and NLNET::TSockId.

Referenced by run().

01192 {
01193         CConnections::iterator ic;
01194         {
01195                 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
01196                 {
01197                         if ( ! removesetsync.value().empty() )
01198                         {
01199                                 // Delete closed connections
01200                                 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
01201                                 for ( ic=removesetsync.value().begin(); ic!=removesetsync.value().end(); ++ic )
01202                                 {
01203                                         nldebug( "LNETL1: Removing a connection" );
01204 
01205                                         TSockId sid = (*ic);
01206 
01207                                         // Remove from the connection list
01208                                         connectionssync.value().erase( *ic );
01209 
01210                                         // Delete the socket object
01211                                         delete sid;
01212                                 }
01213                                 // Clear remove list
01214                                 removesetsync.value().clear();
01215                         }
01216                 }
01217         }
01218 }

bool NLNET::CServerTask::exitRequired  )  const [inline, protected, inherited]
 

Returns true if the requireExit() has been called.

Definition at line 71 of file buf_server.h.

References NLNET::CServerTask::_ExitRequired.

Referenced by run(), and NLNET::CListenTask::run().

00071 { return _ExitRequired; }

virtual void NLMISC::IRunnable::getName std::string &  result  )  const [inline, virtual, inherited]
 

Reimplemented in NL3D::CAsyncFileManager3D::CMeshLoad, NL3D::CAsyncFileManager3D::CIGLoad, NL3D::CAsyncFileManager3D::CIGLoadUser, NL3D::CAsyncFileManager3D::CTextureLoad, NL3D::CZoneLoadingTask, NLPACS::CGlobalRetriever::CLrLoader, NLMISC::CAsyncFileManager::CFileLoad, NLMISC::CAsyncFileManager::CMultipleFileLoad, and NLMISC::CAsyncFileManager::CSignal.

Definition at line 74 of file thread.h.

Referenced by NLMISC::CTaskManager::run().

00075         {
00076                 result = "NoName";
00077         }

uint NLNET::CServerReceiveTask::numberOfConnections  )  [inline]
 

Returns the number of connections handled by the thread (mutexed on _Connections).

Definition at line 421 of file buf_server.h.

References _Connections, and uint.

Referenced by NLNET::CBufServer::dispatchNewSocket().

00422         {
00423                 uint nb;
00424                 {
00425                         NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00426                         nb = connectionssync.value().size();
00427                 }
00428                 return nb;
00429         }

void NLNET::CServerTask::requireExit  )  [inline, inherited]
 

Tells the task to exit.

Definition at line 56 of file buf_server.h.

References NLNET::CServerTask::_ExitRequired.

Referenced by NLNET::CBufServer::~CBufServer().

00056 { _ExitRequired = true; }

void NLNET::CServerReceiveTask::run void   )  [virtual]
 

Run.

Todo:
cado: the error code is not properly retrieved

Implements NLMISC::IRunnable.

Definition at line 987 of file buf_server.cpp.

References _Connections, NLNET::CBufSock::asString(), NLNET::CFifoAccessor, clearClosedConnections(), NLNET::CServerTask::exitRequired(), NLNET::CServerBufSock::fillSockIdAndEventType(), NLNET::NbNetworkTask, NLNET::NbServerReceiveTask, nldebug, nlnettrace, NLMISC::nlSleep(), NLNET::CNonBlockingBufSock::receivedBuffer(), NLNET::CNonBlockingBufSock::receivePart(), NLNET::CBufNetBase::receiveQueue(), res, NLNET::CBufNetBase::setDataAvailableFlag(), NLNET::TSockId, and uint8.

00988 {
00989         NbNetworkTask++;
00990         NbServerReceiveTask++;
00991         nlnettrace( "CServerReceiveTask::run" );
00992 
00993         SOCKET descmax;
00994         fd_set readers;
00995 
00996         // Time-out value for select (it can be long because we do not do any thing else in this thread)
00997         timeval tv;
00998 #if defined NL_OS_UNIX
00999         // POLL7
01000         nice( 2 );
01001 #endif // NL_OS_UNIX
01002         
01003         // Copy of _Connections
01004         vector<TSockId> connections_copy;       
01005 
01006         while ( ! exitRequired() )
01007         {
01008                 // 1. Remove closed connections
01009                 clearClosedConnections();
01010 
01011                 // POLL8
01012 
01013                 // 2-SELECT-VERSION : select() on the sockets handled in the present thread
01014 
01015                 descmax = 0;
01016                 FD_ZERO( &readers );
01017                 bool skip;
01018                 bool alldisconnected = true;
01019                 CConnections::iterator ipb;
01020                 {
01021                         // Lock _Connections
01022                         CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
01023 
01024                         // Prepare to avoid select if there is no connection
01025                         skip = connectionssync.value().empty();
01026 
01027                         // Fill the select array and copy _Connections
01028                         connections_copy.clear();
01029                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
01030                         {
01031                                 if ( (*ipb)->Sock->connected() ) // exclude disconnected sockets that are not deleted
01032                                 {
01033                                         alldisconnected = false;
01034                                         // Copy _Connections element
01035                                         connections_copy.push_back( *ipb );
01036 
01037                                         // Add socket descriptor to the select array
01038                                         FD_SET( (*ipb)->Sock->descriptor(), &readers );
01039 
01040                                         // Calculate descmax for select
01041                                         if ( (*ipb)->Sock->descriptor() > descmax )
01042                                         {
01043                                                 descmax = (*ipb)->Sock->descriptor();
01044                                         }
01045                                 }
01046                         }
01047 
01048 #ifdef NL_OS_UNIX
01049                         // Add the wake-up pipe into the select array
01050                         FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
01051                         if ( _WakeUpPipeHandle[PipeRead]>descmax )
01052                         {
01053                                 descmax = _WakeUpPipeHandle[PipeRead];
01054                         }
01055 #endif
01056                         
01057                         // Unlock _Connections, use connections_copy instead
01058                 }
01059 
01060 #ifndef NL_OS_UNIX
01061                 // Avoid select if there is no connection (Windows only)
01062                 if ( skip || alldisconnected )
01063                 {
01064                         nlSleep( 1 ); // nice
01065                         continue;
01066                 }
01067 #endif
01068 
01069 #ifdef NL_OS_WINDOWS
01070                 tv.tv_sec = 0; // short time because the newly added connections can't be added to the select fd_set
01071                 tv.tv_usec = 10000; // NEW: set to 500ms because otherwise new connections handling are too slow
01072 #elif defined NL_OS_UNIX
01073                 // POLL7
01074                 tv.tv_sec = 3600;               // 1 hour (=> 1 select every 3.6 second for 1000 connections)
01075                 tv.tv_usec = 0;
01076 #endif // NL_OS_WINDOWS
01077 
01078                 // Call select
01079                 int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
01080 
01081                 // POLL9
01082 
01083                 // 3. Test the result
01084                 switch ( res )
01085                 {
01086                         case  0 : continue; // time-out expired, no results
01087 
01089                         case -1 :
01090                                 // we'll ignore message (Interrupted system call) caused by a CTRL-C
01091                                 /*if (CSock::getLastError() == 4)
01092                                 {
01093                                         nldebug ("LNETL1: Select failed (in receive thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
01094                                         continue;
01095                                 }*/
01096                                 //nlerror( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
01097                                 nldebug( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
01098                                 goto end;
01099                 }
01100 
01101                 // 4. Get results
01102 
01103                 vector<TSockId>::iterator ic;
01104                 for ( ic=connections_copy.begin(); ic!=connections_copy.end(); ++ic )
01105                 {
01106                         if ( FD_ISSET( (*ic)->Sock->descriptor(), &readers ) != 0 )
01107                         {
01108                                 CServerBufSock *serverbufsock = static_cast<CServerBufSock*>(static_cast<CBufSock*>(*ic));
01109                                 try
01110                                 {
01111                                         // 4. Receive data
01112                                         if ( serverbufsock->receivePart( sizeof(TSockId) + 1 ) ) // +1 for the event type
01113                                         {
01114                                                 serverbufsock->fillSockIdAndEventType( *ic );
01115 
01116                                                 // Push message into receive queue
01117                                                 //uint32 bufsize;
01118                                                 //sint32 mbsize;
01119                                                 {
01120                                                         //nldebug( "RCV: Acquiring the receive queue... ");
01121                                                         CFifoAccessor recvfifo( &_Server->receiveQueue() );
01122                                                         //nldebug( "RCV: Acquired, pushing the received buffer... ");
01123                                                         recvfifo.value().push( serverbufsock->receivedBuffer() );
01124 
01125                                                         _Server->setDataAvailableFlag( true );
01126 
01127                                                         //nldebug( "RCV: Pushed, releasing the receive queue..." );
01128                                                         //recvfifo.value().display();
01129                                                         //bufsize = serverbufsock->receivedBuffer().size();
01130                                                         //mbsize = recvfifo.value().size() / 1048576;
01131                                                 }
01132                                                 //nldebug( "RCV: Released." );
01133                                                 /*if ( mbsize > 1 )
01134                                                 {
01135                                                         nlwarning( "The receive queue size exceeds %d MB", mbsize );
01136                                                 }*/
01137                                                 /*
01138                                                 // Statistics
01139                                                 {
01140                                                         CSynchronized<uint32>::CAccessor syncbpi ( &_Server->syncBytesPushedIn() );
01141                                                         syncbpi.value() += bufsize;
01142                                                 }
01143                                                 */
01144                                         }
01145                                 }
01146                                 catch ( ESocketConnectionClosed& )
01147                                 {
01148                                         nldebug( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() );
01149                                         // The socket went to _Connected=false when throwing the exception
01150                                 }
01151                                 catch ( ESocket& )
01152                                 {
01153                                         nldebug( "LNETL1: Connection %s broken", serverbufsock->asString().c_str() );
01154                                         (*ic)->Sock->disconnect();
01155                                 }
01156 /*
01157 #ifdef NL_OS_UNIX
01158                                 skip = true; // don't check _WakeUpPipeHandle (yes, check it to read any written byte)
01159 #endif
01160 
01161 */
01162                         }
01163                 }
01164 
01165 #ifdef NL_OS_UNIX
01166                 // Test wake-up pipe
01167                 if ( (!skip) && (FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers )) )
01168                 {
01169                         uint8 b;
01170                         if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
01171                         {
01172                                 nldebug( "LNETL1: In CServerReceiveTask::run(): read() failed" );
01173                         }
01174                         nldebug( "LNETL1: Receive thread select woken-up" );
01175                 }
01176 #endif
01177 
01178                 NbLoop++;
01179         }
01180 end:
01181         nlnettrace( "Exiting CServerReceiveTask::run" );
01182         NbServerReceiveTask--;
01183         NbNetworkTask--;
01184 }

CBufServer* NLNET::CServerReceiveTask::server  )  [inline]
 

Access to the server.

Definition at line 476 of file buf_server.h.

00476 { return _Server; }


Friends And Related Function Documentation

friend class CBufServer [friend]
 

Definition at line 478 of file buf_server.h.


Field Documentation

NLMISC::CSynchronized<CConnections> NLNET::CServerReceiveTask::_Connections [private]
 

Definition at line 487 of file buf_server.h.

Referenced by addNewSocket(), clearClosedConnections(), CServerReceiveTask(), NLNET::CBufServer::disconnect(), NLNET::CBufServer::displaySendQueueStat(), NLNET::CBufServer::getSendQueueSize(), numberOfConnections(), run(), NLNET::CBufServer::send(), NLNET::CBufServer::update(), and NLNET::CBufServer::~CBufServer().

NLMISC::CSynchronized<CConnections> NLNET::CServerReceiveTask::_RemoveSet [private]
 

Definition at line 490 of file buf_server.h.

Referenced by addToRemoveSet(), clearClosedConnections(), and CServerReceiveTask().

CBufServer* NLNET::CServerReceiveTask::_Server [private]
 

Definition at line 482 of file buf_server.h.

uint32 NLNET::CServerTask::NbLoop [inherited]
 

Definition at line 63 of file buf_server.h.

Referenced by NLNET::CBufServer::displayThreadStat().


The documentation for this class was generated from the following files:
Generated on Tue Mar 16 14:03:40 2004 for NeL by doxygen 1.3.6