From 0ea5fc66924303d1bf73ba283a383e2aadee02f2 Mon Sep 17 00:00:00 2001 From: neodarz Date: Sat, 11 Aug 2018 20:21:34 +0200 Subject: Initial commit --- docs/doxygen/nel/a03366.html | 849 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 849 insertions(+) create mode 100644 docs/doxygen/nel/a03366.html (limited to 'docs/doxygen/nel/a03366.html') diff --git a/docs/doxygen/nel/a03366.html b/docs/doxygen/nel/a03366.html new file mode 100644 index 00000000..312fcd7c --- /dev/null +++ b/docs/doxygen/nel/a03366.html @@ -0,0 +1,849 @@ + + +NeL: NLNET::CServerReceiveTask class Reference + + + +
+

NLNET::CServerReceiveTask Class Reference

#include <buf_server.h> +

+

Inheritance diagram for NLNET::CServerReceiveTask: +

+ +NLMISC::IRunnable +NLNET::CServerTask + +

Detailed Description

+Code of receiving threads for servers. Note: the methods locations in the classes do not correspond to the threads where they are executed, but to the data they use. +

+ +

+Definition at line 410 of file buf_server.h. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +

Public Member Functions

void addNewSocket (TSockId sockid)
 Add a new connection into this thread (mutexed on _Connections).

void addToRemoveSet (TSockId sockid)
void clearClosedConnections ()
 Delete all connections referenced in the remove list (mutexed on _RemoveSet and on _Connections).

 CServerReceiveTask (CBufServer *server)
 Constructor.

virtual void getName (std::string &result) const
uint numberOfConnections ()
 Returns the number of connections handled by the thread (mutexed on _Connections).

void requireExit ()
 Tells the task to exit.

virtual void run ()
 Run.

CBufServerserver ()
 Access to the server.


Data Fields

uint32 NbLoop

Protected Member Functions

bool exitRequired () const
 Returns true if the requireExit() has been called.


Private Attributes

NLMISC::CSynchronized< CConnections_Connections
NLMISC::CSynchronized< CConnections_RemoveSet
CBufServer_Server

Friends

class CBufServer
+


Constructor & Destructor Documentation

+

+ + + + +
+ + + + + + + + + + +
NLNET::CServerReceiveTask::CServerReceiveTask CBufServer server  )  [inline]
+
+ + + + + +
+   + + +

+Constructor. +

+ +

+Definition at line 415 of file buf_server.h. +

+References _Connections, and _RemoveSet. +

+

00415 : CServerTask(), _Server(server), _Connections("CServerReceiveTask::_Connections"), _RemoveSet("CServerReceiveTask::_RemoveSet") {}
+
+


Member Function Documentation

+

+ + + + +
+ + + + + + + + + + +
void NLNET::CServerReceiveTask::addNewSocket TSockId  sockid  )  [inline]
+
+ + + + + +
+   + + +

+Add a new connection into this thread (mutexed on _Connections). +

+ +

+Definition at line 432 of file buf_server.h. +

+References _Connections, nlassert, and NLNET::TSockId. +

+Referenced by NLNET::CBufServer::addNewThread(), and NLNET::CBufServer::dispatchNewSocket(). +

+

00433         {
+00434                 //nlnettrace( "CServerReceiveTask::addNewSocket" );
+00435                 nlassert( sockid != InvalidSockId );
+00436                 {
+00437                         NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
+00438                         connectionssync.value().insert( sockid );
+00439                 }
+00440                 // POLL3
+00441         }
+
+

+ + + + +
+ + + + + + + + + + +
void NLNET::CServerReceiveTask::addToRemoveSet TSockId  sockid  )  [inline]
+
+ + + + + +
+   + + +

+Add connection to the remove set (mutexed on _RemoveSet) Note: you must not call this method within a mutual exclusion on _Connections, or there will be a deadlock (see clearClosedConnection()) +

+Definition at line 449 of file buf_server.h. +

