//@C Copyright Notice //@C ================ //@C This file is part of CPSW. It is subject to the license terms in the LICENSE.txt //@C file found in the top-level directory of this distribution and at //@C https://confluence.slac.stanford.edu/display/ppareg/LICENSE.html. //@C //@C No part of CPSW, including this file, may be copied, modified, propagated, or //@C distributed except according to the terms contained in the LICENSE.txt file. #ifndef CPSW_RSSI_H #define CPSW_RSSI_H #include #include #include #include #include #include #include #include #include using cpsw::atomic; #define RSSI_DEBUG 0 // level #include class IRxEventHandler : public IEventHandler { protected: virtual void handleRxEvent(IIntEventSource*) = 0; virtual void handle(IIntEventSource *s) { handleRxEvent(s); } }; class IUsrInputEventHandler : public IEventHandler { protected: virtual void handleUsrInputEvent(IIntEventSource*) = 0; virtual void handle(IIntEventSource *s) { handleUsrInputEvent(s); } }; class IUsrOutputEventHandler : public IEventHandler { protected: virtual void handleUsrOutputEvent(IIntEventSource*) = 0; virtual void handle(IIntEventSource *s) { handleUsrOutputEvent(s); } }; class IRexTimer : public RssiTimer { protected: virtual void processRetransmissionTimeout() = 0; public: IRexTimer(RssiTimerList *l) : RssiTimer("REX", l) {} virtual void process() { processRetransmissionTimeout(); } }; class IAckTimer : public RssiTimer { protected: virtual void processAckTimeout() = 0; public: IAckTimer(RssiTimerList *l) : RssiTimer("ACK", l) {} virtual void process() { processAckTimeout(); } }; class INulTimer : public RssiTimer { protected: virtual void processNulTimeout() = 0; public: INulTimer(RssiTimerList *l) : RssiTimer("NUL", l) {} virtual void process() { processNulTimeout(); } }; // posts events when the state changes class CConnectionStateChangedEventSource : public CIntEventSource { public: static const int CONN_STATE_OPEN = 10; // could support other states static const int CONN_STATE_CLOSED = -1; }; // posts events when the state is changed to // the defined target state (currently: open/closed) class CConnectionStateEventSource : public IIntEventSource { protected: atomic connState_; public: CConnectionStateEventSource(); virtual void connectionStateChanged(int); }; class CConnectionOpenEventSource : public CConnectionStateEventSource { public: CConnectionOpenEventSource(); protected: virtual bool checkForEvent(); }; class CConnectionNotOpenEventSource : public CConnectionStateEventSource { public: CConnectionNotOpenEventSource(); protected: virtual bool checkForEvent(); }; class CConnectionClosedEventSource : public CConnectionStateEventSource { public: CConnectionClosedEventSource(); protected: virtual bool checkForEvent(); }; // Obtain the max. segment size to advertise to the peer // The return value of 'getRxMTU' is the max. size the // system can accept **including** any RSSI headers class IMTUQuerier { public: virtual int getRxMTU() = 0; virtual ~IMTUQuerier() {} }; struct CRssiConfigParams { /* Default values; these can be overridden by YAML and ultimately * (some of them) are negotiated with the peer (as per RUDP spec). */ static const uint8_t LD_MAX_UNACKED_SEGS_DFLT = 4; static const unsigned QUEUE_DEPTH_DFLT = 0; static const unsigned UNIT_US_DFLT = 1000; static const uint64_t REX_TIMEOUT_US_DFLT = 100*(uint64_t)UNIT_US_DFLT; // ms static const uint64_t CAK_TIMEOUT_US_DFLT = 50*(uint64_t)UNIT_US_DFLT; // ms; < rex timeout static const uint64_t NUL_TIMEOUT_US_DFLT = 3000*(uint64_t)UNIT_US_DFLT; // ms static const uint8_t REX_MAX_DFLT = 15; static const uint8_t CAK_MAX_DFLT = 5; static const unsigned SGS_MAX_DFLT = 0; static const unsigned UNIT_US_EXP_DFLT = 3; // value used by server; must match UNIT_US (i.e., UNIT_US = 10^-UNIT_US_EXP) uint8_t ldMaxUnackedSegs_; unsigned outQueueDepth_; unsigned inpQueueDepth_; uint64_t rexTimeoutUS_; uint64_t cumAckTimeoutUS_; uint64_t nulTimeoutUS_; uint8_t rexMax_; uint8_t cumAckMax_; unsigned forcedSegsMax_; CRssiConfigParams( uint8_t ldMaxUnackedSegs = LD_MAX_UNACKED_SEGS_DFLT, unsigned outQueueDepth = QUEUE_DEPTH_DFLT, unsigned inpQueueDepth = QUEUE_DEPTH_DFLT, uint64_t rexTimeoutUS = REX_TIMEOUT_US_DFLT, uint64_t cumAckTimeoutUS = CAK_TIMEOUT_US_DFLT, uint64_t nulTimeoutUS = NUL_TIMEOUT_US_DFLT, uint8_t rexMax = REX_MAX_DFLT, uint8_t cumAckMax = CAK_MAX_DFLT, unsigned forcedSegsMax = SGS_MAX_DFLT ) : ldMaxUnackedSegs_( ldMaxUnackedSegs ), outQueueDepth_ ( outQueueDepth ), inpQueueDepth_ ( inpQueueDepth ), rexTimeoutUS_ ( rexTimeoutUS ), cumAckTimeoutUS_ ( cumAckTimeoutUS ), nulTimeoutUS_ ( nulTimeoutUS ), rexMax_ ( rexMax ), cumAckMax_ ( cumAckMax ), forcedSegsMax_ ( forcedSegsMax ) {} const CRssiConfigParams & assertValid() const; }; class CRssi : public CRunnable, public IRxEventHandler, public IUsrInputEventHandler, public IUsrOutputEventHandler, public IRexTimer, public IAckTimer, public INulTimer, public CConnectionStateChangedEventSource, public CConnectionOpenEventSource, public CConnectionNotOpenEventSource, public CConnectionClosedEventSource { public: /* Default values; these can be overridden by YAML and ultimately * (some of them) are negotiated with the peer (as per RUDP spec). * Duplicate constants here exist for legacy reasons. */ static const unsigned UNIT_US = CRssiConfigParams::UNIT_US_DFLT; static const unsigned UNIT_US_EXP = CRssiConfigParams::UNIT_US_EXP_DFLT; static const uint8_t LD_MAX_UNACKED_SEGS = CRssiConfigParams::LD_MAX_UNACKED_SEGS_DFLT; /* * Legacy timeout constants are in 'units' */ static const uint16_t RETRANSMIT_TIMEO = CRssiConfigParams::REX_TIMEOUT_US_DFLT/UNIT_US; static const uint16_t CUMLTD_ACK_TIMEO = CRssiConfigParams::CAK_TIMEOUT_US_DFLT/UNIT_US; static const uint16_t NUL_SEGMEN_TIMEO = CRssiConfigParams::NUL_TIMEOUT_US_DFLT/UNIT_US; static const uint8_t MAX_RETRANSMIT_N = CRssiConfigParams::REX_MAX_DFLT; static const uint8_t MAX_CUMLTD_ACK_N = CRssiConfigParams::CAK_MAX_DFLT; static const uint16_t MAX_SEGMENT_SIZE = 1500 - 20 - 8 - 8; // - IP - UDP - RSSI private: CRssiConfigParams defaults_; bool isServer_; const char *name_; EventSet eventSet_; IMTUQuerier *mtuQuerier_; protected: BufQueue outQ_; BufQueue inpQ_; public: // Note: the max. segment size can be set by providing an // appropriate MTU querier. CRssi(bool isServer, int threadPrio = DFLT_PRIORITY, IMTUQuerier *mtuQuerier = 0, const CRssiConfigParams *defaults = 0 ); virtual ~CRssi(); const char *getName() { return name_; } virtual void dumpStats(FILE *); protected: typedef uint8_t SeqNo; class BufBase { protected: typedef unsigned BufIdx; unsigned capa_; std::vector buf_; BufIdx rp_; BufIdx msk_; SeqNo oldest_; public: BufBase(unsigned ldsz) : buf_( (capa_= (1<rp_) {} iterator & operator++() { if ( idx_ != rb_->wp_ ) idx_++; return *this; } BufChain operator*() { if ( idx_ == rb_->wp_ ) return BufChain(); return rb_->buf_.at( (idx_ & rb_->msk_) ); } }; iterator begin() { return iterator(this); } unsigned getSize() { return (unsigned)(wp_ - rp_); } int ack(SeqNo ackNo) { SeqNo cumAck = ackNo - oldest_ + 1; unsigned i; if ( cumAck > getSize() ) { // an error -- misformed packet? // the caller should discard... return -1; } for ( i=0; i capa_ ) { // should never happen since we must // not have more than ossMX outstanding // transmits throw InternalError("RingBuf Overflow"); } buf_[ widx ] = b; wp_++; } void resize(unsigned new_capa) { unsigned i; unsigned sz = getSize(); std::vector tmp( sz ); SeqNo tmpo = getOldest(); for ( i=0; itv_ ) ) throw InternalError("clock_gettime failed"); } virtual void forceACK() { numCak_ = cakMX_; } virtual void setNulTimeout(uint64_t us); virtual uint64_t getNulTimeout(); virtual void * threadBody(); virtual void changeState(STATE *newState); virtual bool threadStop(); }; #endif