feat(iavl): add ChangesetFiles and ChangesetInfo (#25635)

This commit is contained in:
Aaron Craelius 2025-12-04 09:57:57 -05:00 committed by GitHub
parent 0f15702842
commit 0cd5a6819c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 682 additions and 0 deletions

View File

@ -0,0 +1,313 @@
package internal
import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
)
// ChangesetFiles owns the directory and the open file handles that make up one changeset on disk.
// It is shared between the Changeset and ChangesetWriter types.
type ChangesetFiles struct {
	// dir is the changeset directory; treeDir is the tree directory that contains it.
	dir, treeDir string

	// startVersion and compactedAt are the two values encoded in the directory name.
	// compactedAt is 0 for an original, uncompacted changeset.
	startVersion, compactedAt uint32

	// Handles for kv.dat, branches.dat, leaves.dat, versions.dat, orphans.dat and info.dat.
	kvDataFile, branchesFile, leavesFile, versionsFile, orphansFile, infoFile *os.File

	// info is the in-memory copy of info.dat; Close sets it to nil.
	info *ChangesetInfo

	// closed is set by the first call to Close so that later calls are no-ops.
	closed bool
}
// CreateChangesetFiles creates a new changeset directory and files that are ready to be written to.
//
// The directory is created inside treeDir (resolved to an absolute path) and is named
// "<startVersion>" for an original changeset or "<startVersion>.<compactedAt>" for a compacted one.
//
// If compactedAt is 0, the changeset is considered original and uncompacted.
// If compactedAt is greater than 0, the changeset is considered compacted and a pending marker file
// is created to indicate that the changeset is not yet ready for use.
// This pending marker file must be removed once the compaction is fully complete by calling MarkReady,
// otherwise the changeset will be considered incomplete and deleted at the next startup.
//
// It returns an error if the path cannot be resolved, the directory or marker cannot be created,
// or any of the changeset files cannot be opened.
func CreateChangesetFiles(treeDir string, startVersion, compactedAt uint32) (*ChangesetFiles, error) {
	// Resolve into a separate variable: filepath.Abs returns "" on failure, so assigning the
	// result back to treeDir would erase the offending path from the error message below.
	absTreeDir, err := filepath.Abs(treeDir)
	if err != nil {
		return nil, fmt.Errorf("failed to get absolute path for %s: %w", treeDir, err)
	}

	dirName := strconv.FormatUint(uint64(startVersion), 10)
	if compactedAt > 0 {
		dirName += "." + strconv.FormatUint(uint64(compactedAt), 10)
	}
	dir := filepath.Join(absTreeDir, dirName)

	if err = os.MkdirAll(dir, 0o755); err != nil {
		return nil, fmt.Errorf("failed to create changeset dir: %w", err)
	}

	// Compacted changesets start out pending; the marker is written before any data file exists.
	if compactedAt > 0 {
		if err = os.WriteFile(pendingFilename(dir), []byte{}, 0o600); err != nil {
			return nil, fmt.Errorf("failed to create pending marker file for compacted changeset: %w", err)
		}
	}

	cr := &ChangesetFiles{
		dir:          dir,
		treeDir:      absTreeDir,
		startVersion: startVersion,
		compactedAt:  compactedAt,
	}
	if err = cr.open(writeModeFlags); err != nil {
		return nil, fmt.Errorf("failed to open changeset files: %w", err)
	}
	return cr, nil
}
// writeModeFlags is the os.OpenFile flag set for files that are written to:
// read-write access, create the file if it is missing, and append every write to the end.
const writeModeFlags = os.O_RDWR | os.O_CREATE | os.O_APPEND
// OpenChangesetFiles opens the files of an existing changeset directory.
//
// The last path element of dirName must be a valid changeset directory name (see
// ParseChangesetDirName); it supplies the start version and compacted-at version.
// kv.dat, leaves.dat, branches.dat and versions.dat are opened read-only, while orphans.dat and
// info.dat are opened read-write so that orphan data and statistics can be tracked.
func OpenChangesetFiles(dirName string) (*ChangesetFiles, error) {
	start, compacted, ok := ParseChangesetDirName(filepath.Base(dirName))
	if !ok {
		return nil, fmt.Errorf("invalid changeset dir name: %s", dirName)
	}

	absDir, err := filepath.Abs(dirName)
	if err != nil {
		return nil, fmt.Errorf("failed to get absolute path for %s: %w", dirName, err)
	}

	files := &ChangesetFiles{
		dir:          absDir,
		treeDir:      filepath.Dir(absDir),
		startVersion: start,
		compactedAt:  compacted,
	}
	if openErr := files.open(os.O_RDONLY); openErr != nil {
		return nil, fmt.Errorf("failed to open changeset files: %w", openErr)
	}
	return files, nil
}
// open opens the six changeset files inside cr.dir and loads info.dat into cr.info.
//
// mode is the os.OpenFile flag set used for kv.dat, leaves.dat, branches.dat and versions.dat.
// Regardless of mode, orphans.dat is opened with writeModeFlags and info.dat with
// O_RDWR|O_CREATE (info.dat is rewritten in place, so it must not be opened in append mode).
//
// If any step fails, every handle opened by this call is closed and reset to nil, so a failed
// open does not leak file descriptors.
func (cr *ChangesetFiles) open(mode int) (err error) {
	targets := []struct {
		handle **os.File // field that receives the opened file
		name   string    // file name inside cr.dir
		desc   string    // description used in the error message
		flags  int       // os.OpenFile flags
	}{
		{&cr.kvDataFile, "kv.dat", "KV data", mode},
		{&cr.leavesFile, "leaves.dat", "leaves data", mode},
		{&cr.branchesFile, "branches.dat", "branches data", mode},
		{&cr.versionsFile, "versions.dat", "versions data", mode},
		{&cr.orphansFile, "orphans.dat", "orphans data", writeModeFlags},
		{&cr.infoFile, "info.dat", "changeset info", os.O_RDWR | os.O_CREATE},
	}

	opened := 0
	defer func() {
		if err == nil {
			return
		}
		// Only touch handles this call opened; the original error is the one worth reporting,
		// so close errors are deliberately ignored here.
		for _, tgt := range targets[:opened] {
			_ = (*tgt.handle).Close()
			*tgt.handle = nil
		}
	}()

	for _, tgt := range targets {
		f, openErr := os.OpenFile(filepath.Join(cr.dir, tgt.name), tgt.flags, 0o600)
		if openErr != nil {
			return fmt.Errorf("failed to open %s file: %w", tgt.desc, openErr)
		}
		*tgt.handle = f
		opened++
	}

	cr.info, err = ReadChangesetInfo(cr.infoFile)
	if err != nil {
		return fmt.Errorf("failed to read changeset info: %w", err)
	}
	return nil
}
// ParseChangesetDirName splits a changeset directory name into its start version and
// compacted-at version.
//
// Accepted forms are "<start>" (original changeset, compactedAt is 0) and "<start>.<compactedAt>",
// where both components are base-10 numbers that fit in a uint32.
// For any other input valid is false and both versions are 0.
func ParseChangesetDirName(dirName string) (startVersion, compactedAt uint32, valid bool) {
	startPart, compactedPart, hasDot := strings.Cut(dirName, ".")

	start, err := strconv.ParseUint(startPart, 10, 32)
	if err != nil {
		return 0, 0, false
	}
	if !hasDot {
		// no dot: an original, uncompacted changeset
		return uint32(start), 0, true
	}

	// Any further dot stays inside compactedPart and is rejected by ParseUint,
	// so names with more than two components are invalid.
	compacted, err := strconv.ParseUint(compactedPart, 10, 32)
	if err != nil {
		return 0, 0, false
	}
	return uint32(start), uint32(compacted), true
}
// Dir returns the absolute path of the changeset directory.
func (cr *ChangesetFiles) Dir() string {
return cr.dir
}
// TreeDir returns the path of the parent tree directory, i.e. the directory that contains Dir.
func (cr *ChangesetFiles) TreeDir() string {
return cr.treeDir
}
// KVDataFile returns the kv.dat file handle.
// The same handle is returned after Close, at which point it is no longer usable for I/O.
func (cr *ChangesetFiles) KVDataFile() *os.File {
return cr.kvDataFile
}
// BranchesFile returns the branches.dat file handle.
// The same handle is returned after Close, at which point it is no longer usable for I/O.
func (cr *ChangesetFiles) BranchesFile() *os.File {
return cr.branchesFile
}
// LeavesFile returns the leaves.dat file handle.
// The same handle is returned after Close, at which point it is no longer usable for I/O.
func (cr *ChangesetFiles) LeavesFile() *os.File {
return cr.leavesFile
}
// VersionsFile returns the versions.dat file handle.
// The same handle is returned after Close, at which point it is no longer usable for I/O.
func (cr *ChangesetFiles) VersionsFile() *os.File {
return cr.versionsFile
}
// OrphansFile returns the orphans.dat file handle.
// The same handle is returned after Close, at which point it is no longer usable for I/O.
func (cr *ChangesetFiles) OrphansFile() *os.File {
return cr.orphansFile
}
// Info returns the changeset info struct.
// The returned pointer refers to the struct held by ChangesetFiles, so it is writeable and
// changes will be persisted to disk when RewriteInfo is called.
// Close sets the struct to nil, so Info returns nil once the files have been closed.
func (cr *ChangesetFiles) Info() *ChangesetInfo {
return cr.info
}
// RewriteInfo rewrites the changeset info file with the current info struct.
// It passes the info.dat handle and the struct returned by Info to RewriteChangesetInfo.
// NOTE(review): Close sets the info struct to nil and closes info.dat, so this should not be
// called after Close.
func (cr *ChangesetFiles) RewriteInfo() error {
return RewriteChangesetInfo(cr.infoFile, cr.info)
}
// StartVersion returns the start version of the changeset, as encoded in the directory name.
func (cr *ChangesetFiles) StartVersion() uint32 {
return cr.startVersion
}
// CompactedAtVersion returns the compacted at version of the changeset, as encoded in the
// directory name.
// If the changeset is original and uncompacted, this will be 0.
func (cr *ChangesetFiles) CompactedAtVersion() uint32 {
return cr.compactedAt
}
// pendingFilename returns the path of the pending marker file inside the changeset directory dir.
func pendingFilename(dir string) string {
	const markerName = "pending"
	return filepath.Join(dir, markerName)
}
// IsChangesetReady reports whether the changeset in dir is ready to be used.
// A changeset is ready exactly when its pending marker file does not exist.
// Startup code uses this to detect a changeset compaction that was interrupted before it
// could complete.
// An error is returned only when the marker cannot be stat'ed for a reason other than
// not existing.
func IsChangesetReady(dir string) (bool, error) {
	_, statErr := os.Stat(pendingFilename(dir))
	switch {
	case statErr == nil:
		// marker present: compaction has not been marked complete
		return false, nil
	case errors.Is(statErr, os.ErrNotExist):
		return true, nil
	default:
		return false, fmt.Errorf("failed to stat pending marker file: %w", statErr)
	}
}
// MarkReady marks the changeset as ready by deleting its pending marker file.
// Only compacted changesets have a marker; a missing marker is not an error, so calling this
// on an uncompacted or already-ready changeset succeeds.
func (cr *ChangesetFiles) MarkReady() error {
	rmErr := os.Remove(pendingFilename(cr.dir))
	if rmErr == nil || errors.Is(rmErr, os.ErrNotExist) {
		return nil
	}
	return fmt.Errorf("failed to remove pending marker file: %w", rmErr)
}
// Close closes every changeset file handle and drops the in-memory info struct.
// Only the first call does any work; later calls return nil.
// Close errors from the individual files are combined with errors.Join.
func (cr *ChangesetFiles) Close() error {
	if cr.closed {
		return nil
	}
	cr.closed = true

	handles := [...]*os.File{
		cr.kvDataFile,
		cr.branchesFile,
		cr.leavesFile,
		cr.versionsFile,
		cr.orphansFile,
		cr.infoFile,
	}
	closeErrs := make([]error, 0, len(handles))
	for _, h := range handles {
		closeErrs = append(closeErrs, h.Close())
	}

	cr.info = nil
	return errors.Join(closeErrs...)
}
// DeleteFiles removes the changeset files, the pending marker (if any) and finally the
// changeset directory itself.
// Files that are still open are closed first. All steps are attempted even when an earlier one
// fails, and their errors are combined with errors.Join.
func (cr *ChangesetFiles) DeleteFiles() error {
	// Close before removing; the file names are still available from the closed handles.
	errs := []error{cr.Close()}

	doomed := []*os.File{
		cr.infoFile,
		cr.leavesFile,
		cr.branchesFile,
		cr.versionsFile,
		cr.orphansFile,
		cr.kvDataFile,
	}
	for _, f := range doomed {
		errs = append(errs, os.Remove(f.Name()))
	}

	// The marker must be gone before the directory can be removed.
	errs = append(errs, cr.MarkReady())
	errs = append(errs, os.Remove(cr.dir))
	return errors.Join(errs...)
}

