# Home    # nevrax.com   
Nevrax
Nevrax.org
#News
#Mailing-list
#Documentation
#CVS
#Bugs
#License
Docs
 
Documentation  
Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members   Related Pages   Search  

buf_server.cpp

Go to the documentation of this file.
00001 
00007 /* Copyright, 2001 Nevrax Ltd.
00008  *
00009  * This file is part of NEVRAX NEL.
00010  * NEVRAX NEL is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2, or (at your option)
00013  * any later version.
00014 
00015  * NEVRAX NEL is distributed in the hope that it will be useful, but
00016  * WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00018  * General Public License for more details.
00019 
00020  * You should have received a copy of the GNU General Public License
00021  * along with NEVRAX NEL; see the file COPYING. If not, write to the
00022  * Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
00023  * MA 02111-1307, USA.
00024  */
00025 
00026 #include "stdnet.h"
00027 
00028 #include "nel/misc/hierarchical_timer.h"
00029 
00030 #include "nel/net/buf_server.h"
00031 
00032 #ifdef NL_OS_WINDOWS
00033 #include <winsock2.h>
00034 //typedef sint socklen_t;
00035 
00036 #elif defined NL_OS_UNIX
00037 #include <unistd.h>
00038 #include <sys/types.h>
00039 #include <sys/time.h>
00040 #endif
00041 
00042 
00043 using namespace NLMISC;
00044 using namespace std;
00045 
00046 namespace NLNET {
00047 
00048 
00049 /***************************************************************************************************
00050  * User main thread (initialization)
00051  **************************************************************************************************/
00052 
00053 
00054 /*
00055  * Constructor
00056  */
00057 CBufServer::CBufServer( TThreadStategy strategy,
00058         uint16 max_threads, uint16 max_sockets_per_thread, bool nodelay, bool replaymode ) :
00059         CBufNetBase(),
00060         _NoDelay( nodelay ),
00061         _ThreadStrategy( strategy ),
00062         _MaxThreads( max_threads ),
00063         _MaxSocketsPerThread( max_sockets_per_thread ),
00064         _ListenTask( NULL ),
00065         _ListenThread( NULL ),
00066         _ThreadPool("CBufServer::_ThreadPool"),
00067         _ConnectionCallback( NULL ),
00068         _ConnectionCbArg( NULL ),
00069         _BytesPushedOut( 0 ),
00070         _BytesPoppedIn( 0 ),
00071         _PrevBytesPoppedIn( 0 ),
00072         _PrevBytesPushedOut( 0 ),
00073         _NbConnections (0),
00074         _ReplayMode( replaymode )
00075 {
00076         nlnettrace( "CBufServer::CBufServer" );
00077         if ( ! _ReplayMode )
00078         {
00079                 _ListenTask = new CListenTask( this );
00080                 _ListenThread = IThread::create( _ListenTask );
00081         }
00082         /*{
00083                 CSynchronized<uint32>::CAccessor syncbpi ( &_BytesPushedIn );
00084                 syncbpi.value() = 0;
00085         }*/
00086 }
00087 
00088 
00089 /*
00090  * Listens on the specified port
00091  */
00092 void CBufServer::init( uint16 port )
00093 {
00094         nlnettrace( "CBufServer::init" );
00095         if ( ! _ReplayMode )
00096         {
00097                 _ListenTask->init( port );
00098                 _ListenThread->start();
00099         }
00100         else
00101         {
00102                 nldebug( "LNETL0: Binding listen socket to any address, port %hu", port );
00103         }
00104 }
00105 
00106 
00107 /*
00108  * Begins to listen on the specified port (call before running thread)
00109  */
00110 void CListenTask::init( uint16 port )
00111 {
00112         nlnettrace( "CListenTask::init" );
00113         _ListenSock.init( port );
00114 }
00115 
00116 
00117 /***************************************************************************************************
00118  * User main thread (running)
00119  **************************************************************************************************/
00120 
00121 
00122 /*
00123  * Constructor
00124  */
00125 CServerTask::CServerTask() : _ExitRequired(false)
00126 {
00127 #ifdef NL_OS_UNIX
00128         pipe( _WakeUpPipeHandle );
00129 #endif
00130 }
00131 
00132 
00133 
00134 #ifdef NL_OS_UNIX
00135 /*
00136  * Wake the thread up, when blocked in select (Unix only)
00137  */
00138 void CServerTask::wakeUp()
00139 {
00140         uint8 b;
00141         if ( write( _WakeUpPipeHandle[PipeWrite], &b, 1 ) == -1 )
00142         {
00143                 nldebug( "LNETL1: In CServerTask::wakeUp(): write() failed" );
00144         }
00145 }
00146 #endif
00147 
00148 
00149 /*
00150  * Destructor
00151  */
00152 CServerTask::~CServerTask()
00153 {
00154 #ifdef NL_OS_UNIX
00155         close( _WakeUpPipeHandle[PipeRead] );
00156         close( _WakeUpPipeHandle[PipeWrite] );
00157 #endif
00158 }
00159 
00160 
00161 /*
00162  * Destructor
00163  */
00164 CBufServer::~CBufServer()
00165 {
00166         nlnettrace( "CBufServer::~CBufServer" );
00167 
00168         // Clean listen thread exit
00169         if ( ! _ReplayMode )
00170         {
00171                 ((CListenTask*)(_ListenThread->getRunnable()))->requireExit();
00172                 ((CListenTask*)(_ListenThread->getRunnable()))->close();
00173 #ifdef NL_OS_UNIX
00174                 _ListenTask->wakeUp();
00175 #endif
00176                 _ListenThread->wait();
00177                 delete _ListenThread;
00178                 delete _ListenTask;
00179 
00180                 // Clean receive thread exits
00181                 CThreadPool::iterator ipt;
00182                 {
00183                         nldebug( "LNETL1: Waiting for end of threads..." );
00184                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00185                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00186                         {
00187                                 // Tell the threads to exit and wake them up
00188                                 CServerReceiveTask *task = receiveTask(ipt);
00189                                 nlnettrace( "Requiring exit" );
00190                                 task->requireExit();
00191 
00192                                 // Wake the threads up
00193         #ifdef NL_OS_UNIX
00194                                 task->wakeUp();
00195         #else
00196                                 CConnections::iterator ipb;
00197                                 nlnettrace( "Closing sockets (Win32)" );
00198                                 {
00199                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00200                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00201                                         {
00202                                                 (*ipb)->Sock->close();
00203                                         }
00204                                 }
00205         #endif
00206 
00207                         }
00208                         
00209                         nlnettrace( "Waiting" );
00210                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00211                         {
00212                                 // Wait until the threads have exited
00213                                 (*ipt)->wait();
00214                         }
00215 
00216                         nldebug( "LNETL1: Deleting sockets, tasks and threads..." );
00217                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00218                         {
00219                                 // Delete the socket objects
00220                                 CServerReceiveTask *task = receiveTask(ipt);
00221                                 CConnections::iterator ipb;
00222                                 {
00223                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00224                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00225                                         {
00226                                                 delete (*ipb);
00227                                         }
00228                                 }
00229 
00230         #ifdef NL_OS_UNIX
00231                                 // Under Unix, close the sockets now
00232                                 nlnettrace( "Closing sockets (Unix)" );
00233                                 {
00234                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00235                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00236                                         {
00237                                                 (*ipb)->Sock->close();
00238                                         }
00239                                 }
00240         #endif
00241                                 
00242                                 // Delete the task objects
00243                                 delete task;
00244 
00245                                 // Delete the thread objects
00246                                 delete (*ipt);
00247                         }
00248                 }
00249         }
00250 
00251         nlnettrace( "Exiting CBufServer::~CBufServer" );
00252 }
00253 
00254 
00255 /*
00256  * Disconnect the specified host
00257  * Set hostid to NULL to disconnect all connections.
00258  * If hostid is not null and the socket is not connected, the method does nothing.
00259  * If quick is true, any pending data will not be sent before disconnecting.
00260  */
00261 void CBufServer::disconnect( TSockId hostid, bool quick )
00262 {
00263         nlnettrace( "CBufServer::disconnect" );
00264         if ( hostid != InvalidSockId )
00265         {
00266                 // Disconnect only if physically connected
00267                 if ( hostid->Sock->connected() )
00268                 {
00269                         if ( ! quick )
00270                         {
00271                                 hostid->flush();
00272                         }
00273                         hostid->Sock->disconnect(); // the connection will be removed by the next call of update()
00274                 }
00275         }
00276         else
00277         {
00278                 // Disconnect all
00279                 CThreadPool::iterator ipt;
00280                 {
00281                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00282                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00283                         {
00284                                 CServerReceiveTask *task = receiveTask(ipt);
00285                                 CConnections::iterator ipb;
00286                                 {
00287                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00288                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00289                                         {
00290                                                 if ( (*ipb)->Sock->connected() )
00291                                                 {
00292                                                         if ( ! quick )
00293                                                         {
00294                                                                 (*ipb)->flush();
00295                                                         }
00296                                                         (*ipb)->Sock->disconnect();
00297                                                 }
00298                                         }
00299                                 }
00300                         }
00301                 }
00302         }
00303 }
00304 
00305 
00306 /*
00307  * Send a message to the specified host
00308  */
00309 void CBufServer::send( const CMemStream& buffer, TSockId hostid )
00310 {
00311         nlnettrace( "CBufServer::send" );
00312         nlassert( buffer.length() > 0);
00313         nlassert( buffer.length() <= maxSentBlockSize() );
00314 
00315         // slow down the layer H_AUTO (CBufServer_send);
00316 
00317         if ( hostid != InvalidSockId )
00318         {
00319                 // debug features, we number all packet to be sure that they are all sent and received
00320                 // \todo remove this debug feature when ok
00321 //              nldebug ("send message number %u", hostid->SendNextValue);
00322 #ifdef NL_BIG_ENDIAN
00323                 uint32 val = NLMISC_BSWAP32(hostid->SendNextValue);
00324 #else
00325                 uint32 val = hostid->SendNextValue;
00326 #endif
00327 
00328                 *(uint32*)buffer.buffer() = val;
00329                 hostid->SendNextValue++;
00330 
00331                 pushBufferToHost( buffer, hostid );
00332         }
00333         else
00334         {
00335                 // Push into all send queues
00336                 CThreadPool::iterator ipt;
00337                 {
00338                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00339                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00340                         {
00341                                 CServerReceiveTask *task = receiveTask(ipt);
00342                                 CConnections::iterator ipb;
00343                                 {
00344                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00345                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00346                                         {
00347                                                 // Send only if the socket is logically connected
00348                                                 if ( (*ipb)->connectedState() ) 
00349                                                 {
00350                                                         // debug features, we number all packet to be sure that they are all sent and received
00351                                                         // \todo remove this debug feature when ok
00352 //                                                      nldebug ("send message number %u", (*ipb)->SendNextValue);
00353 #ifdef NL_BIG_ENDIAN
00354                                                         uint32 val = NLMISC_BSWAP32((*ipb)->SendNextValue);
00355 #else
00356                                                         uint32 val = (*ipb)->SendNextValue;
00357 #endif
00358                                                         *(uint32*)buffer.buffer() = val;
00359                                                         (*ipb)->SendNextValue++;
00360 
00361                                                         pushBufferToHost( buffer, *ipb );
00362                                                 }
00363                                         }
00364                                 }
00365                         }
00366                 }
00367         }
00368 }
00369 
00370 
00371 /*
00372  * Checks if there are some data to receive
00373  */
00374 bool CBufServer::dataAvailable()
00375 {
00376         // slow down the layer H_AUTO (CBufServer_dataAvailable);
00377         {
00378                 /* If no data available, enter the 'while' loop and return false (1 volatile test)
00379                  * If there are user data available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
00380                  * If there is a connection/disconnection event (rare), call the callback and loop
00381                  */
00382                 while ( dataAvailableFlag() )
00383                 {
00384                         // Because _DataAvailable is true, the receive queue is not empty at this point
00385                         vector<uint8> buffer;
00386                         uint8 val;
00387                         {
00388                                 CFifoAccessor recvfifo( &receiveQueue() );
00389                                 val = recvfifo.value().frontLast();
00390                                 if ( val != CBufNetBase::User )
00391                                 {
00392                                         recvfifo.value().front( buffer );
00393                                 }
00394                         }
00395 
00396                         /*sint32 mbsize = recvfifo.value().size() / 1048576;
00397                         if ( mbsize > 0 )
00398                         {
00399                           nlwarning( "The receive queue size exceeds %d MB", mbsize );
00400                         }*/
00401 
00402                         /*vector<uint8> buffer;
00403                         recvfifo.value().front( buffer );*/
00404 
00405                         // Test if it the next block is a system event
00406                         //switch ( buffer[buffer.size()-1] )
00407                         switch ( val )
00408                         {
00409                                 
00410                         // Normal message available
00411                         case CBufNetBase::User:
00412                                 return true; // return immediatly, do not extract the message
00413 
00414                         // Process disconnection event
00415                         case CBufNetBase::Disconnection:
00416                         {
00417                                 TSockId sockid = *((TSockId*)(&*buffer.begin()));
00418                                 nldebug( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());
00419 
00420                                 sockid->setConnectedState( false );
00421 
00422                                 // Call callback if needed
00423                                 if ( disconnectionCallback() != NULL )
00424                                 {
00425                                         disconnectionCallback()( sockid, argOfDisconnectionCallback() );
00426                                 }
00427 
00428                                 // Add socket object into the synchronized remove list
00429                                 nldebug( "LNETL1: Adding the connection to the remove list" );
00430                                 nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
00431                                 ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
00432                                 break;
00433                         }
00434                         // Process connection event
00435                         case CBufNetBase::Connection:
00436                         {
00437                                 TSockId sockid = *((TSockId*)(&*buffer.begin()));
00438                                 nldebug( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());
00439 
00440                                 sockid->setConnectedState( true );
00441                                 
00442                                 // Call callback if needed
00443                                 if ( connectionCallback() != NULL )
00444                                 {
00445                                         connectionCallback()( sockid, argOfConnectionCallback() );
00446                                 }
00447                                 break;
00448                         }
00449                         default: // should not occur
00450                                 nlinfo( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
00451                                 nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
00452                                 nlinfo( "LNETL1: Receive queue:" );
00453                                 {
00454                                         CFifoAccessor recvfifo( &receiveQueue() );
00455                                         recvfifo.value().display();
00456                                 }
00457                                 nlerror( "LNETL1: Invalid system event type in server receive queue" );
00458 
00459                         }
00460 
00461                         // Extract system event
00462                         {
00463                                 CFifoAccessor recvfifo( &receiveQueue() );
00464                                 recvfifo.value().pop();
00465                                 setDataAvailableFlag( ! recvfifo.value().empty() );
00466                         }
00467                 }
00468                 // _DataAvailable is false here
00469                 return false;
00470         }
00471 }
00472 
00473 
00474 /* // OLD VERSION
00475 bool CBufServer::dataAvailable()
00476 {
00477         // slow down the layer H_AUTO (CBufServer_dataAvailable);
00478         {
00479                 CFifoAccessor recvfifo( &receiveQueue() );
00480                 do
00481                 {
00482                         // Check if the receive queue is empty
00483                         if ( recvfifo.value().empty() )
00484                         {
00485                                 return false;
00486                         }
00487                         else
00488                         {
00489                           //sint32 mbsize = recvfifo.value().size() / 1048576;
00490                           //if ( mbsize > 0 )
00491                             //{
00492                             //  nlwarning( "The receive queue size exceeds %d MB", mbsize );
00493                             //}
00494 
00495                                 uint8 val = recvfifo.value().frontLast();
00496                                 
00497                                 //vector<uint8> buffer;
00498                                 //recvfifo.value().front( buffer );
00499 
00500                                 // Test if it the next block is a system event
00501                                 //switch ( buffer[buffer.size()-1] )
00502                                 switch ( val )
00503                                 {
00504                                         
00505                                 // Normal message available
00506                                 case CBufNetBase::User:
00507                                         return true; // return immediatly, do not extract the message
00508 
00509                                 // Process disconnection event
00510                                 case CBufNetBase::Disconnection:
00511                                 {
00512                                         vector<uint8> buffer;
00513                                         recvfifo.value().front( buffer );
00514 
00515                                         TSockId sockid = *((TSockId*)(&*buffer.begin()));
00516                                         nldebug( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());
00517 
00518                                         sockid->setConnectedState( false );
00519 
00520                                         // Call callback if needed
00521                                         if ( disconnectionCallback() != NULL )
00522                                         {
00523                                                 disconnectionCallback()( sockid, argOfDisconnectionCallback() );
00524                                         }
00525 
00526                                         // Add socket object into the synchronized remove list
00527                                         nldebug( "LNETL1: Adding the connection to the remove list" );
00528                                         nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
00529                                         ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
00530                                         break;
00531                                 }
00532                                 // Process connection event
00533                                 case CBufNetBase::Connection:
00534                                 {
00535                                         vector<uint8> buffer;
00536                                         recvfifo.value().front( buffer );
00537 
00538                                         TSockId sockid = *((TSockId*)(&*buffer.begin()));
00539                                         nldebug( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());
00540 
00541                                         sockid->setConnectedState( true );
00542                                         
00543                                         // Call callback if needed
00544                                         if ( connectionCallback() != NULL )
00545                                         {
00546                                                 connectionCallback()( sockid, argOfConnectionCallback() );
00547                                         }
00548                                         break;
00549                                 }
00550                                 default:
00551                                         vector<uint8> buffer;
00552                                         recvfifo.value().front( buffer );
00553 
00554                                         nlinfo( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
00555                                         nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
00556                                         nlinfo( "LNETL1: Receive queue:" );
00557                                         recvfifo.value().display();
00558                                         nlerror( "LNETL1: Invalid system event type in server receive queue" );
00559 
00560                                 }
00561 
00562                                 // Extract system event
00563                                 recvfifo.value().pop();
00564                         }
00565                 }
00566                 while ( true );
00567         }
00568 }
00569 */
00570  
00571 /*
00572  * Receives next block of data in the specified. The length and hostid are output arguments.
00573  * Precond: dataAvailable() has returned true, phostid not null
00574  */
00575 void CBufServer::receive( CMemStream& buffer, TSockId* phostid )
00576 {
00577         nlnettrace( "CBufServer::receive" );
00578         //nlassert( dataAvailable() );
00579         nlassert( phostid != NULL );
00580         {
00581                 CFifoAccessor recvfifo( &receiveQueue() );
00582                 nlassert( ! recvfifo.value().empty() );
00583                 recvfifo.value().front( buffer );
00584                 recvfifo.value().pop();
00585                 setDataAvailableFlag( ! recvfifo.value().empty() );
00586         }
00587 
00588         // Extract hostid (and event type)
00589         *phostid = *((TSockId*)&(buffer.buffer()[buffer.length()-sizeof(TSockId)-1]));
00590         nlassert( buffer.buffer()[buffer.length()-1] == CBufNetBase::User );
00591 
00592         // debug features, we number all packet to be sure that they are all sent and received
00593         // \todo remove this debug feature when ok
00594 #ifdef NL_BIG_ENDIAN
00595         uint32 val = NLMISC_BSWAP32(*(uint32*)buffer.buffer());
00596 #else
00597         uint32 val = *(uint32*)buffer.buffer();
00598 #endif
00599 
00600         //      nldebug ("receive message number %u", val);
00601         if ((*phostid)->ReceiveNextValue != val)
00602         {
00603                 nlwarning (("LNETL1: !!!LOST A MESSAGE!!! I received the message number %u but I'm waiting the message number %u (cnx %s), warn lecroart@nevrax.com with the log now please", val, (*phostid)->ReceiveNextValue, (*phostid)->asString().c_str()));
00604                 // resync the message number
00605                 (*phostid)->ReceiveNextValue = val;
00606         }
00607 
00608         (*phostid)->ReceiveNextValue++;
00609 
00610         buffer.resize( buffer.length()-sizeof(TSockId)-1 );
00611 
00612         // TODO OPTIM remove the nldebug for speed
00613         //commented for optimisation nldebug( "LNETL1: Read buffer (%d+%d B) from %s", buffer.length(), sizeof(TSockId)+1, /*stringFromVector(buffer).c_str(), */(*phostid)->asString().c_str() );
00614 
00615         // Statistics
00616         _BytesPoppedIn += buffer.length() + sizeof(TBlockSize);
00617 }
00618 
00619 
00620 /*
00621  * Update the network (call this method evenly)
00622  */
00623 void CBufServer::update()
00624 {
00625         //nlnettrace( "CBufServer::update-BEGIN" );
00626 
00627         _NbConnections = 0;
00628 
00629         // For each thread
00630         CThreadPool::iterator ipt;
00631         {
00632           //nldebug( "UPD: Acquiring the Thread Pool" );
00633                 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00634                 //nldebug( "UPD: Acquired." );
00635                 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00636                 {
00637                         // For each thread of the pool
00638                         CServerReceiveTask *task = receiveTask(ipt);
00639                         CConnections::iterator ipb;
00640                         {
00641                                 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00642                                 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00643                                 {
00644                                     // For each socket of the thread, update sending
00645                                     if ( ! ((*ipb)->Sock->connected() && (*ipb)->update()) )
00646                                     {
00647                                                 // Update did not work or the socket is not connected anymore
00648                                         nldebug( "LNETL1: Socket %s is disconnected", (*ipb)->asString().c_str() );
00649                                                 // Disconnection event if disconnected (known either from flush (in update) or when receiving data)
00650                                                 (*ipb)->advertiseDisconnection( this, *ipb );
00651                                         
00652                                                 /*if ( (*ipb)->advertiseDisconnection( this, *ipb ) )
00653                                                 {
00654                                                         // Now the connection removal is in dataAvailable()
00655                                                         // POLL6
00656                                                 }*/
00657                                     }
00658                                     else
00659                                     {
00660                                                 _NbConnections++;
00661                                     }
00662                                 }
00663                         }
00664                 }
00665         }
00666 
00667         //nlnettrace( "CBufServer::update-END" );
00668 }
00669 
00670 uint32 CBufServer::getSendQueueSize( TSockId destid )
00671 {
00672         if ( destid != InvalidSockId )
00673         {
00674                 return destid->SendFifo.size();
00675         }
00676         else
00677         {
00678                 // add all client buffers
00679 
00680                 uint32 total = 0;
00681 
00682                 // For each thread
00683                 CThreadPool::iterator ipt;
00684                 {
00685                         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00686                         for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00687                         {
00688                                 // For each thread of the pool
00689                                 CServerReceiveTask *task = receiveTask(ipt);
00690                                 CConnections::iterator ipb;
00691                                 {
00692                                         CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00693                                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00694                                         {
00695                                                 // For each socket of the thread, update sending
00696                                                 total = (*ipb)->SendFifo.size ();
00697                                         }
00698                                 }
00699                         }
00700                 }
00701                 return total;
00702         }
00703 }
00704 
00705 
00706 /*
00707  * Returns the number of bytes received since the previous call to this method
00708  */
00709 uint64 CBufServer::newBytesReceived()
00710 {
00711         uint64 b = bytesReceived();
00712         uint64 nbrecvd = b - _PrevBytesPoppedIn;
00713         //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbrecvd );
00714         _PrevBytesPoppedIn = b;
00715         return nbrecvd;
00716 }
00717 
00718 /*
00719  * Returns the number of bytes sent since the previous call to this method
00720  */
00721 uint64 CBufServer::newBytesSent()
00722 {
00723         uint64 b = bytesSent();
00724         uint64 nbsent = b - _PrevBytesPushedOut;
00725         //nlinfo( "b: %"NL_I64"u   new: %"NL_I64"u", b, nbsent );
00726         _PrevBytesPushedOut = b;
00727         return nbsent;
00728 }
00729 
00730 
00731 /***************************************************************************************************
00732  * Listen thread
00733  **************************************************************************************************/
00734 
00735 
00736 /*
00737  * Code of listening thread
00738  */
00739 void CListenTask::run()
00740 {
00741         nlnettrace( "CListenTask::run" );
00742 
00743 #ifdef NL_OS_UNIX
00744         SOCKET descmax;
00745         fd_set readers;
00746         timeval tv;
00747         descmax = _ListenSock.descriptor()>_WakeUpPipeHandle[PipeRead]?_ListenSock.descriptor():_WakeUpPipeHandle[PipeRead];
00748 #endif
00749 
00750         // Accept connections
00751         while ( ! exitRequired() )
00752         {
00753                 try
00754                 {
00755                         // Get and setup the new socket
00756 #ifdef NL_OS_UNIX
00757                         FD_ZERO( &readers );
00758                         FD_SET( _ListenSock.descriptor(), &readers );
00759                         FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
00760                         tv.tv_sec = 60; 
00761                         tv.tv_usec = 0;
00762                         int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
00763 
00764                         switch ( res )
00765                         {
00766                         case  0 : continue; // time-out expired, no results
00767                         case -1 :
00768                                 // we'll ignore message (Interrupted system call) caused by a CTRL-C
00769                                 if (CSock::getLastError() == 4)
00770                                 {
00771                                         nldebug ("LNETL1: Select failed (in listen thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
00772                                         continue;
00773                                 }
00774                                 nlerror( "LNETL1: Select failed (in listen thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
00775                         }
00776 
00777                         if ( FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers ) )
00778                         {
00779                                 uint8 b;
00780                                 if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
00781                                 {
00782                                         nldebug( "LNETL1: In CListenTask::run(): read() failed" );
00783                                 }
00784                                 nldebug( "LNETL1: listen thread select woken-up" );
00785                                 continue;
00786                         }
00787 #endif
00788                         nldebug( "LNETL1: Waiting incoming connection..." );
00789                         CServerBufSock *bufsock = new CServerBufSock( _ListenSock.accept() );
00790                         nldebug( "New connection : %s", bufsock->asString().c_str() );
00791                         bufsock->Sock->setNonBlockingMode( true );
00792                         if ( _Server->noDelay() )
00793                         {
00794                                 bufsock->Sock->setNoDelay( true );
00795                         }
00796 
00797                         // Notify the new connection
00798                         bufsock->advertiseConnection( _Server );
00799 
00800                         // Dispatch the socket into the thread pool
00801                         _Server->dispatchNewSocket( bufsock );
00802                 }
00803                 catch ( ESocket& e )
00804                 {
00805                         nlinfo( "Exception in listen thread: %s", e.what() ); // It can occur in normal behavior (e.g. when exiting)
00806                         // It can also occur when too many sockets are open (e.g. 885 connections)
00807                 }
00808         }
00809 
00810         nlnettrace( "Exiting CListenTask::run" );
00811 }
00812 
00813 
00814 /*
00815  * Binds a new socket and send buffer to an existing or a new thread
00816  * Note: this method is called in the listening thread.
00817  */
00818 void CBufServer::dispatchNewSocket( CServerBufSock *bufsock )
00819 {
00820         nlnettrace( "CBufServer::dispatchNewSocket" );
00821 
00822         CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00823         if ( _ThreadStrategy == SpreadSockets ) 
00824         {
00825                 // Find the thread with the smallest number of connections and check if all
00826                 // threads do not have the same number of connections
00827                 uint min = 0xFFFFFFFF;
00828                 uint max = 0;
00829                 CThreadPool::iterator ipt, iptmin, iptmax;
00830                 for ( iptmin=iptmax=ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00831                 {
00832                         uint noc = receiveTask(ipt)->numberOfConnections();
00833                         if ( noc < min )
00834                         {
00835                                 min = noc;
00836                                 iptmin = ipt;
00837                         }
00838                         if ( noc > max )
00839                         {
00840                                 max = noc;
00841                                 iptmax = ipt;
00842                         }
00843                 }
00844 
00845                 // Check if we make the pool of threads grow (if we have not found vacant room
00846                 // and if it is allowed to)
00847                 if ( (poolsync.value().empty()) ||
00848                          ((min == max) && (poolsync.value().size() < _MaxThreads)) )
00849                 {
00850                         addNewThread( poolsync.value(), bufsock );
00851                 }
00852                 else
00853                 {
00854                         // Dispatch socket to an existing thread of the pool
00855                         CServerReceiveTask *task = receiveTask(iptmin);
00856                         bufsock->setOwnerTask( task );
00857                         task->addNewSocket( bufsock );
00858 #ifdef NL_OS_UNIX
00859                         task->wakeUp();
00860 #endif                  
00861                         
00862                         if ( min >= (uint)_MaxSocketsPerThread )
00863                         {
00864                                 nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
00865                         }
00866                         nldebug( "LNETL1: New socket dispatched to thread %d", iptmin-poolsync.value().begin() );
00867                 }
00868 
00869         }
00870         else // _ThreadStrategy == FillThreads
00871         {
00872                 CThreadPool::iterator ipt;
00873                 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00874                 {
00875                         uint noc = receiveTask(ipt)->numberOfConnections();
00876                         if ( noc < _MaxSocketsPerThread )
00877                         {
00878                                 break;
00879                         }
00880                 }
00881 
00882                 // Check if we have to make the thread pool grow (if we have not found vacant room)
00883                 if ( ipt == poolsync.value().end() )
00884                 {
00885                         if ( poolsync.value().size() == _MaxThreads )
00886                         {
00887                                 nlwarning( "LNETL1: Exceeding the maximum number of threads" );
00888                         }
00889                         addNewThread( poolsync.value(), bufsock );
00890                 }
00891                 else
00892                 {
00893                         // Dispatch socket to an existing thread of the pool
00894                         CServerReceiveTask *task = receiveTask(ipt);
00895                         bufsock->setOwnerTask( task );
00896                         task->addNewSocket( bufsock );
00897 #ifdef NL_OS_UNIX
00898                         task->wakeUp();
00899 #endif                  
00900                         nldebug( "LNETL1: New socket dispatched to thread %d", ipt-poolsync.value().begin() );
00901                 }
00902         }
00903 }
00904 
00905 
00906 /*
00907  * Creates a new task and run a new thread for it
00908  * Precond: bufsock not null
00909  */
00910 void CBufServer::addNewThread( CThreadPool& threadpool, CServerBufSock *bufsock )
00911 {
00912         nlnettrace( "CBufServer::addNewThread" );
00913         nlassert( bufsock != NULL );
00914 
00915         // Create new task and dispatch the socket to it
00916         CServerReceiveTask *task = new CServerReceiveTask( this );
00917         bufsock->setOwnerTask( task );
00918         task->addNewSocket( bufsock );
00919 
00920         // Add a new thread to the pool, with this task
00921         IThread *thr = IThread::create( task );
00922         {
00923                 threadpool.push_back( thr );
00924                 thr->start();
00925                 nldebug( "LNETL1: Added a new thread; pool size is %d", threadpool.size() );
00926                 nldebug( "LNETL1: New socket dispatched to thread %d", threadpool.size()-1 );
00927         }
00928 }
00929 
00930 
00931 /***************************************************************************************************
00932  * Receive threads
00933  **************************************************************************************************/
00934 
00935 
00936 /*
00937  * Code of receiving threads for servers
 *
 * Loops until exitRequired():
 *  1. deletes the connections listed in the remove set (clearClosedConnections()),
 *  2. with _Connections locked, builds the select() fd_set from the connected sockets
 *     and copies them into connections_copy (on UNIX the wake-up pipe is added too),
 *  3. calls select() with a platform-dependent time-out,
 *  4. for each readable socket of connections_copy, calls receivePart(); when it returns
 *     true, the received buffer is pushed into the server receive queue together with
 *     the socket id and the CBufNetBase::User event type.
 * The method returns (without the "Exiting" trace) if select() fails.
00938  */
00939 void CServerReceiveTask::run()
00940 {
00941         nlnettrace( "CServerReceiveTask::run" );
00942 
00943         SOCKET descmax;
00944         fd_set readers;
00945 
00946         // Time-out value for select (it can be long because we do not do any thing else in this thread)
00947         timeval tv;
00948 #if defined NL_OS_UNIX
00949         // POLL7
              // Increase the nice value by 2
00950         nice( 2 );
00951 #endif // NL_OS_UNIX
00952         
00953         // Copy of _Connections
              // (declared outside the loop and cleared at each iteration)
00954         vector<TSockId> connections_copy;       
00955 
00956         while ( ! exitRequired() )
00957         {
00958                 // 1. Remove closed connections
00959                 clearClosedConnections();
00960 
00961                 // POLL8
00962 
00963                 // 2-SELECT-VERSION : select() on the sockets handled in the present thread
00964 
00965                 descmax = 0;
00966                 FD_ZERO( &readers );
                      // skip: true if _Connections was empty when the fd_set was built
                      // alldisconnected: stays true if no socket of _Connections is connected
00967                 bool skip;
00968                 bool alldisconnected = true;
00969                 CConnections::iterator ipb;
00970                 {
00971                         // Lock _Connections
00972                         CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00973 
00974                         // Prepare to avoid select if there is no connection
00975                         skip = connectionssync.value().empty();
00976 
00977                         // Fill the select array and copy _Connections
00978                         connections_copy.clear();
00979                         for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00980                         {
00981                                 if ( (*ipb)->Sock->connected() ) // exclude disconnected sockets that are not deleted
00982                                 {
00983                                         alldisconnected = false;
00984                                         // Copy _Connections element
00985                                         connections_copy.push_back( *ipb );
00986 
00987                                         // Add socket descriptor to the select array
00988                                         FD_SET( (*ipb)->Sock->descriptor(), &readers );
00989 
00990                                         // Calculate descmax for select
00991                                         if ( (*ipb)->Sock->descriptor() > descmax )
00992                                         {
00993                                                 descmax = (*ipb)->Sock->descriptor();
00994                                         }
00995                                 }
00996                         }
00997 
00998 #ifdef NL_OS_UNIX
00999                         // Add the wake-up pipe into the select array
01000                         FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
01001                         if ( _WakeUpPipeHandle[PipeRead]>descmax )
01002                         {
01003                                 descmax = _WakeUpPipeHandle[PipeRead];
01004                         }
01005 #endif
01006                         
01007                         // Unlock _Connections, use connections_copy instead
01008                 }
01009 
01010 #ifndef NL_OS_UNIX
01011                 // Avoid select if there is no connection (Windows only)
01012                 if ( skip || alldisconnected )
01013                 {
01014                         nlSleep( 1 ); // nice
01015                         continue;
01016                 }
01017 #endif
01018 
                      // The time-out is set again before every select() call
01019 #ifdef NL_OS_WINDOWS
01020                 tv.tv_sec = 0; // short time because the newly added connections can't be added to the select fd_set
01021                 tv.tv_usec = 10000; // 10000 microseconds = 10 ms, kept short because otherwise new connections handling is too slow (NOTE(review): this comment used to say 500ms, which does not match the value)
01022 #elif defined NL_OS_UNIX
01023                 // POLL7
01024                 tv.tv_sec = 3600;               // 1 hour (=> 1 select every 3.6 second for 1000 connections)
01025                 tv.tv_usec = 0;
01026 #endif // NL_OS_WINDOWS
01027 
01028                 // Call select
01029                 int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
01030 
01031                 // POLL9
01032 
01033                 // 3. Test the result
01034                 switch ( res )
01035                 {
01036                         case  0 : continue; // time-out expired, no results
01037 
01039                         case -1 :
01040                                 // we'll ignore message (Interrupted system call) caused by a CTRL-C
                                      // NOTE(review): the "ignore" code below is commented out, so any select()
                                      // failure (including code 4) is logged and ends this method
01041                                 /*if (CSock::getLastError() == 4)
01042                                 {
01043                                         nldebug ("LNETL1: Select failed (in receive thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
01044                                         continue;
01045                                 }*/
01046                                 //nlerror( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
01047                                 nldebug( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
01048                                 return;
01049                 }
01050 
01051                 // 4. Get results
01052 
                      // Only the sockets copied into connections_copy (connected when the fd_set was built) are tested
01053                 vector<TSockId>::iterator ic;
01054                 for ( ic=connections_copy.begin(); ic!=connections_copy.end(); ++ic )
01055                 {
01056                         if ( FD_ISSET( (*ic)->Sock->descriptor(), &readers ) != 0 )
01057                         {
01058                                 CServerBufSock *serverbufsock = static_cast<CServerBufSock*>(static_cast<CBufSock*>(*ic));
01059                                 try
01060                                 {
01061                                         // 4. Receive data
                                              // NOTE(review): presumably receivePart() returns true once a whole message has been received - confirm
01062                                         if ( serverbufsock->receivePart() )
01063                                         {
01064                                                 // Copy sockid
                                                      // hidvec layout: sizeof(TSockId) bytes holding the socket id, then 1 byte holding the event type
01065                                                 vector<uint8> hidvec;
01066                                                 hidvec.resize( sizeof(TSockId)+1 );
01067                                                 memcpy( &*hidvec.begin(), &(*ic), sizeof(TSockId) );
01068 
01069                                                 // Add event type to hidvec
01070                                                 hidvec[sizeof(TSockId)] = (uint8)CBufNetBase::User;
01071 
01072                                                 // Push message into receive queue
01073                                                 //uint32 bufsize;
01074                                                 //sint32 mbsize;
01075                                                 {
                                                              // The receive queue is accessed through recvfifo only inside this block
01076                                                         //nldebug( "RCV: Acquiring the receive queue... ");
01077                                                         CFifoAccessor recvfifo( &_Server->receiveQueue() );
01078                                                         //nldebug( "RCV: Acquired, pushing the received buffer... ");
01079                                                         recvfifo.value().push( serverbufsock->receivedBuffer(), hidvec );
01080                                                         _Server->setDataAvailableFlag( true );
01081                                                         //nldebug( "RCV: Pushed, releasing the receive queue..." );
01082                                                         //recvfifo.value().display();
01083                                                         //bufsize = serverbufsock->receivedBuffer().size();
01084                                                         //mbsize = recvfifo.value().size() / 1048576;
01085                                                 }
01086                                                 //nldebug( "RCV: Released." );
01087                                                 /*if ( mbsize > 1 )
01088                                                 {
01089                                                         nlwarning( "The receive queue size exceeds %d MB", mbsize );
01090                                                 }*/
01091                                                 /*
01092                                                 // Statistics
01093                                                 {
01094                                                         CSynchronized<uint32>::CAccessor syncbpi ( &_Server->syncBytesPushedIn() );
01095                                                         syncbpi.value() += bufsize;
01096                                                 }
01097                                                 */
01098                                         }
01099                                 }
01100                                 catch ( ESocketConnectionClosed& )
01101                                 {
01102                                         nldebug( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() );
01103                                         // The socket went to _Connected=false when throwing the exception
01104                                 }
01105                                 catch ( ESocket& )
01106                                 {
                                              // Any other socket error: log it and disconnect the socket explicitly
01107                                         nldebug( "LNETL1: Connection %s broken", serverbufsock->asString().c_str() );
01108                                         (*ic)->Sock->disconnect();
01109                                 }
01110 /*
01111 #ifdef NL_OS_UNIX
01112                                 skip = true; // don't check _WakeUpPipeHandle (yes, check it to read any written byte)
01113 #endif
01114 
01115 */
01116                         }
01117 
01118                 }
01119 
01120 #ifdef NL_OS_UNIX
01121                 // Test wake-up pipe
                      // The byte is read only when 'skip' is false, i.e. when _Connections was not empty
                      // at the time the fd_set was built
01122                 if ( (!skip) && (FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers )) )
01123                 {
01124                         uint8 b;
01125                         if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
01126                         {
01127                                 nldebug( "LNETL1: In CServerReceiveTask::run(): read() failed" );
01128                         }
01129                         nldebug( "LNETL1: Receive thread select woken-up" );
01130                 }
01131 #endif
01132                 
01133         }
01134         nlnettrace( "Exiting CServerReceiveTask::run" );
01135 }
01136 
01137 
01138 /*
01139  * Delete all connections referenced in the remove list (double-mutexed)
01140  */
01141 
01142 void CServerReceiveTask::clearClosedConnections()
01143 {
01144         CConnections::iterator ic;
01145         {
01146                 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
01147                 {
01148                         if ( ! removesetsync.value().empty() )
01149                         {
01150                                 // Delete closed connections
01151                                 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
01152                                 for ( ic=removesetsync.value().begin(); ic!=removesetsync.value().end(); ++ic )
01153                                 {
01154                                         nldebug( "LNETL1: Removing a connection" );
01155 
01156                                         TSockId sid = (*ic);
01157 
01158                                         // Remove from the connection list
01159                                         connectionssync.value().erase( *ic );
01160 
01161                                         // Delete the socket object
01162                                         delete sid;
01163                                 }
01164                                 // Clear remove list
01165                                 removesetsync.value().clear();
01166                         }
01167                 }
01168         }
01169 }
01170 
01171 
01172 } // NLNET