From 0ea5fc66924303d1bf73ba283a383e2aadee02f2 Mon Sep 17 00:00:00 2001 From: neodarz Date: Sat, 11 Aug 2018 20:21:34 +0200 Subject: Initial commit --- docs/doxygen/nel/buf__server_8cpp-source.html | 1240 +++++++++++++++++++++++++ 1 file changed, 1240 insertions(+) create mode 100644 docs/doxygen/nel/buf__server_8cpp-source.html (limited to 'docs/doxygen/nel/buf__server_8cpp-source.html') diff --git a/docs/doxygen/nel/buf__server_8cpp-source.html b/docs/doxygen/nel/buf__server_8cpp-source.html new file mode 100644 index 00000000..dc97cae7 --- /dev/null +++ b/docs/doxygen/nel/buf__server_8cpp-source.html @@ -0,0 +1,1240 @@ + + + + nevrax.org : docs + + + + + + + + + + + + + + +
# Home   # nevrax.com   
+ + + + +
Nevrax
+ + + + + + + + + + +
+ + +
+ Nevrax.org
+ + + + + + + +
#News
#Mailing-list
#Documentation
#CVS
#Bugs
#License
+
+ + +
+ + +
+Docs + +
+  + + + + + +
Documentation 
+ +
+Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members   Related Pages   Search  
+

buf_server.cpp

