00001
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "stdnet.h"
00027
00028 #include "nel/misc/string_id_array.h"
00029
00030 #include "nel/net/callback_server.h"
00031
00032 #ifdef USE_MESSAGE_RECORDER
00033 #include "nel/net/dummy_tcp_sock.h"
00034 #endif
00035
00036 using namespace std;
00037 using namespace NLMISC;
00038
00039 namespace NLNET {
00040
00041
00042
00043
00044
00045 void cbsNewConnection (TSockId from, void *data)
00046 {
00047 nlassert (data != NULL);
00048 CCallbackServer *server = (CCallbackServer *)data;
00049
00050 nldebug("LNETL3S: newConnection()");
00051
00052 #ifdef USE_MESSAGE_RECORDER
00053
00054 server->noticeConnection( from );
00055 #endif
00056
00057
00058
00059
00060
00061
00062 if (server->_ConnectionCallback != NULL)
00063 server->_ConnectionCallback (from, server->_ConnectionCbArg);
00064 }
00065
00066
00067
00068
00069
00070 CCallbackServer::CCallbackServer( TRecordingState rec, const string& recfilename, bool recordall ) :
00071 CCallbackNetBase( rec, recfilename, recordall ),
00072 CBufServer( DEFAULT_STRATEGY, DEFAULT_MAX_THREADS, DEFAULT_MAX_SOCKETS_PER_THREADS, true, rec==Replay ),
00073 _ConnectionCallback(NULL),
00074 _ConnectionCbArg(NULL)
00075 {
00076 #ifndef USE_MESSAGE_RECORDER
00077 nlassertex( rec==Off, ("The message recorder is disabled at compilation time ; switch the recording state Off") );
00078 #endif
00079
00080 CBufServer::setDisconnectionCallback (_NewDisconnectionCallback, this);
00081 CBufServer::setConnectionCallback (cbsNewConnection, this);
00082
00083 _IsAServer = true;
00084 _DefaultCallback = NULL;
00085 }
00086
00087
00088
00089
00090
00091 void CCallbackServer::sendAllMyAssociations (TSockId to)
00092 {
00093 nlassert (to != InvalidSockId);
00094 checkThreadId ();
00095 nlassert (connected ());
00096
00097
00098 CMessage msgout (getSIDA(), "RAA");
00099
00100 CStringIdArray::TStringId size;
00101 size = _OutputSIDA.size ();
00102
00103 nldebug ("LNETL3S: Send all (%d) my string association to %s", size, to->asString().c_str());
00104
00105 msgout.serial (size);
00106
00107 for (CStringIdArray::TStringId i = 0; i < size; i++)
00108 {
00109
00110 string str(_OutputSIDA.getString(i));
00111 msgout.serial (str);
00112 msgout.serial (i);
00113 }
00114
00115 send (msgout, to);
00116 }
00117
00118
00119
00120
00121
00122
00123
00124 void CCallbackServer::send (const CMessage &buffer, TSockId hostid, bool log)
00125 {
00126 checkThreadId ();
00127 nlassert (connected ());
00128 nlassert (buffer.length() != 0);
00129 nlassert (buffer.typeIsSet());
00130
00131 if (hostid == InvalidSockId)
00132 {
00133
00134 sint nb = nbConnections ();
00135 _BytesSent += buffer.length () * nb;
00136 }
00137 else
00138 {
00139 _BytesSent += buffer.length ();
00140 }
00141
00142
00143 {
00144
00145 }
00146
00147 #ifdef USE_MESSAGE_RECORDER
00148 if ( _MR_RecordingState != Replay )
00149 {
00150 #endif
00151
00152
00153 CBufServer::send (buffer, hostid);
00154
00155 #ifdef USE_MESSAGE_RECORDER
00156 if ( _MR_RecordingState == Record )
00157 {
00158
00159 _MR_Recorder.recordNext( _MR_UpdateCounter, Sending, hostid, const_cast<CMessage&>(buffer) );
00160 }
00161 }
00162 else
00163 {
00165 }
00166 #endif
00167 }
00168
00169
00170
00171
00172
00173
00174
00175 void CCallbackServer::update ( sint32 timeout )
00176 {
00177 checkThreadId ();
00178 nlassert (connected ());
00179
00180
00181 baseUpdate ( timeout );
00182
00183 #ifdef USE_MESSAGE_RECORDER
00184 if ( _MR_RecordingState != Replay )
00185 {
00186 #endif
00187
00188
00189 CBufServer::update ();
00190
00191 #ifdef USE_MESSAGE_RECORDER
00192 }
00193 #endif
00194 }
00195
00196
00197
00198
00199
00200
00201
00202 void CCallbackServer::receive (CMessage &buffer, TSockId *hostid)
00203 {
00204 checkThreadId ();
00205 nlassert (connected ());
00206
00207 #ifdef USE_MESSAGE_RECORDER
00208 if ( _MR_RecordingState != Replay )
00209 {
00210 #endif
00211
00212
00213 CBufServer::receive (buffer, hostid);
00214
00215 #ifdef USE_MESSAGE_RECORDER
00216 if ( _MR_RecordingState == Record )
00217 {
00218
00219 _MR_Recorder.recordNext( _MR_UpdateCounter, Receiving, *hostid, const_cast<CMessage&>(buffer) );
00220 }
00221 }
00222 else
00223 {
00224
00225 buffer = _MR_Recorder.ReceivedMessages.front().Message;
00226 *hostid = _MR_Recorder.ReceivedMessages.front().SockId;
00227 _MR_Recorder.ReceivedMessages.pop();
00228 }
00229 #endif
00230
00231 buffer.readType ();
00232 }
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243 void CCallbackServer::disconnect( TSockId hostid )
00244 {
00245 checkThreadId ();
00246
00247 #ifdef USE_MESSAGE_RECORDER
00248 if ( _MR_RecordingState != Replay )
00249 {
00250 #endif
00251
00252 CBufServer::disconnect( hostid );
00253
00254 #ifdef USE_MESSAGE_RECORDER
00255 }
00256
00257
00258 #endif
00259 }
00260
00261
00262
00263
00264
00265 TSockId CCallbackServer::getSockId (TSockId hostid)
00266 {
00267 nlassert (hostid != InvalidSockId);
00268 checkThreadId ();
00269 nlassert (connected ());
00270 nlassert (hostid != NULL);
00271 return hostid;
00272 }
00273
00274
00275
00276
00277
00278
00279
00280 bool CCallbackServer::dataAvailable ()
00281 {
00282 checkThreadId ();
00283
00284 #ifdef USE_MESSAGE_RECORDER
00285 if ( _MR_RecordingState != Replay )
00286 {
00287 #endif
00288
00289
00290 return CBufServer::dataAvailable ();
00291
00292 #ifdef USE_MESSAGE_RECORDER
00293 }
00294 else
00295 {
00296
00297 return CCallbackNetBase::replayDataAvailable();
00298 }
00299 #endif
00300 }
00301
00302
00303
00304 #ifdef USE_MESSAGE_RECORDER
00305
00306
00307
00308
00309
00310 bool CCallbackServer::replaySystemCallbacks()
00311 {
00312 do
00313 {
00314 if ( _MR_Recorder.ReceivedMessages.empty() )
00315 {
00316 return false;
00317 }
00318 else
00319 {
00320
00321 TSockId sockid;
00322 std::map<TSockId,TSockId>::iterator isi = _MR_SockIds.find( _MR_Recorder.ReceivedMessages.front().SockId );
00323 if ( isi != _MR_SockIds.end() )
00324 {
00325
00326 sockid = (*isi).second;
00327 _MR_Recorder.ReceivedMessages.front().SockId = sockid;
00328 }
00329
00330
00331 switch( _MR_Recorder.ReceivedMessages.front().Event )
00332 {
00333 case Receiving:
00334 return true;
00335
00336 case Disconnecting:
00337 nldebug( "Disconnection event for %p", sockid );
00338 sockid->Sock->disconnect();
00339 sockid->setConnectedState( false );
00340
00341
00342 if ( disconnectionCallback() != NULL )
00343 {
00344 disconnectionCallback()( sockid, argOfDisconnectionCallback() );
00345 }
00346 break;
00347
00348 case Accepting:
00349 {
00350
00351
00352
00353 CInetAddress addr;
00354 _MR_Recorder.ReceivedMessages.front().Message.serial( addr );
00355
00356
00357 sockid = new CBufSock( new CDummyTcpSock() );
00358 sockid->Sock->connect( addr );
00359 _MR_Connections.push_back( sockid );
00360
00361
00362 _MR_SockIds.insert( make_pair( _MR_Recorder.ReceivedMessages.front().SockId, sockid ) );
00363
00364 nldebug( "LNETL1: Connection event for %p", sockid );
00365 sockid->setConnectedState( true );
00366
00367
00368 if ( connectionCallback() != NULL )
00369 {
00370 connectionCallback()( sockid, argOfConnectionCallback() );
00371 }
00372 break;
00373 }
00374 default:
00375 nlerror( "LNETL1: Invalid system event type in client receive queue" );
00376 }
00377
00378 _MR_Recorder.ReceivedMessages.pop();
00379 }
00380 }
00381 while ( true );
00382 }
00383
00384
00385
00386
00387
00388 void CCallbackServer::noticeConnection( TSockId hostid )
00389 {
00390 nlassert (hostid != InvalidSockId);
00391 if ( _MR_RecordingState != Replay )
00392 {
00393 if ( _MR_RecordingState == Record )
00394 {
00395
00396 CMessage addrmsg;
00397 addrmsg.serial( const_cast<CInetAddress&>(hostAddress(hostid)) );
00398 _MR_Recorder.recordNext( _MR_UpdateCounter, Accepting, hostid, addrmsg );
00399 }
00400 }
00401 else
00402 {
00404 }
00405 }
00406
00407 #endif // USE_MESSAGE_RECORDER
00408
00409
00410 }