View File

@ -0,0 +1,210 @@
package internal
import (
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
)
// TestParseChangesetDirName checks ParseChangesetDirName against valid uncompacted and compacted
// names as well as malformed names, including malformed or overflowing compactedAt components.
func TestParseChangesetDirName(t *testing.T) {
	tests := []struct {
		name            string
		dirName         string
		wantStart       uint32
		wantCompactedAt uint32
		wantValid       bool
	}{
		{
			name:            "uncompacted",
			dirName:         "100",
			wantStart:       100,
			wantCompactedAt: 0,
			wantValid:       true,
		},
		{
			name:            "compacted",
			dirName:         "100.200",
			wantStart:       100,
			wantCompactedAt: 200,
			wantValid:       true,
		},
		{
			name:      "max uint32",
			dirName:   "4294967295",
			wantStart: 4294967295,
			wantValid: true,
		},
		{
			name:      "invalid - not a number",
			dirName:   "abc",
			wantValid: false,
		},
		{
			name:      "invalid - too many dots",
			dirName:   "1.2.3",
			wantValid: false,
		},
		{
			name:      "invalid - empty",
			dirName:   "",
			wantValid: false,
		},
		{
			name:      "invalid - overflow",
			dirName:   "5000000000",
			wantValid: false,
		},
		{
			name:      "invalid - negative",
			dirName:   "-1",
			wantValid: false,
		},
		{
			name:      "invalid - empty compactedAt",
			dirName:   "100.",
			wantValid: false,
		},
		{
			name:      "invalid - empty start version",
			dirName:   ".200",
			wantValid: false,
		},
		{
			name:      "invalid - compactedAt not a number",
			dirName:   "100.abc",
			wantValid: false,
		},
		{
			name:      "invalid - compactedAt overflow",
			dirName:   "100.5000000000",
			wantValid: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			start, compactedAt, valid := ParseChangesetDirName(tt.dirName)
			require.Equal(t, tt.wantValid, valid)
			// The version values are only meaningful for valid names.
			if valid {
				require.Equal(t, tt.wantStart, start)
				require.Equal(t, tt.wantCompactedAt, compactedAt)
			}
		})
	}
}
// TestCreateChangesetFiles_Uncompacted verifies that an uncompacted changeset gets a directory
// named after its start version, is ready immediately, and exposes all of its files.
func TestCreateChangesetFiles_Uncompacted(t *testing.T) {
	root := t.TempDir()

	files, err := CreateChangesetFiles(root, 100, 0)
	require.NoError(t, err)
	defer files.Close()

	// directory name and versions
	require.Equal(t, filepath.Join(root, "100"), files.Dir())
	require.Equal(t, uint32(100), files.StartVersion())
	require.Equal(t, uint32(0), files.CompactedAtVersion())

	// no pending marker is written for uncompacted changesets
	isReady, err := IsChangesetReady(files.Dir())
	require.NoError(t, err)
	require.True(t, isReady)

	// every handle and the info struct must be populated
	require.NotNil(t, files.KVDataFile())
	require.NotNil(t, files.BranchesFile())
	require.NotNil(t, files.LeavesFile())
	require.NotNil(t, files.VersionsFile())
	require.NotNil(t, files.OrphansFile())
	require.NotNil(t, files.Info())
}
// TestCreateChangesetFiles_Compacted verifies that a compacted changeset is named
// "<start>.<compactedAt>" and only becomes ready after MarkReady is called.
func TestCreateChangesetFiles_Compacted(t *testing.T) {
	root := t.TempDir()

	files, err := CreateChangesetFiles(root, 100, 200)
	require.NoError(t, err)
	defer files.Close()

	// directory name carries both versions
	require.Equal(t, filepath.Join(root, "100.200"), files.Dir())
	require.Equal(t, uint32(100), files.StartVersion())
	require.Equal(t, uint32(200), files.CompactedAtVersion())

	// pending until MarkReady is called
	isReady, err := IsChangesetReady(files.Dir())
	require.NoError(t, err)
	require.False(t, isReady)

	require.NoError(t, files.MarkReady())

	// ready once the marker has been removed
	isReady, err = IsChangesetReady(files.Dir())
	require.NoError(t, err)
	require.True(t, isReady)
}
// TestChangesetFiles_OpenExisting verifies that info written through RewriteInfo is read back
// when the same changeset directory is reopened with OpenChangesetFiles.
func TestChangesetFiles_OpenExisting(t *testing.T) {
	root := t.TempDir()

	// create, fill in the info struct, persist it and close
	created, err := CreateChangesetFiles(root, 100, 0)
	require.NoError(t, err)

	info := created.Info()
	info.StartVersion = 100
	info.EndVersion = 150
	info.LeafOrphans = 42

	require.NoError(t, created.RewriteInfo())
	require.NoError(t, created.Close())

	// reopen the same directory
	reopened, err := OpenChangesetFiles(filepath.Join(root, "100"))
	require.NoError(t, err)
	defer reopened.Close()

	require.Equal(t, uint32(100), reopened.StartVersion())
	require.Equal(t, uint32(0), reopened.CompactedAtVersion())

	// the persisted info must have survived the round trip
	persisted := reopened.Info()
	require.Equal(t, uint32(100), persisted.StartVersion)
	require.Equal(t, uint32(150), persisted.EndVersion)
	require.Equal(t, uint32(42), persisted.LeafOrphans)
}
// TestChangesetFiles_DeleteFiles verifies that DeleteFiles removes the changeset directory for
// both uncompacted and compacted (still pending) changesets.
func TestChangesetFiles_DeleteFiles(t *testing.T) {
	type deleteCase struct {
		name         string
		startVersion uint32
		compactedAt  uint32
	}
	cases := []deleteCase{
		{name: "uncompacted", startVersion: 100},
		{name: "compacted", startVersion: 5, compactedAt: 10},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			root := t.TempDir()
			files, err := CreateChangesetFiles(root, tc.startVersion, tc.compactedAt)
			require.NoError(t, err)
			changesetDir := files.Dir()

			// present before deletion
			_, statErr := os.Stat(changesetDir)
			require.NoError(t, statErr)

			require.NoError(t, files.DeleteFiles())

			// gone afterwards
			_, statErr = os.Stat(changesetDir)
			require.True(t, os.IsNotExist(statErr))
		})
	}
}
// TestChangesetFiles_CloseIdempotent verifies that calling Close a second time returns no error.
func TestChangesetFiles_CloseIdempotent(t *testing.T) {
	files, err := CreateChangesetFiles(t.TempDir(), 100, 0)
	require.NoError(t, err)

	// the first call closes the files, the second must be a no-op
	for attempt := 0; attempt < 2; attempt++ {
		require.NoError(t, files.Close())
	}
}
// TestOpenChangesetFiles_InvalidDir verifies that OpenChangesetFiles rejects an existing
// directory whose name is not a valid changeset directory name.
func TestOpenChangesetFiles_InvalidDir(t *testing.T) {
	// the directory exists, only its name is wrong
	badDir := filepath.Join(t.TempDir(), "not-a-version")
	require.NoError(t, os.MkdirAll(badDir, 0o755))

	_, openErr := OpenChangesetFiles(badDir)
	require.Error(t, openErr)
	require.Contains(t, openErr.Error(), "invalid changeset dir name")
}

