00001
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "stdnet.h"
00027
00028 #include "nel/misc/hierarchical_timer.h"
00029
00030 #include "nel/net/buf_server.h"
00031
00032 #ifdef NL_OS_WINDOWS
00033 #include <winsock2.h>
00034
00035
00036 #elif defined NL_OS_UNIX
00037 #include <unistd.h>
00038 #include <sys/types.h>
00039 #include <sys/time.h>
00040 #endif
00041
00042
00043 using namespace NLMISC;
00044 using namespace std;
00045
00046 namespace NLNET {
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057 CBufServer::CBufServer( TThreadStategy strategy,
00058 uint16 max_threads, uint16 max_sockets_per_thread, bool nodelay, bool replaymode ) :
00059 CBufNetBase(),
00060 _NoDelay( nodelay ),
00061 _ThreadStrategy( strategy ),
00062 _MaxThreads( max_threads ),
00063 _MaxSocketsPerThread( max_sockets_per_thread ),
00064 _ListenTask( NULL ),
00065 _ListenThread( NULL ),
00066 _ThreadPool("CBufServer::_ThreadPool"),
00067 _ConnectionCallback( NULL ),
00068 _ConnectionCbArg( NULL ),
00069 _BytesPushedOut( 0 ),
00070 _BytesPoppedIn( 0 ),
00071 _PrevBytesPoppedIn( 0 ),
00072 _PrevBytesPushedOut( 0 ),
00073 _NbConnections (0),
00074 _ReplayMode( replaymode )
00075 {
00076 nlnettrace( "CBufServer::CBufServer" );
00077 if ( ! _ReplayMode )
00078 {
00079 _ListenTask = new CListenTask( this );
00080 _ListenThread = IThread::create( _ListenTask );
00081 }
00082
00083
00084
00085
00086 }
00087
00088
00089
00090
00091
00092 void CBufServer::init( uint16 port )
00093 {
00094 nlnettrace( "CBufServer::init" );
00095 if ( ! _ReplayMode )
00096 {
00097 _ListenTask->init( port );
00098 _ListenThread->start();
00099 }
00100 else
00101 {
00102 nldebug( "LNETL0: Binding listen socket to any address, port %hu", port );
00103 }
00104 }
00105
00106
00107
00108
00109
00110 void CListenTask::init( uint16 port )
00111 {
00112 nlnettrace( "CListenTask::init" );
00113 _ListenSock.init( port );
00114 }
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125 CServerTask::CServerTask() : _ExitRequired(false)
00126 {
00127 #ifdef NL_OS_UNIX
00128 pipe( _WakeUpPipeHandle );
00129 #endif
00130 }
00131
00132
00133
00134 #ifdef NL_OS_UNIX
00135
00136
00137
00138 void CServerTask::wakeUp()
00139 {
00140 uint8 b;
00141 if ( write( _WakeUpPipeHandle[PipeWrite], &b, 1 ) == -1 )
00142 {
00143 nldebug( "LNETL1: In CServerTask::wakeUp(): write() failed" );
00144 }
00145 }
00146 #endif
00147
00148
00149
00150
00151
00152 CServerTask::~CServerTask()
00153 {
00154 #ifdef NL_OS_UNIX
00155 close( _WakeUpPipeHandle[PipeRead] );
00156 close( _WakeUpPipeHandle[PipeWrite] );
00157 #endif
00158 }
00159
00160
00161
00162
00163
00164 CBufServer::~CBufServer()
00165 {
00166 nlnettrace( "CBufServer::~CBufServer" );
00167
00168
00169 if ( ! _ReplayMode )
00170 {
00171 ((CListenTask*)(_ListenThread->getRunnable()))->requireExit();
00172 ((CListenTask*)(_ListenThread->getRunnable()))->close();
00173 #ifdef NL_OS_UNIX
00174 _ListenTask->wakeUp();
00175 #endif
00176 _ListenThread->wait();
00177 delete _ListenThread;
00178 delete _ListenTask;
00179
00180
00181 CThreadPool::iterator ipt;
00182 {
00183 nldebug( "LNETL1: Waiting for end of threads..." );
00184 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00185 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00186 {
00187
00188 CServerReceiveTask *task = receiveTask(ipt);
00189 nlnettrace( "Requiring exit" );
00190 task->requireExit();
00191
00192
00193 #ifdef NL_OS_UNIX
00194 task->wakeUp();
00195 #else
00196 CConnections::iterator ipb;
00197 nlnettrace( "Closing sockets (Win32)" );
00198 {
00199 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00200 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00201 {
00202 (*ipb)->Sock->close();
00203 }
00204 }
00205 #endif
00206
00207 }
00208
00209 nlnettrace( "Waiting" );
00210 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00211 {
00212
00213 (*ipt)->wait();
00214 }
00215
00216 nldebug( "LNETL1: Deleting sockets, tasks and threads..." );
00217 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00218 {
00219
00220 CServerReceiveTask *task = receiveTask(ipt);
00221 CConnections::iterator ipb;
00222 {
00223 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00224 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00225 {
00226 delete (*ipb);
00227 }
00228 }
00229
00230 #ifdef NL_OS_UNIX
00231
00232 nlnettrace( "Closing sockets (Unix)" );
00233 {
00234 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00235 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00236 {
00237 (*ipb)->Sock->close();
00238 }
00239 }
00240 #endif
00241
00242
00243 delete task;
00244
00245
00246 delete (*ipt);
00247 }
00248 }
00249 }
00250
00251 nlnettrace( "Exiting CBufServer::~CBufServer" );
00252 }
00253
00254
00255
00256
00257
00258
00259
00260
00261 void CBufServer::disconnect( TSockId hostid, bool quick )
00262 {
00263 nlnettrace( "CBufServer::disconnect" );
00264 if ( hostid != InvalidSockId )
00265 {
00266
00267 if ( hostid->Sock->connected() )
00268 {
00269 if ( ! quick )
00270 {
00271 hostid->flush();
00272 }
00273 hostid->Sock->disconnect();
00274 }
00275 }
00276 else
00277 {
00278
00279 CThreadPool::iterator ipt;
00280 {
00281 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00282 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00283 {
00284 CServerReceiveTask *task = receiveTask(ipt);
00285 CConnections::iterator ipb;
00286 {
00287 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00288 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00289 {
00290 if ( (*ipb)->Sock->connected() )
00291 {
00292 if ( ! quick )
00293 {
00294 (*ipb)->flush();
00295 }
00296 (*ipb)->Sock->disconnect();
00297 }
00298 }
00299 }
00300 }
00301 }
00302 }
00303 }
00304
00305
00306
00307
00308
00309 void CBufServer::send( const CMemStream& buffer, TSockId hostid )
00310 {
00311 nlnettrace( "CBufServer::send" );
00312 nlassert( buffer.length() > 0);
00313 nlassert( buffer.length() <= maxSentBlockSize() );
00314
00315
00316
00317 if ( hostid != InvalidSockId )
00318 {
00319
00320
00321
00322 #ifdef NL_BIG_ENDIAN
00323 uint32 val = NLMISC_BSWAP32(hostid->SendNextValue);
00324 #else
00325 uint32 val = hostid->SendNextValue;
00326 #endif
00327
00328 *(uint32*)buffer.buffer() = val;
00329 hostid->SendNextValue++;
00330
00331 pushBufferToHost( buffer, hostid );
00332 }
00333 else
00334 {
00335
00336 CThreadPool::iterator ipt;
00337 {
00338 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00339 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00340 {
00341 CServerReceiveTask *task = receiveTask(ipt);
00342 CConnections::iterator ipb;
00343 {
00344 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00345 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00346 {
00347
00348 if ( (*ipb)->connectedState() )
00349 {
00350
00351
00352
00353 #ifdef NL_BIG_ENDIAN
00354 uint32 val = NLMISC_BSWAP32((*ipb)->SendNextValue);
00355 #else
00356 uint32 val = (*ipb)->SendNextValue;
00357 #endif
00358 *(uint32*)buffer.buffer() = val;
00359 (*ipb)->SendNextValue++;
00360
00361 pushBufferToHost( buffer, *ipb );
00362 }
00363 }
00364 }
00365 }
00366 }
00367 }
00368 }
00369
00370
00371
00372
00373
00374 bool CBufServer::dataAvailable()
00375 {
00376
00377 {
00378
00379
00380
00381
00382 while ( dataAvailableFlag() )
00383 {
00384
00385 vector<uint8> buffer;
00386 uint8 val;
00387 {
00388 CFifoAccessor recvfifo( &receiveQueue() );
00389 val = recvfifo.value().frontLast();
00390 if ( val != CBufNetBase::User )
00391 {
00392 recvfifo.value().front( buffer );
00393 }
00394 }
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407 switch ( val )
00408 {
00409
00410
00411 case CBufNetBase::User:
00412 return true;
00413
00414
00415 case CBufNetBase::Disconnection:
00416 {
00417 TSockId sockid = *((TSockId*)(&*buffer.begin()));
00418 nldebug( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());
00419
00420 sockid->setConnectedState( false );
00421
00422
00423 if ( disconnectionCallback() != NULL )
00424 {
00425 disconnectionCallback()( sockid, argOfDisconnectionCallback() );
00426 }
00427
00428
00429 nldebug( "LNETL1: Adding the connection to the remove list" );
00430 nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
00431 ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
00432 break;
00433 }
00434
00435 case CBufNetBase::Connection:
00436 {
00437 TSockId sockid = *((TSockId*)(&*buffer.begin()));
00438 nldebug( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());
00439
00440 sockid->setConnectedState( true );
00441
00442
00443 if ( connectionCallback() != NULL )
00444 {
00445 connectionCallback()( sockid, argOfConnectionCallback() );
00446 }
00447 break;
00448 }
00449 default:
00450 nlinfo( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
00451 nlinfo( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
00452 nlinfo( "LNETL1: Receive queue:" );
00453 {
00454 CFifoAccessor recvfifo( &receiveQueue() );
00455 recvfifo.value().display();
00456 }
00457 nlerror( "LNETL1: Invalid system event type in server receive queue" );
00458
00459 }
00460
00461
00462 {
00463 CFifoAccessor recvfifo( &receiveQueue() );
00464 recvfifo.value().pop();
00465 setDataAvailableFlag( ! recvfifo.value().empty() );
00466 }
00467 }
00468
00469 return false;
00470 }
00471 }
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575 void CBufServer::receive( CMemStream& buffer, TSockId* phostid )
00576 {
00577 nlnettrace( "CBufServer::receive" );
00578
00579 nlassert( phostid != NULL );
00580 {
00581 CFifoAccessor recvfifo( &receiveQueue() );
00582 nlassert( ! recvfifo.value().empty() );
00583 recvfifo.value().front( buffer );
00584 recvfifo.value().pop();
00585 setDataAvailableFlag( ! recvfifo.value().empty() );
00586 }
00587
00588
00589 *phostid = *((TSockId*)&(buffer.buffer()[buffer.length()-sizeof(TSockId)-1]));
00590 nlassert( buffer.buffer()[buffer.length()-1] == CBufNetBase::User );
00591
00592
00593
00594 #ifdef NL_BIG_ENDIAN
00595 uint32 val = NLMISC_BSWAP32(*(uint32*)buffer.buffer());
00596 #else
00597 uint32 val = *(uint32*)buffer.buffer();
00598 #endif
00599
00600
00601 if ((*phostid)->ReceiveNextValue != val)
00602 {
00603 nlwarning (("LNETL1: !!!LOST A MESSAGE!!! I received the message number %u but I'm waiting the message number %u (cnx %s), warn lecroart@nevrax.com with the log now please", val, (*phostid)->ReceiveNextValue, (*phostid)->asString().c_str()));
00604
00605 (*phostid)->ReceiveNextValue = val;
00606 }
00607
00608 (*phostid)->ReceiveNextValue++;
00609
00610 buffer.resize( buffer.length()-sizeof(TSockId)-1 );
00611
00612
00613
00614
00615
00616 _BytesPoppedIn += buffer.length() + sizeof(TBlockSize);
00617 }
00618
00619
00620
00621
00622
00623 void CBufServer::update()
00624 {
00625
00626
00627 _NbConnections = 0;
00628
00629
00630 CThreadPool::iterator ipt;
00631 {
00632
00633 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00634
00635 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00636 {
00637
00638 CServerReceiveTask *task = receiveTask(ipt);
00639 CConnections::iterator ipb;
00640 {
00641 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00642 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00643 {
00644
00645 if ( ! ((*ipb)->Sock->connected() && (*ipb)->update()) )
00646 {
00647
00648 nldebug( "LNETL1: Socket %s is disconnected", (*ipb)->asString().c_str() );
00649
00650 (*ipb)->advertiseDisconnection( this, *ipb );
00651
00652
00653
00654
00655
00656
00657 }
00658 else
00659 {
00660 _NbConnections++;
00661 }
00662 }
00663 }
00664 }
00665 }
00666
00667
00668 }
00669
00670 uint32 CBufServer::getSendQueueSize( TSockId destid )
00671 {
00672 if ( destid != InvalidSockId )
00673 {
00674 return destid->SendFifo.size();
00675 }
00676 else
00677 {
00678
00679
00680 uint32 total = 0;
00681
00682
00683 CThreadPool::iterator ipt;
00684 {
00685 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00686 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00687 {
00688
00689 CServerReceiveTask *task = receiveTask(ipt);
00690 CConnections::iterator ipb;
00691 {
00692 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
00693 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00694 {
00695
00696 total = (*ipb)->SendFifo.size ();
00697 }
00698 }
00699 }
00700 }
00701 return total;
00702 }
00703 }
00704
00705
00706
00707
00708
00709 uint64 CBufServer::newBytesReceived()
00710 {
00711 uint64 b = bytesReceived();
00712 uint64 nbrecvd = b - _PrevBytesPoppedIn;
00713
00714 _PrevBytesPoppedIn = b;
00715 return nbrecvd;
00716 }
00717
00718
00719
00720
00721 uint64 CBufServer::newBytesSent()
00722 {
00723 uint64 b = bytesSent();
00724 uint64 nbsent = b - _PrevBytesPushedOut;
00725
00726 _PrevBytesPushedOut = b;
00727 return nbsent;
00728 }
00729
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739 void CListenTask::run()
00740 {
00741 nlnettrace( "CListenTask::run" );
00742
00743 #ifdef NL_OS_UNIX
00744 SOCKET descmax;
00745 fd_set readers;
00746 timeval tv;
00747 descmax = _ListenSock.descriptor()>_WakeUpPipeHandle[PipeRead]?_ListenSock.descriptor():_WakeUpPipeHandle[PipeRead];
00748 #endif
00749
00750
00751 while ( ! exitRequired() )
00752 {
00753 try
00754 {
00755
00756 #ifdef NL_OS_UNIX
00757 FD_ZERO( &readers );
00758 FD_SET( _ListenSock.descriptor(), &readers );
00759 FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
00760 tv.tv_sec = 60;
00761 tv.tv_usec = 0;
00762 int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
00763
00764 switch ( res )
00765 {
00766 case 0 : continue;
00767 case -1 :
00768
00769 if (CSock::getLastError() == 4)
00770 {
00771 nldebug ("LNETL1: Select failed (in listen thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
00772 continue;
00773 }
00774 nlerror( "LNETL1: Select failed (in listen thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
00775 }
00776
00777 if ( FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers ) )
00778 {
00779 uint8 b;
00780 if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 )
00781 {
00782 nldebug( "LNETL1: In CListenTask::run(): read() failed" );
00783 }
00784 nldebug( "LNETL1: listen thread select woken-up" );
00785 continue;
00786 }
00787 #endif
00788 nldebug( "LNETL1: Waiting incoming connection..." );
00789 CServerBufSock *bufsock = new CServerBufSock( _ListenSock.accept() );
00790 nldebug( "New connection : %s", bufsock->asString().c_str() );
00791 bufsock->Sock->setNonBlockingMode( true );
00792 if ( _Server->noDelay() )
00793 {
00794 bufsock->Sock->setNoDelay( true );
00795 }
00796
00797
00798 bufsock->advertiseConnection( _Server );
00799
00800
00801 _Server->dispatchNewSocket( bufsock );
00802 }
00803 catch ( ESocket& e )
00804 {
00805 nlinfo( "Exception in listen thread: %s", e.what() );
00806
00807 }
00808 }
00809
00810 nlnettrace( "Exiting CListenTask::run" );
00811 }
00812
00813
00814
00815
00816
00817
00818 void CBufServer::dispatchNewSocket( CServerBufSock *bufsock )
00819 {
00820 nlnettrace( "CBufServer::dispatchNewSocket" );
00821
00822 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
00823 if ( _ThreadStrategy == SpreadSockets )
00824 {
00825
00826
00827 uint min = 0xFFFFFFFF;
00828 uint max = 0;
00829 CThreadPool::iterator ipt, iptmin, iptmax;
00830 for ( iptmin=iptmax=ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00831 {
00832 uint noc = receiveTask(ipt)->numberOfConnections();
00833 if ( noc < min )
00834 {
00835 min = noc;
00836 iptmin = ipt;
00837 }
00838 if ( noc > max )
00839 {
00840 max = noc;
00841 iptmax = ipt;
00842 }
00843 }
00844
00845
00846
00847 if ( (poolsync.value().empty()) ||
00848 ((min == max) && (poolsync.value().size() < _MaxThreads)) )
00849 {
00850 addNewThread( poolsync.value(), bufsock );
00851 }
00852 else
00853 {
00854
00855 CServerReceiveTask *task = receiveTask(iptmin);
00856 bufsock->setOwnerTask( task );
00857 task->addNewSocket( bufsock );
00858 #ifdef NL_OS_UNIX
00859 task->wakeUp();
00860 #endif
00861
00862 if ( min >= (uint)_MaxSocketsPerThread )
00863 {
00864 nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
00865 }
00866 nldebug( "LNETL1: New socket dispatched to thread %d", iptmin-poolsync.value().begin() );
00867 }
00868
00869 }
00870 else
00871 {
00872 CThreadPool::iterator ipt;
00873 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
00874 {
00875 uint noc = receiveTask(ipt)->numberOfConnections();
00876 if ( noc < _MaxSocketsPerThread )
00877 {
00878 break;
00879 }
00880 }
00881
00882
00883 if ( ipt == poolsync.value().end() )
00884 {
00885 if ( poolsync.value().size() == _MaxThreads )
00886 {
00887 nlwarning( "LNETL1: Exceeding the maximum number of threads" );
00888 }
00889 addNewThread( poolsync.value(), bufsock );
00890 }
00891 else
00892 {
00893
00894 CServerReceiveTask *task = receiveTask(ipt);
00895 bufsock->setOwnerTask( task );
00896 task->addNewSocket( bufsock );
00897 #ifdef NL_OS_UNIX
00898 task->wakeUp();
00899 #endif
00900 nldebug( "LNETL1: New socket dispatched to thread %d", ipt-poolsync.value().begin() );
00901 }
00902 }
00903 }
00904
00905
00906
00907
00908
00909
00910 void CBufServer::addNewThread( CThreadPool& threadpool, CServerBufSock *bufsock )
00911 {
00912 nlnettrace( "CBufServer::addNewThread" );
00913 nlassert( bufsock != NULL );
00914
00915
00916 CServerReceiveTask *task = new CServerReceiveTask( this );
00917 bufsock->setOwnerTask( task );
00918 task->addNewSocket( bufsock );
00919
00920
00921 IThread *thr = IThread::create( task );
00922 {
00923 threadpool.push_back( thr );
00924 thr->start();
00925 nldebug( "LNETL1: Added a new thread; pool size is %d", threadpool.size() );
00926 nldebug( "LNETL1: New socket dispatched to thread %d", threadpool.size()-1 );
00927 }
00928 }
00929
00930
00931
00932
00933
00934
00935
00936
00937
00938
00939 void CServerReceiveTask::run()
00940 {
00941 nlnettrace( "CServerReceiveTask::run" );
00942
00943 SOCKET descmax;
00944 fd_set readers;
00945
00946
00947 timeval tv;
00948 #if defined NL_OS_UNIX
00949
00950 nice( 2 );
00951 #endif // NL_OS_UNIX
00952
00953
00954 vector<TSockId> connections_copy;
00955
00956 while ( ! exitRequired() )
00957 {
00958
00959 clearClosedConnections();
00960
00961
00962
00963
00964
00965 descmax = 0;
00966 FD_ZERO( &readers );
00967 bool skip;
00968 bool alldisconnected = true;
00969 CConnections::iterator ipb;
00970 {
00971
00972 CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00973
00974
00975 skip = connectionssync.value().empty();
00976
00977
00978 connections_copy.clear();
00979 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
00980 {
00981 if ( (*ipb)->Sock->connected() )
00982 {
00983 alldisconnected = false;
00984
00985 connections_copy.push_back( *ipb );
00986
00987
00988 FD_SET( (*ipb)->Sock->descriptor(), &readers );
00989
00990
00991 if ( (*ipb)->Sock->descriptor() > descmax )
00992 {
00993 descmax = (*ipb)->Sock->descriptor();
00994 }
00995 }
00996 }
00997
00998 #ifdef NL_OS_UNIX
00999
01000 FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
01001 if ( _WakeUpPipeHandle[PipeRead]>descmax )
01002 {
01003 descmax = _WakeUpPipeHandle[PipeRead];
01004 }
01005 #endif
01006
01007
01008 }
01009
01010 #ifndef NL_OS_UNIX
01011
01012 if ( skip || alldisconnected )
01013 {
01014 nlSleep( 1 );
01015 continue;
01016 }
01017 #endif
01018
01019 #ifdef NL_OS_WINDOWS
01020 tv.tv_sec = 0;
01021 tv.tv_usec = 10000;
01022 #elif defined NL_OS_UNIX
01023
01024 tv.tv_sec = 3600;
01025 tv.tv_usec = 0;
01026 #endif // NL_OS_WINDOWS
01027
01028
01029 int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
01030
01031
01032
01033
01034 switch ( res )
01035 {
01036 case 0 : continue;
01037
01039 case -1 :
01040
01041
01042
01043
01044
01045
01046
01047 nldebug( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
01048 return;
01049 }
01050
01051
01052
01053 vector<TSockId>::iterator ic;
01054 for ( ic=connections_copy.begin(); ic!=connections_copy.end(); ++ic )
01055 {
01056 if ( FD_ISSET( (*ic)->Sock->descriptor(), &readers ) != 0 )
01057 {
01058 CServerBufSock *serverbufsock = static_cast<CServerBufSock*>(static_cast<CBufSock*>(*ic));
01059 try
01060 {
01061
01062 if ( serverbufsock->receivePart() )
01063 {
01064
01065 vector<uint8> hidvec;
01066 hidvec.resize( sizeof(TSockId)+1 );
01067 memcpy( &*hidvec.begin(), &(*ic), sizeof(TSockId) );
01068
01069
01070 hidvec[sizeof(TSockId)] = (uint8)CBufNetBase::User;
01071
01072
01073
01074
01075 {
01076
01077 CFifoAccessor recvfifo( &_Server->receiveQueue() );
01078
01079 recvfifo.value().push( serverbufsock->receivedBuffer(), hidvec );
01080 _Server->setDataAvailableFlag( true );
01081
01082
01083
01084
01085 }
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098 }
01099 }
01100 catch ( ESocketConnectionClosed& )
01101 {
01102 nldebug( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() );
01103
01104 }
01105 catch ( ESocket& )
01106 {
01107 nldebug( "LNETL1: Connection %s broken", serverbufsock->asString().c_str() );
01108 (*ic)->Sock->disconnect();
01109 }
01110
01111
01112
01113
01114
01115
01116 }
01117
01118 }
01119
01120 #ifdef NL_OS_UNIX
01121
01122 if ( (!skip) && (FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers )) )
01123 {
01124 uint8 b;
01125 if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 )
01126 {
01127 nldebug( "LNETL1: In CServerReceiveTask::run(): read() failed" );
01128 }
01129 nldebug( "LNETL1: Receive thread select woken-up" );
01130 }
01131 #endif
01132
01133 }
01134 nlnettrace( "Exiting CServerReceiveTask::run" );
01135 }
01136
01137
01138
01139
01140
01141
01142 void CServerReceiveTask::clearClosedConnections()
01143 {
01144 CConnections::iterator ic;
01145 {
01146 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
01147 {
01148 if ( ! removesetsync.value().empty() )
01149 {
01150
01151 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
01152 for ( ic=removesetsync.value().begin(); ic!=removesetsync.value().end(); ++ic )
01153 {
01154 nldebug( "LNETL1: Removing a connection" );
01155
01156 TSockId sid = (*ic);
01157
01158
01159 connectionssync.value().erase( *ic );
01160
01161
01162 delete sid;
01163 }
01164
01165 removesetsync.value().clear();
01166 }
01167 }
01168 }
01169 }
01170
01171
01172 }