NLNET::CCallbackServer Class Reference

#include <callback_server.h>

Inheritance diagram for NLNET::CCallbackServer:

NLNET::CCallbackNetBase NLNET::CBufServer NLNET::CBufNetBase

Detailed Description

Server class for layer 3
Author:
Vianney Lecroart

Nevrax France

Date:
2001

Definition at line 44 of file callback_server.h.

Public Types

enum  TEventType { User = 'U', Connection = 'C', Disconnection = 'D' }
 Type of incoming events (max 256). More...

enum  TRecordingState { Off, Record, Replay }
enum  TThreadStategy { SpreadSockets, FillThreads }

Public Member Functions

void addCallbackArray (const TCallbackItem *callbackarray, NLMISC::CStringIdArray::TStringId arraysize)
void authorizeOnly (const char *callbackName, TSockId hostid=InvalidSockId)
uint64 bytesReceived () const
 Returns the number of bytes popped by receive() since the beginning.

uint64 bytesSent () const
 Returns the number of bytes pushed by send() since the beginning.

 CCallbackServer (TRecordingState rec=Off, const std::string &recfilename="", bool recordall=true)
 Constructor.

bool connected () const
 Returns true if the connection is still connected. on server, we always "connected".

void disconnect (TSockId hostid, bool quick=false)
void disconnect (TSockId hostid)
void displayAllMyAssociations ()
void displayReceiveQueueStat (NLMISC::CLog *log=NLMISC::InfoLog)
void displaySendQueueStat (NLMISC::CLog *log=NLMISC::InfoLog, TSockId destid=InvalidSockId)
void displayThreadStat (NLMISC::CLog *log=NLMISC::InfoLog)
bool flush (TSockId destid)
 Force to send all data pending in the send queue.

uint64 getBytesReceived ()
uint64 getBytesSent ()
uint64 getReceiveQueueSize ()
uint32 getSendQueueSize (TSockId destid)
uint64 getSendQueueSize ()
NLMISC::CStringIdArraygetSIDA ()
 Use this function to get the String ID Array needed when you want to create a message.

virtual TSockId getSockId (TSockId hostid=InvalidSockId)
 Returns the sockid (cf. CCallbackClient).

const CInetAddresshostAddress (TSockId hostid)
 Returns the address of the specified host.

void ignoreAllUnknownId (bool b)
void init (uint16 port)
 Listens on the specified port.

bool isAServer () const
 Returns true if this is a CCallbackServer.

const CInetAddresslistenAddress () const
 Returns the internet address of the listening socket.

uint32 maxExpectedBlockSize () const
 Returns the max size of the received messages (default: 2^31-1).

uint32 maxSentBlockSize () const
 Returns the max size of the sent messages (default: 2^31-1).

uint32 nbConnections () const
 Returns the number of connections (at the last update()).

uint64 newBytesReceived ()
 Returns the number of bytes popped by receive() since the previous call to this method.

uint64 newBytesSent ()
 Returns the number of bytes pushed by send() since the previous call to this method.

void receive (NLMISC::CMemStream &buffer, TSockId *hostid)
void send (const CMessage &buffer, TSockId hostid, bool log=true)
 Sends a message to the specified host.

void setConnectionCallback (TNetCallback cb, void *arg)
 Sets callback for incoming connections (or NULL to disable callback).

void setDefaultCallback (TMsgCallback defaultCallback)
 Sets default callback for unknown message types.

void setDisconnectionCallback (TNetCallback cb, void *arg)
 Sets callback for disconnections (or NULL to disable callback).

void setMaxExpectedBlockSize (sint32 limit)
void setMaxSentBlockSize (sint32 limit)
void setOtherSideAssociations (const char **associationarray, NLMISC::CStringIdArray::TStringId arraysize)
void setSizeFlushTrigger (TSockId destid, sint32 size)
void setTimeFlushTrigger (TSockId destid, sint32 ms)
void update ()
 Update the network (call this method evenly).

void update (sint32 timeout=0)
 Updates the network (call this method evenly).


Protected Member Functions

void addNewThread (CThreadPool &threadpool, CServerBufSock *bufsock)
void * argOfConnectionCallback () const
 Returns the argument of the connection callback.

void * argOfDisconnectionCallback () const
 Returns the argument of the disconnection callback.

void baseUpdate (sint32 timeout=0)
 Used by client and server class.

void checkThreadId () const
TNetCallback connectionCallback () const
 Returns the connection callback.

volatile bool dataAvailableFlag () const
 Return _DataAvailable.

TNetCallback disconnectionCallback () const
 Returns the disconnection callback.

void dispatchNewSocket (CServerBufSock *bufsock)
bool noDelay () const
 Returns the TCP_NODELAY flag.

void processOneMessage ()
 Read a message from the network and process it.

void pushBufferToHost (const NLMISC::CMemStream &buffer, TSockId hostid)
 Pushes a buffer to the specified host's send queue and update (unless not connected).

void pushMessageIntoReceiveQueue (const uint8 *buffer, uint32 size)
void pushMessageIntoReceiveQueue (const std::vector< uint8 > &buffer)
 Push message into receive queue (mutexed).

CSynchronizedFIFOreceiveQueue ()
 Access to the receive queue.

CServerReceiveTaskreceiveTask (std::vector< NLMISC::IThread * >::iterator ipt)
 Returns the receive task corresponding to a particular thread.

void setDataAvailableFlag (bool da)
 Sets _DataAvailable.


Protected Attributes

uint64 _BytesReceived
uint64 _BytesSent
std::vector< TCallbackItem_CallbackArray
TMsgCallback _DefaultCallback
bool _FirstUpdate
NLMISC::CStringIdArray _InputSIDA
bool _IsAServer
TNetCallback _NewDisconnectionCallback
 Used by client and server class.

NLMISC::CStringIdArray _OutputSIDA
uint _ThreadId

Private Member Functions

bool dataAvailable ()
void receive (CMessage &buffer, TSockId *hostid)
 On this layer, you can't call directly receive, It s the update() function that receive and call your callaback.

void send (const NLMISC::CMemStream &buffer, TSockId hostid)
 This function is public in the base class and put it private here because user cannot use it in layer 2.

void sendAllMyAssociations (TSockId to)
 Used by client and server class.


Private Attributes

TNetCallback _ConnectionCallback
 Connection callback.

void * _ConnectionCbArg
 Argument of the connection callback.


Friends

void cbsNewConnection (TSockId from, void *data)
class CListenTask
class CServerBufSock
class CServerReceiveTask
class NLNET::CBufSock


Member Enumeration Documentation

enum NLNET::CBufNetBase::TEventType [inherited]
 

Type of incoming events (max 256).

Enumeration values:
User 
Connection 
Disconnection 

Definition at line 79 of file buf_net_base.h.

00079 { User = 'U', Connection = 'C', Disconnection = 'D' };

enum NLNET::CCallbackNetBase::TRecordingState [inherited]
 

Enumeration values:
Off 
Record 
Replay 

Definition at line 163 of file callback_net_base.h.

00163 { Off, Record, Replay };

enum NLNET::CBufServer::TThreadStategy [inherited]
 

Enumeration values:
SpreadSockets 
FillThreads 

Definition at line 160 of file buf_server.h.


Constructor & Destructor Documentation

NLNET::CCallbackServer::CCallbackServer TRecordingState  rec = Off,
const std::string &  recfilename = "",
bool  recordall = true
 

Constructor.

Definition at line 70 of file callback_server.cpp.

References NLNET::cbsNewConnection(), DEFAULT_MAX_SOCKETS_PER_THREADS, DEFAULT_MAX_THREADS, DEFAULT_STRATEGY, and nlassertex.

00070                                                                                                  :
00071         CCallbackNetBase( rec, recfilename, recordall ),
00072         CBufServer( DEFAULT_STRATEGY, DEFAULT_MAX_THREADS, DEFAULT_MAX_SOCKETS_PER_THREADS, true, rec==Replay ),
00073         _ConnectionCallback(NULL),
00074         _ConnectionCbArg(NULL)
00075 {
00076 #ifndef USE_MESSAGE_RECORDER
00077         nlassertex( rec==Off, ("LNETL3S: The message recorder is disabled at compilation time ; switch the recording state Off") );
00078 #endif
00079         
00080         CBufServer::setDisconnectionCallback (_NewDisconnectionCallback, this);
00081         CBufServer::setConnectionCallback (cbsNewConnection, this);
00082 
00083         _IsAServer = true;
00084         _DefaultCallback = NULL;
00085 }


Member Function Documentation

void NLNET::CCallbackNetBase::addCallbackArray const TCallbackItem callbackarray,
NLMISC::CStringIdArray::TStringId  arraysize
[inherited]
 

Appends callback array with the specified array. You can add callback only *after* adding the server or the client.

Parameters:
arraysize is the number of callback items.

Definition at line 184 of file callback_net_base.cpp.

References NLNET::CCallbackNetBase::_CallbackArray, NLNET::CCallbackNetBase::_OutputSIDA, NLMISC::CStringIdArray::addString(), NLNET::TCallbackItem::Callback, NLNET::CCallbackNetBase::checkThreadId(), nlassert, NLMISC::CStringIdArray::resize(), sint, NLMISC::CStringIdArray::size(), NLMISC::CStringIdArray::TStringId, and uint.

