From c3ac1c6c4364bc0c2cfea9d626e9b46ae2e68d8b Mon Sep 17 00:00:00 2001 From: Sea Date: Tue, 14 Apr 2026 14:44:59 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E4=BF=AE=E6=94=B9=E4=BA=86=E5=BA=95?= =?UTF-8?q?=E5=B1=82=E8=AF=BB=E5=8F=96RDB=E6=96=87=E4=BB=B6=E6=97=B6?= =?UTF-8?q?=E7=9A=84=E5=A4=84=E7=90=86=E6=96=B9=E5=BC=8F=EF=BC=88=20core/u?= =?UTF-8?q?tils.go=20=EF=BC=89=EF=BC=9A=20readByte=20=E5=92=8C=20readFull?= =?UTF-8?q?=20=E4=B8=8D=E5=86=8D=E7=9B=B4=E6=8E=A5=E4=BB=8E=E7=A3=81?= =?UTF-8?q?=E7=9B=98=E8=AF=BB=E5=8F=96=E6=95=B0=E6=8D=AE=EF=BC=8C=E8=80=8C?= =?UTF-8?q?=E6=98=AF=E4=BB=8E=E5=86=85=E5=AD=98=EF=BC=88=20Decoder.cache?= =?UTF-8?q?=20=EF=BC=89=E4=B8=AD=E8=8E=B7=E5=8F=96=EF=BC=9B=20Decoder.cach?= =?UTF-8?q?e=20=E6=AF=8F=E6=AC=A1=E5=9B=BA=E5=AE=9A=E4=BB=8E=E7=A3=81?= =?UTF-8?q?=E7=9B=98=E5=8F=96=201MiB=20=E7=9A=84=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=EF=BC=8C=E5=A6=82=E6=9E=9C=E5=B7=B2=E5=8F=96=E5=AE=8C=E5=88=99?= =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E8=AF=BB=E5=8F=96=E4=B8=8B=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=201MiB=20=EF=BC=88=E4=BB=A512GB=E3=80=8130=E4=B8=87key?= =?UTF-8?q?=E7=9A=84rdb=E6=96=87=E4=BB=B6=E6=B5=8B=E8=AF=95=EF=BC=8C?= =?UTF-8?q?=E5=A4=A7=E7=BA=A6=E9=99=8D=E4=BD=8E=E4=BA=8699%=E7=9A=84IOPS?= =?UTF-8?q?=EF=BC=89=202.=20Decoder=20=E4=B8=AD=E5=A2=9E=E5=8A=A0=20opCode?= =?UTF-8?q?SlotInfo=20=E7=B1=BB=E5=9E=8B=EF=BC=9A=E6=84=9F=E8=B0=A2=20sjet?= =?UTF-8?q?47=20https://github.com/HDT3213/rdb/pull/61?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/decoder.go | 38 +++++++++++++++++++-- core/utils.go | 90 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 124 insertions(+), 4 deletions(-) diff --git a/core/decoder.go b/core/decoder.go index f0c83c5..c58c0ba 100644 --- a/core/decoder.go +++ b/core/decoder.go @@ -9,6 +9,8 @@ import ( "fmt" "io" "strconv" + "sync" + "sync/atomic" "time" "github.com/hdt3213/rdb/memprofiler" @@ -18,19 +20,31 @@ import ( // Decoder is an instance of rdb parsing process type Decoder struct { input *bufio.Reader - readCount int + readCount int64 buffer []byte + // to reduce disk IOPS + cache []byte + // Current Read Offset + cachePos atomic.Uint32 + // Effective length of `cache` + cacheLen atomic.Uint32 + // lock for real IO operations + refillMu sync.Mutex withSpecialOpCode bool withSpecialTypes map[string]ModuleTypeHandleFunc } +// cacheSize 1MB +const cacheSize = 1024 * 1024 + // NewDecoder creates a new RDB decoder func NewDecoder(reader io.Reader) *Decoder { parser := new(Decoder) parser.input = bufio.NewReader(reader) parser.buffer = make([]byte, 8) parser.withSpecialTypes = make(map[string]ModuleTypeHandleFunc) + parser.cache = make([]byte, cacheSize) return parser } @@ -54,7 +68,8 @@ const ( ) const ( - opCodeFunction = 245 + opCodeSlotInfo = 244 /* Individual slot info, such as slot id and size (cluster mode only). */ + opCodeFunction = 245 /* function library data */ opCodeModuleAux = 247 /* Module auxiliary data. */ opCodeIdle = 248 /* LRU idle time. */ opCodeFreq = 249 /* LFU frequency. */ @@ -466,6 +481,23 @@ func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error { } } continue + } else if b == opCodeSlotInfo { + // slot id + _, _, err = dec.readLength() + if err != nil { + return err + } + // slot size + _, _, err = dec.readLength() + if err != nil { + return err + } + // expires slot size + _, _, err = dec.readLength() + if err != nil { + return err + } + continue } key, err := dec.readString() if err != nil { @@ -511,6 +543,6 @@ func (dec *Decoder) Parse(cb func(object model.RedisObject) bool) (err error) { return dec.parse(cb) } -func (dec *Decoder) GetReadCount() int { +func (dec *Decoder) GetReadCount() int64 { return dec.readCount } diff --git a/core/utils.go b/core/utils.go index f16af08..c0f8206 100644 --- a/core/utils.go +++ b/core/utils.go @@ -5,6 +5,7 @@ import ( "errors" "io" "math/rand" + "sync/atomic" "unsafe" ) @@ -42,7 +43,8 @@ func readZipListLength(buf []byte, cursor *int) int { return size } -func (dec *Decoder) readByte() (byte, error) { +// original code +/*func (dec *Decoder) readByte() (byte, error) { b, err := dec.input.ReadByte() if err != nil { return 0, err @@ -58,6 +60,92 @@ func (dec *Decoder) readFull(buf []byte) error { } dec.readCount += n return nil +}*/ + +// readByte get 1 Byte from Decoder.cache +func (dec *Decoder) readByte() (byte, error) { + for { + pos := dec.cachePos.Load() + length := dec.cacheLen.Load() + + if pos >= length { + dec.refillMu.Lock() + if dec.cachePos.Load() < dec.cacheLen.Load() { + dec.refillMu.Unlock() + continue + } + err := dec.refillCache() + dec.refillMu.Unlock() + if err != nil { + return 0, err + } + continue + } + + if dec.cachePos.CompareAndSwap(pos, pos+1) { + atomic.AddInt64(&dec.readCount, 1) + return dec.cache[pos], nil + } + } +} + +// readFull get len(buf) Bytes from Decoder.cache +func (dec *Decoder) readFull(buf []byte) error { + need := len(buf) + copied := 0 + + for copied < need { + pos := dec.cachePos.Load() + length := dec.cacheLen.Load() + + if pos >= length { + dec.refillMu.Lock() + if dec.cachePos.Load() < dec.cacheLen.Load() { + dec.refillMu.Unlock() + continue + } + err := dec.refillCache() + dec.refillMu.Unlock() + if err != nil { + if copied > 0 && err == io.EOF { + return io.ErrUnexpectedEOF + } + return err + } + continue + } + + available := int(length - pos) + toCopy := need - copied + if toCopy > available { + toCopy = available + } + + if dec.cachePos.CompareAndSwap(pos, pos+uint32(toCopy)) { + copy(buf[copied:copied+toCopy], dec.cache[pos:pos+uint32(toCopy)]) + copied += toCopy + atomic.AddInt64(&dec.readCount, int64(toCopy)) + } + } + return nil +} + +// refillCache read 1MiB from disk +func (dec *Decoder) refillCache() error { + n, err := dec.input.Read(dec.cache) + + // eof + if n == 0 && err == nil { + return io.EOF + } + + if n > 0 { + dec.cachePos.Store(0) + dec.cacheLen.Store(uint32(n)) + return nil + } + + return err } var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")