#include <buf_server.h>
Inheritance diagram for NLNET::CBufServer:

Listening socket and accepted connetions, with packet scheme. The provided buffers are sent raw (no endianness conversion). By default, the size time trigger is disabled, the time trigger is set to 20 ms.
Where do the methods take place: send(), -> send buffer -> update(), flush() bytesSent(), newBytesSent()
receive(), dataAvailable(), <- receive buffer <- receive thread, dataAvailable(), bytesReceived(), newBytesReceived(), connection callback, disconnection callback
Nevrax France
Definition at line 156 of file buf_server.h.
Public Types | |
| enum | TEventType { User = 'U', Connection = 'C', Disconnection = 'D' } |
| Type of incoming events (max 256). More... | |
| enum | TThreadStategy { SpreadSockets, FillThreads } |
Public Member Functions | |
| uint64 | bytesReceived () const |
| Returns the number of bytes popped by receive() since the beginning. | |
| uint64 | bytesSent () const |
| Returns the number of bytes pushed by send() since the beginning. | |
| CBufServer (TThreadStategy strategy=FillThreads, uint16 max_threads=64, uint16 max_sockets_per_thread=16, bool nodelay=true, bool replaymode=false) | |
| bool | dataAvailable () |
| void | disconnect (TSockId hostid, bool quick=false) |
| void | displayReceiveQueueStat (NLMISC::CLog *log=NLMISC::InfoLog) |
| void | displaySendQueueStat (NLMISC::CLog *log=NLMISC::InfoLog, TSockId destid=InvalidSockId) |
| void | displayThreadStat (NLMISC::CLog *log=NLMISC::InfoLog) |
| bool | flush (TSockId destid) |
| uint32 | getReceiveQueueSize () |
| Returns the size of the receive queue (mutexed). | |
| uint32 | getSendQueueSize (TSockId destid) |
| const CInetAddress & | hostAddress (TSockId hostid) |
| Returns the address of the specified host. | |
| void | init (uint16 port) |
| Listens on the specified port. | |
| const CInetAddress & | listenAddress () const |
| Returns the internet address of the listening socket. | |
| uint32 | maxExpectedBlockSize () const |
| Returns the max size of the received messages (default: 2^31-1). | |
| uint32 | maxSentBlockSize () const |
| Returns the max size of the sent messages (default: 2^31-1). | |
| uint32 | nbConnections () const |
| Returns the number of connections (at the last update()). | |
| uint64 | newBytesReceived () |
| Returns the number of bytes popped by receive() since the previous call to this method. | |
| uint64 | newBytesSent () |
| Returns the number of bytes pushed by send() since the previous call to this method. | |
| void | receive (NLMISC::CMemStream &buffer, TSockId *hostid) |
| void | send (const NLMISC::CMemStream &buffer, TSockId hostid) |
| void | setConnectionCallback (TNetCallback cb, void *arg) |
| Sets callback for incoming connections (or NULL to disable callback). | |
| void | setDisconnectionCallback (TNetCallback cb, void *arg) |
| Sets callback for detecting a disconnection (or NULL to disable callback). | |
| void | setMaxExpectedBlockSize (sint32 limit) |
| void | setMaxSentBlockSize (sint32 limit) |
| void | setSizeFlushTrigger (TSockId destid, sint32 size) |
| void | setTimeFlushTrigger (TSockId destid, sint32 ms) |
| void | update () |
| Update the network (call this method evenly). | |
| virtual | ~CBufServer () |
| Destructor. | |
Protected Member Functions | |
| void | addNewThread (CThreadPool &threadpool, CServerBufSock *bufsock) |
| void * | argOfConnectionCallback () const |
| Returns the argument of the connection callback. | |
| void * | argOfDisconnectionCallback () const |
| Returns the argument of the disconnection callback. | |
| TNetCallback | connectionCallback () const |
| Returns the connection callback. | |
| volatile bool | dataAvailableFlag () const |
| Return _DataAvailable. | |
| TNetCallback | disconnectionCallback () const |
| Returns the disconnection callback. | |
| void | dispatchNewSocket (CServerBufSock *bufsock) |
| bool | noDelay () const |
| Returns the TCP_NODELAY flag. | |
| void | pushBufferToHost (const NLMISC::CMemStream &buffer, TSockId hostid) |
| Pushes a buffer to the specified host's send queue and update (unless not connected). | |
| void | pushMessageIntoReceiveQueue (const uint8 *buffer, uint32 size) |
| void | pushMessageIntoReceiveQueue (const std::vector< uint8 > &buffer) |
| Push message into receive queue (mutexed). | |
| CSynchronizedFIFO & | receiveQueue () |
| Access to the receive queue. | |
| CServerReceiveTask * | receiveTask (std::vector< NLMISC::IThread * >::iterator ipt) |
| Returns the receive task corresponding to a particular thread. | |
| void | setDataAvailableFlag (bool da) |
| Sets _DataAvailable. | |
Private Attributes | |
| uint64 | _BytesPoppedIn |
| Number of bytes popped by receive() since the beginning. | |
| uint64 | _BytesPushedOut |
| Number of bytes pushed by send() since the beginning. | |
| TNetCallback | _ConnectionCallback |
| Connection callback. | |
| void * | _ConnectionCbArg |
| Argument of the connection callback. | |
| CListenTask * | _ListenTask |
| Listen task. | |
| NLMISC::IThread * | _ListenThread |
| Listen thread. | |
| uint16 | _MaxSocketsPerThread |
| Max number of sockets handled by one thread. | |
| uint16 | _MaxThreads |
| Max number of threads. | |
| uint32 | _NbConnections |
| Number of connections (debug stat). | |
| bool | _NoDelay |
| TCP_NODELAY. | |
| uint64 | _PrevBytesPoppedIn |
| Previous number of bytes received. | |
| uint64 | _PrevBytesPushedOut |
| Previous number of bytes sent. | |
| bool | _ReplayMode |
| Replay mode flag. | |
| NLMISC::CSynchronized< CThreadPool > | _ThreadPool |
| TThreadStategy | _ThreadStrategy |
| Thread socket-handling strategy. | |
Friends | |
| class | CListenTask |
| class | CServerBufSock |
| class | CServerReceiveTask |
| class | NLNET::CBufSock |
|
|
Type of incoming events (max 256).
Definition at line 79 of file buf_net_base.h.
00079 { User = 'U', Connection = 'C', Disconnection = 'D' };
|
|
|
Definition at line 160 of file buf_server.h.
00160 { SpreadSockets, FillThreads };
|
|
||||||||||||||||||||||||
|
Constructor Set nodelay to true to disable the Nagle buffering algorithm (see CTcpSock documentation) Definition at line 58 of file buf_server.cpp. References _ListenTask, _ListenThread, _ReplayMode, CListenTask, nlnettrace, and uint16.
00059 : 00060 CBufNetBase(), 00061 _NoDelay( nodelay ), 00062 _ThreadStrategy( strategy ), 00063 _MaxThreads( max_threads ), 00064 _MaxSocketsPerThread( max_sockets_per_thread ), 00065 _ListenTask( NULL ), 00066 _ListenThread( NULL ), 00067 _ThreadPool("CBufServer::_ThreadPool"), 00068 _ConnectionCallback( NULL ), 00069 _ConnectionCbArg( NULL ), 00070 _BytesPushedOut( 0 ), 00071 _BytesPoppedIn( 0 ), 00072 _PrevBytesPoppedIn( 0 ), 00073 _PrevBytesPushedOut( 0 ), 00074 _NbConnections (0), 00075 _ReplayMode( replaymode ) 00076 { 00077 nlnettrace( "CBufServer::CBufServer" ); 00078 if ( ! _ReplayMode ) 00079 { 00080 _ListenTask = new CListenTask( this ); 00081 _ListenThread = IThread::create( _ListenTask ); 00082 } 00083 /*{ 00084 CSynchronized<uint32>::CAccessor syncbpi ( &_BytesPushedIn ); 00085 syncbpi.value() = 0; 00086 }*/ 00087 } |
|
|
Destructor.
Definition at line 166 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ListenTask, _ListenThread, _ReplayMode, _ThreadPool, NLMISC::IThread::getRunnable(), nldebug, nlnettrace, receiveTask(), NLNET::CServerTask::requireExit(), and NLMISC::IThread::wait().
00167 {
00168 nlnettrace( "CBufServer::~CBufServer" );
00169
00170 // Clean listen thread exit
00171 if ( ! _ReplayMode )
00172 {
00173 ((CListenTask*)(_ListenThread->getRunnable()))->requireExit();
00174 ((CListenTask*)(_ListenThread->getRunnable()))->close();
00175 #ifdef NL_OS_UNIX
00176 _ListenTask->wakeUp();
00177 #endif
00178 _ListenThread->wait();
00179 delete _ListenThread;
00180 delete _ListenTask;
00181
00182 // Clean receive thread exits
00183 CThreadPool::iterator ipt;
00184 {
00185 nldebug( "LNETL1: Waiting for end of threads..." );
00186 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00187 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00188 {
00189 // Tell the threads to exit and wake them up
00190 CServerReceiveTask *task = receiveTask(ipt);
00191 nlnettrace( "Requiring exit" );
00192 task->requireExit();
00193
00194 // Wake the threads up
00195 #ifdef NL_OS_UNIX
00196 task->wakeUp();
00197 #else
00198 CConnections::iterator ipb;
00199 nlnettrace( "Closing sockets (Win32)" );
00200 {
00201 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00202 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00203 {
00204 (*ipb)->Sock->close();
00205 }
00206 }
00207 #endif
00208
00209 }
00210
00211 nlnettrace( "Waiting" );
00212 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00213 {
00214 // Wait until the threads have exited
00215 (*ipt)->wait();
00216 }
00217
00218 nldebug( "LNETL1: Deleting sockets, tasks and threads..." );
00219 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00220 {
00221 // Delete the socket objects
00222 CServerReceiveTask *task = receiveTask(ipt);
00223 CConnections::iterator ipb;
00224 {
00225 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00226 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00227 {
00228 delete (*ipb); // closes and deletes the socket
00229 }
00230 }
00231
00232 // Delete the task objects
00233 delete task;
00234
00235 // Delete the thread objects
00236 delete (*ipt);
00237 }
00238 }
00239 }
00240
00241 nlnettrace( "Exiting CBufServer::~CBufServer" );
00242 }
|
|
||||||||||||
|
Definition at line 958 of file buf_server.cpp. References NLNET::CServerReceiveTask::addNewSocket(), CServerReceiveTask, NLNET::CThreadPool, nlassert, nldebug, nlnettrace, and NLNET::CServerBufSock::setOwnerTask(). Referenced by dispatchNewSocket().
00959 {
00960 nlnettrace( "CBufServer::addNewThread" );
00961 nlassert( bufsock != NULL );
00962
00963 // Create new task and dispatch the socket to it
00964 CServerReceiveTask *task = new CServerReceiveTask( this );
00965 bufsock->setOwnerTask( task );
00966 task->addNewSocket( bufsock );
00967
00968 // Add a new thread to the pool, with this task
00969 IThread *thr = IThread::create( task );
00970 {
00971 threadpool.push_back( thr );
00972 thr->start();
00973 nldebug( "LNETL1: Added a new thread; pool size is %d", threadpool.size() );
00974 nldebug( "LNETL1: New socket dispatched to thread %d", threadpool.size()-1 );
00975 }
00976 }
|
|
|
Returns the argument of the connection callback.
Definition at line 314 of file buf_server.h. References _ConnectionCbArg. Referenced by dataAvailable().
00314 { return _ConnectionCbArg; }
|
|
|
Returns the argument of the disconnection callback.
Definition at line 160 of file buf_net_base.h. References NLNET::CBufNetBase::_DisconnectionCbArg. Referenced by dataAvailable(), and NLNET::CBufClient::dataAvailable().
00160 { return _DisconnectionCbArg; }
|
|
|
Returns the number of bytes popped by receive() since the beginning.
Definition at line 255 of file buf_server.h. References _BytesPoppedIn, and uint64. Referenced by newBytesReceived().
00255 { return _BytesPoppedIn; }
|
|
|
Returns the number of bytes pushed by send() since the beginning.
Definition at line 261 of file buf_server.h. References _BytesPushedOut, and uint64. Referenced by newBytesSent().
00261 { return _BytesPushedOut; }
|
|
|
Returns the connection callback.
Definition at line 311 of file buf_server.h. References _ConnectionCallback, and NLNET::TNetCallback. Referenced by dataAvailable().
00311 { return _ConnectionCallback; }
|
|
|
Checks if there is some data to receive. Returns false if the receive queue is empty. This is where the connection/disconnection callbacks can be called. Reimplemented in NLNET::CCallbackServer. Definition at line 364 of file buf_server.cpp. References argOfConnectionCallback(), NLNET::CBufNetBase::argOfDisconnectionCallback(), NLNET::CBufSock::asString(), buffer, NLNET::CFifoAccessor, connectionCallback(), NLNET::CBufNetBase::dataAvailableFlag(), NLNET::CBufNetBase::disconnectionCallback(), nlassert, nldebug, nlerror, nlinfo, NLNET::CBufNetBase::receiveQueue(), NLNET::CBufSock::setConnectedState(), NLNET::CBufNetBase::setDataAvailableFlag(), NLMISC::stringFromVector(), NLNET::TSockId, uint16, and uint8.
00365 {
00366 // slow down the layer H_AUTO (CBufServer_dataAvailable);
00367 {
00368 /* If no data available, enter the 'while' loop and return false (1 volatile test)
00369 * If there are user data available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
00370 * If there is a connection/disconnection event (rare), call the callback and loop
00371 */
00372 while ( dataAvailableFlag() )
00373 {
00374 // Because _DataAvailable is true, the receive queue is not empty at this point
00375 vector<uint8> buffer;
00376 uint8 val;
00377 {
00378 CFifoAccessor recvfifo( &receiveQueue() );
00379 val = recvfifo.value().frontLast();
00380 if ( val != CBufNetBase::User )
00381 {
00382 recvfifo.value().front( buffer );
00383 }
00384 }
00385
00386 /*sint32 mbsize = recvfifo.value().size() / 1048576;
00387 if ( mbsize > 0 )
00388 {
00389 nlwarning( "The receive queue size exceeds %d MB", mbsize );
00390 }*/
00391
00392 /*vector<uint8> buffer;
00393 recvfifo.value().front( buffer );*/
00394
00395 // Test if it the next block is a system event
00396 //switch ( buffer[buffer.size()-1] )
00397 switch ( val )
00398 {
00399
00400 // Normal message available
00401 case CBufNetBase::User:
00402 return true; // return immediatly, do not extract the message
00403
00404 // Process disconnection event
00405 case CBufNetBase::Disconnection:
00406 {
00407 TSockId sockid = *((TSockId*)(&*buffer.begin()));
00408 nldebug( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());
00409
00410 sockid->setConnectedState( false );
00411
00412 // Call callback if needed
00413 if ( disconnectionCallback() != NULL )
00414 {
00415 disconnectionCallback()( sockid, argOfDisconnectionCallback() );
00416 }
00417
00418 // Add socket object into the synchronized remove list
00419 nldebug( "LNETL1: Adding the connection to the remove list" );
00420 nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
00421 ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
00422 break;
00423 }
00424 // Process connection event
00425 case CBufNetBase::Connection:
00426 {
00427 TSockId sockid = *((TSockId*)(&*buffer.begin()));
00428 nldebug( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());
00429
00430 sockid->setConnectedState( true );
00431
00432 // Call callback if needed
00433 if ( connectionCallback() != NULL )
00434 {
00435 connectionCallback()( sockid, argOfConnectionCallback() );
00436 }
00437 break;
00438 }
00439 default: // should not occur
00440 nlinfo( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
00441 nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
00442 nlinfo( "LNETL1: Receive queue:" );
00443 {
00444 CFifoAccessor recvfifo( &receiveQueue() );
00445 recvfifo.value().display();
00446 }
00447 nlerror( "LNETL1: Invalid system event type in server receive queue" );
00448
00449 }
00450
00451 // Extract system event
00452 {
00453 CFifoAccessor recvfifo( &receiveQueue() );
00454 recvfifo.value().pop();
00455 setDataAvailableFlag( ! recvfifo.value().empty() );
00456 }
00457 }
00458 // _DataAvailable is false here
00459 return false;
00460 }
00461 }
|
|
|
Return _DataAvailable.
Definition at line 206 of file buf_net_base.h. References NLNET::CBufNetBase::_DataAvailable. Referenced by dataAvailable(), and NLNET::CBufClient::dataAvailable().
00206 { return _DataAvailable; }
|
|
||||||||||||
|
Disconnect a connection Set hostid to InvalidSockId to disconnect all connections. If hostid is not InvalidSockId and the socket is not connected, the method does nothing. If quick is true, any pending data will not be sent before disconnecting. Definition at line 251 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ThreadPool, NLNET::CSock::connected(), NLNET::CTcpSock::disconnect(), NLNET::CBufSock::flush(), nlnettrace, receiveTask(), NLNET::CBufSock::Sock, and NLNET::TSockId.
00252 {
00253 nlnettrace( "CBufServer::disconnect" );
00254 if ( hostid != InvalidSockId )
00255 {
00256 // Disconnect only if physically connected
00257 if ( hostid->Sock->connected() )
00258 {
00259 if ( ! quick )
00260 {
00261 hostid->flush();
00262 }
00263 hostid->Sock->disconnect(); // the connection will be removed by the next call of update()
00264 }
00265 }
00266 else
00267 {
00268 // Disconnect all
00269 CThreadPool::iterator ipt;
00270 {
00271 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00272 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00273 {
00274 CServerReceiveTask *task = receiveTask(ipt);
00275 CConnections::iterator ipb;
00276 {
00277 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00278 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00279 {
00280 if ( (*ipb)->Sock->connected() )
00281 {
00282 if ( ! quick )
00283 {
00284 (*ipb)->flush();
00285 }
00286 (*ipb)->Sock->disconnect();
00287 }
00288 }
00289 }
00290 }
00291 }
00292 }
00293 }
|
|
|
Returns the disconnection callback.
Definition at line 157 of file buf_net_base.h. References NLNET::CBufNetBase::_DisconnectionCallback, and NLNET::TNetCallback. Referenced by dataAvailable(), and NLNET::CBufClient::dataAvailable().
00157 { return _DisconnectionCallback; }
|
|
|
Binds a new socket and send buffer to an existing or a new thread (that starts) Note: this method is called in the listening thread. Definition at line 866 of file buf_server.cpp. References _MaxSocketsPerThread, _MaxThreads, _ThreadPool, _ThreadStrategy, NLNET::CServerReceiveTask::addNewSocket(), addNewThread(), min, nldebug, nlnettrace, nlwarning, NLNET::CServerReceiveTask::numberOfConnections(), receiveTask(), NLNET::CServerBufSock::setOwnerTask(), SpreadSockets, and uint. Referenced by NLNET::CListenTask::run().
00867 {
00868 nlnettrace( "CBufServer::dispatchNewSocket" );
00869
00870 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00871 if ( _ThreadStrategy == SpreadSockets )
00872 {
00873 // Find the thread with the smallest number of connections and check if all
00874 // threads do not have the same number of connections
00875 uint min = 0xFFFFFFFF;
00876 uint max = 0;
00877 CThreadPool::iterator ipt, iptmin, iptmax;
00878 for ( iptmin=iptmax=ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00879 {
00880 uint noc = receiveTask(ipt)->numberOfConnections();
00881 if ( noc < min )
00882 {
00883 min = noc;
00884 iptmin = ipt;
00885 }
00886 if ( noc > max )
00887 {
00888 max = noc;
00889 iptmax = ipt;
00890 }
00891 }
00892
00893 // Check if we make the pool of threads grow (if we have not found vacant room
00894 // and if it is allowed to)
00895 if ( (poolsync.value().empty()) ||
00896 ((min == max) && (poolsync.value().size() < _MaxThreads)) )
00897 {
00898 addNewThread( poolsync.value(), bufsock );
00899 }
00900 else
00901 {
00902 // Dispatch socket to an existing thread of the pool
00903 CServerReceiveTask *task = receiveTask(iptmin);
00904 bufsock->setOwnerTask( task );
00905 task->addNewSocket( bufsock );
00906 #ifdef NL_OS_UNIX
00907 task->wakeUp();
00908 #endif
00909
00910 if ( min >= (uint)_MaxSocketsPerThread )
00911 {
00912 nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
00913 }
00914 nldebug( "LNETL1: New socket dispatched to thread %d", iptmin-poolsync.value().begin() );
00915 }
00916
00917 }
00918 else // _ThreadStrategy == FillThreads
00919 {
00920 CThreadPool::iterator ipt;
00921 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00922 {
00923 uint noc = receiveTask(ipt)->numberOfConnections();
00924 if ( noc < _MaxSocketsPerThread )
00925 {
00926 break;
00927 }
00928 }
00929
00930 // Check if we have to make the thread pool grow (if we have not found vacant room)
00931 if ( ipt == poolsync.value().end() )
00932 {
00933 if ( poolsync.value().size() == _MaxThreads )
00934 {
00935 nlwarning( "LNETL1: Exceeding the maximum number of threads" );
00936 }
00937 addNewThread( poolsync.value(), bufsock );
00938 }
00939 else
00940 {
00941 // Dispatch socket to an existing thread of the pool
00942 CServerReceiveTask *task = receiveTask(ipt);
00943 bufsock->setOwnerTask( task );
00944 task->addNewSocket( bufsock );
00945 #ifdef NL_OS_UNIX
00946 task->wakeUp();
00947 #endif
00948 nldebug( "LNETL1: New socket dispatched to thread %d", ipt-poolsync.value().begin() );
00949 }
00950 }
00951 }
|
|
|
Reimplemented in NLNET::CCallbackClient, and NLNET::CCallbackServer. Definition at line 94 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, and NLNET::CFifoAccessor.
00095 {
00096 CFifoAccessor syncfifo( &_RecvFifo );
00097 syncfifo.value().displayStats(log);
00098 }
|
|
||||||||||||
|
Reimplemented in NLNET::CCallbackServer. Definition at line 713 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ThreadPool, NLMISC::CBufFIFO::displayStats(), receiveTask(), NLNET::CBufSock::SendFifo, and NLNET::TSockId.
00714 {
00715 if ( destid != InvalidSockId )
00716 {
00717 destid->SendFifo.displayStats(log);
00718 }
00719 else
00720 {
00721 // add all client buffers
00722
00723 // For each thread
00724 CThreadPool::iterator ipt;
00725 {
00726 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00727 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00728 {
00729 // For each thread of the pool
00730 CServerReceiveTask *task = receiveTask(ipt);
00731 CConnections::iterator ipb;
00732 {
00733 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00734 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00735 {
00736 // For each socket of the thread, update sending
00737 (*ipb)->SendFifo.displayStats(log);
00738 }
00739 }
00740 }
00741 }
00742 }
00743 }
|
|
|
Reimplemented in NLNET::CCallbackServer. Definition at line 695 of file buf_server.cpp. References _ListenTask, _ThreadPool, NLMISC::CLog::displayNL(), NLNET::CServerTask::NbLoop, and receiveTask().
00696 {
00697 // For each thread
00698 CThreadPool::iterator ipt;
00699 {
00700 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00701 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00702 {
00703 // For each thread of the pool
00704 CServerReceiveTask *task = receiveTask(ipt);
00705 // For each socket of the thread, update sending
00706 log->displayNL ("server receive thread %p nbloop %d", task, task->NbLoop);
00707 }
00708 }
00709
00710 log->displayNL ("server listen thread %p nbloop %d", _ListenTask, _ListenTask->NbLoop);
00711 }
|
|
|
Force to send all data pending in the send queue.
Reimplemented in NLNET::CCallbackServer. Definition at line 232 of file buf_server.h. References NLNET::CBufSock::flush(), nlassert, and NLNET::TSockId.
00232 { nlassert( destid != InvalidSockId ); return destid->flush(); }
|
|
|
Returns the size of the receive queue (mutexed).
Reimplemented in NLNET::CCallbackClient, and NLNET::CCallbackServer. Definition at line 88 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, NLNET::CFifoAccessor, and uint32.
00089 {
00090 CFifoAccessor syncfifo( &_RecvFifo );
00091 return syncfifo.value().size();
00092 }
|
|
|
Definition at line 660 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ThreadPool, receiveTask(), NLNET::CBufSock::SendFifo, NLMISC::CBufFIFO::size(), NLNET::TSockId, and uint32.
00661 {
00662 if ( destid != InvalidSockId )
00663 {
00664 return destid->SendFifo.size();
00665 }
00666 else
00667 {
00668 // add all client buffers
00669
00670 uint32 total = 0;
00671
00672 // For each thread
00673 CThreadPool::iterator ipt;
00674 {
00675 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00676 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00677 {
00678 // For each thread of the pool
00679 CServerReceiveTask *task = receiveTask(ipt);
00680 CConnections::iterator ipb;
00681 {
00682 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00683 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00684 {
00685 // For each socket of the thread, update sending
00686 total = (*ipb)->SendFifo.size ();
00687 }
00688 }
00689 }
00690 }
00691 return total;
00692 }
00693 }
|
|
|
Returns the address of the specified host.
Reimplemented in NLNET::CCallbackServer. Definition at line 241 of file buf_server.h. References nlassert, NLNET::CSock::remoteAddr(), NLNET::CBufSock::Sock, and NLNET::TSockId.
00241 { nlassert( hostid != InvalidSockId ); return hostid->Sock->remoteAddr(); }
|
|
|
Listens on the specified port.
Definition at line 93 of file buf_server.cpp. References _ListenTask, _ListenThread, _ReplayMode, NLNET::CListenTask::init(), NLNET::CBufNetBase::maxExpectedBlockSize(), nldebug, nlnettrace, NLMISC::IThread::start(), and uint16. Referenced by NLNET::CNetManager::addServer(), and NLNET::CUnifiedNetwork::init().
00094 {
00095 nlnettrace( "CBufServer::init" );
00096 if ( ! _ReplayMode )
00097 {
00098 _ListenTask->init( port, maxExpectedBlockSize() );
00099 _ListenThread->start();
00100 }
00101 else
00102 {
00103 nldebug( "LNETL1: Binding listen socket to any address, port %hu", port );
00104 }
00105 }
|
|
|
Returns the internet address of the listening socket.
Definition at line 238 of file buf_server.h. References _ListenTask, and NLNET::CListenTask::localAddr(). Referenced by NLNET::CLoginServer::init(), NLNET::NLMISC_DYNVARIABLE(), and NLNET::setListenAddress().
00238 { return _ListenTask->localAddr(); }
|
|
|
Returns the max size of the received messages (default: 2^31-1).
Definition at line 135 of file buf_net_base.h. References NLNET::CBufNetBase::_MaxExpectedBlockSize, and uint32. Referenced by NLNET::CBufClient::connect(), and init().
00136 {
00137 return _MaxExpectedBlockSize;
00138 }
|
|
|
Returns the max size of the sent messages (default: 2^31-1).
Definition at line 141 of file buf_net_base.h. References NLNET::CBufNetBase::_MaxSentBlockSize, and uint32. Referenced by send(), and NLNET::CBufClient::send().
00142 {
00143 return _MaxSentBlockSize;
00144 }
|
|
|
Returns the number of connections (at the last update()).
Definition at line 267 of file buf_server.h. References _NbConnections, and uint32. Referenced by NLNET::CCallbackServer::send().
00267 { return _NbConnections; }
|
|
|
Returns the number of bytes popped by receive() since the previous call to this method.
Definition at line 749 of file buf_server.cpp. References _PrevBytesPoppedIn, bytesReceived(), and uint64.
00750 {
00751 uint64 b = bytesReceived();
00752 uint64 nbrecvd = b - _PrevBytesPoppedIn;
00753 //nlinfo( "b: %"NL_I64"u new: %"NL_I64"u", b, nbrecvd );
00754 _PrevBytesPoppedIn = b;
00755 return nbrecvd;
00756 }
|
|
|
Returns the number of bytes pushed by send() since the previous call to this method.
Definition at line 761 of file buf_server.cpp. References _PrevBytesPushedOut, bytesSent(), and uint64.
00762 {
00763 uint64 b = bytesSent();
00764 uint64 nbsent = b - _PrevBytesPushedOut;
00765 //nlinfo( "b: %"NL_I64"u new: %"NL_I64"u", b, nbsent );
00766 _PrevBytesPushedOut = b;
00767 return nbsent;
00768 }
|
|
|
Returns the TCP_NODELAY flag.
Definition at line 276 of file buf_server.h. Referenced by NLNET::CListenTask::run().
00276 { return _NoDelay; }
|
|
||||||||||||
|
Pushes a buffer to the specified host's send queue and update (unless not connected).
Definition at line 298 of file buf_server.h. References _BytesPushedOut, buffer, nlassert, NLNET::CBufSock::pushBuffer(), NLNET::TBlockSize, and NLNET::TSockId. Referenced by send().
00299 {
00300 nlassert( hostid != InvalidSockId );
00301 if ( hostid->pushBuffer( buffer ) )
00302 {
00303 _BytesPushedOut += buffer.length() + sizeof(TBlockSize); // statistics
00304 }
00305 }
|
|
||||||||||||
|
Definition at line 183 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, buffer, NLNET::CFifoAccessor, NLNET::CBufNetBase::setDataAvailableFlag(), size, uint32, and uint8.
00184 {
00185 //sint32 mbsize;
00186 {
00187 //nldebug( "BNB: Acquiring the receive queue... ");
00188 CFifoAccessor recvfifo( &_RecvFifo );
00189 //nldebug( "BNB: Acquired, pushing the received buffer... ");
00190 recvfifo.value().push( buffer, size );
00191 //nldebug( "BNB: Pushed, releasing the receive queue..." );
00192 //mbsize = recvfifo.value().size() / 1048576;
00193 setDataAvailableFlag( true );
00194 }
00195 //nldebug( "BNB: Released." );
00196 /*if ( mbsize > 1 )
00197 {
00198 nlwarning( "The receive queue size exceeds %d MB", mbsize );
00199 }*/
00200 }
|
|
|
Push message into receive queue (mutexed).
Definition at line 164 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, buffer, NLNET::CFifoAccessor, and NLNET::CBufNetBase::setDataAvailableFlag(). Referenced by NLNET::CBufSock::advertiseSystemEvent(), and NLNET::CClientReceiveTask::run().
00165 {
00166 //sint32 mbsize;
00167 {
00168 //nldebug( "BNB: Acquiring the receive queue... ");
00169 CFifoAccessor recvfifo( &_RecvFifo );
00170 //nldebug( "BNB: Acquired, pushing the received buffer... ");
00171 recvfifo.value().push( buffer );
00172 //nldebug( "BNB: Pushed, releasing the receive queue..." );
00173 //mbsize = recvfifo.value().size() / 1048576;
00174 setDataAvailableFlag( true );
00175 }
00176 //nldebug( "BNB: Released." );
00177 //if ( mbsize > 1 )
00178 //{
00179 // nlwarning( "The receive queue size exceeds %d MB", mbsize );
00180 //}
00181 }
|
|
||||||||||||
|
Receives next block of data in the specified (resizes the vector) You must call dataAvailable() before every call to receive() Definition at line 565 of file buf_server.cpp. References _BytesPoppedIn, buffer, NLNET::CFifoAccessor, nlassert, NLMISC_BSWAP32, nlnettrace, nlwarning, NLNET::CBufNetBase::receiveQueue(), NLNET::CBufNetBase::setDataAvailableFlag(), NLNET::TBlockSize, NLNET::TSockId, and uint32.
00566 {
00567 nlnettrace( "CBufServer::receive" );
00568 //nlassert( dataAvailable() );
00569 nlassert( phostid != NULL );
00570 {
00571 CFifoAccessor recvfifo( &receiveQueue() );
00572 nlassert( ! recvfifo.value().empty() );
00573 recvfifo.value().front( buffer );
00574 recvfifo.value().pop();
00575 setDataAvailableFlag( ! recvfifo.value().empty() );
00576 }
00577
00578 // Extract hostid (and event type)
00579 *phostid = *((TSockId*)&(buffer.buffer()[buffer.size()-sizeof(TSockId)-1]));
00580 nlassert( buffer.buffer()[buffer.size()-1] == CBufNetBase::User );
00581
00582 // debug features, we number all packet to be sure that they are all sent and received
00583 // \todo remove this debug feature when ok
00584 #ifdef NL_BIG_ENDIAN
00585 uint32 val = NLMISC_BSWAP32(*(uint32*)buffer.buffer());
00586 #else
00587 uint32 val = *(uint32*)buffer.buffer();
00588 #endif
00589
00590 // nldebug ("receive message number %u", val);
00591 if ((*phostid)->ReceiveNextValue != val)
00592 {
00593 nlwarning ("LNETL1: !!!LOST A MESSAGE!!! I received the message number %u but I'm waiting the message number %u (cnx %s), warn lecroart@nevrax.com with the log now please", val, (*phostid)->ReceiveNextValue, (*phostid)->asString().c_str());
00594 // resync the message number
00595 (*phostid)->ReceiveNextValue = val;
00596 }
00597
00598 (*phostid)->ReceiveNextValue++;
00599
00600 buffer.resize( buffer.size()-sizeof(TSockId)-1 );
00601
00602 // TODO OPTIM remove the nldebug for speed
00603 //commented for optimisation nldebug( "LNETL1: Read buffer (%d+%d B) from %s", buffer.size(), sizeof(TSockId)+1, /*stringFromVector(buffer).c_str(), */(*phostid)->asString().c_str() );
00604
00605 // Statistics
00606 _BytesPoppedIn += buffer.size() + sizeof(TBlockSize);
00607 }
|
|
|
Access to the receive queue.
Definition at line 154 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, and NLNET::CSynchronizedFIFO. Referenced by dataAvailable(), NLNET::CBufClient::dataAvailable(), NLNET::CBufClient::disconnect(), receive(), NLNET::CBufClient::receive(), and NLNET::CServerReceiveTask::run().
00154 { return _RecvFifo; }
|
|
|
Returns the receive task corresponding to a particular thread.
Definition at line 284 of file buf_server.h. Referenced by disconnect(), dispatchNewSocket(), displaySendQueueStat(), displayThreadStat(), getSendQueueSize(), send(), update(), and ~CBufServer().
00285 {
00286 return ((CServerReceiveTask*)((*ipt)->getRunnable()));
00287 }
|
|
||||||||||||
|
Send a message to the specified host, or to all hosts if hostid is InvalidSockId Reimplemented in NLNET::CCallbackServer. Definition at line 299 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ThreadPool, buffer, NLNET::CBufNetBase::maxSentBlockSize(), nlassert, nlassertex, NLMISC_BSWAP32, nlnettrace, pushBufferToHost(), receiveTask(), NLNET::CBufSock::SendNextValue, NLNET::TSockId, and uint32.
00300 {
00301 nlnettrace( "CBufServer::send" );
00302 nlassert( buffer.length() > 0 );
00303 nlassertex( buffer.length() <= maxSentBlockSize(), ("length=%u max=%u", buffer.length(), maxSentBlockSize()) );
00304
00305 // slow down the layer H_AUTO (CBufServer_send);
00306
00307 if ( hostid != InvalidSockId )
00308 {
00309 // debug features, we number all packet to be sure that they are all sent and received
00310 // \todo remove this debug feature when ok
00311 // nldebug ("send message number %u", hostid->SendNextValue);
00312 #ifdef NL_BIG_ENDIAN
00313 uint32 val = NLMISC_BSWAP32(hostid->SendNextValue);
00314 #else
00315 uint32 val = hostid->SendNextValue;
00316 #endif
00317
00318 *(uint32*)buffer.buffer() = val;
00319 hostid->SendNextValue++;
00320
00321 pushBufferToHost( buffer, hostid );
00322 }
00323 else
00324 {
00325 // Push into all send queues
00326 CThreadPool::iterator ipt;
00327 {
00328 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00329 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00330 {
00331 CServerReceiveTask *task = receiveTask(ipt);
00332 CConnections::iterator ipb;
00333 {
00334 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00335 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00336 {
00337 // Send only if the socket is logically connected
00338 if ( (*ipb)->connectedState() )
00339 {
00340 // debug features, we number all packet to be sure that they are all sent and received
00341 // \todo remove this debug feature when ok
00342 // nldebug ("send message number %u", (*ipb)->SendNextValue);
00343 #ifdef NL_BIG_ENDIAN
00344 uint32 val = NLMISC_BSWAP32((*ipb)->SendNextValue);
00345 #else
00346 uint32 val = (*ipb)->SendNextValue;
00347 #endif
00348 *(uint32*)buffer.buffer() = val;
00349 (*ipb)->SendNextValue++;
00350
00351 pushBufferToHost( buffer, *ipb );
00352 }
00353 }
00354 }
00355 }
00356 }
00357 }
00358 }
|
|
||||||||||||
|
Sets callback for incoming connections (or NULL to disable callback).
Reimplemented in NLNET::CCallbackServer. Definition at line 184 of file buf_server.h. References _ConnectionCallback, _ConnectionCbArg, and NLNET::TNetCallback.
00184 { _ConnectionCallback = cb; _ConnectionCbArg = arg; }
|
|
|
Sets _DataAvailable.
Definition at line 203 of file buf_net_base.h. References NLNET::CBufNetBase::_DataAvailable. Referenced by dataAvailable(), NLNET::CBufClient::dataAvailable(), NLNET::CBufClient::disconnect(), NLNET::CBufNetBase::pushMessageIntoReceiveQueue(), receive(), NLNET::CBufClient::receive(), and NLNET::CServerReceiveTask::run().
00203 { _DataAvailable = da; }
|
|
||||||||||||
|
Sets callback for detecting a disconnection (or NULL to disable callback).
Reimplemented in NLNET::CCallbackClient, and NLNET::CCallbackServer. Definition at line 85 of file buf_net_base.h. References NLNET::CBufNetBase::_DisconnectionCallback, NLNET::CBufNetBase::_DisconnectionCbArg, and NLNET::TNetCallback.
00085 { _DisconnectionCallback = cb; _DisconnectionCbArg = arg; }
|
|
|
Sets the max size of the received messages. If receiving a message bigger than the limit, the connection will be dropped. Default value: 1 MegaByte If you put a negative number as limit, the max size is reset to the default value. Warning: you can call this method only at initialization time, before connecting (for a client) or calling init() (for a server) ! Definition at line 109 of file buf_net_base.h. References NLNET::CBufNetBase::_MaxExpectedBlockSize, sint32, and uint32.
00110 {
00111 if ( limit < 0 )
00112 _MaxExpectedBlockSize = 1048576;
00113 else
00114 _MaxExpectedBlockSize = (uint32)limit;
00115 }
|
|
|
Sets the max size of the sent messages. Note: Limiting of sending not implemented, currently Default value: 1 MegaByte If you put a negative number as limit, the max size is reset to the default value. Warning: you can call this method only at initialization time, before connecting (for a client) or calling init() (for a server) ! Definition at line 126 of file buf_net_base.h. References NLNET::CBufNetBase::_MaxSentBlockSize, sint32, and uint32.
00127 {
00128 if ( limit < 0 )
00129 _MaxSentBlockSize = 1048576;
00130 else
00131 _MaxSentBlockSize = (uint32)limit;
00132 }
|
|
||||||||||||
|
Sets the size flush trigger. When the size of the send queue reaches or exceeds this value, all data in the send queue is automatically sent (-1 to disable this trigger ) Definition at line 226 of file buf_server.h. References nlassert, NLNET::CBufSock::setSizeFlushTrigger(), sint32, size, and NLNET::TSockId.
00226 { nlassert( destid != InvalidSockId ); destid->setSizeFlushTrigger( size ); }
|
|
||||||||||||
|
Sets the time flush trigger (in millisecond). When this time is elapsed, all data in the send queue is automatically sent (-1 to disable this trigger) Definition at line 221 of file buf_server.h. References nlassert, NLNET::CBufSock::setTimeFlushTrigger(), sint32, and NLNET::TSockId.
00221 { nlassert( destid != InvalidSockId ); destid->setTimeFlushTrigger( ms ); }
|
|
|
Update the network (call this method evenly).
Definition at line 613 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _NbConnections, _ThreadPool, nldebug, and receiveTask().
00614 {
00615 //nlnettrace( "CBufServer::update-BEGIN" );
00616
00617 _NbConnections = 0;
00618
00619 // For each thread
00620 CThreadPool::iterator ipt;
00621 {
00622 //nldebug( "UPD: Acquiring the Thread Pool" );
00623 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00624 //nldebug( "UPD: Acquired." );
00625 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00626 {
00627 // For each thread of the pool
00628 CServerReceiveTask *task = receiveTask(ipt);
00629 CConnections::iterator ipb;
00630 {
00631 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00632 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00633 {
00634 // For each socket of the thread, update sending
00635 if ( ! ((*ipb)->Sock->connected() && (*ipb)->update()) )
00636 {
00637 // Update did not work or the socket is not connected anymore
00638 nldebug( "LNETL1: Socket %s is disconnected", (*ipb)->asString().c_str() );
00639 // Disconnection event if disconnected (known either from flush (in update) or when receiving data)
00640 (*ipb)->advertiseDisconnection( this, *ipb );
00641
00642 /*if ( (*ipb)->advertiseDisconnection( this, *ipb ) )
00643 {
00644 // Now the connection removal is in dataAvailable()
00645 // POLL6
00646 }*/
00647 }
00648 else
00649 {
00650 _NbConnections++;
00651 }
00652 }
00653 }
00654 }
00655 }
00656
00657 //nlnettrace( "CBufServer::update-END" );
00658 }
|
|
|
Definition at line 272 of file buf_server.h. Referenced by CBufServer(). |
|
|
Definition at line 271 of file buf_server.h. |
|
|
Definition at line 273 of file buf_server.h. Referenced by addNewThread(). |
|
|
Definition at line 148 of file buf_net_base.h. |
|
|
Number of bytes popped by receive() since the beginning.
Definition at line 358 of file buf_server.h. Referenced by bytesReceived(), and receive(). |
|
|
Number of bytes pushed by send() since the beginning.
Definition at line 355 of file buf_server.h. Referenced by bytesSent(), and pushBufferToHost(). |
|
|
Connection callback.
Reimplemented in NLNET::CCallbackServer. Definition at line 349 of file buf_server.h. Referenced by connectionCallback(), and setConnectionCallback(). |
|
|
Argument of the connection callback.
Reimplemented in NLNET::CCallbackServer. Definition at line 352 of file buf_server.h. Referenced by argOfConnectionCallback(), and setConnectionCallback(). |
|
|
Listen task.
Definition at line 335 of file buf_server.h. Referenced by CBufServer(), displayThreadStat(), init(), listenAddress(), and ~CBufServer(). |
|
|
Listen thread.
Definition at line 338 of file buf_server.h. Referenced by CBufServer(), init(), and ~CBufServer(). |
|
|
Max number of sockets handled by one thread.
Definition at line 332 of file buf_server.h. Referenced by dispatchNewSocket(). |
|
|
Max number of threads.
Definition at line 329 of file buf_server.h. Referenced by dispatchNewSocket(). |
|
|
Number of connections (debug stat).
Definition at line 367 of file buf_server.h. Referenced by nbConnections(), and update(). |
|
|
TCP_NODELAY.
Definition at line 323 of file buf_server.h. |
|
|
Previous number of bytes received.
Definition at line 361 of file buf_server.h. Referenced by newBytesReceived(). |
|
|
Previous number of bytes sent.
Definition at line 364 of file buf_server.h. Referenced by newBytesSent(). |
|
|
Replay mode flag.
Definition at line 370 of file buf_server.h. Referenced by CBufServer(), init(), and ~CBufServer(). |
|
|
Definition at line 346 of file buf_server.h. Referenced by disconnect(), dispatchNewSocket(), displaySendQueueStat(), displayThreadStat(), getSendQueueSize(), send(), update(), and ~CBufServer(). |
|
|
Thread socket-handling strategy.
Definition at line 326 of file buf_server.h. Referenced by dispatchNewSocket(). |
1.3.6