Referenced by NLNET::CCallbackClient::CCallbackClient(), NLNET::CCallbackNetBase::CCallbackNetBase(), NLNET::CPacsClient::connect(), NLNET::CNamingClient::connect(), NLNET::CLoginClient::connectToShard(), NLNET::CUnifiedNetwork::init(), NLNET::CLoginServer::init(), and NLNET::_CUniTime::installServer().

00185 {
00186         checkThreadId ();
00187 
00188         // be sure that the 2 array have the same size
00189         nlassert (_CallbackArray.size () == (uint)_OutputSIDA.size ());
00190 
00191         if (arraysize == 1 && callbackarray[0].Callback == NULL && string("") == callbackarray[0].Key)
00192         {
00193                 // it's an empty array, ignore it
00194                 return;
00195         }
00196 
00197         // resize the array
00198         sint oldsize = _CallbackArray.size();
00199 
00200         _CallbackArray.resize (oldsize + arraysize);
00201         _OutputSIDA.resize (oldsize + arraysize);
00202 
00203 //TOO MUCH MESSAGE      nldebug ("L3NB_CB: Adding %d callback to the array", arraysize);
00204 
00205         for (sint i = 0; i < arraysize; i++)
00206         {
00207                 CStringIdArray::TStringId ni = oldsize + i;
00208 //TOO MUCH MESSAGE              nldebug ("L3NB_CB: Adding callback to message '%s', id '%d'", callbackarray[i].Key, ni);
00209                 // copy callback value
00210                 
00211                 _CallbackArray[ni] = callbackarray[i];
00212                 // add the string to the string id array
00213                 _OutputSIDA.addString (callbackarray[i].Key, ni);
00214 
00215         }
00216 //      nldebug ("LNETL3NB_CB: Added %d callback Now, there're %d callback associated with message type", arraysize, _CallbackArray.size ());
00217 }

void NLNET::CBufServer::addNewThread CThreadPool threadpool,
CServerBufSock bufsock
[protected, inherited]
 

Definition at line 958 of file buf_server.cpp.

References NLNET::CServerReceiveTask::addNewSocket(), NLNET::CBufServer::CServerReceiveTask, NLNET::CThreadPool, nlassert, nldebug, nlnettrace, and NLNET::CServerBufSock::setOwnerTask().

Referenced by NLNET::CBufServer::dispatchNewSocket().

00959 {
00960         nlnettrace( "CBufServer::addNewThread" );
00961         nlassert( bufsock != NULL );
00962 
00963         // Create new task and dispatch the socket to it
00964         CServerReceiveTask *task = new CServerReceiveTask( this );
00965         bufsock->setOwnerTask( task );
00966         task->addNewSocket( bufsock );
00967 
00968         // Add a new thread to the pool, with this task
00969         IThread *thr = IThread::create( task );
00970         {
00971                 threadpool.push_back( thr );
00972                 thr->start();
00973                 nldebug( "LNETL1: Added a new thread; pool size is %d", threadpool.size() );
00974                 nldebug( "LNETL1: New socket dispatched to thread %d", threadpool.size()-1 );
00975         }
00976 }

void* NLNET::CBufServer::argOfConnectionCallback  )  const [inline, protected, inherited]
 

Returns the argument of the connection callback.

Definition at line 314 of file buf_server.h.

References NLNET::CBufServer::_ConnectionCbArg.

Referenced by NLNET::CBufServer::dataAvailable().

00314 { return _ConnectionCbArg; }

void* NLNET::CBufNetBase::argOfDisconnectionCallback  )  const [inline, protected, inherited]
 

Returns the argument of the disconnection callback.

Definition at line 160 of file buf_net_base.h.

References NLNET::CBufNetBase::_DisconnectionCbArg.

Referenced by NLNET::CBufServer::dataAvailable(), and NLNET::CBufClient::dataAvailable().

00160 { return _DisconnectionCbArg; }

void NLNET::CCallbackNetBase::authorizeOnly const char *  callbackName,
TSockId  hostid = InvalidSockId
[inherited]
 

Sets the callback that you want the other side calls. If it didn't call this callback, it will be disconnected If cb is NULL, we authorize *all* callback. On a client, the hostid must be InvalidSockId (or ommited). On a server, you must provide a hostid.

Definition at line 471 of file callback_net_base.cpp.

References NLNET::CBufSock::asString(), NLNET::CBufSock::AuthorizedCallback, NLNET::CCallbackNetBase::checkThreadId(), NLNET::CCallbackNetBase::getSockId(), nlassert, nldebug, and NLNET::TSockId.

Referenced by NLNET::cbShardValidation(), and NLNET::ClientConnection().

00472 {
00473         checkThreadId ();
00474 
00475         nldebug ("LNETL3NB: authorizeOnly (%s, %s)", callbackName, hostid->asString().c_str());
00476 
00477         hostid = getSockId (hostid);
00478         
00479         nlassert (hostid != InvalidSockId);
00480 
00481         hostid->AuthorizedCallback = (callbackName == NULL)?"":callbackName;
00482 }

void NLNET::CCallbackNetBase::baseUpdate sint32  timeout = 0  )  [protected, inherited]
 

Used by client and server class.

Definition at line 328 of file callback_net_base.cpp.

References NLNET::CCallbackNetBase::_FirstUpdate, NLNET::CCallbackNetBase::_InputSIDA, NLNET::CCallbackNetBase::_LastMovedStringArray, NLNET::CCallbackNetBase::_LastUpdateTime, NLNET::CCallbackNetBase::checkThreadId(), NLNET::CCallbackNetBase::dataAvailable(), NLMISC::CStringIdArray::getAskedStringArray(), NLMISC::CStringIdArray::getNeedToAskedStringArray(), H_AUTO, NLMISC::CStringIdArray::moveNeedToAskToAskedStringArray(), nlassert, nldebug, NLMISC::nlSleep(), NLNET::CCallbackNetBase::processOneMessage(), NLNET::CCallbackNetBase::send(), NLMISC::CMemStream::serial(), sint32, size, NLMISC::CStringIdArray::TStringId, and NLMISC::TTime.

Referenced by update(), and NLNET::CCallbackClient::update().

00329 {
00330         H_AUTO(L3UpdateCallbackNetBase);
00331 
00332         checkThreadId ();
00333 
00334         // slow down the layer H_AUTO (CCallbackNetBase_baseUpdate);
00335 
00336         nlassert( timeout >= -1 );
00337         TTime t0 = CTime::getLocalTime();
00338 
00339         //
00340         // The first time, we init time counters
00341         //
00342         if (_FirstUpdate)
00343         {
00344 //              nldebug("LNETL3NB: First update()");
00345                 _FirstUpdate = false;
00346                 _LastUpdateTime = t0;
00347                 _LastMovedStringArray = t0;
00348         }
00349 
00350         //
00351         // Every 1 seconds if we have new unknown association, we ask them to the other side
00352         //
00353         if (t0 - _LastUpdateTime >  1000)
00354         {
00355 //              nldebug("LNETL3NB: baseUpdate()");
00356                 _LastUpdateTime = t0;
00357 
00358                 const set<string> &sa = _InputSIDA.getNeedToAskedStringArray ();
00359                 if (!sa.empty ())
00360                 {
00361                         CMessage msgout (_InputSIDA, "AA");
00362                         //nlassert (sa.size () < 65536); // no size limit anymore
00363                         CStringIdArray::TStringId size = sa.size ();
00364                         nldebug ("LNETL3NB_ASSOC: I need %d string association, ask them to the other side", size);
00365                         msgout.serial (size);
00366                         for (set<string>::iterator it = sa.begin(); it != sa.end(); it++)
00367                         {
00368                                 nldebug ("LNETL3NB_ASSOC:  what is the id of '%s'?", (*it).c_str ());
00369                                 string str(*it);
00370                                 msgout.serial (str);
00371                         }
00372                         // send the message to the other side
00373                         send (msgout, 0);
00374                         _InputSIDA.moveNeedToAskToAskedStringArray();
00375                         _LastMovedStringArray = t0;
00376                 }
00377         }
00378 
00379         //
00380         // Every 60 seconds if we have not answered association, we ask again to get them!
00381         //
00382         if (!_InputSIDA.getAskedStringArray().empty() && t0 - _LastMovedStringArray > 60000)
00383         {
00384                 // we didn't have an answer for the association, resend them
00385                 const set<string> sa = _InputSIDA.getAskedStringArray ();
00386                 CMessage msgout (_InputSIDA, "AA");
00387                 //nlassert (sa.size () < 65536); // no size limit anymore
00388                 CStringIdArray::TStringId size = sa.size ();
00389                 nldebug ("LNETL3NB_ASSOC: client didn't answer my asked association, retry! I need %d string association, ask them to the other side", size);
00390                 msgout.serial (size);
00391                 for (set<string>::iterator it = sa.begin(); it != sa.end(); it++)
00392                 {
00393                         nldebug ("LNETL3NB_ASSOC:  what is the id of '%s'?", (*it).c_str ());
00394                         string str(*it);
00395                         msgout.serial (str);
00396                 }
00397                 // sends the message to the other side
00398                 send (msgout, 0);
00399                 _LastMovedStringArray = t0;
00400         }
00401 
00402         /*
00403          * timeout -1    =>  read one message in the queue or nothing if no message in queue
00404          * timeout 0     =>  read all messages in the queue
00405          * timeout other =>  read all messages in the queue until timeout expired (at least all one time)
00406          */
00407 
00408         bool exit = false;
00409 
00410         while (!exit)
00411         {
00412                 // process all messages in the queue
00413                 while (dataAvailable ())
00414                 {
00415                         processOneMessage ();
00416                         if (timeout == -1)
00417                         {
00418                                 exit = true;
00419                                 break;
00420                         }
00421                 }
00422 
00423                 // need to exit?
00424                 if (timeout == 0 || (sint32)(CTime::getLocalTime() - t0) > timeout)
00425                 {
00426                         exit = true;
00427                 }
00428                 else
00429                 {
00430                         // enable multithreading on windows :-/
00431                         // slow down the layer H_AUTO (CCallbackNetBase_baseUpdate_nlSleep);
00432                         nlSleep (10);
00433                 }
00434         }
00435 
00436 #ifdef USE_MESSAGE_RECORDER
00437         _MR_UpdateCounter++;
00438 #endif
00439 
00440 }

