NLNET::CBufServer Class Reference

#include <buf_server.h>

Inheritance diagram for NLNET::CBufServer:

NLNET::CBufNetBase NLNET::CCallbackServer

Detailed Description

Server class for layer 1

Listening socket and accepted connetions, with packet scheme. The provided buffers are sent raw (no endianness conversion). By default, the size time trigger is disabled, the time trigger is set to 20 ms.

Where do the methods take place: send(), -> send buffer -> update(), flush() bytesSent(), newBytesSent()

receive(), dataAvailable(), <- receive buffer <- receive thread, dataAvailable(), bytesReceived(), newBytesReceived(), connection callback, disconnection callback

Author:
Olivier Cado

Nevrax France

Date:
2001

Definition at line 156 of file buf_server.h.

Public Types

enum  TEventType { User = 'U', Connection = 'C', Disconnection = 'D' }
 Type of incoming events (max 256). More...

enum  TThreadStategy { SpreadSockets, FillThreads }

Public Member Functions

uint64 bytesReceived () const
 Returns the number of bytes popped by receive() since the beginning.

uint64 bytesSent () const
 Returns the number of bytes pushed by send() since the beginning.

 CBufServer (TThreadStategy strategy=FillThreads, uint16 max_threads=64, uint16 max_sockets_per_thread=16, bool nodelay=true, bool replaymode=false)
bool dataAvailable ()
void disconnect (TSockId hostid, bool quick=false)
void displayReceiveQueueStat (NLMISC::CLog *log=NLMISC::InfoLog)
void displaySendQueueStat (NLMISC::CLog *log=NLMISC::InfoLog, TSockId destid=InvalidSockId)
void displayThreadStat (NLMISC::CLog *log=NLMISC::InfoLog)
bool flush (TSockId destid)
uint32 getReceiveQueueSize ()
 Returns the size of the receive queue (mutexed).

uint32 getSendQueueSize (TSockId destid)
const CInetAddresshostAddress (TSockId hostid)
 Returns the address of the specified host.

void init (uint16 port)
 Listens on the specified port.

const CInetAddresslistenAddress () const
 Returns the internet address of the listening socket.

uint32 maxExpectedBlockSize () const
 Returns the max size of the received messages (default: 2^31-1).

uint32 maxSentBlockSize () const
 Returns the max size of the sent messages (default: 2^31-1).

uint32 nbConnections () const
 Returns the number of connections (at the last update()).

uint64 newBytesReceived ()
 Returns the number of bytes popped by receive() since the previous call to this method.

uint64 newBytesSent ()
 Returns the number of bytes pushed by send() since the previous call to this method.

void receive (NLMISC::CMemStream &buffer, TSockId *hostid)
void send (const NLMISC::CMemStream &buffer, TSockId hostid)
void setConnectionCallback (TNetCallback cb, void *arg)
 Sets callback for incoming connections (or NULL to disable callback).

void setDisconnectionCallback (TNetCallback cb, void *arg)
 Sets callback for detecting a disconnection (or NULL to disable callback).

void setMaxExpectedBlockSize (sint32 limit)
void setMaxSentBlockSize (sint32 limit)
void setSizeFlushTrigger (TSockId destid, sint32 size)
void setTimeFlushTrigger (TSockId destid, sint32 ms)
void update ()
 Update the network (call this method evenly).

virtual ~CBufServer ()
 Destructor.


Protected Member Functions

void addNewThread (CThreadPool &threadpool, CServerBufSock *bufsock)
void * argOfConnectionCallback () const
 Returns the argument of the connection callback.

void * argOfDisconnectionCallback () const
 Returns the argument of the disconnection callback.

TNetCallback connectionCallback () const
 Returns the connection callback.

volatile bool dataAvailableFlag () const
 Return _DataAvailable.

TNetCallback disconnectionCallback () const
 Returns the disconnection callback.

void dispatchNewSocket (CServerBufSock *bufsock)
bool noDelay () const
 Returns the TCP_NODELAY flag.

void pushBufferToHost (const NLMISC::CMemStream &buffer, TSockId hostid)
 Pushes a buffer to the specified host's send queue and update (unless not connected).

void pushMessageIntoReceiveQueue (const uint8 *buffer, uint32 size)
void pushMessageIntoReceiveQueue (const std::vector< uint8 > &buffer)
 Push message into receive queue (mutexed).

CSynchronizedFIFOreceiveQueue ()
 Access to the receive queue.

CServerReceiveTaskreceiveTask (std::vector< NLMISC::IThread * >::iterator ipt)
 Returns the receive task corresponding to a particular thread.

void setDataAvailableFlag (bool da)
 Sets _DataAvailable.


Private Attributes

uint64 _BytesPoppedIn
 Number of bytes popped by receive() since the beginning.

uint64 _BytesPushedOut
 Number of bytes pushed by send() since the beginning.

TNetCallback _ConnectionCallback
 Connection callback.

void * _ConnectionCbArg
 Argument of the connection callback.

CListenTask_ListenTask
 Listen task.

NLMISC::IThread_ListenThread
 Listen thread.

uint16 _MaxSocketsPerThread
 Max number of sockets handled by one thread.

uint16 _MaxThreads
 Max number of threads.

uint32 _NbConnections
 Number of connections (debug stat).

bool _NoDelay
 TCP_NODELAY.

uint64 _PrevBytesPoppedIn
 Previous number of bytes received.

uint64 _PrevBytesPushedOut
 Previous number of bytes sent.

bool _ReplayMode
 Replay mode flag.

NLMISC::CSynchronized< CThreadPool_ThreadPool
TThreadStategy _ThreadStrategy
 Thread socket-handling strategy.


Friends

class CListenTask
class CServerBufSock
class CServerReceiveTask
class NLNET::CBufSock


Member Enumeration Documentation

enum NLNET::CBufNetBase::TEventType [inherited]
 

Type of incoming events (max 256).

Enumeration values:
User 
Connection 
Disconnection 

Definition at line 79 of file buf_net_base.h.

00079 { User = 'U', Connection = 'C', Disconnection = 'D' };

enum NLNET::CBufServer::TThreadStategy
 

Enumeration values:
SpreadSockets 
FillThreads 

Definition at line 160 of file buf_server.h.


Constructor & Destructor Documentation

NLNET::CBufServer::CBufServer TThreadStategy  strategy = FillThreads,
uint16  max_threads = 64,
uint16  max_sockets_per_thread = 16,
bool  nodelay = true,
bool  replaymode = false
 

Constructor Set nodelay to true to disable the Nagle buffering algorithm (see CTcpSock documentation)

