diff --git a/core/decoder.go b/core/decoder.go index f0c83c5..c58c0ba 100644 --- a/core/decoder.go +++ b/core/decoder.go @@ -9,6 +9,8 @@ import ( "fmt" "io" "strconv" + "sync" + "sync/atomic" "time" "github.com/hdt3213/rdb/memprofiler" @@ -18,19 +20,31 @@ import ( // Decoder is an instance of rdb parsing process type Decoder struct { input *bufio.Reader - readCount int + readCount int64 buffer []byte + // to reduce disk IOPS + cache []byte + // Current Read Offset + cachePos atomic.Uint32 + // Effective length of `cache` + cacheLen atomic.Uint32 + // lock for real IO operations + refillMu sync.Mutex withSpecialOpCode bool withSpecialTypes map[string]ModuleTypeHandleFunc } +// cacheSize 1MB +const cacheSize = 1024 * 1024 + // NewDecoder creates a new RDB decoder func NewDecoder(reader io.Reader) *Decoder { parser := new(Decoder) parser.input = bufio.NewReader(reader) parser.buffer = make([]byte, 8) parser.withSpecialTypes = make(map[string]ModuleTypeHandleFunc) + parser.cache = make([]byte, cacheSize) return parser } @@ -54,7 +68,8 @@ const ( ) const ( - opCodeFunction = 245 + opCodeSlotInfo = 244 /* Individual slot info, such as slot id and size (cluster mode only). */ + opCodeFunction = 245 /* function library data */ opCodeModuleAux = 247 /* Module auxiliary data. */ opCodeIdle = 248 /* LRU idle time. */ opCodeFreq = 249 /* LFU frequency. */ @@ -466,6 +481,23 @@ func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error { } } continue + } else if b == opCodeSlotInfo { + // slot id + _, _, err = dec.readLength() + if err != nil { + return err + } + // slot size + _, _, err = dec.readLength() + if err != nil { + return err + } + // expires slot size + _, _, err = dec.readLength() + if err != nil { + return err + } + continue } key, err := dec.readString() if err != nil { @@ -511,6 +543,6 @@ func (dec *Decoder) Parse(cb func(object model.RedisObject) bool) (err error) { return dec.parse(cb) } -func (dec *Decoder) GetReadCount() int { +func (dec *Decoder) GetReadCount() int64 { return dec.readCount } diff --git a/core/utils.go b/core/utils.go index f16af08..c0f8206 100644 --- a/core/utils.go +++ b/core/utils.go @@ -5,6 +5,7 @@ import ( "errors" "io" "math/rand" + "sync/atomic" "unsafe" ) @@ -42,7 +43,8 @@ func readZipListLength(buf []byte, cursor *int) int { return size } -func (dec *Decoder) readByte() (byte, error) { +// original code +/*func (dec *Decoder) readByte() (byte, error) { b, err := dec.input.ReadByte() if err != nil { return 0, err @@ -58,6 +60,92 @@ func (dec *Decoder) readFull(buf []byte) error { } dec.readCount += n return nil +}*/ + +// readByte get 1 Byte from Decoder.cache +func (dec *Decoder) readByte() (byte, error) { + for { + pos := dec.cachePos.Load() + length := dec.cacheLen.Load() + + if pos >= length { + dec.refillMu.Lock() + if dec.cachePos.Load() < dec.cacheLen.Load() { + dec.refillMu.Unlock() + continue + } + err := dec.refillCache() + dec.refillMu.Unlock() + if err != nil { + return 0, err + } + continue + } + + if dec.cachePos.CompareAndSwap(pos, pos+1) { + atomic.AddInt64(&dec.readCount, 1) + return dec.cache[pos], nil + } + } +} + +// readFull get len(buf) Bytes from Decoder.cache +func (dec *Decoder) readFull(buf []byte) error { + need := len(buf) + copied := 0 + + for copied < need { + pos := dec.cachePos.Load() + length := dec.cacheLen.Load() + + if pos >= length { + dec.refillMu.Lock() + if dec.cachePos.Load() < dec.cacheLen.Load() { + dec.refillMu.Unlock() + continue + } + err := dec.refillCache() + dec.refillMu.Unlock() + if err != nil { + if copied > 0 && err == io.EOF { + return io.ErrUnexpectedEOF + } + return err + } + continue + } + + available := int(length - pos) + toCopy := need - copied + if toCopy > available { + toCopy = available + } + + if dec.cachePos.CompareAndSwap(pos, pos+uint32(toCopy)) { + copy(buf[copied:copied+toCopy], dec.cache[pos:pos+uint32(toCopy)]) + copied += toCopy + atomic.AddInt64(&dec.readCount, int64(toCopy)) + } + } + return nil +} + +// refillCache read 1MiB from disk +func (dec *Decoder) refillCache() error { + n, err := dec.input.Read(dec.cache) + + // eof + if n == 0 && err == nil { + return io.EOF + } + + if n > 0 { + dec.cachePos.Store(0) + dec.cacheLen.Store(uint32(n)) + return nil + } + + return err } var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")