uint64 NLNET::CBufServer::bytesReceived  )  const [inline, inherited]
 

Returns the number of bytes popped by receive() since the beginning.

Definition at line 255 of file buf_server.h.

References NLNET::CBufServer::_BytesPoppedIn, and uint64.

Referenced by NLNET::CBufServer::newBytesReceived().

00255 { return _BytesPoppedIn; }

uint64 NLNET::CBufServer::bytesSent  )  const [inline, inherited]
 

Returns the number of bytes pushed by send() since the beginning.

Definition at line 261 of file buf_server.h.

References NLNET::CBufServer::_BytesPushedOut, and uint64.

Referenced by NLNET::CBufServer::newBytesSent().

00261 { return _BytesPushedOut; }

void NLNET::CCallbackNetBase::checkThreadId  )  const [protected, inherited]
 

Definition at line 533 of file callback_net_base.cpp.

Referenced by NLNET::CCallbackNetBase::addCallbackArray(), NLNET::CCallbackNetBase::authorizeOnly(), NLNET::CCallbackNetBase::baseUpdate(), NLNET::CCallbackClient::connect(), connected(), NLNET::CCallbackClient::connected(), dataAvailable(), NLNET::CCallbackClient::dataAvailable(), disconnect(), NLNET::CCallbackClient::disconnect(), NLNET::CCallbackNetBase::displayAllMyAssociations(), flush(), NLNET::CCallbackClient::flush(), getSockId(), NLNET::CCallbackClient::getSockId(), hostAddress(), NLNET::CCallbackNetBase::isAServer(), NLNET::CCallbackNetBase::processOneMessage(), receive(), NLNET::CCallbackClient::receive(), send(), NLNET::CCallbackClient::send(), sendAllMyAssociations(), setConnectionCallback(), setDisconnectionCallback(), NLNET::CCallbackNetBase::setDisconnectionCallback(), NLNET::CCallbackClient::setDisconnectionCallback(), NLNET::CCallbackNetBase::setOtherSideAssociations(), update(), and NLNET::CCallbackClient::update().

00534 {
00535 /*      some people use this class in different thread but with a mutex to be sure to have
00536         no concurent access
00537         if (getThreadId () != _ThreadId)
00538         {
00539                 nlerror ("You try to access to the same CCallbackClient or CCallbackServer with 2 differents thread (%d and %d)", _ThreadId, getThreadId());
00540         }
00541 */
00542 }

bool NLNET::CCallbackServer::connected  )  const [inline, virtual]
 

Returns true if the connection is still connected. on server, we always "connected".

Reimplemented from NLNET::CCallbackNetBase.

Definition at line 67 of file callback_server.h.

References NLNET::CCallbackNetBase::checkThreadId().

00067 { checkThreadId (); return true; } 

TNetCallback NLNET::CBufServer::connectionCallback  )  const [inline, protected, inherited]
 

Returns the connection callback.

Definition at line 311 of file buf_server.h.

References NLNET::CBufServer::_ConnectionCallback, and NLNET::TNetCallback.

Referenced by NLNET::CBufServer::dataAvailable().

00311 { return _ConnectionCallback; }

bool NLNET::CCallbackServer::dataAvailable  )  [private]
 

Checks if there is some data to receive. Returns false if the receive queue is empty. This is where the connection/disconnection callbacks can be called.

Reimplemented from NLNET::CBufServer.

Definition at line 283 of file callback_server.cpp.

References NLNET::CCallbackNetBase::checkThreadId().

00284 {
00285         checkThreadId ();
00286 
00287 #ifdef USE_MESSAGE_RECORDER
00288         if ( _MR_RecordingState != Replay )
00289         {
00290 #endif
00291 
00292                 // Real dataAvailable()
00293                 return CBufServer::dataAvailable (); 
00294 
00295 #ifdef USE_MESSAGE_RECORDER
00296         }
00297         else
00298         {
00299                 // Simulated dataAvailable()
00300                 return CCallbackNetBase::replayDataAvailable();
00301         }
00302 #endif
00303 }

volatile bool NLNET::CBufNetBase::dataAvailableFlag  )  const [inline, protected, inherited]
 

Return _DataAvailable.

Definition at line 206 of file buf_net_base.h.

References NLNET::CBufNetBase::_DataAvailable.

Referenced by NLNET::CBufServer::dataAvailable(), and NLNET::CBufClient::dataAvailable().

00206 { return _DataAvailable; }

void NLNET::CBufServer::disconnect TSockId  hostid,
bool  quick = false
[inherited]
 

Disconnect a connection Set hostid to InvalidSockId to disconnect all connections. If hostid is not InvalidSockId and the socket is not connected, the method does nothing. If quick is true, any pending data will not be sent before disconnecting.

Definition at line 251 of file buf_server.cpp.

References NLNET::CServerReceiveTask::_Connections, NLNET::CBufServer::_ThreadPool, NLNET::CSock::connected(), NLNET::CTcpSock::disconnect(), NLNET::CBufSock::flush(), nlnettrace, NLNET::CBufServer::receiveTask(), NLNET::CBufSock::Sock, and NLNET::TSockId.

00252 {
00253         nlnettrace( "CBufServer::disconnect" );
00254         if ( hostid != InvalidSockId )
00255         {
00256                 // Disconnect only if physically connected
00257                 if ( hostid->Sock->connected() )
00258                 {
00259                         if ( ! quick )
00260                         {
00261                                 hostid->flush();
00262                         }
00263                         hostid->Sock->disconnect(); // the connection will be removed by the next call of update()
00264                 }
00265         }
00266         else
00267         {
00268                 // Disconnect all
00269                 CThreadPool::iterator ipt;
00270                 {
00271                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00272                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00273                         {
00274                                 CServerReceiveTask *task = receiveTask(ipt);
00275                                 CConnections::iterator ipb;
00276                                 {
00277                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00278                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00279                                         {
00280                                                 if ( (*ipb)->Sock->connected() )
00281                                                 {
00282                                                         if ( ! quick )
00283                                                         {
00284                                                                 (*ipb)->flush();
00285                                                         }
00286                                                         (*ipb)->Sock->disconnect();
00287                                                 }
00288                                         }
00289                                 }
00290                         }
00291                 }
00292         }
00293 }

void NLNET::CCallbackServer::disconnect TSockId  hostid  )  [virtual]
 

Disconnect a connection Set hostid to InvalidSockId to disconnect all connections. If hostid is not InvalidSockId and the socket is not connected, the method does nothing. Before disconnecting, any pending data is actually sent.

Reimplemented from NLNET::CCallbackNetBase.

Definition at line 246 of file callback_server.cpp.

References NLNET::CCallbackNetBase::checkThreadId(), and NLNET::TSockId.

Referenced by NLNET::cbWSDisconnectClient(), and NLNET::CUnifiedNetwork::release().

00247 {
00248         checkThreadId ();
00249 
00250 #ifdef USE_MESSAGE_RECORDER
00251         if ( _MR_RecordingState != Replay )
00252         {
00253 #endif
00254                 // Disconnect
00255                 CBufServer::disconnect( hostid );
00256 
00257 #ifdef USE_MESSAGE_RECORDER
00258         }
00259         // else, no need to manually replay the disconnection, such as in CCallbackClient,
00260         // it will be replayed during the next update()
00261 #endif
00262 }

TNetCallback NLNET::CBufNetBase::disconnectionCallback  )  const [inline, protected, inherited]
 

Returns the disconnection callback.

Definition at line 157 of file buf_net_base.h.

References NLNET::CBufNetBase::_DisconnectionCallback, and NLNET::TNetCallback.

Referenced by NLNET::CBufServer::dataAvailable(), and NLNET::CBufClient::dataAvailable().

