00001
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #ifndef NL_BUF_SERVER_H
00027 #define NL_BUF_SERVER_H
00028
#include "nel/misc/types_nl.h"
#include "nel/net/buf_net_base.h"
#include "nel/net/listen_sock.h"
#include "nel/net/buf_sock.h"
#include <list>
#include <set>
#include <vector>
00035
00036
00037
00038 namespace NLNET {
00039
00040
00041 class CInetAddress;
00042 class CBufServer;
00043
00044
/**
 * Common base of the server-side tasks declared in this header
 * (CListenTask and CServerReceiveTask both derive from it).
 *
 * It carries an "exit required" flag that can be raised through the public
 * requireExit() and read back by derived classes through exitRequired().
 * On Unix builds (NL_OS_UNIX) it additionally declares wakeUp() and a pair of
 * pipe handles.
 */
class CServerTask
{
public:

    /// Virtual destructor (defined out of line).
    virtual ~CServerTask();

    /// Raise the exit flag: exitRequired() returns true after this call.
    void requireExit()
    {
        _ExitRequired = true;
    }

#ifdef NL_OS_UNIX
    /// Unix only. Defined out of line; presumably signals the task through
    /// _WakeUpPipeHandle so that a blocked wait returns -- TODO confirm.
    void wakeUp();
#endif

protected:

    /// Constructor (defined out of line); only derived classes can call it.
    CServerTask();

    /// Return the current value of the exit flag.
    bool exitRequired() const
    {
        return _ExitRequired;
    }

#ifdef NL_OS_UNIX
    /// Unix only. Two handles, presumably the read and write ends of the
    /// wake-up pipe indexed by TPipeWay -- TODO confirm against the .cpp.
    int _WakeUpPipeHandle [2];
#endif

private:

    /// Exit flag written by requireExit() and read by exitRequired().
    volatile bool _ExitRequired;
};
00080
00081
00085 class CListenTask : public NLMISC::IRunnable, public CServerTask
00086 {
00087 public:
00088
00090 CListenTask( CBufServer *server ) : CServerTask(), _Server(server) {}
00091
00093 void init( uint16 port );
00094
00096 virtual void run();
00097
00099 void close() { _ListenSock.close(); }
00100
00102 const CInetAddress& localAddr() { return _ListenSock.localAddr(); }
00103
00104 private:
00105
00106 CBufServer *_Server;
00107 CListenSock _ListenSock;
00108
00109 };
00110
00111
00112 typedef std::vector<NLMISC::IThread*> CThreadPool;
00113
00114
00115
// Default values for the CBufServer constructor arguments.
//
// PRESET_BIG_SERVER is unconditionally undefined right here, so the "#else"
// branch below is always the active one; replace the #undef by a #define to
// switch to the other preset.
#undef PRESET_BIG_SERVER

#if defined(PRESET_BIG_SERVER)

// Preset selected when PRESET_BIG_SERVER is defined
#  define DEFAULT_STRATEGY SpreadSockets
#  define DEFAULT_MAX_THREADS 64
#  define DEFAULT_MAX_SOCKETS_PER_THREADS 64

#else

// Preset selected when PRESET_BIG_SERVER is not defined (current setting)
#  define DEFAULT_STRATEGY FillThreads
#  define DEFAULT_MAX_THREADS 64
#  define DEFAULT_MAX_SOCKETS_PER_THREADS 16

#endif
00129
00130
00153 class CBufServer : public CBufNetBase
00154 {
00155 public:
00156
00157 enum TThreadStategy { SpreadSockets, FillThreads };
00158
00162 CBufServer( TThreadStategy strategy=DEFAULT_STRATEGY,
00163 uint16 max_threads=DEFAULT_MAX_THREADS,
00164 uint16 max_sockets_per_thread=DEFAULT_MAX_SOCKETS_PER_THREADS,
00165 bool nodelay=true, bool replaymode=false );
00166
00168 virtual ~CBufServer();
00169
00171 void init( uint16 port );
00172
00178 void disconnect( TSockId hostid, bool quick=false );
00179
00181 void setConnectionCallback( TNetCallback cb, void* arg ) { _ConnectionCallback = cb; _ConnectionCbArg = arg; }
00182
00183
00184
00187
00188 void send( const NLMISC::CMemStream& buffer, TSockId hostid );
00189
00193 bool dataAvailable();
00194
00198
00199 void receive( NLMISC::CMemStream& buffer, TSockId* hostid );
00200
00202 void update();
00203
00204
00205
00206
00207
00208 uint32 getSendQueueSize( TSockId destid );
00209
00213 void setTimeFlushTrigger( TSockId destid, sint32 ms ) { nlassert( destid != InvalidSockId ); destid->setTimeFlushTrigger( ms ); }
00214
00218 void setSizeFlushTrigger( TSockId destid, sint32 size ) { nlassert( destid != InvalidSockId ); destid->setSizeFlushTrigger( size ); }
00219
00224 bool flush( TSockId destid ) { nlassert( destid != InvalidSockId ); return destid->flush(); }
00225
00226
00227
00228
00230 const CInetAddress& listenAddress() const { return _ListenTask->localAddr(); }
00231
00233 const CInetAddress& hostAddress( TSockId hostid ) { nlassert( hostid != InvalidSockId ); return hostid->Sock->remoteAddr(); }
00234
00235
00237
00238
00239
00240
00241
00243
00244
00245
00247 uint64 bytesReceived() const { return _BytesPoppedIn; }
00248
00250 uint64 newBytesReceived();
00251
00253 uint64 bytesSent() const { return _BytesPushedOut; }
00254
00256 uint64 newBytesSent();
00257
00259 uint32 nbConnections() const { return _NbConnections; }
00260
00261 protected:
00262
00263 friend class CServerBufSock;
00264 friend class CListenTask;
00265 friend class CServerReceiveTask;
00266
00268 bool noDelay() const { return _NoDelay; }
00269
00273 void dispatchNewSocket( CServerBufSock *bufsock );
00274
00276 CServerReceiveTask *receiveTask( std::vector<NLMISC::IThread*>::iterator ipt )
00277 {
00278 return ((CServerReceiveTask*)((*ipt)->getRunnable()));
00279 }
00280
00282
00283
00284
00285
00286
00287
00288
00289
00290 void pushBufferToHost( const NLMISC::CMemStream& buffer, TSockId hostid )
00291 {
00292 nlassert( hostid != InvalidSockId );
00293 if ( hostid->pushBuffer( buffer ) )
00294 {
00295 _BytesPushedOut += buffer.length() + sizeof(TBlockSize);
00296 }
00297 }
00298
00299
00300 void addNewThread( CThreadPool& threadpool, CServerBufSock *bufsock );
00301
00303 TNetCallback connectionCallback() const { return _ConnectionCallback; }
00304
00306 void* argOfConnectionCallback() const { return _ConnectionCbArg; }
00307
00308
00309
00310
00311
00312 private:
00313
00315 bool _NoDelay;
00316
00318 TThreadStategy _ThreadStrategy;
00319
00321 uint16 _MaxThreads;
00322
00324 uint16 _MaxSocketsPerThread;
00325
00327 CListenTask *_ListenTask;
00328
00330 NLMISC::IThread *_ListenThread;
00331
00332
00333
00334
00335
00336
00337
00338 NLMISC::CSynchronized<CThreadPool> _ThreadPool;
00339
00341 TNetCallback _ConnectionCallback;
00342
00344 void* _ConnectionCbArg;
00345
00347 uint64 _BytesPushedOut;
00348
00350 uint64 _BytesPoppedIn;
00351
00353 uint64 _PrevBytesPoppedIn;
00354
00356 uint64 _PrevBytesPushedOut;
00357
00359 uint32 _NbConnections;
00360
00362 bool _ReplayMode;
00363
00364
00366
00367
00369
00370
00371 };
00372
00373
00374 typedef std::set<TSockId> CConnections;
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
#ifdef NL_OS_UNIX
/// Unix only. Two-valued selector; presumably the index of the read/write end
/// in CServerTask::_WakeUpPipeHandle -- TODO confirm against the .cpp.
enum TPipeWay
{
    PipeRead = 0,
    PipeWrite = 1
};
#endif
00392
00393
00394
00395
00396
00402 class CServerReceiveTask : public NLMISC::IRunnable, public CServerTask
00403 {
00404 public:
00405
00407 CServerReceiveTask( CBufServer *server ) : CServerTask(), _Server(server), _Connections("CServerReceiveTask::_Connections"), _RemoveSet("CServerReceiveTask::_RemoveSet") {}
00408
00410 virtual void run();
00411
00413 uint numberOfConnections()
00414 {
00415 uint nb;
00416 {
00417 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00418 nb = connectionssync.value().size();
00419 }
00420 return nb;
00421 }
00422
00424 void addNewSocket( TSockId sockid )
00425 {
00426
00427 nlassert( sockid != InvalidSockId );
00428 {
00429 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
00430 connectionssync.value().insert( sockid );
00431 }
00432
00433 }
00434
00435
00436
00441 void addToRemoveSet( TSockId sockid )
00442 {
00443 nlnettrace( "CServerReceiveTask::addToRemoveSet" );
00444 nlassert( sockid != InvalidSockId );
00445 {
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
00456 removesetsync.value().insert( sockid );
00457
00458 }
00459 #ifdef NL_OS_UNIX
00460 wakeUp();
00461 #endif
00462 }
00463
00465 void clearClosedConnections();
00466
00468 CBufServer *server() { return _Server; }
00469
00470 friend class CBufServer;
00471
00472 private:
00473
00474 CBufServer *_Server;
00475
00476
00477
00478
00479 NLMISC::CSynchronized<CConnections> _Connections;
00480
00481
00482 NLMISC::CSynchronized<CConnections> _RemoveSet;
00483
00484
00485
00486 };
00487
00488
00489 }
00490
00491
00492 #endif // NL_BUF_SERVER_H
00493
00494