Definition at line 58 of file buf_server.cpp.

References _ListenTask, _ListenThread, _ReplayMode, CListenTask, nlnettrace, and uint16.

00059                                                                                            :
00060         CBufNetBase(),
00061         _NoDelay( nodelay ),
00062         _ThreadStrategy( strategy ),
00063         _MaxThreads( max_threads ),
00064         _MaxSocketsPerThread( max_sockets_per_thread ),
00065         _ListenTask( NULL ),
00066         _ListenThread( NULL ),
00067         _ThreadPool("CBufServer::_ThreadPool"),
00068         _ConnectionCallback( NULL ),
00069         _ConnectionCbArg( NULL ),
00070         _BytesPushedOut( 0 ),
00071         _BytesPoppedIn( 0 ),
00072         _PrevBytesPoppedIn( 0 ),
00073         _PrevBytesPushedOut( 0 ),
00074         _NbConnections (0),
00075         _ReplayMode( replaymode )
00076 {
00077         nlnettrace( "CBufServer::CBufServer" );
00078         if ( ! _ReplayMode )
00079         {
00080                 _ListenTask = new CListenTask( this );
00081                 _ListenThread = IThread::create( _ListenTask );
00082         }
00083         /*{
00084                 CSynchronized<uint32>::CAccessor syncbpi ( &_BytesPushedIn );
00085                 syncbpi.value() = 0;
00086         }*/
00087 }

NLNET::CBufServer::~CBufServer  )  [virtual]
 

Destructor.

Definition at line 166 of file buf_server.cpp.

References NLNET::CServerReceiveTask::_Connections, _ListenTask, _ListenThread, _ReplayMode, _ThreadPool, NLMISC::IThread::getRunnable(), nldebug, nlnettrace, receiveTask(), NLNET::CServerTask::requireExit(), and NLMISC::IThread::wait().

00167 {
00168         nlnettrace( "CBufServer::~CBufServer" );
00169 
00170         // Clean listen thread exit
00171         if ( ! _ReplayMode )
00172         {
00173                 ((CListenTask*)(_ListenThread->getRunnable()))->requireExit();
00174                 ((CListenTask*)(_ListenThread->getRunnable()))->close();
00175 #ifdef NL_OS_UNIX
00176                 _ListenTask->wakeUp();
00177 #endif
00178                 _ListenThread->wait();
00179                 delete _ListenThread;
00180                 delete _ListenTask;
00181 
00182                 // Clean receive thread exits
00183                 CThreadPool::iterator ipt;
00184                 {
00185                         nldebug( "LNETL1: Waiting for end of threads..." );
00186                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00187                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00188                         {
00189                                 // Tell the threads to exit and wake them up
00190                                 CServerReceiveTask *task = receiveTask(ipt);
00191                                 nlnettrace( "Requiring exit" );
00192                                 task->requireExit();
00193 
00194                                 // Wake the threads up
00195         #ifdef NL_OS_UNIX
00196                                 task->wakeUp();
00197         #else
00198                                 CConnections::iterator ipb;
00199                                 nlnettrace( "Closing sockets (Win32)" );
00200                                 {
00201                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00202                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00203                                         {
00204                                                 (*ipb)->Sock->close();
00205                                         }
00206                                 }
00207         #endif
00208 
00209                         }
00210                         
00211                         nlnettrace( "Waiting" );
00212                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00213                         {
00214                                 // Wait until the threads have exited
00215                                 (*ipt)->wait();
00216                         }
00217 
00218                         nldebug( "LNETL1: Deleting sockets, tasks and threads..." );
00219                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00220                         {
00221                                 // Delete the socket objects
00222                                 CServerReceiveTask *task = receiveTask(ipt);
00223                                 CConnections::iterator ipb;
00224                                 {
00225                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00226                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00227                                         {
00228                                                 delete (*ipb); // closes and deletes the socket
00229                                         }
00230                                 }
00231 
00232                                 // Delete the task objects
00233                                 delete task;
00234 
00235                                 // Delete the thread objects
00236                                 delete (*ipt);
00237                         }
00238                 }
00239         }
00240 
00241         nlnettrace( "Exiting CBufServer::~CBufServer" );
00242 }


Member Function Documentation

void NLNET::CBufServer::addNewThread CThreadPool threadpool,
CServerBufSock bufsock
[protected]
 

Definition at line 958 of file buf_server.cpp.

References NLNET::CServerReceiveTask::addNewSocket(), CServerReceiveTask, NLNET::CThreadPool, nlassert, nldebug, nlnettrace, and NLNET::CServerBufSock::setOwnerTask().

Referenced by dispatchNewSocket().

00959 {
00960         nlnettrace( "CBufServer::addNewThread" );
00961         nlassert( bufsock != NULL );
00962 
00963         // Create new task and dispatch the socket to it
00964         CServerReceiveTask *task = new CServerReceiveTask( this );
00965         bufsock->setOwnerTask( task );
00966         task->addNewSocket( bufsock );
00967 
00968         // Add a new thread to the pool, with this task
00969         IThread *thr = IThread::create( task );
00970         {
00971                 threadpool.push_back( thr );
00972                 thr->start();
00973                 nldebug( "LNETL1: Added a new thread; pool size is %d", threadpool.size() );
00974                 nldebug( "LNETL1: New socket dispatched to thread %d", threadpool.size()-1 );
00975         }
00976 }

void* NLNET::CBufServer::argOfConnectionCallback  )  const [inline, protected]
 

Returns the argument of the connection callback.

Definition at line 314 of file buf_server.h.

References _ConnectionCbArg.

Referenced by dataAvailable().

00314 { return _ConnectionCbArg; }

void* NLNET::CBufNetBase::argOfDisconnectionCallback  )  const [inline, protected, inherited]
 

Returns the argument of the disconnection callback.

Definition at line 160 of file buf_net_base.h.

References NLNET::CBufNetBase::_DisconnectionCbArg.

Referenced by dataAvailable(), and NLNET::CBufClient::dataAvailable().

00160 { return _DisconnectionCbArg; }

uint64 NLNET::CBufServer::bytesReceived  )  const [inline]
 

Returns the number of bytes popped by receive() since the beginning.

Definition at line 255 of file buf_server.h.

References _BytesPoppedIn, and uint64.

Referenced by newBytesReceived().

00255 { return _BytesPoppedIn; }

uint64 NLNET::CBufServer::bytesSent  )  const [inline]
 

Returns the number of bytes pushed by send() since the beginning.

Definition at line 261 of file buf_server.h.

References _BytesPushedOut, and uint64.

Referenced by newBytesSent().