00157 { return _DisconnectionCallback; }

void NLNET::CBufServer::dispatchNewSocket CServerBufSock bufsock  )  [protected, inherited]
 

Binds a new socket and send buffer to an existing or a new thread (that starts) Note: this method is called in the listening thread.

Definition at line 866 of file buf_server.cpp.

References NLNET::CBufServer::_MaxSocketsPerThread, NLNET::CBufServer::_MaxThreads, NLNET::CBufServer::_ThreadPool, NLNET::CBufServer::_ThreadStrategy, NLNET::CServerReceiveTask::addNewSocket(), NLNET::CBufServer::addNewThread(), min, nldebug, nlnettrace, nlwarning, NLNET::CServerReceiveTask::numberOfConnections(), NLNET::CBufServer::receiveTask(), NLNET::CServerBufSock::setOwnerTask(), NLNET::CBufServer::SpreadSockets, and uint.

Referenced by NLNET::CListenTask::run().

00867 {
00868         nlnettrace( "CBufServer::dispatchNewSocket" );
00869 
00870         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00871         if ( _ThreadStrategy == SpreadSockets ) 
00872         {
00873                 // Find the thread with the smallest number of connections and check if all
00874                 // threads do not have the same number of connections
00875                 uint min = 0xFFFFFFFF;
00876                 uint max = 0;
00877                 CThreadPool::iterator ipt, iptmin, iptmax;
00878                 for ( iptmin=iptmax=ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00879                 {
00880                         uint noc = receiveTask(ipt)->numberOfConnections();
00881                         if ( noc < min )
00882                         {
00883                                 min = noc;
00884                                 iptmin = ipt;
00885                         }
00886                         if ( noc > max )
00887                         {
00888                                 max = noc;
00889                                 iptmax = ipt;
00890                         }
00891                 }
00892 
00893                 // Check if we make the pool of threads grow (if we have not found vacant room
00894                 // and if it is allowed to)
00895                 if ( (poolsync.value().empty()) ||
00896                          ((min == max) && (poolsync.value().size() < _MaxThreads)) )
00897                 {
00898                         addNewThread( poolsync.value(), bufsock );
00899                 }
00900                 else
00901                 {
00902                         // Dispatch socket to an existing thread of the pool
00903                         CServerReceiveTask *task = receiveTask(iptmin);
00904                         bufsock->setOwnerTask( task );
00905                         task->addNewSocket( bufsock );
00906 #ifdef NL_OS_UNIX
00907                         task->wakeUp();
00908 #endif                  
00909                         
00910                         if ( min >= (uint)_MaxSocketsPerThread )
00911                         {
00912                                 nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
00913                         }
00914                         nldebug( "LNETL1: New socket dispatched to thread %d", iptmin-poolsync.value().begin() );
00915                 }
00916 
00917         }
00918         else // _ThreadStrategy == FillThreads
00919         {
00920                 CThreadPool::iterator ipt;
00921                 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00922                 {
00923                         uint noc = receiveTask(ipt)->numberOfConnections();
00924                         if ( noc < _MaxSocketsPerThread )
00925                         {
00926                                 break;
00927                         }
00928                 }
00929 
00930                 // Check if we have to make the thread pool grow (if we have not found vacant room)
00931                 if ( ipt == poolsync.value().end() )
00932                 {
00933                         if ( poolsync.value().size() == _MaxThreads )
00934                         {
00935                                 nlwarning( "LNETL1: Exceeding the maximum number of threads" );
00936                         }
00937                         addNewThread( poolsync.value(), bufsock );
00938                 }
00939                 else
00940                 {
00941                         // Dispatch socket to an existing thread of the pool
00942                         CServerReceiveTask *task = receiveTask(ipt);
00943                         bufsock->setOwnerTask( task );
00944                         task->addNewSocket( bufsock );
00945 #ifdef NL_OS_UNIX
00946                         task->wakeUp();
00947 #endif                  
00948                         nldebug( "LNETL1: New socket dispatched to thread %d", ipt-poolsync.value().begin() );
00949                 }
00950         }
00951 }

void NLNET::CCallbackNetBase::displayAllMyAssociations  )  [inherited]
 

Definition at line 464 of file callback_net_base.cpp.

References NLNET::CCallbackNetBase::_OutputSIDA, NLNET::CCallbackNetBase::checkThreadId(), and NLMISC::CStringIdArray::display().

Referenced by NLNET::CLoginClient::connectToShard().

00465 {
00466         checkThreadId ();
00467 
00468         _OutputSIDA.display ();
00469 }

void NLNET::CCallbackServer::displayReceiveQueueStat NLMISC::CLog log = NLMISC::InfoLog  )  [inline, virtual]
 

Implements NLNET::CCallbackNetBase.

Definition at line 85 of file callback_server.h.

00085 { CBufServer::displayReceiveQueueStat(log); }

void NLNET::CCallbackServer::displaySendQueueStat NLMISC::CLog log = NLMISC::InfoLog,
TSockId  destid = InvalidSockId
[inline]
 

Reimplemented from NLNET::CBufServer.

Definition at line 86 of file callback_server.h.

References NLNET::TSockId.

00086 { CBufServer::displaySendQueueStat(log, destid); }

void NLNET::CCallbackServer::displayThreadStat NLMISC::CLog log = NLMISC::InfoLog  )  [inline]
 

Reimplemented from NLNET::CBufServer.

Definition at line 88 of file callback_server.h.

00088 { CBufServer::displayThreadStat(log); }

bool NLNET::CCallbackServer::flush TSockId  destid  )  [inline]
 

Force to send all data pending in the send queue.

Reimplemented from NLNET::CBufServer.

Definition at line 55 of file callback_server.h.

References NLNET::CCallbackNetBase::checkThreadId(), nlassert, and NLNET::TSockId.

00055 { checkThreadId (); nlassert( destid != InvalidSockId ); return CBufServer::flush(destid); }

uint64 NLNET::CCallbackNetBase::getBytesReceived  )  [inline, inherited]
 

Definition at line 88 of file callback_net_base.h.

References NLNET::CCallbackNetBase::_BytesReceived, and uint64.

Referenced by NLNET::CUnifiedNetwork::getBytesReceived().

00088 { return _BytesReceived; }

uint64 NLNET::CCallbackNetBase::getBytesSent  )  [inline, inherited]
 

Definition at line 87 of file callback_net_base.h.

References NLNET::CCallbackNetBase::_BytesSent, and uint64.

Referenced by NLNET::CUnifiedNetwork::getBytesSent().

00087 { return _BytesSent; }

uint64 NLNET::CCallbackServer::getReceiveQueueSize  )  [inline, virtual]
 

Implements NLNET::CCallbackNetBase.

Definition at line 82 of file callback_server.h.

References uint64.

Referenced by NLNET::CUnifiedNetwork::getReceiveQueueSize().

00082 { return CBufServer::getReceiveQueueSize(); }

uint32 NLNET::CBufServer::getSendQueueSize TSockId  destid  )  [inherited]
 

Definition at line 660 of file buf_server.cpp.

References NLNET::CServerReceiveTask::_Connections, NLNET::CBufServer::_ThreadPool, NLNET::CBufServer::receiveTask(), NLNET::CBufSock::SendFifo, NLMISC::CBufFIFO::size(), NLNET::TSockId, and uint32.

00661 {
00662         if ( destid != InvalidSockId )
00663         {
00664                 return destid->SendFifo.size();
00665         }
00666         else
00667         {
00668                 // add all client buffers
00669 
00670                 uint32 total = 0;
00671 
00672                 // For each thread
00673                 CThreadPool::iterator ipt;
00674                 {
00675                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00676                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00677                         {
00678                                 // For each thread of the pool
00679                                 CServerReceiveTask *task = receiveTask(ipt);
00680                                 CConnections::iterator ipb;
00681                                 {
00682                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00683                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00684                                         {
00685                                                 // For each socket of the thread, update sending
00686                                                 total = (*ipb)->SendFifo.size ();
00687                                         }
00688                                 }
00689                         }
00690                 }
00691                 return total;
00692         }
00693 }

uint64 NLNET::CCallbackServer::getSendQueueSize  )  [inline, virtual]
 

Implements NLNET::CCallbackNetBase.

Definition at line 83 of file callback_server.h.

References uint64.

Referenced by NLNET::CUnifiedNetwork::getSendQueueSize().

00083 { return CBufServer::getSendQueueSize(0); }

NLMISC::CStringIdArray& NLNET::CCallbackNetBase::getSIDA  )  [inline, inherited]
 

Use this function to get the String ID Array needed when you want to create a message.

Definition at line 129 of file callback_net_base.h.

References NLNET::CCallbackNetBase::_InputSIDA.

Referenced by NLNET::cbcMessageRecvAllAssociations(), NLNET::cbnbMessageAskAssociations(), NLNET::cbnbMessageRecvAssociations(), NLNET::cbServerAskUniversalTime(), NLNET::cbShardValidation(), NLNET::CLoginClient::connectToShard(), NLNET::CNetDisplayer::doDisplay(), NLNET::CNamingClient::queryServicePort(), NLNET::CNamingClient::registerService(), NLNET::CNamingClient::registerServiceWithSId(), NLNET::CNamingClient::resendRegisteration(), sendAllMyAssociations(), NLNET::CCallbackNetBase::setOtherSideAssociations(), and NLNET::CNamingClient::unregisterService().

