Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestBackupCreateCmdFunc(t *testing.T) {
})

err = backupCreateCmdFunc(cmd, []string{tempFile})
require.ErrorContains(t, err, "already exists")
require.ErrorIs(t, err, backupformat.ErrBackupUnresumable)
})

t.Run("derives backup file name from context if not provided", func(t *testing.T) {
Expand Down
62 changes: 54 additions & 8 deletions pkg/backupformat/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,45 @@ type OcfFileEncoder struct {
*OcfEncoder
}

// ErrBackupAlreadyCompleted indicates a backup file is on disk and is marked
// complete (via the sidecar completion sentinel). Callers should refuse to
// overwrite it.
var ErrBackupAlreadyCompleted = errors.New("backup file already exists and is marked complete")

// ErrBackupUnresumable indicates a backup file exists but neither a resume
// cursor nor a completion sentinel is present. Either the previous run died
// before recording any progress, or the file was produced by an older zed
// version that did not write a completion marker. The file may be complete
// or partial; the encoder cannot tell. Delete the file to start fresh.
var ErrBackupUnresumable = errors.New("backup file has no resume cursor or completion marker; if it was produced by an older zed version it may already be complete — delete it to start over")

func (fe *OcfFileEncoder) lockFileName() string {
return fe.file.Name() + ".lock"
}

func (fe *OcfFileEncoder) doneFileName() string {
return fe.file.Name() + ".done"
}

func (fe *OcfFileEncoder) Cursor() (string, error) {
if fe.fileIsStream {
return "", errors.New("resume is not supported when streaming to stdout")
}
// A progress lockfile always indicates an in-progress backup that should
// resume from its cursor — even if a stale completion sentinel from a
// previous run is also present.
cursorBytes, err := os.ReadFile(fe.lockFileName())
if os.IsNotExist(err) {
return "", errors.New("completed backup file already exists")
} else if err != nil {
if err == nil {
return string(cursorBytes), nil
} else if !os.IsNotExist(err) {
return "", err
}
if _, err := os.Stat(fe.doneFileName()); err == nil {
return "", ErrBackupAlreadyCompleted
} else if !os.IsNotExist(err) {
return "", err
}
return string(cursorBytes), nil
return "", ErrBackupUnresumable
}

func NewFileEncoder(filename string) (e *OcfFileEncoder, existed bool, err error) {
Expand All @@ -243,6 +267,19 @@ func NewFileEncoder(filename string) (e *OcfFileEncoder, existed bool, err error
if err != nil {
return nil, backupExisted, fmt.Errorf("unable to open backup file: %w", err)
}
// When starting a fresh backup, remove any stale sidecar files from a
// previous backup at the same path. A stale .done would misreport the
// new run as already completed if it crashed before the first Append();
// a stale .lock would resume the new run from an old cursor pointing
// into the prior export's snapshot, silently skipping relationships.
if !backupExisted {
for _, sidecar := range []string{f.Name() + ".lock", f.Name() + ".done"} {
if rmErr := os.Remove(sidecar); rmErr != nil && !os.IsNotExist(rmErr) {
_ = f.Close()
return nil, backupExisted, fmt.Errorf("unable to clear stale sidecar %s: %w", sidecar, rmErr)
}
}
}
}

return &OcfFileEncoder{file: f, fileIsStream: isStream, OcfEncoder: &OcfEncoder{w: f}}, backupExisted, nil
Expand Down Expand Up @@ -285,19 +322,28 @@ func (fe *OcfFileEncoder) Close() error {
return errors.Join(fe.file.Sync(), fe.file.Close())
}

removeCompleted := func() error {
finalizeCompleted := func() error {
if fe.fileIsStream {
return nil
}
if fe.completed {
return os.Remove(fe.lockFileName())
if !fe.completed {
return nil
}
// Write the completion sentinel before removing the progress lockfile so
// that a crash between the two leaves the backup as resumable rather
// than as a misleading "completed" state.
if err := atomic.WriteFile(fe.doneFileName(), bytes.NewBuffer(nil)); err != nil {
return fmt.Errorf("failed to write completion sentinel: %w", err)
}
if err := os.Remove(fe.lockFileName()); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}

return errors.Join(
safeClose(),
removeCompleted(),
finalizeCompleted(),
)
}

Expand Down
123 changes: 123 additions & 0 deletions pkg/backupformat/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,129 @@ func TestFileEncoderStreamingToStdout(t *testing.T) {
require.Positive(t, info.Size(), "encoded data should have been written to stdout")
}

func TestOcfFileEncoder_CursorOnCompletedBackup(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "completed.zedbackup")

enc, existed, err := NewFileEncoder(filename)
require.NoError(t, err)
require.False(t, existed)
require.NoError(t, enc.WriteSchema("definition user {}", "tok"))
rel := &v1.Relationship{
Resource: &v1.ObjectReference{ObjectType: "doc", ObjectId: "1"},
Relation: "viewer",
Subject: &v1.SubjectReference{Object: &v1.ObjectReference{ObjectType: "user", ObjectId: "alice"}},
}
require.NoError(t, enc.Append(rel, "cursor-1"))
enc.MarkComplete()
require.NoError(t, enc.Close())

// Re-open the same filename: Cursor must report it as already completed,
// distinct from an incomplete/orphan file.
enc2, existed, err := NewFileEncoder(filename)
require.NoError(t, err)
require.True(t, existed)
t.Cleanup(func() { _ = enc2.Close() })

_, err = enc2.Cursor()
require.Error(t, err)
require.ErrorIs(t, err, ErrBackupAlreadyCompleted)
}

func TestOcfFileEncoder_CursorOnOrphanBackup(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "orphan.zedbackup")

// Reproduce the production failure: the previous run wrote the OCF header
// via WriteSchema but the export stream died before the first relationship
// Append(), so no lockfile was ever written.
enc, _, err := NewFileEncoder(filename)
require.NoError(t, err)
require.NoError(t, enc.WriteSchema("definition user {}", "tok"))
require.NoError(t, enc.Close())

require.NoFileExists(t, filename+".lock")
require.NoFileExists(t, filename+".done")

enc2, existed, err := NewFileEncoder(filename)
require.NoError(t, err)
require.True(t, existed)
t.Cleanup(func() { _ = enc2.Close() })

_, err = enc2.Cursor()
require.Error(t, err)
require.ErrorIs(t, err, ErrBackupUnresumable, "orphan file must not be reported as completed")
require.NotErrorIs(t, err, ErrBackupAlreadyCompleted)
}

func TestOcfFileEncoder_StaleDoneSentinelClearedOnFreshRun(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "reused.zedbackup")

// Simulate a prior completed backup at the same path, then the user
// deleted only the .zedbackup file (not the .done sentinel) before
// starting a fresh run.
require.NoError(t, os.WriteFile(filename+".done", nil, 0o644))
require.FileExists(t, filename+".done")

enc, existed, err := NewFileEncoder(filename)
require.NoError(t, err)
require.False(t, existed)
t.Cleanup(func() { _ = enc.Close() })

require.NoFileExists(t, filename+".done",
"stale completion sentinel must be cleared when starting a fresh backup")
}

func TestOcfFileEncoder_StaleLockfileClearedOnFreshRun(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "reused.zedbackup")

// Simulate a prior failed backup at the same path: a .lock with a stale
// cursor was left behind. The user deleted the .zedbackup but did not
// know about the .lock sidecar. Without cleanup, a subsequent fresh run
// that itself fails before the first Append() would resume from this
// stale cursor against an unrelated snapshot — silently skipping rows.
staleCursor := "stale-cursor-from-prior-snapshot"
require.NoError(t, os.WriteFile(filename+".lock", []byte(staleCursor), 0o644))
require.FileExists(t, filename+".lock")

enc, existed, err := NewFileEncoder(filename)
require.NoError(t, err)
require.False(t, existed)
t.Cleanup(func() { _ = enc.Close() })

require.NoFileExists(t, filename+".lock",
"stale lockfile must be cleared when starting a fresh backup")
}

func TestOcfFileEncoder_CursorOnInterruptedBackup(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "resumable.zedbackup")

enc, _, err := NewFileEncoder(filename)
require.NoError(t, err)
require.NoError(t, enc.WriteSchema("definition user {}", "tok"))
rel := &v1.Relationship{
Resource: &v1.ObjectReference{ObjectType: "doc", ObjectId: "1"},
Relation: "viewer",
Subject: &v1.SubjectReference{Object: &v1.ObjectReference{ObjectType: "user", ObjectId: "alice"}},
}
require.NoError(t, enc.Append(rel, "cursor-mid-export"))
// Simulate a mid-export failure: don't call MarkComplete; close the file but
// the lockfile is left behind because completed == false.
require.NoError(t, enc.Close())

enc2, existed, err := NewFileEncoder(filename)
require.NoError(t, err)
require.True(t, existed)
t.Cleanup(func() { _ = enc2.Close() })

cursor, err := enc2.Cursor()
require.NoError(t, err)
require.Equal(t, "cursor-mid-export", cursor)
}

func TestWithProgress(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading