#include <buf_server.h>
Inheritance diagram for NLNET::CServerReceiveTask:

Definition at line 410 of file buf_server.h.
Public Member Functions | |
| void | addNewSocket (TSockId sockid) |
| Add a new connection into this thread (mutexed on _Connections). | |
| void | addToRemoveSet (TSockId sockid) |
| void | clearClosedConnections () |
| Delete all connections referenced in the remove list (mutexed on _RemoveSet and on _Connections). | |
| CServerReceiveTask (CBufServer *server) | |
| Constructor. | |
| virtual void | getName (std::string &result) const |
| uint | numberOfConnections () |
| Returns the number of connections handled by the thread (mutexed on _Connections). | |
| void | requireExit () |
| Tells the task to exit. | |
| virtual void | run () |
| Run. | |
| CBufServer * | server () |
| Access to the server. | |
Data Fields | |
| uint32 | NbLoop |
Protected Member Functions | |
| bool | exitRequired () const |
| Returns true if the requireExit() has been called. | |
Private Attributes | |
| NLMISC::CSynchronized< CConnections > | _Connections |
| NLMISC::CSynchronized< CConnections > | _RemoveSet |
| CBufServer * | _Server |
Friends | |
| class | CBufServer |
|
|
Constructor.
Definition at line 415 of file buf_server.h. References _Connections, and _RemoveSet.
00415 : CServerTask(), _Server(server), _Connections("CServerReceiveTask::_Connections"), _RemoveSet("CServerReceiveTask::_RemoveSet") {} |
|
|
Add a new connection into this thread (mutexed on _Connections).
Definition at line 432 of file buf_server.h. References _Connections, nlassert, and NLNET::TSockId. Referenced by NLNET::CBufServer::addNewThread(), and NLNET::CBufServer::dispatchNewSocket().
00433 {
00434 //nlnettrace( "CServerReceiveTask::addNewSocket" );
00435 nlassert( sockid != InvalidSockId );
00436 { // Inner scope: the accessor on _Connections only lives until the matching brace
00437 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00438 connectionssync.value().insert( sockid ); // add the socket to the set handled by this task
00439 }
00440 // POLL3
00441 }
|
|
|
Add connection to the remove set (mutexed on _RemoveSet). Note: you must not call this method within a mutual exclusion on _Connections, or there will be a deadlock (see clearClosedConnections()). Definition at line 449 of file buf_server.h. References _RemoveSet, nlassert, nlnettrace, and NLNET::TSockId.
00450 {
00451 nlnettrace( "CServerReceiveTask::addToRemoveSet" );
00452 nlassert( sockid != InvalidSockId );
00453 {
00454 // Three possibilities:
00455 // - The value is inserted into the set.
00456 // - The value is already present in the set.
00457 // - The set is locked by a receive thread which is removing the closed connections.
00458 // When the set gets unlocked, it is empty so the value is inserted. It means the
00459 // value may already have been in the set before it was cleared.
00460 // Note: with a function such as tryAcquire(), we could avoid entering the mutex
00461 // when it is already locked.
00462 // See clearClosedConnections().
00463 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
00464 removesetsync.value().insert( sockid );
00465 //nldebug( "LNETL1: ic: %p - RemoveSet.size(): %d", ic, removesetsync.value().size() );
00466 }
00467 #ifdef NL_OS_UNIX
00468 wakeUp(); // Unix builds only; presumably signals the wake-up pipe that run() adds to its select() set -- TODO confirm
00469 #endif
00470 }
|
|
|
Delete all connections referenced in the remove list (mutexed on _RemoveSet and on _Connections).
Definition at line 1191 of file buf_server.cpp. References _Connections, _RemoveSet, nldebug, and NLNET::TSockId. Referenced by run().
01192 {
01193 CConnections::iterator ic;
01194 {
01195 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet ); // _RemoveSet is accessed first...
01196 {
01197 if ( ! removesetsync.value().empty() )
01198 {
01199 // Delete closed connections
01200 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections ); // ...then _Connections, only when there is something to remove
01201 for ( ic=removesetsync.value().begin(); ic!=removesetsync.value().end(); ++ic )
01202 {
01203 nldebug( "LNETL1: Removing a connection" );
01204
01205 TSockId sid = (*ic); // keep the pointer so it can be deleted after it is erased from _Connections
01206
01207 // Remove from the connection list
01208 connectionssync.value().erase( *ic );
01209
01210 // Delete the socket object
01211 delete sid;
01212 }
01213 // Clear remove list (every entry was deleted in the loop above)
01214 removesetsync.value().clear();
01215 }
01216 }
01217 }
01218 }
|
|
|
Returns true if the requireExit() has been called.
Definition at line 71 of file buf_server.h. References NLNET::CServerTask::_ExitRequired. Referenced by run(), and NLNET::CListenTask::run().
00071 { return _ExitRequired; } // flag set to true by requireExit()
|
|
|
Reimplemented in NL3D::CAsyncFileManager3D::CMeshLoad, NL3D::CAsyncFileManager3D::CIGLoad, NL3D::CAsyncFileManager3D::CIGLoadUser, NL3D::CAsyncFileManager3D::CTextureLoad, NL3D::CZoneLoadingTask, NLPACS::CGlobalRetriever::CLrLoader, NLMISC::CAsyncFileManager::CFileLoad, NLMISC::CAsyncFileManager::CMultipleFileLoad, and NLMISC::CAsyncFileManager::CSignal. Definition at line 74 of file thread.h. Referenced by NLMISC::CTaskManager::run().
00075 {
00076 result = "NoName"; // name written by this implementation from thread.h; the classes listed above reimplement getName()
00077 }
|
|
|
Returns the number of connections handled by the thread (mutexed on _Connections).
Definition at line 421 of file buf_server.h. References _Connections, and uint. Referenced by NLNET::CBufServer::dispatchNewSocket().
00422 {
00423 uint nb;
00424 { // Inner scope: the accessor on _Connections only lives until the matching brace
00425 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00426 nb = connectionssync.value().size(); // copy the size while the accessor is alive
00427 }
00428 return nb; // returned after the accessor has been destroyed
00429 }
|
|
|
Tells the task to exit.
Definition at line 56 of file buf_server.h. References NLNET::CServerTask::_ExitRequired. Referenced by NLNET::CBufServer::~CBufServer().
00056 { _ExitRequired = true; } // read back by exitRequired(), which run() tests at the top of each loop iteration
|
|
|
Run.
Implements NLMISC::IRunnable. Definition at line 987 of file buf_server.cpp. References _Connections, NLNET::CBufSock::asString(), NLNET::CFifoAccessor, clearClosedConnections(), NLNET::CServerTask::exitRequired(), NLNET::CServerBufSock::fillSockIdAndEventType(), NLNET::NbNetworkTask, NLNET::NbServerReceiveTask, nldebug, nlnettrace, NLMISC::nlSleep(), NLNET::CNonBlockingBufSock::receivedBuffer(), NLNET::CNonBlockingBufSock::receivePart(), NLNET::CBufNetBase::receiveQueue(), res, NLNET::CBufNetBase::setDataAvailableFlag(), NLNET::TSockId, and uint8.
00988 {
00989 NbNetworkTask++;
00990 NbServerReceiveTask++;
00991 nlnettrace( "CServerReceiveTask::run" );
00992
00993 SOCKET descmax;
00994 fd_set readers;
00995
00996 // Time-out value for select (it can be long because we do not do anything else in this thread)
00997 timeval tv;
00998 #if defined NL_OS_UNIX
00999 // POLL7
01000 nice( 2 );
01001 #endif // NL_OS_UNIX
01002
01003 // Copy of _Connections
01004 vector<TSockId> connections_copy;
01005
01006 while ( ! exitRequired() )
01007 {
01008 // 1. Remove closed connections
01009 clearClosedConnections();
01010
01011 // POLL8
01012
01013 // 2-SELECT-VERSION : select() on the sockets handled in the present thread
01014
01015 descmax = 0;
01016 FD_ZERO( &readers );
01017 bool skip;
01018 bool alldisconnected = true;
01019 CConnections::iterator ipb;
01020 {
01021 // Lock _Connections
01022 CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
01023
01024 // Prepare to avoid select if there is no connection
01025 skip = connectionssync.value().empty();
01026
01027 // Fill the select array and copy _Connections
01028 connections_copy.clear();
01029 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
01030 {
01031 if ( (*ipb)->Sock->connected() ) // exclude disconnected sockets that are not deleted
01032 {
01033 alldisconnected = false;
01034 // Copy _Connections element
01035 connections_copy.push_back( *ipb );
01036
01037 // Add socket descriptor to the select array
01038 FD_SET( (*ipb)->Sock->descriptor(), &readers );
01039
01040 // Calculate descmax for select
01041 if ( (*ipb)->Sock->descriptor() > descmax )
01042 {
01043 descmax = (*ipb)->Sock->descriptor();
01044 }
01045 }
01046 }
01047
01048 #ifdef NL_OS_UNIX
01049 // Add the wake-up pipe into the select array
01050 FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
01051 if ( _WakeUpPipeHandle[PipeRead]>descmax )
01052 {
01053 descmax = _WakeUpPipeHandle[PipeRead];
01054 }
01055 #endif
01056
01057 // Unlock _Connections, use connections_copy instead
01058 }
01059
01060 #ifndef NL_OS_UNIX
01061 // Avoid select if there is no connection (every non-Unix build, e.g. Windows)
01062 if ( skip || alldisconnected )
01063 {
01064 nlSleep( 1 ); // nice
01065 continue;
01066 }
01067 #endif
01068
01069 #ifdef NL_OS_WINDOWS
01070 tv.tv_sec = 0; // short time because the newly added connections can't be added to the select fd_set
01071 tv.tv_usec = 10000; // 10000 us = 10 ms (NOTE: this comment used to say 500ms, which does not match the value); kept short because otherwise handling of new connections is too slow
01072 #elif defined NL_OS_UNIX
01073 // POLL7
01074 tv.tv_sec = 3600; // 1 hour (=> 1 select every 3.6 second for 1000 connections)
01075 tv.tv_usec = 0;
01076 #endif // NL_OS_WINDOWS
01077
01078 // Call select
01079 int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
01080
01081 // POLL9
01082
01083 // 3. Test the result
01084 switch ( res )
01085 {
01086 case 0 : continue; // time-out expired, no results
01087
01089 case -1 :
01090 // we'll ignore message (Interrupted system call) caused by a CTRL-C
01091 /*if (CSock::getLastError() == 4)
01092 {
01093 nldebug ("LNETL1: Select failed (in receive thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
01094 continue;
01095 }*/
01096 //nlerror( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
01097 nldebug( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
01098 goto end; // the "ignore" branch above is commented out, so any select() failure leaves the loop and ends the task
01099 }
01100
01101 // 4. Get results
01102
01103 vector<TSockId>::iterator ic;
01104 for ( ic=connections_copy.begin(); ic!=connections_copy.end(); ++ic )
01105 {
01106 if ( FD_ISSET( (*ic)->Sock->descriptor(), &readers ) != 0 )
01107 {
01108 CServerBufSock *serverbufsock = static_cast<CServerBufSock*>(static_cast<CBufSock*>(*ic));
01109 try
01110 {
01111 // 4. Receive data
01112 if ( serverbufsock->receivePart( sizeof(TSockId) + 1 ) ) // +1 for the event type
01113 {
01114 serverbufsock->fillSockIdAndEventType( *ic );
01115
01116 // Push message into receive queue
01117 //uint32 bufsize;
01118 //sint32 mbsize;
01119 {
01120 //nldebug( "RCV: Acquiring the receive queue... ");
01121 CFifoAccessor recvfifo( &_Server->receiveQueue() );
01122 //nldebug( "RCV: Acquired, pushing the received buffer... ");
01123 recvfifo.value().push( serverbufsock->receivedBuffer() );
01124
01125 _Server->setDataAvailableFlag( true );
01126
01127 //nldebug( "RCV: Pushed, releasing the receive queue..." );
01128 //recvfifo.value().display();
01129 //bufsize = serverbufsock->receivedBuffer().size();
01130 //mbsize = recvfifo.value().size() / 1048576;
01131 }
01132 //nldebug( "RCV: Released." );
01133 /*if ( mbsize > 1 )
01134 {
01135 nlwarning( "The receive queue size exceeds %d MB", mbsize );
01136 }*/
01137 /*
01138 // Statistics
01139 {
01140 CSynchronized<uint32>::CAccessor syncbpi ( &_Server->syncBytesPushedIn() );
01141 syncbpi.value() += bufsize;
01142 }
01143 */
01144 }
01145 }
01146 catch ( ESocketConnectionClosed& )
01147 {
01148 nldebug( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() );
01149 // The socket went to _Connected=false when throwing the exception
01150 }
01151 catch ( ESocket& )
01152 {
01153 nldebug( "LNETL1: Connection %s broken", serverbufsock->asString().c_str() );
01154 (*ic)->Sock->disconnect();
01155 }
01156 /*
01157 #ifdef NL_OS_UNIX
01158 skip = true; // don't check _WakeUpPipeHandle (yes, check it to read any written byte)
01159 #endif
01160
01161 */
01162 }
01163 }
01164
01165 #ifdef NL_OS_UNIX
01166 // Test wake-up pipe (NOTE: the pipe is not read here when skip is true, i.e. when _Connections was empty at step 2)
01167 if ( (!skip) && (FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers )) )
01168 {
01169 uint8 b;
01170 if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
01171 {
01172 nldebug( "LNETL1: In CServerReceiveTask::run(): read() failed" );
01173 }
01174 nldebug( "LNETL1: Receive thread select woken-up" );
01175 }
01176 #endif
01177
01178 NbLoop++;
01179 }
01180 end:
01181 nlnettrace( "Exiting CServerReceiveTask::run" );
01182 NbServerReceiveTask--;
01183 NbNetworkTask--;
01184 }
|
|
|
Access to the server.
Definition at line 476 of file buf_server.h.
00476 { return _Server; } // the pointer stored by the constructor's _Server(server) initializer
|
|
|
Definition at line 478 of file buf_server.h. |
|
|
|
Definition at line 490 of file buf_server.h. Referenced by addToRemoveSet(), clearClosedConnections(), and CServerReceiveTask(). |
|
|
Definition at line 482 of file buf_server.h. |
|
|
Definition at line 63 of file buf_server.h. Referenced by NLNET::CBufServer::displayThreadStat(). |
1.3.6