00129 { return _InputSIDA; }

TSockId NLNET::CCallbackServer::getSockId TSockId  hostid = InvalidSockId  )  [virtual]
 

Returns the sockid (cf. CCallbackClient).

Implements NLNET::CCallbackNetBase.

Definition at line 268 of file callback_server.cpp.

References NLNET::CCallbackNetBase::checkThreadId(), nlassert, and NLNET::TSockId.

00269 {
00270         nlassert (hostid != InvalidSockId);     // invalid hostid
00271         checkThreadId ();
00272         nlassert (connected ());
00273         nlassert (hostid != NULL);
00274         return hostid;
00275 }

const CInetAddress& NLNET::CCallbackServer::hostAddress TSockId  hostid  )  [inline]
 

Returns the address of the specified host.

Reimplemented from NLNET::CBufServer.

Definition at line 77 of file callback_server.h.

References NLNET::CCallbackNetBase::checkThreadId(), nlassert, and NLNET::TSockId.

00077 { nlassert(hostid!=InvalidSockId); checkThreadId(); return CBufServer::hostAddress (hostid); }

void NLNET::CCallbackNetBase::ignoreAllUnknownId bool  b  )  [inline, inherited]
 

If you ignore all unknown id, the net will not ask for other side to know new association. It's used in the naming service for example because the naming client will never answer. In this case, it will always send the message with the full string name (slower)

Definition at line 157 of file callback_net_base.h.

References NLNET::CCallbackNetBase::_InputSIDA, and NLMISC::CStringIdArray::ignoreAllUnknownId().

00158         {
00159                 _InputSIDA.ignoreAllUnknownId (b);
00160         }

void NLNET::CBufServer::init uint16  port  )  [inherited]
 

Listens on the specified port.

Definition at line 93 of file buf_server.cpp.

References NLNET::CBufServer::_ListenTask, NLNET::CBufServer::_ListenThread, NLNET::CBufServer::_ReplayMode, NLNET::CListenTask::init(), NLNET::CBufNetBase::maxExpectedBlockSize(), nldebug, nlnettrace, NLMISC::IThread::start(), and uint16.

Referenced by NLNET::CNetManager::addServer(), and NLNET::CUnifiedNetwork::init().

00094 {
00095         nlnettrace( "CBufServer::init" );
00096         if ( ! _ReplayMode )
00097         {
00098                 _ListenTask->init( port, maxExpectedBlockSize() );
00099                 _ListenThread->start();
00100         }
00101         else
00102         {
00103                 nldebug( "LNETL1: Binding listen socket to any address, port %hu", port );
00104         }
00105 }

bool NLNET::CCallbackNetBase::isAServer  )  const [inline, inherited]
 

Returns true if this is a CCallbackServer.

Definition at line 126 of file callback_net_base.h.

References NLNET::CCallbackNetBase::_IsAServer, and NLNET::CCallbackNetBase::checkThreadId().

00126 { checkThreadId (); return _IsAServer; }

const CInetAddress& NLNET::CBufServer::listenAddress  )  const [inline, inherited]
 

Returns the internet address of the listening socket.

Definition at line 238 of file buf_server.h.

References NLNET::CBufServer::_ListenTask, and NLNET::CListenTask::localAddr().

Referenced by NLNET::CLoginServer::init(), NLNET::NLMISC_DYNVARIABLE(), and NLNET::setListenAddress().

00238 { return _ListenTask->localAddr(); }

uint32 NLNET::CBufNetBase::maxExpectedBlockSize  )  const [inline, inherited]
 

Returns the max size of the received messages (default: 2^31-1).

Definition at line 135 of file buf_net_base.h.

References NLNET::CBufNetBase::_MaxExpectedBlockSize, and uint32.

Referenced by NLNET::CBufClient::connect(), and NLNET::CBufServer::init().

00136         {
00137                 return _MaxExpectedBlockSize;
00138         }

uint32 NLNET::CBufNetBase::maxSentBlockSize  )  const [inline, inherited]
 

Returns the max size of the sent messages (default: 2^31-1).

Definition at line 141 of file buf_net_base.h.

References NLNET::CBufNetBase::_MaxSentBlockSize, and uint32.

Referenced by NLNET::CBufServer::send(), and NLNET::CBufClient::send().

00142         {
00143                 return _MaxSentBlockSize;
00144         }

uint32 NLNET::CBufServer::nbConnections  )  const [inline, inherited]
 

Returns the number of connections (at the last update()).

Definition at line 267 of file buf_server.h.

References NLNET::CBufServer::_NbConnections, and uint32.

Referenced by send().

00267 { return _NbConnections; }

uint64 NLNET::CBufServer::newBytesReceived  )  [inherited]
 

Returns the number of bytes popped by receive() since the previous call to this method.

Definition at line 749 of file buf_server.cpp.

References NLNET::CBufServer::_PrevBytesPoppedIn, NLNET::CBufServer::bytesReceived(), and uint64.

00750 {
00751         uint64 b = bytesReceived();
00752         uint64 nbrecvd = b - _PrevBytesPoppedIn;
00753         //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbrecvd );
00754         _PrevBytesPoppedIn = b;
00755         return nbrecvd;
00756 }

uint64 NLNET::CBufServer::newBytesSent  )  [inherited]
 

Returns the number of bytes pushed by send() since the previous call to this method.

Definition at line 761 of file buf_server.cpp.

References NLNET::CBufServer::_PrevBytesPushedOut, NLNET::CBufServer::bytesSent(), and uint64.

00762 {
00763         uint64 b = bytesSent();
00764         uint64 nbsent = b - _PrevBytesPushedOut;
00765         //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbsent );
00766         _PrevBytesPushedOut = b;
00767         return nbsent;
00768 }

bool NLNET::CBufServer::noDelay  )  const [inline, protected, inherited]
 

Returns the TCP_NODELAY flag.

Definition at line 276 of file buf_server.h.

Referenced by NLNET::CListenTask::run().

00276 { return _NoDelay; }

void NLNET::CCallbackNetBase::processOneMessage  )  [protected, inherited]
 

Read a message from the network and process it.

Definition at line 223 of file callback_net_base.cpp.

References NLNET::CCallbackNetBase::_BytesReceived, NLNET::CCallbackNetBase::_CallbackArray, NLNET::CCallbackNetBase::_DefaultCallback, NLNET::CCallbackNetBase::_OutputSIDA, NLNET::CBufSock::asString(), NLNET::CBufSock::AuthorizedCallback, NLNET::CCallbackNetBase::checkThreadId(), NLNET::CCallbackNetBase::disconnect(), NLNET::CMessage::getId(), NLNET::CMessage::getName(), NLNET::CCallbackNetBase::getSockId(), NLNET::CMessage::length(), nldebug, nlwarning, NLNET::CCallbackNetBase::receive(), sint16, NLNET::TMsgCallback, NLNET::CMessage::toString(), NLNET::TSockId, NLMISC::CStringIdArray::TStringId, and NLNET::CMessage::TypeHasAnId.

Referenced by NLNET::CCallbackNetBase::baseUpdate().

