#include <buf_server.h>
Inheritance diagram for NLNET::CBufServer:
Listening socket and accepted connections, with packet scheme. The provided buffers are sent raw (no endianness conversion). By default, the size flush trigger is disabled and the time flush trigger is set to 20 ms.
Where the methods take place:
send() -> send buffer -> update(), flush(), bytesSent(), newBytesSent()
receive(), dataAvailable() <- receive buffer <- receive thread, dataAvailable(), bytesReceived(), newBytesReceived(), connection callback, disconnection callback
Nevrax France
Definition at line 156 of file buf_server.h.
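The default flush triggers mentioned in the class description can be pictured with the minimal sketch below. It is not NeL code: `FlushPolicy`, `shouldFlush` and the convention that a negative value disables a trigger are assumptions made for illustration. Only the defaults (size trigger disabled, time trigger 20 ms) come from the description above.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of per-connection flush triggers (not the NeL implementation).
struct FlushPolicy {
    std::int32_t sizeTrigger = -1;    // assumed convention: negative means "disabled"
    std::int32_t timeTriggerMs = 20;  // default stated in the class description
};

// Returns true when the pending send data should be pushed to the socket.
bool shouldFlush(const FlushPolicy &p, std::int64_t pendingBytes, std::int64_t elapsedMs)
{
    assert(pendingBytes >= 0 && elapsedMs >= 0);
    if (pendingBytes == 0) return false;  // nothing to send
    const bool bySize = p.sizeTrigger >= 0 && pendingBytes >= p.sizeTrigger;
    const bool byTime = p.timeTriggerMs >= 0 && elapsedMs >= p.timeTriggerMs;
    return bySize || byTime;
}
```

With the defaults, data queued by send() would leave on the first update() that happens at least 20 ms after the previous flush, whatever its size. In the real class the triggers are set per connection with setSizeFlushTrigger() and setTimeFlushTrigger().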
Public Types | |
enum | TEventType { User = 'U', Connection = 'C', Disconnection = 'D' } |
Type of incoming events (max 256). More... | |
enum | TThreadStategy { SpreadSockets, FillThreads } |
Public Member Functions | |
uint64 | bytesReceived () const |
Returns the number of bytes popped by receive() since the beginning. | |
uint64 | bytesSent () const |
Returns the number of bytes pushed by send() since the beginning. | |
CBufServer (TThreadStategy strategy=FillThreads, uint16 max_threads=64, uint16 max_sockets_per_thread=16, bool nodelay=true, bool replaymode=false) | |
bool | dataAvailable () |
void | disconnect (TSockId hostid, bool quick=false) |
void | displayReceiveQueueStat (NLMISC::CLog *log=NLMISC::InfoLog) |
void | displaySendQueueStat (NLMISC::CLog *log=NLMISC::InfoLog, TSockId destid=InvalidSockId) |
void | displayThreadStat (NLMISC::CLog *log=NLMISC::InfoLog) |
bool | flush (TSockId destid) |
uint32 | getReceiveQueueSize () |
Returns the size of the receive queue (mutexed). | |
uint32 | getSendQueueSize (TSockId destid) |
const CInetAddress & | hostAddress (TSockId hostid) |
Returns the address of the specified host. | |
void | init (uint16 port) |
Listens on the specified port. | |
const CInetAddress & | listenAddress () const |
Returns the internet address of the listening socket. | |
uint32 | maxExpectedBlockSize () const |
Returns the max size of the received messages (default: 2^31-1). | |
uint32 | maxSentBlockSize () const |
Returns the max size of the sent messages (default: 2^31-1). | |
uint32 | nbConnections () const |
Returns the number of connections (at the last update()). | |
uint64 | newBytesReceived () |
Returns the number of bytes popped by receive() since the previous call to this method. | |
uint64 | newBytesSent () |
Returns the number of bytes pushed by send() since the previous call to this method. | |
void | receive (NLMISC::CMemStream &buffer, TSockId *hostid) |
void | send (const NLMISC::CMemStream &buffer, TSockId hostid) |
void | setConnectionCallback (TNetCallback cb, void *arg) |
Sets callback for incoming connections (or NULL to disable callback). | |
void | setDisconnectionCallback (TNetCallback cb, void *arg) |
Sets callback for detecting a disconnection (or NULL to disable callback). | |
void | setMaxExpectedBlockSize (sint32 limit) |
void | setMaxSentBlockSize (sint32 limit) |
void | setSizeFlushTrigger (TSockId destid, sint32 size) |
void | setTimeFlushTrigger (TSockId destid, sint32 ms) |
void | update () |
Update the network (call this method regularly). | |
virtual | ~CBufServer () |
Destructor. | |
Protected Member Functions | |
void | addNewThread (CThreadPool &threadpool, CServerBufSock *bufsock) |
void * | argOfConnectionCallback () const |
Returns the argument of the connection callback. | |
void * | argOfDisconnectionCallback () const |
Returns the argument of the disconnection callback. | |
TNetCallback | connectionCallback () const |
Returns the connection callback. | |
volatile bool | dataAvailableFlag () const |
Returns _DataAvailable. | |
TNetCallback | disconnectionCallback () const |
Returns the disconnection callback. | |
void | dispatchNewSocket (CServerBufSock *bufsock) |
bool | noDelay () const |
Returns the TCP_NODELAY flag. | |
void | pushBufferToHost (const NLMISC::CMemStream &buffer, TSockId hostid) |
Pushes a buffer to the specified host's send queue and update (unless not connected). | |
void | pushMessageIntoReceiveQueue (const uint8 *buffer, uint32 size) |
void | pushMessageIntoReceiveQueue (const std::vector< uint8 > &buffer) |
Push message into receive queue (mutexed). | |
CSynchronizedFIFO & | receiveQueue () |
Access to the receive queue. | |
CServerReceiveTask * | receiveTask (std::vector< NLMISC::IThread * >::iterator ipt) |
Returns the receive task corresponding to a particular thread. | |
void | setDataAvailableFlag (bool da) |
Sets _DataAvailable. | |
Private Attributes | |
uint64 | _BytesPoppedIn |
Number of bytes popped by receive() since the beginning. | |
uint64 | _BytesPushedOut |
Number of bytes pushed by send() since the beginning. | |
TNetCallback | _ConnectionCallback |
Connection callback. | |
void * | _ConnectionCbArg |
Argument of the connection callback. | |
CListenTask * | _ListenTask |
Listen task. | |
NLMISC::IThread * | _ListenThread |
Listen thread. | |
uint16 | _MaxSocketsPerThread |
Max number of sockets handled by one thread. | |
uint16 | _MaxThreads |
Max number of threads. | |
uint32 | _NbConnections |
Number of connections (debug stat). | |
bool | _NoDelay |
TCP_NODELAY. | |
uint64 | _PrevBytesPoppedIn |
Previous number of bytes received. | |
uint64 | _PrevBytesPushedOut |
Previous number of bytes sent. | |
bool | _ReplayMode |
Replay mode flag. | |
NLMISC::CSynchronized< CThreadPool > | _ThreadPool |
TThreadStategy | _ThreadStrategy |
Thread socket-handling strategy. | |
Friends | |
class | CListenTask |
class | CServerBufSock |
class | CServerReceiveTask |
class | NLNET::CBufSock |
|
Type of incoming events (max 256).
Definition at line 79 of file buf_net_base.h.
{ User = 'U', Connection = 'C', Disconnection = 'D' };
|
Definition at line 160 of file buf_server.h.
{ SpreadSockets, FillThreads };
|
Constructor. Set nodelay to true to disable the Nagle buffering algorithm (see CTcpSock documentation).
Definition at line 58 of file buf_server.cpp. References _ListenTask, _ListenThread, _ReplayMode, CListenTask, nlnettrace, and uint16.
:
    CBufNetBase(),
    _NoDelay( nodelay ),
    _ThreadStrategy( strategy ),
    _MaxThreads( max_threads ),
    _MaxSocketsPerThread( max_sockets_per_thread ),
    _ListenTask( NULL ),
    _ListenThread( NULL ),
    _ThreadPool("CBufServer::_ThreadPool"),
    _ConnectionCallback( NULL ),
    _ConnectionCbArg( NULL ),
    _BytesPushedOut( 0 ),
    _BytesPoppedIn( 0 ),
    _PrevBytesPoppedIn( 0 ),
    _PrevBytesPushedOut( 0 ),
    _NbConnections (0),
    _ReplayMode( replaymode )
{
    nlnettrace( "CBufServer::CBufServer" );
    if ( ! _ReplayMode )
    {
        _ListenTask = new CListenTask( this );
        _ListenThread = IThread::create( _ListenTask );
    }
    /*{
        CSynchronized<uint32>::CAccessor syncbpi ( &_BytesPushedIn );
        syncbpi.value() = 0;
    }*/
}
|
Destructor.
Definition at line 166 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ListenTask, _ListenThread, _ReplayMode, _ThreadPool, NLMISC::IThread::getRunnable(), nldebug, nlnettrace, receiveTask(), NLNET::CServerTask::requireExit(), and NLMISC::IThread::wait().
{
    nlnettrace( "CBufServer::~CBufServer" );

    // Clean listen thread exit
    if ( ! _ReplayMode )
    {
        ((CListenTask*)(_ListenThread->getRunnable()))->requireExit();
        ((CListenTask*)(_ListenThread->getRunnable()))->close();
#ifdef NL_OS_UNIX
        _ListenTask->wakeUp();
#endif
        _ListenThread->wait();
        delete _ListenThread;
        delete _ListenTask;

        // Clean receive thread exits
        CThreadPool::iterator ipt;
        {
            nldebug( "LNETL1: Waiting for end of threads..." );
            CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
            for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
            {
                // Tell the threads to exit and wake them up
                CServerReceiveTask *task = receiveTask(ipt);
                nlnettrace( "Requiring exit" );
                task->requireExit();

                // Wake the threads up
#ifdef NL_OS_UNIX
                task->wakeUp();
#else
                CConnections::iterator ipb;
                nlnettrace( "Closing sockets (Win32)" );
                {
                    CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
                    for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
                    {
                        (*ipb)->Sock->close();
                    }
                }
#endif

            }

            nlnettrace( "Waiting" );
            for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
            {
                // Wait until the threads have exited
                (*ipt)->wait();
            }

            nldebug( "LNETL1: Deleting sockets, tasks and threads..." );
            for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
            {
                // Delete the socket objects
                CServerReceiveTask *task = receiveTask(ipt);
                CConnections::iterator ipb;
                {
                    CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
                    for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
                    {
                        delete (*ipb); // closes and deletes the socket
                    }
                }

                // Delete the task objects
                delete task;

                // Delete the thread objects
                delete (*ipt);
            }
        }
    }

    nlnettrace( "Exiting CBufServer::~CBufServer" );
}
|
Definition at line 958 of file buf_server.cpp. References NLNET::CServerReceiveTask::addNewSocket(), CServerReceiveTask, NLNET::CThreadPool, nlassert, nldebug, nlnettrace, and NLNET::CServerBufSock::setOwnerTask(). Referenced by dispatchNewSocket().
{
    nlnettrace( "CBufServer::addNewThread" );
    nlassert( bufsock != NULL );

    // Create new task and dispatch the socket to it
    CServerReceiveTask *task = new CServerReceiveTask( this );
    bufsock->setOwnerTask( task );
    task->addNewSocket( bufsock );

    // Add a new thread to the pool, with this task
    IThread *thr = IThread::create( task );
    {
        threadpool.push_back( thr );
        thr->start();
        nldebug( "LNETL1: Added a new thread; pool size is %d", threadpool.size() );
        nldebug( "LNETL1: New socket dispatched to thread %d", threadpool.size()-1 );
    }
}
|
Returns the argument of the connection callback.
Definition at line 314 of file buf_server.h. References _ConnectionCbArg. Referenced by dataAvailable().
{ return _ConnectionCbArg; }
|
Returns the argument of the disconnection callback.
Definition at line 160 of file buf_net_base.h. References NLNET::CBufNetBase::_DisconnectionCbArg. Referenced by dataAvailable(), and NLNET::CBufClient::dataAvailable().
{ return _DisconnectionCbArg; }
|
|
Returns the number of bytes popped by receive() since the beginning.
Definition at line 255 of file buf_server.h. References _BytesPoppedIn, and uint64. Referenced by newBytesReceived().
{ return _BytesPoppedIn; }
|
Returns the number of bytes pushed by send() since the beginning.
Definition at line 261 of file buf_server.h. References _BytesPushedOut, and uint64. Referenced by newBytesSent().
{ return _BytesPushedOut; }
|
Returns the connection callback.
Definition at line 311 of file buf_server.h. References _ConnectionCallback, and NLNET::TNetCallback. Referenced by dataAvailable().
{ return _ConnectionCallback; }
|
Checks if there is some data to receive. Returns false if the receive queue is empty. This is where the connection/disconnection callbacks can be called.
Reimplemented in NLNET::CCallbackServer. Definition at line 364 of file buf_server.cpp. References argOfConnectionCallback(), NLNET::CBufNetBase::argOfDisconnectionCallback(), NLNET::CBufSock::asString(), buffer, NLNET::CFifoAccessor, connectionCallback(), NLNET::CBufNetBase::dataAvailableFlag(), NLNET::CBufNetBase::disconnectionCallback(), nlassert, nldebug, nlerror, nlinfo, NLNET::CBufNetBase::receiveQueue(), NLNET::CBufSock::setConnectedState(), NLNET::CBufNetBase::setDataAvailableFlag(), NLMISC::stringFromVector(), NLNET::TSockId, uint16, and uint8.
{
    // slow down the layer H_AUTO (CBufServer_dataAvailable);
    {
        /* If no data is available, skip the 'while' loop and return false (1 volatile test)
         * If user data is available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
         * If there is a connection/disconnection event (rare), call the callback and loop
         */
        while ( dataAvailableFlag() )
        {
            // Because _DataAvailable is true, the receive queue is not empty at this point
            vector<uint8> buffer;
            uint8 val;
            {
                CFifoAccessor recvfifo( &receiveQueue() );
                val = recvfifo.value().frontLast();
                if ( val != CBufNetBase::User )
                {
                    recvfifo.value().front( buffer );
                }
            }

            /*sint32 mbsize = recvfifo.value().size() / 1048576;
            if ( mbsize > 0 )
            {
                nlwarning( "The receive queue size exceeds %d MB", mbsize );
            }*/

            /*vector<uint8> buffer;
            recvfifo.value().front( buffer );*/

            // Test if the next block is a system event
            //switch ( buffer[buffer.size()-1] )
            switch ( val )
            {

            // Normal message available
            case CBufNetBase::User:
                return true; // return immediately, do not extract the message

            // Process disconnection event
            case CBufNetBase::Disconnection:
            {
                TSockId sockid = *((TSockId*)(&*buffer.begin()));
                nldebug( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());

                sockid->setConnectedState( false );

                // Call callback if needed
                if ( disconnectionCallback() != NULL )
                {
                    disconnectionCallback()( sockid, argOfDisconnectionCallback() );
                }

                // Add socket object into the synchronized remove list
                nldebug( "LNETL1: Adding the connection to the remove list" );
                nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
                ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
                break;
            }
            // Process connection event
            case CBufNetBase::Connection:
            {
                TSockId sockid = *((TSockId*)(&*buffer.begin()));
                nldebug( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());

                sockid->setConnectedState( true );

                // Call callback if needed
                if ( connectionCallback() != NULL )
                {
                    connectionCallback()( sockid, argOfConnectionCallback() );
                }
                break;
            }
            default: // should not occur
                nlinfo( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
                nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
                nlinfo( "LNETL1: Receive queue:" );
                {
                    CFifoAccessor recvfifo( &receiveQueue() );
                    recvfifo.value().display();
                }
                nlerror( "LNETL1: Invalid system event type in server receive queue" );

            }

            // Extract system event
            {
                CFifoAccessor recvfifo( &receiveQueue() );
                recvfifo.value().pop();
                setDataAvailableFlag( ! recvfifo.value().empty() );
            }
        }
        // _DataAvailable is false here
        return false;
    }
}
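The control flow of dataAvailable() can be summarised with a small standalone model. This is only a sketch, not the NeL implementation: `MiniServer`, `Event`, the integer host id and the `std::function` callbacks are hypothetical stand-ins for the receive queue, `TSockId` and `TNetCallback`, and the locking and remove-list handling of the real method are left out.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-ins; these are not the NeL types.
enum class EventType : char { User = 'U', Connection = 'C', Disconnection = 'D' };

struct Event {
    EventType type;
    int hostId;           // stands in for TSockId
    std::string payload;  // only meaningful for User events
};

class MiniServer {
public:
    std::function<void(int)> onConnect;
    std::function<void(int)> onDisconnect;

    // Plays the role of the receive thread pushing into the receive queue.
    void push(Event e) { queue_.push_back(std::move(e)); }

    // Consumes system events (invoking the callbacks) until a user message
    // reaches the front of the queue; the user message itself is not extracted.
    bool dataAvailable() {
        while (!queue_.empty()) {
            const Event &front = queue_.front();
            if (front.type == EventType::User) return true;
            if (front.type == EventType::Connection) {
                if (onConnect) onConnect(front.hostId);
            } else {
                if (onDisconnect) onDisconnect(front.hostId);
            }
            queue_.pop_front();
        }
        return false;
    }

    // Precondition: dataAvailable() returned true just before this call.
    Event receive() {
        assert(!queue_.empty() && queue_.front().type == EventType::User);
        Event e = std::move(queue_.front());
        queue_.pop_front();
        return e;
    }

private:
    std::deque<Event> queue_;
};
```

The model makes the calling convention visible: callbacks only fire from inside dataAvailable(), so an application that never polls it never sees connection or disconnection events, and receive() is only valid right after dataAvailable() has returned true.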
|
Returns _DataAvailable.
Definition at line 206 of file buf_net_base.h. References NLNET::CBufNetBase::_DataAvailable. Referenced by dataAvailable(), and NLNET::CBufClient::dataAvailable().
{ return _DataAvailable; }
|
|
Disconnects a connection. Set hostid to InvalidSockId to disconnect all connections. If hostid is not InvalidSockId and the socket is not connected, the method does nothing. If quick is true, any pending data will not be sent before disconnecting.
Definition at line 251 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ThreadPool, NLNET::CSock::connected(), NLNET::CTcpSock::disconnect(), NLNET::CBufSock::flush(), nlnettrace, receiveTask(), NLNET::CBufSock::Sock, and NLNET::TSockId.
{
    nlnettrace( "CBufServer::disconnect" );
    if ( hostid != InvalidSockId )
    {
        // Disconnect only if physically connected
        if ( hostid->Sock->connected() )
        {
            if ( ! quick )
            {
                hostid->flush();
            }
            hostid->Sock->disconnect(); // the connection will be removed by the next call of update()
        }
    }
    else
    {
        // Disconnect all
        CThreadPool::iterator ipt;
        {
            CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
            for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
            {
                CServerReceiveTask *task = receiveTask(ipt);
                CConnections::iterator ipb;
                {
                    CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
                    for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
                    {
                        if ( (*ipb)->Sock->connected() )
                        {
                            if ( ! quick )
                            {
                                (*ipb)->flush();
                            }
                            (*ipb)->Sock->disconnect();
                        }
                    }
                }
            }
        }
    }
}
|
Returns the disconnection callback.
Definition at line 157 of file buf_net_base.h. References NLNET::CBufNetBase::_DisconnectionCallback, and NLNET::TNetCallback. Referenced by dataAvailable(), and NLNET::CBufClient::dataAvailable().
{ return _DisconnectionCallback; }
|
|
Binds a new socket and send buffer to an existing thread or to a new thread (which is then started). Note: this method is called in the listening thread.
Definition at line 866 of file buf_server.cpp. References _MaxSocketsPerThread, _MaxThreads, _ThreadPool, _ThreadStrategy, NLNET::CServerReceiveTask::addNewSocket(), addNewThread(), min, nldebug, nlnettrace, nlwarning, NLNET::CServerReceiveTask::numberOfConnections(), receiveTask(), NLNET::CServerBufSock::setOwnerTask(), SpreadSockets, and uint. Referenced by NLNET::CListenTask::run().
{
    nlnettrace( "CBufServer::dispatchNewSocket" );

    CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
    if ( _ThreadStrategy == SpreadSockets )
    {
        // Find the thread with the smallest number of connections and check if all
        // threads do not have the same number of connections
        uint min = 0xFFFFFFFF;
        uint max = 0;
        CThreadPool::iterator ipt, iptmin, iptmax;
        for ( iptmin=iptmax=ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
        {
            uint noc = receiveTask(ipt)->numberOfConnections();
            if ( noc < min )
            {
                min = noc;
                iptmin = ipt;
            }
            if ( noc > max )
            {
                max = noc;
                iptmax = ipt;
            }
        }

        // Check if we make the pool of threads grow (if we have not found vacant room
        // and if it is allowed to)
        if ( (poolsync.value().empty()) ||
             ((min == max) && (poolsync.value().size() < _MaxThreads)) )
        {
            addNewThread( poolsync.value(), bufsock );
        }
        else
        {
            // Dispatch socket to an existing thread of the pool
            CServerReceiveTask *task = receiveTask(iptmin);
            bufsock->setOwnerTask( task );
            task->addNewSocket( bufsock );
#ifdef NL_OS_UNIX
            task->wakeUp();
#endif

            if ( min >= (uint)_MaxSocketsPerThread )
            {
                nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
            }
            nldebug( "LNETL1: New socket dispatched to thread %d", iptmin-poolsync.value().begin() );
        }

    }
    else // _ThreadStrategy == FillThreads
    {
        CThreadPool::iterator ipt;
        for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
        {
            uint noc = receiveTask(ipt)->numberOfConnections();
            if ( noc < _MaxSocketsPerThread )
            {
                break;
            }
        }

        // Check if we have to make the thread pool grow (if we have not found vacant room)
        if ( ipt == poolsync.value().end() )
        {
            if ( poolsync.value().size() == _MaxThreads )
            {
                nlwarning( "LNETL1: Exceeding the maximum number of threads" );
            }
            addNewThread( poolsync.value(), bufsock );
        }
        else
        {
            // Dispatch socket to an existing thread of the pool
            CServerReceiveTask *task = receiveTask(ipt);
            bufsock->setOwnerTask( task );
            task->addNewSocket( bufsock );
#ifdef NL_OS_UNIX
            task->wakeUp();
#endif
            nldebug( "LNETL1: New socket dispatched to thread %d", ipt-poolsync.value().begin() );
        }
    }
}
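The two strategies differ only in how a thread is picked for the new socket. The decision logic of the listing can be isolated in a pure function, sketched below. `chooseThread`, `Strategy` and the vector of per-thread connection counts are hypothetical names introduced for this illustration; the real method works on the thread pool directly and also logs warnings when the configured limits are exceeded.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

enum class Strategy { SpreadSockets, FillThreads };

// Returns the index of the thread that should receive the new socket,
// or counts.size() to request the creation of a new thread.
// counts[i] is the current number of connections handled by thread i.
std::size_t chooseThread(const std::vector<unsigned> &counts, Strategy strategy,
                         std::size_t maxThreads, unsigned maxSocketsPerThread)
{
    const std::size_t newThread = counts.size();

    if (strategy == Strategy::SpreadSockets) {
        if (counts.empty()) return newThread;
        std::size_t minIdx = 0;
        unsigned minCount = counts[0];
        unsigned maxCount = counts[0];
        for (std::size_t i = 1; i < counts.size(); ++i) {
            if (counts[i] < minCount) { minCount = counts[i]; minIdx = i; }
            if (counts[i] > maxCount) { maxCount = counts[i]; }
        }
        // Grow the pool only when all threads are equally loaded and growth is allowed.
        if (minCount == maxCount && counts.size() < maxThreads) return newThread;
        return minIdx;  // least loaded thread, even if it is already "full"
    }

    // FillThreads: first thread with vacant room, otherwise a new thread
    // (as in the listing, the pool grows even past maxThreads).
    for (std::size_t i = 0; i < counts.size(); ++i) {
        if (counts[i] < maxSocketsPerThread) return i;
    }
    return newThread;
}
```

Under these assumptions, SpreadSockets balances connections across as many threads as allowed, while FillThreads keeps the thread count low by saturating each thread before starting another.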
|
Reimplemented in NLNET::CCallbackClient, and NLNET::CCallbackServer. Definition at line 94 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, and NLNET::CFifoAccessor.
{
    CFifoAccessor syncfifo( &_RecvFifo );
    syncfifo.value().displayStats(log);
}
|
Reimplemented in NLNET::CCallbackServer. Definition at line 713 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ThreadPool, NLMISC::CBufFIFO::displayStats(), receiveTask(), NLNET::CBufSock::SendFifo, and NLNET::TSockId.
{
    if ( destid != InvalidSockId )
    {
        destid->SendFifo.displayStats(log);
    }
    else
    {
        // add all client buffers

        // For each thread
        CThreadPool::iterator ipt;
        {
            CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
            for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
            {
                // For each thread of the pool
                CServerReceiveTask *task = receiveTask(ipt);
                CConnections::iterator ipb;
                {
                    CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
                    for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
                    {
                        // For each socket of the thread, update sending
                        (*ipb)->SendFifo.displayStats(log);
                    }
                }
            }
        }
    }
}
|
Reimplemented in NLNET::CCallbackServer. Definition at line 695 of file buf_server.cpp. References _ListenTask, _ThreadPool, NLMISC::CLog::displayNL(), NLNET::CServerTask::NbLoop, and receiveTask().
{
    // For each thread
    CThreadPool::iterator ipt;
    {
        CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
        for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
        {
            // For each thread of the pool
            CServerReceiveTask *task = receiveTask(ipt);
            // For each socket of the thread, update sending
            log->displayNL ("server receive thread %p nbloop %d", task, task->NbLoop);
        }
    }

    log->displayNL ("server listen thread %p nbloop %d", _ListenTask, _ListenTask->NbLoop);
}
|
Force to send all data pending in the send queue.
Reimplemented in NLNET::CCallbackServer. Definition at line 232 of file buf_server.h. References NLNET::CBufSock::flush(), nlassert, and NLNET::TSockId.
{ nlassert( destid != InvalidSockId ); return destid->flush(); }
|
Returns the size of the receive queue (mutexed).
Reimplemented in NLNET::CCallbackClient, and NLNET::CCallbackServer. Definition at line 88 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, NLNET::CFifoAccessor, and uint32.
{
    CFifoAccessor syncfifo( &_RecvFifo );
    return syncfifo.value().size();
}
|
Definition at line 660 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ThreadPool, receiveTask(), NLNET::CBufSock::SendFifo, NLMISC::CBufFIFO::size(), NLNET::TSockId, and uint32.
{
    if ( destid != InvalidSockId )
    {
        return destid->SendFifo.size();
    }
    else
    {
        // add all client buffers

        uint32 total = 0;

        // For each thread
        CThreadPool::iterator ipt;
        {
            CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
            for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
            {
                // For each thread of the pool
                CServerReceiveTask *task = receiveTask(ipt);
                CConnections::iterator ipb;
                {
                    CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
                    for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
                    {
                        // For each socket of the thread, accumulate the send queue size
                        total += (*ipb)->SendFifo.size ();
                    }
                }
            }
        }
        return total;
    }
}
|
Returns the address of the specified host.
Reimplemented in NLNET::CCallbackServer. Definition at line 241 of file buf_server.h. References nlassert, NLNET::CSock::remoteAddr(), NLNET::CBufSock::Sock, and NLNET::TSockId.
{ nlassert( hostid != InvalidSockId ); return hostid->Sock->remoteAddr(); }
|
Listens on the specified port.
Definition at line 93 of file buf_server.cpp. References _ListenTask, _ListenThread, _ReplayMode, NLNET::CListenTask::init(), NLNET::CBufNetBase::maxExpectedBlockSize(), nldebug, nlnettrace, NLMISC::IThread::start(), and uint16. Referenced by NLNET::CNetManager::addServer(), and NLNET::CUnifiedNetwork::init().
{
    nlnettrace( "CBufServer::init" );
    if ( ! _ReplayMode )
    {
        _ListenTask->init( port, maxExpectedBlockSize() );
        _ListenThread->start();
    }
    else
    {
        nldebug( "LNETL1: Binding listen socket to any address, port %hu", port );
    }
}
|
Returns the internet address of the listening socket.
Definition at line 238 of file buf_server.h. References _ListenTask, and NLNET::CListenTask::localAddr(). Referenced by NLNET::CLoginServer::init(), NLNET::NLMISC_DYNVARIABLE(), and NLNET::setListenAddress().
{ return _ListenTask->localAddr(); }
|
Returns the max size of the received messages (default: 2^31-1).
Definition at line 135 of file buf_net_base.h. References NLNET::CBufNetBase::_MaxExpectedBlockSize, and uint32. Referenced by NLNET::CBufClient::connect(), and init().
{
    return _MaxExpectedBlockSize;
}
|
|
Returns the max size of the sent messages (default: 2^31-1).
Definition at line 141 of file buf_net_base.h. References NLNET::CBufNetBase::_MaxSentBlockSize, and uint32. Referenced by send(), and NLNET::CBufClient::send().
{
    return _MaxSentBlockSize;
}
|
|
Returns the number of connections (at the last update()).
Definition at line 267 of file buf_server.h. References _NbConnections, and uint32. Referenced by NLNET::CCallbackServer::send().
{ return _NbConnections; }
|
Returns the number of bytes popped by receive() since the previous call to this method.
Definition at line 749 of file buf_server.cpp. References _PrevBytesPoppedIn, bytesReceived(), and uint64.
{
    uint64 b = bytesReceived();
    uint64 nbrecvd = b - _PrevBytesPoppedIn;
    //nlinfo( "b: %"NL_I64"u new: %"NL_I64"u", b, nbrecvd );
    _PrevBytesPoppedIn = b;
    return nbrecvd;
}
|
Returns the number of bytes pushed by send() since the previous call to this method.
Definition at line 761 of file buf_server.cpp. References _PrevBytesPushedOut, bytesSent(), and uint64.
{
    uint64 b = bytesSent();
    uint64 nbsent = b - _PrevBytesPushedOut;
    //nlinfo( "b: %"NL_I64"u new: %"NL_I64"u", b, nbsent );
    _PrevBytesPushedOut = b;
    return nbsent;
}
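newBytesReceived() and newBytesSent() both follow the same "delta since the previous call" pattern on top of a running total. A minimal standalone sketch of that pattern is shown here; `ByteCounter`, `add()` and `takeNew()` are hypothetical names, not part of NeL.

```cpp
#include <cassert>
#include <cstdint>

// Running total plus a "since last query" delta, as used for the traffic statistics.
class ByteCounter {
public:
    // Called whenever bytes are pushed or popped.
    void add(std::uint64_t n) { total_ += n; }

    // Equivalent of bytesSent() / bytesReceived(): total since the beginning.
    std::uint64_t total() const { return total_; }

    // Equivalent of newBytesSent() / newBytesReceived(): bytes counted since
    // the previous call to this method. Calling it resets the reference point.
    std::uint64_t takeNew() {
        const std::uint64_t delta = total_ - prev_;
        prev_ = total_;
        return delta;
    }

private:
    std::uint64_t total_ = 0;
    std::uint64_t prev_ = 0;
};
```

Because the query itself moves the reference point, two callers sharing one counter would each see only part of the traffic; a rate display would typically own the only call site and poll it at a fixed interval.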
|
Returns the TCP_NODELAY flag.
Definition at line 276 of file buf_server.h. Referenced by NLNET::CListenTask::run().
{ return _NoDelay; }
|
Pushes a buffer to the specified host's send queue and update (unless not connected).
Definition at line 298 of file buf_server.h. References _BytesPushedOut, buffer, nlassert, NLNET::CBufSock::pushBuffer(), NLNET::TBlockSize, and NLNET::TSockId. Referenced by send().
{
    nlassert( hostid != InvalidSockId );
    if ( hostid->pushBuffer( buffer ) )
    {
        _BytesPushedOut += buffer.length() + sizeof(TBlockSize); // statistics
    }
}
|
Definition at line 183 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, buffer, NLNET::CFifoAccessor, NLNET::CBufNetBase::setDataAvailableFlag(), size, uint32, and uint8.
{
    //sint32 mbsize;
    {
        //nldebug( "BNB: Acquiring the receive queue... ");
        CFifoAccessor recvfifo( &_RecvFifo );
        //nldebug( "BNB: Acquired, pushing the received buffer... ");
        recvfifo.value().push( buffer, size );
        //nldebug( "BNB: Pushed, releasing the receive queue..." );
        //mbsize = recvfifo.value().size() / 1048576;
        setDataAvailableFlag( true );
    }
    //nldebug( "BNB: Released." );
    /*if ( mbsize > 1 )
    {
        nlwarning( "The receive queue size exceeds %d MB", mbsize );
    }*/
}
|
Push message into receive queue (mutexed).
Definition at line 164 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, buffer, NLNET::CFifoAccessor, and NLNET::CBufNetBase::setDataAvailableFlag(). Referenced by NLNET::CBufSock::advertiseSystemEvent(), and NLNET::CClientReceiveTask::run().
{
    //sint32 mbsize;
    {
        //nldebug( "BNB: Acquiring the receive queue... ");
        CFifoAccessor recvfifo( &_RecvFifo );
        //nldebug( "BNB: Acquired, pushing the received buffer... ");
        recvfifo.value().push( buffer );
        //nldebug( "BNB: Pushed, releasing the receive queue..." );
        //mbsize = recvfifo.value().size() / 1048576;
        setDataAvailableFlag( true );
    }
    //nldebug( "BNB: Released." );
    //if ( mbsize > 1 )
    //{
    //    nlwarning( "The receive queue size exceeds %d MB", mbsize );
    //}
}
|
Receives the next block of data into the specified buffer (resizing it). You must call dataAvailable() before every call to receive().
Definition at line 565 of file buf_server.cpp. References _BytesPoppedIn, buffer, NLNET::CFifoAccessor, nlassert, NLMISC_BSWAP32, nlnettrace, nlwarning, NLNET::CBufNetBase::receiveQueue(), NLNET::CBufNetBase::setDataAvailableFlag(), NLNET::TBlockSize, NLNET::TSockId, and uint32.
{
    nlnettrace( "CBufServer::receive" );
    //nlassert( dataAvailable() );
    nlassert( phostid != NULL );
    {
        CFifoAccessor recvfifo( &receiveQueue() );
        nlassert( ! recvfifo.value().empty() );
        recvfifo.value().front( buffer );
        recvfifo.value().pop();
        setDataAvailableFlag( ! recvfifo.value().empty() );
    }

    // Extract hostid (and event type)
    *phostid = *((TSockId*)&(buffer.buffer()[buffer.size()-sizeof(TSockId)-1]));
    nlassert( buffer.buffer()[buffer.size()-1] == CBufNetBase::User );

    // debug features, we number all packet to be sure that they are all sent and received
    // \todo remove this debug feature when ok
#ifdef NL_BIG_ENDIAN
    uint32 val = NLMISC_BSWAP32(*(uint32*)buffer.buffer());
#else
    uint32 val = *(uint32*)buffer.buffer();
#endif

    // nldebug ("receive message number %u", val);
    if ((*phostid)->ReceiveNextValue != val)
    {
        nlwarning ("LNETL1: !!!LOST A MESSAGE!!! I received the message number %u but I'm waiting the message number %u (cnx %s), warn lecroart@nevrax.com with the log now please", val, (*phostid)->ReceiveNextValue, (*phostid)->asString().c_str());
        // resync the message number
        (*phostid)->ReceiveNextValue = val;
    }

    (*phostid)->ReceiveNextValue++;

    buffer.resize( buffer.size()-sizeof(TSockId)-1 );

    // TODO OPTIM remove the nldebug for speed
    //commented for optimisation nldebug( "LNETL1: Read buffer (%d+%d B) from %s", buffer.size(), sizeof(TSockId)+1, /*stringFromVector(buffer).c_str(), */(*phostid)->asString().c_str() );

    // Statistics
    _BytesPoppedIn += buffer.size() + sizeof(TBlockSize);
}
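The listing shows that each block in the receive queue ends with a trailer: the socket id followed by one event-type byte, both removed before the buffer is handed to the caller. The sketch below reproduces that layout on a plain byte vector. `HostId` (an integer instead of the `TSockId` pointer), `appendTrailer` and `stripTrailer` are hypothetical names chosen for the illustration, and `memcpy` is used instead of the pointer cast of the listing to stay alignment-safe.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

using HostId = std::uint64_t;  // hypothetical stand-in for TSockId

// Appends the trailer (host id, then event type byte), as the receive
// thread is assumed to do before pushing a block into the queue.
void appendTrailer(std::vector<std::uint8_t> &block, HostId host, char eventType)
{
    const std::uint8_t *p = reinterpret_cast<const std::uint8_t *>(&host);
    block.insert(block.end(), p, p + sizeof(HostId));
    block.push_back(static_cast<std::uint8_t>(eventType));
}

// Removes the trailer of a user message and returns the host id it carried.
HostId stripTrailer(std::vector<std::uint8_t> &block)
{
    assert(block.size() >= sizeof(HostId) + 1);
    assert(block.back() == 'U');  // only user messages reach receive()
    HostId host = 0;
    std::memcpy(&host, block.data() + block.size() - sizeof(HostId) - 1, sizeof(HostId));
    block.resize(block.size() - sizeof(HostId) - 1);
    return host;
}
```

Keeping the event type in the very last byte is what lets dataAvailable() inspect the next block with a single frontLast() call, without copying the whole block out of the queue.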
|
Access to the receive queue.
Definition at line 154 of file buf_net_base.h. References NLNET::CBufNetBase::_RecvFifo, and NLNET::CSynchronizedFIFO. Referenced by dataAvailable(), NLNET::CBufClient::dataAvailable(), NLNET::CBufClient::disconnect(), receive(), NLNET::CBufClient::receive(), and NLNET::CServerReceiveTask::run().
    { return _RecvFifo; }
|
|
Returns the receive task corresponding to a particular thread.
Definition at line 284 of file buf_server.h. Referenced by disconnect(), dispatchNewSocket(), displaySendQueueStat(), displayThreadStat(), getSendQueueSize(), send(), update(), and ~CBufServer().
    {
        return ((CServerReceiveTask*)((*ipt)->getRunnable()));
    }
|
Send a message to the specified host, or to all hosts if hostid is InvalidSockId. Reimplemented in NLNET::CCallbackServer. Definition at line 299 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _ThreadPool, buffer, NLNET::CBufNetBase::maxSentBlockSize(), nlassert, nlassertex, NLMISC_BSWAP32, nlnettrace, pushBufferToHost(), receiveTask(), NLNET::CBufSock::SendNextValue, NLNET::TSockId, and uint32.
    {
        nlnettrace( "CBufServer::send" );
        nlassert( buffer.length() > 0 );
        nlassertex( buffer.length() <= maxSentBlockSize(), ("length=%u max=%u", buffer.length(), maxSentBlockSize()) );

        // slow down the layer H_AUTO (CBufServer_send);

        if ( hostid != InvalidSockId )
        {
            // debug features, we number all packet to be sure that they are all sent and received
            // \todo remove this debug feature when ok
            // nldebug ("send message number %u", hostid->SendNextValue);
    #ifdef NL_BIG_ENDIAN
            uint32 val = NLMISC_BSWAP32(hostid->SendNextValue);
    #else
            uint32 val = hostid->SendNextValue;
    #endif

            *(uint32*)buffer.buffer() = val;
            hostid->SendNextValue++;

            pushBufferToHost( buffer, hostid );
        }
        else
        {
            // Push into all send queues
            CThreadPool::iterator ipt;
            {
                CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
                for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
                {
                    CServerReceiveTask *task = receiveTask(ipt);
                    CConnections::iterator ipb;
                    {
                        CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
                        for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
                        {
                            // Send only if the socket is logically connected
                            if ( (*ipb)->connectedState() )
                            {
                                // debug features, we number all packet to be sure that they are all sent and received
                                // \todo remove this debug feature when ok
                                // nldebug ("send message number %u", (*ipb)->SendNextValue);
    #ifdef NL_BIG_ENDIAN
                                uint32 val = NLMISC_BSWAP32((*ipb)->SendNextValue);
    #else
                                uint32 val = (*ipb)->SendNextValue;
    #endif
                                *(uint32*)buffer.buffer() = val;
                                (*ipb)->SendNextValue++;

                                pushBufferToHost( buffer, *ipb );
                            }
                        }
                    }
                }
            }
        }
    }
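Both send() and receive() use a per-connection message counter stored in the first four bytes of the buffer, byte-swapped on big-endian hosts so that the value is little-endian on the wire. The sketch below illustrates that numbering scheme in portable C++. It is an illustration under assumptions, not the library code: stampSequence, readSequence and SequenceTracker are hypothetical names, and the first four bytes of each buffer are assumed to be reserved for the counter.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Writes seq into the first four bytes of buf in little-endian order,
// whatever the byte order of the host.
inline void stampSequence(std::vector<std::uint8_t>& buf, std::uint32_t seq)
{
    assert(buf.size() >= 4);
    for (int i = 0; i < 4; ++i)
        buf[i] = static_cast<std::uint8_t>((seq >> (8 * i)) & 0xFF);
}

// Reads the little-endian counter back from the first four bytes.
inline std::uint32_t readSequence(const std::vector<std::uint8_t>& buf)
{
    assert(buf.size() >= 4);
    std::uint32_t seq = 0;
    for (int i = 0; i < 4; ++i)
        seq |= static_cast<std::uint32_t>(buf[i]) << (8 * i);
    return seq;
}

// Hypothetical counter pair, mirroring SendNextValue and ReceiveNextValue.
struct SequenceTracker {
    std::uint32_t sendNext = 0;
    std::uint32_t receiveNext = 0;

    // Stamps the buffer with the next outgoing number.
    void onSend(std::vector<std::uint8_t>& buf) { stampSequence(buf, sendNext++); }

    // Returns true if the message carries the expected number.
    // On a mismatch the counter is resynchronized, as in the listing.
    bool onReceive(const std::vector<std::uint8_t>& buf)
    {
        const std::uint32_t val = readSequence(buf);
        const bool inOrder = (val == receiveNext);
        receiveNext = val + 1;
        return inOrder;
    }
};
```

Shifting bytes explicitly replaces the NL_BIG_ENDIAN conditional of the listing; the bytes on the wire are the same in both approaches.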
|
Sets callback for incoming connections (or NULL to disable callback).
Reimplemented in NLNET::CCallbackServer. Definition at line 184 of file buf_server.h. References _ConnectionCallback, _ConnectionCbArg, and NLNET::TNetCallback.
    { _ConnectionCallback = cb; _ConnectionCbArg = arg; }
|
Sets _DataAvailable.
Definition at line 203 of file buf_net_base.h. References NLNET::CBufNetBase::_DataAvailable. Referenced by dataAvailable(), NLNET::CBufClient::dataAvailable(), NLNET::CBufClient::disconnect(), NLNET::CBufNetBase::pushMessageIntoReceiveQueue(), receive(), NLNET::CBufClient::receive(), and NLNET::CServerReceiveTask::run().
    { _DataAvailable = da; }
|
Sets callback for detecting a disconnection (or NULL to disable callback).
Reimplemented in NLNET::CCallbackClient, and NLNET::CCallbackServer. Definition at line 85 of file buf_net_base.h. References NLNET::CBufNetBase::_DisconnectionCallback, NLNET::CBufNetBase::_DisconnectionCbArg, and NLNET::TNetCallback.
    { _DisconnectionCallback = cb; _DisconnectionCbArg = arg; }
|
Sets the max size of the received messages. If a received message is bigger than the limit, the connection is dropped. Default value: 1 megabyte. A negative limit resets the max size to the default value. Warning: this method may only be called at initialization time, before connecting (for a client) or calling init() (for a server). Definition at line 109 of file buf_net_base.h. References NLNET::CBufNetBase::_MaxExpectedBlockSize, sint32, and uint32.
    {
        if ( limit < 0 )
            _MaxExpectedBlockSize = 1048576;
        else
            _MaxExpectedBlockSize = (uint32)limit;
    }
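As a small illustration of the reset-on-negative rule described above, here is a standalone sketch. BlockSizeLimit and its members are hypothetical names rather than part of CBufNetBase, and the accepts() check is an assumption about where the boundary lies (a block exactly at the limit is taken to be accepted).

```cpp
#include <cassert>
#include <cstdint>

// Default limit used by the listing: 1048576 bytes.
constexpr std::uint32_t kDefaultMaxBlockSize = 1048576;

// Hypothetical stand-in for the size-limit part of CBufNetBase.
class BlockSizeLimit {
public:
    // A negative limit resets the value to the default.
    void set(std::int32_t limit)
    {
        maxSize_ = (limit < 0) ? kDefaultMaxBlockSize
                               : static_cast<std::uint32_t>(limit);
    }

    std::uint32_t get() const { return maxSize_; }

    // Assumption: only blocks strictly bigger than the limit are rejected.
    bool accepts(std::uint32_t blockSize) const { return blockSize <= maxSize_; }

private:
    std::uint32_t maxSize_ = kDefaultMaxBlockSize;
};
```

With this model, set(4096) followed by set(-1) leaves the limit at 1048576 again.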
|
Sets the max size of the sent messages. Note: limiting of sent messages is not currently implemented. Default value: 1 megabyte. A negative limit resets the max size to the default value. Warning: this method may only be called at initialization time, before connecting (for a client) or calling init() (for a server). Definition at line 126 of file buf_net_base.h. References NLNET::CBufNetBase::_MaxSentBlockSize, sint32, and uint32.
    {
        if ( limit < 0 )
            _MaxSentBlockSize = 1048576;
        else
            _MaxSentBlockSize = (uint32)limit;
    }
|
Sets the size flush trigger. When the size of the send queue reaches or exceeds this value, all data in the send queue is automatically sent (-1 to disable this trigger). Definition at line 226 of file buf_server.h. References nlassert, NLNET::CBufSock::setSizeFlushTrigger(), sint32, size, and NLNET::TSockId.
    { nlassert( destid != InvalidSockId ); destid->setSizeFlushTrigger( size ); }
|
Sets the time flush trigger (in milliseconds). When this time has elapsed, all data in the send queue is automatically sent (-1 to disable this trigger). Definition at line 221 of file buf_server.h. References nlassert, NLNET::CBufSock::setTimeFlushTrigger(), sint32, and NLNET::TSockId.
    { nlassert( destid != InvalidSockId ); destid->setTimeFlushTrigger( ms ); }
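The size and time flush triggers can be pictured as two independent conditions checked against the state of the send queue. The sketch below models that decision under assumptions: FlushTriggers and shouldFlush are hypothetical names, any negative value is treated as "disabled" (the documentation only mentions -1), and an empty queue is assumed never to flush. The defaults follow the class description: size trigger disabled, time trigger at 20 ms.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of the two per-connection flush triggers.
struct FlushTriggers {
    std::int32_t sizeTrigger = -1;    // bytes; negative means disabled
    std::int32_t timeTriggerMs = 20;  // milliseconds; negative means disabled

    // Decides whether the send queue should be flushed now.
    bool shouldFlush(std::uint32_t queuedBytes, std::uint32_t msSinceLastFlush) const
    {
        if (queuedBytes == 0)
            return false;  // assumption: nothing queued, nothing to send
        if (sizeTrigger >= 0 && queuedBytes >= static_cast<std::uint32_t>(sizeTrigger))
            return true;   // queue reached or exceeded the size trigger
        if (timeTriggerMs >= 0 && msSinceLastFlush >= static_cast<std::uint32_t>(timeTriggerMs))
            return true;   // time trigger elapsed
        return false;
    }
};
```

In this model, disabling both triggers means data leaves the queue only through an explicit flush() call.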
|
Updates the network (call this method at regular intervals).
Definition at line 613 of file buf_server.cpp. References NLNET::CServerReceiveTask::_Connections, _NbConnections, _ThreadPool, nldebug, and receiveTask().
    {
        //nlnettrace( "CBufServer::update-BEGIN" );

        _NbConnections = 0;

        // For each thread
        CThreadPool::iterator ipt;
        {
            //nldebug( "UPD: Acquiring the Thread Pool" );
            CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
            //nldebug( "UPD: Acquired." );
            for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
            {
                // For each thread of the pool
                CServerReceiveTask *task = receiveTask(ipt);
                CConnections::iterator ipb;
                {
                    CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
                    for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
                    {
                        // For each socket of the thread, update sending
                        if ( ! ((*ipb)->Sock->connected() && (*ipb)->update()) )
                        {
                            // Update did not work or the socket is not connected anymore
                            nldebug( "LNETL1: Socket %s is disconnected", (*ipb)->asString().c_str() );
                            // Disconnection event if disconnected (known either from flush (in update) or when receiving data)
                            (*ipb)->advertiseDisconnection( this, *ipb );

                            /*if ( (*ipb)->advertiseDisconnection( this, *ipb ) )
                            {
                                // Now the connection removal is in dataAvailable()
                                // POLL6
                            }*/
                        }
                        else
                        {
                            _NbConnections++;
                        }
                    }
                }
            }
        }

        //nlnettrace( "CBufServer::update-END" );
    }
|
Definition at line 272 of file buf_server.h. Referenced by CBufServer(). |
|
Definition at line 271 of file buf_server.h. |
|
Definition at line 273 of file buf_server.h. Referenced by addNewThread(). |
|
Definition at line 148 of file buf_net_base.h. |
|
Number of bytes popped by receive() since the beginning.
Definition at line 358 of file buf_server.h. Referenced by bytesReceived(), and receive(). |
|
Number of bytes pushed by send() since the beginning.
Definition at line 355 of file buf_server.h. Referenced by bytesSent(), and pushBufferToHost(). |
|
Connection callback.
Reimplemented in NLNET::CCallbackServer. Definition at line 349 of file buf_server.h. Referenced by connectionCallback(), and setConnectionCallback(). |
|
Argument of the connection callback.
Reimplemented in NLNET::CCallbackServer. Definition at line 352 of file buf_server.h. Referenced by argOfConnectionCallback(), and setConnectionCallback(). |
|
Listen task.
Definition at line 335 of file buf_server.h. Referenced by CBufServer(), displayThreadStat(), init(), listenAddress(), and ~CBufServer(). |
|
Listen thread.
Definition at line 338 of file buf_server.h. Referenced by CBufServer(), init(), and ~CBufServer(). |
|
Max number of sockets handled by one thread.
Definition at line 332 of file buf_server.h. Referenced by dispatchNewSocket(). |
|
Max number of threads.
Definition at line 329 of file buf_server.h. Referenced by dispatchNewSocket(). |
|
Number of connections (debug stat).
Definition at line 367 of file buf_server.h. Referenced by nbConnections(), and update(). |
|
TCP_NODELAY.
Definition at line 323 of file buf_server.h. |
|
Previous number of bytes received.
Definition at line 361 of file buf_server.h. Referenced by newBytesReceived(). |
|
Previous number of bytes sent.
Definition at line 364 of file buf_server.h. Referenced by newBytesSent(). |
|
Replay mode flag.
Definition at line 370 of file buf_server.h. Referenced by CBufServer(), init(), and ~CBufServer(). |
|
Definition at line 346 of file buf_server.h. Referenced by disconnect(), dispatchNewSocket(), displaySendQueueStat(), displayThreadStat(), getSendQueueSize(), send(), update(), and ~CBufServer(). |
|
Thread socket-handling strategy.
Definition at line 326 of file buf_server.h. Referenced by dispatchNewSocket(). |