#include <buf_server.h>
Inheritance diagram for NLNET::CServerReceiveTask:
Definition at line 410 of file buf_server.h.
Public Member Functions | |
void | addNewSocket (TSockId sockid) |
Add a new connection into this thread (mutexed on _Connections). | |
void | addToRemoveSet (TSockId sockid) |
void | clearClosedConnections () |
Delete all connections referenced in the remove list (mutexed on _RemoveSet and on _Connections). | |
CServerReceiveTask (CBufServer *server) | |
Constructor. | |
virtual void | getName (std::string &result) const |
uint | numberOfConnections () |
Returns the number of connections handled by the thread (mutexed on _Connections). | |
void | requireExit () |
Tells the task to exit. | |
virtual void | run () |
Run. | |
CBufServer * | server () |
Access to the server. | |
Data Fields | |
uint32 | NbLoop |
Protected Member Functions | |
bool | exitRequired () const |
Returns true if requireExit() has been called. | |
Private Attributes | |
NLMISC::CSynchronized< CConnections > | _Connections |
NLMISC::CSynchronized< CConnections > | _RemoveSet |
CBufServer * | _Server |
Friends | |
class | CBufServer |
|
Constructor.
Definition at line 415 of file buf_server.h. References _Connections, and _RemoveSet.
00415 : CServerTask(), _Server(server), _Connections("CServerReceiveTask::_Connections"), _RemoveSet("CServerReceiveTask::_RemoveSet") {} |
|
Add a new connection into this thread (mutexed on _Connections).
Definition at line 432 of file buf_server.h. References _Connections, nlassert, and NLNET::TSockId. Referenced by NLNET::CBufServer::addNewThread(), and NLNET::CBufServer::dispatchNewSocket().
00433 { 00434 //nlnettrace( "CServerReceiveTask::addNewSocket" ); 00435 nlassert( sockid != InvalidSockId ); 00436 { 00437 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections ); 00438 connectionssync.value().insert( sockid ); 00439 } 00440 // POLL3 00441 } |
|
Add a connection to the remove set (mutexed on _RemoveSet). Note: you must not call this method while holding the mutual exclusion on _Connections, or there will be a deadlock (see clearClosedConnections()). Definition at line 449 of file buf_server.h. References _RemoveSet, nlassert, nlnettrace, and NLNET::TSockId.
00450 { 00451 nlnettrace( "CServerReceiveTask::addToRemoveSet" ); 00452 nlassert( sockid != InvalidSockId ); 00453 { 00454 // Three possibilities : 00455 // - The value is inserted into the set. 00456 // - The value is already present in the set. 00457 // - The set is locked by a receive thread which is removing the closed connections. 00458 // When the set gets unlocked, it is empty so the value is inserted. It means the 00459 // value could be already in the set before it was cleared. 00460 // Note: with a fonction such as tryAcquire(), we could avoid to enter the mutex 00461 // when it is already locked 00462 // See clearClosedConnections(). 00463 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet ); 00464 removesetsync.value().insert( sockid ); 00465 //nldebug( "LNETL1: ic: %p - RemoveSet.size(): %d", ic, removesetsync.value().size() ); 00466 } 00467 #ifdef NL_OS_UNIX 00468 wakeUp(); 00469 #endif 00470 } |
|
Delete all connections referenced in the remove list (mutexed on _RemoveSet and on _Connections).
Definition at line 1191 of file buf_server.cpp. References _Connections, _RemoveSet, nldebug, and NLNET::TSockId. Referenced by run().
01192 { 01193 CConnections::iterator ic; 01194 { 01195 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet ); 01196 { 01197 if ( ! removesetsync.value().empty() ) 01198 { 01199 // Delete closed connections 01200 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections ); 01201 for ( ic=removesetsync.value().begin(); ic!=removesetsync.value().end(); ++ic ) 01202 { 01203 nldebug( "LNETL1: Removing a connection" ); 01204 01205 TSockId sid = (*ic); 01206 01207 // Remove from the connection list 01208 connectionssync.value().erase( *ic ); 01209 01210 // Delete the socket object 01211 delete sid; 01212 } 01213 // Clear remove list 01214 removesetsync.value().clear(); 01215 } 01216 } 01217 } 01218 } |
|
Returns true if requireExit() has been called.
Definition at line 71 of file buf_server.h. References NLNET::CServerTask::_ExitRequired. Referenced by run(), and NLNET::CListenTask::run().
00071 { return _ExitRequired; }
|
|
Reimplemented in NL3D::CAsyncFileManager3D::CMeshLoad, NL3D::CAsyncFileManager3D::CIGLoad, NL3D::CAsyncFileManager3D::CIGLoadUser, NL3D::CAsyncFileManager3D::CTextureLoad, NL3D::CZoneLoadingTask, NLPACS::CGlobalRetriever::CLrLoader, NLMISC::CAsyncFileManager::CFileLoad, NLMISC::CAsyncFileManager::CMultipleFileLoad, and NLMISC::CAsyncFileManager::CSignal. Definition at line 74 of file thread.h. Referenced by NLMISC::CTaskManager::run().
00075 {
00076 result = "NoName";
00077 }
|
|
Returns the number of connections handled by the thread (mutexed on _Connections).
Definition at line 421 of file buf_server.h. References _Connections, and uint. Referenced by NLNET::CBufServer::dispatchNewSocket().
00422 { 00423 uint nb; 00424 { 00425 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections ); 00426 nb = connectionssync.value().size(); 00427 } 00428 return nb; 00429 } |
|
Tells the task to exit.
Definition at line 56 of file buf_server.h. References NLNET::CServerTask::_ExitRequired. Referenced by NLNET::CBufServer::~CBufServer().
00056 { _ExitRequired = true; }
|
|
Run.
Implements NLMISC::IRunnable. Definition at line 987 of file buf_server.cpp. References _Connections, NLNET::CBufSock::asString(), NLNET::CFifoAccessor, clearClosedConnections(), NLNET::CServerTask::exitRequired(), NLNET::CServerBufSock::fillSockIdAndEventType(), NLNET::NbNetworkTask, NLNET::NbServerReceiveTask, nldebug, nlnettrace, NLMISC::nlSleep(), NLNET::CNonBlockingBufSock::receivedBuffer(), NLNET::CNonBlockingBufSock::receivePart(), NLNET::CBufNetBase::receiveQueue(), res, NLNET::CBufNetBase::setDataAvailableFlag(), NLNET::TSockId, and uint8.
00988 { 00989 NbNetworkTask++; 00990 NbServerReceiveTask++; 00991 nlnettrace( "CServerReceiveTask::run" ); 00992 00993 SOCKET descmax; 00994 fd_set readers; 00995 00996 // Time-out value for select (it can be long because we do not do any thing else in this thread) 00997 timeval tv; 00998 #if defined NL_OS_UNIX 00999 // POLL7 01000 nice( 2 ); 01001 #endif // NL_OS_UNIX 01002 01003 // Copy of _Connections 01004 vector<TSockId> connections_copy; 01005 01006 while ( ! exitRequired() ) 01007 { 01008 // 1. Remove closed connections 01009 clearClosedConnections(); 01010 01011 // POLL8 01012 01013 // 2-SELECT-VERSION : select() on the sockets handled in the present thread 01014 01015 descmax = 0; 01016 FD_ZERO( &readers ); 01017 bool skip; 01018 bool alldisconnected = true; 01019 CConnections::iterator ipb; 01020 { 01021 // Lock _Connections 01022 CSynchronized<CConnections>::CAccessor connectionssync( &_Connections ); 01023 01024 // Prepare to avoid select if there is no connection 01025 skip = connectionssync.value().empty(); 01026 01027 // Fill the select array and copy _Connections 01028 connections_copy.clear(); 01029 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb ) 01030 { 01031 if ( (*ipb)->Sock->connected() ) // exclude disconnected sockets that are not deleted 01032 { 01033 alldisconnected = false; 01034 // Copy _Connections element 01035 connections_copy.push_back( *ipb ); 01036 01037 // Add socket descriptor to the select array 01038 FD_SET( (*ipb)->Sock->descriptor(), &readers ); 01039 01040 // Calculate descmax for select 01041 if ( (*ipb)->Sock->descriptor() > descmax ) 01042 { 01043 descmax = (*ipb)->Sock->descriptor(); 01044 } 01045 } 01046 } 01047 01048 #ifdef NL_OS_UNIX 01049 // Add the wake-up pipe into the select array 01050 FD_SET( _WakeUpPipeHandle[PipeRead], &readers ); 01051 if ( _WakeUpPipeHandle[PipeRead]>descmax ) 01052 { 01053 descmax = _WakeUpPipeHandle[PipeRead]; 01054 } 01055 #endif 01056 01057 // Unlock 
_Connections, use connections_copy instead 01058 } 01059 01060 #ifndef NL_OS_UNIX 01061 // Avoid select if there is no connection (Windows only) 01062 if ( skip || alldisconnected ) 01063 { 01064 nlSleep( 1 ); // nice 01065 continue; 01066 } 01067 #endif 01068 01069 #ifdef NL_OS_WINDOWS 01070 tv.tv_sec = 0; // short time because the newly added connections can't be added to the select fd_set 01071 tv.tv_usec = 10000; // NEW: set to 500ms because otherwise new connections handling are too slow 01072 #elif defined NL_OS_UNIX 01073 // POLL7 01074 tv.tv_sec = 3600; // 1 hour (=> 1 select every 3.6 second for 1000 connections) 01075 tv.tv_usec = 0; 01076 #endif // NL_OS_WINDOWS 01077 01078 // Call select 01079 int res = ::select( descmax+1, &readers, NULL, NULL, &tv ); 01080 01081 // POLL9 01082 01083 // 3. Test the result 01084 switch ( res ) 01085 { 01086 case 0 : continue; // time-out expired, no results 01087 01089 case -1 : 01090 // we'll ignore message (Interrupted system call) caused by a CTRL-C 01091 /*if (CSock::getLastError() == 4) 01092 { 01093 nldebug ("LNETL1: Select failed (in receive thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError()); 01094 continue; 01095 }*/ 01096 //nlerror( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() ); 01097 nldebug( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() ); 01098 goto end; 01099 } 01100 01101 // 4. Get results 01102 01103 vector<TSockId>::iterator ic; 01104 for ( ic=connections_copy.begin(); ic!=connections_copy.end(); ++ic ) 01105 { 01106 if ( FD_ISSET( (*ic)->Sock->descriptor(), &readers ) != 0 ) 01107 { 01108 CServerBufSock *serverbufsock = static_cast<CServerBufSock*>(static_cast<CBufSock*>(*ic)); 01109 try 01110 { 01111 // 4. 
Receive data 01112 if ( serverbufsock->receivePart( sizeof(TSockId) + 1 ) ) // +1 for the event type 01113 { 01114 serverbufsock->fillSockIdAndEventType( *ic ); 01115 01116 // Push message into receive queue 01117 //uint32 bufsize; 01118 //sint32 mbsize; 01119 { 01120 //nldebug( "RCV: Acquiring the receive queue... "); 01121 CFifoAccessor recvfifo( &_Server->receiveQueue() ); 01122 //nldebug( "RCV: Acquired, pushing the received buffer... "); 01123 recvfifo.value().push( serverbufsock->receivedBuffer() ); 01124 01125 _Server->setDataAvailableFlag( true ); 01126 01127 //nldebug( "RCV: Pushed, releasing the receive queue..." ); 01128 //recvfifo.value().display(); 01129 //bufsize = serverbufsock->receivedBuffer().size(); 01130 //mbsize = recvfifo.value().size() / 1048576; 01131 } 01132 //nldebug( "RCV: Released." ); 01133 /*if ( mbsize > 1 ) 01134 { 01135 nlwarning( "The receive queue size exceeds %d MB", mbsize ); 01136 }*/ 01137 /* 01138 // Statistics 01139 { 01140 CSynchronized<uint32>::CAccessor syncbpi ( &_Server->syncBytesPushedIn() ); 01141 syncbpi.value() += bufsize; 01142 } 01143 */ 01144 } 01145 } 01146 catch ( ESocketConnectionClosed& ) 01147 { 01148 nldebug( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() ); 01149 // The socket went to _Connected=false when throwing the exception 01150 } 01151 catch ( ESocket& ) 01152 { 01153 nldebug( "LNETL1: Connection %s broken", serverbufsock->asString().c_str() ); 01154 (*ic)->Sock->disconnect(); 01155 } 01156 /* 01157 #ifdef NL_OS_UNIX 01158 skip = true; // don't check _WakeUpPipeHandle (yes, check it to read any written byte) 01159 #endif 01160 01161 */ 01162 } 01163 } 01164 01165 #ifdef NL_OS_UNIX 01166 // Test wake-up pipe 01167 if ( (!skip) && (FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers )) ) 01168 { 01169 uint8 b; 01170 if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe 01171 { 01172 nldebug( "LNETL1: In CServerReceiveTask::run(): read() failed" 
); 01173 } 01174 nldebug( "LNETL1: Receive thread select woken-up" ); 01175 } 01176 #endif 01177 01178 NbLoop++; 01179 } 01180 end: 01181 nlnettrace( "Exiting CServerReceiveTask::run" ); 01182 NbServerReceiveTask--; 01183 NbNetworkTask--; 01184 } |
|
Access to the server.
Definition at line 476 of file buf_server.h.
00476 { return _Server; } |
|
Definition at line 478 of file buf_server.h. |
|
|
_RemoveSet: Definition at line 490 of file buf_server.h. Referenced by addToRemoveSet(), clearClosedConnections(), and CServerReceiveTask(). |
|
Definition at line 482 of file buf_server.h. |
|
NbLoop: Definition at line 63 of file buf_server.h. Referenced by NLNET::CBufServer::displayThreadStat(). |