00224 {
00225         checkThreadId ();
00226 
00227         // slow down the layer H_AUTO (CCallbackNetBase_processOneMessage);
00228 
00229         CMessage msgin (_OutputSIDA, "", true);
00230         TSockId tsid;
00231         receive (msgin, &tsid);
00232 
00233         _BytesReceived += msgin.length ();
00234         
00235         // now, we have to call the good callback
00236         NLMISC::CStringIdArray::TStringId pos = -1;
00237         if (msgin.TypeHasAnId)
00238         {
00239                 pos = msgin.getId ();
00240         }
00241         else
00242         {
00243                 std::string name = msgin.getName ();
00244                 sint16 i;
00245                 for (i = 0; i < (sint16) _CallbackArray.size (); i++)
00246                 {
00247                         if (name == _CallbackArray[i].Key)
00248                         {
00249                                 pos = i;
00250                                 break;
00251                         }
00252                 }
00253         }
00254 
00255         TMsgCallback    cb = NULL;
00256         if (pos < 0 || pos >= (sint16) _CallbackArray.size ())
00257         {
00258                 if (_DefaultCallback == NULL)
00259                 {
00260                         nlwarning ("LNETL3NB_CB: Callback %s not found in _CallbackArray", msgin.toString().c_str());
00261                 }
00262                 else
00263                 {
00264                         cb = _DefaultCallback;
00265                 }
00266         }
00267         else
00268         {
00269                 cb = _CallbackArray[pos].Callback;
00270         }
00271 
00272         TSockId realid = getSockId (tsid);
00273 
00274         if (!realid->AuthorizedCallback.empty() && msgin.getName() != realid->AuthorizedCallback)
00275         {
00276                 nlwarning ("LNETL3NB_CB: %s try to call the callback %s but only %s is authorized. Disconnect him!", tsid->asString().c_str(), msgin.toString().c_str(), tsid->AuthorizedCallback.c_str());
00277                 disconnect (tsid);
00278         }
00279         else if (cb == NULL)
00280         {
00281                 nlwarning ("LNETL3NB_CB: Callback %s is NULL, can't call it", msgin.toString().c_str());
00282         }
00283         else
00284         {
00285                 nldebug ("LNETL3NB_CB: Calling callback (%s)%s", msgin.getName().c_str(), (cb==_DefaultCallback)?" DEFAULT_CB":"");
00286                 cb(msgin, realid, *this);
00287         }
00288         
00289 /*
00290         if (pos < 0 || pos >= (sint16) _CallbackArray.size ())
00291         {
00292                 if (_DefaultCallback == NULL)
00293                         nlwarning ("LNETL3NB_CB: Callback %s not found in _CallbackArray", msgin.toString().c_str());
00294                 else
00295                 {
00296                         // ...
00297                 }
00298         }
00299         else
00300         {
00301                 TSockId realid = getSockId (tsid);
00302 
00303                 if (!realid->AuthorizedCallback.empty() && msgin.getName() != realid->AuthorizedCallback)
00304                 {
00305                         nlwarning ("LNETL3NB_CB: %s try to call the callback %s but only %s is authorized. Disconnect him!", tsid->asString().c_str(), msgin.toString().c_str(), tsid->AuthorizedCallback.c_str());
00306                         disconnect (tsid);
00307                 }
00308                 else if (_CallbackArray[pos].Callback == NULL)
00309                 {
00310                         nlwarning ("LNETL3NB_CB: Callback %s is NULL, can't call it", msgin.toString().c_str());
00311                 }
00312                 else
00313                 {
00314                         nldebug ("LNETL3NB_CB: Calling callback (%s)", _CallbackArray[pos].Key);
00315                         _CallbackArray[pos].Callback (msgin, realid, *this);
00316                 }
00317         }
00318 */
00319 }

void NLNET::CBufServer::pushBufferToHost const NLMISC::CMemStream buffer,
TSockId  hostid
[inline, protected, inherited]
 

Pushes a buffer to the specified host's send queue and update (unless not connected).

Definition at line 298 of file buf_server.h.

References NLNET::CBufServer::_BytesPushedOut, buffer, nlassert, NLNET::CBufSock::pushBuffer(), NLNET::TBlockSize, and NLNET::TSockId.

Referenced by NLNET::CBufServer::send().

00299         {
00300                 nlassert( hostid != InvalidSockId );
00301                 if ( hostid->pushBuffer( buffer ) )
00302                 {
00303                         _BytesPushedOut += buffer.length() + sizeof(TBlockSize); // statistics
00304                 }
00305         }

void NLNET::CBufNetBase::pushMessageIntoReceiveQueue const uint8 buffer,
uint32  size
[inline, protected, inherited]
 

Definition at line 183 of file buf_net_base.h.

References NLNET::CBufNetBase::_RecvFifo, buffer, NLNET::CFifoAccessor, NLNET::CBufNetBase::setDataAvailableFlag(), size, uint32, and uint8.

00184         {
00185                 //sint32 mbsize;
00186                 {
00187                         //nldebug( "BNB: Acquiring the receive queue... ");
00188                         CFifoAccessor recvfifo( &_RecvFifo );
00189                         //nldebug( "BNB: Acquired, pushing the received buffer... ");
00190                         recvfifo.value().push( buffer, size );
00191                         //nldebug( "BNB: Pushed, releasing the receive queue..." );
00192                         //mbsize = recvfifo.value().size() / 1048576;
00193                         setDataAvailableFlag( true );
00194                 }
00195                 //nldebug( "BNB: Released." );
00196                 /*if ( mbsize > 1 )
00197                 {
00198                         nlwarning( "The receive queue size exceeds %d MB", mbsize );
00199                 }*/
00200         }

void NLNET::CBufNetBase::pushMessageIntoReceiveQueue const std::vector< uint8 > &  buffer  )  [inline, protected, inherited]
 

Push message into receive queue (mutexed).

Definition at line 164 of file buf_net_base.h.

References NLNET::CBufNetBase::_RecvFifo, buffer, NLNET::CFifoAccessor, and NLNET::CBufNetBase::setDataAvailableFlag().

Referenced by NLNET::CBufSock::advertiseSystemEvent(), and NLNET::CClientReceiveTask::run().

00165         {
00166                 //sint32 mbsize;
00167                 {
00168                         //nldebug( "BNB: Acquiring the receive queue... ");
00169                         CFifoAccessor recvfifo( &_RecvFifo );
00170                         //nldebug( "BNB: Acquired, pushing the received buffer... ");
00171                         recvfifo.value().push( buffer );
00172                         //nldebug( "BNB: Pushed, releasing the receive queue..." );
00173                         //mbsize = recvfifo.value().size() / 1048576;
00174                         setDataAvailableFlag( true );
00175                 }
00176                 //nldebug( "BNB: Released." );
00177                 //if ( mbsize > 1 )
00178                 //{
00179                 //      nlwarning( "The receive queue size exceeds %d MB", mbsize );
00180                 //}
00181         }

void NLNET::CBufServer::receive NLMISC::CMemStream buffer,
TSockId hostid
[inherited]
 

Receives next block of data in the specified (resizes the vector) You must call dataAvailable() before every call to receive()

Definition at line 565 of file buf_server.cpp.

References NLNET::CBufServer::_BytesPoppedIn, buffer, NLNET::CFifoAccessor, nlassert, NLMISC_BSWAP32, nlnettrace, nlwarning, NLNET::CBufNetBase::receiveQueue(), NLNET::CBufNetBase::setDataAvailableFlag(), NLNET::TBlockSize, NLNET::TSockId, and uint32.

00566 {
00567         nlnettrace( "CBufServer::receive" );
00568         //nlassert( dataAvailable() );
00569         nlassert( phostid != NULL );
00570         {
00571                 CFifoAccessor recvfifo( &receiveQueue() );
00572                 nlassert( ! recvfifo.value().empty() );
00573                 recvfifo.value().front( buffer );
00574                 recvfifo.value().pop();
00575                 setDataAvailableFlag( ! recvfifo.value().empty() );
00576         }
00577 
00578         // Extract hostid (and event type)
00579         *phostid = *((TSockId*)&(buffer.buffer()[buffer.size()-sizeof(TSockId)-1]));
00580         nlassert( buffer.buffer()[buffer.size()-1] == CBufNetBase::User );
00581 
00582         // debug features, we number all packet to be sure that they are all sent and received
00583         // \todo remove this debug feature when ok
00584 #ifdef NL_BIG_ENDIAN
00585         uint32 val = NLMISC_BSWAP32(*(uint32*)buffer.buffer());
00586 #else
00587         uint32 val = *(uint32*)buffer.buffer();
00588 #endif
00589 
00590         //      nldebug ("receive message number %u", val);
00591         if ((*phostid)->ReceiveNextValue != val)
00592         {
00593                 nlwarning ("LNETL1: !!!LOST A MESSAGE!!! I received the message number %u but I'm waiting the message number %u (cnx %s), warn lecroart@nevrax.com with the log now please", val, (*phostid)->ReceiveNextValue, (*phostid)->asString().c_str());
00594                 // resync the message number
00595                 (*phostid)->ReceiveNextValue = val;
00596         }
00597 
00598         (*phostid)->ReceiveNextValue++;
00599 
00600         buffer.resize( buffer.size()-sizeof(TSockId)-1 );
00601 
00602         // TODO OPTIM remove the nldebug for speed
00603         //commented for optimisation nldebug( "LNETL1: Read buffer (%d+%d B) from %s", buffer.size(), sizeof(TSockId)+1, /*stringFromVector(buffer).c_str(), */(*phostid)->asString().c_str() );
00604 
00605         // Statistics
00606         _BytesPoppedIn += buffer.size() + sizeof(TBlockSize);
00607 }

void NLNET::CCallbackServer::receive CMessage buffer,
TSockId hostid
[private, virtual]
 

On this layer, you can't call directly receive, It s the update() function that receive and call your callaback.

Implements NLNET::CCallbackNetBase.

Definition at line 205 of file callback_server.cpp.

References buffer, NLNET::CCallbackNetBase::checkThreadId(), nlassert, NLNET::Receiving, and NLNET::TSockId.

00206 {
00207         checkThreadId ();
00208         nlassert (connected ());
00209 
00210 #ifdef USE_MESSAGE_RECORDER
00211         if ( _MR_RecordingState != Replay )
00212         {
00213 #endif
00214 
00215                 // Receive
00216                 CBufServer::receive (buffer, hostid);
00217 
00218 #ifdef USE_MESSAGE_RECORDER
00219                 if ( _MR_RecordingState == Record )
00220                 {
00221                         // Record received message
00222                         _MR_Recorder.recordNext( _MR_UpdateCounter, Receiving, *hostid, const_cast<CMessage&>(buffer) );
00223                 }
00224         }
00225         else
00226         {
00227                 // Retrieve received message loaded by dataAvailable()
00228                 buffer = _MR_Recorder.ReceivedMessages.front().Message;
00229                 *hostid = _MR_Recorder.ReceivedMessages.front().SockId;
00230                 _MR_Recorder.ReceivedMessages.pop();
00231         }
00232 #endif
00233         
00234         buffer.readType ();
00235 }