View File

@ -0,0 +1,63 @@
package internal
import (
"fmt"
"io"
"os"
"unsafe"
)
// ChangesetInfo holds metadata about a changeset.
// This mainly tracks the start and end version of the changeset and also contains statistics about orphans in the
// changeset so that compaction can be efficiently scheduled.
// Currently, the orphan statistics track how many total leaf and branch orphans there are as well as the total sum
// of their orphan versions.
// This should give us some heuristics as to what percentage of the changeset is composed of
// orphans and roughly how long ago they were orphaned.
//
// RewriteChangesetInfo and ReadChangesetInfo copy this struct to and from the info file as raw
// memory, so the order and types of the fields (and the byte order of the host) define the
// on-disk format. Adding, removing, reordering or resizing fields changes that format.
type ChangesetInfo struct {
// StartVersion is the first version included in the changeset.
StartVersion uint32
// EndVersion is the last version included in the changeset.
EndVersion uint32
// LeafOrphans is the number of leaf orphan nodes in the changeset.
LeafOrphans uint32
// BranchOrphans is the number of branch orphan nodes in the changeset.
BranchOrphans uint32
// LeafOrphanVersionTotal is the sum of the orphan versions of all orphaned leaf nodes in the changeset.
LeafOrphanVersionTotal uint64
// BranchOrphanVersionTotal is the sum of the orphan versions of all orphaned branch nodes in the changeset.
BranchOrphanVersionTotal uint64
}
// RewriteChangesetInfo rewrites the info file with the given changeset info.
// The struct is written as raw memory at offset 0, overwriting whatever the file held before.
// This method is okay to call the first time the file is created as well.
//
// It returns an error if info is nil or if the write fails.
func RewriteChangesetInfo(file *os.File, info *ChangesetInfo) error {
	// unsafe.Slice panics when given a nil pointer with a non-zero length, so reject a nil
	// info explicitly and report it as an ordinary error instead.
	if info == nil {
		return fmt.Errorf("failed to write changeset info: info is nil")
	}

	data := unsafe.Slice((*byte)(unsafe.Pointer(info)), int(unsafe.Sizeof(*info)))
	if _, err := file.WriteAt(data, 0); err != nil {
		return fmt.Errorf("failed to write changeset info: %w", err)
	}
	return nil
}
// ReadChangesetInfo loads a ChangesetInfo from the start of file.
// An empty file yields a zero-valued struct. A file that holds fewer bytes than the struct
// occupies is reported as an error, as is any read failure other than io.EOF.
func ReadChangesetInfo(file *os.File) (*ChangesetInfo, error) {
	var info ChangesetInfo
	want := int(unsafe.Sizeof(info))

	// Read straight into the struct's memory; the file stores the struct as raw bytes.
	raw := unsafe.Slice((*byte)(unsafe.Pointer(&info)), want)
	got, err := file.ReadAt(raw, 0)

	switch {
	case err == io.EOF && got == 0:
		return &ChangesetInfo{}, nil // empty file
	case err != nil && err != io.EOF:
		return nil, fmt.Errorf("failed to read changeset info: %w", err)
	case got != want:
		return nil, fmt.Errorf("info file has unexpected size: %d, expected %d", got, want)
	}
	return &info, nil
}