00261 { return _BytesPushedOut; }

TNetCallback NLNET::CBufServer::connectionCallback  )  const [inline, protected]
 

Returns the connection callback.

Definition at line 311 of file buf_server.h.

References _ConnectionCallback, and NLNET::TNetCallback.

Referenced by dataAvailable().

00311 { return _ConnectionCallback; }

bool NLNET::CBufServer::dataAvailable  ) 
 

Checks if there is some data to receive. Returns false if the receive queue is empty. This is where the connection/disconnection callbacks can be called.

Reimplemented in NLNET::CCallbackServer.

Definition at line 364 of file buf_server.cpp.

References argOfConnectionCallback(), NLNET::CBufNetBase::argOfDisconnectionCallback(), NLNET::CBufSock::asString(), buffer, NLNET::CFifoAccessor, connectionCallback(), NLNET::CBufNetBase::dataAvailableFlag(), NLNET::CBufNetBase::disconnectionCallback(), nlassert, nldebug, nlerror, nlinfo, NLNET::CBufNetBase::receiveQueue(), NLNET::CBufSock::setConnectedState(), NLNET::CBufNetBase::setDataAvailableFlag(), NLMISC::stringFromVector(), NLNET::TSockId, uint16, and uint8.

00365 {
00366         // slow down the layer H_AUTO (CBufServer_dataAvailable);
00367         {
00368                 /* If no data available, enter the 'while' loop and return false (1 volatile test)
00369                  * If there are user data available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
00370                  * If there is a connection/disconnection event (rare), call the callback and loop
00371                  */
00372                 while ( dataAvailableFlag() )
00373                 {
00374                         // Because _DataAvailable is true, the receive queue is not empty at this point
00375                         vector<uint8> buffer;
00376                         uint8 val;
00377                         {
00378                                 CFifoAccessor recvfifo( &receiveQueue() );
00379                                 val = recvfifo.value().frontLast();
00380                                 if ( val != CBufNetBase::User )
00381                                 {
00382                                         recvfifo.value().front( buffer );
00383                                 }
00384                         }
00385 
00386                         /*sint32 mbsize = recvfifo.value().size() / 1048576;
00387                         if ( mbsize > 0 )
00388                         {
00389                           nlwarning( "The receive queue size exceeds %d MB", mbsize );
00390                         }*/
00391 
00392                         /*vector<uint8> buffer;
00393                         recvfifo.value().front( buffer );*/
00394 
00395                         // Test if it the next block is a system event
00396                         //switch ( buffer[buffer.size()-1] )
00397                         switch ( val )
00398                         {
00399                                 
00400                         // Normal message available
00401                         case CBufNetBase::User:
00402                                 return true; // return immediatly, do not extract the message
00403 
00404                         // Process disconnection event
00405                         case CBufNetBase::Disconnection:
00406                         {
00407                                 TSockId sockid = *((TSockId*)(&*buffer.begin()));
00408                                 nldebug( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());
00409 
00410                                 sockid->setConnectedState( false );
00411 
00412                                 // Call callback if needed
00413                                 if ( disconnectionCallback() != NULL )
00414                                 {
00415                                         disconnectionCallback()( sockid, argOfDisconnectionCallback() );
00416                                 }
00417 
00418                                 // Add socket object into the synchronized remove list
00419                                 nldebug( "LNETL1: Adding the connection to the remove list" );
00420                                 nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
00421                                 ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
00422                                 break;
00423                         }
00424                         // Process connection event
00425                         case CBufNetBase::Connection:
00426                         {
00427                                 TSockId sockid = *((TSockId*)(&*buffer.begin()));
00428                                 nldebug( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());
00429 
00430                                 sockid->setConnectedState( true );
00431                                 
00432                                 // Call callback if needed
00433                                 if ( connectionCallback() != NULL )
00434                                 {
00435                                         connectionCallback()( sockid, argOfConnectionCallback() );
00436                                 }
00437                                 break;
00438                         }
00439                         default: // should not occur
00440                                 nlinfo( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
00441                                 nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
00442                                 nlinfo( "LNETL1: Receive queue:" );
00443                                 {
00444                                         CFifoAccessor recvfifo( &receiveQueue() );
00445                                         recvfifo.value().display();
00446                                 }
00447                                 nlerror( "LNETL1: Invalid system event type in server receive queue" );
00448 
00449                         }
00450 
00451                         // Extract system event
00452                         {
00453                                 CFifoAccessor recvfifo( &receiveQueue() );
00454                                 recvfifo.value().pop();
00455                                 setDataAvailableFlag( ! recvfifo.value().empty() );
00456                         }
00457                 }
00458                 // _DataAvailable is false here
00459                 return false;
00460         }
00461 }

volatile bool NLNET::CBufNetBase::dataAvailableFlag  )  const [inline, protected, inherited]
 

Return _DataAvailable.

Definition at line 206 of file buf_net_base.h.

References NLNET::CBufNetBase::_DataAvailable.

Referenced by dataAvailable(), and NLNET::CBufClient::dataAvailable().

00206 { return _DataAvailable; }

void NLNET::CBufServer::disconnect TSockId  hostid,
bool  quick = false
 

Disconnect a connection Set hostid to InvalidSockId to disconnect all connections. If hostid is not InvalidSockId and the socket is not connected, the method does nothing. If quick is true, any pending data will not be sent before disconnecting.

Definition at line 251 of file buf_server.cpp.

References NLNET::CServerReceiveTask::_Connections, _ThreadPool, NLNET::CSock::connected(), NLNET::CTcpSock::disconnect(), NLNET::CBufSock::flush(), nlnettrace, receiveTask(), NLNET::CBufSock::Sock, and NLNET::TSockId.

00252 {
00253         nlnettrace( "CBufServer::disconnect" );
00254         if ( hostid != InvalidSockId )
00255         {
00256                 // Disconnect only if physically connected
00257                 if ( hostid->Sock->connected() )
00258                 {
00259                         if ( ! quick )
00260                         {
00261                                 hostid->flush();
00262                         }
00263                         hostid->Sock->disconnect(); // the connection will be removed by the next call of update()
00264                 }
00265         }
00266         else
00267         {
00268                 // Disconnect all
00269                 CThreadPool::iterator ipt;
00270                 {
00271                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00272                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00273                         {
00274                                 CServerReceiveTask *task = receiveTask(ipt);
00275                                 CConnections::iterator ipb;
00276                                 {
00277                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00278                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00279                                         {
00280                                                 if ( (*ipb)->Sock->connected() )
00281                                                 {
00282                                                         if ( ! quick )
00283                                                         {
00284                                                                 (*ipb)->flush();
00285                                                         }
00286                                                         (*ipb)->Sock->disconnect();
00287                                                 }
00288                                         }
00289                                 }
00290                         }
00291                 }
00292         }
00293 }