CSynchronizedFIFO& NLNET::CBufNetBase::receiveQueue  )  [inline, protected, inherited]
 

Access to the receive queue.

Definition at line 154 of file buf_net_base.h.

References NLNET::CBufNetBase::_RecvFifo, and NLNET::CSynchronizedFIFO.

Referenced by NLNET::CBufServer::dataAvailable(), NLNET::CBufClient::dataAvailable(), NLNET::CBufClient::disconnect(), NLNET::CBufServer::receive(), NLNET::CBufClient::receive(), and NLNET::CServerReceiveTask::run().

00154 { return _RecvFifo; }

CServerReceiveTask* NLNET::CBufServer::receiveTask std::vector< NLMISC::IThread * >::iterator  ipt  )  [inline, protected, inherited]
 

Returns the receive task corresponding to a particular thread.

Definition at line 284 of file buf_server.h.

Referenced by NLNET::CBufServer::disconnect(), NLNET::CBufServer::dispatchNewSocket(), NLNET::CBufServer::displaySendQueueStat(), NLNET::CBufServer::displayThreadStat(), NLNET::CBufServer::getSendQueueSize(), NLNET::CBufServer::send(), NLNET::CBufServer::update(), and NLNET::CBufServer::~CBufServer().

00285         {
00286                 return ((CServerReceiveTask*)((*ipt)->getRunnable()));
00287         }

void NLNET::CCallbackServer::send const NLMISC::CMemStream buffer,
TSockId  hostid
[inline, private]
 

This function is public in the base class and put it private here because user cannot use it in layer 2.

Reimplemented from NLNET::CBufServer.

Definition at line 93 of file callback_server.h.

References nlstop, and NLNET::TSockId.

00093 { nlstop; }

void NLNET::CCallbackServer::send const CMessage buffer,
TSockId  hostid,
bool  log = true
[virtual]
 

Sends a message to the specified host.

Implements NLNET::CCallbackNetBase.

Definition at line 124 of file callback_server.cpp.

References buffer, NLNET::CCallbackNetBase::checkThreadId(), NLNET::CBufServer::nbConnections(), nlassert, NLNET::Sending, sint, and NLNET::TSockId.

Referenced by sendAllMyAssociations().

00125 {
00126         checkThreadId ();
00127         nlassert (connected ());
00128         nlassert (buffer.length() != 0);
00129         nlassert (buffer.typeIsSet());
00130 
00131         if (hostid == InvalidSockId)
00132         {
00133                 // broadcast
00134                 sint nb = nbConnections ();
00135                 _BytesSent += buffer.length () * nb;
00136         }
00137         else
00138         {
00139                 _BytesSent += buffer.length ();
00140         }
00141 
00142 //      if (log)
00143         {
00144 //              nldebug ("LNETL3S: Server: send(%s, %s)", buffer.toString().c_str(), hostid->asString().c_str());
00145         }
00146 
00147 #ifdef USE_MESSAGE_RECORDER
00148         if ( _MR_RecordingState != Replay )
00149         {
00150 #endif
00151 
00152                 // Send
00153                 CBufServer::send (buffer, hostid);
00154 
00155 #ifdef USE_MESSAGE_RECORDER
00156                 if ( _MR_RecordingState == Record )
00157                 {
00158                         // Record sent message
00159                         _MR_Recorder.recordNext( _MR_UpdateCounter, Sending, hostid, const_cast<CMessage&>(buffer) );
00160                 }
00161         }
00162         else
00163         {       
00165         }
00166 #endif
00167 }

void NLNET::CCallbackServer::sendAllMyAssociations TSockId  to  )  [private]
 

Used by client and server class.

Reimplemented from NLNET::CCallbackNetBase.

Definition at line 91 of file callback_server.cpp.

References NLNET::CBufSock::asString(), NLNET::CCallbackNetBase::checkThreadId(), NLNET::CCallbackNetBase::getSIDA(), NLMISC::CStringIdArray::getString(), nlassert, nldebug, send(), NLMISC::CMemStream::serial(), NLMISC::CStringIdArray::size(), size, NLNET::TSockId, and NLMISC::CStringIdArray::TStringId.

00092 {
00093         nlassert (to != InvalidSockId); // invalid hostid
00094         checkThreadId ();
00095         nlassert (connected ());
00096 
00097         // he wants all associations
00098         CMessage msgout (getSIDA(), "RAA");
00099 
00100         CStringIdArray::TStringId size;
00101         size = _OutputSIDA.size ();
00102 
00103         nldebug ("LNETL3S: Send all (%d) my string association to %s", size, to->asString().c_str());
00104         
00105         msgout.serial (size);
00106 
00107         for (CStringIdArray::TStringId i = 0; i < size; i++)
00108         {
00109 //              nldebug ("LNETL3S:  sending association '%s' -> %d", _OutputSIDA.getString(i).c_str (), i);
00110                 string str(_OutputSIDA.getString(i));
00111                 msgout.serial (str);
00112                 msgout.serial (i);
00113         }
00114 
00115         send (msgout, to);
00116 }

void NLNET::CCallbackServer::setConnectionCallback TNetCallback  cb,
void *  arg
[inline]
 

Sets callback for incoming connections (or NULL to disable callback).

Reimplemented from NLNET::CBufServer.

Definition at line 61 of file callback_server.h.

References NLNET::CCallbackNetBase::checkThreadId(), and NLNET::TNetCallback.

Referenced by NLNET::CNetManager::addServer(), NLNET::CUnifiedNetwork::init(), and NLNET::CLoginServer::init().

void NLNET::CBufNetBase::setDataAvailableFlag bool  da  )  [inline, protected, inherited]
 

Sets _DataAvailable.

Definition at line 203 of file buf_net_base.h.

References NLNET::CBufNetBase::_DataAvailable.

Referenced by NLNET::CBufServer::dataAvailable(), NLNET::CBufClient::dataAvailable(), NLNET::CBufClient::disconnect(), NLNET::CBufNetBase::pushMessageIntoReceiveQueue(), NLNET::CBufServer::receive(), NLNET::CBufClient::receive(), and NLNET::CServerReceiveTask::run().

00203 { _DataAvailable = da; }

void NLNET::CCallbackNetBase::setDefaultCallback TMsgCallback  defaultCallback  )  [inline, inherited]
 

Sets default callback for unknown message types.

Definition at line 110 of file callback_net_base.h.

References NLNET::CCallbackNetBase::_DefaultCallback, and NLNET::TMsgCallback.

Referenced by NLNET::CUnifiedNetwork::addService(), and NLNET::CUnifiedNetwork::init().

00110 { _DefaultCallback = defaultCallback; }

void NLNET::CCallbackServer::setDisconnectionCallback TNetCallback  cb,
void *  arg
[inline]
 

Sets callback for disconnections (or NULL to disable callback).

Reimplemented from NLNET::CCallbackNetBase.

Definition at line 64 of file callback_server.h.

References NLNET::CCallbackNetBase::checkThreadId(), and NLNET::TNetCallback.

Referenced by NLNET::CUnifiedNetwork::init().

00064 { checkThreadId (); CCallbackNetBase::setDisconnectionCallback (cb, arg); }

void NLNET::CBufNetBase::setMaxExpectedBlockSize sint32  limit  )  [inline, inherited]
 

Sets the max size of the received messages. If receiving a message bigger than the limit, the connection will be dropped.

Default value: 1 MegaByte If you put a negative number as limit, the max size is reset to the default value. Warning: you can call this method only at initialization time, before connecting (for a client) or calling init() (for a server) !

Definition at line 109 of file buf_net_base.h.

References NLNET::CBufNetBase::_MaxExpectedBlockSize, sint32, and uint32.

00110         {
00111                 if ( limit < 0 )
00112                         _MaxExpectedBlockSize = 1048576;
00113                 else
00114                         _MaxExpectedBlockSize = (uint32)limit;
00115         }

void NLNET::CBufNetBase::setMaxSentBlockSize sint32  limit  )  [inline, inherited]
 

Sets the max size of the sent messages. Note: Limiting of sending not implemented, currently

Default value: 1 MegaByte If you put a negative number as limit, the max size is reset to the default value. Warning: you can call this method only at initialization time, before connecting (for a client) or calling init() (for a server) !

Definition at line 126 of file buf_net_base.h.

References NLNET::CBufNetBase::_MaxSentBlockSize, sint32, and uint32.

00127         {
00128                 if ( limit < 0 )
00129                         _MaxSentBlockSize = 1048576;
00130                 else
00131                         _MaxSentBlockSize = (uint32)limit;
00132         }

void NLNET::CCallbackNetBase::setOtherSideAssociations const char **  associationarray,
NLMISC::CStringIdArray::TStringId  arraysize
[inherited]
 