+References _RemoveSet, nlassert, nlnettrace, and NLNET::TSockId. +

+

00450         {
+00451                 nlnettrace( "CServerReceiveTask::addToRemoveSet" );
+00452                 nlassert( sockid != InvalidSockId );
+00453                 {
+00454                         // Three possibilities :
+00455                         // - The value is inserted into the set.
+00456                         // - The value is already present in the set.
+00457                         // - The set is locked by a receive thread which is removing the closed connections.
+00458                         //   When the set gets unlocked, it is empty so the value is inserted. It means the
+00459                         //   value could be already in the set before it was cleared.
+00460                         //   Note: with a fonction such as tryAcquire(), we could avoid to enter the mutex
+00461                         //   when it is already locked
+00462                         // See clearClosedConnections().
+00463                         NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
+00464                         removesetsync.value().insert( sockid );
+00465                         //nldebug( "LNETL1: ic: %p - RemoveSet.size(): %d", ic, removesetsync.value().size() );
+00466                 }
+00467 #ifdef NL_OS_UNIX
+00468                 wakeUp();
+00469 #endif
+00470         }
+
+

+ + + + +
+ + + + + + + + + +
void NLNET::CServerReceiveTask::clearClosedConnections  ) 
+
+ + + + + +
+   + + +

+Delete all connections referenced in the remove list (mutexed on _RemoveSet and on _Connections). +

+ +

+Definition at line 1191 of file buf_server.cpp. +

+References _Connections, _RemoveSet, nldebug, and NLNET::TSockId. +

+Referenced by run(). +

+

01192 {
+01193         CConnections::iterator ic;
+01194         {
+01195                 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
+01196                 {
+01197                         if ( ! removesetsync.value().empty() )
+01198                         {
+01199                                 // Delete closed connections
+01200                                 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
+01201                                 for ( ic=removesetsync.value().begin(); ic!=removesetsync.value().end(); ++ic )
+01202                                 {
+01203                                         nldebug( "LNETL1: Removing a connection" );
+01204 
+01205                                         TSockId sid = (*ic);
+01206 
+01207                                         // Remove from the connection list
+01208                                         connectionssync.value().erase( *ic );
+01209 
+01210                                         // Delete the socket object
+01211                                         delete sid;
+01212                                 }
+01213                                 // Clear remove list
+01214                                 removesetsync.value().clear();
+01215                         }
+01216                 }
+01217         }
+01218 }
+
+

+ + + + +
+ + + + + + + + + +
bool NLNET::CServerTask::exitRequired  )  const [inline, protected, inherited]
+
+ + + + + +
+   + + +

+Returns true if the requireExit() has been called. +

+ +

+Definition at line 71 of file buf_server.h. +

+References NLNET::CServerTask::_ExitRequired. +

+Referenced by run(), and NLNET::CListenTask::run(). +

+

00071 { return _ExitRequired; }
+
+

+ + + + +
+ + + + + + + + + + +
virtual void NLMISC::IRunnable::getName std::string &  result  )  const [inline, virtual, inherited]
+
+ + + + + +
+   + + +

+ +

+Reimplemented in NL3D::CAsyncFileManager3D::CMeshLoad, NL3D::CAsyncFileManager3D::CIGLoad, NL3D::CAsyncFileManager3D::CIGLoadUser, NL3D::CAsyncFileManager3D::CTextureLoad, NL3D::CZoneLoadingTask, NLPACS::CGlobalRetriever::CLrLoader, NLMISC::CAsyncFileManager::CFileLoad, NLMISC::CAsyncFileManager::CMultipleFileLoad, and NLMISC::CAsyncFileManager::CSignal. +

+Definition at line 74 of file thread.h. +

+Referenced by NLMISC::CTaskManager::run(). +

+

00075         {
+00076                 result = "NoName";
+00077         }
+
+

+ + + + +
+ + + + + + + + + +
uint NLNET::CServerReceiveTask::numberOfConnections  )  [inline]
+
+ + + + + +
+   + + +

