# Home    # nevrax.com   
Nevrax
Nevrax.org
#News
#Mailing-list
#Documentation
#CVS
#Bugs
#License
Docs
 
Documentation  
Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members   Related Pages   Search  

buf_client.cpp

Go to the documentation of this file.
00001 
00007 /* Copyright, 2001 Nevrax Ltd.
00008  *
00009  * This file is part of NEVRAX NEL.
00010  * NEVRAX NEL is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2, or (at your option)
00013  * any later version.
00014 
00015  * NEVRAX NEL is distributed in the hope that it will be useful, but
00016  * WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00018  * General Public License for more details.
00019 
00020  * You should have received a copy of the GNU General Public License
00021  * along with NEVRAX NEL; see the file COPYING. If not, write to the
00022  * Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
00023  * MA 02111-1307, USA.
00024  */
00025 
00026 #include "stdnet.h"
00027 
00028 #include "nel/misc/hierarchical_timer.h"
00029 
00030 #include "nel/net/buf_client.h"
00031 #include "nel/misc/thread.h"
00032 #include "nel/net/dummy_tcp_sock.h"
00033 
00034 #ifdef NL_OS_WINDOWS
00035 #include <winsock2.h>
00036 #elif defined NL_OS_UNIX
00037 #include <netinet/in.h>
00038 #endif
00039 
00040 using namespace NLMISC;
00041 using namespace std;
00042 
00043 
00044 namespace NLNET {
00045 
00046 
00047 /***************************************************************************************************
00048  * User main thread (initialization)
00049  **************************************************************************************************/
00050 
00051 /*
00052  * Constructor
00053  */
00054 CBufClient::CBufClient( bool nodelay, bool replaymode ) :
00055         CBufNetBase(),
00056         _NoDelay( nodelay ),
00057         _PrevBytesDownloaded( 0 ),
00058         _PrevBytesUploaded( 0 ),
00059         _RecvTask( NULL ),
00060         _RecvThread( NULL )
00061         /*_PrevBytesReceived( 0 ),
00062         _PrevBytesSent( 0 )*/
00063 {
00064         nlnettrace( "CBufClient::CBufClient" ); // don't define a global object
00065 
00066         if ( replaymode )
00067         {
00068                 _BufSock = new CBufSock( new CDummyTcpSock() );
00069         }
00070         else
00071         {
00072                 _BufSock = new CBufSock();
00073                 _RecvTask = new CClientReceiveTask( this, _BufSock );
00074         }
00075 }
00076 
00077 
00078 /*
00079  * Connects to the specified host
00080  * Precond: not connected
00081  */
00082 void CBufClient::connect( const CInetAddress& addr )
00083 {
00084         nlnettrace( "CBufClient::connect" );
00085         nlassert( ! _BufSock->Sock->connected() );
00086         _BufSock->connect( addr, _NoDelay, true );
00087         _PrevBytesDownloaded = 0;
00088         _PrevBytesUploaded = 0;
00089         /*_PrevBytesReceived = 0;
00090         _PrevBytesSent = 0;*/
00091 
00092         // Allow reconnection
00093         if ( _RecvThread != NULL )
00094         {
00095                 delete _RecvThread;
00096         }
00097 
00098         _RecvThread = IThread::create( _RecvTask );
00099         _RecvThread->start();
00100 }
00101 
00102 
00103 /***************************************************************************************************
00104  * User main thread (running)
00105  **************************************************************************************************/
00106 
00107 
00108 /*
00109  * Sends a message to the remote host
00110  */
00111 void CBufClient::send( const NLMISC::CMemStream& buffer )
00112 {
00113         nlnettrace( "CBufClient::send" );
00114         nlassert( buffer.length() > 0 );
00115         nlassert( buffer.length() <= maxSentBlockSize() );
00116 
00117         // slow down the layer H_AUTO (CBufServer_send);
00118 
00119         if ( ! _BufSock->pushBuffer( buffer ) )
00120         {
00121                 // Disconnection event if disconnected
00122                 _BufSock->advertiseDisconnection( this, NULL );
00123         }
00124 }
00125 
00126 
00127 /*
00128  * Checks if there are some data to receive
00129  */
00130 bool CBufClient::dataAvailable()
00131 {
00132         // slow down the layer H_AUTO (CBufClient_dataAvailable);
00133         {
00134                 /* If no data available, enter the 'while' loop and return false (1 volatile test)
00135                  * If there are user data available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
00136                  * If there is a disconnection event (rare), call the callback and loop
00137                  */
00138                 while ( dataAvailableFlag() )
00139                 {
00140                         // Because _DataAvailable is true, the receive queue is not empty at this point
00141                         uint8 val;
00142                         {
00143                                 CFifoAccessor recvfifo( &receiveQueue() );
00144                                 val = recvfifo.value().frontLast ();
00145                         }
00146 
00147                         // Test if it the next block is a system event
00148                         switch ( val )
00149                         {
00150                                 
00151                         // Normal message available
00152                         case CBufNetBase::User:
00153                                 return true; // return immediatly, do not extract the message
00154 
00155                         // Process disconnection event
00156                         case CBufNetBase::Disconnection:
00157 
00158                                 nldebug( "Disconnection event" );
00159                                 _BufSock->setConnectedState( false );
00160 
00161                                 // Call callback if needed
00162                                 if ( disconnectionCallback() != NULL )
00163                                 {
00164                                         disconnectionCallback()( id(), argOfDisconnectionCallback() );
00165                                 }
00166 
00167                                 // Unlike the server version, we do not delete the CBufSock object here,
00168                                 // it will be done in the destructor of CBufClient
00169                                 break;
00170 
00171                         default: // should not occur
00172                                 {
00173                                         CFifoAccessor recvfifo( &receiveQueue() );
00174                                         vector<uint8> buffer;
00175                                         recvfifo.value().front (buffer);
00176                                         nlinfo( "LNETL1: Invalid block type: %hu (should be = %hu)", (uint16)(buffer[buffer.size()-1]), (uint16)val );
00177                                         nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
00178                                         nlinfo( "LNETL1: Receive queue:" );
00179                                         recvfifo.value().display();
00180                                         nlerror( "LNETL1: Invalid system event type in client receive queue" );
00181                                 }
00182                         }
00183                         // Extract system event
00184                         {
00185                                 CFifoAccessor recvfifo( &receiveQueue() );
00186                                 recvfifo.value().pop();
00187                                 setDataAvailableFlag( ! recvfifo.value().empty() );
00188                         }
00189 
00190                 }
00191                 // _DataAvailable is false here
00192                 return false;
00193         }
00194 }
00195 
00196 
00197 /* // OLD VERSION
00198 bool CBufClient::dataAvailable()
00199 {
00200         // slow down the layer H_AUTO (CBufClient_dataAvailable);
00201         {
00202                 CFifoAccessor recvfifo( &receiveQueue() );
00203                 do
00204                 {
00205                         // Check if the receive queue is empty
00206                         if ( recvfifo.value().empty() )
00207                         {
00208                                 return false;
00209                         }
00210                         else
00211                         {
00212                                 uint8 val = recvfifo.value().frontLast ();
00213 
00214                                 // Test if it the next block is a system event
00215                                 switch ( val )
00216                                 {
00217                                         
00218                                 // Normal message available
00219                                 case CBufNetBase::User:
00220                                         return true; // return immediatly, do not extract the message
00221 
00222                                 // Process disconnection event
00223                                 case CBufNetBase::Disconnection:
00224 
00225                                         nldebug( "Disconnection event" );
00226                                         _BufSock->setConnectedState( false );
00227 
00228                                         // Call callback if needed
00229                                         if ( disconnectionCallback() != NULL )
00230                                         {
00231                                                 disconnectionCallback()( id(), argOfDisconnectionCallback() );
00232                                         }
00233 
00234                                         // Unlike the server version, we do not delete the CBufSock object here,
00235                                         // it will be done in the destructor of CBufClient
00236                                         break;
00237 
00238                                 default:
00239                                         {
00240                                         vector<uint8> buffer;
00241                                         recvfifo.value().front (buffer);
00242                                         nlinfo( "LNETL1: Invalid block type: %hu (should be = %hu)", (uint16)(buffer[buffer.size()-1]), (uint16)val );
00243                                         nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
00244                                         nlinfo( "LNETL1: Receive queue:" );
00245                                         recvfifo.value().display();
00246                                         nlerror( "LNETL1: Invalid system event type in client receive queue" );
00247                                         }
00248                                 }
00249                                 // Extract system event
00250                                 recvfifo.value().pop();
00251                         }
00252                 }
00253                 while ( true );
00254         }
00255 }
00256 */
00257 
00258 /*
00259  * Receives next block of data in the specified buffer (resizes the vector)
00260  * Precond: dataAvailable() has returned true
00261  */
00262 void CBufClient::receive( NLMISC::CMemStream& buffer )
00263 {
00264         nlnettrace( "CBufClient::receive" );
00265         //nlassert( dataAvailable() );
00266 
00267         // Extract buffer from the receive queue
00268         {
00269                 CFifoAccessor recvfifo( &receiveQueue() );
00270                 nlassert( ! recvfifo.value().empty() );
00271                 recvfifo.value().front( buffer );
00272                 recvfifo.value().pop();
00273                 setDataAvailableFlag( ! recvfifo.value().empty() );
00274         }
00275 
00276         // Extract event type
00277         nlassert( buffer.buffer()[buffer.length()-1] == CBufNetBase::User );
00278         //commented for optimisation nldebug( "LNETL1: Client read buffer (%d+%d B)", buffer.length(), sizeof(TSockId)+1 );
00279         buffer.resize( buffer.length()-1 );
00280 }
00281 
00282 
00283 /*
00284  * Update the network (call this method evenly)
00285  */
00286 void CBufClient::update()
00287 {
00288         //nlnettrace( "CBufClient::update" );
00289 
00290         // Update sending
00291         bool sendingok = _BufSock->update();
00292 
00293         // Disconnection event if disconnected
00294         if ( ! ( _BufSock->Sock->connected() && sendingok ) )
00295         {
00296                 if ( _BufSock->Sock->connected() )
00297                 {
00298                         _BufSock->Sock->disconnect();
00299                 }
00300                 _BufSock->advertiseDisconnection( this, NULL );
00301         }
00302 }
00303 
00304 
00305 /*
00306  * Disconnect the remote host
00307  */
00308 void CBufClient::disconnect( bool quick )
00309 {
00310         nlnettrace( "CBufClient::disconnect" );
00311 
00312         // Do not allow to disconnect a socket that is not connected
00313         nlassert( _BufSock->connectedState() );
00314 
00315         // When the NS tells us to remove this connection AND the connection has physically
00316         // disconnected but not yet logically (i.e. disconnection event not processed yet),
00317         // skip flushing and physical active disconnection
00318         if ( _BufSock->Sock->connected() )
00319         {
00320                 // Flush sending is asked for
00321                 if ( ! quick )
00322                 {
00323                         _BufSock->flush();
00324                 }
00325 
00326                 // Disconnect and prevent from advertising the disconnection
00327                 _BufSock->disconnect( false );
00328         }
00329 
00330         // Empty the receive queue
00331         {
00332                 CFifoAccessor recvfifo( &receiveQueue() );
00333                 recvfifo.value().clear();
00334                 setDataAvailableFlag( false );
00335         }
00336 }
00337 
00338 
00339 // Utility function for newBytes...()
00340 inline uint64 updateStatCounter( uint64& counter, uint64 newvalue )
00341 {
00342         uint64 result = newvalue - counter;
00343         counter = newvalue;
00344         return result;
00345 }
00346 
00347 
00348 /*
00349  * Returns the number of bytes downloaded since the previous call to this method
00350  */
00351 uint64 CBufClient::newBytesDownloaded()
00352 {
00353         return updateStatCounter( _PrevBytesDownloaded, bytesDownloaded() );
00354 }
00355 
00356 
00357 /*
00358  * Returns the number of bytes uploaded since the previous call to this method
00359  */
00360 uint64 CBufClient::newBytesUploaded()
00361 {
00362         return updateStatCounter( _PrevBytesUploaded, bytesUploaded() );
00363 }
00364 
00365 
00366 /*
00367  * Returns the number of bytes popped by receive() since the previous call to this method
00368  */
00369 /*uint64 CBufClient::newBytesReceived()
00370 {
00371         return updateStatCounter( _PrevBytesReceived, bytesReceived() );
00372 }*/
00373 
00374 
00375 /*
00376  * Returns the number of bytes pushed by send() since the previous call to this method
00377  */
00378 /*uint64 CBufClient::newBytesSent()
00379 {
00380         return updateStatCounter( _PrevBytesSent, bytesSent() );
00381 }*/
00382 
00383 
00384 /*
00385  * Destructor
00386  */
00387 CBufClient::~CBufClient()
00388 {
00389         nlnettrace( "CBufClient::~CBufClient" );
00390 
00391         // Disconnect if not done
00392         if ( _BufSock->Sock->connected() )
00393         {
00394                 nlassert( _BufSock->connectedState() );
00395 
00396                 disconnect( true );
00397         }
00398 
00399         // Clean thread termination
00400         if ( _RecvThread != NULL )
00401         {
00402                 nldebug( "LNETL1: Waiting for the end of the receive thread..." );
00403                 _RecvThread->wait();
00404         }
00405 
00406         if ( _RecvTask != NULL )
00407                 delete _RecvTask;
00408 
00409         if ( _RecvThread != NULL )
00410                 delete _RecvThread;
00411 
00412         if ( _BufSock != NULL )
00413                 delete _BufSock;
00414 
00415         nlnettrace( "Exiting CBufClient::~CBufClient" );
00416 }
00417 
00418 
00419 /***************************************************************************************************
00420  * Receive thread 
00421  **************************************************************************************************/
00422 
00423 
00424 /*
00425  * Code of receiving thread for clients
00426  */
00427 void CClientReceiveTask::run()
00428 {
00429         nlnettrace( "CClientReceiveTask::run" );
00430 
00431         bool connected = true;
00432         while ( connected ) // does not call _Sock->connected() to avoid mutex (not needed for client)
00433         {
00434                 try
00435                 {
00436                         // Receive message length (in blocking mode)
00437                         TBlockSize blocklen;
00438                         uint32 lenoflen = sizeof(blocklen);
00439                         sock()->receive( (uint8*)&blocklen, lenoflen );
00440                         uint32 len = ntohl( blocklen );
00441         
00442                         if ( len != 0 )
00443                         {
00444                                 // Test size limit
00445                                 if ( len > _Client->maxExpectedBlockSize() )
00446                                 {
00447                                         nlwarning( "LNETL1: Socket %s received length exceeding max expected, in block header... Disconnecting", _SockId->asString().c_str() );
00448                                         throw ESocket( "Received length exceeding max expected", false );
00449                                 }
00450 
00451                                 // Receive message payload (in blocking mode)
00452                                 CObjectVector<uint8> buffer;
00453                                 buffer.resize(len+1);
00454 
00455                                 sock()->receive( buffer.getPtr(), len );
00456                                 
00457                                 //commented out for optimisation: nldebug( "LNETL1: Client %s received buffer (%u bytes)", _SockId->asString().c_str(), buffer.size()/*, stringFromVector(buffer).c_str()*/ );
00458                                 // Add event type
00459                                 buffer[len] = CBufNetBase::User;
00460 
00461                                 // Push message into receive queue
00462                                 _Client->pushMessageIntoReceiveQueue( buffer.getPtr(), buffer.size() );
00463                         }
00464                         else
00465                         {
00466                                 nlwarning( "LNETL1: Socket %s received null length in block header", _SockId->asString().c_str() );
00467                         }
00468                 }
00469                 catch ( ESocketConnectionClosed& )
00470                 {
00471                         nldebug( "LNETL1: Client connection %s closed", _SockId->asString().c_str() );
00472                         // The socket went to _Connected=false when throwing the exception
00473                         connected = false;
00474                 }
00475                 catch ( ESocket& )
00476                 {
00477                         nldebug( "LNETL1: Client connection %s broken", _SockId->asString().c_str() );
00478                         sock()->disconnect();
00479                         connected = false;
00480                 }
00481         }
00482 
00483         nlnettrace( "Exiting CClientReceiveTask::run()" );
00484 }
00485 
00486 
00487 } // NLNET