diff --git a/.github/workflows/go.yaml b/.github/workflows/go.yaml index 233dc4f..7196022 100644 --- a/.github/workflows/go.yaml +++ b/.github/workflows/go.yaml @@ -13,35 +13,34 @@ jobs: - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v4 + uses: actions/setup-go@v5 with: - go-version: 'stable' + go-version: '1.23.x' - name: Build run: go build -v ./... test: runs-on: ubuntu-latest - environment: ci steps: - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v4 + uses: actions/setup-go@v5 with: - go-version: 'stable' + go-version: '1.23.x' - name: Run tests with coverage run: go test -race -coverprofile=coverage.out -covermode=atomic ./... - name: Upload coverage to Codecov - uses: codecov/codecov-action@v4-beta + uses: codecov/codecov-action@v5 with: token: ${{ secrets.CODECOV_TOKEN }} - file: coverage.out + files: coverage.out - name: Coveralls uses: coverallsapp/github-action@v2 with: github-token: ${{ secrets.GITHUB_TOKEN }} - file: coverage.out \ No newline at end of file + file: coverage.out diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 452989e..0000000 --- a/.travis.yml +++ /dev/null @@ -1,21 +0,0 @@ -language: go - - -go: - - tip - - 1.15.x - - 1.16.x - -before_script: - - go get -v github.com/smallnest/ringbuffer - - go get github.com/mattn/goveralls - -script: - - go test -v ./... - - goveralls -service=travis-ci - -notifications: - email: - recipients: smallnest@gmail.com - on_success: change - on_failure: always \ No newline at end of file diff --git a/Makefile b/Makefile index 7625553..6401894 100644 --- a/Makefile +++ b/Makefile @@ -137,7 +137,7 @@ install-tools: ## docs: Generate documentation docs: @echo "$(COLOR_BLUE)Generating documentation...$(COLOR_RESET)" - @echo "Open http://localhost:6060/pkg/github.com/smallnest/ringbuffer/ in your browser" + @echo "Open http://localhost:6060/pkg/github.com/argcv/ringbuffer/ in your browser" godoc -http=:6060 ## example: Run example @@ -168,6 +168,6 @@ version: info: @printf "$(COLOR_BOLD)Project Information$(COLOR_RESET)\n" @printf " Name: RingBuffer\n" - @printf " Module: github.com/smallnest/ringbuffer\n" + @printf " Module: github.com/argcv/ringbuffer\n" @printf " Go Version: $$($(GOCMD) version | cut -d' ' -f3)\n" @printf " Files: $$($(GOCMD) list ./... | wc -l | tr -d ' ')\n" diff --git a/README.md b/README.md index 208b721..2eadc45 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,37 @@ # ringbuffer -[![License](https://img.shields.io/:license-MIT-blue.svg)](https://opensource.org/licenses/MIT) [![GoDoc](https://godoc.org/github.com/smallnest/ringbuffer?status.png)](http://godoc.org/github.com/smallnest/ringbuffer) [![Go Report Card](https://goreportcard.com/badge/github.com/smallnest/ringbuffer)](https://goreportcard.com/report/github.com/smallnest/ringbuffer) [![coveralls](https://coveralls.io/repos/smallnest/ringbuffer/badge.svg?branch=master&service=github)](https://coveralls.io/github/smallnest/ringbuffer?branch=master) +[![License](https://img.shields.io/:license-MIT-blue.svg)](https://opensource.org/licenses/MIT) [![GoDoc](https://godoc.org/github.com/argcv/ringbuffer?status.png)](http://godoc.org/github.com/argcv/ringbuffer) [![Go Report Card](https://goreportcard.com/badge/github.com/argcv/ringbuffer)](https://goreportcard.com/report/github.com/argcv/ringbuffer) -A circular buffer (ring buffer) in Go, implemented io.ReaderWriter interface +A circular buffer (ring buffer) in Go, now with **generics** support and modernized for Go 1.23+. -[![wikipedia](Circular_Buffer_Animation.gif)](https://github.com/smallnest/ringbuffer) +This is a hard fork of [smallnest/ringbuffer](https://github.com/smallnest/ringbuffer) with breaking changes to adopt generics and modern Go features. -# Usage +[![wikipedia](Circular_Buffer_Animation.gif)](https://github.com/argcv/ringbuffer) + +## Requirements + +- Go 1.23+ + +## Features + +- **Generic API**: `RingBuffer[T any]` supports arbitrary types, not just bytes. +- **Standard library compatibility**: `*RingBuffer[byte]` implements `io.Reader`, `io.Writer`, `io.Closer`, `io.ByteReader`, `io.ByteWriter`. +- **Blocking and non-blocking modes**: Configurable via `SetBlocking(true)`. +- **Overwrite mode**: `SetOverwrite(true)` allows overwriting oldest data when full. +- **Iterator support** (Go 1.23+): `All()` returns `iter.Seq[T]` for zero-allocation traversal. +- **Zero-copy string writing**: Modern `unsafe` usage for `WriteString`. +- **Goroutine-safe**: All operations are protected by `sync.Mutex`. +- **No external dependencies**. + +## Installation + +```bash +go get github.com/argcv/ringbuffer +``` + +## Usage + +### Byte buffer (backward compatible) ```go package main @@ -14,7 +39,7 @@ package main import ( "fmt" - "github.com/smallnest/ringbuffer" + "github.com/argcv/ringbuffer" ) func main() { @@ -32,100 +57,76 @@ func main() { } ``` -It is possible to use an existing buffer with by replacing `New` with `NewBuffer`. - - -# Blocking vs Non-blocking - -The default behavior of the ring buffer is non-blocking, -meaning that reads and writes will return immediately with an error if the operation cannot be completed. -If you want to block when reading or writing, you must enable it: +### Generic ring buffer ```go - rb := ringbuffer.New(1024).SetBlocking(true) -``` - -Enabling blocking will cause the ring buffer to behave like a buffered [io.Pipe](https://pkg.go.dev/io#Pipe). - -Regular Reads will block until data is available, but not wait for a full buffer. -Writes will block until there is space available and writes bigger than the buffer will wait for reads to make space. - -`TryRead` and `TryWrite` are still available for non-blocking reads and writes. +package main -To signify the end of the stream, close the ring buffer from the writer side with `rb.CloseWriter()` +import ( + "fmt" -Either side can use `rb.CloseWithError(err error)` to signal an error and close the ring buffer. -Any reads or writes will return the error on next call. + "github.com/argcv/ringbuffer" +) -In blocking mode errors are stateful and the same error will be returned until `rb.Reset()` is called. +func main() { + rb := ringbuffer.NewGeneric[int](1024) -It is possible to set a deadline for blocking Read/Write operations using `rb.WithDeadline(time.Duration)`. + // write + rb.Write([]int{1, 2, 3, 4}) + fmt.Println(rb.Length()) -# io.Copy replacement + // read + buf := make([]int, 4) + rb.Read(buf) + fmt.Println(buf) -The ring buffer can replace `io.Copy` and `io.CopyBuffer` to do async copying through the ring buffer. + // iterate without consuming + for v := range rb.All() { + fmt.Println(v) + } +} +``` -The copy operation will happen directly on the buffer, so between reads and writes there is no memory copy. +### Blocking vs Non-blocking -Here is a simple example where the copy operation is replaced by a ring buffer: +The default behavior is non-blocking. Enable blocking to make it behave like a buffered `io.Pipe`: ```go -func saveWebsite(url, file string) { - in, _ := http.Get(url) - out, _ := os.Create(file) +rb := ringbuffer.New(1024).SetBlocking(true) +``` - // Copy with regular buffered copy - // n, err := io.Copy(out, in.Body) +### Overwrite mode - // Copy with ring buffer - n, err := ringbuffer.New(1024).Copy(out, in.Body) - fmt.Println(n, err) -} +```go +rb := ringbuffer.New(64).SetOverwrite(true) +rb.Write([]byte("fill the buffer")) +rb.Write([]byte("new data")) // oldest data is discarded ``` -The ring buffer implements `io.ReaderFrom` and `io.WriterTo` interfaces, which allows to fill either or both -the write and read side respectively. - -This will provide an async method for writing or reading directly into the ring buffer. -These functions require that "blocking" is set on the pipe. +### io.Copy replacement -Example: +`Copy` is now a package-level function that takes a `*RingBuffer[byte]`: ```go -func readWebsite(url string) io.ReadCloser { +func saveWebsite(url, file string) { in, _ := http.Get(url) + out, _ := os.Create(file) - // Create blocking ring buffer - ring := ringbuffer.New(1024).SetBlocking(true) - - // Read from the input in a goroutine into the ring buffer - go func() { - ring.ReadFrom(in.Body) - ring.CloseWriter() - }() - return ring.ReadCloser() + // Copy with ring buffer + rb := ringbuffer.New(1024) + n, err := ringbuffer.Copy(rb, out, in.Body) + fmt.Println(n, err) } ``` -# io.Pipe replacement - -The ring buffer can be used as a compatible, but *asynchronous* replacement of `io.Pipe`. - -That means that Reads and Writes will go to the ring buffer. -Writes will complete as long as the data fits within the ring buffer. - -Reads will attempt to satisfy reads with data from the ring buffer. -The read will only block if the ring buffer is empty. +### io.Pipe replacement -In the common case, where the Read and Write side can run concurrently, -it is safe to replace `io.Pipe()` with `(*Ringbuffer).Pipe()`. - -Compare the following to the [io.Pipe example](https://pkg.go.dev/io#example-Pipe): +`Pipe` is now a package-level function: ```go func main() { // Create pipe from a 4KB ring buffer. - r, w := ringbuffer.New(4 << 10).Pipe() + r, w := ringbuffer.Pipe(4 << 10) go func() { fmt.Fprint(w, "some io.Reader stream to be read\n") @@ -138,10 +139,45 @@ func main() { } ``` -When creating the pipe, the ring buffer is internally switched to blocking mode. +### Iterator (Go 1.23+) + +```go +rb := ringbuffer.NewGeneric[string](10) +rb.Write([]string{"a", "b", "c"}) + +for s := range rb.All() { + fmt.Println(s) +} +``` + +## API Changes from upstream + +This fork contains **breaking changes**: + +| Before (upstream) | After (this fork) | +|---|---| +| `RingBuffer` (byte only) | `RingBuffer[T any]` (generic) | +| `New(size int) *RingBuffer` | `New(size int) *RingBuffer[byte]` | +| `NewBuffer(b []byte) *RingBuffer` | `NewBuffer(b []byte) *RingBuffer[byte]` | +| `NewGeneric[T](size int) *RingBuffer[T]` | **New** | +| `NewFromSlice[T](b []T) *RingBuffer[T]` | **New** | +| `rb.WriteString(s)` | `WriteString(rb, s)` (function) | +| `rb.ReadFrom(rd)` | `ReadFrom(rb, rd)` (function) | +| `rb.WriteTo(w)` | `WriteTo(rb, w)` (function) | +| `rb.Copy(dst, src)` | `Copy(rb, dst, src)` (function) | +| `rb.Pipe()` | `Pipe(size int)` or `PipeFrom(rb)` (functions) | +| `rb.All()` | **New** `iter.Seq[T]` iterator | +| `rb.Reset()` | Now clears underlying buffer with `clear()` | +| `rb.WithCancel(ctx)` | Goroutine leak fixed; exits cleanly on `Close`/`Reset` | + +## Performance + +Run benchmarks with: + +```bash +make benchmark +``` -Error reporting on Close and CloseWithError functions is similar to `io.Pipe`. +## License -It is possible to use the original ring buffer alongside the pipe functions. -So for example it is possible to "seed" the ring buffer with data, -so reads can complete at once. \ No newline at end of file +MIT diff --git a/go.mod b/go.mod index d9ad533..ee83a08 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/smallnest/ringbuffer +module github.com/argcv/ringbuffer -go 1.19 +go 1.23.0 diff --git a/pipe.go b/pipe.go index aa5cf6e..b50cd06 100644 --- a/pipe.go +++ b/pipe.go @@ -16,14 +16,20 @@ import "io" // Only if the ring buffer is empty will the read block. // // It is safe (and intended) to call Read and Write in parallel with each other or with Close. -func (r *RingBuffer) Pipe() (*PipeReader, *PipeWriter) { +func Pipe(size int) (*PipeReader, *PipeWriter) { + return PipeFrom(NewRing[byte](size)) +} + +// PipeFrom creates a pipe from an existing Ring[byte]. +// The buffer will be switched to blocking mode. +func PipeFrom(r *Ring[byte]) (*PipeReader, *PipeWriter) { r.SetBlocking(true) pr := PipeReader{pipe: r} return &pr, &PipeWriter{pipe: r} } // A PipeReader is the read half of a pipe. -type PipeReader struct{ pipe *RingBuffer } +type PipeReader struct{ pipe *Ring[byte] } // Read implements the standard Read interface: // it reads data from the pipe, blocking until a writer @@ -55,13 +61,13 @@ func (r *PipeReader) CloseWithError(err error) error { } // A PipeWriter is the write half of a pipe. -type PipeWriter struct{ pipe *RingBuffer } +type PipeWriter struct{ pipe *Ring[byte] } // Write implements the standard Write interface: // it writes data to the pipe. // The Write will block until all data has been written to the ring buffer. // If the read end is closed with an error, that err is -// returned as err; otherwise err is [io.ErrClosedPipe]. +// returned as err; otherwise it is [io.ErrClosedPipe]. func (w *PipeWriter) Write(data []byte) (n int, err error) { if n, err = w.pipe.Write(data); err == ErrWriteOnClosed { // Replace error. diff --git a/ring.go b/ring.go new file mode 100644 index 0000000..18dd710 --- /dev/null +++ b/ring.go @@ -0,0 +1,923 @@ +// Copyright 2019 smallnest. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package ringbuffer + +import ( + "context" + "errors" + "io" + "iter" + "sync" + "time" +) + +var ( + // ErrTooMuchDataToWrite is returned when the data to write is more than the buffer size. + ErrTooMuchDataToWrite = errors.New("too much data to write") + + // ErrIsFull is returned when the buffer is full and not blocking. + ErrIsFull = errors.New("ringbuffer is full") + + // ErrIsEmpty is returned when the buffer is empty and not blocking. + ErrIsEmpty = errors.New("ringbuffer is empty") + + // ErrIsNotEmpty is returned when the buffer is not empty and not blocking. + ErrIsNotEmpty = errors.New("ringbuffer is not empty") + + // ErrAcquireLock is returned when the lock is not acquired on Try operations. + ErrAcquireLock = errors.New("unable to acquire lock") + + // ErrWriteOnClosed is returned when write on a closed ringbuffer. + ErrWriteOnClosed = errors.New("write on closed ringbuffer") + + // ErrReaderClosed is returned when a ReadClosed closed the ringbuffer. + ErrReaderClosed = errors.New("reader closed") + + // ErrReset is returned when Reset() is called, causing pending operations to abort. + ErrReset = errors.New("reset called") +) + +// Ring is a generic circular buffer. +// It operates like a buffered pipe, where data is written to a Ring +// and can be read back from another goroutine. +// It is safe to concurrently read and write Ring. +type Ring[T any] struct { + buf []T + size int + r int // next position to read + w int // next position to write + isFull bool + err error + block bool + overwrite bool // when true, overwrite old data when buffer is full + rTimeout time.Duration // Applies to writes (waits for the read condition) + wTimeout time.Duration // Applies to read (wait for the write condition) + mu sync.Mutex + wg sync.WaitGroup + readCond *sync.Cond // Signaled when data has been read. + writeCond *sync.Cond // Signaled when data has been written. + generation int64 // Incremented on Reset() to invalidate current waiters + done chan struct{} + closeOnce sync.Once +} + +// NewRing returns a new Ring whose buffer has the given size. +func NewRing[T any](size int) *Ring[T] { + if size < 0 { + size = 0 + } + return &Ring[T]{ + buf: make([]T, size), + size: size, + done: make(chan struct{}), + } +} + +// NewRingFromSlice returns a new Ring whose buffer is provided. +func NewRingFromSlice[T any](b []T) *Ring[T] { + return &Ring[T]{ + buf: b, + size: len(b), + done: make(chan struct{}), + } +} + +// SetBlocking sets the blocking mode of the ring buffer. +// If block is true, Read and Write will block when there is no data to read or no space to write. +// If block is false, Read and Write will return ErrIsEmpty or ErrIsFull immediately. +// By default, the ring buffer is not blocking. +// This setting should be called before any Read or Write operation or after a Reset. +func (r *Ring[T]) SetBlocking(block bool) *Ring[T] { + r.block = block + if block { + r.readCond = sync.NewCond(&r.mu) + r.writeCond = sync.NewCond(&r.mu) + } + return r +} + +// SetOverwrite sets the overwrite mode of the ring buffer. +// If overwrite is true, Write operations will overwrite the oldest data when the buffer is full, +// similar to a traditional circular buffer. The read pointer will advance to skip overwritten data. +// If overwrite is false (default), Write will return ErrIsFull or block (if blocking mode is enabled). +func (r *Ring[T]) SetOverwrite(overwrite bool) *Ring[T] { + r.overwrite = overwrite + return r +} + +// WithCancel sets a context to cancel the ring buffer. +// When the context is canceled, the ring buffer will be closed with the context error. +// The goroutine will exit cleanly when the ring buffer is closed. +func (r *Ring[T]) WithCancel(ctx context.Context) *Ring[T] { + go func() { + select { + case <-ctx.Done(): + r.CloseWithError(ctx.Err()) + case <-r.done: + } + }() + return r +} + +// WithTimeout will set a blocking read/write timeout. +// If no reads or writes occur within the timeout, +// the ringbuffer will be closed and context.DeadlineExceeded will be returned. +// A timeout of 0 or less will disable timeouts (default). +func (r *Ring[T]) WithTimeout(d time.Duration) *Ring[T] { + r.mu.Lock() + r.rTimeout = d + r.wTimeout = d + r.mu.Unlock() + return r +} + +// WithReadTimeout will set a blocking read timeout. +// Reads refers to any call that reads data from the buffer. +// If no writes occur within the timeout, +// the ringbuffer will be closed and context.DeadlineExceeded will be returned. +// A timeout of 0 or less will disable timeouts (default). +func (r *Ring[T]) WithReadTimeout(d time.Duration) *Ring[T] { + r.mu.Lock() + // Read operations wait for writes to complete, + // therefore we set the wTimeout. + r.wTimeout = d + r.mu.Unlock() + return r +} + +// WithWriteTimeout will set a blocking write timeout. +// Write refers to any call that writes data into the buffer. +// If no reads occur within the timeout, +// the ringbuffer will be closed and context.DeadlineExceeded will be returned. +// A timeout of 0 or less will disable timeouts (default). +func (r *Ring[T]) WithWriteTimeout(d time.Duration) *Ring[T] { + r.mu.Lock() + // Write operations wait for reads to complete, + // therefore we set the rTimeout. + r.rTimeout = d + r.mu.Unlock() + return r +} + +func (r *Ring[T]) setErr(err error, locked bool) error { + if !locked { + r.mu.Lock() + defer r.mu.Unlock() + } + if r.err != nil && r.err != io.EOF { + return r.err + } + + switch err { + // Internal errors are transient + case nil, ErrIsEmpty, ErrIsFull, ErrAcquireLock, ErrTooMuchDataToWrite, ErrIsNotEmpty: + return err + default: + r.err = err + if r.block { + r.readCond.Broadcast() + r.writeCond.Broadcast() + } + } + return err +} + +func (r *Ring[T]) readErr(locked bool) error { + if !locked { + r.mu.Lock() + defer r.mu.Unlock() + } + if r.err != nil { + if r.err == io.EOF { + if r.w == r.r && !r.isFull { + return io.EOF + } + return nil + } + return r.err + } + return nil +} + +// Read reads up to len(p) items into p. It returns the number of items read (0 <= n <= len(p)) and any error encountered. +// Even if Read returns n < len(p), it may use all of p as scratch space during the call. +// If some data is available but not len(p) items, Read conventionally returns what is available instead of waiting for more. +// When Read encounters an error or end-of-file condition after successfully reading n > 0 items, it returns the number of items read. +// It may return the (non-nil) error from the same call or return the error (and n == 0) from a subsequent call. +// Callers should always process the n > 0 items returned before considering the error err. +// Doing so correctly handles I/O errors that happen after reading some items and also both of the allowed EOF behaviors. +func (r *Ring[T]) Read(p []T) (n int, err error) { + if len(p) == 0 { + return 0, r.readErr(false) + } + + r.mu.Lock() + defer r.mu.Unlock() + if err := r.readErr(true); err != nil { + return 0, err + } + + r.wg.Add(1) + defer r.wg.Done() + n, err = r.read(p) + for err == ErrIsEmpty && r.block { + // Record the generation before waiting. If Reset() increments the generation, + // we will detect it after waking up and continue waiting (as if nothing happened). + myGen := r.generation + if !r.waitWrite() { + return 0, context.DeadlineExceeded + } + // If generation changed, Reset() happened while we were waiting. + // Go back to waiting - the buffer was reset and we should continue as normal. + if myGen != r.generation { + continue + } + if err = r.readErr(true); err != nil { + break + } + n, err = r.read(p) + } + if r.block && n > 0 { + r.readCond.Broadcast() + } + return n, err +} + +// TryRead read up to len(p) items into p like Read, but it is never blocking. +// If it does not succeed to acquire the lock, it returns ErrAcquireLock. +func (r *Ring[T]) TryRead(p []T) (n int, err error) { + ok := r.mu.TryLock() + if !ok { + return 0, ErrAcquireLock + } + defer r.mu.Unlock() + if err := r.readErr(true); err != nil { + return 0, err + } + if len(p) == 0 { + return 0, r.readErr(true) + } + + n, err = r.read(p) + if r.block && n > 0 { + r.readCond.Broadcast() + } + return n, err +} + +// copyFromBuffer copies data from the ring buffer to dst without modifying the read pointer. +// Returns the number of items copied. Does not check for errors. +func (r *Ring[T]) copyFromBuffer(dst []T) int { + if r.w == r.r && !r.isFull { + return 0 + } + + var n int + if r.w > r.r { + n = min(r.w-r.r, len(dst)) + copy(dst, r.buf[r.r:r.r+n]) + return n + } + + n = min(r.size-r.r+r.w, len(dst)) + + if r.r+n <= r.size { + copy(dst, r.buf[r.r:r.r+n]) + } else { + c1 := r.size - r.r + copy(dst, r.buf[r.r:r.size]) + c2 := n - c1 + copy(dst[c1:], r.buf[0:c2]) + } + return n +} + +func (r *Ring[T]) read(p []T) (n int, err error) { + if r.w == r.r && !r.isFull { + return 0, ErrIsEmpty + } + + n = r.copyFromBuffer(p) + if n == 0 { + return 0, ErrIsEmpty + } + + r.r = (r.r + n) % r.size + r.isFull = false + + return n, r.readErr(true) +} + +// Returns true if a read may have happened. +// Returns false if waited longer than rTimeout. +// Must be called when locked and returns locked. +func (r *Ring[T]) waitRead() (ok bool) { + if r.rTimeout <= 0 { + r.readCond.Wait() + return true + } + start := time.Now() + defer time.AfterFunc(r.rTimeout, r.readCond.Broadcast).Stop() + + r.readCond.Wait() + if time.Since(start) >= r.rTimeout { + r.setErr(context.DeadlineExceeded, true) //nolint errcheck + return false + } + return true +} + +func (r *Ring[T]) readOne() (b T, err error) { + if err = r.readErr(true); err != nil { + return b, err + } + for r.w == r.r && !r.isFull { + if r.block { + // Record the generation before waiting. If Reset() increments the generation, + // we will detect it after waking up and continue waiting. + myGen := r.generation + if !r.waitWrite() { + return b, context.DeadlineExceeded + } + // If generation changed, Reset() happened while we were waiting. + // Go back to waiting - the buffer was reset. + if myGen != r.generation { + continue + } + err = r.readErr(true) + if err != nil { + return b, err + } + continue + } + return b, ErrIsEmpty + } + b = r.buf[r.r] + r.r++ + if r.r == r.size { + r.r = 0 + } + + r.isFull = false + return b, r.readErr(true) +} + +// ReadOne reads and returns the next item from the input or ErrIsEmpty. +func (r *Ring[T]) ReadOne() (b T, err error) { + r.mu.Lock() + defer r.mu.Unlock() + return r.readOne() +} + +// TryReadOne reads and returns the next item without blocking. +// If it does not succeed to acquire the lock, it returns ErrAcquireLock. +func (r *Ring[T]) TryReadOne() (T, error) { + var zero T + ok := r.mu.TryLock() + if !ok { + return zero, ErrAcquireLock + } + defer r.mu.Unlock() + if err := r.readErr(true); err != nil { + return zero, err + } + return r.readOne() +} + +// checkWriteErr checks if the buffer has an error that should prevent writes. +// Returns the appropriate error to return (nil if write can proceed). +// Must be called with mutex held. +func (r *Ring[T]) checkWriteErr() error { + if r.err == nil { + return nil + } + if r.err == io.EOF { + return ErrWriteOnClosed + } + return r.err +} + +// Write writes len(p) items from p to the underlying buf. +// It returns the number of items written from p (0 <= n <= len(p)) +// and any error encountered that caused the write to stop early. +// If blocking n < len(p) will be returned only if an error occurred. +// Write returns a non-nil error if it returns n < len(p). +// Write will not modify the slice data, even temporarily. +func (r *Ring[T]) Write(p []T) (n int, err error) { + if len(p) == 0 { + return 0, r.setErr(nil, false) + } + r.mu.Lock() + defer r.mu.Unlock() + if err := r.checkWriteErr(); err != nil { + return 0, err + } + wrote := 0 + for len(p) > 0 { + n, err = r.write(p) + wrote += n + if !r.block || err == nil { + break + } + err = r.setErr(err, true) + if r.block && (err == ErrIsFull || err == ErrTooMuchDataToWrite) { + // Record the generation before waiting. If Reset() increments the generation, + // we will detect it after waking up and continue waiting. + myGen := r.generation + r.writeCond.Broadcast() + r.waitRead() + // If generation changed, Reset() happened while we were waiting. + // Go back to waiting - the buffer was reset. + if myGen != r.generation { + p = p[n:] + err = nil + continue + } + p = p[n:] + err = nil + continue + } + break + } + if r.block && wrote > 0 { + r.writeCond.Broadcast() + } + + return wrote, r.setErr(err, true) +} + +// waitWrite will wait for a write event. +// Returns true if a write may have happened. +// Returns false if waited longer than wTimeout. +// Must be called when locked and returns locked. +func (r *Ring[T]) waitWrite() (ok bool) { + if r.wTimeout <= 0 { + r.writeCond.Wait() + return true + } + + start := time.Now() + defer time.AfterFunc(r.wTimeout, r.writeCond.Broadcast).Stop() + + r.writeCond.Wait() + if time.Since(start) >= r.wTimeout { + r.setErr(context.DeadlineExceeded, true) //nolint errcheck + return false + } + return true +} + +// TryWrite writes len(p) items from p to the underlying buf like Write, but it is not blocking. +// If it does not succeed to acquire the lock, it returns ErrAcquireLock. +func (r *Ring[T]) TryWrite(p []T) (n int, err error) { + if len(p) == 0 { + return 0, r.setErr(nil, false) + } + ok := r.mu.TryLock() + if !ok { + return 0, ErrAcquireLock + } + defer r.mu.Unlock() + if err := r.checkWriteErr(); err != nil { + return 0, err + } + + n, err = r.write(p) + if r.block && n > 0 { + r.writeCond.Broadcast() + } + return n, r.setErr(err, true) +} + +func (r *Ring[T]) write(p []T) (n int, err error) { + // In overwrite mode, we always allow writing by discarding old data + if r.overwrite && len(p) > 0 { + var avail int + if r.w == r.r && r.isFull { + // Buffer is full, no space available + avail = 0 + } else if r.w >= r.r { + avail = r.size - r.w + r.r + } else { + avail = r.r - r.w + } + + // If we need more space than available, discard old data + if len(p) > avail { + // Advance read pointer to make room + needed := len(p) - avail + r.r = (r.r + needed) % r.size + // If buffer was full, it's no longer full after advancing read pointer + r.isFull = false + } + } + + if r.isFull { + return 0, ErrIsFull + } + + var avail int + if r.w >= r.r { + avail = r.size - r.w + r.r + } else { + avail = r.r - r.w + } + + if len(p) > avail { + err = ErrTooMuchDataToWrite + p = p[:avail] + } + n = len(p) + + if r.w >= r.r { + c1 := r.size - r.w + if c1 >= n { + copy(r.buf[r.w:], p) + r.w += n + } else { + copy(r.buf[r.w:], p[:c1]) + c2 := n - c1 + copy(r.buf[0:], p[c1:]) + r.w = c2 + } + } else { + copy(r.buf[r.w:], p) + r.w += n + } + + if r.w == r.size { + r.w = 0 + } + if r.w == r.r { + r.isFull = true + } + + return n, err +} + +// WriteOne writes one item into buffer, and returns ErrIsFull if the buffer is full. +func (r *Ring[T]) WriteOne(c T) error { + r.mu.Lock() + defer r.mu.Unlock() + if err := r.checkWriteErr(); err != nil { + return err + } + err := r.writeByte(c) + for err == ErrIsFull && r.block { + // Record the generation before waiting. If Reset() increments the generation, + // we will detect it after waking up and continue waiting. + myGen := r.generation + if !r.waitRead() { + return context.DeadlineExceeded + } + // If generation changed, Reset() happened while we were waiting. + // Go back to waiting - the buffer was reset. + if myGen != r.generation { + err = r.writeByte(c) + continue + } + err = r.setErr(r.writeByte(c), true) + } + if r.block && err == nil { + r.writeCond.Broadcast() + } + return err +} + +// TryWriteOne writes one item into buffer without blocking. +// If it does not succeed to acquire the lock, it returns ErrAcquireLock. +func (r *Ring[T]) TryWriteOne(c T) error { + ok := r.mu.TryLock() + if !ok { + return ErrAcquireLock + } + defer r.mu.Unlock() + if err := r.checkWriteErr(); err != nil { + return err + } + + err := r.writeByte(c) + if err == nil && r.block { + r.writeCond.Broadcast() + } + return err +} + +func (r *Ring[T]) writeByte(c T) error { + if r.err != nil { + return r.err + } + if r.w == r.r && r.isFull { + // In overwrite mode, discard the oldest item and write the new one + if r.overwrite { + r.r++ + if r.r == r.size { + r.r = 0 + } + } else { + return ErrIsFull + } + } + r.buf[r.w] = c + r.w++ + + if r.w == r.size { + r.w = 0 + } + if r.w == r.r { + r.isFull = true + } + + return nil +} + +// Length returns the number of items that can be read without blocking. +func (r *Ring[T]) Length() int { + r.mu.Lock() + defer r.mu.Unlock() + + if r.w == r.r { + if r.isFull { + return r.size + } + return 0 + } + + if r.w > r.r { + return r.w - r.r + } + + return r.size - r.r + r.w +} + +// Capacity returns the size of the underlying buffer. +func (r *Ring[T]) Capacity() int { + return r.size +} + +// Free returns the number of items that can be written without blocking. +func (r *Ring[T]) Free() int { + r.mu.Lock() + defer r.mu.Unlock() + + if r.w == r.r { + if r.isFull { + return 0 + } + return r.size + } + + if r.w < r.r { + return r.r - r.w + } + + return r.size - r.w + r.r +} + +// IsFull returns true when the ringbuffer is full. +func (r *Ring[T]) IsFull() bool { + r.mu.Lock() + defer r.mu.Unlock() + + return r.isFull +} + +// IsEmpty returns true when the ringbuffer is empty. +func (r *Ring[T]) IsEmpty() bool { + r.mu.Lock() + defer r.mu.Unlock() + + return !r.isFull && r.w == r.r +} + +// CloseWithError closes the writer; reads will return +// no items and the error err, or EOF if err is nil. +// +// CloseWithError never overwrites the previous error if it exists +// and always returns nil. +func (r *Ring[T]) CloseWithError(err error) { + if err == nil { + err = io.EOF + } + r.setErr(err, false) //nolint errcheck + r.closeOnce.Do(func() { close(r.done) }) +} + +// CloseWriter closes the writer. +// Reads will return any remaining items and io.EOF. +func (r *Ring[T]) CloseWriter() { + r.setErr(io.EOF, false) //nolint errcheck + r.closeOnce.Do(func() { close(r.done) }) +} + +// Flush waits for the buffer to be empty and fully read. +// If not blocking ErrIsNotEmpty will be returned if the buffer still contains data. +func (r *Ring[T]) Flush() error { + r.mu.Lock() + defer r.mu.Unlock() + for r.w != r.r || r.isFull { + err := r.readErr(true) + if err != nil { + if err == io.EOF { + err = nil + } + return err + } + if !r.block { + return ErrIsNotEmpty + } + if !r.waitRead() { + return context.DeadlineExceeded + } + } + + err := r.readErr(true) + if err == io.EOF { + return nil + } + return err +} + +// Reset the read pointer and writer pointer to zero. +func (r *Ring[T]) Reset() { + r.mu.Lock() + defer r.mu.Unlock() + + // Signal any WithCancel goroutines to exit + r.closeOnce.Do(func() { close(r.done) }) + + if r.block { + // In blocking mode, increment generation to invalidate current waiters. + // They will wake up, see the generation changed, and go back to waiting. + // This makes Reset() transparent to waiters - they continue waiting as if nothing happened. + // + // Note: We don't wait for wg.Wait() in blocking mode since waiters may + // block indefinitely. The generation mechanism ensures they handle the reset correctly. + r.generation++ + r.readCond.Broadcast() + r.writeCond.Broadcast() + r.r = 0 + r.w = 0 + r.err = nil + r.isFull = false + clear(r.buf) + r.done = make(chan struct{}) + r.closeOnce = sync.Once{} + return + } + + // In non-blocking mode, set error to return immediately to any readers/writers. + r.setErr(ErrReset, true) //nolint errcheck + + // Unlock the mutex so readers/writers can finish. + r.mu.Unlock() + r.wg.Wait() + r.mu.Lock() + r.r = 0 + r.w = 0 + r.err = nil + r.isFull = false + clear(r.buf) + r.done = make(chan struct{}) + r.closeOnce = sync.Once{} +} + +// WriteCloser returns a WriteCloser that writes to the ring buffer. +// When the returned WriteCloser is closed, it will wait for all data to be read before returning. +func (r *Ring[T]) WriteCloser() *writeCloser[T] { + return &writeCloser[T]{r} +} + +type writeCloser[T any] struct { + *Ring[T] +} + +// Close provides a close method for the WriteCloser. +func (wc *writeCloser[T]) Close() error { + wc.CloseWriter() + return wc.Flush() +} + +// ReadCloser returns a ReadCloser that reads to the ring buffer. +// When the returned ReadCloser is closed, ErrReaderClosed will be returned on any writes done afterwards. +func (r *Ring[T]) ReadCloser() *readCloser[T] { + return &readCloser[T]{r} +} + +type readCloser[T any] struct { + *Ring[T] +} + +// Close provides a close method for the ReadCloser. +func (rc *readCloser[T]) Close() error { + rc.CloseWithError(ErrReaderClosed) + err := rc.readErr(false) + if err == ErrReaderClosed { + err = nil + } + return err +} + +// Peek reads up to len(p) items into p without moving the read pointer. +func (r *Ring[T]) Peek(p []T) (n int, err error) { + if len(p) == 0 { + return 0, r.readErr(false) + } + + r.mu.Lock() + defer r.mu.Unlock() + if err := r.readErr(true); err != nil { + return 0, err + } + + return r.peek(p) +} + +func (r *Ring[T]) peek(p []T) (n int, err error) { + n = r.copyFromBuffer(p) + if n == 0 { + return 0, ErrIsEmpty + } + return n, r.readErr(true) +} + +// Bytes returns all available read items. +// It does not move the read pointer and only copies the available data. +// If the dst is big enough, it will be used as destination, +// otherwise a new buffer will be allocated. +func (r *Ring[T]) Bytes(dst []T) []T { + r.mu.Lock() + defer r.mu.Unlock() + getDst := func(n int) []T { + if cap(dst) < n { + return make([]T, n) + } + return dst[:n] + } + + if r.w == r.r { + if r.isFull { + buf := getDst(r.size) + copy(buf, r.buf[r.r:]) + copy(buf[r.size-r.r:], r.buf[:r.w]) + return buf + } + return nil + } + + if r.w > r.r { + buf := getDst(r.w - r.r) + copy(buf, r.buf[r.r:r.w]) + return buf + } + + n := r.size - r.r + r.w + buf := getDst(n) + + if r.r+n < r.size { + copy(buf, r.buf[r.r:r.r+n]) + } else { + c1 := r.size - r.r + copy(buf, r.buf[r.r:r.size]) + c2 := n - c1 + copy(buf[c1:], r.buf[0:c2]) + } + + return buf +} + +// All returns an iterator over the unread items in the buffer. +// It does not modify the read pointer. +func (r *Ring[T]) All() iter.Seq[T] { + return func(yield func(T) bool) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.w == r.r && !r.isFull { + return + } + + if r.w > r.r { + for i := r.r; i < r.w; i++ { + if !yield(r.buf[i]) { + return + } + } + return + } + + // Handle wrap-around + for i := r.r; i < r.size; i++ { + if !yield(r.buf[i]) { + return + } + } + for i := 0; i < r.w; i++ { + if !yield(r.buf[i]) { + return + } + } + } +} diff --git a/ring_buffer.go b/ring_buffer.go index c617ffb..bb16dcd 100644 --- a/ring_buffer.go +++ b/ring_buffer.go @@ -13,437 +13,88 @@ import ( "unsafe" ) -var ( - // ErrTooMuchDataToWrite is returned when the data to write is more than the buffer size. - ErrTooMuchDataToWrite = errors.New("too much data to write") - - // ErrIsFull is returned when the buffer is full and not blocking. - ErrIsFull = errors.New("ringbuffer is full") - - // ErrIsEmpty is returned when the buffer is empty and not blocking. - ErrIsEmpty = errors.New("ringbuffer is empty") - - // ErrIsNotEmpty is returned when the buffer is not empty and not blocking. - ErrIsNotEmpty = errors.New("ringbuffer is not empty") - - // ErrAcquireLock is returned when the lock is not acquired on Try operations. - ErrAcquireLock = errors.New("unable to acquire lock") - - // ErrWriteOnClosed is returned when write on a closed ringbuffer. - ErrWriteOnClosed = errors.New("write on closed ringbuffer") - - // ErrReaderClosed is returned when a ReadClosed closed the ringbuffer. - ErrReaderClosed = errors.New("reader closed") - - // ErrReset is returned when Reset() is called, causing pending operations to abort. - ErrReset = errors.New("reset called") -) - -// RingBuffer is a circular buffer that implements io.ReaderWriter interface. -// It operates like a buffered pipe, where data is written to a RingBuffer -// and can be read back from another goroutine. -// It is safe to concurrently read and write RingBuffer. +// RingBuffer is a circular buffer for byte slices. +// It wraps *Ring[byte] and provides all standard library io interfaces +// as well as convenience methods for byte-stream operations. type RingBuffer struct { - buf []byte - size int - r int // next position to read - w int // next position to write - isFull bool - err error - block bool - overwrite bool // when true, overwrite old data when buffer is full - rTimeout time.Duration // Applies to writes (waits for the read condition) - wTimeout time.Duration // Applies to read (wait for the write condition) - mu sync.Mutex - wg sync.WaitGroup - readCond *sync.Cond // Signaled when data has been read. - writeCond *sync.Cond // Signaled when data has been written. - generation int64 // Incremented on Reset() to invalidate current waiters + *Ring[byte] } // New returns a new RingBuffer whose buffer has the given size. func New(size int) *RingBuffer { - return &RingBuffer{ - buf: make([]byte, size), - size: size, - } + return &RingBuffer{Ring: NewRing[byte](size)} } // NewBuffer returns a new RingBuffer whose buffer is provided. func NewBuffer(b []byte) *RingBuffer { - return &RingBuffer{ - buf: b, - size: len(b), - } + return &RingBuffer{Ring: NewRingFromSlice(b)} } // SetBlocking sets the blocking mode of the ring buffer. -// If block is true, Read and Write will block when there is no data to read or no space to write. -// If block is false, Read and Write will return ErrIsEmpty or ErrIsFull immediately. -// By default, the ring buffer is not blocking. -// This setting should be called before any Read or Write operation or after a Reset. func (r *RingBuffer) SetBlocking(block bool) *RingBuffer { - r.block = block - if block { - r.readCond = sync.NewCond(&r.mu) - r.writeCond = sync.NewCond(&r.mu) - } + r.Ring.SetBlocking(block) return r } // SetOverwrite sets the overwrite mode of the ring buffer. -// If overwrite is true, Write operations will overwrite the oldest data when the buffer is full, -// similar to a traditional circular buffer. The read pointer will advance to skip overwritten data. -// If overwrite is false (default), Write will return ErrIsFull or block (if blocking mode is enabled). func (r *RingBuffer) SetOverwrite(overwrite bool) *RingBuffer { - r.overwrite = overwrite + r.Ring.SetOverwrite(overwrite) return r } // WithCancel sets a context to cancel the ring buffer. -// When the context is canceled, the ring buffer will be closed with the context error. -// A goroutine will be started and run until the provided context is canceled. func (r *RingBuffer) WithCancel(ctx context.Context) *RingBuffer { - go func() { - <-ctx.Done() - r.CloseWithError(ctx.Err()) - }() + r.Ring.WithCancel(ctx) return r } // WithTimeout will set a blocking read/write timeout. -// If no reads or writes occur within the timeout, -// the ringbuffer will be closed and context.DeadlineExceeded will be returned. -// A timeout of 0 or less will disable timeouts (default). func (r *RingBuffer) WithTimeout(d time.Duration) *RingBuffer { - r.mu.Lock() - r.rTimeout = d - r.wTimeout = d - r.mu.Unlock() + r.Ring.WithTimeout(d) return r } // WithReadTimeout will set a blocking read timeout. -// Reads refers to any call that reads data from the buffer. -// If no writes occur within the timeout, -// the ringbuffer will be closed and context.DeadlineExceeded will be returned. -// A timeout of 0 or less will disable timeouts (default). func (r *RingBuffer) WithReadTimeout(d time.Duration) *RingBuffer { - r.mu.Lock() - // Read operations wait for writes to complete, - // therefore we set the wTimeout. - r.wTimeout = d - r.mu.Unlock() + r.Ring.WithReadTimeout(d) return r } // WithWriteTimeout will set a blocking write timeout. -// Write refers to any call that writes data into the buffer. -// If no reads occur within the timeout, -// the ringbuffer will be closed and context.DeadlineExceeded will be returned. -// A timeout of 0 or less will disable timeouts (default). func (r *RingBuffer) WithWriteTimeout(d time.Duration) *RingBuffer { - r.mu.Lock() - // Write operations wait for reads to complete, - // therefore we set the rTimeout. - r.rTimeout = d - r.mu.Unlock() + r.Ring.WithWriteTimeout(d) return r } -func (r *RingBuffer) setErr(err error, locked bool) error { - if !locked { - r.mu.Lock() - defer r.mu.Unlock() - } - if r.err != nil && r.err != io.EOF { - return r.err - } - - switch err { - // Internal errors are transient - case nil, ErrIsEmpty, ErrIsFull, ErrAcquireLock, ErrTooMuchDataToWrite, ErrIsNotEmpty: - return err - default: - r.err = err - if r.block { - r.readCond.Broadcast() - r.writeCond.Broadcast() - } - } - return err -} - -func (r *RingBuffer) readErr(locked bool) error { - if !locked { - r.mu.Lock() - defer r.mu.Unlock() - } - if r.err != nil { - if r.err == io.EOF { - if r.w == r.r && !r.isFull { - return io.EOF - } - return nil - } - return r.err - } - return nil -} - -// Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered. -// Even if Read returns n < len(p), it may use all of p as scratch space during the call. -// If some data is available but not len(p) bytes, Read conventionally returns what is available instead of waiting for more. -// When Read encounters an error or end-of-file condition after successfully reading n > 0 bytes, it returns the number of bytes read. -// It may return the (non-nil) error from the same call or return the error (and n == 0) from a subsequent call. -// Callers should always process the n > 0 bytes returned before considering the error err. -// Doing so correctly handles I/O errors that happen after reading some bytes and also both of the allowed EOF behaviors. -func (r *RingBuffer) Read(p []byte) (n int, err error) { - if len(p) == 0 { - return 0, r.readErr(false) - } - - r.mu.Lock() - defer r.mu.Unlock() - if err := r.readErr(true); err != nil { - return 0, err - } - - r.wg.Add(1) - defer r.wg.Done() - n, err = r.read(p) - for err == ErrIsEmpty && r.block { - // Record the generation before waiting. If Reset() increments the generation, - // we will detect it after waking up and continue waiting (as if nothing happened). - myGen := r.generation - if !r.waitWrite() { - return 0, context.DeadlineExceeded - } - // If generation changed, Reset() happened while we were waiting. - // Go back to waiting - the buffer was reset and we should continue as normal. - if myGen != r.generation { - continue - } - if err = r.readErr(true); err != nil { - break - } - n, err = r.read(p) - } - if r.block && n > 0 { - r.readCond.Broadcast() - } - return n, err +// ReadByte reads and returns the next byte from the input or ErrIsEmpty. +func (r *RingBuffer) ReadByte() (byte, error) { + return r.Ring.ReadOne() } -// TryRead read up to len(p) bytes into p like Read, but it is never blocking. +// TryReadByte reads and returns the next byte without blocking. // If it does not succeed to acquire the lock, it returns ErrAcquireLock. -func (r *RingBuffer) TryRead(p []byte) (n int, err error) { - ok := r.mu.TryLock() - if !ok { - return 0, ErrAcquireLock - } - defer r.mu.Unlock() - if err := r.readErr(true); err != nil { - return 0, err - } - if len(p) == 0 { - return 0, r.readErr(true) - } - - n, err = r.read(p) - if r.block && n > 0 { - r.readCond.Broadcast() - } - return n, err -} - -// copyFromBuffer copies data from the ring buffer to dst without modifying the read pointer. -// Returns the number of bytes copied. Does not check for errors. -func (r *RingBuffer) copyFromBuffer(dst []byte) int { - if r.w == r.r && !r.isFull { - return 0 - } - - var n int - if r.w > r.r { - n = r.w - r.r - if n > len(dst) { - n = len(dst) - } - copy(dst, r.buf[r.r:r.r+n]) - return n - } - - n = r.size - r.r + r.w - if n > len(dst) { - n = len(dst) - } - - if r.r+n <= r.size { - copy(dst, r.buf[r.r:r.r+n]) - } else { - c1 := r.size - r.r - copy(dst, r.buf[r.r:r.size]) - c2 := n - c1 - copy(dst[c1:], r.buf[0:c2]) - } - return n -} - -func (r *RingBuffer) read(p []byte) (n int, err error) { - if r.w == r.r && !r.isFull { - return 0, ErrIsEmpty - } - - n = r.copyFromBuffer(p) - if n == 0 { - return 0, ErrIsEmpty - } - - r.r = (r.r + n) % r.size - r.isFull = false - - return n, r.readErr(true) -} - -// Returns true if a read may have happened. -// Returns false if waited longer than rTimeout. -// Must be called when locked and returns locked. -func (r *RingBuffer) waitRead() (ok bool) { - if r.rTimeout <= 0 { - r.readCond.Wait() - return true - } - start := time.Now() - defer time.AfterFunc(r.rTimeout, r.readCond.Broadcast).Stop() - - r.readCond.Wait() - if time.Since(start) >= r.rTimeout { - r.setErr(context.DeadlineExceeded, true) //nolint errcheck - return false - } - return true +func (r *RingBuffer) TryReadByte() (byte, error) { + return r.Ring.TryReadOne() } -// ReadByte reads and returns the next byte from the input or ErrIsEmpty. -func (r *RingBuffer) ReadByte() (b byte, err error) { - r.mu.Lock() - defer r.mu.Unlock() - if err = r.readErr(true); err != nil { - return 0, err - } - for r.w == r.r && !r.isFull { - if r.block { - // Record the generation before waiting. If Reset() increments the generation, - // we will detect it after waking up and continue waiting. - myGen := r.generation - if !r.waitWrite() { - return 0, context.DeadlineExceeded - } - // If generation changed, Reset() happened while we were waiting. - // Go back to waiting - the buffer was reset. - if myGen != r.generation { - continue - } - err = r.readErr(true) - if err != nil { - return 0, err - } - continue - } - return 0, ErrIsEmpty - } - b = r.buf[r.r] - r.r++ - if r.r == r.size { - r.r = 0 - } - - r.isFull = false - return b, r.readErr(true) +// WriteByte writes one byte into buffer, and returns ErrIsFull if the buffer is full. +func (r *RingBuffer) WriteByte(c byte) error { + return r.Ring.WriteOne(c) } -// checkWriteErr checks if the buffer has an error that should prevent writes. -// Returns the appropriate error to return (nil if write can proceed). -// Must be called with mutex held. -func (r *RingBuffer) checkWriteErr() error { - if r.err == nil { - return nil - } - if r.err == io.EOF { - return ErrWriteOnClosed - } - return r.err +// TryWriteByte writes one byte into buffer without blocking. +// If it does not succeed to acquire the lock, it returns ErrAcquireLock. +func (r *RingBuffer) TryWriteByte(c byte) error { + return r.Ring.TryWriteOne(c) } -// Write writes len(p) bytes from p to the underlying buf. -// It returns the number of bytes written from p (0 <= n <= len(p)) -// and any error encountered that caused the write to stop early. -// If blocking n < len(p) will be returned only if an error occurred. -// Write returns a non-nil error if it returns n < len(p). -// Write will not modify the slice data, even temporarily. -func (r *RingBuffer) Write(p []byte) (n int, err error) { - if len(p) == 0 { +// WriteString writes the contents of the string s to buffer. +func (r *RingBuffer) WriteString(s string) (n int, err error) { + if len(s) == 0 { return 0, r.setErr(nil, false) } - r.mu.Lock() - defer r.mu.Unlock() - if err := r.checkWriteErr(); err != nil { - return 0, err - } - wrote := 0 - for len(p) > 0 { - n, err = r.write(p) - wrote += n - if !r.block || err == nil { - break - } - err = r.setErr(err, true) - if r.block && (err == ErrIsFull || err == ErrTooMuchDataToWrite) { - // Record the generation before waiting. If Reset() increments the generation, - // we will detect it after waking up and continue waiting. - myGen := r.generation - r.writeCond.Broadcast() - r.waitRead() - // If generation changed, Reset() happened while we were waiting. - // Go back to waiting - the buffer was reset. - if myGen != r.generation { - p = p[n:] - err = nil - continue - } - p = p[n:] - err = nil - continue - } - break - } - if r.block && wrote > 0 { - r.writeCond.Broadcast() - } - - return wrote, r.setErr(err, true) -} - -// waitWrite will wait for a write event. -// Returns true if a write may have happened. -// Returns false if waited longer than wTimeout. -// Must be called when locked and returns locked. -func (r *RingBuffer) waitWrite() (ok bool) { - if r.wTimeout <= 0 { - r.writeCond.Wait() - return true - } - - start := time.Now() - defer time.AfterFunc(r.wTimeout, r.writeCond.Broadcast).Stop() - - r.writeCond.Wait() - if time.Since(start) >= r.wTimeout { - r.setErr(context.DeadlineExceeded, true) //nolint errcheck - return false - } - return true + buf := unsafe.Slice(unsafe.StringData(s), len(s)) + return r.Write(buf) } // ReadFrom will fulfill the write side of the ringbuffer. @@ -453,7 +104,7 @@ func (r *RingBuffer) waitWrite() (ok bool) { // ReadFrom will not automatically close the buffer even after returning. // For that call CloseWriter(). // -// ReadFrom reads data from r until EOF or error. +// ReadFrom reads data from rd until EOF or error. // The return value n is the number of bytes read. // Any error except EOF encountered during the read is also returned, // and the error will cause the Read side to fail as well. @@ -556,9 +207,7 @@ func (r *RingBuffer) WriteTo(w io.Writer) (n int64, err error) { // Before reader, we can read until writer. toWrite = r.buf[r.r:r.w] } - if len(toWrite) > maxWrite { - toWrite = toWrite[:maxWrite] - } + toWrite = toWrite[:min(len(toWrite), maxWrite)] // Unlock while reading r.mu.Unlock() nr, werr := w.Write(toWrite) @@ -608,418 +257,20 @@ func (r *RingBuffer) Copy(dst io.Writer, src io.Reader) (written int64, err erro return r.WriteTo(dst) } -// TryWrite writes len(p) bytes from p to the underlying buf like Write, but it is not blocking. -// If it does not succeed to acquire the lock, it returns ErrAcquireLock. -func (r *RingBuffer) TryWrite(p []byte) (n int, err error) { - if len(p) == 0 { - return 0, r.setErr(nil, false) - } - ok := r.mu.TryLock() - if !ok { - return 0, ErrAcquireLock - } - defer r.mu.Unlock() - if err := r.checkWriteErr(); err != nil { - return 0, err - } - - n, err = r.write(p) - if r.block && n > 0 { - r.writeCond.Broadcast() - } - return n, r.setErr(err, true) -} - -func (r *RingBuffer) write(p []byte) (n int, err error) { - // In overwrite mode, we always allow writing by discarding old data - if r.overwrite && len(p) > 0 { - var avail int - if r.w == r.r && r.isFull { - // Buffer is full, no space available - avail = 0 - } else if r.w >= r.r { - avail = r.size - r.w + r.r - } else { - avail = r.r - r.w - } - - // If we need more space than available, discard old data - if len(p) > avail { - // Advance read pointer to make room - needed := len(p) - avail - r.r = (r.r + needed) % r.size - // If buffer was full, it's no longer full after advancing read pointer - r.isFull = false - } - } - - if r.isFull { - return 0, ErrIsFull - } - - var avail int - if r.w >= r.r { - avail = r.size - r.w + r.r - } else { - avail = r.r - r.w - } - - if len(p) > avail { - err = ErrTooMuchDataToWrite - p = p[:avail] - } - n = len(p) - - if r.w >= r.r { - c1 := r.size - r.w - if c1 >= n { - copy(r.buf[r.w:], p) - r.w += n - } else { - copy(r.buf[r.w:], p[:c1]) - c2 := n - c1 - copy(r.buf[0:], p[c1:]) - r.w = c2 - } - } else { - copy(r.buf[r.w:], p) - r.w += n - } - - if r.w == r.size { - r.w = 0 - } - if r.w == r.r { - r.isFull = true - } - - return n, err -} - -// WriteByte writes one byte into buffer, and returns ErrIsFull if the buffer is full. -func (r *RingBuffer) WriteByte(c byte) error { - r.mu.Lock() - defer r.mu.Unlock() - if err := r.checkWriteErr(); err != nil { - return err - } - err := r.writeByte(c) - for err == ErrIsFull && r.block { - // Record the generation before waiting. If Reset() increments the generation, - // we will detect it after waking up and continue waiting. - myGen := r.generation - if !r.waitRead() { - return context.DeadlineExceeded - } - // If generation changed, Reset() happened while we were waiting. - // Go back to waiting - the buffer was reset. - if myGen != r.generation { - err = r.writeByte(c) - continue - } - err = r.setErr(r.writeByte(c), true) - } - if r.block && err == nil { - r.writeCond.Broadcast() - } - return err -} - -// TryWriteByte writes one byte into buffer without blocking. -// If it does not succeed to acquire the lock, it returns ErrAcquireLock. -func (r *RingBuffer) TryWriteByte(c byte) error { - ok := r.mu.TryLock() - if !ok { - return ErrAcquireLock - } - defer r.mu.Unlock() - if err := r.checkWriteErr(); err != nil { - return err - } - - err := r.writeByte(c) - if err == nil && r.block { - r.writeCond.Broadcast() - } - return err -} - -func (r *RingBuffer) writeByte(c byte) error { - if r.err != nil { - return r.err - } - if r.w == r.r && r.isFull { - // In overwrite mode, discard the oldest byte and write the new one - if r.overwrite { - r.r++ - if r.r == r.size { - r.r = 0 - } - } else { - return ErrIsFull - } - } - r.buf[r.w] = c - r.w++ - - if r.w == r.size { - r.w = 0 - } - if r.w == r.r { - r.isFull = true - } - - return nil -} - -// Length returns the number of bytes that can be read without blocking. -func (r *RingBuffer) Length() int { - r.mu.Lock() - defer r.mu.Unlock() - - if r.w == r.r { - if r.isFull { - return r.size - } - return 0 - } - - if r.w > r.r { - return r.w - r.r - } - - return r.size - r.r + r.w -} - -// Capacity returns the size of the underlying buffer. -func (r *RingBuffer) Capacity() int { - return r.size -} - -// Free returns the number of bytes that can be written without blocking. -func (r *RingBuffer) Free() int { - r.mu.Lock() - defer r.mu.Unlock() - - if r.w == r.r { - if r.isFull { - return 0 - } - return r.size - } - - if r.w < r.r { - return r.r - r.w - } - - return r.size - r.w + r.r -} - -// WriteString writes the contents of the string s to buffer, which accepts a slice of bytes. -func (r *RingBuffer) WriteString(s string) (n int, err error) { - x := (*[2]uintptr)(unsafe.Pointer(&s)) - h := [3]uintptr{x[0], x[1], x[1]} - buf := *(*[]byte)(unsafe.Pointer(&h)) - return r.Write(buf) -} - -// Bytes returns all available read bytes. -// It does not move the read pointer and only copy the available data. -// If the dst is big enough, it will be used as destination, -// otherwise a new buffer will be allocated. -func (r *RingBuffer) Bytes(dst []byte) []byte { - r.mu.Lock() - defer r.mu.Unlock() - getDst := func(n int) []byte { - if cap(dst) < n { - return make([]byte, n) - } - return dst[:n] - } - - if r.w == r.r { - if r.isFull { - buf := getDst(r.size) - copy(buf, r.buf[r.r:]) - copy(buf[r.size-r.r:], r.buf[:r.w]) - return buf - } - return nil - } - - if r.w > r.r { - buf := getDst(r.w - r.r) - copy(buf, r.buf[r.r:r.w]) - return buf - } - - n := r.size - r.r + r.w - buf := getDst(n) - - if r.r+n < r.size { - copy(buf, r.buf[r.r:r.r+n]) - } else { - c1 := r.size - r.r - copy(buf, r.buf[r.r:r.size]) - c2 := n - c1 - copy(buf[c1:], r.buf[0:c2]) - } - - return buf -} - -// IsFull returns true when the ringbuffer is full. -func (r *RingBuffer) IsFull() bool { - r.mu.Lock() - defer r.mu.Unlock() - - return r.isFull +// Pipe creates an asynchronous in-memory pipe from this RingBuffer. +// The buffer is switched to blocking mode. +func (r *RingBuffer) Pipe() (*PipeReader, *PipeWriter) { + return PipeFrom(r.Ring) } -// IsEmpty returns true when the ringbuffer is empty. -func (r *RingBuffer) IsEmpty() bool { - r.mu.Lock() - defer r.mu.Unlock() - - return !r.isFull && r.w == r.r -} - -// CloseWithError closes the writer; reads will return -// no bytes and the error err, or EOF if err is nil. -// -// CloseWithError never overwrites the previous error if it exists -// and always returns nil. -func (r *RingBuffer) CloseWithError(err error) { - if err == nil { - err = io.EOF - } - r.setErr(err, false) //nolint errcheck -} - -// CloseWriter closes the writer. -// Reads will return any remaining bytes and io.EOF. -func (r *RingBuffer) CloseWriter() { - r.setErr(io.EOF, false) //nolint errcheck -} - -// Flush waits for the buffer to be empty and fully read. -// If not blocking ErrIsNotEmpty will be returned if the buffer still contains data. -func (r *RingBuffer) Flush() error { - r.mu.Lock() - defer r.mu.Unlock() - for r.w != r.r || r.isFull { - err := r.readErr(true) - if err != nil { - if err == io.EOF { - err = nil - } - return err - } - if !r.block { - return ErrIsNotEmpty - } - if !r.waitRead() { - return context.DeadlineExceeded - } - } - - err := r.readErr(true) - if err == io.EOF { - return nil - } - return err -} - -// Reset the read pointer and writer pointer to zero. -func (r *RingBuffer) Reset() { - r.mu.Lock() - defer r.mu.Unlock() - - if r.block { - // In blocking mode, increment generation to invalidate current waiters. - // They will wake up, see the generation changed, and go back to waiting. - // This makes Reset() transparent to waiters - they continue waiting as if nothing happened. - // - // Note: We don't wait for wg.Wait() in blocking mode since waiters may - // block indefinitely. The generation mechanism ensures they handle the reset correctly. - r.generation++ - r.readCond.Broadcast() - r.writeCond.Broadcast() - r.r = 0 - r.w = 0 - r.err = nil - r.isFull = false - return - } - - // In non-blocking mode, set error to return immediately to any readers/writers. - r.setErr(ErrReset, true) //nolint errcheck - - // Unlock the mutex so readers/writers can finish. - r.mu.Unlock() - r.wg.Wait() - r.mu.Lock() - r.r = 0 - r.w = 0 - r.err = nil - r.isFull = false -} - -// WriteCloser returns a WriteCloser that writes to the ring buffer. +// WriteCloser returns an io.WriteCloser that writes to the ring buffer. // When the returned WriteCloser is closed, it will wait for all data to be read before returning. func (r *RingBuffer) WriteCloser() io.WriteCloser { - return &writeCloser{RingBuffer: r} -} - -type writeCloser struct { - *RingBuffer -} - -// Close provides a close method for the WriteCloser. -func (wc *writeCloser) Close() error { - wc.CloseWriter() - return wc.Flush() + return r.Ring.WriteCloser() } -// ReadCloser returns a io.ReadCloser that reads to the ring buffer. +// ReadCloser returns an io.ReadCloser that reads from the ring buffer. // When the returned ReadCloser is closed, ErrReaderClosed will be returned on any writes done afterwards. func (r *RingBuffer) ReadCloser() io.ReadCloser { - return &readCloser{RingBuffer: r} -} - -type readCloser struct { - *RingBuffer -} - -// Close provides a close method for the ReadCloser. -func (rc *readCloser) Close() error { - rc.CloseWithError(ErrReaderClosed) - err := rc.readErr(false) - if err == ErrReaderClosed { - err = nil - } - return err -} - -// Peek reads up to len(p) bytes into p without moving the read pointer. -func (r *RingBuffer) Peek(p []byte) (n int, err error) { - if len(p) == 0 { - return 0, r.readErr(false) - } - - r.mu.Lock() - defer r.mu.Unlock() - if err := r.readErr(true); err != nil { - return 0, err - } - - return r.peek(p) -} - -func (r *RingBuffer) peek(p []byte) (n int, err error) { - n = r.copyFromBuffer(p) - if n == 0 { - return 0, ErrIsEmpty - } - return n, r.readErr(true) + return r.Ring.ReadCloser() }