From 6fc1ce0cc06d18c718826b96cf1701dac83eb865 Mon Sep 17 00:00:00 2001 From: wangyi-yd Date: Mon, 27 Jul 2015 11:09:30 +0200 Subject: [PATCH 1/4] refactor writer buffer; fix tests --- log/logstream/kinesis.go | 4 ++ log/logstream/logstream_test.go | 6 ++- log/logstream/stream.go | 79 ++++++++++++++++++++++++++++----- log/logstream/stream_test.go | 54 +++++++++++++++------- 4 files changed, 113 insertions(+), 30 deletions(-) diff --git a/log/logstream/kinesis.go b/log/logstream/kinesis.go index 3755e13..0fdc940 100644 --- a/log/logstream/kinesis.go +++ b/log/logstream/kinesis.go @@ -1,6 +1,7 @@ package logstream import ( + "errors" "strconv" "time" @@ -17,6 +18,9 @@ type Kinesis struct { // Put records into a remote kinesis stream. func (k *Kinesis) Put(records []StreamRecord) (StreamResponse, error) { + if len(records) == 0 { + return nil, errors.New("empty records for kinesis.") + } entries := make([]*kinesis.PutRecordsRequestEntry, len(records)) for i, record := range records { diff --git a/log/logstream/logstream_test.go b/log/logstream/logstream_test.go index 2199488..b03f6e2 100644 --- a/log/logstream/logstream_test.go +++ b/log/logstream/logstream_test.go @@ -30,8 +30,10 @@ func TestLogStreamLog(t *testing.T) { "message": func() interface{} { return "foo" }, } l.Log(fields) + l.Log(fields) + l.Flush() - assert.Equal(t, "now [INFO] foo\n", string(l.writer.buffer[0])) + assert.Equal(t, "now [INFO] foo\nnow [INFO] foo\n", stream.buf.String()) } func TestLogStreamRun(t *testing.T) { @@ -60,7 +62,7 @@ func TestLogStreamRun(t *testing.T) { // data is flushed every 5s time.Sleep(time.Second * 5) - assert.Nil(t, l.writer.buffer) + assert.Equal(t, 0, l.writer.buf.getSize()) assert.Equal(t, "now [INFO] foo\n", stream.buf.String()) // stop diff --git a/log/logstream/stream.go b/log/logstream/stream.go index 6eec175..69e098e 100644 --- a/log/logstream/stream.go +++ b/log/logstream/stream.go @@ -2,6 +2,7 @@ package logstream import ( "bytes" + "errors" "github.com/stretchr/testify/mock" ) @@ -27,20 +28,20 @@ type StreamResponse interface { type StreamWriter struct { stream Stream - buffer []StreamRecord - bufferSize int - maxBufferItems int maxBufferSize int + + buf *recordBuffer } // NewStreamWriter creates a new stream writer. func NewStreamWriter(s Stream) *StreamWriter { return &StreamWriter{ stream: s, - bufferSize: 0, maxBufferItems: 500, maxBufferSize: 1024 * 1024, //1MB + + buf: newRecordBuffer(500), } } @@ -50,13 +51,19 @@ func NewStreamWriter(s Stream) *StreamWriter { func (s *StreamWriter) Write(p []byte) (n int, err error) { n = len(p) - if n > 0 { - s.buffer = append(s.buffer, StreamRecord(p)) - s.bufferSize += n + if s.buf.getItems() >= s.maxBufferItems || s.buf.getSize() >= s.maxBufferSize { + if err = s.Flush(); err != nil { + return 0, err + } } - if s.bufferSize > s.maxBufferSize || len(s.buffer) > s.maxBufferItems { - err = s.Flush() + // Do not just retain or modify p, copy it! + // See:http://golang.org/pkg/io/#Writer + data := make([]byte, n) + copy(data, p) + + if err = s.buf.append(StreamRecord(data)); err != nil { + return 0, err } return @@ -64,15 +71,14 @@ func (s *StreamWriter) Write(p []byte) (n int, err error) { // Flush buffered data into the stream. func (s *StreamWriter) Flush() error { - _, err := s.stream.Put(s.buffer) + _, err := s.stream.Put(s.buf.getRecords()) s.Reset() return err } // Reset the internal fields in s. func (s *StreamWriter) Reset() { - s.buffer = nil - s.bufferSize = 0 + s.buf.reset() } // Close the stream in s. @@ -80,6 +86,55 @@ func (s *StreamWriter) Close() error { return s.stream.Close() } +// recordBuffer uses a pre-allocated, fixed-size slice to buffer StreamRecord. +type recordBuffer struct { + records []StreamRecord + pos int + size int +} + +// newWriterBuffer returns a new initialized recordBuffer. +func newRecordBuffer(maxRecords int) *recordBuffer { + return &recordBuffer{ + records: make([]StreamRecord, maxRecords), + pos: 0, + size: 0, + } +} + +// reset resets the current position and size of records. +func (r *recordBuffer) reset() { + r.pos = 0 + r.size = 0 +} + +// append appends r into the r.records. +func (r *recordBuffer) append(s StreamRecord) error { + if r.pos >= len(r.records) { + return errors.New("reach the end of buffer.") + } + + r.records[r.pos] = s + r.pos++ + r.size += len(s) + return nil +} + +// getSize returns the byte size of r. +func (r *recordBuffer) getSize() int { + return r.size +} + +// getItems returns the number of records in r. +func (r *recordBuffer) getItems() int { + return r.pos +} + +// getRecords returns a new slice of all the stored records in r. +func (r *recordBuffer) getRecords() []StreamRecord { + return r.records[0:r.pos] +} + // StreamResponseMock is a mock for StreamResponse. type StreamResponseMock struct { StreamResponse diff --git a/log/logstream/stream_test.go b/log/logstream/stream_test.go index 7cf9805..b68e14a 100644 --- a/log/logstream/stream_test.go +++ b/log/logstream/stream_test.go @@ -22,7 +22,7 @@ func TestStreamWriterWriteNoError(t *testing.T) { { writer: &StreamWriter{ stream: stream, - bufferSize: 0, + buf: newRecordBuffer(500), maxBufferItems: 2, maxBufferSize: 4, }, @@ -34,11 +34,11 @@ func TestStreamWriterWriteNoError(t *testing.T) { { writer: &StreamWriter{ stream: stream, - bufferSize: 0, + buf: newRecordBuffer(500), maxBufferItems: 2, maxBufferSize: 4, }, - writes: 1, + writes: 2, input: []byte{1, 2, 3, 4, 5}, expected: []byte{1, 2, 3, 4, 5}, }, @@ -46,13 +46,13 @@ func TestStreamWriterWriteNoError(t *testing.T) { { writer: &StreamWriter{ stream: stream, - bufferSize: 0, + buf: newRecordBuffer(500), maxBufferItems: 2, maxBufferSize: 4, }, - writes: 2, - input: []byte{1, 2, 3}, - expected: []byte{1, 2, 3, 1, 2, 3}, + writes: 3, + input: []byte{1, 2}, + expected: []byte{1, 2, 1, 2}, }, } @@ -60,8 +60,7 @@ func TestStreamWriterWriteNoError(t *testing.T) { stream.buf.Reset() for i := 0; i < test.writes; i++ { - n, err := test.writer.Write(test.input) - assert.Equal(t, len(test.input), n) + _, err := test.writer.Write(test.input) assert.NoError(t, err) } @@ -77,19 +76,42 @@ func TestStreamWriterFlushNoError(t *testing.T) { // init writer writer := &StreamWriter{ stream: stream, - buffer: []StreamRecord{StreamRecord([]byte{1, 2})}, + buf: newRecordBuffer(500), } writer.Flush() - assert.Equal(t, []StreamRecord(nil), writer.buffer) - assert.Equal(t, 0, writer.bufferSize) - assert.Equal(t, []byte{1, 2}, stream.buf.Bytes()) + assert.Equal(t, 0, writer.buf.getItems()) + assert.Equal(t, 0, writer.buf.getSize()) + assert.Equal(t, []byte(nil), stream.buf.Bytes()) // write more - writer.buffer = []StreamRecord{StreamRecord([]byte{3})} + writer.Write([]byte{1, 2, 3}) writer.Flush() - assert.Equal(t, []StreamRecord(nil), writer.buffer) - assert.Equal(t, 0, writer.bufferSize) + assert.Equal(t, 0, writer.buf.getItems()) + assert.Equal(t, 0, writer.buf.getSize()) assert.Equal(t, []byte{1, 2, 3}, stream.buf.Bytes()) } + +func BenchmarkStreamWriter(b *testing.B) { + + stream := new(StreamMock) + stream.On("Put", mock.Anything).Return(new(StreamResponseMock), nil) + + w := NewStreamWriter(stream) + p := []byte{ + 1, 2, 3, 4, 5, + 1, 2, 3, 4, 5, + 1, 2, 3, 4, 5, + 1, 2, 3, 4, 5, + } + + for i := 0; i < b.N; i++ { + _, err := w.Write(p) + if err != nil { + b.FailNow() + } + } + + b.ReportAllocs() +} From db34e22b89732c821eb046b8b0b81eb314b91e87 Mon Sep 17 00:00:00 2001 From: wangyi-yd Date: Mon, 27 Jul 2015 11:18:27 +0200 Subject: [PATCH 2/4] fix kinesis contructor --- log/logstream/kinesis.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/log/logstream/kinesis.go b/log/logstream/kinesis.go index 0fdc940..08d59b8 100644 --- a/log/logstream/kinesis.go +++ b/log/logstream/kinesis.go @@ -10,10 +10,17 @@ import ( ) // Kinesis implements Stream interface and wraps a kinesis client. -// TODO: need aws.Config. type Kinesis struct { - streamName string - stream kinesis.Kinesis + name string + stream *kinesis.Kinesis +} + +// NewKinesisStream created a new Kinesis stream with given name and config. +func NewKinesisStream(name string, c aws.Config) Stream { + return &Kinesis{ + name: name, + stream: kinesis.New(&c), + } } // Put records into a remote kinesis stream. @@ -32,14 +39,13 @@ func (k *Kinesis) Put(records []StreamRecord) (StreamResponse, error) { params := &kinesis.PutRecordsInput{ Records: entries, - StreamName: aws.String(k.streamName), + StreamName: aws.String(k.name), } return k.stream.PutRecords(params) } // Close. -// TODO: do we close the connection to kinesis? func (k *Kinesis) Close() error { return nil } From fcc5b4a96b53157a5d53360b5f8b87876fe6bafd Mon Sep 17 00:00:00 2001 From: wangyi-yd Date: Mon, 27 Jul 2015 11:50:06 +0200 Subject: [PATCH 3/4] remove assert/mock dependencies; fix tests; --- log/logstream/logstream_test.go | 22 ++++++++----- log/logstream/stream.go | 7 +---- log/logstream/stream_test.go | 55 +++++++++++++++++++++++---------- 3 files changed, 53 insertions(+), 31 deletions(-) diff --git a/log/logstream/logstream_test.go b/log/logstream/logstream_test.go index b03f6e2..1894865 100644 --- a/log/logstream/logstream_test.go +++ b/log/logstream/logstream_test.go @@ -4,19 +4,18 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/yieldr/go-log/log" ) func TestNewLogstream(t *testing.T) { l := New(nil, time.Second, log.BasicFormat, log.BasicFields) - assert.NotNil(t, l) + if l == nil { + t.Fatal("NewLogstream failed") + } } func TestLogStreamLog(t *testing.T) { stream := new(StreamMock) - stream.On("Put", mock.Anything).Return(new(StreamResponseMock), nil) l := &Logstream{ format: log.BasicFormat, @@ -33,12 +32,13 @@ func TestLogStreamLog(t *testing.T) { l.Log(fields) l.Flush() - assert.Equal(t, "now [INFO] foo\nnow [INFO] foo\n", stream.buf.String()) + if stream.buf.String() != "now [INFO] foo\nnow [INFO] foo\n" { + t.Error("Logstream log buffer not matched") + } } func TestLogStreamRun(t *testing.T) { stream := new(StreamMock) - stream.On("Put", mock.Anything).Return(new(StreamResponseMock), nil) l := &Logstream{ interval: time.Second * 3, @@ -62,8 +62,14 @@ func TestLogStreamRun(t *testing.T) { // data is flushed every 5s time.Sleep(time.Second * 5) - assert.Equal(t, 0, l.writer.buf.getSize()) - assert.Equal(t, "now [INFO] foo\n", stream.buf.String()) + + if 0 != l.writer.buf.getSize() { + t.Error("writer buffer size should be 0.") + } + + if "now [INFO] foo\n" != stream.buf.String() { + t.Error("writer buffer content not match") + } // stop l.Stop() diff --git a/log/logstream/stream.go b/log/logstream/stream.go index 69e098e..c192278 100644 --- a/log/logstream/stream.go +++ b/log/logstream/stream.go @@ -3,8 +3,6 @@ package logstream import ( "bytes" "errors" - - "github.com/stretchr/testify/mock" ) // StreamRecord represents a record to be sent to Stream. @@ -138,13 +136,11 @@ func (r *recordBuffer) getRecords() []StreamRecord { // StreamResponseMock is a mock for StreamResponse. type StreamResponseMock struct { StreamResponse - mock.Mock } // StreamMock is a mock for Stream. type StreamMock struct { Stream - mock.Mock buf bytes.Buffer } @@ -155,6 +151,5 @@ func (s *StreamMock) Put(records []StreamRecord) (StreamResponse, error) { s.buf.Write(r) } - args := s.Called(records) - return args.Get(0).(StreamResponse), args.Error(1) + return new(StreamResponseMock), nil } diff --git a/log/logstream/stream_test.go b/log/logstream/stream_test.go index b68e14a..62217fb 100644 --- a/log/logstream/stream_test.go +++ b/log/logstream/stream_test.go @@ -1,16 +1,23 @@ package logstream -import ( - "testing" +import "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) +func assertByteSliceEqual(a, b []byte) bool { + if len(a) != len(b) { + return false + } + + for i := 0; i < len(a); i++ { + if a[i] != b[i] { + return false + } + } + return true +} func TestStreamWriterWriteNoError(t *testing.T) { stream := new(StreamMock) - stream.On("Put", mock.Anything).Return(new(StreamResponseMock), nil) tests := []struct { writer *StreamWriter @@ -28,7 +35,7 @@ func TestStreamWriterWriteNoError(t *testing.T) { }, writes: 1, input: []byte{1, 2}, - expected: nil, + expected: []byte{}, }, // buffer size exceeds, flush is triggered { @@ -61,17 +68,20 @@ func TestStreamWriterWriteNoError(t *testing.T) { for i := 0; i < test.writes; i++ { _, err := test.writer.Write(test.input) - assert.NoError(t, err) + if err != nil { + t.Error(err) + } } - assert.Equal(t, test.expected, stream.buf.Bytes()) + if !assertByteSliceEqual(test.expected, stream.buf.Bytes()) { + t.Error("buffer data not match.") + } } } func TestStreamWriterFlushNoError(t *testing.T) { stream := new(StreamMock) - stream.On("Put", mock.Anything).Return(new(StreamResponseMock), nil) // init writer writer := &StreamWriter{ @@ -80,23 +90,34 @@ func TestStreamWriterFlushNoError(t *testing.T) { } writer.Flush() - assert.Equal(t, 0, writer.buf.getItems()) - assert.Equal(t, 0, writer.buf.getSize()) - assert.Equal(t, []byte(nil), stream.buf.Bytes()) + if 0 != writer.buf.getItems() { + t.Error("writer buffer items should be 0.") + } + if 0 != writer.buf.getSize() { + t.Error("writer buffer size should be 0.") + } + if []byte(nil) != stream.buf.Bytes() { + t.Error("writer buffer should be nil.") + } // write more writer.Write([]byte{1, 2, 3}) writer.Flush() - assert.Equal(t, 0, writer.buf.getItems()) - assert.Equal(t, 0, writer.buf.getSize()) - assert.Equal(t, []byte{1, 2, 3}, stream.buf.Bytes()) + if 0 != writer.buf.getItems() { + t.Error("writer buffer items should be 0.") + } + if 0 != writer.buf.getSize() { + t.Error("writer buffer size should be 0.") + } + if !assertByteSliceEqual([]byte{1, 2, 3}, stream.buf.Bytes()) { + t.Error("writer buffer not match,") + } } func BenchmarkStreamWriter(b *testing.B) { stream := new(StreamMock) - stream.On("Put", mock.Anything).Return(new(StreamResponseMock), nil) w := NewStreamWriter(stream) p := []byte{ From 9630af33ae6cf36b03da801829e67e092a4b6bcb Mon Sep 17 00:00:00 2001 From: wangyi-yd Date: Tue, 11 Aug 2015 11:57:37 +0200 Subject: [PATCH 4/4] move kinesis stream to new folder; add Write() method for logstream --- log/logstream/logstream.go | 7 +++++++ log/logstream/{ => stream}/kinesis.go | 10 ++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) rename log/logstream/{ => stream}/kinesis.go (79%) diff --git a/log/logstream/logstream.go b/log/logstream/logstream.go index 29893ce..5afb3e4 100644 --- a/log/logstream/logstream.go +++ b/log/logstream/logstream.go @@ -77,6 +77,13 @@ func (l *Logstream) Log(fields log.Fields) { fmt.Fprintf(l.writer, l.format, vals...) } +// Write writes p to the internal buffer. +func (l *Logstream) Write(p []byte) (int, error) { + l.mux.Lock() + defer l.mux.Unlock() + return l.writer.Write(p) +} + // Run is usually used as a deamon. All the buffered data is flushed periodically // until it is stopped. func (l *Logstream) Run() { diff --git a/log/logstream/kinesis.go b/log/logstream/stream/kinesis.go similarity index 79% rename from log/logstream/kinesis.go rename to log/logstream/stream/kinesis.go index 08d59b8..ee23916 100644 --- a/log/logstream/kinesis.go +++ b/log/logstream/stream/kinesis.go @@ -1,4 +1,4 @@ -package logstream +package stream import ( "errors" @@ -7,6 +7,8 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesis" + + "github.com/yieldr/go-log/log/logstream" ) // Kinesis implements Stream interface and wraps a kinesis client. @@ -15,8 +17,8 @@ type Kinesis struct { stream *kinesis.Kinesis } -// NewKinesisStream created a new Kinesis stream with given name and config. -func NewKinesisStream(name string, c aws.Config) Stream { +// New created a new Kinesis stream with given name and config. +func New(name string, c aws.Config) logstream.Stream { return &Kinesis{ name: name, stream: kinesis.New(&c), @@ -24,7 +26,7 @@ func NewKinesisStream(name string, c aws.Config) Stream { } // Put records into a remote kinesis stream. -func (k *Kinesis) Put(records []StreamRecord) (StreamResponse, error) { +func (k *Kinesis) Put(records []logstream.StreamRecord) (logstream.StreamResponse, error) { if len(records) == 0 { return nil, errors.New("empty records for kinesis.") }