diff --git a/README.md b/README.md index 56702ffc..c7f99369 100644 --- a/README.md +++ b/README.md @@ -210,6 +210,5 @@ DRPC is proud to get as much done in as few lines of code as possible. It's the | storj.io/drpc/drpcctx | 41 | | storj.io/drpc/internal/drpcopts | 30 | | storj.io/drpc/drpcstats | 25 | -| storj.io/drpc/drpcdebug | 22 | | storj.io/drpc/drpcenc | 15 | -| **Total** | **3611** | +| **Total** | **3589** | diff --git a/debug_off.go b/debug_off.go new file mode 100644 index 00000000..c47bf0a3 --- /dev/null +++ b/debug_off.go @@ -0,0 +1,11 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +//go:build !drpcdebug + +package drpc + +// DebugEnabled controls whether debug logging is active. When false (the +// default), the compiler eliminates debug log callsites entirely so that +// callbacks passed to log helpers are never allocated or evaluated. +const DebugEnabled = false diff --git a/debug_on.go b/debug_on.go new file mode 100644 index 00000000..e99c00ba --- /dev/null +++ b/debug_on.go @@ -0,0 +1,10 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +//go:build drpcdebug + +package drpc + +// DebugEnabled controls whether debug logging is active. Build with +// -tags=drpcdebug to enable debug log evaluation. +const DebugEnabled = true diff --git a/drpcclient/dialoptions.go b/drpcclient/dialoptions.go index 5054a83e..031dfe36 100644 --- a/drpcclient/dialoptions.go +++ b/drpcclient/dialoptions.go @@ -33,6 +33,9 @@ type dialOptions struct { // tlsConfig is an optional TLS configuration for secure connections. tlsConfig *tls.Config + // logger is used to log errors and operational events on the connection. + logger drpc.Logger + // metrics holds optional metrics the conn will populate. No metrics are // recorded if this is nil. When shouldRecord is set, metrics are recorded // only when shouldRecord returns true. @@ -94,6 +97,13 @@ func WithShouldRecordFunc(shouldRecord func() bool) DialOption { } } +// WithLogger returns a DialOption that sets the Logger for the connection. +func WithLogger(logger drpc.Logger) DialOption { + return func(o *dialOptions) { + o.logger = logger + } +} + // WithContextDialer returns a DialOption that sets a custom dialer function // to be used instead of the default net.Dialer. func WithContextDialer(dialer func(context.Context, string) (net.Conn, error)) DialOption { @@ -160,6 +170,7 @@ func DialContext( }, SoftCancel: false, }, + Logger: options.logger, ShouldRecord: options.shouldRecord, Metrics: *options.metrics, }), nil diff --git a/drpcconn/conn.go b/drpcconn/conn.go index 8e10a714..c5fd14ce 100644 --- a/drpcconn/conn.go +++ b/drpcconn/conn.go @@ -25,6 +25,10 @@ type Options struct { // Manager controls the options we pass to the manager of this conn. Manager drpcmanager.Options + // Logger is used to log errors and operational events. If nil, + // drpc.DefaultLogger is used. + Logger drpc.Logger + // TODO: (server): deprecate this // CollectStats controls whether the client should collect stats on the // rpcs it creates. @@ -77,6 +81,9 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Conn { c.stats = make(map[string]*drpcstats.Stats) } + if opts.Manager.Logger == nil { + opts.Manager.Logger = opts.Logger + } c.man = drpcmanager.NewWithOptions(c.tr, opts.Manager) return c @@ -123,7 +130,9 @@ func (c *Conn) Close() (err error) { return c.man.Close() } // Invoke issues the rpc on the transport serializing in, waits for a response, and // deserializes it into out. Only one Invoke or Stream may be open at a time. -func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) { +func (c *Conn) Invoke( + ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message, +) (err error) { defer func() { err = drpc.ToRPCErr(err) }() var metadata []byte @@ -155,7 +164,14 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou return nil } -func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string, data []byte, metadata []byte, out drpc.Message) (err error) { +func (c *Conn) doInvoke( + stream *drpcstream.Stream, + enc drpc.Encoding, + rpc string, + data []byte, + metadata []byte, + out drpc.Message, +) (err error) { if len(metadata) > 0 { if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil { return err @@ -178,7 +194,9 @@ func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string // NewStream begins a streaming rpc on the connection. Only one Invoke or Stream may // be open at a time. -func (c *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ drpc.Stream, err error) { +func (c *Conn) NewStream( + ctx context.Context, rpc string, enc drpc.Encoding, +) (_ drpc.Stream, err error) { defer func() { err = drpc.ToRPCErr(err) }() var metadata []byte diff --git a/drpcconn/conn_test.go b/drpcconn/conn_test.go index 9f74516e..8820de16 100644 --- a/drpcconn/conn_test.go +++ b/drpcconn/conn_test.go @@ -183,6 +183,29 @@ func TestConn_NewStreamSendsGrpcAndDrpcMetadata(t *testing.T) { _ = s.CloseSend() } +func TestConnDefaultLogger(t *testing.T) { + pc, ps := net.Pipe() + defer func() { _ = pc.Close() }() + defer func() { _ = ps.Close() }() + + conn := NewWithOptions(pc, Options{}) + defer func() { _ = conn.Close() }() + // Verify construction with nil Logger does not panic. + _ = conn +} + +func TestConnCustomLogger(t *testing.T) { + pc, ps := net.Pipe() + defer func() { _ = pc.Close() }() + defer func() { _ = ps.Close() }() + + var logger drpc.InMemLogger + conn := NewWithOptions(pc, Options{Logger: &logger}) + defer func() { _ = conn.Close() }() + // Verify construction with custom Logger does not panic. + _ = conn +} + func TestConn_encodeMetadata(t *testing.T) { pc, ps := net.Pipe() defer func() { assert.NoError(t, pc.Close()) }() diff --git a/drpcdebug/README.md b/drpcdebug/README.md deleted file mode 100644 index 4454367a..00000000 --- a/drpcdebug/README.md +++ /dev/null @@ -1,19 +0,0 @@ -# package drpcdebug - -`import "storj.io/drpc/drpcdebug"` - -Package drpcdebug provides helpers for debugging. - -## Usage - -```go -const Enabled = enabled -``` -Enabled is a constant describing if logs are enabled or not. - -#### func Log - -```go -func Log(cb func() (who, what, why string)) -``` -Log executes the callback for a string to log if built with the debug tag. diff --git a/drpcdebug/doc.go b/drpcdebug/doc.go deleted file mode 100644 index 8bf3cad1..00000000 --- a/drpcdebug/doc.go +++ /dev/null @@ -1,8 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -// Package drpcdebug provides helpers for debugging. -package drpcdebug - -// Enabled is a constant describing if logs are enabled or not. -const Enabled = enabled diff --git a/drpcdebug/log_disabled.go b/drpcdebug/log_disabled.go deleted file mode 100644 index 0ff4bfdc..00000000 --- a/drpcdebug/log_disabled.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -//go:build !debug -// +build !debug - -package drpcdebug - -// Log executes the callback for a string to log if built with the debug tag. -func Log(cb func() (who, what, why string)) {} - -// this exists to work around a bug in markdown doc generation so that it -// does not generate two entries for Enabled, one set to true and one to false. -const enabled = false diff --git a/drpcdebug/log_enabled.go b/drpcdebug/log_enabled.go deleted file mode 100644 index 6ac2e429..00000000 --- a/drpcdebug/log_enabled.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (C) 2019 Storj Labs, Inc. -// See LICENSE for copying information. - -//go:build debug -// +build debug - -package drpcdebug - -import ( - "fmt" - "log" - "os" - "path/filepath" - "runtime" -) - -var logger = log.New(os.Stderr, "", 0) - -// Log executes the callback for a string to log if built with the debug tag. -func Log(cb func() (who, what, why string)) { - _, file, line, _ := runtime.Caller(1) - where := fmt.Sprintf("%s:%d", filepath.Base(file), line) - who, what, why := cb() - logger.Output(2, fmt.Sprintf("%24s | %-32s | %-6s | %s", - where, who, what, why)) -} - -// this exists to work around a bug in markdown doc generation so that it -// does not generate two entries for Enabled, one set to true and one to false. -const enabled = true diff --git a/drpcmanager/manager.go b/drpcmanager/manager.go index d7308366..4e787819 100644 --- a/drpcmanager/manager.go +++ b/drpcmanager/manager.go @@ -15,9 +15,7 @@ import ( "github.com/zeebo/errs" grpcmetadata "google.golang.org/grpc/metadata" - "storj.io/drpc" - "storj.io/drpc/drpcdebug" "storj.io/drpc/drpcmetadata" "storj.io/drpc/drpcsignal" "storj.io/drpc/drpcstream" @@ -60,6 +58,10 @@ type Options struct { // handling. When enabled, the server stream will decode incoming metadata // into grpc metadata in the context. GRPCMetadataCompatMode bool + + // Logger is used to log operational events. If nil, drpc.DefaultLogger is + // used. + Logger drpc.Logger } // Manager handles the logic of managing a transport for a drpc client or @@ -100,9 +102,13 @@ func New(tr drpc.Transport) *Manager { // NewWithOptions returns a new manager for the transport. It uses the provided // options to manage details of how it uses it. func NewWithOptions(tr drpc.Transport, opts Options) *Manager { + if opts.Logger == nil { + opts.Logger = drpc.DefaultLogger + } + m := &Manager{ tr: tr, - wr: drpcwire.NewWriter(tr, opts.WriterBufferSize), + wr: drpcwire.NewWriterWithLogger(tr, opts.WriterBufferSize, opts.Logger), rd: drpcwire.NewReaderWithOptions(tr, opts.Reader), opts: opts, @@ -135,8 +141,8 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager { func (m *Manager) String() string { return fmt.Sprintf("", m) } func (m *Manager) log(what string, cb func() string) { - if drpcdebug.Enabled { - drpcdebug.Log(func() (_, _, _ string) { return m.String(), what, cb() }) + if drpc.DebugEnabled { + m.opts.Logger.Debugf("%s %s %s", m.String(), what, cb()) } } @@ -298,8 +304,11 @@ func (m *Manager) manageReader() { // // newStream creates a stream value with the appropriate configuration for this manager. -func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string) (*drpcstream.Stream, error) { +func (m *Manager) newStream( + ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string, +) (*drpcstream.Stream, error) { opts := m.opts.Stream + opts.Logger = m.opts.Logger drpcopts.SetStreamKind(&opts.Internal, kind) drpcopts.SetStreamRPC(&opts.Internal, rpc) if cb := drpcopts.GetManagerStatsCB(&m.opts.Internal); cb != nil { @@ -425,7 +434,9 @@ func (m *Manager) Close() error { } // NewClientStream starts a stream on the managed transport for use by a client. -func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpcstream.Stream, err error) { +func (m *Manager) NewClientStream( + ctx context.Context, rpc string, +) (stream *drpcstream.Stream, err error) { if err := m.acquireSemaphore(ctx); err != nil { return nil, err } @@ -436,7 +447,9 @@ func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpc // NewServerStream starts a stream on the managed transport for use by a server. // It does this by waiting for the client to issue an invoke message and // returning the details. -func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Stream, rpc string, err error) { +func (m *Manager) NewServerStream( + ctx context.Context, +) (stream *drpcstream.Stream, rpc string, err error) { if err := m.acquireSemaphore(ctx); err != nil { return nil, "", err } diff --git a/drpcmanager/manager_debug_test.go b/drpcmanager/manager_debug_test.go new file mode 100644 index 00000000..8bd56943 --- /dev/null +++ b/drpcmanager/manager_debug_test.go @@ -0,0 +1,66 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +//go:build drpcdebug + +package drpcmanager + +import ( + "context" + "errors" + "io" + "net" + "strings" + "testing" + + "github.com/zeebo/assert" + "storj.io/drpc" + "storj.io/drpc/drpctest" + "storj.io/drpc/drpcwire" +) + +func TestManagerLoggerPropagation(t *testing.T) { + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + cconn, sconn := net.Pipe() + defer func() { _ = cconn.Close() }() + defer func() { _ = sconn.Close() }() + + var logger drpc.InMemLogger + + cman := NewWithOptions(cconn, Options{Logger: &logger}) + defer func() { _ = cman.Close() }() + + sman := New(sconn) + defer func() { _ = sman.Close() }() + + ctx.Run(func(ctx context.Context) { + stream, err := cman.NewClientStream(ctx, "rpc") + assert.NoError(t, err) + defer func() { _ = stream.Close() }() + + assert.NoError(t, stream.RawWrite(drpcwire.KindInvoke, []byte("invoke"))) + assert.NoError(t, stream.RawWrite(drpcwire.KindMessage, []byte("message"))) + assert.NoError(t, stream.RawFlush()) + + assert.NoError(t, stream.Close()) + }) + + ctx.Run(func(ctx context.Context) { + stream, _, err := sman.NewServerStream(ctx) + assert.NoError(t, err) + defer func() { _ = stream.Close() }() + + _, err = stream.RawRecv() + assert.NoError(t, err) + + _, err = stream.RawRecv() + assert.That(t, errors.Is(err, io.EOF)) + }) + + ctx.Wait() + + logs := logger.String() + assert.That(t, strings.Contains(logs, "DEBUG:")) +} diff --git a/drpcmanager/manager_test.go b/drpcmanager/manager_test.go index 5918113d..ff56288a 100644 --- a/drpcmanager/manager_test.go +++ b/drpcmanager/manager_test.go @@ -14,8 +14,8 @@ import ( "github.com/zeebo/assert" grpcmetadata "google.golang.org/grpc/metadata" + "storj.io/drpc" "storj.io/drpc/drpcmetadata" - "storj.io/drpc/drpctest" "storj.io/drpc/drpcwire" ) @@ -295,3 +295,18 @@ func (b *blockedTransport) wait(p int, rw *bool) (int, error) { func (b *blockedTransport) Read(p []byte) (n int, err error) { return b.wait(len(p), &b.ro) } func (b *blockedTransport) Write(p []byte) (n int, err error) { return b.wait(len(p), &b.wo) } func (b *blockedTransport) Close() error { return nil } + +func TestManagerDefaultLogger(t *testing.T) { + tr := make(blockingTransport) + man := New(tr) + defer func() { _ = man.Close() }() + assert.Equal(t, man.opts.Logger, drpc.DefaultLogger) +} + +func TestManagerCustomLogger(t *testing.T) { + tr := make(blockingTransport) + var logger drpc.InMemLogger + man := NewWithOptions(tr, Options{Logger: &logger}) + defer func() { _ = man.Close() }() + assert.Equal(t, man.opts.Logger, &logger) +} diff --git a/drpcpool/pool.go b/drpcpool/pool.go index a7623410..b7f4d2f2 100644 --- a/drpcpool/pool.go +++ b/drpcpool/pool.go @@ -5,12 +5,11 @@ package drpcpool import ( "context" - "fmt" "sync" "time" "github.com/zeebo/errs" - "storj.io/drpc/drpcdebug" + "storj.io/drpc" "storj.io/drpc/drpcmetrics" ) @@ -43,6 +42,10 @@ type Options struct { // Labels holds optional labels to be attached to all metrics. Labels map[string]string + + // Logger is used to log operational events. If nil, drpc.DefaultLogger is + // used. + Logger drpc.Logger } // Pool is a connection pool with key type K. It maintains a cache of connections @@ -58,6 +61,9 @@ type Pool[K comparable, V Conn] struct { // New constructs a new Pool with the provided Options. func New[K comparable, V Conn](opts Options) *Pool[K, V] { + if opts.Logger == nil { + opts.Logger = drpc.DefaultLogger + } pool := Pool[K, V]{ opts: opts, entries: make(map[K]*list[K, V]), @@ -112,8 +118,8 @@ func (p *Pool[K, V]) updatePoolSize() { } func (p *Pool[K, V]) log(what string, cb func() string) { - if drpcdebug.Enabled { - drpcdebug.Log(func() (_, _, _ string) { return fmt.Sprintf("", p), what, cb() }) + if drpc.DebugEnabled { + p.opts.Logger.Debugf(" %s %s", p, what, cb()) } } diff --git a/drpcpool/pool_test.go b/drpcpool/pool_test.go index db5a842a..3c844dec 100644 --- a/drpcpool/pool_test.go +++ b/drpcpool/pool_test.go @@ -646,6 +646,19 @@ func TestPoolMetrics_ShouldRecordDynamic(t *testing.T) { assert.Equal(t, misses.total, 1.0) // unchanged } +func TestPoolDefaultLogger(t *testing.T) { + pool := New[string, Conn](Options{}) + defer func() { _ = pool.Close() }() + assert.Equal(t, pool.opts.Logger, drpc.DefaultLogger) +} + +func TestPoolCustomLogger(t *testing.T) { + var logger drpc.InMemLogger + pool := New[string, Conn](Options{Logger: &logger}) + defer func() { _ = pool.Close() }() + assert.Equal(t, pool.opts.Logger, &logger) +} + func BenchmarkPool(b *testing.B) { ctx := drpctest.NewTracker(b) defer ctx.Close() diff --git a/drpcserver/server.go b/drpcserver/server.go index 75e034ff..fa5ac7a5 100644 --- a/drpcserver/server.go +++ b/drpcserver/server.go @@ -26,10 +26,9 @@ type Options struct { // Manager controls the options we pass to the managers this server creates. Manager drpcmanager.Options - // Log is called when errors happen that can not be returned up, like - // temporary network errors when accepting connections, or errors - // handling individual clients. It is not called if nil. - Log func(error) + // Logger is used to log errors and operational events. If nil, + // drpc.DefaultLogger is used. + Logger drpc.Logger // CollectStats controls whether the server should collect stats on the // rpcs it serves. @@ -80,6 +79,10 @@ func New(handler drpc.Handler) *Server { // NewWithOptions constructs a new Server using the provided options to tune // how the drpc connections are handled. func NewWithOptions(handler drpc.Handler, opts Options) *Server { + if opts.Logger == nil { + opts.Logger = drpc.DefaultLogger + } + // Clone the TLS config so the server owns its copy and the caller cannot // mutate it after construction. if opts.TLSConfig != nil { @@ -161,7 +164,11 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) { } } - man := drpcmanager.NewWithOptions(tr, s.opts.Manager) + manOpts := s.opts.Manager + if manOpts.Logger == nil { + manOpts.Logger = s.opts.Logger + } + man := drpcmanager.NewWithOptions(tr, manOpts) defer func() { err = errs.Combine(err, man.Close()) }() cache := drpccache.New() @@ -206,9 +213,7 @@ func (s *Server) Serve(ctx context.Context, lis net.Listener) (err error) { } if isTemporary(err) { - if s.opts.Log != nil { - s.opts.Log(err) - } + s.opts.Logger.Errorf("temporary accept error: %v", err) t := time.NewTimer(temporarySleep) select { @@ -226,9 +231,8 @@ func (s *Server) Serve(ctx context.Context, lis net.Listener) (err error) { // TODO(jeff): connection limits? tracker.Run(func(ctx context.Context) { - err := s.ServeOne(ctx, conn) - if err != nil && s.opts.Log != nil { - s.opts.Log(err) + if err := s.ServeOne(ctx, conn); err != nil { + s.opts.Logger.Errorf("serving client: %v", err) } }) } diff --git a/drpcserver/server_test.go b/drpcserver/server_test.go index 968d7cea..8f0dc818 100644 --- a/drpcserver/server_test.go +++ b/drpcserver/server_test.go @@ -5,10 +5,11 @@ package drpcserver import ( "net" + "strings" "testing" "github.com/zeebo/assert" - + "storj.io/drpc" "storj.io/drpc/drpctest" ) @@ -35,6 +36,40 @@ func TestServerTemporarySleep(t *testing.T) { assert.NoError(t, New(nil).Serve(ctx, l)) } +func TestServerDefaultLogger(t *testing.T) { + s := New(nil) + assert.Equal(t, s.opts.Logger, drpc.DefaultLogger) +} + +func TestServerCustomLogger(t *testing.T) { + var logger drpc.InMemLogger + s := NewWithOptions(nil, Options{Logger: &logger}) + assert.Equal(t, s.opts.Logger, &logger) +} + +func TestServerLogsTemporaryErrors(t *testing.T) { + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + var logger drpc.InMemLogger + + calls := 0 + l := listener(func() (net.Conn, error) { + calls++ + switch calls { + case 1: + case 2: + ctx.Cancel() + default: + panic("spinning on temporary error") + } + return nil, new(temporaryError) + }) + + assert.NoError(t, NewWithOptions(nil, Options{Logger: &logger}).Serve(ctx, l)) + assert.That(t, strings.Contains(logger.String(), "temporary accept error")) +} + type listener func() (net.Conn, error) func (l listener) Accept() (net.Conn, error) { return l() } diff --git a/drpcstream/stream.go b/drpcstream/stream.go index 29ccd636..5e042d49 100644 --- a/drpcstream/stream.go +++ b/drpcstream/stream.go @@ -11,10 +11,8 @@ import ( "sync" "github.com/zeebo/errs" - "storj.io/drpc" "storj.io/drpc/drpcctx" - "storj.io/drpc/drpcdebug" "storj.io/drpc/drpcenc" "storj.io/drpc/drpcsignal" "storj.io/drpc/drpcwire" @@ -37,6 +35,10 @@ type Options struct { // more allocations. 0 is unlimited. MaximumBufferSize int + // Logger is used to log operational events. If nil, drpc.DefaultLogger is + // used. + Logger drpc.Logger + // Internal contains options that are for internal use only. Internal drpcopts.Stream } @@ -89,6 +91,10 @@ func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts O } } + if opts.Logger == nil { + opts.Logger = drpc.DefaultLogger + } + s := &Stream{ ctx: streamCtx{ Context: ctx, @@ -116,11 +122,14 @@ func (s *Stream) String() string { } func (s *Stream) log(what string, cb func() string) { - if drpcdebug.Enabled { - drpcdebug.Log(func() (_, _, _ string) { return s.String(), what, cb() }) - } - if s.task != nil { - trace.Log(&s.ctx, what, cb()) + if drpc.DebugEnabled || s.task != nil { + detail := cb() + if drpc.DebugEnabled { + s.opts.Logger.Debugf("%s %s %s", s.String(), what, detail) + } + if s.task != nil { + trace.Log(&s.ctx, what, detail) + } } } diff --git a/drpcwire/writer.go b/drpcwire/writer.go index fe909cff..402ce15e 100644 --- a/drpcwire/writer.go +++ b/drpcwire/writer.go @@ -9,7 +9,7 @@ import ( "sync" "sync/atomic" - "storj.io/drpc/drpcdebug" + "storj.io/drpc" ) // @@ -18,30 +18,40 @@ import ( // Writer is a helper to buffer and write packets and frames to an io.Writer. type Writer struct { - empty uint32 - w io.Writer - size int - mu sync.Mutex - buf []byte + empty uint32 + w io.Writer + size int + mu sync.Mutex + buf []byte + logger drpc.Logger } // NewWriter returns a Writer that will attempt to buffer size data before // sending it to the io.Writer. func NewWriter(w io.Writer, size int) *Writer { + return NewWriterWithLogger(w, size, drpc.DefaultLogger) +} + +// NewWriterWithLogger returns a Writer that uses the provided Logger. +func NewWriterWithLogger(w io.Writer, size int, logger drpc.Logger) *Writer { if size == 0 { size = 4 * 1024 } + if logger == nil { + logger = drpc.DefaultLogger + } return &Writer{ - w: w, - size: size, - buf: make([]byte, 0, size), + w: w, + size: size, + buf: make([]byte, 0, size), + logger: logger, } } func (b *Writer) log(what string, cb func() string) { - if drpcdebug.Enabled { - drpcdebug.Log(func() (_, _, _ string) { return fmt.Sprintf("", b), what, cb() }) + if drpc.DebugEnabled { + b.logger.Debugf(" %s %s", b, what, cb()) } } diff --git a/logger.go b/logger.go new file mode 100644 index 00000000..902db444 --- /dev/null +++ b/logger.go @@ -0,0 +1,93 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package drpc + +import ( + "bytes" + "fmt" + "log" + "os" + "sync" +) + +// Logger defines an interface for writing log messages. +type Logger interface { + Debugf(format string, args ...interface{}) + Infof(format string, args ...interface{}) + Errorf(format string, args ...interface{}) + Fatalf(format string, args ...interface{}) +} + +type defaultLogger struct{} + +// DefaultLogger logs to the Go stdlib log package. Debugf is a no-op. +var DefaultLogger defaultLogger + +var _ Logger = DefaultLogger + +func (defaultLogger) Debugf(format string, args ...interface{}) {} + +func (defaultLogger) Infof(format string, args ...interface{}) { + _ = log.Output(2, fmt.Sprintf(format, args...)) +} + +func (defaultLogger) Errorf(format string, args ...interface{}) { + _ = log.Output(2, fmt.Sprintf(format, args...)) +} + +func (defaultLogger) Fatalf(format string, args ...interface{}) { + _ = log.Output(2, fmt.Sprintf(format, args...)) + os.Exit(1) +} + +// InMemLogger implements Logger using an in-memory buffer (used for testing). +// The buffer can be read via String() and cleared via Reset(). +type InMemLogger struct { + mu struct { + sync.Mutex + buf bytes.Buffer + } +} + +var _ Logger = (*InMemLogger)(nil) + +// Reset clears the internal buffer. +func (b *InMemLogger) Reset() { + b.mu.Lock() + defer b.mu.Unlock() + b.mu.buf.Reset() +} + +// String returns the current internal buffer. +func (b *InMemLogger) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.mu.buf.String() +} + +func (b *InMemLogger) writef(prefix, format string, args ...interface{}) { + s := fmt.Sprintf(prefix+format, args...) + b.mu.Lock() + defer b.mu.Unlock() + b.mu.buf.WriteString(s) + if len(s) == 0 || s[len(s)-1] != '\n' { + b.mu.buf.WriteByte('\n') + } +} + +func (b *InMemLogger) Debugf(format string, args ...interface{}) { + b.writef("DEBUG: ", format, args...) +} + +func (b *InMemLogger) Infof(format string, args ...interface{}) { + b.writef("INFO: ", format, args...) +} + +func (b *InMemLogger) Errorf(format string, args ...interface{}) { + b.writef("ERROR: ", format, args...) +} + +func (b *InMemLogger) Fatalf(format string, args ...interface{}) { + b.writef("FATAL: ", format, args...) +} diff --git a/logger_test.go b/logger_test.go new file mode 100644 index 00000000..a2f3ed6f --- /dev/null +++ b/logger_test.go @@ -0,0 +1,72 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package drpc + +import ( + "strings" + "testing" + + "github.com/zeebo/assert" +) + +func TestDefaultLogger_ImplementsInterface(t *testing.T) { + var _ Logger = DefaultLogger +} + +func TestDefaultLogger_DebugfIsNoop(t *testing.T) { + // Debugf should not panic or produce output. + DefaultLogger.Debugf("this should be a no-op: %d", 42) +} + +func TestInMemLogger_ImplementsInterface(t *testing.T) { + var _ Logger = (*InMemLogger)(nil) +} + +func TestInMemLogger_CapturesAllLevels(t *testing.T) { + var l InMemLogger + + l.Debugf("debug %d", 1) + l.Infof("info %d", 2) + l.Errorf("error %d", 3) + l.Fatalf("fatal %d", 4) + + got := l.String() + assert.That(t, strings.Contains(got, "DEBUG: debug 1")) + assert.That(t, strings.Contains(got, "INFO: info 2")) + assert.That(t, strings.Contains(got, "ERROR: error 3")) + assert.That(t, strings.Contains(got, "FATAL: fatal 4")) +} + +func TestInMemLogger_AppendsNewline(t *testing.T) { + var l InMemLogger + + l.Infof("no newline") + l.Infof("also no newline") + + lines := strings.Split(strings.TrimSpace(l.String()), "\n") + assert.Equal(t, len(lines), 2) +} + +func TestInMemLogger_PreservesExistingNewline(t *testing.T) { + var l InMemLogger + + l.Infof("has newline\n") + l.Infof("another") + + lines := strings.Split(strings.TrimSpace(l.String()), "\n") + assert.Equal(t, len(lines), 2) +} + +func TestInMemLogger_Reset(t *testing.T) { + var l InMemLogger + + l.Infof("before reset") + assert.That(t, l.String() != "") + + l.Reset() + assert.Equal(t, l.String(), "") + + l.Infof("after reset") + assert.That(t, strings.Contains(l.String(), "after reset")) +}