# Home    # nevrax.com   
Nevrax
Nevrax.org
#News
#Mailing-list
#Documentation
#CVS
#Bugs
#License
Docs
 
Documentation  
Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members   Related Pages   Search  

buf_server.h

Go to the documentation of this file.
00001 
00007 /* Copyright, 2001 Nevrax Ltd.
00008  *
00009  * This file is part of NEVRAX NEL.
00010  * NEVRAX NEL is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2, or (at your option)
00013  * any later version.
00014 
00015  * NEVRAX NEL is distributed in the hope that it will be useful, but
00016  * WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00018  * General Public License for more details.
00019 
00020  * You should have received a copy of the GNU General Public License
00021  * along with NEVRAX NEL; see the file COPYING. If not, write to the
00022  * Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
00023  * MA 02111-1307, USA.
00024  */
00025 
00026 #ifndef NL_BUF_SERVER_H
00027 #define NL_BUF_SERVER_H
00028 
00029 #include "nel/misc/types_nl.h"
00030 #include "nel/net/buf_net_base.h"
00031 #include "nel/net/listen_sock.h"
00032 #include "nel/net/buf_sock.h"
00033 #include <list>
00034 #include <set>
00035 
00036 // POLL1 (ignore POLL comments)
00037 
00038 namespace NLNET {
00039 
00040 
00041 class CInetAddress;
00042 class CBufServer;
00043 
00044 
00048 class CServerTask
00049 {
00050 public:
00051 
00053         virtual ~CServerTask();
00054 
00056         void    requireExit() { _ExitRequired = true; }
00057 
00058 #ifdef NL_OS_UNIX
00059 
00060         void    wakeUp();
00061 #endif
00062 
00063 protected:
00064 
00066         CServerTask();
00067         
00069         bool    exitRequired() const { return _ExitRequired; }
00070 
00071 #ifdef NL_OS_UNIX
00072 
00073         int                     _WakeUpPipeHandle [2];
00074 #endif
00075 
00076 private:
00077 
00078         volatile bool   _ExitRequired;  
00079 };
00080 
00081 
00085 class CListenTask : public NLMISC::IRunnable, public CServerTask
00086 {
00087 public:
00088 
00090         CListenTask( CBufServer *server ) : CServerTask(), _Server(server) {}
00091 
00093         void                    init( uint16 port );
00094 
00096         virtual void    run();
00097 
00099         void                    close() { _ListenSock.close(); }
00100 
00102         const CInetAddress&     localAddr() { return _ListenSock.localAddr(); }
00103 
00104 private:
00105 
00106         CBufServer                      *_Server;       
00107         CListenSock                     _ListenSock;
00108 
00109 };
00110 
00111 
00112 typedef std::vector<NLMISC::IThread*> CThreadPool;
00113 
00114 
00115 // Mode: Small server
00116 #undef PRESET_BIG_SERVER
00117 
00118 #ifdef PRESET_BIG_SERVER
00119 // Big server
00120 #define DEFAULT_STRATEGY SpreadSockets
00121 #define DEFAULT_MAX_THREADS 64
00122 #define DEFAULT_MAX_SOCKETS_PER_THREADS 64
00123 #else
00124 // Small server
00125 #define DEFAULT_STRATEGY FillThreads
00126 #define DEFAULT_MAX_THREADS 64
00127 #define DEFAULT_MAX_SOCKETS_PER_THREADS 16
00128 #endif
00129 
00130 
00153 class CBufServer : public CBufNetBase
00154 {
00155 public:
00156 
00157         enum TThreadStategy { SpreadSockets, FillThreads };
00158 
00162         CBufServer( TThreadStategy strategy=DEFAULT_STRATEGY,
00163                                 uint16 max_threads=DEFAULT_MAX_THREADS,
00164                                 uint16 max_sockets_per_thread=DEFAULT_MAX_SOCKETS_PER_THREADS,
00165                                 bool nodelay=true, bool replaymode=false );
00166 
00168         virtual ~CBufServer();
00169 
00171         void    init( uint16 port );
00172 
00178         void    disconnect( TSockId hostid, bool quick=false );
00179 
00181         void    setConnectionCallback( TNetCallback cb, void* arg ) { _ConnectionCallback = cb; _ConnectionCbArg = arg; }
00182 
00183 
00184 
00187         //void  send( const std::vector<uint8>& buffer, TSockId hostid );
00188         void    send( const NLMISC::CMemStream& buffer, TSockId hostid );
00189 
00193         bool    dataAvailable();
00194 
00198         //void  receive( std::vector<uint8>& buffer, TSockId* hostid );
00199         void    receive( NLMISC::CMemStream& buffer, TSockId* hostid );
00200 
00202         void    update();
00203 
00204 
00205 
00206 
00207         // Returns the size in bytes of the data stored in the send queue.
00208         uint32  getSendQueueSize( TSockId destid );
00209 
00213         void    setTimeFlushTrigger( TSockId destid, sint32 ms ) { nlassert( destid != InvalidSockId ); destid->setTimeFlushTrigger( ms ); }
00214 
00218         void    setSizeFlushTrigger( TSockId destid, sint32 size ) { nlassert( destid != InvalidSockId ); destid->setSizeFlushTrigger( size ); }
00219 
00224         bool    flush( TSockId destid ) { nlassert( destid != InvalidSockId ); return destid->flush(); }
00225 
00226 
00227 
00228 
00230         const CInetAddress&     listenAddress() const { return _ListenTask->localAddr(); }
00231 
00233         const CInetAddress& hostAddress( TSockId hostid ) { nlassert( hostid != InvalidSockId ); return hostid->Sock->remoteAddr(); }
00234 
00235         /*
00237         uint32  bytesDownloaded()
00238         {
00239                 NLMISC::CSynchronized<uint32>::CAccessor syncbpi ( &_BytesPushedIn );
00240                 return syncbpi.value();
00241         }
00243         uint32  newBytesDownloaded();
00244         */
00245 
00247         uint64  bytesReceived() const { return _BytesPoppedIn; }
00248 
00250         uint64  newBytesReceived();
00251 
00253         uint64  bytesSent() const { return _BytesPushedOut; }
00254 
00256         uint64  newBytesSent();
00257 
00259         uint32  nbConnections() const { return _NbConnections; }
00260 
00261 protected:
00262 
00263         friend class CServerBufSock;
00264         friend class CListenTask;
00265         friend class CServerReceiveTask;
00266 
00268         bool                            noDelay() const { return _NoDelay; }
00269 
00273         void                            dispatchNewSocket( CServerBufSock *bufsock );
00274 
00276         CServerReceiveTask      *receiveTask( std::vector<NLMISC::IThread*>::iterator ipt )
00277         {
00278                 return ((CServerReceiveTask*)((*ipt)->getRunnable()));
00279         }
00280 
00282         /*void pushBufferToHost( const std::vector<uint8>& buffer, TSockId hostid )
00283         {
00284                 if ( hostid->pushBuffer( buffer ) )
00285                 {
00286                         _BytesPushedOut += buffer.size() + sizeof(TBlockSize); // statistics
00287                 }
00288         }*/
00289 
00290         void pushBufferToHost( const NLMISC::CMemStream& buffer, TSockId hostid )
00291         {
00292                 nlassert( hostid != InvalidSockId );
00293                 if ( hostid->pushBuffer( buffer ) )
00294                 {
00295                         _BytesPushedOut += buffer.length() + sizeof(TBlockSize); // statistics
00296                 }
00297         }
00298 
00299         // Creates a new task and run a new thread for it
00300         void                            addNewThread( CThreadPool& threadpool, CServerBufSock *bufsock );
00301 
00303         TNetCallback            connectionCallback() const { return _ConnectionCallback; }
00304 
00306         void*                           argOfConnectionCallback() const { return _ConnectionCbArg; }
00307 
00308         /*
00309         NLMISC::CSynchronized<uint32>&  syncBytesPushedIn() { return _BytesPushedIn; }
00310         */
00311 
00312 private:
00313 
00315         bool                                                    _NoDelay;
00316 
00318         TThreadStategy                                  _ThreadStrategy;
00319 
00321         uint16                                                  _MaxThreads;
00322 
00324         uint16                                                  _MaxSocketsPerThread;
00325 
00327         CListenTask                                             *_ListenTask;
00328 
00330         NLMISC::IThread                                 *_ListenThread;
00331 
00332         /* Vector of receiving threads.
00333          * Thread: thread control
00334          * Thread->Runnable: access to the CServerReceiveTask object
00335          * Thread->getRunnable()->sock(): access to the socket
00336          * The purpose of this list is to delete the objects after use.
00337          */
00338         NLMISC::CSynchronized<CThreadPool>              _ThreadPool;
00339 
00341         TNetCallback                                    _ConnectionCallback;
00342 
00344         void*                                                   _ConnectionCbArg;
00345 
00347         uint64                                                  _BytesPushedOut;
00348 
00350         uint64                                                  _BytesPoppedIn;
00351 
00353         uint64                                                  _PrevBytesPoppedIn;
00354 
00356         uint64                                                  _PrevBytesPushedOut;
00357 
00359         uint32                                                  _NbConnections;
00360 
00362         bool                                                    _ReplayMode;
00363 
00364   /*
00366         NLMISC::CSynchronized<uint32>   _BytesPushedIn;
00367 
00369         uint32                                                  _PrevBytesPushedIn;
00370         */
00371 };
00372 
00373 
00374 typedef std::set<TSockId>                                       CConnections;
00375 
00376 /*
00377 // Workaround for Internal Compiler Error in debug mode with MSVC6
00378 #ifdef NL_RELEASE
00379 typedef CConnections::iterator                          ItConnection;
00380 #else
00381 typedef TSockId                                                         ItConnection;
00382 #endif
00383 // Set of iterators to connections (set because we must not add an element twice) 
00384 typedef std::set<ItConnection>                  CItConnections;
00385 */
00386 
00387 
00388 #ifdef NL_OS_UNIX
00389 
00390 enum TPipeWay { PipeRead, PipeWrite };
00391 #endif
00392 
00393 
00394 // POLL2
00395 
00396 
00402 class CServerReceiveTask : public NLMISC::IRunnable, public CServerTask
00403 {
00404 public:
00405 
00407         CServerReceiveTask( CBufServer *server ) : CServerTask(), _Server(server), _Connections("CServerReceiveTask::_Connections"), _RemoveSet("CServerReceiveTask::_RemoveSet") {}
00408 
00410         virtual void run();
00411 
00413         uint    numberOfConnections()
00414         {
00415                 uint nb;
00416                 {
00417                         NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00418                         nb = connectionssync.value().size();
00419                 }
00420                 return nb;
00421         }
00422 
00424         void    addNewSocket( TSockId sockid )
00425         {
00426                 //nlnettrace( "CServerReceiveTask::addNewSocket" );
00427                 nlassert( sockid != InvalidSockId );
00428                 {
00429                         NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00430                         connectionssync.value().insert( sockid );
00431                 }
00432                 // POLL3
00433         }
00434 
00435 // POLL4
00436 
00441         void    addToRemoveSet( TSockId sockid )
00442         {
00443                 nlnettrace( "CServerReceiveTask::addToRemoveSet" );
00444                 nlassert( sockid != InvalidSockId );
00445                 {
00446                         // Three possibilities :
00447                         // - The value is inserted into the set.
00448                         // - The value is already present in the set.
00449                         // - The set is locked by a receive thread which is removing the closed connections.
00450                         //   When the set gets unlocked, it is empty so the value is inserted. It means the
00451                         //   value could be already in the set before it was cleared.
00452                         //   Note: with a fonction such as tryAcquire(), we could avoid to enter the mutex
00453                         //   when it is already locked
00454                         // See clearClosedConnections().
00455                         NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
00456                         removesetsync.value().insert( sockid );
00457                         //nldebug( "LNETL1: ic: %p - RemoveSet.size(): %d", ic, removesetsync.value().size() );
00458                 }
00459 #ifdef NL_OS_UNIX
00460                 wakeUp();
00461 #endif
00462         }
00463 
00465         void    clearClosedConnections();
00466 
00468         CBufServer      *server()       { return _Server; }
00469 
00470         friend  class CBufServer;
00471 
00472 private:
00473 
00474         CBufServer                                                              *_Server;
00475 
00476         /* List of sockets and send buffer.
00477          * A TSockId is a pointer to a CBufSock object
00478          */
00479         NLMISC::CSynchronized<CConnections>             _Connections;
00480 
00481         // Connections to remove
00482         NLMISC::CSynchronized<CConnections>             _RemoveSet;
00483 
00484         // POLL5
00485 
00486 };
00487 
00488 
00489 } // NLNET
00490 
00491 
00492 #endif // NL_BUF_SERVER_H
00493 
00494 /* End of buf_server.h */