Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions .github/workflows/go.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,34 @@ jobs:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: 'stable'
go-version: '1.23.x'

- name: Build
run: go build -v ./...

test:
runs-on: ubuntu-latest
environment: ci
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: 'stable'
go-version: '1.23.x'

- name: Run tests with coverage
run: go test -race -coverprofile=coverage.out -covermode=atomic ./...

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4-beta
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: coverage.out
files: coverage.out

- name: Coveralls
uses: coverallsapp/github-action@v2
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
file: coverage.out
file: coverage.out
21 changes: 0 additions & 21 deletions .travis.yml

This file was deleted.

4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ install-tools:
## docs: Generate documentation
docs:
@echo "$(COLOR_BLUE)Generating documentation...$(COLOR_RESET)"
@echo "Open http://localhost:6060/pkg/github.com/smallnest/ringbuffer/ in your browser"
@echo "Open http://localhost:6060/pkg/github.com/argcv/ringbuffer/ in your browser"
godoc -http=:6060

## example: Run example
Expand Down Expand Up @@ -168,6 +168,6 @@ version:
info:
@printf "$(COLOR_BOLD)Project Information$(COLOR_RESET)\n"
@printf " Name: RingBuffer\n"
@printf " Module: github.com/smallnest/ringbuffer\n"
@printf " Module: github.com/argcv/ringbuffer\n"
@printf " Go Version: $$($(GOCMD) version | cut -d' ' -f3)\n"
@printf " Files: $$($(GOCMD) list ./... | wc -l | tr -d ' ')\n"
186 changes: 111 additions & 75 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,45 @@
# ringbuffer