TNetCallback NLNET::CBufNetBase::disconnectionCallback  )  const [inline, protected, inherited]
 

Returns the disconnection callback.

Definition at line 157 of file buf_net_base.h.

References NLNET::CBufNetBase::_DisconnectionCallback, and NLNET::TNetCallback.

Referenced by dataAvailable(), and NLNET::CBufClient::dataAvailable().

00157 { return _DisconnectionCallback; }

void NLNET::CBufServer::dispatchNewSocket CServerBufSock bufsock  )  [protected]
 

Binds a new socket and send buffer to an existing or a new thread (that starts) Note: this method is called in the listening thread.

Definition at line 866 of file buf_server.cpp.

References _MaxSocketsPerThread, _MaxThreads, _ThreadPool, _ThreadStrategy, NLNET::CServerReceiveTask::addNewSocket(), addNewThread(), min, nldebug, nlnettrace, nlwarning, NLNET::CServerReceiveTask::numberOfConnections(), receiveTask(), NLNET::CServerBufSock::setOwnerTask(), SpreadSockets, and uint.

Referenced by NLNET::CListenTask::run().

00867 {
00868         nlnettrace( "CBufServer::dispatchNewSocket" );
00869 
00870         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00871         if ( _ThreadStrategy == SpreadSockets ) 
00872         {
00873                 // Find the thread with the smallest number of connections and check if all
00874                 // threads do not have the same number of connections
00875                 uint min = 0xFFFFFFFF;
00876                 uint max = 0;
00877                 CThreadPool::iterator ipt, iptmin, iptmax;
00878                 for ( iptmin=iptmax=ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00879                 {
00880                         uint noc = receiveTask(ipt)->numberOfConnections();
00881                         if ( noc < min )
00882                         {
00883                                 min = noc;
00884                                 iptmin = ipt;
00885                         }
00886                         if ( noc > max )
00887                         {
00888                                 max = noc;
00889                                 iptmax = ipt;
00890                         }
00891                 }
00892 
00893                 // Check if we make the pool of threads grow (if we have not found vacant room
00894                 // and if it is allowed to)
00895                 if ( (poolsync.value().empty()) ||
00896                          ((min == max) && (poolsync.value().size() < _MaxThreads)) )
00897                 {
00898                         addNewThread( poolsync.value(), bufsock );
00899                 }
00900                 else
00901                 {
00902                         // Dispatch socket to an existing thread of the pool
00903                         CServerReceiveTask *task = receiveTask(iptmin);
00904                         bufsock->setOwnerTask( task );
00905                         task->addNewSocket( bufsock );
00906 #ifdef NL_OS_UNIX
00907                         task->wakeUp();
00908 #endif                  
00909                         
00910                         if ( min >= (uint)_MaxSocketsPerThread )
00911                         {
00912                                 nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
00913                         }
00914                         nldebug( "LNETL1: New socket dispatched to thread %d", iptmin-poolsync.value().begin() );
00915                 }
00916 
00917         }
00918         else // _ThreadStrategy == FillThreads
00919         {
00920                 CThreadPool::iterator ipt;
00921                 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00922                 {
00923                         uint noc = receiveTask(ipt)->numberOfConnections();
00924                         if ( noc < _MaxSocketsPerThread )
00925                         {
00926                                 break;
00927                         }
00928                 }
00929 
00930                 // Check if we have to make the thread pool grow (if we have not found vacant room)
00931                 if ( ipt == poolsync.value().end() )
00932                 {
00933                         if ( poolsync.value().size() == _MaxThreads )
00934                         {
00935                                 nlwarning( "LNETL1: Exceeding the maximum number of threads" );
00936                         }
00937                         addNewThread( poolsync.value(), bufsock );
00938                 }
00939                 else
00940                 {
00941                         // Dispatch socket to an existing thread of the pool
00942                         CServerReceiveTask *task = receiveTask(ipt);
00943                         bufsock->setOwnerTask( task );
00944                         task->addNewSocket( bufsock );
00945 #ifdef NL_OS_UNIX
00946                         task->wakeUp();
00947 #endif                  
00948                         nldebug( "LNETL1: New socket dispatched to thread %d", ipt-poolsync.value().begin() );
00949                 }
00950         }
00951 }

void NLNET::CBufNetBase::displayReceiveQueueStat NLMISC::CLog log = NLMISC::InfoLog  )  [inline, inherited]
 

Reimplemented in NLNET::CCallbackClient, and NLNET::CCallbackServer.

Definition at line 94 of file buf_net_base.h.

References NLNET::CBufNetBase::_RecvFifo, and NLNET::CFifoAccessor.

00095         {
00096                 CFifoAccessor syncfifo( &_RecvFifo );
00097                 syncfifo.value().displayStats(log);
00098         }

void NLNET::CBufServer::displaySendQueueStat NLMISC::CLog log = NLMISC::InfoLog,
TSockId  destid = InvalidSockId
 

Reimplemented in NLNET::CCallbackServer.

Definition at line 713 of file buf_server.cpp.

References NLNET::CServerReceiveTask::_Connections, _ThreadPool, NLMISC::CBufFIFO::displayStats(), receiveTask(), NLNET::CBufSock::SendFifo, and NLNET::TSockId.

00714 {
00715         if ( destid != InvalidSockId )
00716         {
00717                 destid->SendFifo.displayStats(log);
00718         }
00719         else
00720         {
00721                 // add all client buffers
00722                 
00723                 // For each thread
00724                 CThreadPool::iterator ipt;
00725                 {
00726                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00727                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00728                         {
00729                                 // For each thread of the pool
00730                                 CServerReceiveTask *task = receiveTask(ipt);
00731                                 CConnections::iterator ipb;
00732                                 {
00733                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00734                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00735                                         {
00736                                                 // For each socket of the thread, update sending
00737                                                 (*ipb)->SendFifo.displayStats(log);
00738                                         }
00739                                 }
00740                         }
00741                 }
00742         }
00743 }

void NLNET::CBufServer::displayThreadStat NLMISC::CLog log = NLMISC::InfoLog  ) 
 

Reimplemented in NLNET::CCallbackServer.

Definition at line 695 of file buf_server.cpp.

References _ListenTask, _ThreadPool, NLMISC::CLog::displayNL(), NLNET::CServerTask::NbLoop, and receiveTask().