Go to the documentation of this file.
00001 
+00007 /* Copyright, 2001 Nevrax Ltd.
+00008  *
+00009  * This file is part of NEVRAX NEL.
+00010  * NEVRAX NEL is free software; you can redistribute it and/or modify
+00011  * it under the terms of the GNU General Public License as published by
+00012  * the Free Software Foundation; either version 2, or (at your option)
+00013  * any later version.
+00014 
+00015  * NEVRAX NEL is distributed in the hope that it will be useful, but
+00016  * WITHOUT ANY WARRANTY; without even the implied warranty of
+00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+00018  * General Public License for more details.
+00019 
+00020  * You should have received a copy of the GNU General Public License
+00021  * along with NEVRAX NEL; see the file COPYING. If not, write to the
+00022  * Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
+00023  * MA 02111-1307, USA.
+00024  */
+00025 
+00026 #include "stdnet.h"
+00027 
+00028 #include "nel/misc/hierarchical_timer.h"
+00029 
+00030 #include "nel/net/buf_server.h"
+00031 
+00032 #ifdef NL_OS_WINDOWS
+00033 #include <winsock2.h>
+00034 //typedef sint socklen_t;
+00035 
+00036 #elif defined NL_OS_UNIX
+00037 #include <unistd.h>
+00038 #include <sys/types.h>
+00039 #include <sys/time.h>
+00040 #endif
+00041 
+00042 
+00043 using namespace NLMISC;
+00044 using namespace std;
+00045 
+00046 namespace NLNET {
+00047 
+00048 
+00049 /***************************************************************************************************
+00050  * User main thread (initialization)
+00051  **************************************************************************************************/
+00052 
+00053 
+00054 /*
+00055  * Constructor
+00056  */
+00057 CBufServer::CBufServer( TThreadStategy strategy,
+00058         uint16 max_threads, uint16 max_sockets_per_thread, bool nodelay, bool replaymode ) :
+00059         CBufNetBase(),
+00060         _NoDelay( nodelay ),
+00061         _ThreadStrategy( strategy ),
+00062         _MaxThreads( max_threads ),
+00063         _MaxSocketsPerThread( max_sockets_per_thread ),
+00064         _ListenTask( NULL ),
+00065         _ListenThread( NULL ),
+00066         _ThreadPool("CBufServer::_ThreadPool"),
+00067         _ConnectionCallback( NULL ),
+00068         _ConnectionCbArg( NULL ),
+00069         _BytesPushedOut( 0 ),
+00070         _BytesPoppedIn( 0 ),
+00071         _PrevBytesPoppedIn( 0 ),
+00072         _PrevBytesPushedOut( 0 ),
+00073         _NbConnections (0),
+00074         _ReplayMode( replaymode )
+00075 {
+00076         nlnettrace( "CBufServer::CBufServer" );
+00077         if ( ! _ReplayMode )
+00078         {
+00079                 _ListenTask = new CListenTask( this );
+00080                 _ListenThread = IThread::create( _ListenTask );
+00081         }
+00082         /*{
+00083                 CSynchronized<uint32>::CAccessor syncbpi ( &_BytesPushedIn );
+00084                 syncbpi.value() = 0;
+00085         }*/
+00086 }
+00087 
+00088 
+00089 /*
+00090  * Listens on the specified port
+00091  */
+00092 void CBufServer::init( uint16 port )
+00093 {
+00094         nlnettrace( "CBufServer::init" );
+00095         if ( ! _ReplayMode )
+00096         {
+00097                 _ListenTask->init( port );
+00098                 _ListenThread->start();
+00099         }
+00100         else
+00101         {
+00102                 nldebug( "LNETL0: Binding listen socket to any address, port %hu", port );
+00103         }
+00104 }
+00105 
+00106 
+00107 /*
+00108  * Begins to listen on the specified port (call before running thread)
+00109  */
+00110 void CListenTask::init( uint16 port )
+00111 {
+00112         nlnettrace( "CListenTask::init" );
+00113         _ListenSock.init( port );
+00114 }
+00115 
+00116 
+00117 /***************************************************************************************************
+00118  * User main thread (running)
+00119  **************************************************************************************************/
+00120 
+00121 
+00122 /*
+00123  * Constructor
+00124  */
+00125 CServerTask::CServerTask() : _ExitRequired(false)
+00126 {
+00127 #ifdef NL_OS_UNIX
+00128         pipe( _WakeUpPipeHandle );
+00129 #endif
+00130 }
+00131 
+00132 
+00133 
+00134 #ifdef NL_OS_UNIX
+00135 /*
+00136  * Wake the thread up, when blocked in select (Unix only)
+00137  */
+00138 void CServerTask::wakeUp()
+00139 {
+00140         uint8 b;
+00141         if ( write( _WakeUpPipeHandle[PipeWrite], &b, 1 ) == -1 )
+00142         {
+00143                 nldebug( "LNETL1: In CServerTask::wakeUp(): write() failed" );
+00144         }
+00145 }
+00146 #endif
+00147 
+00148 
+00149 /*
+00150  * Destructor
+00151  */
+00152 CServerTask::~CServerTask()
+00153 {
+00154 #ifdef NL_OS_UNIX
+00155         close( _WakeUpPipeHandle[PipeRead] );
+00156         close( _WakeUpPipeHandle[PipeWrite] );
+00157 #endif
+00158 }
+00159 
+00160 
+00161 /*
+00162  * Destructor
+00163  */
+00164 CBufServer::~CBufServer()
+00165 {
+00166         nlnettrace( "CBufServer::~CBufServer" );
+00167 
+00168         // Clean listen thread exit
+00169         if ( ! _ReplayMode )
+00170         {
+00171                 ((CListenTask*)(_ListenThread->getRunnable()))->requireExit();
+00172                 ((CListenTask*)(_ListenThread->getRunnable()))->close();
+00173 #ifdef NL_OS_UNIX
+00174                 _ListenTask->wakeUp();
+00175 #endif
+00176                 _ListenThread->wait();
+00177                 delete _ListenThread;
+00178                 delete _ListenTask;
+00179 
+00180                 // Clean receive thread exits
+00181                 CThreadPool::iterator ipt;
+00182                 {
+00183                         nldebug( "LNETL1: Waiting for end of threads..." );
+00184                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+00185                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+00186                         {
+00187                                 // Tell the threads to exit and wake them up
+00188                                 CServerReceiveTask *task = receiveTask(ipt);
+00189                                 nlnettrace( "Requiring exit" );
+00190                                 task->requireExit();
+00191 
+00192                                 // Wake the threads up
+00193         #ifdef NL_OS_UNIX
+00194                                 task->wakeUp();
+00195         #else
+00196                                 CConnections::iterator ipb;
+00197                                 nlnettrace( "Closing sockets (Win32)" );
+00198                                 {
+00199                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+00200                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+00201                                         {
+00202                                                 (*ipb)->Sock->close();
+00203                                         }
+00204                                 }
+00205         #endif
+00206 
+00207                         }
+00208                         
+00209                         nlnettrace( "Waiting" );
+00210                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+00211                         {
+00212                                 // Wait until the threads have exited
+00213                                 (*ipt)->wait();
+00214                         }
+00215 
+00216                         nldebug( "LNETL1: Deleting sockets, tasks and threads..." );
+00217                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+00218                         {
+00219                                 // Delete the socket objects
+00220                                 CServerReceiveTask *task = receiveTask(ipt);
+00221                                 CConnections::iterator ipb;
+00222                                 {
+00223                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+00224                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+00225                                         {
+00226                                                 delete (*ipb);
+00227                                         }
+00228                                 }
+00229 
+00230         #ifdef NL_OS_UNIX
+00231                                 // Under Unix, close the sockets now
+00232                                 nlnettrace( "Closing sockets (Unix)" );
+00233                                 {
+00234                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+00235                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+00236                                         {
+00237                                                 (*ipb)->Sock->close();
+00238                                         }
+00239                                 }
+00240         #endif
+00241                                 
+00242                                 // Delete the task objects
+00243                                 delete task;
+00244 
+00245                                 // Delete the thread objects
+00246                                 delete (*ipt);
+00247                         }
+00248                 }
+00249         }
+00250 
+00251         nlnettrace( "Exiting CBufServer::~CBufServer" );
+00252 }
+00253 
+00254 
+00255 /*
+00256  * Disconnect the specified host
+00257  * Set hostid to NULL to disconnect all connections.
+00258  * If hostid is not null and the socket is not connected, the method does nothing.
+00259  * If quick is true, any pending data will not be sent before disconnecting.
+00260  */
+00261 void CBufServer::disconnect( TSockId hostid, bool quick )
+00262 {
+00263         nlnettrace( "CBufServer::disconnect" );
+00264         if ( hostid != InvalidSockId )
+00265         {
+00266                 // Disconnect only if physically connected
+00267                 if ( hostid->Sock->connected() )
+00268                 {
+00269                         if ( ! quick )
+00270                         {
+00271                                 hostid->flush();
+00272                         }
+00273                         hostid->Sock->disconnect(); // the connection will be removed by the next call of update()
+00274                 }
+00275         }
+00276         else
+00277         {
+00278                 // Disconnect all
+00279                 CThreadPool::iterator ipt;
+00280                 {
+00281                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+00282                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+00283                         {
+00284                                 CServerReceiveTask *task = receiveTask(ipt);
+00285                                 CConnections::iterator ipb;
+00286                                 {
+00287                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+00288                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+00289                                         {
+00290                                                 if ( (*ipb)->Sock->connected() )
+00291                                                 {
+00292                                                         if ( ! quick )
+00293                                                         {
+00294                                                                 (*ipb)->flush();
+00295                                                         }
+00296                                                         (*ipb)->Sock->disconnect();
+00297                                                 }
+00298                                         }
+00299                                 }
+00300                         }
+00301                 }
+00302         }
+00303 }
+00304 
+00305 
+00306 /*
+00307  * Send a message to the specified host
+00308  */
+00309 void CBufServer::send( const CMemStream& buffer, TSockId hostid )
+00310 {
+00311         nlnettrace( "CBufServer::send" );
+00312         nlassert( buffer.length() > 0);
+00313         nlassert( buffer.length() <= maxSentBlockSize() );
+00314 
+00315         // slow down the layer H_AUTO (CBufServer_send);
+00316 
+00317         if ( hostid != InvalidSockId )
+00318         {
+00319                 // debug features, we number all packet to be sure that they are all sent and received
+00320                 // \todo remove this debug feature when ok
+00321 //              nldebug ("send message number %u", hostid->SendNextValue);
+00322 #ifdef NL_BIG_ENDIAN
+00323                 uint32 val = NLMISC_BSWAP32(hostid->SendNextValue);
+00324 #else
+00325                 uint32 val = hostid->SendNextValue;
+00326 #endif
+00327 
+00328                 *(uint32*)buffer.buffer() = val;
+00329                 hostid->SendNextValue++;
+00330 
+00331                 pushBufferToHost( buffer, hostid );
+00332         }
+00333         else
+00334         {
+00335                 // Push into all send queues
+00336                 CThreadPool::iterator ipt;
+00337                 {
+00338                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+00339                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+00340                         {
+00341                                 CServerReceiveTask *task = receiveTask(ipt);
+00342                                 CConnections::iterator ipb;
+00343                                 {
+00344                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+00345                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+00346                                         {
+00347                                                 // Send only if the socket is logically connected
+00348                                                 if ( (*ipb)->connectedState() ) 
+00349                                                 {
+00350                                                         // debug features, we number all packet to be sure that they are all sent and received
+00351                                                         // \todo remove this debug feature when ok
+00352 //                                                      nldebug ("send message number %u", (*ipb)->SendNextValue);
+00353 #ifdef NL_BIG_ENDIAN
+00354                                                         uint32 val = NLMISC_BSWAP32((*ipb)->SendNextValue);
+00355 #else
+00356                                                         uint32 val = (*ipb)->SendNextValue;
+00357 #endif
+00358                                                         *(uint32*)buffer.buffer() = val;
+00359                                                         (*ipb)->SendNextValue++;
+00360 
+00361                                                         pushBufferToHost( buffer, *ipb );
+00362                                                 }
+00363                                         }
+00364                                 }
+00365                         }
+00366                 }
+00367         }
+00368 }
+00369 
+00370 
+00371 /*
+00372  * Checks if there are some data to receive
+00373  */
+00374 bool CBufServer::dataAvailable()
+00375 {
+00376         // slow down the layer H_AUTO (CBufServer_dataAvailable);
+00377         {
+00378                 /* If no data available, enter the 'while' loop and return false (1 volatile test)
+00379                  * If there are user data available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
+00380                  * If there is a connection/disconnection event (rare), call the callback and loop
+00381                  */
+00382                 while ( dataAvailableFlag() )
+00383                 {
+00384                         // Because _DataAvailable is true, the receive queue is not empty at this point
+00385                         vector<uint8> buffer;
+00386                         uint8 val;
+00387                         {
+00388                                 CFifoAccessor recvfifo( &receiveQueue() );
+00389                                 val = recvfifo.value().frontLast();
+00390                                 if ( val != CBufNetBase::User )
+00391                                 {
+00392                                         recvfifo.value().front( buffer );
+00393                                 }
+00394                         }
+00395 
+00396                         /*sint32 mbsize = recvfifo.value().size() / 1048576;
+00397                         if ( mbsize > 0 )
+00398                         {
+00399                           nlwarning( "The receive queue size exceeds %d MB", mbsize );
+00400                         }*/
+00401 
+00402                         /*vector<uint8> buffer;
+00403                         recvfifo.value().front( buffer );*/
+00404 
+00405                         // Test if it the next block is a system event
+00406                         //switch ( buffer[buffer.size()-1] )
+00407                         switch ( val )
+00408                         {
+00409                                 
+00410                         // Normal message available
+00411                         case CBufNetBase::User:
+00412                                 return true; // return immediatly, do not extract the message
+00413 
+00414                         // Process disconnection event
+00415                         case CBufNetBase::Disconnection:
+00416                         {
+00417                                 TSockId sockid = *((TSockId*)(&*buffer.begin()));
+00418                                 nldebug( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());
+00419 
+00420                                 sockid->setConnectedState( false );
+00421 
+00422                                 // Call callback if needed
+00423                                 if ( disconnectionCallback() != NULL )
+00424                                 {
+00425                                         disconnectionCallback()( sockid, argOfDisconnectionCallback() );
+00426                                 }
+00427 
+00428                                 // Add socket object into the synchronized remove list
+00429                                 nldebug( "LNETL1: Adding the connection to the remove list" );
+00430                                 nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
+00431                                 ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
+00432                                 break;
+00433                         }
+00434                         // Process connection event
+00435                         case CBufNetBase::Connection:
+00436                         {
+00437                                 TSockId sockid = *((TSockId*)(&*buffer.begin()));
+00438                                 nldebug( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());
+00439 
+00440                                 sockid->setConnectedState( true );
+00441                                 
+00442                                 // Call callback if needed
+00443                                 if ( connectionCallback() != NULL )
+00444                                 {
+00445                                         connectionCallback()( sockid, argOfConnectionCallback() );
+00446                                 }
+00447                                 break;
+00448                         }
+00449                         default: // should not occur
+00450                                 nlinfo( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
+00451                                 nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
+00452                                 nlinfo( "LNETL1: Receive queue:" );
+00453                                 {
+00454                                         CFifoAccessor recvfifo( &receiveQueue() );
+00455                                         recvfifo.value().display();
+00456                                 }
+00457                                 nlerror( "LNETL1: Invalid system event type in server receive queue" );
+00458 
+00459                         }
+00460 
+00461                         // Extract system event
+00462                         {
+00463                                 CFifoAccessor recvfifo( &receiveQueue() );
+00464                                 recvfifo.value().pop();
+00465                                 setDataAvailableFlag( ! recvfifo.value().empty() );
+00466                         }
+00467                 }
+00468                 // _DataAvailable is false here
+00469                 return false;
+00470         }
+00471 }
+00472 
+00473 
+00474 /* // OLD VERSION
+00475 bool CBufServer::dataAvailable()
+00476 {
+00477         // slow down the layer H_AUTO (CBufServer_dataAvailable);
+00478         {
+00479                 CFifoAccessor recvfifo( &receiveQueue() );
+00480                 do
+00481                 {
+00482                         // Check if the receive queue is empty
+00483                         if ( recvfifo.value().empty() )
+00484                         {
+00485                                 return false;
+00486                         }
+00487                         else
+00488                         {
+00489                           //sint32 mbsize = recvfifo.value().size() / 1048576;
+00490                           //if ( mbsize > 0 )
+00491                             //{
+00492                             //  nlwarning( "The receive queue size exceeds %d MB", mbsize );
+00493                             //}
+00494 
+00495                                 uint8 val = recvfifo.value().frontLast();
+00496                                 
+00497                                 //vector<uint8> buffer;
+00498                                 //recvfifo.value().front( buffer );
+00499 
+00500                                 // Test if it the next block is a system event
+00501                                 //switch ( buffer[buffer.size()-1] )
+00502                                 switch ( val )
+00503                                 {
+00504                                         
+00505                                 // Normal message available
+00506                                 case CBufNetBase::User:
+00507                                         return true; // return immediatly, do not extract the message
+00508 
+00509                                 // Process disconnection event
+00510                                 case CBufNetBase::Disconnection:
+00511                                 {
+00512                                         vector<uint8> buffer;
+00513                                         recvfifo.value().front( buffer );
+00514 
+00515                                         TSockId sockid = *((TSockId*)(&*buffer.begin()));
+00516                                         nldebug( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());
+00517 
+00518                                         sockid->setConnectedState( false );
+00519 
+00520                                         // Call callback if needed
+00521                                         if ( disconnectionCallback() != NULL )
+00522                                         {
+00523                                                 disconnectionCallback()( sockid, argOfDisconnectionCallback() );
+00524                                         }
+00525 
+00526                                         // Add socket object into the synchronized remove list
+00527                                         nldebug( "LNETL1: Adding the connection to the remove list" );
+00528                                         nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
+00529                                         ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
+00530                                         break;
+00531                                 }
+00532                                 // Process connection event
+00533                                 case CBufNetBase::Connection:
+00534                                 {
+00535                                         vector<uint8> buffer;
+00536                                         recvfifo.value().front( buffer );
+00537 
+00538                                         TSockId sockid = *((TSockId*)(&*buffer.begin()));
+00539                                         nldebug( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());
+00540 
+00541                                         sockid->setConnectedState( true );
+00542                                         
+00543                                         // Call callback if needed
+00544                                         if ( connectionCallback() != NULL )
+00545                                         {
+00546                                                 connectionCallback()( sockid, argOfConnectionCallback() );
+00547                                         }
+00548                                         break;
+00549                                 }
+00550                                 default:
+00551                                         vector<uint8> buffer;
+00552                                         recvfifo.value().front( buffer );
+00553 
+00554                                         nlinfo( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
+00555                                         nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
+00556                                         nlinfo( "LNETL1: Receive queue:" );
+00557                                         recvfifo.value().display();
+00558                                         nlerror( "LNETL1: Invalid system event type in server receive queue" );
+00559 
+00560                                 }
+00561 
+00562                                 // Extract system event
+00563                                 recvfifo.value().pop();
+00564                         }
+00565                 }
+00566                 while ( true );
+00567         }
+00568 }
+00569 */
+00570  
+00571 /*
+00572  * Receives next block of data in the specified. The length and hostid are output arguments.
+00573  * Precond: dataAvailable() has returned true, phostid not null
+00574  */
+00575 void CBufServer::receive( CMemStream& buffer, TSockId* phostid )
+00576 {
+00577         nlnettrace( "CBufServer::receive" );
+00578         //nlassert( dataAvailable() );
+00579         nlassert( phostid != NULL );
+00580         {
+00581                 CFifoAccessor recvfifo( &receiveQueue() );
+00582                 nlassert( ! recvfifo.value().empty() );
+00583                 recvfifo.value().front( buffer );
+00584                 recvfifo.value().pop();
+00585                 setDataAvailableFlag( ! recvfifo.value().empty() );
+00586         }
+00587 
+00588         // Extract hostid (and event type)
+00589         *phostid = *((TSockId*)&(buffer.buffer()[buffer.length()-sizeof(TSockId)-1]));
+00590         nlassert( buffer.buffer()[buffer.length()-1] == CBufNetBase::User );
+00591 
+00592         // debug features, we number all packet to be sure that they are all sent and received
+00593         // \todo remove this debug feature when ok
+00594 #ifdef NL_BIG_ENDIAN
+00595         uint32 val = NLMISC_BSWAP32(*(uint32*)buffer.buffer());
+00596 #else
+00597         uint32 val = *(uint32*)buffer.buffer();
+00598 #endif
+00599 
+00600         //      nldebug ("receive message number %u", val);
+00601         if ((*phostid)->ReceiveNextValue != val)
+00602         {
+00603                 nlwarning (("LNETL1: !!!LOST A MESSAGE!!! I received the message number %u but I'm waiting the message number %u (cnx %s), warn lecroart@nevrax.com with the log now please", val, (*phostid)->ReceiveNextValue, (*phostid)->asString().c_str()));
+00604                 // resync the message number
+00605                 (*phostid)->ReceiveNextValue = val;
+00606         }
+00607 
+00608         (*phostid)->ReceiveNextValue++;
+00609 
+00610         buffer.resize( buffer.length()-sizeof(TSockId)-1 );
+00611 
+00612         // TODO OPTIM remove the nldebug for speed
+00613         //commented for optimisation nldebug( "LNETL1: Read buffer (%d+%d B) from %s", buffer.length(), sizeof(TSockId)+1, /*stringFromVector(buffer).c_str(), */(*phostid)->asString().c_str() );
+00614 
+00615         // Statistics
+00616         _BytesPoppedIn += buffer.length() + sizeof(TBlockSize);
+00617 }
+00618 
+00619 
+00620 /*
+00621  * Update the network (call this method evenly)
+00622  */
+00623 void CBufServer::update()
+00624 {
+00625         //nlnettrace( "CBufServer::update-BEGIN" );
+00626 
+00627         _NbConnections = 0;
+00628 
+00629         // For each thread
+00630         CThreadPool::iterator ipt;
+00631         {
+00632           //nldebug( "UPD: Acquiring the Thread Pool" );
+00633                 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+00634                 //nldebug( "UPD: Acquired." );
+00635                 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+00636                 {
+00637                         // For each thread of the pool
+00638                         CServerReceiveTask *task = receiveTask(ipt);
+00639                         CConnections::iterator ipb;
+00640                         {
+00641                                 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+00642                                 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+00643                                 {
+00644                                     // For each socket of the thread, update sending
+00645                                     if ( ! ((*ipb)->Sock->connected() && (*ipb)->update()) )
+00646                                     {
+00647                                                 // Update did not work or the socket is not connected anymore
+00648                                         nldebug( "LNETL1: Socket %s is disconnected", (*ipb)->asString().c_str() );
+00649                                                 // Disconnection event if disconnected (known either from flush (in update) or when receiving data)
+00650                                                 (*ipb)->advertiseDisconnection( this, *ipb );
+00651                                         
+00652                                                 /*if ( (*ipb)->advertiseDisconnection( this, *ipb ) )
+00653                                                 {
+00654                                                         // Now the connection removal is in dataAvailable()
+00655                                                         // POLL6
+00656                                                 }*/
+00657                                     }
+00658                                     else
+00659                                     {
+00660                                                 _NbConnections++;
+00661                                     }
+00662                                 }
+00663                         }
+00664                 }
+00665         }
+00666 
+00667         //nlnettrace( "CBufServer::update-END" );
+00668 }
+00669 
+00670 uint32 CBufServer::getSendQueueSize( TSockId destid )
+00671 {
+00672         if ( destid != InvalidSockId )
+00673         {
+00674                 return destid->SendFifo.size();
+00675         }
+00676         else
+00677         {
+00678                 // add all client buffers
+00679 
+00680                 uint32 total = 0;
+00681 
+00682                 // For each thread
+00683                 CThreadPool::iterator ipt;
+00684                 {
+00685                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+00686                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+00687                         {
+00688                                 // For each thread of the pool
+00689                                 CServerReceiveTask *task = receiveTask(ipt);
+00690                                 CConnections::iterator ipb;
+00691                                 {
+00692                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
+00693                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+00694                                         {
+00695                                                 // For each socket of the thread, update sending
+00696                                                 total = (*ipb)->SendFifo.size ();
+00697                                         }
+00698                                 }
+00699                         }
+00700                 }
+00701                 return total;
+00702         }
+00703 }
+00704 
+00705 
+00706 /*
+00707  * Returns the number of bytes received since the previous call to this method
+00708  */
+00709 uint64 CBufServer::newBytesReceived()
+00710 {
+00711         uint64 b = bytesReceived();
+00712         uint64 nbrecvd = b - _PrevBytesPoppedIn;
+00713         //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbrecvd );
+00714         _PrevBytesPoppedIn = b;
+00715         return nbrecvd;
+00716 }
+00717 
+00718 /*
+00719  * Returns the number of bytes sent since the previous call to this method
+00720  */
+00721 uint64 CBufServer::newBytesSent()
+00722 {
+00723         uint64 b = bytesSent();
+00724         uint64 nbsent = b - _PrevBytesPushedOut;
+00725         //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbsent );
+00726         _PrevBytesPushedOut = b;
+00727         return nbsent;
+00728 }
+00729 
+00730 
+00731 /***************************************************************************************************
+00732  * Listen thread
+00733  **************************************************************************************************/
+00734 
+00735 
+00736 /*
+00737  * Code of listening thread
+00738  */
+00739 void CListenTask::run()
+00740 {
+00741         nlnettrace( "CListenTask::run" );
+00742 
+00743 #ifdef NL_OS_UNIX
+00744         SOCKET descmax;
+00745         fd_set readers;
+00746         timeval tv;
+00747         descmax = _ListenSock.descriptor()>_WakeUpPipeHandle[PipeRead]?_ListenSock.descriptor():_WakeUpPipeHandle[PipeRead];
+00748 #endif
+00749 
+00750         // Accept connections
+00751         while ( ! exitRequired() )
+00752         {
+00753                 try
+00754                 {
+00755                         // Get and setup the new socket
+00756 #ifdef NL_OS_UNIX
+00757                         FD_ZERO( &readers );
+00758                         FD_SET( _ListenSock.descriptor(), &readers );
+00759                         FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
+00760                         tv.tv_sec = 60; 
+00761                         tv.tv_usec = 0;
+00762                         int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
+00763 
+00764                         switch ( res )
+00765                         {
+00766                         case  0 : continue; // time-out expired, no results
+00767                         case -1 :
+00768                                 // we'll ignore message (Interrupted system call) caused by a CTRL-C
+00769                                 if (CSock::getLastError() == 4)
+00770                                 {
+00771                                         nldebug ("LNETL1: Select failed (in listen thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
+00772                                         continue;
+00773                                 }
+00774                                 nlerror( "LNETL1: Select failed (in listen thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
+00775                         }
+00776 
+00777                         if ( FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers ) )
+00778                         {
+00779                                 uint8 b;
+00780                                 if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
+00781                                 {
+00782                                         nldebug( "LNETL1: In CListenTask::run(): read() failed" );
+00783                                 }
+00784                                 nldebug( "LNETL1: listen thread select woken-up" );
+00785                                 continue;
+00786                         }
+00787 #endif
+00788                         nldebug( "LNETL1: Waiting incoming connection..." );
+00789                         CServerBufSock *bufsock = new CServerBufSock( _ListenSock.accept() );
+00790                         nldebug( "New connection : %s", bufsock->asString().c_str() );
+00791                         bufsock->Sock->setNonBlockingMode( true );
+00792                         if ( _Server->noDelay() )
+00793                         {
+00794                                 bufsock->Sock->setNoDelay( true );
+00795                         }
+00796 
+00797                         // Notify the new connection
+00798                         bufsock->advertiseConnection( _Server );
+00799 
+00800                         // Dispatch the socket into the thread pool
+00801                         _Server->dispatchNewSocket( bufsock );
+00802                 }
+00803                 catch ( ESocket& e )
+00804                 {
+00805                         nlinfo( "Exception in listen thread: %s", e.what() ); // It can occur in normal behavior (e.g. when exiting)
+00806                         // It can also occur when too many sockets are open (e.g. 885 connections)
+00807                 }
+00808         }
+00809 
+00810         nlnettrace( "Exiting CListenTask::run" );
+00811 }
+00812 
+00813 
+00814 /*
+00815  * Binds a new socket and send buffer to an existing or a new thread
+00816  * Note: this method is called in the listening thread.
+00817  */
+00818 void CBufServer::dispatchNewSocket( CServerBufSock *bufsock )
+00819 {
+00820         nlnettrace( "CBufServer::dispatchNewSocket" );
+00821 
+00822         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
+00823         if ( _ThreadStrategy == SpreadSockets ) 
+00824         {
+00825                 // Find the thread with the smallest number of connections and check if all
+00826                 // threads do not have the same number of connections
+00827                 uint min = 0xFFFFFFFF;
+00828                 uint max = 0;
+00829                 CThreadPool::iterator ipt, iptmin, iptmax;
+00830                 for ( iptmin=iptmax=ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+00831                 {
+00832                         uint noc = receiveTask(ipt)->numberOfConnections();
+00833                         if ( noc < min )
+00834                         {
+00835                                 min = noc;
+00836                                 iptmin = ipt;
+00837                         }
+00838                         if ( noc > max )
+00839                         {
+00840                                 max = noc;
+00841                                 iptmax = ipt;
+00842                         }
+00843                 }
+00844 
+00845                 // Check if we make the pool of threads grow (if we have not found vacant room
+00846                 // and if it is allowed to)
+00847                 if ( (poolsync.value().empty()) ||
+00848                          ((min == max) && (poolsync.value().size() < _MaxThreads)) )
+00849                 {
+00850                         addNewThread( poolsync.value(), bufsock );
+00851                 }
+00852                 else
+00853                 {
+00854                         // Dispatch socket to an existing thread of the pool
+00855                         CServerReceiveTask *task = receiveTask(iptmin);
+00856                         bufsock->setOwnerTask( task );
+00857                         task->addNewSocket( bufsock );
+00858 #ifdef NL_OS_UNIX
+00859                         task->wakeUp();
+00860 #endif                  
+00861                         
+00862                         if ( min >= (uint)_MaxSocketsPerThread )
+00863                         {
+00864                                 nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
+00865                         }
+00866                         nldebug( "LNETL1: New socket dispatched to thread %d", iptmin-poolsync.value().begin() );
+00867                 }
+00868 
+00869         }
+00870         else // _ThreadStrategy == FillThreads
+00871         {
+00872                 CThreadPool::iterator ipt;
+00873                 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
+00874                 {
+00875                         uint noc = receiveTask(ipt)->numberOfConnections();
+00876                         if ( noc < _MaxSocketsPerThread )
+00877                         {
+00878                                 break;
+00879                         }
+00880                 }
+00881 
+00882                 // Check if we have to make the thread pool grow (if we have not found vacant room)
+00883                 if ( ipt == poolsync.value().end() )
+00884                 {
+00885                         if ( poolsync.value().size() == _MaxThreads )
+00886                         {
+00887                                 nlwarning( "LNETL1: Exceeding the maximum number of threads" );
+00888                         }
+00889                         addNewThread( poolsync.value(), bufsock );
+00890                 }
+00891                 else
+00892                 {
+00893                         // Dispatch socket to an existing thread of the pool
+00894                         CServerReceiveTask *task = receiveTask(ipt);
+00895                         bufsock->setOwnerTask( task );
+00896                         task->addNewSocket( bufsock );
+00897 #ifdef NL_OS_UNIX
+00898                         task->wakeUp();
+00899 #endif                  
+00900                         nldebug( "LNETL1: New socket dispatched to thread %d", ipt-poolsync.value().begin() );
+00901                 }
+00902         }
+00903 }
+00904 
+00905 
+00906 /*
+00907  * Creates a new task and run a new thread for it
+00908  * Precond: bufsock not null
+00909  */
+00910 void CBufServer::addNewThread( CThreadPool& threadpool, CServerBufSock *bufsock )
+00911 {
+00912         nlnettrace( "CBufServer::addNewThread" );
+00913         nlassert( bufsock != NULL );
+00914 
+00915         // Create new task and dispatch the socket to it
+00916         CServerReceiveTask *task = new CServerReceiveTask( this );
+00917         bufsock->setOwnerTask( task );
+00918         task->addNewSocket( bufsock );
+00919 
+00920         // Add a new thread to the pool, with this task
+00921         IThread *thr = IThread::create( task );
+00922         {
+00923                 threadpool.push_back( thr );
+00924                 thr->start();
+00925                 nldebug( "LNETL1: Added a new thread; pool size is %d", threadpool.size() );
+00926                 nldebug( "LNETL1: New socket dispatched to thread %d", threadpool.size()-1 );
+00927         }
+00928 }
+00929 
+00930 
+00931 /***************************************************************************************************
+00932  * Receive threads
+00933  **************************************************************************************************/
+00934 
+00935 
+00936 /*
+00937  * Code of receiving threads for servers
+00938  */
+00939 void CServerReceiveTask::run()
+00940 {
+00941         nlnettrace( "CServerReceiveTask::run" );
+00942 
+00943         SOCKET descmax;
+00944         fd_set readers;
+00945 
+00946         // Time-out value for select (it can be long because we do not do any thing else in this thread)
+00947         timeval tv;
+00948 #if defined NL_OS_UNIX
+00949         // POLL7
+00950         nice( 2 );
+00951 #endif // NL_OS_UNIX
+00952         
+00953         // Copy of _Connections
+00954         vector<TSockId> connections_copy;       
+00955 
+00956         while ( ! exitRequired() )
+00957         {
+00958                 // 1. Remove closed connections
+00959                 clearClosedConnections();
+00960 
+00961                 // POLL8
+00962 
+00963                 // 2-SELECT-VERSION : select() on the sockets handled in the present thread
+00964 
+00965                 descmax = 0;
+00966                 FD_ZERO( &readers );
+00967                 bool skip;
+00968                 bool alldisconnected = true;
+00969                 CConnections::iterator ipb;
+00970                 {
+00971                         // Lock _Connections
+00972                         CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
+00973 
+00974                         // Prepare to avoid select if there is no connection
+00975                         skip = connectionssync.value().empty();
+00976 
+00977                         // Fill the select array and copy _Connections
+00978                         connections_copy.clear();
+00979                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
+00980                         {
+00981                                 if ( (*ipb)->Sock->connected() ) // exclude disconnected sockets that are not deleted
+00982                                 {
+00983                                         alldisconnected = false;
+00984                                         // Copy _Connections element
+00985                                         connections_copy.push_back( *ipb );
+00986 
+00987                                         // Add socket descriptor to the select array
+00988                                         FD_SET( (*ipb)->Sock->descriptor(), &readers );
+00989 
+00990                                         // Calculate descmax for select
+00991                                         if ( (*ipb)->Sock->descriptor() > descmax )
+00992                                         {
+00993                                                 descmax = (*ipb)->Sock->descriptor();
+00994                                         }
+00995                                 }
+00996                         }
+00997 
+00998 #ifdef NL_OS_UNIX
+00999                         // Add the wake-up pipe into the select array
+01000                         FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
+01001                         if ( _WakeUpPipeHandle[PipeRead]>descmax )
+01002                         {
+01003                                 descmax = _WakeUpPipeHandle[PipeRead];
+01004                         }
+01005 #endif
+01006                         
+01007                         // Unlock _Connections, use connections_copy instead
+01008                 }
+01009 
+01010 #ifndef NL_OS_UNIX
+01011                 // Avoid select if there is no connection (Windows only)
+01012                 if ( skip || alldisconnected )
+01013                 {
+01014                         nlSleep( 1 ); // nice
+01015                         continue;
+01016                 }
+01017 #endif
+01018 
+01019 #ifdef NL_OS_WINDOWS
+01020                 tv.tv_sec = 0; // short time because the newly added connections can't be added to the select fd_set
+01021                 tv.tv_usec = 10000; // NEW: set to 500ms because otherwise new connections handling are too slow
+01022 #elif defined NL_OS_UNIX
+01023                 // POLL7
+01024                 tv.tv_sec = 3600;               // 1 hour (=> 1 select every 3.6 second for 1000 connections)
+01025                 tv.tv_usec = 0;
+01026 #endif // NL_OS_WINDOWS
+01027 
+01028                 // Call select
+01029                 int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
+01030 
+01031                 // POLL9
+01032 
+01033                 // 3. Test the result
+01034                 switch ( res )
+01035                 {
+01036                         case  0 : continue; // time-out expired, no results
+01037 
+01039                         case -1 :
+01040                                 // we'll ignore message (Interrupted system call) caused by a CTRL-C
+01041                                 /*if (CSock::getLastError() == 4)
+01042                                 {
+01043                                         nldebug ("LNETL1: Select failed (in receive thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
+01044                                         continue;
+01045                                 }*/
+01046                                 //nlerror( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
+01047                                 nldebug( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
+01048                                 return;
+01049                 }
+01050 
+01051                 // 4. Get results
+01052 
+01053                 vector<TSockId>::iterator ic;
+01054                 for ( ic=connections_copy.begin(); ic!=connections_copy.end(); ++ic )
+01055                 {
+01056                         if ( FD_ISSET( (*ic)->Sock->descriptor(), &readers ) != 0 )
+01057                         {
+01058                                 CServerBufSock *serverbufsock = static_cast<CServerBufSock*>(static_cast<CBufSock*>(*ic));
+01059                                 try
+01060                                 {
+01061                                         // 4. Receive data
+01062                                         if ( serverbufsock->receivePart() )
+01063                                         {
+01064                                                 // Copy sockid
+01065                                                 vector<uint8> hidvec;
+01066                                                 hidvec.resize( sizeof(TSockId)+1 );
+01067                                                 memcpy( &*hidvec.begin(), &(*ic), sizeof(TSockId) );
+01068 
+01069                                                 // Add event type to hidvec
+01070                                                 hidvec[sizeof(TSockId)] = (uint8)CBufNetBase::User;
+01071 
+01072                                                 // Push message into receive queue
+01073                                                 //uint32 bufsize;
+01074                                                 //sint32 mbsize;
+01075                                                 {
+01076                                                         //nldebug( "RCV: Acquiring the receive queue... ");
+01077                                                         CFifoAccessor recvfifo( &_Server->receiveQueue() );
+01078                                                         //nldebug( "RCV: Acquired, pushing the received buffer... ");
+01079                                                         recvfifo.value().push( serverbufsock->receivedBuffer(), hidvec );
+01080                                                         _Server->setDataAvailableFlag( true );
+01081                                                         //nldebug( "RCV: Pushed, releasing the receive queue..." );
+01082                                                         //recvfifo.value().display();
+01083                                                         //bufsize = serverbufsock->receivedBuffer().size();
+01084                                                         //mbsize = recvfifo.value().size() / 1048576;
+01085                                                 }
+01086                                                 //nldebug( "RCV: Released." );
+01087                                                 /*if ( mbsize > 1 )
+01088                                                 {
+01089                                                         nlwarning( "The receive queue size exceeds %d MB", mbsize );
+01090                                                 }*/
+01091                                                 /*
+01092                                                 // Statistics
+01093                                                 {
+01094                                                         CSynchronized<uint32>::CAccessor syncbpi ( &_Server->syncBytesPushedIn() );
+01095                                                         syncbpi.value() += bufsize;
+01096                                                 }
+01097                                                 */
+01098                                         }
+01099                                 }
+01100                                 catch ( ESocketConnectionClosed& )
+01101                                 {
+01102                                         nldebug( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() );
+01103                                         // The socket went to _Connected=false when throwing the exception
+01104                                 }
+01105                                 catch ( ESocket& )
+01106                                 {
+01107                                         nldebug( "LNETL1: Connection %s broken", serverbufsock->asString().c_str() );
+01108                                         (*ic)->Sock->disconnect();
+01109                                 }
+01110 /*
+01111 #ifdef NL_OS_UNIX
+01112                                 skip = true; // don't check _WakeUpPipeHandle (yes, check it to read any written byte)
+01113 #endif
+01114 
+01115 */
+01116                         }
+01117 
+01118                 }
+01119 
+01120 #ifdef NL_OS_UNIX
+01121                 // Test wake-up pipe
+01122                 if ( (!skip) && (FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers )) )
+01123                 {
+01124                         uint8 b;
+01125                         if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
+01126                         {
+01127                                 nldebug( "LNETL1: In CServerReceiveTask::run(): read() failed" );
+01128                         }
+01129                         nldebug( "LNETL1: Receive thread select woken-up" );
+01130                 }
+01131 #endif
+01132                 
+01133         }
+01134         nlnettrace( "Exiting CServerReceiveTask::run" );
+01135 }
+01136 
+01137 
+01138 /*
+01139  * Delete all connections referenced in the remove list (double-mutexed)
+01140  */
+01141 
+01142 void CServerReceiveTask::clearClosedConnections()
+01143 {
+01144         CConnections::iterator ic;
+01145         {
+01146                 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
+01147                 {
+01148                         if ( ! removesetsync.value().empty() )
+01149                         {
+01150                                 // Delete closed connections
+01151                                 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
+01152                                 for ( ic=removesetsync.value().begin(); ic!=removesetsync.value().end(); ++ic )
+01153                                 {
+01154                                         nldebug( "LNETL1: Removing a connection" );
+01155 
+01156                                         TSockId sid = (*ic);
+01157 
+01158                                         // Remove from the connection list
+01159                                         connectionssync.value().erase( *ic );
+01160 
+01161                                         // Delete the socket object
+01162                                         delete sid;
+01163                                 }
+01164                                 // Clear remove list
+01165                                 removesetsync.value().clear();
+01166                         }
+01167                 }
+01168         }
+01169 }
+01170 
+01171 
+01172 } // NLNET
+
+ + +
                                                                                                                                                                    +
+ + -- cgit v1.2.1