00001
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "stdnet.h"
00027
00028 #include "nel/misc/hierarchical_timer.h"
00029
00030 #include "nel/net/buf_client.h"
00031 #include "nel/misc/thread.h"
00032 #include "nel/net/dummy_tcp_sock.h"
00033
00034 #ifdef NL_OS_WINDOWS
00035 #include <winsock2.h>
00036 #elif defined NL_OS_UNIX
00037 #include <netinet/in.h>
00038 #endif
00039
00040 using namespace NLMISC;
00041 using namespace std;
00042
00043
00044 namespace NLNET {
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054 CBufClient::CBufClient( bool nodelay, bool replaymode ) :
00055 CBufNetBase(),
00056 _NoDelay( nodelay ),
00057 _PrevBytesDownloaded( 0 ),
00058 _PrevBytesUploaded( 0 ),
00059 _RecvTask( NULL ),
00060 _RecvThread( NULL )
00061
00062
00063 {
00064 nlnettrace( "CBufClient::CBufClient" );
00065
00066 if ( replaymode )
00067 {
00068 _BufSock = new CBufSock( new CDummyTcpSock() );
00069 }
00070 else
00071 {
00072 _BufSock = new CBufSock();
00073 _RecvTask = new CClientReceiveTask( this, _BufSock );
00074 }
00075 }
00076
00077
00078
00079
00080
00081
00082 void CBufClient::connect( const CInetAddress& addr )
00083 {
00084 nlnettrace( "CBufClient::connect" );
00085 nlassert( ! _BufSock->Sock->connected() );
00086 _BufSock->connect( addr, _NoDelay, true );
00087 _PrevBytesDownloaded = 0;
00088 _PrevBytesUploaded = 0;
00089
00090
00091
00092
00093 if ( _RecvThread != NULL )
00094 {
00095 delete _RecvThread;
00096 }
00097
00098 _RecvThread = IThread::create( _RecvTask );
00099 _RecvThread->start();
00100 }
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111 void CBufClient::send( const NLMISC::CMemStream& buffer )
00112 {
00113 nlnettrace( "CBufClient::send" );
00114 nlassert( buffer.length() > 0 );
00115 nlassert( buffer.length() <= maxSentBlockSize() );
00116
00117
00118
00119 if ( ! _BufSock->pushBuffer( buffer ) )
00120 {
00121
00122 _BufSock->advertiseDisconnection( this, NULL );
00123 }
00124 }
00125
00126
00127
00128
00129
00130 bool CBufClient::dataAvailable()
00131 {
00132
00133 {
00134
00135
00136
00137
00138 while ( dataAvailableFlag() )
00139 {
00140
00141 uint8 val;
00142 {
00143 CFifoAccessor recvfifo( &receiveQueue() );
00144 val = recvfifo.value().frontLast ();
00145 }
00146
00147
00148 switch ( val )
00149 {
00150
00151
00152 case CBufNetBase::User:
00153 return true;
00154
00155
00156 case CBufNetBase::Disconnection:
00157
00158 nldebug( "Disconnection event" );
00159 _BufSock->setConnectedState( false );
00160
00161
00162 if ( disconnectionCallback() != NULL )
00163 {
00164 disconnectionCallback()( id(), argOfDisconnectionCallback() );
00165 }
00166
00167
00168
00169 break;
00170
00171 default:
00172 {
00173 CFifoAccessor recvfifo( &receiveQueue() );
00174 vector<uint8> buffer;
00175 recvfifo.value().front (buffer);
00176 nlinfo( "LNETL1: Invalid block type: %hu (should be = %hu)", (uint16)(buffer[buffer.size()-1]), (uint16)val );
00177 nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
00178 nlinfo( "LNETL1: Receive queue:" );
00179 recvfifo.value().display();
00180 nlerror( "LNETL1: Invalid system event type in client receive queue" );
00181 }
00182 }
00183
00184 {
00185 CFifoAccessor recvfifo( &receiveQueue() );
00186 recvfifo.value().pop();
00187 setDataAvailableFlag( ! recvfifo.value().empty() );
00188 }
00189
00190 }
00191
00192 return false;
00193 }
00194 }
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262 void CBufClient::receive( NLMISC::CMemStream& buffer )
00263 {
00264 nlnettrace( "CBufClient::receive" );
00265
00266
00267
00268 {
00269 CFifoAccessor recvfifo( &receiveQueue() );
00270 nlassert( ! recvfifo.value().empty() );
00271 recvfifo.value().front( buffer );
00272 recvfifo.value().pop();
00273 setDataAvailableFlag( ! recvfifo.value().empty() );
00274 }
00275
00276
00277 nlassert( buffer.buffer()[buffer.length()-1] == CBufNetBase::User );
00278
00279 buffer.resize( buffer.length()-1 );
00280 }
00281
00282
00283
00284
00285
00286 void CBufClient::update()
00287 {
00288
00289
00290
00291 bool sendingok = _BufSock->update();
00292
00293
00294 if ( ! ( _BufSock->Sock->connected() && sendingok ) )
00295 {
00296 if ( _BufSock->Sock->connected() )
00297 {
00298 _BufSock->Sock->disconnect();
00299 }
00300 _BufSock->advertiseDisconnection( this, NULL );
00301 }
00302 }
00303
00304
00305
00306
00307
00308 void CBufClient::disconnect( bool quick )
00309 {
00310 nlnettrace( "CBufClient::disconnect" );
00311
00312
00313 nlassert( _BufSock->connectedState() );
00314
00315
00316
00317
00318 if ( _BufSock->Sock->connected() )
00319 {
00320
00321 if ( ! quick )
00322 {
00323 _BufSock->flush();
00324 }
00325
00326
00327 _BufSock->disconnect( false );
00328 }
00329
00330
00331 {
00332 CFifoAccessor recvfifo( &receiveQueue() );
00333 recvfifo.value().clear();
00334 setDataAvailableFlag( false );
00335 }
00336 }
00337
00338
00339
/*
 * Stores newvalue into counter and returns how much the value changed,
 * i.e. newvalue minus the previous content of counter (unsigned arithmetic).
 */
inline uint64 updateStatCounter( uint64& counter, uint64 newvalue )
{
	const uint64 previous = counter;
	counter = newvalue;
	return newvalue - previous;
}
00346
00347
00348
00349
00350
00351 uint64 CBufClient::newBytesDownloaded()
00352 {
00353 return updateStatCounter( _PrevBytesDownloaded, bytesDownloaded() );
00354 }
00355
00356
00357
00358
00359
00360 uint64 CBufClient::newBytesUploaded()
00361 {
00362 return updateStatCounter( _PrevBytesUploaded, bytesUploaded() );
00363 }
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387 CBufClient::~CBufClient()
00388 {
00389 nlnettrace( "CBufClient::~CBufClient" );
00390
00391
00392 if ( _BufSock->Sock->connected() )
00393 {
00394 nlassert( _BufSock->connectedState() );
00395
00396 disconnect( true );
00397 }
00398
00399
00400 if ( _RecvThread != NULL )
00401 {
00402 nldebug( "LNETL1: Waiting for the end of the receive thread..." );
00403 _RecvThread->wait();
00404 }
00405
00406 if ( _RecvTask != NULL )
00407 delete _RecvTask;
00408
00409 if ( _RecvThread != NULL )
00410 delete _RecvThread;
00411
00412 if ( _BufSock != NULL )
00413 delete _BufSock;
00414
00415 nlnettrace( "Exiting CBufClient::~CBufClient" );
00416 }
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427 void CClientReceiveTask::run()
00428 {
00429 nlnettrace( "CClientReceiveTask::run" );
00430
00431 bool connected = true;
00432 while ( connected )
00433 {
00434 try
00435 {
00436
00437 TBlockSize blocklen;
00438 uint32 lenoflen = sizeof(blocklen);
00439 sock()->receive( (uint8*)&blocklen, lenoflen );
00440 uint32 len = ntohl( blocklen );
00441
00442 if ( len != 0 )
00443 {
00444
00445 if ( len > _Client->maxExpectedBlockSize() )
00446 {
00447 nlwarning( "LNETL1: Socket %s received length exceeding max expected, in block header... Disconnecting", _SockId->asString().c_str() );
00448 throw ESocket( "Received length exceeding max expected", false );
00449 }
00450
00451
00452 CObjectVector<uint8> buffer;
00453 buffer.resize(len+1);
00454
00455 sock()->receive( buffer.getPtr(), len );
00456
00457
00458
00459 buffer[len] = CBufNetBase::User;
00460
00461
00462 _Client->pushMessageIntoReceiveQueue( buffer.getPtr(), buffer.size() );
00463 }
00464 else
00465 {
00466 nlwarning( "LNETL1: Socket %s received null length in block header", _SockId->asString().c_str() );
00467 }
00468 }
00469 catch ( ESocketConnectionClosed& )
00470 {
00471 nldebug( "LNETL1: Client connection %s closed", _SockId->asString().c_str() );
00472
00473 connected = false;
00474 }
00475 catch ( ESocket& )
00476 {
00477 nldebug( "LNETL1: Client connection %s broken", _SockId->asString().c_str() );
00478 sock()->disconnect();
00479 connected = false;
00480 }
00481 }
00482
00483 nlnettrace( "Exiting CClientReceiveTask::run()" );
00484 }
00485
00486
00487 }