00696 {
00697         // For each thread
00698         CThreadPool::iterator ipt;
00699         {
00700                 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00701                 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00702                 {
00703                         // For each thread of the pool
00704                         CServerReceiveTask *task = receiveTask(ipt);
00705                         // For each socket of the thread, update sending
00706                         log->displayNL ("server receive thread %p nbloop %d", task, task->NbLoop);
00707                 }
00708         }
00709 
00710         log->displayNL ("server listen thread %p nbloop %d", _ListenTask, _ListenTask->NbLoop);
00711 }

bool NLNET::CBufServer::flush TSockId  destid  )  [inline]
 

Force to send all data pending in the send queue.

Returns:
False if an error has occured (e.g. the remote host is disconnected). To retrieve the reason of the error, call CSock::getLastError() and/or CSock::errorString()

Reimplemented in NLNET::CCallbackServer.

Definition at line 232 of file buf_server.h.

References NLNET::CBufSock::flush(), nlassert, and NLNET::TSockId.

00232 { nlassert( destid != InvalidSockId ); return destid->flush(); }

uint32 NLNET::CBufNetBase::getReceiveQueueSize  )  [inline, inherited]
 

Returns the size of the receive queue (mutexed).

Reimplemented in NLNET::CCallbackClient, and NLNET::CCallbackServer.

Definition at line 88 of file buf_net_base.h.

References NLNET::CBufNetBase::_RecvFifo, NLNET::CFifoAccessor, and uint32.

00089         {
00090                 CFifoAccessor syncfifo( &_RecvFifo );
00091                 return syncfifo.value().size();
00092         }

uint32 NLNET::CBufServer::getSendQueueSize TSockId  destid  ) 
 

Definition at line 660 of file buf_server.cpp.

References NLNET::CServerReceiveTask::_Connections, _ThreadPool, receiveTask(), NLNET::CBufSock::SendFifo, NLMISC::CBufFIFO::size(), NLNET::TSockId, and uint32.

00661 {
00662         if ( destid != InvalidSockId )
00663         {
00664                 return destid->SendFifo.size();
00665         }
00666         else
00667         {
00668                 // add all client buffers
00669 
00670                 uint32 total = 0;
00671 
00672                 // For each thread
00673                 CThreadPool::iterator ipt;
00674                 {
00675                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00676                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00677                         {
00678                                 // For each thread of the pool
00679                                 CServerReceiveTask *task = receiveTask(ipt);
00680                                 CConnections::iterator ipb;
00681                                 {
00682                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00683                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00684                                         {
00685                                                 // For each socket of the thread, update sending
00686                                                 total = (*ipb)->SendFifo.size ();
00687                                         }
00688                                 }
00689                         }
00690                 }
00691                 return total;
00692         }
00693 }

const CInetAddress& NLNET::CBufServer::hostAddress TSockId  hostid  )  [inline]
 

Returns the address of the specified host.

Reimplemented in NLNET::CCallbackServer.

Definition at line 241 of file buf_server.h.

References nlassert, NLNET::CSock::remoteAddr(), NLNET::CBufSock::Sock, and NLNET::TSockId.

00241 { nlassert( hostid != InvalidSockId ); return hostid->Sock->remoteAddr(); }

void NLNET::CBufServer::init uint16  port  ) 
 

Listens on the specified port.

Definition at line 93 of file buf_server.cpp.

References _ListenTask, _ListenThread, _ReplayMode, NLNET::CListenTask::init(), NLNET::CBufNetBase::maxExpectedBlockSize(), nldebug, nlnettrace, NLMISC::IThread::start(), and uint16.

Referenced by NLNET::CNetManager::addServer(), and NLNET::CUnifiedNetwork::init().

00094 {
00095         nlnettrace( "CBufServer::init" );
00096         if ( ! _ReplayMode )
00097         {
00098                 _ListenTask->init( port, maxExpectedBlockSize() );
00099                 _ListenThread->start();
00100         }
00101         else
00102         {
00103                 nldebug( "LNETL1: Binding listen socket to any address, port %hu", port );
00104         }
00105 }

const CInetAddress& NLNET::CBufServer::listenAddress  )  const [inline]
 

Returns the internet address of the listening socket.

Definition at line 238 of file buf_server.h.

References _ListenTask, and NLNET::CListenTask::localAddr().

Referenced by NLNET::CLoginServer::init(), NLNET::NLMISC_DYNVARIABLE(), and NLNET::setListenAddress().

00238 { return _ListenTask->localAddr(); }

uint32 NLNET::CBufNetBase::maxExpectedBlockSize  )  const [inline, inherited]
 

Returns the max size of the received messages (default: 2^31-1).

Definition at line 135 of file buf_net_base.h.

References NLNET::CBufNetBase::_MaxExpectedBlockSize, and uint32.

Referenced by NLNET::CBufClient::connect(), and init().

00136         {
00137                 return _MaxExpectedBlockSize;
00138         }

uint32 NLNET::CBufNetBase::maxSentBlockSize  )  const [inline, inherited]
 

Returns the max size of the sent messages (default: 2^31-1).

Definition at line 141 of file buf_net_base.h.

References NLNET::CBufNetBase::_MaxSentBlockSize, and uint32.

Referenced by send(), and NLNET::CBufClient::send().

00142         {
00143                 return _MaxSentBlockSize;
00144         }

uint32 NLNET::CBufServer::nbConnections  )  const [inline]
 

Returns the number of connections (at the last update()).

Definition at line 267 of file buf_server.h.

References _NbConnections, and uint32.

Referenced by NLNET::CCallbackServer::send().

00267 { return _NbConnections; }

uint64 NLNET::CBufServer::newBytesReceived  ) 
 

Returns the number of bytes popped by receive() since the previous call to this method.

Definition at line 749 of file buf_server.cpp.

References _PrevBytesPoppedIn, bytesReceived(), and uint64.

00750 {
00751         uint64 b = bytesReceived();
00752         uint64 nbrecvd = b - _PrevBytesPoppedIn;
00753         //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbrecvd );
00754         _PrevBytesPoppedIn = b;
00755         return nbrecvd;
00756 }

uint64 NLNET::CBufServer::newBytesSent  ) 
 

Returns the number of bytes pushed by send() since the previous call to this method.

Definition at line 761 of file buf_server.cpp.

References _PrevBytesPushedOut, bytesSent(), and uint64.

00762 {
00763         uint64 b = bytesSent();
00764         uint64 nbsent = b - _PrevBytesPushedOut;
00765         //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbsent );
00766         _PrevBytesPushedOut = b;
00767         return nbsent;
00768 }

bool NLNET::CBufServer::noDelay  )  const [inline, protected]
 