View File

@ -0,0 +1,96 @@
package internal
import (
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
)
// TestChangesetInfo_RoundTrip verifies that every field written by RewriteChangesetInfo is read
// back unchanged by ReadChangesetInfo.
func TestChangesetInfo_RoundTrip(t *testing.T) {
	infoPath := filepath.Join(t.TempDir(), "info.dat")
	f, err := os.OpenFile(infoPath, os.O_RDWR|os.O_CREATE, 0o600)
	require.NoError(t, err)
	defer f.Close()

	want := &ChangesetInfo{
		StartVersion:             100,
		EndVersion:               200,
		LeafOrphans:              50,
		BranchOrphans:            25,
		LeafOrphanVersionTotal:   5000,
		BranchOrphanVersionTotal: 2500,
	}
	require.NoError(t, RewriteChangesetInfo(f, want))

	got, err := ReadChangesetInfo(f)
	require.NoError(t, err)

	// compare field by field so a failure names the field that did not survive
	require.Equal(t, want.StartVersion, got.StartVersion)
	require.Equal(t, want.EndVersion, got.EndVersion)
	require.Equal(t, want.LeafOrphans, got.LeafOrphans)
	require.Equal(t, want.BranchOrphans, got.BranchOrphans)
	require.Equal(t, want.LeafOrphanVersionTotal, got.LeafOrphanVersionTotal)
	require.Equal(t, want.BranchOrphanVersionTotal, got.BranchOrphanVersionTotal)
}
// TestChangesetInfo_RewriteOverwrites verifies that a second RewriteChangesetInfo replaces the
// contents written by the first one.
func TestChangesetInfo_RewriteOverwrites(t *testing.T) {
	infoPath := filepath.Join(t.TempDir(), "info.dat")
	f, err := os.OpenFile(infoPath, os.O_RDWR|os.O_CREATE, 0o600)
	require.NoError(t, err)
	defer f.Close()

	// write two versions in order; the later one must win
	first := &ChangesetInfo{StartVersion: 1, EndVersion: 1}
	require.NoError(t, RewriteChangesetInfo(f, first))

	second := &ChangesetInfo{StartVersion: 1, EndVersion: 2}
	require.NoError(t, RewriteChangesetInfo(f, second))

	got, err := ReadChangesetInfo(f)
	require.NoError(t, err)
	require.Equal(t, uint32(1), got.StartVersion)
	require.Equal(t, uint32(2), got.EndVersion)
}
// TestChangesetInfo_ReadEmptyFile verifies that reading a freshly created, empty info file
// returns a zero-valued ChangesetInfo rather than an error.
func TestChangesetInfo_ReadEmptyFile(t *testing.T) {
	infoPath := filepath.Join(t.TempDir(), "info.dat")
	f, err := os.OpenFile(infoPath, os.O_RDWR|os.O_CREATE, 0o600)
	require.NoError(t, err)
	defer f.Close()

	got, err := ReadChangesetInfo(f)
	require.NoError(t, err)
	require.Equal(t, &ChangesetInfo{}, got)
}
// TestChangesetInfo_ReadWrongSize verifies that an info file shorter than the ChangesetInfo
// struct is rejected with an "unexpected size" error.
func TestChangesetInfo_ReadWrongSize(t *testing.T) {
	infoPath := filepath.Join(t.TempDir(), "info.dat")

	// five bytes is far fewer than the struct occupies
	require.NoError(t, os.WriteFile(infoPath, []byte("short"), 0o600))

	f, err := os.OpenFile(infoPath, os.O_RDONLY, 0o600)
	require.NoError(t, err)
	defer f.Close()

	_, readErr := ReadChangesetInfo(f)
	require.Error(t, readErr)
	require.Contains(t, readErr.Error(), "unexpected size")
}