// Package fwd provides a buffered reader
// and writer. Each has methods that help improve
// the encoding/decoding performance of some binary
// protocols.
//
// The `fwd.Writer` and `fwd.Reader` types provide similar
// functionality to their counterparts in `bufio`, plus
// a few extra utility methods that simplify read-ahead
// and write-ahead. I wrote this package to improve serialization
// performance for http://github.com/tinylib/msgp,
// where it provided about a 2x speedup over `bufio` for certain
// workloads. However, care must be taken to understand the semantics of the
// extra methods provided by this package, as they allow
// the user to access and manipulate the buffer memory
// directly.
//
// The extra methods for `fwd.Reader` are `Peek`, `Skip`
// and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`,
// will re-allocate the read buffer in order to accommodate arbitrarily
// large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes
// in the stream, and uses the `io.Seeker` interface if the underlying
// stream implements it. `(*fwd.Reader).Next` returns a slice pointing
// to the next `n` bytes in the read buffer (like `Peek`), but also
// increments the read position. This allows users to process streams
// in arbitrary block sizes without having to manage appropriately-sized
// slices. Additionally, obviating the need to copy the data from the
// buffer to another location in memory can improve performance dramatically
// in CPU-bound applications.
//
// `fwd.Writer` only has one extra method, which is `(*fwd.Writer).Next`, which
// returns a slice pointing to the next `n` bytes of the writer, and increments
// the write position by the length of the returned slice. This allows users
// to write directly to the end of the buffer.
// package fwd import ( "io" "os" ) const ( // DefaultReaderSize is the default size of the read buffer DefaultReaderSize = 2048 // minimum read buffer; straight from bufio minReaderSize = 16 ) // NewReader returns a new *Reader that reads from 'r' func NewReader(r io.Reader) *Reader { return NewReaderSize(r, DefaultReaderSize) } // NewReaderSize returns a new *Reader that // reads from 'r' and has a buffer size 'n'. func NewReaderSize(r io.Reader, n int) *Reader { buf := make([]byte, 0, max(n, minReaderSize)) return NewReaderBuf(r, buf) } // NewReaderBuf returns a new *Reader that // reads from 'r' and uses 'buf' as a buffer. // 'buf' is not used when has smaller capacity than 16, // custom buffer is allocated instead. func NewReaderBuf(r io.Reader, buf []byte) *Reader { if cap(buf) < minReaderSize { buf = make([]byte, 0, minReaderSize) } buf = buf[:0] rd := &Reader{ r: r, data: buf, } if s, ok := r.(io.Seeker); ok { rd.rs = s } return rd } // Reader is a buffered look-ahead reader type Reader struct { r io.Reader // underlying reader // data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space data []byte // data n int // read offset state error // last read error // if the reader past to NewReader was // also an io.Seeker, this is non-nil rs io.Seeker } // Reset resets the underlying reader // and the read buffer. 
func (r *Reader) Reset(rd io.Reader) { r.r = rd r.data = r.data[0:0] r.n = 0 r.state = nil if s, ok := rd.(io.Seeker); ok { r.rs = s } else { r.rs = nil } } // more() does one read on the underlying reader func (r *Reader) more() { // move data backwards so that // the read offset is 0; this way // we can supply the maximum number of // bytes to the reader if r.n != 0 { if r.n < len(r.data) { r.data = r.data[:copy(r.data[0:], r.data[r.n:])] } else { r.data = r.data[:0] } r.n = 0 } var a int a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)]) if a == 0 && r.state == nil { r.state = io.ErrNoProgress return } else if a > 0 && r.state == io.EOF { // discard the io.EOF if we read more than 0 bytes. // the next call to Read should return io.EOF again. r.state = nil } r.data = r.data[:len(r.data)+a] } // pop error func (r *Reader) err() (e error) { e, r.state = r.state, nil return } // pop error; EOF -> io.ErrUnexpectedEOF func (r *Reader) noEOF() (e error) { e, r.state = r.state, nil if e == io.EOF { e = io.ErrUnexpectedEOF } return } // buffered bytes func (r *Reader) buffered() int { return len(r.data) - r.n } // Buffered returns the number of bytes currently in the buffer func (r *Reader) Buffered() int { return len(r.data) - r.n } // BufferSize returns the total size of the buffer func (r *Reader) BufferSize() int { return cap(r.data) } // Peek returns the next 'n' buffered bytes, // reading from the underlying reader if necessary. // It will only return a slice shorter than 'n' bytes // if it also returns an error. Peek does not advance // the reader. EOF errors are *not* returned as // io.ErrUnexpectedEOF. 
func (r *Reader) Peek(n int) ([]byte, error) { // in the degenerate case, // we may need to realloc // (the caller asked for more // bytes than the size of the buffer) if cap(r.data) < n { old := r.data[r.n:] r.data = make([]byte, n+r.buffered()) r.data = r.data[:copy(r.data, old)] r.n = 0 } // keep filling until // we hit an error or // read enough bytes for r.buffered() < n && r.state == nil { r.more() } // we must have hit an error if r.buffered() < n { return r.data[r.n:], r.err() } return r.data[r.n : r.n+n], nil } // discard(n) discards up to 'n' buffered bytes, and // and returns the number of bytes discarded func (r *Reader) discard(n int) int { inbuf := r.buffered() if inbuf <= n { r.n = 0 r.data = r.data[:0] return inbuf } r.n += n return n } // Skip moves the reader forward 'n' bytes. // Returns the number of bytes skipped and any // errors encountered. It is analogous to Seek(n, 1). // If the underlying reader implements io.Seeker, then // that method will be used to skip forward. // // If the reader encounters // an EOF before skipping 'n' bytes, it // returns io.ErrUnexpectedEOF. If the // underlying reader implements io.Seeker, then // those rules apply instead. (Many implementations // will not return `io.EOF` until the next call // to Read.) func (r *Reader) Skip(n int) (int, error) { if n < 0 { return 0, os.ErrInvalid } // discard some or all of the current buffer skipped := r.discard(n) // if we can Seek() through the remaining bytes, do that if n > skipped && r.rs != nil { nn, err := r.rs.Seek(int64(n-skipped), 1) return int(nn) + skipped, err } // otherwise, keep filling the buffer // and discarding it up to 'n' for skipped < n && r.state == nil { r.more() skipped += r.discard(n - skipped) } return skipped, r.noEOF() } // Next returns the next 'n' bytes in the stream. // Unlike Peek, Next advances the reader position. // The returned bytes point to the same // data as the buffer, so the slice is // only valid until the next reader method call. 
// An EOF is considered an unexpected error. // If an the returned slice is less than the // length asked for, an error will be returned, // and the reader position will not be incremented. func (r *Reader) Next(n int) ([]byte, error) { // in case the buffer is too small if cap(r.data) < n { old := r.data[r.n:] r.data = make([]byte, n+r.buffered()) r.data = r.data[:copy(r.data, old)] r.n = 0 } // fill at least 'n' bytes for r.buffered() < n && r.state == nil { r.more() } if r.buffered() < n { return r.data[r.n:], r.noEOF() } out := r.data[r.n : r.n+n] r.n += n return out, nil } // Read implements `io.Reader` func (r *Reader) Read(b []byte) (int, error) { // if we have data in the buffer, just // return that. if r.buffered() != 0 { x := copy(b, r.data[r.n:]) r.n += x return x, nil } var n int // we have no buffered data; determine // whether or not to buffer or call // the underlying reader directly if len(b) >= cap(r.data) { n, r.state = r.r.Read(b) } else { r.more() n = copy(b, r.data) r.n = n } if n == 0 { return 0, r.err() } return n, nil } // ReadFull attempts to read len(b) bytes into // 'b'. It returns the number of bytes read into // 'b', and an error if it does not return len(b). // EOF is considered an unexpected error. func (r *Reader) ReadFull(b []byte) (int, error) { var n int // read into b var nn int // scratch l := len(b) // either read buffered data, // or read directly for the underlying // buffer, or fetch more buffered data. 
for n < l && r.state == nil { if r.buffered() != 0 { nn = copy(b[n:], r.data[r.n:]) n += nn r.n += nn } else if l-n > cap(r.data) { nn, r.state = r.r.Read(b[n:]) n += nn } else { r.more() } } if n < l { return n, r.noEOF() } return n, nil } // ReadByte implements `io.ByteReader` func (r *Reader) ReadByte() (byte, error) { for r.buffered() < 1 && r.state == nil { r.more() } if r.buffered() < 1 { return 0, r.err() } b := r.data[r.n] r.n++ return b, nil } // WriteTo implements `io.WriterTo` func (r *Reader) WriteTo(w io.Writer) (int64, error) { var ( i int64 ii int err error ) // first, clear buffer if r.buffered() > 0 { ii, err = w.Write(r.data[r.n:]) i += int64(ii) if err != nil { return i, err } r.data = r.data[0:0] r.n = 0 } for r.state == nil { // here we just do // 1:1 reads and writes r.more() if r.buffered() > 0 { ii, err = w.Write(r.data) i += int64(ii) if err != nil { return i, err } r.data = r.data[0:0] r.n = 0 } } if r.state != io.EOF { return i, r.err() } return i, nil } func max(a int, b int) int { if a < b { return b } return a }