Returns the TCP_NODELAY flag.

Definition at line 276 of file buf_server.h.

Referenced by NLNET::CListenTask::run().

00276 { return _NoDelay; }

void NLNET::CBufServer::pushBufferToHost const NLMISC::CMemStream buffer,
TSockId  hostid
[inline, protected]
 

Pushes a buffer to the specified host's send queue and update (unless not connected).

Definition at line 298 of file buf_server.h.

References _BytesPushedOut, buffer, nlassert, NLNET::CBufSock::pushBuffer(), NLNET::TBlockSize, and NLNET::TSockId.

Referenced by send().

00299         {
00300                 nlassert( hostid != InvalidSockId );
00301                 if ( hostid->pushBuffer( buffer ) )
00302                 {
00303                         _BytesPushedOut += buffer.length() + sizeof(TBlockSize); // statistics
00304                 }
00305         }

void NLNET::CBufNetBase::pushMessageIntoReceiveQueue const uint8 buffer,
uint32  size
[inline, protected, inherited]
 

Definition at line 183 of file buf_net_base.h.

References NLNET::CBufNetBase::_RecvFifo, buffer, NLNET::CFifoAccessor, NLNET::CBufNetBase::setDataAvailableFlag(), size, uint32, and uint8.

00184         {
00185                 //sint32 mbsize;
00186                 {
00187                         //nldebug( "BNB: Acquiring the receive queue... ");
00188                         CFifoAccessor recvfifo( &_RecvFifo );
00189                         //nldebug( "BNB: Acquired, pushing the received buffer... ");
00190                         recvfifo.value().push( buffer, size );
00191                         //nldebug( "BNB: Pushed, releasing the receive queue..." );
00192                         //mbsize = recvfifo.value().size() / 1048576;
00193                         setDataAvailableFlag( true );
00194                 }
00195                 //nldebug( "BNB: Released." );
00196                 /*if ( mbsize > 1 )
00197                 {
00198                         nlwarning( "The receive queue size exceeds %d MB", mbsize );
00199                 }*/
00200         }

void NLNET::CBufNetBase::pushMessageIntoReceiveQueue const std::vector< uint8 > &  buffer  )  [inline, protected, inherited]
 

Push message into receive queue (mutexed).

Definition at line 164 of file buf_net_base.h.

References NLNET::CBufNetBase::_RecvFifo, buffer, NLNET::CFifoAccessor, and NLNET::CBufNetBase::setDataAvailableFlag().

Referenced by NLNET::CBufSock::advertiseSystemEvent(), and NLNET::CClientReceiveTask::run().

00165         {
00166                 //sint32 mbsize;
00167                 {
00168                         //nldebug( "BNB: Acquiring the receive queue... ");
00169                         CFifoAccessor recvfifo( &_RecvFifo );
00170                         //nldebug( "BNB: Acquired, pushing the received buffer... ");
00171                         recvfifo.value().push( buffer );
00172                         //nldebug( "BNB: Pushed, releasing the receive queue..." );
00173                         //mbsize = recvfifo.value().size() / 1048576;
00174                         setDataAvailableFlag( true );
00175                 }
00176                 //nldebug( "BNB: Released." );
00177                 //if ( mbsize > 1 )
00178                 //{
00179                 //      nlwarning( "The receive queue size exceeds %d MB", mbsize );
00180                 //}
00181         }

void NLNET::CBufServer::receive NLMISC::CMemStream buffer,
TSockId hostid
 

Receives next block of data in the specified (resizes the vector) You must call dataAvailable() before every call to receive()

Definition at line 565 of file buf_server.cpp.

References _BytesPoppedIn, buffer, NLNET::CFifoAccessor, nlassert, NLMISC_BSWAP32, nlnettrace, nlwarning, NLNET::CBufNetBase::receiveQueue(), NLNET::CBufNetBase::setDataAvailableFlag(), NLNET::TBlockSize, NLNET::TSockId, and uint32.

00566 {
00567         nlnettrace( "CBufServer::receive" );
00568         //nlassert( dataAvailable() );
00569         nlassert( phostid != NULL );
00570         {
00571                 CFifoAccessor recvfifo( &receiveQueue() );
00572                 nlassert( ! recvfifo.value().empty() );
00573                 recvfifo.value().front( buffer );
00574                 recvfifo.value().pop();
00575                 setDataAvailableFlag( ! recvfifo.value().empty() );
00576         }
00577 
00578         // Extract hostid (and event type)
00579         *phostid = *((TSockId*)&(buffer.buffer()[buffer.size()-sizeof(TSockId)-1]));
00580         nlassert( buffer.buffer()[buffer.size()-1] == CBufNetBase::User );
00581 
00582         // debug features, we number all packet to be sure that they are all sent and received
00583         // \todo remove this debug feature when ok
00584 #ifdef NL_BIG_ENDIAN
00585         uint32 val = NLMISC_BSWAP32(*(uint32*)buffer.buffer());
00586 #else
00587         uint32 val = *(uint32*)buffer.buffer();
00588 #endif
00589 
00590         //      nldebug ("receive message number %u", val);
00591         if ((*phostid)->ReceiveNextValue != val)
00592         {
00593                 nlwarning ("LNETL1: !!!LOST A MESSAGE!!! I received the message number %u but I'm waiting the message number %u (cnx %s), warn lecroart@nevrax.com with the log now please", val, (*phostid)->ReceiveNextValue, (*phostid)->asString().c_str());
00594                 // resync the message number
00595                 (*phostid)->ReceiveNextValue = val;
00596         }
00597 
00598         (*phostid)->ReceiveNextValue++;
00599 
00600         buffer.resize( buffer.size()-sizeof(TSockId)-1 );
00601 
00602         // TODO OPTIM remove the nldebug for speed
00603         //commented for optimisation nldebug( "LNETL1: Read buffer (%d+%d B) from %s", buffer.size(), sizeof(TSockId)+1, /*stringFromVector(buffer).c_str(), */(*phostid)->asString().c_str() );
00604 
00605         // Statistics
00606         _BytesPoppedIn += buffer.size() + sizeof(TBlockSize);
00607 }

CSynchronizedFIFO& NLNET::CBufNetBase::receiveQueue  )  [inline, protected, inherited]
 

Access to the receive queue.

Definition at line 154 of file buf_net_base.h.

References NLNET::CBufNetBase::_RecvFifo, and NLNET::CSynchronizedFIFO.

Referenced by dataAvailable(), NLNET::CBufClient::dataAvailable(), NLNET::CBufClient::disconnect(), receive(), NLNET::CBufClient::receive(), and NLNET::CServerReceiveTask::run().