Gives some association of the other side. The goal is, in specific case, we don't want to ask associations to the other side (client is not secure for example). In this case, we can set other side associations by hand using this functions.

Definition at line 451 of file callback_net_base.cpp.

References NLMISC::CStringIdArray::addString(), NLNET::CCallbackNetBase::checkThreadId(), NLNET::CCallbackNetBase::getSIDA(), nldebug, sint, and NLMISC::CStringIdArray::TStringId.

00452 {
00453         checkThreadId ();
00454 
00455         nldebug ("LNETL3NB_ASSOC: setOtherSideAssociations() sets %d association strings", arraysize);
00456 
00457         for (sint i = 0; i < arraysize; i++)
00458         {
00459                 nldebug ("LNETL3NB_ASSOC:  association '%s' -> %d", associationarray[i], i);
00460                 getSIDA().addString (associationarray[i], i);
00461         }
00462 }

void NLNET::CBufServer::setSizeFlushTrigger TSockId  destid,
sint32  size
[inline, inherited]
 

Sets the size flush trigger. When the size of the send queue reaches or exceeds this value, all data in the send queue is automatically sent (-1 to disable this trigger )

Definition at line 226 of file buf_server.h.

References nlassert, NLNET::CBufSock::setSizeFlushTrigger(), sint32, size, and NLNET::TSockId.

00226 { nlassert( destid != InvalidSockId ); destid->setSizeFlushTrigger( size ); }

void NLNET::CBufServer::setTimeFlushTrigger TSockId  destid,
sint32  ms
[inline, inherited]
 

Sets the time flush trigger (in millisecond). When this time is elapsed, all data in the send queue is automatically sent (-1 to disable this trigger)

Definition at line 221 of file buf_server.h.

References nlassert, NLNET::CBufSock::setTimeFlushTrigger(), sint32, and NLNET::TSockId.

00221 { nlassert( destid != InvalidSockId ); destid->setTimeFlushTrigger( ms ); }

void NLNET::CBufServer::update  )  [inherited]
 

Update the network (call this method evenly).

Definition at line 613 of file buf_server.cpp.

References NLNET::CServerReceiveTask::_Connections, NLNET::CBufServer::_NbConnections, NLNET::CBufServer::_ThreadPool, nldebug, and NLNET::CBufServer::receiveTask().

00614 {
00615         //nlnettrace( "CBufServer::update-BEGIN" );
00616 
00617         _NbConnections = 0;
00618 
00619         // For each thread
00620         CThreadPool::iterator ipt;
00621         {
00622           //nldebug( "UPD: Acquiring the Thread Pool" );
00623                 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00624                 //nldebug( "UPD: Acquired." );
00625                 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00626                 {
00627                         // For each thread of the pool
00628                         CServerReceiveTask *task = receiveTask(ipt);
00629                         CConnections::iterator ipb;
00630                         {
00631                                 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00632                                 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00633                                 {
00634                                     // For each socket of the thread, update sending
00635                                     if ( ! ((*ipb)->Sock->connected() && (*ipb)->update()) )
00636                                     {
00637                                                 // Update did not work or the socket is not connected anymore
00638                                         nldebug( "LNETL1: Socket %s is disconnected", (*ipb)->asString().c_str() );
00639                                                 // Disconnection event if disconnected (known either from flush (in update) or when receiving data)
00640                                                 (*ipb)->advertiseDisconnection( this, *ipb );
00641                                         
00642                                                 /*if ( (*ipb)->advertiseDisconnection( this, *ipb ) )
00643                                                 {
00644                                                         // Now the connection removal is in dataAvailable()
00645                                                         // POLL6
00646                                                 }*/
00647                                     }
00648                                     else
00649                                     {
00650                                                 _NbConnections++;
00651                                     }
00652                                 }
00653                         }
00654                 }
00655         }
00656 
00657         //nlnettrace( "CBufServer::update-END" );
00658 }

void NLNET::CCallbackServer::update sint32  timeout = 0  )  [virtual]
 

Updates the network (call this method evenly).

Reimplemented from NLNET::CCallbackNetBase.

Definition at line 175 of file callback_server.cpp.

References NLNET::CCallbackNetBase::baseUpdate(), NLNET::CCallbackNetBase::checkThreadId(), H_AUTO, nlassert, and sint32.

Referenced by NLNET::CUnifiedNetwork::update().

00176 {
00177         H_AUTO(L3UpdateServer);
00178 
00179         checkThreadId ();
00180         nlassert (connected ());
00181 
00182         //      nldebug ("LNETL3S: Client: update()");
00183         baseUpdate ( timeout ); // first receive
00184 
00185 #ifdef USE_MESSAGE_RECORDER
00186         if ( _MR_RecordingState != Replay )
00187         {
00188 #endif
00189 
00190                 // L1-2 Update (nothing to do in replay mode)
00191                 CBufServer::update (); // then send
00192 
00193 #ifdef USE_MESSAGE_RECORDER
00194         }
00195 #endif
00196 
00197 }


Friends And Related Function Documentation

void cbsNewConnection TSockId  from,
void *  data
[friend]
 

Definition at line 45 of file callback_server.cpp.

00046 {
00047         nlassert (data != NULL);
00048         CCallbackServer *server = (CCallbackServer *)data;
00049 
00050         nldebug("LNETL3S: newConnection()");
00051 
00052 #ifdef USE_MESSAGE_RECORDER
00053         // Record connection
00054         server->noticeConnection( from );
00055 #endif
00056 
00057         // send all my association to the new client
00058 // association are disactivated so we don t need to send them
00059 //      server->sendAllMyAssociations (from);
00060 
00061         // call the client callback if necessary
00062         if (server->_ConnectionCallback != NULL)
00063                 server->_ConnectionCallback (from, server->_ConnectionCbArg);
00064 }

friend class CListenTask [friend, inherited]
 

Definition at line 272 of file buf_server.h.

Referenced by NLNET::CBufServer::CBufServer().

friend class CServerBufSock [friend, inherited]
 

Definition at line 271 of file buf_server.h.

friend class CServerReceiveTask [friend, inherited]
 

Definition at line 273 of file buf_server.h.

Referenced by NLNET::CBufServer::addNewThread().

friend class NLNET::CBufSock [friend, inherited]
 

Definition at line 148 of file buf_net_base.h.


Field Documentation

uint64 NLNET::CCallbackNetBase::_BytesReceived [protected, inherited]
 

Definition at line 167 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::CCallbackNetBase(), NLNET::CCallbackNetBase::getBytesReceived(), and NLNET::CCallbackNetBase::processOneMessage().

uint64 NLNET::CCallbackNetBase::_BytesSent [protected, inherited]
 

Definition at line 167 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::CCallbackNetBase(), and NLNET::CCallbackNetBase::getBytesSent().

std::vector<TCallbackItem> NLNET::CCallbackNetBase::_CallbackArray [protected, inherited]
 

Definition at line 196 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::addCallbackArray(), and NLNET::CCallbackNetBase::processOneMessage().

TNetCallback NLNET::CCallbackServer::_ConnectionCallback [private]
 

Connection callback.

Reimplemented from NLNET::CBufServer.

Definition at line 100 of file callback_server.h.

Referenced by NLNET::cbsNewConnection().

void* NLNET::CCallbackServer::_ConnectionCbArg [private]
 

Argument of the connection callback.

Reimplemented from NLNET::CBufServer.

Definition at line 101 of file callback_server.h.

Referenced by NLNET::cbsNewConnection().

TMsgCallback NLNET::CCallbackNetBase::_DefaultCallback [protected, inherited]
 

Definition at line 199 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::processOneMessage(), and NLNET::CCallbackNetBase::setDefaultCallback().

bool NLNET::CCallbackNetBase::_FirstUpdate [protected, inherited]
 

Definition at line 202 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::baseUpdate().

NLMISC::CStringIdArray NLNET::CCallbackNetBase::_InputSIDA [protected, inherited]
 

Definition at line 188 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::baseUpdate(), NLNET::CCallbackNetBase::getSIDA(), and NLNET::CCallbackNetBase::ignoreAllUnknownId().

bool NLNET::CCallbackNetBase::_IsAServer [protected, inherited]
 

Definition at line 201 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::isAServer().

TNetCallback NLNET::CCallbackNetBase::_NewDisconnectionCallback [protected, inherited]
 

Used by client and server class.

Definition at line 170 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::CCallbackNetBase().

NLMISC::CStringIdArray NLNET::CCallbackNetBase::_OutputSIDA [protected, inherited]
 

Definition at line 193 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::addCallbackArray(), NLNET::cbnbMessageAskAssociations(), NLNET::CCallbackNetBase::displayAllMyAssociations(), and NLNET::CCallbackNetBase::processOneMessage().

uint NLNET::CCallbackNetBase::_ThreadId [protected, inherited]
 

Todo:
ace: debug feature that we should remove one day nefore releasing the game

Definition at line 232 of file callback_net_base.h.

Referenced by NLNET::CCallbackNetBase::CCallbackNetBase().


The documentation for this class was generated from the following files:
Generated on Tue Mar 16 13:56:32 2004 for NeL by doxygen 1.3.6