diff --git a/iavl/internal/changeset_files.go b/iavl/internal/changeset_files.go new file mode 100644 index 0000000000..4273e572cd --- /dev/null +++ b/iavl/internal/changeset_files.go @@ -0,0 +1,313 @@ +package internal + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" +) + +// ChangesetFiles encapsulates management of changeset files. +// This type is shared between the Changeset and ChangesetWriter types. +type ChangesetFiles struct { + dir string + treeDir string + startVersion uint32 + compactedAt uint32 + + kvDataFile *os.File + branchesFile *os.File + leavesFile *os.File + versionsFile *os.File + orphansFile *os.File + infoFile *os.File + info *ChangesetInfo + + closed bool +} + +// CreateChangesetFiles creates a new changeset directory and files that are ready to be written to. +// If compactedAt is 0, the changeset is considered original and uncompacted. +// If compactedAt is greater than 0, the changeset is considered compacted and a pending marker file +// will be created to indicate that the changeset is not yet ready for use. +// This pending marker file must be removed once the compaction is fully complete by calling MarkReady, +// otherwise the changeset will be considered incomplete and deleted at the next startup. +func CreateChangesetFiles(treeDir string, startVersion, compactedAt uint32) (*ChangesetFiles, error) { + // ensure absolute path + var err error + treeDir, err = filepath.Abs(treeDir) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path for %s: %w", treeDir, err) + } + + dirName := fmt.Sprintf("%d", startVersion) + if compactedAt > 0 { + dirName = fmt.Sprintf("%d.%d", startVersion, compactedAt) + } + dir := filepath.Join(treeDir, dirName) + + err = os.MkdirAll(dir, 0o755) + if err != nil { + return nil, fmt.Errorf("failed to create changeset dir: %w", err) + } + + // create pending marker file for compacted changesets + if compactedAt > 0 { + err := os.WriteFile(pendingFilename(dir), []byte{}, 0o600) + if err != nil { + return nil, fmt.Errorf("failed to create pending marker file for compacted changeset: %w", err) + } + } + + cr := &ChangesetFiles{ + dir: dir, + treeDir: treeDir, + startVersion: startVersion, + compactedAt: compactedAt, + } + + err = cr.open(writeModeFlags) + if err != nil { + return nil, fmt.Errorf("failed to open changeset files: %w", err) + } + return cr, nil +} + +const writeModeFlags = os.O_RDWR | os.O_CREATE | os.O_APPEND + +// OpenChangesetFiles opens an existing changeset directory and files. +// All files are opened in readonly mode, except for orphans.dat and info.dat which are opened in read-write mode +// to track orphan data and statistics. +func OpenChangesetFiles(dirName string) (*ChangesetFiles, error) { + startVersion, compactedAt, valid := ParseChangesetDirName(filepath.Base(dirName)) + if !valid { + return nil, fmt.Errorf("invalid changeset dir name: %s", dirName) + } + + dir, err := filepath.Abs(dirName) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path for %s: %w", dirName, err) + } + + treeDir := filepath.Dir(dir) + + cr := &ChangesetFiles{ + dir: dir, + treeDir: treeDir, + startVersion: startVersion, + compactedAt: compactedAt, + } + + err = cr.open(os.O_RDONLY) + if err != nil { + return nil, fmt.Errorf("failed to open changeset files: %w", err) + } + + return cr, nil +} + +func (cr *ChangesetFiles) open(mode int) error { + var err error + + kvFile := filepath.Join(cr.dir, "kv.dat") + cr.kvDataFile, err = os.OpenFile(kvFile, mode, 0o600) + if err != nil { + return fmt.Errorf("failed to open KV data file: %w", err) + } + + leavesPath := filepath.Join(cr.dir, "leaves.dat") + cr.leavesFile, err = os.OpenFile(leavesPath, mode, 0o600) + if err != nil { + return fmt.Errorf("failed to open leaves data file: %w", err) + } + + branchesPath := filepath.Join(cr.dir, "branches.dat") + cr.branchesFile, err = os.OpenFile(branchesPath, mode, 0o600) + if err != nil { + return fmt.Errorf("failed to open branches data file: %w", err) + } + + versionsPath := filepath.Join(cr.dir, "versions.dat") + cr.versionsFile, err = os.OpenFile(versionsPath, mode, 0o600) + if err != nil { + return fmt.Errorf("failed to open versions data file: %w", err) + } + + orphansPath := filepath.Join(cr.dir, "orphans.dat") + cr.orphansFile, err = os.OpenFile(orphansPath, writeModeFlags, 0o600) // the orphans file is always opened for writing + if err != nil { + return fmt.Errorf("failed to open orphans data file: %w", err) + } + + infoPath := filepath.Join(cr.dir, "info.dat") + cr.infoFile, err = os.OpenFile(infoPath, os.O_RDWR|os.O_CREATE, 0o600) // info file uses random access, not append + if err != nil { + return fmt.Errorf("failed to open changeset info file: %w", err) + } + + cr.info, err = ReadChangesetInfo(cr.infoFile) + if err != nil { + return fmt.Errorf("failed to read changeset info: %w", err) + } + + return nil +} + +// ParseChangesetDirName parses a changeset directory name and returns the start version and compacted at version. +// If the directory name is invalid, valid will be false. +// If a changeset is original and uncompacted, compactedAt will be 0. +func ParseChangesetDirName(dirName string) (startVersion, compactedAt uint32, valid bool) { + var err error + var v uint64 + // if no dot, it's an original changeset + if !strings.Contains(dirName, ".") { + v, err = strconv.ParseUint(dirName, 10, 32) + if err != nil { + return 0, 0, false + } + return uint32(v), 0, true + } + + parts := strings.Split(dirName, ".") + if len(parts) != 2 { + return 0, 0, false + } + + v, err = strconv.ParseUint(parts[0], 10, 32) + if err != nil { + return 0, 0, false + } + startVersion = uint32(v) + + v, err = strconv.ParseUint(parts[1], 10, 32) + if err != nil { + return 0, 0, false + } + compactedAt = uint32(v) + + return startVersion, compactedAt, true +} + +// Dir returns the changeset directory path. +func (cr *ChangesetFiles) Dir() string { + return cr.dir +} + +// TreeDir returns the parent tree directory path. +func (cr *ChangesetFiles) TreeDir() string { + return cr.treeDir +} + +// KVDataFile returns the kv.dat file handle. +func (cr *ChangesetFiles) KVDataFile() *os.File { + return cr.kvDataFile +} + +// BranchesFile returns the branches.dat file handle. +func (cr *ChangesetFiles) BranchesFile() *os.File { + return cr.branchesFile +} + +// LeavesFile returns the leaves.dat file handle. +func (cr *ChangesetFiles) LeavesFile() *os.File { + return cr.leavesFile +} + +// VersionsFile returns the versions.dat file handle. +func (cr *ChangesetFiles) VersionsFile() *os.File { + return cr.versionsFile +} + +// OrphansFile returns the orphans.dat file handle. +func (cr *ChangesetFiles) OrphansFile() *os.File { + return cr.orphansFile +} + +// Info returns the changeset info struct. +// This struct is writeable and changes will be persisted to disk when RewriteInfo is called. +func (cr *ChangesetFiles) Info() *ChangesetInfo { + return cr.info +} + +// RewriteInfo rewrites the changeset info file with the current info struct. +func (cr *ChangesetFiles) RewriteInfo() error { + return RewriteChangesetInfo(cr.infoFile, cr.info) +} + +// StartVersion returns the start version of the changeset. +func (cr *ChangesetFiles) StartVersion() uint32 { + return cr.startVersion +} + +// CompactedAtVersion returns the compacted at version of the changeset. +// If the changeset is original and uncompacted, this will be 0. +func (cr *ChangesetFiles) CompactedAtVersion() uint32 { + return cr.compactedAt +} + +func pendingFilename(dir string) string { + return filepath.Join(dir, "pending") +} + +// IsChangesetReady checks if the changeset is ready to be used. +// A changeset is considered ready if the pending marker file does not exist. +// This is used by startup code only to detect whether a changeset compaction was interrupted before it could complete. +func IsChangesetReady(dir string) (bool, error) { + pendingPath := pendingFilename(dir) + _, err := os.Stat(pendingPath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return true, nil + } + return false, fmt.Errorf("failed to stat pending marker file: %w", err) + } + return false, nil +} + +// MarkReady marks the changeset as ready by removing the pending marker file. +// This is only necessary for compacted changesets. +func (cr *ChangesetFiles) MarkReady() error { + pendingPath := pendingFilename(cr.dir) + err := os.Remove(pendingPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("failed to remove pending marker file: %w", err) + } + return nil +} + +// Close closes all changeset files. +func (cr *ChangesetFiles) Close() error { + if cr.closed { + return nil + } + + cr.closed = true + err := errors.Join( + cr.kvDataFile.Close(), + cr.branchesFile.Close(), + cr.leavesFile.Close(), + cr.versionsFile.Close(), + cr.orphansFile.Close(), + cr.infoFile.Close(), + ) + cr.info = nil + return err +} + +// DeleteFiles deletes all changeset files and the changeset directory. +// If the files were not already closed, they will be closed first. +func (cr *ChangesetFiles) DeleteFiles() error { + return errors.Join( + cr.Close(), // first close all files + os.Remove(cr.infoFile.Name()), + os.Remove(cr.leavesFile.Name()), + os.Remove(cr.branchesFile.Name()), + os.Remove(cr.versionsFile.Name()), + os.Remove(cr.orphansFile.Name()), + os.Remove(cr.kvDataFile.Name()), + cr.MarkReady(), // remove pending marker file if it exists + os.Remove(cr.dir), + ) +} diff --git a/iavl/internal/changeset_files_test.go b/iavl/internal/changeset_files_test.go new file mode 100644 index 0000000000..6ad187f968 --- /dev/null +++ b/iavl/internal/changeset_files_test.go @@ -0,0 +1,210 @@ +package internal + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseChangesetDirName(t *testing.T) { + tests := []struct { + name string + dirName string + wantStart uint32 + wantCompactedAt uint32 + wantValid bool + }{ + { + name: "uncompacted", + dirName: "100", + wantStart: 100, + wantCompactedAt: 0, + wantValid: true, + }, + { + name: "compacted", + dirName: "100.200", + wantStart: 100, + wantCompactedAt: 200, + wantValid: true, + }, + { + name: "invalid - not a number", + dirName: "abc", + wantValid: false, + }, + { + name: "invalid - too many dots", + dirName: "1.2.3", + wantValid: false, + }, + { + name: "invalid - empty", + dirName: "", + wantValid: false, + }, + { + name: "invalid - overflow", + dirName: "5000000000", + wantValid: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + start, compactedAt, valid := ParseChangesetDirName(tt.dirName) + require.Equal(t, tt.wantValid, valid) + if valid { + require.Equal(t, tt.wantStart, start) + require.Equal(t, tt.wantCompactedAt, compactedAt) + } + }) + } +} + +func TestCreateChangesetFiles_Uncompacted(t *testing.T) { + treeDir := t.TempDir() + + cf, err := CreateChangesetFiles(treeDir, 100, 0) + require.NoError(t, err) + defer cf.Close() + + // Check directory created + require.Equal(t, filepath.Join(treeDir, "100"), cf.Dir()) + require.Equal(t, uint32(100), cf.StartVersion()) + require.Equal(t, uint32(0), cf.CompactedAtVersion()) + + // Uncompacted should be ready immediately (no pending marker) + ready, err := IsChangesetReady(cf.Dir()) + require.NoError(t, err) + require.True(t, ready) + + // All files should exist + require.NotNil(t, cf.KVDataFile()) + require.NotNil(t, cf.BranchesFile()) + require.NotNil(t, cf.LeavesFile()) + require.NotNil(t, cf.VersionsFile()) + require.NotNil(t, cf.OrphansFile()) + require.NotNil(t, cf.Info()) +} + +func TestCreateChangesetFiles_Compacted(t *testing.T) { + treeDir := t.TempDir() + + cf, err := CreateChangesetFiles(treeDir, 100, 200) + require.NoError(t, err) + defer cf.Close() + + // Check directory name includes compactedAt + require.Equal(t, filepath.Join(treeDir, "100.200"), cf.Dir()) + require.Equal(t, uint32(100), cf.StartVersion()) + require.Equal(t, uint32(200), cf.CompactedAtVersion()) + + // Compacted should NOT be ready until MarkReady called + ready, err := IsChangesetReady(cf.Dir()) + require.NoError(t, err) + require.False(t, ready) + + // Mark ready + err = cf.MarkReady() + require.NoError(t, err) + + // Now should be ready + ready, err = IsChangesetReady(cf.Dir()) + require.NoError(t, err) + require.True(t, ready) +} + +func TestChangesetFiles_OpenExisting(t *testing.T) { + treeDir := t.TempDir() + + // Create and close + cf, err := CreateChangesetFiles(treeDir, 100, 0) + require.NoError(t, err) + + // Modify info + cf.Info().StartVersion = 100 + cf.Info().EndVersion = 150 + cf.Info().LeafOrphans = 42 + + // Persist info + require.NoError(t, cf.RewriteInfo()) + require.NoError(t, cf.Close()) + + // Reopen + cf2, err := OpenChangesetFiles(filepath.Join(treeDir, "100")) + require.NoError(t, err) + defer cf2.Close() + + require.Equal(t, uint32(100), cf2.StartVersion()) + require.Equal(t, uint32(0), cf2.CompactedAtVersion()) + + // Info should be persisted + require.Equal(t, uint32(100), cf2.Info().StartVersion) + require.Equal(t, uint32(150), cf2.Info().EndVersion) + require.Equal(t, uint32(42), cf2.Info().LeafOrphans) +} + +func TestChangesetFiles_DeleteFiles(t *testing.T) { + tests := []struct { + name string + startVersion uint32 + compactedAt uint32 + }{ + { + name: "uncompacted", + startVersion: 100, + }, + { + name: "compacted", + startVersion: 5, + compactedAt: 10, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + treeDir := t.TempDir() + + cf, err := CreateChangesetFiles(treeDir, tt.startVersion, tt.compactedAt) + require.NoError(t, err) + + dir := cf.Dir() + + // Directory should exist + _, err = os.Stat(dir) + require.NoError(t, err) + + // Delete + err = cf.DeleteFiles() + require.NoError(t, err) + + // Directory should be gone + _, err = os.Stat(dir) + require.True(t, os.IsNotExist(err)) + }) + } +} + +func TestChangesetFiles_CloseIdempotent(t *testing.T) { + treeDir := t.TempDir() + + cf, err := CreateChangesetFiles(treeDir, 100, 0) + require.NoError(t, err) + + // Close multiple times should not error + require.NoError(t, cf.Close()) + require.NoError(t, cf.Close()) +} + +func TestOpenChangesetFiles_InvalidDir(t *testing.T) { + treeDir := t.TempDir() + + // Create a directory with invalid name + invalidDir := filepath.Join(treeDir, "not-a-version") + require.NoError(t, os.MkdirAll(invalidDir, 0o755)) + + _, err := OpenChangesetFiles(invalidDir) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid changeset dir name") +} diff --git a/iavl/internal/changeset_info.go b/iavl/internal/changeset_info.go new file mode 100644 index 0000000000..12c6ad6379 --- /dev/null +++ b/iavl/internal/changeset_info.go @@ -0,0 +1,63 @@ +package internal + +import ( + "fmt" + "io" + "os" + "unsafe" +) + +// ChangesetInfo holds metadata about a changeset. +// This mainly tracks the start and end version of the changeset and also contains statistics about orphans in the +// changeset so that compaction can be efficiently scheduled. +// Currently, the orphan statistics track how many total leaf and branch orphans there are as well as the total sum +// of their orphan versions. +// This should give us some heuristics as to what percentage of the changeset is composed of +// orphans and roughly how long ago they were orphaned. +type ChangesetInfo struct { + // StartVersion is the first version included in the changeset. + StartVersion uint32 + // EndVersion is the last version included in the changeset. + EndVersion uint32 + + // LeafOrphans is the number of leaf orphan nodes in the changeset. + LeafOrphans uint32 + // BranchOrphans is the number of branch orphan nodes in the changeset. + BranchOrphans uint32 + + // LeafOrphanVersionTotal is the sum of the orphan versions of all orphaned leaf nodes in the changeset. + LeafOrphanVersionTotal uint64 + // BranchOrphanVersionTotal is the sum of the orphan versions of all orphaned branch nodes in the changeset. + BranchOrphanVersionTotal uint64 +} + +// RewriteChangesetInfo rewrites the info file with the given changeset info. +// This method is okay to call the first time the file is created as well. +func RewriteChangesetInfo(file *os.File, info *ChangesetInfo) error { + data := unsafe.Slice((*byte)(unsafe.Pointer(info)), int(unsafe.Sizeof(*info))) + if _, err := file.WriteAt(data, 0); err != nil { + return fmt.Errorf("failed to write changeset info: %w", err) + } + + return nil +} + +// ReadChangesetInfo reads changeset info from a file. It returns an empty default struct if file is empty. +func ReadChangesetInfo(file *os.File) (*ChangesetInfo, error) { + var info ChangesetInfo + size := int(unsafe.Sizeof(info)) + data := unsafe.Slice((*byte)(unsafe.Pointer(&info)), size) + + n, err := file.ReadAt(data, 0) + if err == io.EOF && n == 0 { + return &ChangesetInfo{}, nil // empty file + } + if err != nil && err != io.EOF { + return nil, fmt.Errorf("failed to read changeset info: %w", err) + } + if n != size { + return nil, fmt.Errorf("info file has unexpected size: %d, expected %d", n, size) + } + + return &info, nil +} diff --git a/iavl/internal/changeset_info_test.go b/iavl/internal/changeset_info_test.go new file mode 100644 index 0000000000..453fe0adb7 --- /dev/null +++ b/iavl/internal/changeset_info_test.go @@ -0,0 +1,96 @@ +package internal + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestChangesetInfo_RoundTrip(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "info.dat") + + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600) + require.NoError(t, err) + defer file.Close() + + original := &ChangesetInfo{ + StartVersion: 100, + EndVersion: 200, + LeafOrphans: 50, + BranchOrphans: 25, + LeafOrphanVersionTotal: 5000, + BranchOrphanVersionTotal: 2500, + } + + err = RewriteChangesetInfo(file, original) + require.NoError(t, err) + + read, err := ReadChangesetInfo(file) + require.NoError(t, err) + + require.Equal(t, original.StartVersion, read.StartVersion) + require.Equal(t, original.EndVersion, read.EndVersion) + require.Equal(t, original.LeafOrphans, read.LeafOrphans) + require.Equal(t, original.BranchOrphans, read.BranchOrphans) + require.Equal(t, original.LeafOrphanVersionTotal, read.LeafOrphanVersionTotal) + require.Equal(t, original.BranchOrphanVersionTotal, read.BranchOrphanVersionTotal) +} + +func TestChangesetInfo_RewriteOverwrites(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "info.dat") + + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600) + require.NoError(t, err) + defer file.Close() + + // Write first version + v1 := &ChangesetInfo{StartVersion: 1, EndVersion: 1} + err = RewriteChangesetInfo(file, v1) + require.NoError(t, err) + + // Overwrite with second version + v2 := &ChangesetInfo{StartVersion: 1, EndVersion: 2} + err = RewriteChangesetInfo(file, v2) + require.NoError(t, err) + + // Read should return second version + read, err := ReadChangesetInfo(file) + require.NoError(t, err) + require.Equal(t, uint32(1), read.StartVersion) + require.Equal(t, uint32(2), read.EndVersion) +} + +func TestChangesetInfo_ReadEmptyFile(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "info.dat") + + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600) + require.NoError(t, err) + defer file.Close() + + // Reading empty file should return default struct + info, err := ReadChangesetInfo(file) + require.NoError(t, err) + require.Equal(t, &ChangesetInfo{}, info) +} + +func TestChangesetInfo_ReadWrongSize(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "info.dat") + + // Write garbage data of wrong size + err := os.WriteFile(path, []byte("short"), 0o600) + require.NoError(t, err) + + file, err := os.OpenFile(path, os.O_RDONLY, 0o600) + require.NoError(t, err) + defer file.Close() + + _, err = ReadChangesetInfo(file) + require.Error(t, err) + require.Contains(t, err.Error(), "unexpected size") +}