00154 { return _RecvFifo; }

CServerReceiveTask* NLNET::CBufServer::receiveTask std::vector< NLMISC::IThread * >::iterator  ipt  )  [inline, protected]
 

Returns the receive task corresponding to a particular thread.

Definition at line 284 of file buf_server.h.

Referenced by disconnect(), dispatchNewSocket(), displaySendQueueStat(), displayThreadStat(), getSendQueueSize(), send(), update(), and ~CBufServer().

00285         {
00286                 return ((CServerReceiveTask*)((*ipt)->getRunnable()));
00287         }

void NLNET::CBufServer::send const NLMISC::CMemStream buffer,
TSockId  hostid
 

Send a message to the specified host, or to all hosts if hostid is InvalidSockId

Reimplemented in NLNET::CCallbackServer.

Definition at line 299 of file buf_server.cpp.

References NLNET::CServerReceiveTask::_Connections, _ThreadPool, buffer, NLNET::CBufNetBase::maxSentBlockSize(), nlassert, nlassertex, NLMISC_BSWAP32, nlnettrace, pushBufferToHost(), receiveTask(), NLNET::CBufSock::SendNextValue, NLNET::TSockId, and uint32.

00300 {
00301         nlnettrace( "CBufServer::send" );
00302         nlassert( buffer.length() > 0 );
00303         nlassertex( buffer.length() <= maxSentBlockSize(), ("length=%u max=%u", buffer.length(), maxSentBlockSize()) );
00304 
00305         // slow down the layer H_AUTO (CBufServer_send);
00306 
00307         if ( hostid != InvalidSockId )
00308         {
00309                 // debug features, we number all packet to be sure that they are all sent and received
00310                 // \todo remove this debug feature when ok
00311 //              nldebug ("send message number %u", hostid->SendNextValue);
00312 #ifdef NL_BIG_ENDIAN
00313                 uint32 val = NLMISC_BSWAP32(hostid->SendNextValue);
00314 #else
00315                 uint32 val = hostid->SendNextValue;
00316 #endif
00317 
00318                 *(uint32*)buffer.buffer() = val;
00319                 hostid->SendNextValue++;
00320 
00321                 pushBufferToHost( buffer, hostid );
00322         }
00323         else
00324         {
00325                 // Push into all send queues
00326                 CThreadPool::iterator ipt;
00327                 {
00328                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00329                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00330                         {
00331                                 CServerReceiveTask *task = receiveTask(ipt);
00332                                 CConnections::iterator ipb;
00333                                 {
00334                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00335                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00336                                         {
00337                                                 // Send only if the socket is logically connected
00338                                                 if ( (*ipb)->connectedState() ) 
00339                                                 {
00340                                                         // debug features, we number all packet to be sure that they are all sent and received
00341                                                         // \todo remove this debug feature when ok
00342 //                                                      nldebug ("send message number %u", (*ipb)->SendNextValue);
00343 #ifdef NL_BIG_ENDIAN
00344                                                         uint32 val = NLMISC_BSWAP32((*ipb)->SendNextValue);
00345 #else
00346                                                         uint32 val = (*ipb)->SendNextValue;
00347 #endif
00348                                                         *(uint32*)buffer.buffer() = val;
00349                                                         (*ipb)->SendNextValue++;
00350 
00351                                                         pushBufferToHost( buffer, *ipb );
00352                                                 }
00353                                         }
00354                                 }
00355                         }
00356                 }
00357         }
00358 }

void NLNET::CBufServer::setConnectionCallback TNetCallback  cb,
void *  arg
[inline]
 

Sets callback for incoming connections (or NULL to disable callback).

Reimplemented in NLNET::CCallbackServer.

Definition at line 184 of file buf_server.h.

References _ConnectionCallback, _ConnectionCbArg, and NLNET::TNetCallback.

00184 { _ConnectionCallback = cb; _ConnectionCbArg = arg; }

void NLNET::CBufNetBase::setDataAvailableFlag bool  da  )  [inline, protected, inherited]
 

Sets _DataAvailable.

Definition at line 203 of file buf_net_base.h.

References NLNET::CBufNetBase::_DataAvailable.

Referenced by dataAvailable(), NLNET::CBufClient::dataAvailable(), NLNET::CBufClient::disconnect(), NLNET::CBufNetBase::pushMessageIntoReceiveQueue(), receive(), NLNET::CBufClient::receive(), and NLNET::CServerReceiveTask::run().

00203 { _DataAvailable = da; }

void NLNET::CBufNetBase::setDisconnectionCallback TNetCallback  cb,
void *  arg
[inline, inherited]
 

Sets callback for detecting a disconnection (or NULL to disable callback).

Reimplemented in NLNET::CCallbackClient, and NLNET::CCallbackServer.

Definition at line 85 of file buf_net_base.h.

References NLNET::CBufNetBase::_DisconnectionCallback, NLNET::CBufNetBase::_DisconnectionCbArg, and NLNET::TNetCallback.

00085 { _DisconnectionCallback = cb; _DisconnectionCbArg = arg; }

void NLNET::CBufNetBase::setMaxExpectedBlockSize sint32  limit  )  [inline, inherited]
 

Sets the max size of the received messages. If receiving a message bigger than the limit, the connection will be dropped.

Default value: 1 MegaByte If you put a negative number as limit, the max size is reset to the default value. Warning: you can call this method only at initialization time, before connecting (for a client) or calling init() (for a server) !

Definition at line 109 of file buf_net_base.h.

References NLNET::CBufNetBase::_MaxExpectedBlockSize, sint32, and uint32.

00110         {
00111                 if ( limit < 0 )
00112                         _MaxExpectedBlockSize = 1048576;
00113                 else
00114                         _MaxExpectedBlockSize = (uint32)limit;
00115         }

void NLNET::CBufNetBase::setMaxSentBlockSize sint32  limit  )  [inline, inherited]
 

Sets the max size of the sent messages. Note: Limiting of sending not implemented, currently

Default value: 1 MegaByte If you put a negative number as limit, the max size is reset to the default value. Warning: you can call this method only at initialization time, before connecting (for a client) or calling init() (for a server) !

Definition at line 126 of file buf_net_base.h.

References NLNET::CBufNetBase::_MaxSentBlockSize, sint32, and uint32.

00127         {
00128                 if ( limit < 0 )
00129                         _MaxSentBlockSize = 1048576;
00130                 else
00131                         _MaxSentBlockSize = (uint32)limit;
00132         }

void NLNET::CBufServer::setSizeFlushTrigger TSockId  destid,
sint32  size
[inline]
 