+Returns the number of connections handled by the thread (mutexed on _Connections). +

+ +

+Definition at line 421 of file buf_server.h. +

+References _Connections, and uint. +

+Referenced by NLNET::CBufServer::dispatchNewSocket(). +

+

00422         {
+00423                 uint nb;
+00424                 {
+00425                         NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
+00426                         nb = connectionssync.value().size();
+00427                 }
+00428                 return nb;
+00429         }
+
+

+ + + + +
+ + + + + + + + + +
void NLNET::CServerTask::requireExit  )  [inline, inherited]
+
+ + + + + +
+   + + +

+Tells the task to exit. +

+ +

+Definition at line 56 of file buf_server.h. +

+References NLNET::CServerTask::_ExitRequired. +

+Referenced by NLNET::CBufServer::~CBufServer(). +

+

00056 { _ExitRequired = true; }
+
+

+ + + + +
+ + + + + + + + + + +
void NLNET::CServerReceiveTask::run void   )  [virtual]
+
+ + + + + +
+   + + +

+Run. +

+ +

+

Todo:
cado: the error code is not properly retrieved
+ +

+Implements NLMISC::IRunnable. +

+Definition at line 987 of file buf_server.cpp. +

+References _Connections, NLNET::CBufSock::asString(), NLNET::CFifoAccessor, clearClosedConnections(), NLNET::CServerTask::exitRequired(), NLNET::CServerBufSock::fillSockIdAndEventType(), NLNET::NbNetworkTask, NLNET::NbServerReceiveTask, nldebug, nlnettrace, NLMISC::nlSleep(), NLNET::CNonBlockingBufSock::receivedBuffer(), NLNET::CNonBlockingBufSock::receivePart(), NLNET::CBufNetBase::receiveQueue(), res, NLNET::CBufNetBase::setDataAvailableFlag(), NLNET::TSockId, and uint8. +

+