[![License](https://img.shields.io/:license-MIT-blue.svg)](https://opensource.org/licenses/MIT) [![GoDoc](https://godoc.org/github.com/smallnest/ringbuffer?status.png)](http://godoc.org/github.com/smallnest/ringbuffer) [![Go Report Card](https://goreportcard.com/badge/github.com/smallnest/ringbuffer)](https://goreportcard.com/report/github.com/smallnest/ringbuffer) [![coveralls](https://coveralls.io/repos/smallnest/ringbuffer/badge.svg?branch=master&service=github)](https://coveralls.io/github/smallnest/ringbuffer?branch=master)
[![License](https://img.shields.io/:license-MIT-blue.svg)](https://opensource.org/licenses/MIT) [![GoDoc](https://godoc.org/github.com/argcv/ringbuffer?status.png)](http://godoc.org/github.com/argcv/ringbuffer) [![Go Report Card](https://goreportcard.com/badge/github.com/argcv/ringbuffer)](https://goreportcard.com/report/github.com/argcv/ringbuffer)

A circular buffer (ring buffer) in Go, implemented io.ReaderWriter interface
A circular buffer (ring buffer) in Go, now with **generics** support and modernized for Go 1.23+.

[![wikipedia](Circular_Buffer_Animation.gif)](https://github.com/smallnest/ringbuffer)
This is a hard fork of [smallnest/ringbuffer](https://github.com/smallnest/ringbuffer) with breaking changes to adopt generics and modern Go features.

# Usage
[![wikipedia](Circular_Buffer_Animation.gif)](https://github.com/argcv/ringbuffer)

## Requirements

- Go 1.23+

## Features

- **Generic API**: `RingBuffer[T any]` supports arbitrary types, not just bytes.
- **Standard library compatibility**: `*RingBuffer[byte]` implements `io.Reader`, `io.Writer`, `io.Closer`, `io.ByteReader`, `io.ByteWriter`.
- **Blocking and non-blocking modes**: Configurable via `SetBlocking(true)`.
- **Overwrite mode**: `SetOverwrite(true)` allows overwriting oldest data when full.
- **Iterator support** (Go 1.23+): `All()` returns `iter.Seq[T]` for zero-allocation traversal.
- **Zero-copy string writing**: Modern `unsafe` usage for `WriteString`.
- **Goroutine-safe**: All operations are protected by `sync.Mutex`.
- **No external dependencies**.

## Installation

```bash
go get github.com/argcv/ringbuffer
```

## Usage

### Byte buffer (backward compatible)

```go
package main

import (
"fmt"

"github.com/smallnest/ringbuffer"
"github.com/argcv/ringbuffer"
)

func main() {
Expand All @@ -32,100 +57,76 @@ func main() {
}
```

It is possible to use an existing buffer with by replacing `New` with `NewBuffer`.


# Blocking vs Non-blocking

The default behavior of the ring buffer is non-blocking,
meaning that reads and writes will return immediately with an error if the operation cannot be completed.
If you want to block when reading or writing, you must enable it:
### Generic ring buffer

```go
rb := ringbuffer.New(1024).SetBlocking(true)
```

Enabling blocking will cause the ring buffer to behave like a buffered [io.Pipe](https://pkg.go.dev/io#Pipe).

Regular Reads will block until data is available, but not wait for a full buffer.
Writes will block until there is space available and writes bigger than the buffer will wait for reads to make space.

`TryRead` and `TryWrite` are still available for non-blocking reads and writes.
package main

To signify the end of the stream, close the ring buffer from the writer side with `rb.CloseWriter()`
import (
"fmt"

Either side can use `rb.CloseWithError(err error)` to signal an error and close the ring buffer.
Any reads or writes will return the error on next call.
"github.com/argcv/ringbuffer"
)

In blocking mode errors are stateful and the same error will be returned until `rb.Reset()` is called.
func main() {
rb := ringbuffer.NewGeneric[int](1024)

It is possible to set a deadline for blocking Read/Write operations using `rb.WithDeadline(time.Duration)`.
// write
rb.Write([]int{1, 2, 3, 4})
fmt.Println(rb.Length())

# io.Copy replacement
// read
buf := make([]int, 4)
rb.Read(buf)
fmt.Println(buf)

The ring buffer can replace `io.Copy` and `io.CopyBuffer` to do async copying through the ring buffer.
// iterate without consuming
for v := range rb.All() {
fmt.Println(v)
}
}
```

The copy operation will happen directly on the buffer, so between reads and writes there is no memory copy.
### Blocking vs Non-blocking

Here is a simple example where the copy operation is replaced by a ring buffer:
The default behavior is non-blocking. Enable blocking to make it behave like a buffered `io.Pipe`:

```go
func saveWebsite(url, file string) {
in, _ := http.Get(url)
out, _ := os.Create(file)
rb := ringbuffer.New(1024).SetBlocking(true)
```

// Copy with regular buffered copy
// n, err := io.Copy(out, in.Body)
### Overwrite mode

// Copy with ring buffer
n, err := ringbuffer.New(1024).Copy(out, in.Body)
fmt.Println(n, err)
}
```go
rb := ringbuffer.New(64).SetOverwrite(true)
rb.Write([]byte("fill the buffer"))
rb.Write([]byte("new data")) // oldest data is discarded
```

The ring buffer implements `io.ReaderFrom` and `io.WriterTo` interfaces, which allows to fill either or both
the write and read side respectively.

This will provide an async method for writing or reading directly into the ring buffer.
These functions require that "blocking" is set on the pipe.
### io.Copy replacement

Example:
`Copy` is now a package-level function that takes a `*RingBuffer[byte]`:

```go
func readWebsite(url string) io.ReadCloser {
func saveWebsite(url, file string) {
in, _ := http.Get(url)
out, _ := os.Create(file)

// Create blocking ring buffer
ring := ringbuffer.New(1024).SetBlocking(true)

// Read from the input in a goroutine into the ring buffer
go func() {
ring.ReadFrom(in.Body)
ring.CloseWriter()
}()
return ring.ReadCloser()
// Copy with ring buffer
rb := ringbuffer.New(1024)
n, err := ringbuffer.Copy(rb, out, in.Body)
fmt.Println(n, err)
}
```

# io.Pipe replacement

The ring buffer can be used as a compatible, but *asynchronous* replacement of `io.Pipe`.

That means that Reads and Writes will go to the ring buffer.
Writes will complete as long as the data fits within the ring buffer.

Reads will attempt to satisfy reads with data from the ring buffer.
The read will only block if the ring buffer is empty.
### io.Pipe replacement

In the common case, where the Read and Write side can run concurrently,
it is safe to replace `io.Pipe()` with `(*Ringbuffer).Pipe()`.

Compare the following to the [io.Pipe example](https://pkg.go.dev/io#example-Pipe):
`Pipe` is now a package-level function:

```go
func main() {
// Create pipe from a 4KB ring buffer.
r, w := ringbuffer.New(4 << 10).Pipe()
r, w := ringbuffer.Pipe(4 << 10)

go func() {
fmt.Fprint(w, "some io.Reader stream to be read\n")
Expand All @@ -138,10 +139,45 @@ func main() {
}
```

When creating the pipe, the ring buffer is internally switched to blocking mode.
### Iterator (Go 1.23+)

```go
rb := ringbuffer.NewGeneric[string](10)
rb.Write([]string{"a", "b", "c"})

for s := range rb.All() {
fmt.Println(s)
}
```

## API Changes from upstream

This fork contains **breaking changes**:

| Before (upstream) | After (this fork) |
|---|---|
| `RingBuffer` (byte only) | `RingBuffer[T any]` (generic) |
| `New(size int) *RingBuffer` | `New(size int) *RingBuffer[byte]` |
| `NewBuffer(b []byte) *RingBuffer` | `NewBuffer(b []byte) *RingBuffer[byte]` |
| `NewGeneric[T](size int) *RingBuffer[T]` | **New** |
| `NewFromSlice[T](b []T) *RingBuffer[T]` | **New** |
| `rb.WriteString(s)` | `WriteString(rb, s)` (function) |
| `rb.ReadFrom(rd)` | `ReadFrom(rb, rd)` (function) |
| `rb.WriteTo(w)` | `WriteTo(rb, w)` (function) |
| `rb.Copy(dst, src)` | `Copy(rb, dst, src)` (function) |
| `rb.Pipe()` | `Pipe(size int)` or `PipeFrom(rb)` (functions) |
| `rb.All()` | **New** `iter.Seq[T]` iterator |
| `rb.Reset()` | Now clears underlying buffer with `clear()` |
| `rb.WithCancel(ctx)` | Goroutine leak fixed; exits cleanly on `Close`/`Reset` |

## Performance

Run benchmarks with:

```bash
make benchmark
```

Error reporting on Close and CloseWithError functions is similar to `io.Pipe`.
## License

It is possible to use the original ring buffer alongside the pipe functions.
So for example it is possible to "seed" the ring buffer with data,
so reads can complete at once.
MIT
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/smallnest/ringbuffer
module github.com/argcv/ringbuffer

go 1.19
go 1.23.0
14 changes: 10 additions & 4 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ import "io"
// Only if the ring buffer is empty will the read block.
//
// It is safe (and intended) to call Read and Write in parallel with each other or with Close.
func (r *RingBuffer) Pipe() (*PipeReader, *PipeWriter) {
func Pipe(size int) (*PipeReader, *PipeWriter) {
return PipeFrom(NewRing[byte](size))
}

// PipeFrom creates a pipe from an existing Ring[byte].
// The buffer will be switched to blocking mode.
func PipeFrom(r *Ring[byte]) (*PipeReader, *PipeWriter) {
r.SetBlocking(true)
pr := PipeReader{pipe: r}
return &pr, &PipeWriter{pipe: r}
}

// A PipeReader is the read half of a pipe.
type PipeReader struct{ pipe *RingBuffer }
type PipeReader struct{ pipe *Ring[byte] }

// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
Expand Down Expand Up @@ -55,13 +61,13 @@ func (r *PipeReader) CloseWithError(err error) error {
}

// A PipeWriter is the write half of a pipe.
type PipeWriter struct{ pipe *RingBuffer }
type PipeWriter struct{ pipe *Ring[byte] }

// Write implements the standard Write interface:
// it writes data to the pipe.
// The Write will block until all data has been written to the ring buffer.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is [io.ErrClosedPipe].
// returned as err; otherwise it is [io.ErrClosedPipe].
func (w *PipeWriter) Write(data []byte) (n int, err error) {
if n, err = w.pipe.Write(data); err == ErrWriteOnClosed {
// Replace error.
Expand Down
Loading
Loading