Sets the size flush trigger. When the size of the send queue reaches or exceeds this value, all data in the send queue is automatically sent (-1 to disable this trigger )

Definition at line 226 of file buf_server.h.

References nlassert, NLNET::CBufSock::setSizeFlushTrigger(), sint32, size, and NLNET::TSockId.

00226 { nlassert( destid != InvalidSockId ); destid->setSizeFlushTrigger( size ); }

void NLNET::CBufServer::setTimeFlushTrigger TSockId  destid,
sint32  ms
[inline]
 

Sets the time flush trigger (in millisecond). When this time is elapsed, all data in the send queue is automatically sent (-1 to disable this trigger)

Definition at line 221 of file buf_server.h.

References nlassert, NLNET::CBufSock::setTimeFlushTrigger(), sint32, and NLNET::TSockId.

00221 { nlassert( destid != InvalidSockId ); destid->setTimeFlushTrigger( ms ); }

void NLNET::CBufServer::update  ) 
 

Update the network (call this method evenly).

Definition at line 613 of file buf_server.cpp.

References NLNET::CServerReceiveTask::_Connections, _NbConnections, _ThreadPool, nldebug, and receiveTask().

00614 {
00615         //nlnettrace( "CBufServer::update-BEGIN" );
00616 
00617         _NbConnections = 0;
00618 
00619         // For each thread
00620         CThreadPool::iterator ipt;
00621         {
00622           //nldebug( "UPD: Acquiring the Thread Pool" );
00623                 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00624                 //nldebug( "UPD: Acquired." );
00625                 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00626                 {
00627                         // For each thread of the pool
00628                         CServerReceiveTask *task = receiveTask(ipt);
00629                         CConnections::iterator ipb;
00630                         {
00631                                 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00632                                 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00633                                 {
00634                                     // For each socket of the thread, update sending
00635                                     if ( ! ((*ipb)->Sock->connected() && (*ipb)->update()) )
00636                                     {
00637                                                 // Update did not work or the socket is not connected anymore
00638                                         nldebug( "LNETL1: Socket %s is disconnected", (*ipb)->asString().c_str() );
00639                                                 // Disconnection event if disconnected (known either from flush (in update) or when receiving data)
00640                                                 (*ipb)->advertiseDisconnection( this, *ipb );
00641                                         
00642                                                 /*if ( (*ipb)->advertiseDisconnection( this, *ipb ) )
00643                                                 {
00644                                                         // Now the connection removal is in dataAvailable()
00645                                                         // POLL6
00646                                                 }*/
00647                                     }
00648                                     else
00649                                     {
00650                                                 _NbConnections++;
00651                                     }
00652                                 }
00653                         }
00654                 }
00655         }
00656 
00657         //nlnettrace( "CBufServer::update-END" );
00658 }


Friends And Related Function Documentation

friend class CListenTask [friend]
 

Definition at line 272 of file buf_server.h.

Referenced by CBufServer().

friend class CServerBufSock [friend]
 

Definition at line 271 of file buf_server.h.

friend class CServerReceiveTask [friend]
 

Definition at line 273 of file buf_server.h.

Referenced by addNewThread().

friend class NLNET::CBufSock [friend, inherited]
 

Definition at line 148 of file buf_net_base.h.


Field Documentation

uint64 NLNET::CBufServer::_BytesPoppedIn [private]
 

Number of bytes popped by receive() since the beginning.

Definition at line 358 of file buf_server.h.

Referenced by bytesReceived(), and receive().

uint64 NLNET::CBufServer::_BytesPushedOut [private]
 

Number of bytes pushed by send() since the beginning.

Definition at line 355 of file buf_server.h.

Referenced by bytesSent(), and pushBufferToHost().

TNetCallback NLNET::CBufServer::_ConnectionCallback [private]
 

Connection callback.

Reimplemented in NLNET::CCallbackServer.

Definition at line 349 of file buf_server.h.

Referenced by connectionCallback(), and setConnectionCallback().

void* NLNET::CBufServer::_ConnectionCbArg [private]
 

Argument of the connection callback.

Reimplemented in NLNET::CCallbackServer.

Definition at line 352 of file buf_server.h.

Referenced by argOfConnectionCallback(), and setConnectionCallback().

CListenTask* NLNET::CBufServer::_ListenTask [private]
 

Listen task.

Definition at line 335 of file buf_server.h.

Referenced by CBufServer(), displayThreadStat(), init(), listenAddress(), and ~CBufServer().

NLMISC::IThread* NLNET::CBufServer::_ListenThread [private]
 

Listen thread.

Definition at line 338 of file buf_server.h.

Referenced by CBufServer(), init(), and ~CBufServer().

uint16 NLNET::CBufServer::_MaxSocketsPerThread [private]
 

Max number of sockets handled by one thread.

Definition at line 332 of file buf_server.h.

Referenced by dispatchNewSocket().

uint16 NLNET::CBufServer::_MaxThreads [private]
 

Max number of threads.

Definition at line 329 of file buf_server.h.

Referenced by dispatchNewSocket().

uint32 NLNET::CBufServer::_NbConnections [private]
 

Number of connections (debug stat).

Definition at line 367 of file buf_server.h.

Referenced by nbConnections(), and update().

bool NLNET::CBufServer::_NoDelay [private]
 

TCP_NODELAY.

Definition at line 323 of file buf_server.h.

uint64 NLNET::CBufServer::_PrevBytesPoppedIn [private]
 

Previous number of bytes received.

Definition at line 361 of file buf_server.h.

Referenced by newBytesReceived().

uint64 NLNET::CBufServer::_PrevBytesPushedOut [private]
 

Previous number of bytes sent.

Definition at line 364 of file buf_server.h.

Referenced by newBytesSent().

bool NLNET::CBufServer::_ReplayMode [private]
 

Replay mode flag.

Definition at line 370 of file buf_server.h.

Referenced by CBufServer(), init(), and ~CBufServer().

NLMISC::CSynchronized<CThreadPool> NLNET::CBufServer::_ThreadPool [private]
 

Definition at line 346 of file buf_server.h.

Referenced by disconnect(), dispatchNewSocket(), displaySendQueueStat(), displayThreadStat(), getSendQueueSize(), send(), update(), and ~CBufServer().

TThreadStategy NLNET::CBufServer::_ThreadStrategy [private]
 

Thread socket-handling strategy.

Definition at line 326 of file buf_server.h.

Referenced by dispatchNewSocket().


The documentation for this class was generated from the following files:
Generated on Tue Mar 16 13:53:23 2004 for NeL by doxygen 1.3.6