00988 {
+00989         NbNetworkTask++;
+00990         NbServerReceiveTask++;
+00991         nlnettrace( "CServerReceiveTask::run" );
+00992 
+00993         SOCKET descmax;
+00994         fd_set readers;
+00995 
+00996         // Time-out value for select (it can be long because we do not do any thing else in this thread)
+00997         timeval tv;
+00998 #if defined NL_OS_UNIX
+00999         // POLL7
+01000         nice( 2 );
+01001 #endif // NL_OS_UNIX
+01002         
+01003         // Copy of _Connections
+01004         vector<TSockId> connections_copy;       
+01005 
+01006         while ( ! exitRequired() )
+01007         {
+01008                 // 1. Remove closed connections
+01009                 clearClosedConnections();
+01010 
+01011                 // POLL8
+01012 
+01013                 // 2-SELECT-VERSION : select() on the sockets handled in the present thread
+01014 
+01015                 descmax = 0;
+01016                 FD_ZERO( &readers );
+01017                 bool skip;
+01018                 bool alldisconnected = true;
+01019                 CConnections::iterator ipb;
+01020                 {
+01021                         // Lock _Connections
+01022                         CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
+01023 
+01024                         // Prepare to avoid select if there is no connection
+01025                         skip = connectionssync.value().empty();
+01026 
+01027                         // Fill the select array and copy _Connections
+01028                         connections_copy.clear();
+01029                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+01030                         {
+01031                                 if ( (*ipb)->Sock->connected() ) // exclude disconnected sockets that are not deleted
+01032                                 {
+01033                                         alldisconnected = false;
+01034                                         // Copy _Connections element
+01035                                         connections_copy.push_back( *ipb );
+01036 
+01037                                         // Add socket descriptor to the select array
+01038                                         FD_SET( (*ipb)->Sock->descriptor(), &readers );
+01039 
+01040                                         // Calculate descmax for select
+01041                                         if ( (*ipb)->Sock->descriptor() > descmax )
+01042                                         {
+01043                                                 descmax = (*ipb)->Sock->descriptor();
+01044                                         }
+01045                                 }
+01046                         }
+01047 
+01048 #ifdef NL_OS_UNIX
+01049                         // Add the wake-up pipe into the select array
+01050                         FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
+01051                         if ( _WakeUpPipeHandle[PipeRead]>descmax )
+01052                         {
+01053                                 descmax = _WakeUpPipeHandle[PipeRead];
+01054                         }
+01055 #endif
+01056                         
+01057                         // Unlock _Connections, use connections_copy instead
+01058                 }
+01059 
+01060 #ifndef NL_OS_UNIX
+01061                 // Avoid select if there is no connection (Windows only)
+01062                 if ( skip || alldisconnected )
+01063                 {
+01064                         nlSleep( 1 ); // nice
+01065                         continue;
+01066                 }
+01067 #endif
+01068 
+01069 #ifdef NL_OS_WINDOWS
+01070                 tv.tv_sec = 0; // short time because the newly added connections can't be added to the select fd_set
+01071                 tv.tv_usec = 10000; // NEW: set to 500ms because otherwise new connections handling are too slow
+01072 #elif defined NL_OS_UNIX
+01073                 // POLL7
+01074                 tv.tv_sec = 3600;               // 1 hour (=> 1 select every 3.6 second for 1000 connections)
+01075                 tv.tv_usec = 0;
+01076 #endif // NL_OS_WINDOWS
+01077 
+01078                 // Call select
+01079                 int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
+01080 
+01081                 // POLL9
+01082 
+01083                 // 3. Test the result
+01084                 switch ( res )
+01085                 {
+01086                         case  0 : continue; // time-out expired, no results
+01087 
+01089                         case -1 :
+01090                                 // we'll ignore message (Interrupted system call) caused by a CTRL-C
+01091                                 /*if (CSock::getLastError() == 4)
+01092                                 {
+01093                                         nldebug ("LNETL1: Select failed (in receive thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
+01094                                         continue;
+01095                                 }*/
+01096                                 //nlerror( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
+01097                                 nldebug( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
+01098                                 goto end;
+01099                 }
+01100 
+01101                 // 4. Get results
+01102 
+01103                 vector<TSockId>::iterator ic;
+01104                 for ( ic=connections_copy.begin(); ic!=connections_copy.end(); ++ic )
+01105                 {
+01106                         if ( FD_ISSET( (*ic)->Sock->descriptor(), &readers ) != 0 )
+01107                         {
+01108                                 CServerBufSock *serverbufsock = static_cast<CServerBufSock*>(static_cast<CBufSock*>(*ic));
+01109                                 try
+01110                                 {
+01111                                         // 4. Receive data
+01112                                         if ( serverbufsock->receivePart( sizeof(TSockId) + 1 ) ) // +1 for the event type
+01113                                         {
+01114                                                 serverbufsock->fillSockIdAndEventType( *ic );
+01115 
+01116                                                 // Push message into receive queue
+01117                                                 //uint32 bufsize;
+01118                                                 //sint32 mbsize;
+01119                                                 {
+01120                                                         //nldebug( "RCV: Acquiring the receive queue... ");
+01121                                                         CFifoAccessor recvfifo( &_Server->receiveQueue() );
+01122                                                         //nldebug( "RCV: Acquired, pushing the received buffer... ");
+01123                                                         recvfifo.value().push( serverbufsock->receivedBuffer() );
+01124 
+01125                                                         _Server->setDataAvailableFlag( true );
+01126 
+01127                                                         //nldebug( "RCV: Pushed, releasing the receive queue..." );
+01128                                                         //recvfifo.value().display();
+01129                                                         //bufsize = serverbufsock->receivedBuffer().size();
+01130                                                         //mbsize = recvfifo.value().size() / 1048576;
+01131                                                 }
+01132                                                 //nldebug( "RCV: Released." );
+01133                                                 /*if ( mbsize > 1 )
+01134                                                 {
+01135                                                         nlwarning( "The receive queue size exceeds %d MB", mbsize );
+01136                                                 }*/
+01137                                                 /*
+01138                                                 // Statistics
+01139                                                 {
+01140                                                         CSynchronized<uint32>::CAccessor syncbpi ( &_Server->syncBytesPushedIn() );
+01141                                                         syncbpi.value() += bufsize;
+01142                                                 }
+01143                                                 */
+01144                                         }
+01145                                 }
+01146                                 catch ( ESocketConnectionClosed& )
+01147                                 {
+01148                                         nldebug( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() );
+01149                                         // The socket went to _Connected=false when throwing the exception
+01150                                 }
+01151                                 catch ( ESocket& )
+01152                                 {
+01153                                         nldebug( "LNETL1: Connection %s broken", serverbufsock->asString().c_str() );
+01154                                         (*ic)->Sock->disconnect();
+01155                                 }
+01156 /*
+01157 #ifdef NL_OS_UNIX
+01158                                 skip = true; // don't check _WakeUpPipeHandle (yes, check it to read any written byte)
+01159 #endif
+01160 
+01161 */
+01162                         }
+01163                 }
+01164 
+01165 #ifdef NL_OS_UNIX
+01166                 // Test wake-up pipe
+01167                 if ( (!skip) && (FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers )) )
+01168                 {
+01169                         uint8 b;
+01170                         if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
+01171                         {
+01172                                 nldebug( "LNETL1: In CServerReceiveTask::run(): read() failed" );
+01173                         }
+01174                         nldebug( "LNETL1: Receive thread select woken-up" );
+01175                 }
+01176 #endif
+01177 
+01178                 NbLoop++;
+01179         }
+01180 end:
+01181         nlnettrace( "Exiting CServerReceiveTask::run" );
+01182         NbServerReceiveTask--;
+01183         NbNetworkTask--;
+01184 }
+
+

