package binarylog
import (
binlogpb
)
var (
// DefaultSink is the package-level Sink value. It is initialized to a
// no-op sink, so entries handed to it are dropped until it is reassigned.
// NOTE(review): presumably replaced by the code that configures binary
// logging — confirm against callers.
DefaultSink Sink = &noopSink{}
)
// Sink is a destination for binary log entries.
type Sink interface {
// Write records a single log entry and returns an error if the entry
// could not be recorded.
// NOTE(review): whether implementations must be safe for concurrent use
// is not stated in this file — confirm against callers.
Write(*binlogpb.GrpcLogEntry) error
// Close releases the sink and returns an error if that fails.
Close() error
}
type noopSink struct{}
func ( *noopSink) (*binlogpb.GrpcLogEntry) error { return nil }
func ( *noopSink) () error { return nil }
func ( io.Writer) Sink {
return &writerSink{out: }
}
// writerSink is a Sink that serializes log entries to an io.Writer.
type writerSink struct {
// out receives, for each entry, a 4-byte length header followed by the
// marshaled entry bytes.
out io.Writer
}
func ( *writerSink) ( *binlogpb.GrpcLogEntry) error {
, := proto.Marshal()
if != nil {
grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", )
return
}
:= make([]byte, 4)
binary.BigEndian.PutUint32(, uint32(len()))
if , := .out.Write(); != nil {
return
}
if , := .out.Write(); != nil {
return
}
return nil
}
func ( *writerSink) () error { return nil }
// bufferedSink is a Sink that writes entries through an in-memory buffer and
// flushes that buffer periodically from a background goroutine, as well as
// when the sink is closed.
type bufferedSink struct {
// mu is held while writing an entry, while flushing buf, and while closing.
mu sync.Mutex
// closer is the underlying destination; it is closed when the sink is closed.
closer io.Closer
// out is the Sink that entries are forwarded to; the constructor in this
// file builds it on top of buf.
out Sink
// buf is the buffered writer that is flushed on each tick and on close.
buf *bufio.Writer
// flusherStarted records whether the background flush goroutine has been
// started; it is set on the first write.
flusherStarted bool

// writeTicker fires every bufFlushDuration to trigger a flush. It is nil
// until the flush goroutine is started.
writeTicker *time.Ticker
// done is closed when the sink is closed, which stops the flush goroutine.
done chan struct{}
}
func ( *bufferedSink) ( *binlogpb.GrpcLogEntry) error {
.mu.Lock()
defer .mu.Unlock()
if !.flusherStarted {
.startFlushGoroutine()
.flusherStarted = true
}
if := .out.Write(); != nil {
return
}
return nil
}
const (
// bufFlushDuration is the interval of the ticker that drives periodic
// flushes of a bufferedSink's buffer.
bufFlushDuration = 60 * time.Second
)
func ( *bufferedSink) () {
.writeTicker = time.NewTicker(bufFlushDuration)
go func() {
for {
select {
case <-.done:
return
case <-.writeTicker.C:
}
.mu.Lock()
if := .buf.Flush(); != nil {
grpclogLogger.Warningf("failed to flush to Sink: %v", )
}
.mu.Unlock()
}
}()
}
func ( *bufferedSink) () error {
.mu.Lock()
defer .mu.Unlock()
if .writeTicker != nil {
.writeTicker.Stop()
}
close(.done)
if := .buf.Flush(); != nil {
grpclogLogger.Warningf("failed to flush to Sink: %v", )
}
if := .closer.Close(); != nil {
grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", )
}
if := .out.Close(); != nil {
grpclogLogger.Warningf("failed to close the Sink: %v", )
}
return nil
}
func ( io.WriteCloser) Sink {
:= bufio.NewWriter()
return &bufferedSink{
closer: ,
out: newWriterSink(),
buf: ,
done: make(chan struct{}),
}
}