+ + + + +
+ + + + + + + + + +
CBufServer* NLNET::CServerReceiveTask::server  )  [inline]
+
+ + + + + +
+   + + +

+Access to the server. +

+ +

+Definition at line 476 of file buf_server.h. +

+

00476 { return _Server; }
+
+


Friends And Related Function Documentation

+

+ + + + +
+ + +
friend class CBufServer [friend] +
+
+ + + + + +
+   + + +

+ +

+Definition at line 478 of file buf_server.h.

+


Field Documentation

+

+ + + + +
+ + +
NLMISC::CSynchronized<CConnections> NLNET::CServerReceiveTask::_Connections [private] +
+
+ + + + + +
+   + + +

+ +

+Definition at line 487 of file buf_server.h. +

+Referenced by addNewSocket(), clearClosedConnections(), CServerReceiveTask(), NLNET::CBufServer::disconnect(), NLNET::CBufServer::displaySendQueueStat(), NLNET::CBufServer::getSendQueueSize(), numberOfConnections(), run(), NLNET::CBufServer::send(), NLNET::CBufServer::update(), and NLNET::CBufServer::~CBufServer().

+

+ + + + +
+ + +
NLMISC::CSynchronized<CConnections> NLNET::CServerReceiveTask::_RemoveSet [private] +
+
+ + + + + +
+   + + +

+ +

+Definition at line 490 of file buf_server.h. +

+Referenced by addToRemoveSet(), clearClosedConnections(), and CServerReceiveTask().

+

+ + + + +
+ + +
CBufServer* NLNET::CServerReceiveTask::_Server [private] +
+
+ + + + + +
+   + + +

+ +

+Definition at line 482 of file buf_server.h.

+

+ + + + +
+ + +
uint32 NLNET::CServerTask::NbLoop [inherited] +
+
+ + + + + +
+   + + +

+ +

+Definition at line 63 of file buf_server.h. +

+Referenced by NLNET::CBufServer::displayThreadStat().

+


The documentation for this class was generated from the following files: +
Generated on Tue Mar 16 14:03:40 2004 for NeL by + +doxygen +1.3.6
+ + -- cgit v1.2.1