Merge remote-tracking branch 'origin/master' into jen/v151

This commit is contained in:
Jennifer Wang 2022-02-11 00:16:52 -05:00
commit 99d5ad097c
62 changed files with 2513 additions and 397 deletions

View File

@ -113,6 +113,8 @@ type StorageMiner interface {
// SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message
SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error //perm:admin
// SectorAbortUpgrade can be called on sectors that are in the process of being upgraded to abort it
SectorAbortUpgrade(context.Context, abi.SectorNumber) error //perm:admin
// WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error //perm:admin retry:true
@ -130,6 +132,7 @@ type StorageMiner interface {
ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, vanillaProofs storage.ReplicaVanillaProofs, err *storiface.CallError) error //perm:admin retry:true
ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, proof storage.ReplicaUpdateProof, err *storiface.CallError) error //perm:admin retry:true
ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnFinalizeReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true

View File

@ -39,6 +39,7 @@ type Worker interface {
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin

View File

@ -723,6 +723,8 @@ type StorageMinerStruct struct {
ReturnFetch func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnFinalizeReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnFinalizeSector func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnGenerateSectorKeyFromData func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
@ -755,6 +757,8 @@ type StorageMinerStruct struct {
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
SectorAbortUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
SectorAddPieceToAny func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data, p3 PieceDealInfo) (SectorOffset, error) `perm:"admin"`
SectorCommitFlush func(p0 context.Context) ([]sealiface.CommitBatchRes, error) `perm:"admin"`
@ -872,6 +876,8 @@ type WorkerStruct struct {
Fetch func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
FinalizeReplicaUpdate func(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) `perm:"admin"`
FinalizeSector func(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) `perm:"admin"`
GenerateSectorKeyFromData func(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"`
@ -4280,6 +4286,17 @@ func (s *StorageMinerStub) ReturnFetch(p0 context.Context, p1 storiface.CallID,
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnFinalizeReplicaUpdate(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
if s.Internal.ReturnFinalizeReplicaUpdate == nil {
return ErrNotSupported
}
return s.Internal.ReturnFinalizeReplicaUpdate(p0, p1, p2)
}
func (s *StorageMinerStub) ReturnFinalizeReplicaUpdate(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnFinalizeSector(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
if s.Internal.ReturnFinalizeSector == nil {
return ErrNotSupported
@ -4456,6 +4473,17 @@ func (s *StorageMinerStub) SealingSchedDiag(p0 context.Context, p1 bool) (interf
return nil, ErrNotSupported
}
func (s *StorageMinerStruct) SectorAbortUpgrade(p0 context.Context, p1 abi.SectorNumber) error {
if s.Internal.SectorAbortUpgrade == nil {
return ErrNotSupported
}
return s.Internal.SectorAbortUpgrade(p0, p1)
}
func (s *StorageMinerStub) SectorAbortUpgrade(p0 context.Context, p1 abi.SectorNumber) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) SectorAddPieceToAny(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data, p3 PieceDealInfo) (SectorOffset, error) {
if s.Internal.SectorAddPieceToAny == nil {
return *new(SectorOffset), ErrNotSupported
@ -5006,6 +5034,17 @@ func (s *WorkerStub) Fetch(p0 context.Context, p1 storage.SectorRef, p2 storifac
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) FinalizeReplicaUpdate(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) {
if s.Internal.FinalizeReplicaUpdate == nil {
return *new(storiface.CallID), ErrNotSupported
}
return s.Internal.FinalizeReplicaUpdate(p0, p1, p2)
}
func (s *WorkerStub) FinalizeReplicaUpdate(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) {
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) FinalizeSector(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) {
if s.Internal.FinalizeSector == nil {
return *new(storiface.CallID), ErrNotSupported

View File

@ -49,10 +49,11 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration:
blockstore and discards writes; this is necessary to support syncing from a snapshot.
- `MarkSetType` -- specifies the type of markset to use during compaction.
The markset is the data structure used by compaction/gc to track live objects.
The default value is `"map"`, which will use an in-memory map; if you are limited
in memory (or indeed see compaction run out of memory), you can also specify
`"badger"` which will use an disk backed markset, using badger. This will use
much less memory, but will also make compaction slower.
The default value is "badger", which will use a disk backed markset using badger.
If you have a lot of memory (48G or more) you can also use "map", which will use
an in memory markset, speeding up compaction at the cost of higher memory usage.
Note: If you are using a VPS with a network volume, you need to provision at least
3000 IOPs with the badger markset.
- `HotStoreMessageRetention` -- specifies how many finalities, beyond the 4
finalities maintained by default, to maintain messages and message receipts in the
hotstore. This is useful for assistive nodes that want to support syncing for other
@ -105,6 +106,12 @@ Compaction works transactionally with the following algorithm:
- We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
- We then end the transaction and compact/gc the hotstore.
As of [#8008](https://github.com/filecoin-project/lotus/pull/8008) the compaction algorithm has been
modified to eliminate sorting and maintain the cold object set on disk. This drastically reduces
memory usage; in fact, when using badger as the markset compaction uses very little memory, and
it should be now possible to run splitstore with 32GB of RAM or less without danger of running out of
memory during compaction.
## Garbage Collection
TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577)

View File

@ -0,0 +1,118 @@
package splitstore
import (
"bufio"
"io"
"os"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)
type Checkpoint struct {
file *os.File
buf *bufio.Writer
}
func NewCheckpoint(path string) (*Checkpoint, error) {
file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_SYNC, 0644)
if err != nil {
return nil, xerrors.Errorf("error creating checkpoint: %w", err)
}
buf := bufio.NewWriter(file)
return &Checkpoint{
file: file,
buf: buf,
}, nil
}
func OpenCheckpoint(path string) (*Checkpoint, cid.Cid, error) {
filein, err := os.Open(path)
if err != nil {
return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for reading: %w", err)
}
defer filein.Close() //nolint:errcheck
bufin := bufio.NewReader(filein)
start, err := readRawCid(bufin, nil)
if err != nil && err != io.EOF {
return nil, cid.Undef, xerrors.Errorf("error reading cid from checkpoint: %w", err)
}
fileout, err := os.OpenFile(path, os.O_WRONLY|os.O_SYNC, 0644)
if err != nil {
return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for writing: %w", err)
}
bufout := bufio.NewWriter(fileout)
return &Checkpoint{
file: fileout,
buf: bufout,
}, start, nil
}
func (cp *Checkpoint) Set(c cid.Cid) error {
if _, err := cp.file.Seek(0, io.SeekStart); err != nil {
return xerrors.Errorf("error seeking beginning of checkpoint: %w", err)
}
if err := writeRawCid(cp.buf, c, true); err != nil {
return xerrors.Errorf("error writing cid to checkpoint: %w", err)
}
return nil
}
func (cp *Checkpoint) Close() error {
if cp.file == nil {
return nil
}
err := cp.file.Close()
cp.file = nil
cp.buf = nil
return err
}
func readRawCid(buf *bufio.Reader, hbuf []byte) (cid.Cid, error) {
sz, err := buf.ReadByte()
if err != nil {
return cid.Undef, err // don't wrap EOF as it is not an error here
}
if hbuf == nil {
hbuf = make([]byte, int(sz))
} else {
hbuf = hbuf[:int(sz)]
}
if _, err := io.ReadFull(buf, hbuf); err != nil {
return cid.Undef, xerrors.Errorf("error reading hash: %w", err) // wrap EOF, it's corrupt
}
hash, err := mh.Cast(hbuf)
if err != nil {
return cid.Undef, xerrors.Errorf("error casting multihash: %w", err)
}
return cid.NewCidV1(cid.Raw, hash), nil
}
func writeRawCid(buf *bufio.Writer, c cid.Cid, flush bool) error {
hash := c.Hash()
if err := buf.WriteByte(byte(len(hash))); err != nil {
return err
}
if _, err := buf.Write(hash); err != nil {
return err
}
if flush {
return buf.Flush()
}
return nil
}

View File

@ -0,0 +1,147 @@
package splitstore
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
)
func TestCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "checkpoint.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(dir)
})
path := filepath.Join(dir, "checkpoint")
makeCid := func(key string) cid.Cid {
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
if err != nil {
t.Fatal(err)
}
return cid.NewCidV1(cid.Raw, h)
}
k1 := makeCid("a")
k2 := makeCid("b")
k3 := makeCid("c")
k4 := makeCid("d")
cp, err := NewCheckpoint(path)
if err != nil {
t.Fatal(err)
}
if err := cp.Set(k1); err != nil {
t.Fatal(err)
}
if err := cp.Set(k2); err != nil {
t.Fatal(err)
}
if err := cp.Close(); err != nil {
t.Fatal(err)
}
cp, start, err := OpenCheckpoint(path)
if err != nil {
t.Fatal(err)
}
if !start.Equals(k2) {
t.Fatalf("expected start to be %s; got %s", k2, start)
}
if err := cp.Set(k3); err != nil {
t.Fatal(err)
}
if err := cp.Set(k4); err != nil {
t.Fatal(err)
}
if err := cp.Close(); err != nil {
t.Fatal(err)
}
cp, start, err = OpenCheckpoint(path)
if err != nil {
t.Fatal(err)
}
if !start.Equals(k4) {
t.Fatalf("expected start to be %s; got %s", k4, start)
}
if err := cp.Close(); err != nil {
t.Fatal(err)
}
// also test correct operation with an empty checkpoint
cp, err = NewCheckpoint(path)
if err != nil {
t.Fatal(err)
}
if err := cp.Close(); err != nil {
t.Fatal(err)
}
cp, start, err = OpenCheckpoint(path)
if err != nil {
t.Fatal(err)
}
if start.Defined() {
t.Fatal("expected start to be undefined")
}
if err := cp.Set(k1); err != nil {
t.Fatal(err)
}
if err := cp.Set(k2); err != nil {
t.Fatal(err)
}
if err := cp.Close(); err != nil {
t.Fatal(err)
}
cp, start, err = OpenCheckpoint(path)
if err != nil {
t.Fatal(err)
}
if !start.Equals(k2) {
t.Fatalf("expected start to be %s; got %s", k2, start)
}
if err := cp.Set(k3); err != nil {
t.Fatal(err)
}
if err := cp.Set(k4); err != nil {
t.Fatal(err)
}
if err := cp.Close(); err != nil {
t.Fatal(err)
}
cp, start, err = OpenCheckpoint(path)
if err != nil {
t.Fatal(err)
}
if !start.Equals(k4) {
t.Fatalf("expected start to be %s; got %s", k4, start)
}
if err := cp.Close(); err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,102 @@
package splitstore
import (
"bufio"
"io"
"os"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
)
type ColdSetWriter struct {
file *os.File
buf *bufio.Writer
}
type ColdSetReader struct {
file *os.File
buf *bufio.Reader
}
func NewColdSetWriter(path string) (*ColdSetWriter, error) {
file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return nil, xerrors.Errorf("error creating coldset: %w", err)
}
buf := bufio.NewWriter(file)
return &ColdSetWriter{
file: file,
buf: buf,
}, nil
}
func NewColdSetReader(path string) (*ColdSetReader, error) {
file, err := os.Open(path)
if err != nil {
return nil, xerrors.Errorf("error opening coldset: %w", err)
}
buf := bufio.NewReader(file)
return &ColdSetReader{
file: file,
buf: buf,
}, nil
}
func (s *ColdSetWriter) Write(c cid.Cid) error {
return writeRawCid(s.buf, c, false)
}
func (s *ColdSetWriter) Close() error {
if s.file == nil {
return nil
}
err1 := s.buf.Flush()
err2 := s.file.Close()
s.buf = nil
s.file = nil
if err1 != nil {
return err1
}
return err2
}
func (s *ColdSetReader) ForEach(f func(cid.Cid) error) error {
hbuf := make([]byte, 256)
for {
next, err := readRawCid(s.buf, hbuf)
if err != nil {
if err == io.EOF {
return nil
}
return xerrors.Errorf("error reading coldset: %w", err)
}
if err := f(next); err != nil {
return err
}
}
}
func (s *ColdSetReader) Reset() error {
_, err := s.file.Seek(0, io.SeekStart)
return err
}
func (s *ColdSetReader) Close() error {
if s.file == nil {
return nil
}
err := s.file.Close()
s.file = nil
s.buf = nil
return err
}

View File

@ -0,0 +1,99 @@
package splitstore
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
)
func TestColdSet(t *testing.T) {
dir, err := ioutil.TempDir("", "coldset.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(dir)
})
path := filepath.Join(dir, "coldset")
makeCid := func(i int) cid.Cid {
h, err := multihash.Sum([]byte(fmt.Sprintf("cid.%d", i)), multihash.SHA2_256, -1)
if err != nil {
t.Fatal(err)
}
return cid.NewCidV1(cid.Raw, h)
}
const count = 1000
cids := make([]cid.Cid, 0, count)
for i := 0; i < count; i++ {
cids = append(cids, makeCid(i))
}
cw, err := NewColdSetWriter(path)
if err != nil {
t.Fatal(err)
}
for _, c := range cids {
if err := cw.Write(c); err != nil {
t.Fatal(err)
}
}
if err := cw.Close(); err != nil {
t.Fatal(err)
}
cr, err := NewColdSetReader(path)
if err != nil {
t.Fatal(err)
}
index := 0
err = cr.ForEach(func(c cid.Cid) error {
if index >= count {
t.Fatal("too many cids")
}
if !c.Equals(cids[index]) {
t.Fatalf("wrong cid %d; expected %s but got %s", index, cids[index], c)
}
index++
return nil
})
if err != nil {
t.Fatal(err)
}
if err := cr.Reset(); err != nil {
t.Fatal(err)
}
index = 0
err = cr.ForEach(func(c cid.Cid) error {
if index >= count {
t.Fatal("too many cids")
}
if !c.Equals(cids[index]) {
t.Fatalf("wrong cid; expected %s but got %s", cids[index], c)
}
index++
return nil
})
if err != nil {
t.Fatal(err)
}
}

View File

@ -14,15 +14,24 @@ var errMarkSetClosed = errors.New("markset closed")
type MarkSet interface {
ObjectVisitor
Mark(cid.Cid) error
MarkMany([]cid.Cid) error
Has(cid.Cid) (bool, error)
Close() error
// BeginCriticalSection ensures that the markset is persisted to disk for recovery in case
// of abnormal termination during the critical section span.
BeginCriticalSection() error
// EndCriticalSection ends the critical section span.
EndCriticalSection()
}
type MarkSetEnv interface {
// Create creates a new markset within the environment.
// name is a unique name for this markset, mapped to the filesystem in disk-backed environments
// New creates a new markset within the environment.
// name is a unique name for this markset, mapped to the filesystem for on-disk persistence.
// sizeHint is a hint about the expected size of the markset
Create(name string, sizeHint int64) (MarkSet, error)
New(name string, sizeHint int64) (MarkSet, error)
// Recover recovers an existing markset persisted on-disk.
Recover(name string) (MarkSet, error)
// Close closes the markset
Close() error
}
@ -30,7 +39,7 @@ type MarkSetEnv interface {
func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
switch mtype {
case "map":
return NewMapMarkSetEnv()
return NewMapMarkSetEnv(path)
case "badger":
return NewBadgerMarkSetEnv(path)
default:

View File

@ -3,6 +3,7 @@ package splitstore
import (
"os"
"path/filepath"
"runtime"
"sync"
"golang.org/x/xerrors"
@ -28,6 +29,7 @@ type BadgerMarkSet struct {
writers int
seqno int
version int
persist bool
db *badger.DB
path string
@ -47,11 +49,10 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
return &BadgerMarkSetEnv{path: msPath}, nil
}
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
name += ".tmp"
func (e *BadgerMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
path := filepath.Join(e.path, name)
db, err := openTransientBadgerDB(path)
db, err := openBadgerDB(path, false)
if err != nil {
return nil, xerrors.Errorf("error creating badger db: %w", err)
}
@ -67,8 +68,72 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
return ms, nil
}
func (e *BadgerMarkSetEnv) Recover(name string) (MarkSet, error) {
path := filepath.Join(e.path, name)
if _, err := os.Stat(path); err != nil {
return nil, xerrors.Errorf("error stating badger db path: %w", err)
}
db, err := openBadgerDB(path, true)
if err != nil {
return nil, xerrors.Errorf("error creating badger db: %w", err)
}
ms := &BadgerMarkSet{
pend: make(map[string]struct{}),
writing: make(map[int]map[string]struct{}),
db: db,
path: path,
persist: true,
}
ms.cond.L = &ms.mx
return ms, nil
}
func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
return nil
}
func (s *BadgerMarkSet) BeginCriticalSection() error {
s.mx.Lock()
if s.persist {
s.mx.Unlock()
return nil
}
var write bool
var seqno int
if len(s.pend) > 0 {
write = true
seqno = s.nextBatch()
}
s.persist = true
s.mx.Unlock()
if write {
// all writes sync once perist is true
return s.write(seqno)
}
// wait for any pending writes and sync
s.mx.Lock()
for s.writers > 0 {
s.cond.Wait()
}
s.mx.Unlock()
return s.db.Sync()
}
func (s *BadgerMarkSet) EndCriticalSection() {
s.mx.Lock()
defer s.mx.Unlock()
s.persist = false
}
func (s *BadgerMarkSet) Mark(c cid.Cid) error {
@ -88,6 +153,23 @@ func (s *BadgerMarkSet) Mark(c cid.Cid) error {
return nil
}
func (s *BadgerMarkSet) MarkMany(batch []cid.Cid) error {
s.mx.Lock()
if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}
write, seqno := s.putMany(batch)
s.mx.Unlock()
if write {
return s.write(seqno)
}
return nil
}
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
s.mx.RLock()
defer s.mx.RUnlock()
@ -193,16 +275,34 @@ func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) {
// writer holds the exclusive lock
func (s *BadgerMarkSet) put(key string) (write bool, seqno int) {
s.pend[key] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize {
if !s.persist && len(s.pend) < badgerMarkSetBatchSize {
return false, 0
}
seqno = s.seqno
seqno = s.nextBatch()
return true, seqno
}
func (s *BadgerMarkSet) putMany(batch []cid.Cid) (write bool, seqno int) {
for _, c := range batch {
key := string(c.Hash())
s.pend[key] = struct{}{}
}
if !s.persist && len(s.pend) < badgerMarkSetBatchSize {
return false, 0
}
seqno = s.nextBatch()
return true, seqno
}
func (s *BadgerMarkSet) nextBatch() int {
seqno := s.seqno
s.seqno++
s.writing[seqno] = s.pend
s.pend = make(map[string]struct{})
return true, seqno
return seqno
}
func (s *BadgerMarkSet) write(seqno int) (err error) {
@ -247,6 +347,14 @@ func (s *BadgerMarkSet) write(seqno int) (err error) {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}
s.mx.RLock()
persist := s.persist
s.mx.RUnlock()
if persist {
return s.db.Sync()
}
return nil
}
@ -266,26 +374,29 @@ func (s *BadgerMarkSet) Close() error {
db := s.db
s.db = nil
return closeTransientBadgerDB(db, s.path)
return closeBadgerDB(db, s.path, s.persist)
}
func (s *BadgerMarkSet) SetConcurrent() {}
func openBadgerDB(path string, recover bool) (*badger.DB, error) {
// if it is not a recovery, clean up first
if !recover {
err := os.RemoveAll(path)
if err != nil {
return nil, xerrors.Errorf("error clearing markset directory: %w", err)
}
func openTransientBadgerDB(path string) (*badger.DB, error) {
// clean up first
err := os.RemoveAll(path)
if err != nil {
return nil, xerrors.Errorf("error clearing markset directory: %w", err)
}
err = os.MkdirAll(path, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
err = os.MkdirAll(path, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
}
}
opts := badger.DefaultOptions(path)
// we manually sync when we are in critical section
opts.SyncWrites = false
// no need to do that
opts.CompactL0OnClose = false
// we store hashes, not much to gain by compression
opts.Compression = options.None
// Note: We use FileIO for loading modes to avoid memory thrashing and interference
// between the system blockstore and the markset.
@ -294,6 +405,15 @@ func openTransientBadgerDB(path string) (*badger.DB, error) {
// exceeded 1GB in size.
opts.TableLoadingMode = options.FileIO
opts.ValueLogLoadingMode = options.FileIO
// We increase the number of L0 tables before compaction to make it unlikely to
// be necessary.
opts.NumLevelZeroTables = 20 // default is 5
opts.NumLevelZeroTablesStall = 30 // default is 10
// increase the number of compactors from default 2 so that if we ever have to
// compact, it is fast
if runtime.NumCPU()/2 > opts.NumCompactors {
opts.NumCompactors = runtime.NumCPU() / 2
}
opts.Logger = &badgerLogger{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
@ -302,12 +422,16 @@ func openTransientBadgerDB(path string) (*badger.DB, error) {
return badger.Open(opts)
}
func closeTransientBadgerDB(db *badger.DB, path string) error {
func closeBadgerDB(db *badger.DB, path string, persist bool) error {
err := db.Close()
if err != nil {
return xerrors.Errorf("error closing badger markset: %w", err)
}
if persist {
return nil
}
err = os.RemoveAll(path)
if err != nil {
return xerrors.Errorf("error deleting badger markset: %w", err)

View File

@ -1,37 +1,104 @@
package splitstore
import (
"bufio"
"io"
"os"
"path/filepath"
"sync"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
)
type MapMarkSetEnv struct{}
type MapMarkSetEnv struct {
path string
}
var _ MarkSetEnv = (*MapMarkSetEnv)(nil)
type MapMarkSet struct {
mx sync.RWMutex
set map[string]struct{}
persist bool
file *os.File
buf *bufio.Writer
path string
}
var _ MarkSet = (*MapMarkSet)(nil)
func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil
func NewMapMarkSetEnv(path string) (*MapMarkSetEnv, error) {
msPath := filepath.Join(path, "markset.map")
err := os.MkdirAll(msPath, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
}
return &MapMarkSetEnv{path: msPath}, nil
}
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
func (e *MapMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
path := filepath.Join(e.path, name)
return &MapMarkSet{
set: make(map[string]struct{}, sizeHint),
set: make(map[string]struct{}, sizeHint),
path: path,
}, nil
}
func (e *MapMarkSetEnv) Recover(name string) (MarkSet, error) {
path := filepath.Join(e.path, name)
s := &MapMarkSet{
set: make(map[string]struct{}),
path: path,
}
in, err := os.Open(path)
if err != nil {
return nil, xerrors.Errorf("error opening markset file for read: %w", err)
}
defer in.Close() //nolint:errcheck
// wrap a buffered reader to make this faster
buf := bufio.NewReader(in)
for {
var sz byte
if sz, err = buf.ReadByte(); err != nil {
break
}
key := make([]byte, int(sz))
if _, err = io.ReadFull(buf, key); err != nil {
break
}
s.set[string(key)] = struct{}{}
}
if err != io.EOF {
return nil, xerrors.Errorf("error reading markset file: %w", err)
}
file, err := os.OpenFile(s.path, os.O_WRONLY|os.O_APPEND, 0)
if err != nil {
return nil, xerrors.Errorf("error opening markset file for write: %w", err)
}
s.persist = true
s.file = file
s.buf = bufio.NewWriter(file)
return s, nil
}
func (e *MapMarkSetEnv) Close() error {
return nil
}
func (s *MapMarkSet) Mark(cid cid.Cid) error {
func (s *MapMarkSet) BeginCriticalSection() error {
s.mx.Lock()
defer s.mx.Unlock()
@ -39,7 +106,104 @@ func (s *MapMarkSet) Mark(cid cid.Cid) error {
return errMarkSetClosed
}
s.set[string(cid.Hash())] = struct{}{}
if s.persist {
return nil
}
file, err := os.OpenFile(s.path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return xerrors.Errorf("error opening markset file: %w", err)
}
// wrap a buffered writer to make this faster
s.buf = bufio.NewWriter(file)
for key := range s.set {
if err := s.writeKey([]byte(key), false); err != nil {
_ = file.Close()
s.buf = nil
return err
}
}
if err := s.buf.Flush(); err != nil {
_ = file.Close()
s.buf = nil
return xerrors.Errorf("error flushing markset file buffer: %w", err)
}
s.file = file
s.persist = true
return nil
}
func (s *MapMarkSet) EndCriticalSection() {
s.mx.Lock()
defer s.mx.Unlock()
if !s.persist {
return
}
_ = s.file.Close()
_ = os.Remove(s.path)
s.file = nil
s.buf = nil
s.persist = false
}
func (s *MapMarkSet) Mark(c cid.Cid) error {
s.mx.Lock()
defer s.mx.Unlock()
if s.set == nil {
return errMarkSetClosed
}
hash := c.Hash()
s.set[string(hash)] = struct{}{}
if s.persist {
if err := s.writeKey(hash, true); err != nil {
return err
}
if err := s.file.Sync(); err != nil {
return xerrors.Errorf("error syncing markset: %w", err)
}
}
return nil
}
func (s *MapMarkSet) MarkMany(batch []cid.Cid) error {
s.mx.Lock()
defer s.mx.Unlock()
if s.set == nil {
return errMarkSetClosed
}
for _, c := range batch {
hash := c.Hash()
s.set[string(hash)] = struct{}{}
if s.persist {
if err := s.writeKey(hash, false); err != nil {
return err
}
}
}
if s.persist {
if err := s.buf.Flush(); err != nil {
return xerrors.Errorf("error flushing markset buffer to disk: %w", err)
}
if err := s.file.Sync(); err != nil {
return xerrors.Errorf("error syncing markset: %w", err)
}
}
return nil
}
@ -63,12 +227,23 @@ func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
return false, errMarkSetClosed
}
key := string(c.Hash())
hash := c.Hash()
key := string(hash)
if _, ok := s.set[key]; ok {
return false, nil
}
s.set[key] = struct{}{}
if s.persist {
if err := s.writeKey(hash, true); err != nil {
return false, err
}
if err := s.file.Sync(); err != nil {
return false, xerrors.Errorf("error syncing markset: %w", err)
}
}
return true, nil
}
@ -76,6 +251,39 @@ func (s *MapMarkSet) Close() error {
s.mx.Lock()
defer s.mx.Unlock()
if s.set == nil {
return nil
}
s.set = nil
if s.file != nil {
if err := s.file.Close(); err != nil {
log.Warnf("error closing markset file: %s", err)
}
if !s.persist {
if err := os.Remove(s.path); err != nil {
log.Warnf("error removing markset file: %s", err)
}
}
}
return nil
}
func (s *MapMarkSet) writeKey(k []byte, flush bool) error {
if err := s.buf.WriteByte(byte(len(k))); err != nil {
return xerrors.Errorf("error writing markset key length to disk: %w", err)
}
if _, err := s.buf.Write(k); err != nil {
return xerrors.Errorf("error writing markset key to disk: %w", err)
}
if flush {
if err := s.buf.Flush(); err != nil {
return xerrors.Errorf("error flushing markset buffer to disk: %w", err)
}
}
return nil
}

View File

@ -11,7 +11,10 @@ import (
func TestMapMarkSet(t *testing.T) {
testMarkSet(t, "map")
testMarkSetRecovery(t, "map")
testMarkSetMarkMany(t, "map")
testMarkSetVisitor(t, "map")
testMarkSetVisitorRecovery(t, "map")
}
func TestBadgerMarkSet(t *testing.T) {
@ -21,12 +24,13 @@ func TestBadgerMarkSet(t *testing.T) {
badgerMarkSetBatchSize = bs
})
testMarkSet(t, "badger")
testMarkSetRecovery(t, "badger")
testMarkSetMarkMany(t, "badger")
testMarkSetVisitor(t, "badger")
testMarkSetVisitorRecovery(t, "badger")
}
func testMarkSet(t *testing.T, lsType string) {
t.Helper()
path, err := ioutil.TempDir("", "markset.*")
if err != nil {
t.Fatal(err)
@ -42,12 +46,12 @@ func testMarkSet(t *testing.T, lsType string) {
}
defer env.Close() //nolint:errcheck
hotSet, err := env.Create("hot", 0)
hotSet, err := env.New("hot", 0)
if err != nil {
t.Fatal(err)
}
coldSet, err := env.Create("cold", 0)
coldSet, err := env.New("cold", 0)
if err != nil {
t.Fatal(err)
}
@ -62,6 +66,7 @@ func testMarkSet(t *testing.T, lsType string) {
}
mustHave := func(s MarkSet, cid cid.Cid) {
t.Helper()
has, err := s.Has(cid)
if err != nil {
t.Fatal(err)
@ -73,6 +78,7 @@ func testMarkSet(t *testing.T, lsType string) {
}
mustNotHave := func(s MarkSet, cid cid.Cid) {
t.Helper()
has, err := s.Has(cid)
if err != nil {
t.Fatal(err)
@ -114,12 +120,12 @@ func testMarkSet(t *testing.T, lsType string) {
t.Fatal(err)
}
hotSet, err = env.Create("hot", 0)
hotSet, err = env.New("hot", 0)
if err != nil {
t.Fatal(err)
}
coldSet, err = env.Create("cold", 0)
coldSet, err = env.New("cold", 0)
if err != nil {
t.Fatal(err)
}
@ -150,8 +156,6 @@ func testMarkSet(t *testing.T, lsType string) {
}
func testMarkSetVisitor(t *testing.T, lsType string) {
t.Helper()
path, err := ioutil.TempDir("", "markset.*")
if err != nil {
t.Fatal(err)
@ -167,7 +171,7 @@ func testMarkSetVisitor(t *testing.T, lsType string) {
}
defer env.Close() //nolint:errcheck
visitor, err := env.Create("test", 0)
visitor, err := env.New("test", 0)
if err != nil {
t.Fatal(err)
}
@ -219,3 +223,322 @@ func testMarkSetVisitor(t *testing.T, lsType string) {
mustNotVisit(visitor, k3)
mustNotVisit(visitor, k4)
}
func testMarkSetVisitorRecovery(t *testing.T, lsType string) {
path, err := ioutil.TempDir("", "markset.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
env, err := OpenMarkSetEnv(path, lsType)
if err != nil {
t.Fatal(err)
}
defer env.Close() //nolint:errcheck
visitor, err := env.New("test", 0)
if err != nil {
t.Fatal(err)
}
defer visitor.Close() //nolint:errcheck
makeCid := func(key string) cid.Cid {
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
if err != nil {
t.Fatal(err)
}
return cid.NewCidV1(cid.Raw, h)
}
mustVisit := func(v ObjectVisitor, cid cid.Cid) {
visit, err := v.Visit(cid)
if err != nil {
t.Fatal(err)
}
if !visit {
t.Fatal("object should be visited")
}
}
mustNotVisit := func(v ObjectVisitor, cid cid.Cid) {
visit, err := v.Visit(cid)
if err != nil {
t.Fatal(err)
}
if visit {
t.Fatal("unexpected visit")
}
}
k1 := makeCid("a")
k2 := makeCid("b")
k3 := makeCid("c")
k4 := makeCid("d")
mustVisit(visitor, k1)
mustVisit(visitor, k2)
if err := visitor.BeginCriticalSection(); err != nil {
t.Fatal(err)
}
mustVisit(visitor, k3)
mustVisit(visitor, k4)
mustNotVisit(visitor, k1)
mustNotVisit(visitor, k2)
mustNotVisit(visitor, k3)
mustNotVisit(visitor, k4)
if err := visitor.Close(); err != nil {
t.Fatal(err)
}
visitor, err = env.Recover("test")
if err != nil {
t.Fatal(err)
}
mustNotVisit(visitor, k1)
mustNotVisit(visitor, k2)
mustNotVisit(visitor, k3)
mustNotVisit(visitor, k4)
visitor.EndCriticalSection()
if err := visitor.Close(); err != nil {
t.Fatal(err)
}
_, err = env.Recover("test")
if err == nil {
t.Fatal("expected recovery to fail")
}
}
func testMarkSetRecovery(t *testing.T, lsType string) {
path, err := ioutil.TempDir("", "markset.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
env, err := OpenMarkSetEnv(path, lsType)
if err != nil {
t.Fatal(err)
}
defer env.Close() //nolint:errcheck
markSet, err := env.New("test", 0)
if err != nil {
t.Fatal(err)
}
makeCid := func(key string) cid.Cid {
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
if err != nil {
t.Fatal(err)
}
return cid.NewCidV1(cid.Raw, h)
}
mustHave := func(s MarkSet, cid cid.Cid) {
t.Helper()
has, err := s.Has(cid)
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("mark not found")
}
}
mustNotHave := func(s MarkSet, cid cid.Cid) {
t.Helper()
has, err := s.Has(cid)
if err != nil {
t.Fatal(err)
}
if has {
t.Fatal("unexpected mark")
}
}
k1 := makeCid("a")
k2 := makeCid("b")
k3 := makeCid("c")
k4 := makeCid("d")
if err := markSet.Mark(k1); err != nil {
t.Fatal(err)
}
if err := markSet.Mark(k2); err != nil {
t.Fatal(err)
}
mustHave(markSet, k1)
mustHave(markSet, k2)
mustNotHave(markSet, k3)
mustNotHave(markSet, k4)
if err := markSet.BeginCriticalSection(); err != nil {
t.Fatal(err)
}
if err := markSet.Mark(k3); err != nil {
t.Fatal(err)
}
if err := markSet.Mark(k4); err != nil {
t.Fatal(err)
}
mustHave(markSet, k1)
mustHave(markSet, k2)
mustHave(markSet, k3)
mustHave(markSet, k4)
if err := markSet.Close(); err != nil {
t.Fatal(err)
}
markSet, err = env.Recover("test")
if err != nil {
t.Fatal(err)
}
mustHave(markSet, k1)
mustHave(markSet, k2)
mustHave(markSet, k3)
mustHave(markSet, k4)
markSet.EndCriticalSection()
if err := markSet.Close(); err != nil {
t.Fatal(err)
}
_, err = env.Recover("test")
if err == nil {
t.Fatal("expected recovery to fail")
}
}
func testMarkSetMarkMany(t *testing.T, lsType string) {
path, err := ioutil.TempDir("", "markset.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
env, err := OpenMarkSetEnv(path, lsType)
if err != nil {
t.Fatal(err)
}
defer env.Close() //nolint:errcheck
markSet, err := env.New("test", 0)
if err != nil {
t.Fatal(err)
}
makeCid := func(key string) cid.Cid {
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
if err != nil {
t.Fatal(err)
}
return cid.NewCidV1(cid.Raw, h)
}
mustHave := func(s MarkSet, cid cid.Cid) {
t.Helper()
has, err := s.Has(cid)
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("mark not found")
}
}
mustNotHave := func(s MarkSet, cid cid.Cid) {
t.Helper()
has, err := s.Has(cid)
if err != nil {
t.Fatal(err)
}
if has {
t.Fatal("unexpected mark")
}
}
k1 := makeCid("a")
k2 := makeCid("b")
k3 := makeCid("c")
k4 := makeCid("d")
if err := markSet.MarkMany([]cid.Cid{k1, k2}); err != nil {
t.Fatal(err)
}
mustHave(markSet, k1)
mustHave(markSet, k2)
mustNotHave(markSet, k3)
mustNotHave(markSet, k4)
if err := markSet.BeginCriticalSection(); err != nil {
t.Fatal(err)
}
if err := markSet.MarkMany([]cid.Cid{k3, k4}); err != nil {
t.Fatal(err)
}
mustHave(markSet, k1)
mustHave(markSet, k2)
mustHave(markSet, k3)
mustHave(markSet, k4)
if err := markSet.Close(); err != nil {
t.Fatal(err)
}
markSet, err = env.Recover("test")
if err != nil {
t.Fatal(err)
}
mustHave(markSet, k1)
mustHave(markSet, k2)
mustHave(markSet, k3)
mustHave(markSet, k4)
markSet.EndCriticalSection()
if err := markSet.Close(); err != nil {
t.Fatal(err)
}
_, err = env.Recover("test")
if err == nil {
t.Fatal("expected recovery to fail")
}
}

View File

@ -129,8 +129,6 @@ type SplitStore struct {
headChangeMx sync.Mutex
coldPurgeSize int
chain ChainAccessor
ds dstore.Datastore
cold bstore.Blockstore
@ -158,6 +156,10 @@ type SplitStore struct {
txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{}
txnMarkSet MarkSet
txnSyncMx sync.Mutex
txnSyncCond sync.Cond
txnSync bool
// registered protectors
protectors []func(func(cid.Cid) error) error
@ -194,11 +196,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
cold: cold,
hot: hots,
markSetEnv: markSetEnv,
coldPurgeSize: defaultColdPurgeSize,
}
ss.txnViewsCond.L = &ss.txnViewsMx
ss.txnSyncCond.L = &ss.txnSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())
if enableDebugLog {
@ -208,6 +209,14 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
}
}
if ss.checkpointExists() {
log.Info("found compaction checkpoint; resuming compaction")
if err := ss.completeCompaction(); err != nil {
markSetEnv.Close() //nolint:errcheck
return nil, xerrors.Errorf("error resuming compaction: %w", err)
}
}
return ss, nil
}
@ -230,6 +239,20 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
s.txnLk.RLock()
defer s.txnLk.RUnlock()
// critical section
if s.txnMarkSet != nil {
has, err := s.txnMarkSet.Has(cid)
if err != nil {
return false, err
}
if has {
return s.has(cid)
}
return s.cold.Has(ctx, cid)
}
has, err := s.hot.Has(ctx, cid)
if err != nil {
@ -257,6 +280,20 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)
s.txnLk.RLock()
defer s.txnLk.RUnlock()
// critical section
if s.txnMarkSet != nil {
has, err := s.txnMarkSet.Has(cid)
if err != nil {
return nil, err
}
if has {
return s.get(cid)
}
return s.cold.Get(ctx, cid)
}
blk, err := s.hot.Get(ctx, cid)
switch err {
@ -294,6 +331,20 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
s.txnLk.RLock()
defer s.txnLk.RUnlock()
// critical section
if s.txnMarkSet != nil {
has, err := s.txnMarkSet.Has(cid)
if err != nil {
return 0, err
}
if has {
return s.getSize(cid)
}
return s.cold.GetSize(ctx, cid)
}
size, err := s.hot.GetSize(ctx, cid)
switch err {
@ -332,6 +383,12 @@ func (s *SplitStore) Put(ctx context.Context, blk blocks.Block) error {
s.debug.LogWrite(blk)
// critical section
if s.txnMarkSet != nil {
s.markLiveRefs([]cid.Cid{blk.Cid()})
return nil
}
s.trackTxnRef(blk.Cid())
return nil
}
@ -377,6 +434,12 @@ func (s *SplitStore) PutMany(ctx context.Context, blks []blocks.Block) error {
s.debug.LogWriteMany(blks)
// critical section
if s.txnMarkSet != nil {
s.markLiveRefs(batch)
return nil
}
s.trackTxnRefMany(batch)
return nil
}
@ -436,6 +499,23 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro
return cb(data)
}
// critical section
s.txnLk.RLock() // the lock is released in protectView if we are not in critical section
if s.txnMarkSet != nil {
has, err := s.txnMarkSet.Has(cid)
s.txnLk.RUnlock()
if err != nil {
return err
}
if has {
return s.view(cid, cb)
}
return s.cold.View(ctx, cid, cb)
}
// views are (optimistically) protected two-fold:
// - if there is an active transaction, then the reference is protected.
// - if there is no active transaction, active views are tracked in a
@ -585,6 +665,11 @@ func (s *SplitStore) Close() error {
}
if atomic.LoadInt32(&s.compacting) == 1 {
s.txnSyncMx.Lock()
s.txnSync = true
s.txnSyncCond.Broadcast()
s.txnSyncMx.Unlock()
log.Warn("close with ongoing compaction in progress; waiting for it to finish...")
for atomic.LoadInt32(&s.compacting) == 1 {
time.Sleep(time.Second)

View File

@ -89,7 +89,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
coldCnt := new(int64)
missingCnt := new(int64)
visitor, err := s.markSetEnv.Create("check", 0)
visitor, err := s.markSetEnv.New("check", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}

View File

@ -3,8 +3,9 @@ package splitstore
import (
"bytes"
"errors"
"os"
"path/filepath"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
@ -48,6 +49,10 @@ var (
// SyncGapTime is the time delay from a tipset's min timestamp before we decide
// there is a sync gap
SyncGapTime = time.Minute
// SyncWaitTime is the time delay from a tipset's min timestamp before we decide
// we have synced.
SyncWaitTime = 30 * time.Second
)
var (
@ -57,8 +62,6 @@ var (
const (
batchSize = 16384
defaultColdPurgeSize = 7_000_000
)
func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
@ -141,9 +144,9 @@ func (s *SplitStore) isNearUpgrade(epoch abi.ChainEpoch) bool {
// transactionally protect incoming tipsets
func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
s.txnLk.RLock()
defer s.txnLk.RUnlock()
if !s.txnActive {
s.txnLk.RUnlock()
return
}
@ -152,12 +155,115 @@ func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
cids = append(cids, ts.Cids()...)
}
if len(cids) == 0 {
s.txnLk.RUnlock()
return
}
// critical section
if s.txnMarkSet != nil {
curTs := apply[len(apply)-1]
timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
doSync := time.Since(timestamp) < SyncWaitTime
go func() {
if doSync {
defer func() {
s.txnSyncMx.Lock()
defer s.txnSyncMx.Unlock()
s.txnSync = true
s.txnSyncCond.Broadcast()
}()
}
defer s.txnLk.RUnlock()
s.markLiveRefs(cids)
}()
return
}
s.trackTxnRefMany(cids)
s.txnLk.RUnlock()
}
func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
log.Debugf("marking %d live refs", len(cids))
startMark := time.Now()
count := new(int32)
visitor := newConcurrentVisitor()
walkObject := func(c cid.Cid) error {
return s.walkObjectIncomplete(c, visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}
visit, err := s.txnMarkSet.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}
if !visit {
return errStopWalk
}
atomic.AddInt32(count, 1)
return nil
},
func(missing cid.Cid) error {
log.Warnf("missing object reference %s in %s", missing, c)
return errStopWalk
})
}
// optimize the common case of single put
if len(cids) == 1 {
if err := walkObject(cids[0]); err != nil {
log.Errorf("error marking tipset refs: %s", err)
}
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
return
}
workch := make(chan cid.Cid, len(cids))
for _, c := range cids {
workch <- c
}
close(workch)
worker := func() error {
for c := range workch {
if err := walkObject(c); err != nil {
return err
}
}
return nil
}
workers := runtime.NumCPU() / 2
if workers < 2 {
workers = 2
}
if workers > len(cids) {
workers = len(cids)
}
g := new(errgroup.Group)
for i := 0; i < workers; i++ {
g.Go(worker)
}
if err := g.Wait(); err != nil {
log.Errorf("error marking tipset refs: %s", err)
}
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
}
// transactionally protect a view
func (s *SplitStore) protectView(c cid.Cid) {
s.txnLk.RLock()
// the txnLk is held for read
defer s.txnLk.RUnlock()
if s.txnActive {
@ -387,6 +493,12 @@ func (s *SplitStore) compact(curTs *types.TipSet) {
}
func (s *SplitStore) doCompact(curTs *types.TipSet) error {
if s.checkpointExists() {
// this really shouldn't happen, but if it somehow does, it means that the hotstore
// might be potentially inconsistent; abort compaction and notify the user to intervene.
return xerrors.Errorf("checkpoint exists; aborting compaction")
}
currentEpoch := curTs.Height()
boundaryEpoch := currentEpoch - CompactionBoundary
@ -398,7 +510,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
markSet, err := s.markSetEnv.New("live", s.markSetSize)
if err != nil {
return xerrors.Errorf("error creating mark set: %w", err)
}
@ -409,9 +521,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err
}
// we are ready for concurrent marking
s.beginTxnMarking(markSet)
// 0. track all protected references at beginning of compaction; anything added later should
// be transactionally protected by the write
log.Info("protecting references with registered protectors")
@ -425,7 +534,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Info("marking reachable objects")
startMark := time.Now()
var count int64
count := new(int64)
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
func(c cid.Cid) error {
if isUnitaryObject(c) {
@ -441,7 +550,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return errStopWalk
}
count++
atomic.AddInt64(count, 1)
return nil
})
@ -449,9 +558,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error marking: %w", err)
}
s.markSetSize = count + count>>2 // overestimate a bit
s.markSetSize = *count + *count>>2 // overestimate a bit
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
log.Infow("marking done", "took", time.Since(startMark), "marked", *count)
if err := s.checkClosing(); err != nil {
return err
@ -471,10 +580,15 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Info("collecting cold objects")
startCollect := time.Now()
coldw, err := NewColdSetWriter(s.coldSetPath())
if err != nil {
return xerrors.Errorf("error creating coldset: %w", err)
}
defer coldw.Close() //nolint:errcheck
// some stats for logging
var hotCnt, coldCnt int
cold := make([]cid.Cid, 0, s.coldPurgeSize)
err = s.hot.ForEachKey(func(c cid.Cid) error {
// was it marked?
mark, err := markSet.Has(c)
@ -488,7 +602,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
// it's cold, mark it as candidate for move
cold = append(cold, c)
if err := coldw.Write(c); err != nil {
return xerrors.Errorf("error writing cid to coldstore: %w", err)
}
coldCnt++
return nil
@ -498,12 +614,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error collecting cold objects: %w", err)
}
log.Infow("cold collection done", "took", time.Since(startCollect))
if coldCnt > 0 {
s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
if err := coldw.Close(); err != nil {
return xerrors.Errorf("error closing coldset: %w", err)
}
log.Infow("cold collection done", "took", time.Since(startCollect))
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
@ -521,11 +637,17 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err
}
coldr, err := NewColdSetReader(s.coldSetPath())
if err != nil {
return xerrors.Errorf("error opening coldset: %w", err)
}
defer coldr.Close() //nolint:errcheck
// 3. copy the cold objects to the coldstore -- if we have one
if !s.cfg.DiscardColdBlocks {
log.Info("moving cold objects to the coldstore")
startMove := time.Now()
err = s.moveColdBlocks(cold)
err = s.moveColdBlocks(coldr)
if err != nil {
return xerrors.Errorf("error moving cold objects: %w", err)
}
@ -534,41 +656,64 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
if err := s.checkClosing(); err != nil {
return err
}
if err := coldr.Reset(); err != nil {
return xerrors.Errorf("error resetting coldset: %w", err)
}
}
// 4. sort cold objects so that the dags with most references are deleted first
// this ensures that we can't refer to a dag with its consituents already deleted, ie
// we lave no dangling references.
log.Info("sorting cold objects")
startSort := time.Now()
err = s.sortObjects(cold)
if err != nil {
return xerrors.Errorf("error sorting objects: %w", err)
}
log.Infow("sorting done", "took", time.Since(startSort))
// 4.1 protect transactional refs once more
// strictly speaking, this is not necessary as purge will do it before deleting each
// batch. however, there is likely a largish number of references accumulated during
// ths sort and this protects before entering pruge context.
err = s.protectTxnRefs(markSet)
if err != nil {
return xerrors.Errorf("error protecting transactional refs: %w", err)
// 4. Purge cold objects with checkpointing for recovery.
// This is the critical section of compaction, whereby any cold object not in the markSet is
// considered already deleted.
// We delete cold objects in batches, holding the transaction lock, where we check the markSet
// again for new references created by the VM.
// After each batch, we write a checkpoint to disk; if the process is interrupted before completion,
// the process will continue from the checkpoint in the next recovery.
if err := s.beginCriticalSection(markSet); err != nil {
return xerrors.Errorf("error beginning critical section: %w", err)
}
if err := s.checkClosing(); err != nil {
return err
}
// wait for the head to catch up so that the current tipset is marked
s.waitForSync()
if err := s.checkClosing(); err != nil {
return err
}
checkpoint, err := NewCheckpoint(s.checkpointPath())
if err != nil {
return xerrors.Errorf("error creating checkpoint: %w", err)
}
defer checkpoint.Close() //nolint:errcheck
// 5. purge cold objects from the hotstore, taking protected references into account
log.Info("purging cold objects from the hotstore")
startPurge := time.Now()
err = s.purge(cold, markSet)
err = s.purge(coldr, checkpoint, markSet)
if err != nil {
return xerrors.Errorf("error purging cold blocks: %w", err)
return xerrors.Errorf("error purging cold objects: %w", err)
}
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
s.endCriticalSection()
if err := checkpoint.Close(); err != nil {
log.Warnf("error closing checkpoint: %s", err)
}
if err := os.Remove(s.checkpointPath()); err != nil {
log.Warnf("error removing checkpoint: %s", err)
}
if err := coldr.Close(); err != nil {
log.Warnf("error closing coldset: %s", err)
}
if err := os.Remove(s.coldSetPath()); err != nil {
log.Warnf("error removing coldset: %s", err)
}
// we are done; do some housekeeping
s.endTxnProtect()
s.gcHotstore()
@ -599,12 +744,51 @@ func (s *SplitStore) beginTxnProtect() {
defer s.txnLk.Unlock()
s.txnActive = true
s.txnSync = false
s.txnRefs = make(map[cid.Cid]struct{})
s.txnMissing = make(map[cid.Cid]struct{})
}
func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
log.Info("beginning transactional marking")
func (s *SplitStore) beginCriticalSection(markSet MarkSet) error {
log.Info("beginning critical section")
// do that once first to get the bulk before the markset is in critical section
if err := s.protectTxnRefs(markSet); err != nil {
return xerrors.Errorf("error protecting transactional references: %w", err)
}
if err := markSet.BeginCriticalSection(); err != nil {
return xerrors.Errorf("error beginning critical section for markset: %w", err)
}
s.txnLk.Lock()
defer s.txnLk.Unlock()
s.txnMarkSet = markSet
// and do it again while holding the lock to mark references that might have been created
// in the meantime and avoid races of the type Has->txnRef->enterCS->Get fails because
// it's not in the markset
if err := s.protectTxnRefs(markSet); err != nil {
return xerrors.Errorf("error protecting transactional references: %w", err)
}
return nil
}
func (s *SplitStore) waitForSync() {
log.Info("waiting for sync")
startWait := time.Now()
defer func() {
log.Infow("waiting for sync done", "took", time.Since(startWait))
}()
s.txnSyncMx.Lock()
defer s.txnSyncMx.Unlock()
for !s.txnSync {
s.txnSyncCond.Wait()
}
}
func (s *SplitStore) endTxnProtect() {
@ -616,8 +800,20 @@ func (s *SplitStore) endTxnProtect() {
}
s.txnActive = false
s.txnSync = false
s.txnRefs = nil
s.txnMissing = nil
s.txnMarkSet = nil
}
func (s *SplitStore) endCriticalSection() {
log.Info("ending critical section")
s.txnLk.Lock()
defer s.txnLk.Unlock()
s.txnMarkSet.EndCriticalSection()
s.txnMarkSet = nil
}
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
@ -857,7 +1053,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, m
return nil
}
// internal version used by walk
// internal version used during compaction and related operations
func (s *SplitStore) view(c cid.Cid, cb func([]byte) error) error {
if isIdentiyCid(c) {
data, err := decodeIdentityCid(c)
@ -892,10 +1088,34 @@ func (s *SplitStore) has(c cid.Cid) (bool, error) {
return s.cold.Has(s.ctx, c)
}
func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
func (s *SplitStore) get(c cid.Cid) (blocks.Block, error) {
blk, err := s.hot.Get(s.ctx, c)
switch err {
case nil:
return blk, nil
case bstore.ErrNotFound:
return s.cold.Get(s.ctx, c)
default:
return nil, err
}
}
func (s *SplitStore) getSize(c cid.Cid) (int, error) {
sz, err := s.hot.GetSize(s.ctx, c)
switch err {
case nil:
return sz, nil
case bstore.ErrNotFound:
return s.cold.GetSize(s.ctx, c)
default:
return 0, err
}
}
func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
batch := make([]blocks.Block, 0, batchSize)
for _, c := range cold {
err := coldr.ForEach(func(c cid.Cid) error {
if err := s.checkClosing(); err != nil {
return err
}
@ -904,7 +1124,7 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
if err != nil {
if err == bstore.ErrNotFound {
log.Warnf("hotstore missing block %s", c)
continue
return nil
}
return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err)
@ -918,6 +1138,12 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
}
batch = batch[:0]
}
return nil
})
if err != nil {
return xerrors.Errorf("error iterating coldset: %w", err)
}
if len(batch) > 0 {
@ -930,177 +1156,202 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
return nil
}
// sorts a slice of objects heaviest first -- it's a little expensive but worth the
// guarantee that we don't leave dangling references behind, e.g. if we die in the middle
// of a purge.
func (s *SplitStore) sortObjects(cids []cid.Cid) error {
// we cache the keys to avoid making a gazillion of strings
keys := make(map[cid.Cid]string)
key := func(c cid.Cid) string {
s, ok := keys[c]
if !ok {
s = string(c.Hash())
keys[c] = s
}
return s
}
// compute sorting weights as the cumulative number of DAG links
weights := make(map[string]int)
for _, c := range cids {
// this can take quite a while, so check for shutdown with every opportunity
if err := s.checkClosing(); err != nil {
return err
}
w := s.getObjectWeight(c, weights, key)
weights[key(c)] = w
}
// sort!
sort.Slice(cids, func(i, j int) bool {
wi := weights[key(cids[i])]
wj := weights[key(cids[j])]
if wi == wj {
return bytes.Compare(cids[i].Hash(), cids[j].Hash()) > 0
}
return wi > wj
})
return nil
}
func (s *SplitStore) getObjectWeight(c cid.Cid, weights map[string]int, key func(cid.Cid) string) int {
w, ok := weights[key(c)]
if ok {
return w
}
// we treat block headers specially to avoid walking the entire chain
var hdr types.BlockHeader
err := s.view(c, func(data []byte) error {
return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
})
if err == nil {
w1 := s.getObjectWeight(hdr.ParentStateRoot, weights, key)
weights[key(hdr.ParentStateRoot)] = w1
w2 := s.getObjectWeight(hdr.Messages, weights, key)
weights[key(hdr.Messages)] = w2
return 1 + w1 + w2
}
var links []cid.Cid
err = s.view(c, func(data []byte) error {
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
links = append(links, c)
})
})
if err != nil {
return 1
}
w = 1
for _, c := range links {
// these are internal refs, so dags will be dags
if c.Prefix().Codec != cid.DagCBOR {
w++
continue
}
wc := s.getObjectWeight(c, weights, key)
weights[key(c)] = wc
w += wc
}
return w
}
func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error {
if len(cids) == 0 {
return nil
}
// we don't delete one giant batch of millions of objects, but rather do smaller batches
// so that we don't stop the world for an extended period of time
done := false
for i := 0; !done; i++ {
start := i * batchSize
end := start + batchSize
if end >= len(cids) {
end = len(cids)
done = true
}
err := deleteBatch(cids[start:end])
if err != nil {
return xerrors.Errorf("error deleting batch: %w", err)
}
}
return nil
}
func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
func (s *SplitStore) purge(coldr *ColdSetReader, checkpoint *Checkpoint, markSet MarkSet) error {
batch := make([]cid.Cid, 0, batchSize)
deadCids := make([]cid.Cid, 0, batchSize)
var purgeCnt, liveCnt int
defer func() {
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
}()
return s.purgeBatch(cids,
func(cids []cid.Cid) error {
deadCids := deadCids[:0]
deleteBatch := func() error {
pc, lc, err := s.purgeBatch(batch, deadCids, checkpoint, markSet)
for {
if err := s.checkClosing(); err != nil {
return err
}
purgeCnt += pc
liveCnt += lc
batch = batch[:0]
s.txnLk.Lock()
if len(s.txnRefs) == 0 {
// keep the lock!
break
}
return err
}
// unlock and protect
s.txnLk.Unlock()
err := coldr.ForEach(func(c cid.Cid) error {
batch = append(batch, c)
if len(batch) == batchSize {
return deleteBatch()
}
err := s.protectTxnRefs(markSet)
if err != nil {
return xerrors.Errorf("error protecting transactional refs: %w", err)
}
return nil
})
if err != nil {
return err
}
if len(batch) > 0 {
return deleteBatch()
}
return nil
}
func (s *SplitStore) purgeBatch(batch, deadCids []cid.Cid, checkpoint *Checkpoint, markSet MarkSet) (purgeCnt int, liveCnt int, err error) {
if err := s.checkClosing(); err != nil {
return 0, 0, err
}
s.txnLk.Lock()
defer s.txnLk.Unlock()
for _, c := range batch {
has, err := markSet.Has(c)
if err != nil {
return 0, 0, xerrors.Errorf("error checking markset for liveness: %w", err)
}
if has {
liveCnt++
continue
}
deadCids = append(deadCids, c)
}
if len(deadCids) == 0 {
if err := checkpoint.Set(batch[len(batch)-1]); err != nil {
return 0, 0, xerrors.Errorf("error setting checkpoint: %w", err)
}
return 0, liveCnt, nil
}
if err := s.hot.DeleteMany(s.ctx, deadCids); err != nil {
return 0, liveCnt, xerrors.Errorf("error purging cold objects: %w", err)
}
s.debug.LogDelete(deadCids)
purgeCnt = len(deadCids)
if err := checkpoint.Set(batch[len(batch)-1]); err != nil {
return purgeCnt, liveCnt, xerrors.Errorf("error setting checkpoint: %w", err)
}
return purgeCnt, liveCnt, nil
}
func (s *SplitStore) coldSetPath() string {
return filepath.Join(s.path, "coldset")
}
func (s *SplitStore) checkpointPath() string {
return filepath.Join(s.path, "checkpoint")
}
func (s *SplitStore) checkpointExists() bool {
_, err := os.Stat(s.checkpointPath())
return err == nil
}
func (s *SplitStore) completeCompaction() error {
checkpoint, last, err := OpenCheckpoint(s.checkpointPath())
if err != nil {
return xerrors.Errorf("error opening checkpoint: %w", err)
}
defer checkpoint.Close() //nolint:errcheck
coldr, err := NewColdSetReader(s.coldSetPath())
if err != nil {
return xerrors.Errorf("error opening coldset: %w", err)
}
defer coldr.Close() //nolint:errcheck
markSet, err := s.markSetEnv.Recover("live")
if err != nil {
return xerrors.Errorf("error recovering markset: %w", err)
}
defer markSet.Close() //nolint:errcheck
// PURGE
log.Info("purging cold objects from the hotstore")
startPurge := time.Now()
err = s.completePurge(coldr, checkpoint, last, markSet)
if err != nil {
return xerrors.Errorf("error purging cold objects: %w", err)
}
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
markSet.EndCriticalSection()
if err := checkpoint.Close(); err != nil {
log.Warnf("error closing checkpoint: %s", err)
}
if err := os.Remove(s.checkpointPath()); err != nil {
log.Warnf("error removing checkpoint: %s", err)
}
if err := coldr.Close(); err != nil {
log.Warnf("error closing coldset: %s", err)
}
if err := os.Remove(s.coldSetPath()); err != nil {
log.Warnf("error removing coldset: %s", err)
}
// Note: at this point we can start the splitstore; a compaction should run on
// the first head change, which will trigger gc on the hotstore.
// We don't mind the second (back-to-back) compaction as the head will
// have advanced during marking and coldset accumulation.
return nil
}
func (s *SplitStore) completePurge(coldr *ColdSetReader, checkpoint *Checkpoint, start cid.Cid, markSet MarkSet) error {
if !start.Defined() {
return s.purge(coldr, checkpoint, markSet)
}
seeking := true
batch := make([]cid.Cid, 0, batchSize)
deadCids := make([]cid.Cid, 0, batchSize)
var purgeCnt, liveCnt int
defer func() {
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
}()
deleteBatch := func() error {
pc, lc, err := s.purgeBatch(batch, deadCids, checkpoint, markSet)
purgeCnt += pc
liveCnt += lc
batch = batch[:0]
return err
}
err := coldr.ForEach(func(c cid.Cid) error {
if seeking {
if start.Equals(c) {
seeking = false
}
defer s.txnLk.Unlock()
for _, c := range cids {
live, err := markSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking for liveness: %w", err)
}
if live {
liveCnt++
continue
}
deadCids = append(deadCids, c)
}
err := s.hot.DeleteMany(s.ctx, deadCids)
if err != nil {
return xerrors.Errorf("error purging cold objects: %w", err)
}
s.debug.LogDelete(deadCids)
purgeCnt += len(deadCids)
return nil
})
}
batch = append(batch, c)
if len(batch) == batchSize {
return deleteBatch()
}
return nil
})
if err != nil {
return err
}
if len(batch) > 0 {
return deleteBatch()
}
return nil
}
// I really don't like having this code, but we seem to have some occasional DAG references with

View File

@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"sync"
"sync/atomic"
"testing"
@ -20,12 +22,14 @@ import (
datastore "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
mh "github.com/multiformats/go-multihash"
)
func init() {
CompactionThreshold = 5
CompactionBoundary = 2
WarmupBoundary = 0
SyncWaitTime = time.Millisecond
logging.SetLogLevel("splitstore", "DEBUG")
}
@ -80,8 +84,17 @@ func testSplitStore(t *testing.T, cfg *Config) {
t.Fatal(err)
}
path, err := ioutil.TempDir("", "splitstore.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
// open the splitstore
ss, err := Open("", ds, hot, cold, cfg)
ss, err := Open(path, ds, hot, cold, cfg)
if err != nil {
t.Fatal(err)
}
@ -125,6 +138,10 @@ func testSplitStore(t *testing.T, cfg *Config) {
}
waitForCompaction := func() {
ss.txnSyncMx.Lock()
ss.txnSync = true
ss.txnSyncCond.Broadcast()
ss.txnSyncMx.Unlock()
for atomic.LoadInt32(&ss.compacting) == 1 {
time.Sleep(100 * time.Millisecond)
}
@ -259,8 +276,17 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
t.Fatal(err)
}
path, err := ioutil.TempDir("", "splitstore.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
// open the splitstore
ss, err := Open("", ds, hot, cold, &Config{MarkSetType: "map"})
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
if err != nil {
t.Fatal(err)
}
@ -305,6 +331,10 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
}
waitForCompaction := func() {
ss.txnSyncMx.Lock()
ss.txnSync = true
ss.txnSyncCond.Broadcast()
ss.txnSyncMx.Unlock()
for atomic.LoadInt32(&ss.compacting) == 1 {
time.Sleep(100 * time.Millisecond)
}
@ -426,17 +456,25 @@ func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, app
type mockStore struct {
mx sync.Mutex
set map[cid.Cid]blocks.Block
set map[string]blocks.Block
}
func newMockStore() *mockStore {
return &mockStore{set: make(map[cid.Cid]blocks.Block)}
return &mockStore{set: make(map[string]blocks.Block)}
}
func (b *mockStore) keyOf(c cid.Cid) string {
return string(c.Hash())
}
func (b *mockStore) cidOf(k string) cid.Cid {
return cid.NewCidV1(cid.Raw, mh.Multihash([]byte(k)))
}
func (b *mockStore) Has(_ context.Context, cid cid.Cid) (bool, error) {
b.mx.Lock()
defer b.mx.Unlock()
_, ok := b.set[cid]
_, ok := b.set[b.keyOf(cid)]
return ok, nil
}
@ -446,7 +484,7 @@ func (b *mockStore) Get(_ context.Context, cid cid.Cid) (blocks.Block, error) {
b.mx.Lock()
defer b.mx.Unlock()
blk, ok := b.set[cid]
blk, ok := b.set[b.keyOf(cid)]
if !ok {
return nil, blockstore.ErrNotFound
}
@ -474,7 +512,7 @@ func (b *mockStore) Put(_ context.Context, blk blocks.Block) error {
b.mx.Lock()
defer b.mx.Unlock()
b.set[blk.Cid()] = blk
b.set[b.keyOf(blk.Cid())] = blk
return nil
}
@ -483,7 +521,7 @@ func (b *mockStore) PutMany(_ context.Context, blks []blocks.Block) error {
defer b.mx.Unlock()
for _, blk := range blks {
b.set[blk.Cid()] = blk
b.set[b.keyOf(blk.Cid())] = blk
}
return nil
}
@ -492,7 +530,7 @@ func (b *mockStore) DeleteBlock(_ context.Context, cid cid.Cid) error {
b.mx.Lock()
defer b.mx.Unlock()
delete(b.set, cid)
delete(b.set, b.keyOf(cid))
return nil
}
@ -501,7 +539,7 @@ func (b *mockStore) DeleteMany(_ context.Context, cids []cid.Cid) error {
defer b.mx.Unlock()
for _, c := range cids {
delete(b.set, c)
delete(b.set, b.keyOf(c))
}
return nil
}
@ -515,7 +553,7 @@ func (b *mockStore) ForEachKey(f func(cid.Cid) error) error {
defer b.mx.Unlock()
for c := range b.set {
err := f(c)
err := f(b.cidOf(c))
if err != nil {
return err
}

View File

@ -62,7 +62,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
xcount := new(int64)
missing := new(int64)
visitor, err := s.markSetEnv.Create("warmup", 0)
visitor, err := s.markSetEnv.New("warmup", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}

View File

@ -1,2 +1,2 @@
/dns4/bootstrap-0.butterfly.fildev.network/tcp/1347/p2p/12D3KooWBdRCBLUeKvoy22u5DcXs61adFn31v8WWCZgmBjDCjbsC
/dns4/bootstrap-1.butterfly.fildev.network/tcp/1347/p2p/12D3KooWDUQJBA18njjXnG9RtLxoN3muvdU7PEy55QorUEsdAqdy
/dns4/bootstrap-0.butterfly.fildev.network/tcp/1347/p2p/12D3KooWFHDtFx7CVTy4xoCDutVo1cScvSnQjDeaM8UzwVS1qwkh
/dns4/bootstrap-1.butterfly.fildev.network/tcp/1347/p2p/12D3KooWKt8cwpkiumkT8x32c3YFxsPRwhV5J8hCYPn9mhUmcAXt

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -54,8 +54,8 @@ const UpgradeHyperdriveHeight = 420
const UpgradeChocolateHeight = 312746
// 2022-02-08T19:23:00Z
const UpgradeOhSnapHeight = 676246
// 2022-02-10T19:23:00Z
const UpgradeOhSnapHeight = 682006
func init() {
policy.SetConsensusMinerMinPower(abi.NewStoragePower(32 << 30))

View File

@ -142,7 +142,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
go func() {
start := build.Clock.Now()
log.Infow("start fetching randomness", "round", round)
log.Debugw("start fetching randomness", "round", round)
resp, err := db.client.Get(ctx, round)
var br beacon.Response
@ -152,7 +152,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
br.Entry.Round = resp.Round()
br.Entry.Data = resp.Signature()
}
log.Infow("done fetching randomness", "round", round, "took", build.Clock.Since(start))
log.Debugw("done fetching randomness", "round", round, "took", build.Clock.Since(start))
out <- br
close(out)
}()

View File

@ -466,6 +466,7 @@ var stateOrder = map[sealing.SectorState]stateMeta{}
var stateList = []stateMeta{
{col: 39, state: "Total"},
{col: color.FgGreen, state: sealing.Proving},
{col: color.FgGreen, state: sealing.UpdateActivating},
{col: color.FgBlue, state: sealing.Empty},
{col: color.FgBlue, state: sealing.WaitDeals},
@ -496,6 +497,7 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.SubmitReplicaUpdate},
{col: color.FgYellow, state: sealing.ReplicaUpdateWait},
{col: color.FgYellow, state: sealing.FinalizeReplicaUpdate},
{col: color.FgYellow, state: sealing.ReleaseSectorKey},
{col: color.FgCyan, state: sealing.Terminating},
{col: color.FgCyan, state: sealing.TerminateWait},
@ -524,6 +526,7 @@ var stateList = []stateMeta{
{col: color.FgRed, state: sealing.SnapDealsAddPieceFailed},
{col: color.FgRed, state: sealing.SnapDealsDealsExpired},
{col: color.FgRed, state: sealing.ReplicaUpdateFailed},
{col: color.FgRed, state: sealing.ReleaseSectorKeyFailed},
}
func init() {

View File

@ -55,6 +55,7 @@ var sectorsCmd = &cli.Command{
sectorsTerminateCmd,
sectorsRemoveCmd,
sectorsSnapUpCmd,
sectorsSnapAbortCmd,
sectorsMarkForUpgradeCmd,
sectorsStartSealCmd,
sectorsSealDelayCmd,
@ -1520,6 +1521,31 @@ var sectorsSnapUpCmd = &cli.Command{
},
}
var sectorsSnapAbortCmd = &cli.Command{
Name: "abort-upgrade",
Usage: "Abort the attempted (SnapDeals) upgrade of a CC sector, reverting it to as before",
ArgsUsage: "<sectorNum>",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return lcli.ShowHelp(cctx, xerrors.Errorf("must pass sector number"))
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
if err != nil {
return xerrors.Errorf("could not parse sector number: %w", err)
}
return nodeApi.SectorAbortUpgrade(ctx, abi.SectorNumber(id))
},
}
var sectorsMarkForUpgradeCmd = &cli.Command{
Name: "mark-for-upgrade",
Usage: "Mark a committed capacity sector for replacement by a sector with deals",

View File

@ -261,7 +261,7 @@ var runCmd = &cli.Command{
var taskTypes []sealtasks.TaskType
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize)
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFinalizeReplicaUpdate)
if cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece)

View File

@ -4,10 +4,12 @@ import (
"encoding/json"
corebig "math/big"
"os"
"strconv"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
filbig "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-cid"
"github.com/urfave/cli/v2"
@ -22,29 +24,50 @@ type networkTotalsOutput struct {
Payload networkTotals `json:"payload"`
}
type networkTotals struct {
QaNetworkPower filbig.Int `json:"total_qa_power"`
RawNetworkPower filbig.Int `json:"total_raw_capacity"`
CapacityCarryingData float64 `json:"capacity_fraction_carrying_data"`
UniqueCids int `json:"total_unique_cids"`
UniqueProviders int `json:"total_unique_providers"`
UniqueClients int `json:"total_unique_clients"`
type providerMeta struct {
nonidentifiable bool
}
// force formatting as decimal to aid human readers
type humanFloat float64
func (f humanFloat) MarshalJSON() ([]byte, error) {
// 'f' uses decimal digits without exponents.
// The bit size of 32 ensures we don't use too many decimal places.
return []byte(strconv.FormatFloat(float64(f), 'f', -1, 32)), nil
}
type Totals struct {
TotalDeals int `json:"total_num_deals"`
TotalBytes int64 `json:"total_stored_data_size"`
FilplusTotalDeals int `json:"filplus_total_num_deals"`
FilplusTotalBytes int64 `json:"filplus_total_stored_data_size"`
PrivateTotalDeals int `json:"private_total_num_deals"`
PrivateTotalBytes int64 `json:"private_total_stored_data_size"`
CapacityCarryingData humanFloat `json:"capacity_fraction_carrying_data"`
}
seenClient map[address.Address]bool
seenProvider map[address.Address]bool
seenPieceCid map[cid.Cid]bool
type networkTotals struct {
QaNetworkPower filbig.Int `json:"total_qa_power"`
RawNetworkPower filbig.Int `json:"total_raw_capacity"`
UniqueCids int `json:"total_unique_cids"`
UniqueBytes int64 `json:"total_unique_data_size"`
UniqueClients int `json:"total_unique_clients"`
UniqueProviders int `json:"total_unique_providers"`
UniquePrivateProviders int `json:"total_unique_private_providers"`
Totals
FilPlus Totals `json:"filecoin_plus_subset"`
pieces map[cid.Cid]struct{}
clients map[address.Address]struct{}
providers map[address.Address]providerMeta
}
var storageStatsCmd = &cli.Command{
Name: "storage-stats",
Usage: "Translates current lotus state into a json summary suitable for driving https://storage.filecoin.io/",
Flags: []cli.Flag{
&cli.Int64Flag{
Name: "height",
&cli.StringFlag{
Name: "tipset",
Usage: "Comma separated array of cids, or @height",
},
},
Action: func(cctx *cli.Context) error {
@ -56,22 +79,24 @@ var storageStatsCmd = &cli.Command{
}
defer apiCloser()
head, err := api.ChainHead(ctx)
if err != nil {
return err
}
requestedHeight := cctx.Int64("height")
if requestedHeight > 0 {
head, err = api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(requestedHeight), head.Key())
var ts *types.TipSet
if cctx.String("tipset") == "" {
ts, err = api.ChainHead(ctx)
if err != nil {
return err
}
ts, err = api.ChainGetTipSetByHeight(ctx, ts.Height()-defaultEpochLookback, ts.Key())
if err != nil {
return err
}
} else {
head, err = api.ChainGetTipSetByHeight(ctx, head.Height()-defaultEpochLookback, head.Key())
}
if err != nil {
return err
ts, err = lcli.ParseTipSetRef(ctx, api, cctx.String("tipset"))
if err != nil {
return err
}
}
power, err := api.StateMinerPower(ctx, address.Address{}, head.Key())
power, err := api.StateMinerPower(ctx, address.Address{}, ts.Key())
if err != nil {
return err
}
@ -79,12 +104,12 @@ var storageStatsCmd = &cli.Command{
netTotals := networkTotals{
QaNetworkPower: power.TotalPower.QualityAdjPower,
RawNetworkPower: power.TotalPower.RawBytePower,
seenClient: make(map[address.Address]bool),
seenProvider: make(map[address.Address]bool),
seenPieceCid: make(map[cid.Cid]bool),
pieces: make(map[cid.Cid]struct{}),
clients: make(map[address.Address]struct{}),
providers: make(map[address.Address]providerMeta),
}
deals, err := api.StateMarketDeals(ctx, head.Key())
deals, err := api.StateMarketDeals(ctx, ts.Key())
if err != nil {
return err
}
@ -94,35 +119,76 @@ var storageStatsCmd = &cli.Command{
// Only count deals that have properly started, not past/future ones
// https://github.com/filecoin-project/specs-actors/blob/v0.9.9/actors/builtin/market/deal.go#L81-L85
// Bail on 0 as well in case SectorStartEpoch is uninitialized due to some bug
//
// Additionally if the SlashEpoch is set this means the underlying sector is
// terminated for whatever reason ( not just slashed ), and the deal record
// will soon be removed from the state entirely
if dealInfo.State.SectorStartEpoch <= 0 ||
dealInfo.State.SectorStartEpoch > head.Height() {
dealInfo.State.SectorStartEpoch > ts.Height() ||
dealInfo.State.SlashEpoch > -1 {
continue
}
netTotals.seenClient[dealInfo.Proposal.Client] = true
netTotals.clients[dealInfo.Proposal.Client] = struct{}{}
if _, seen := netTotals.providers[dealInfo.Proposal.Provider]; !seen {
pm := providerMeta{}
mi, err := api.StateMinerInfo(ctx, dealInfo.Proposal.Provider, ts.Key())
if err != nil {
return err
}
if mi.PeerId == nil || *mi.PeerId == "" {
log.Infof("private provider %s", dealInfo.Proposal.Provider)
pm.nonidentifiable = true
netTotals.UniquePrivateProviders++
}
netTotals.providers[dealInfo.Proposal.Provider] = pm
netTotals.UniqueProviders++
}
if _, seen := netTotals.pieces[dealInfo.Proposal.PieceCID]; !seen {
netTotals.pieces[dealInfo.Proposal.PieceCID] = struct{}{}
netTotals.UniqueBytes += int64(dealInfo.Proposal.PieceSize)
netTotals.UniqueCids++
}
netTotals.TotalBytes += int64(dealInfo.Proposal.PieceSize)
netTotals.seenProvider[dealInfo.Proposal.Provider] = true
netTotals.seenPieceCid[dealInfo.Proposal.PieceCID] = true
netTotals.TotalDeals++
if netTotals.providers[dealInfo.Proposal.Provider].nonidentifiable {
netTotals.PrivateTotalBytes += int64(dealInfo.Proposal.PieceSize)
netTotals.PrivateTotalDeals++
}
if dealInfo.Proposal.VerifiedDeal {
netTotals.FilplusTotalDeals++
netTotals.FilplusTotalBytes += int64(dealInfo.Proposal.PieceSize)
netTotals.FilPlus.TotalBytes += int64(dealInfo.Proposal.PieceSize)
netTotals.FilPlus.TotalDeals++
if netTotals.providers[dealInfo.Proposal.Provider].nonidentifiable {
netTotals.FilPlus.PrivateTotalBytes += int64(dealInfo.Proposal.PieceSize)
netTotals.FilPlus.PrivateTotalDeals++
}
}
}
netTotals.UniqueCids = len(netTotals.seenPieceCid)
netTotals.UniqueClients = len(netTotals.seenClient)
netTotals.UniqueProviders = len(netTotals.seenProvider)
netTotals.UniqueClients = len(netTotals.clients)
netTotals.CapacityCarryingData, _ = new(corebig.Rat).SetFrac(
ccd, _ := new(corebig.Rat).SetFrac(
corebig.NewInt(netTotals.TotalBytes),
netTotals.RawNetworkPower.Int,
).Float64()
netTotals.CapacityCarryingData = humanFloat(ccd)
ccdfp, _ := new(corebig.Rat).SetFrac(
corebig.NewInt(netTotals.FilPlus.TotalBytes),
netTotals.RawNetworkPower.Int,
).Float64()
netTotals.FilPlus.CapacityCarryingData = humanFloat(ccdfp)
return json.NewEncoder(os.Stdout).Encode(
networkTotalsOutput{
Epoch: int64(head.Height()),
Epoch: int64(ts.Height()),
Endpoint: "NETWORK_WIDE_TOTALS",
Payload: netTotals,
},

View File

@ -97,6 +97,7 @@
* [Return](#Return)
* [ReturnAddPiece](#ReturnAddPiece)
* [ReturnFetch](#ReturnFetch)
* [ReturnFinalizeReplicaUpdate](#ReturnFinalizeReplicaUpdate)
* [ReturnFinalizeSector](#ReturnFinalizeSector)
* [ReturnGenerateSectorKeyFromData](#ReturnGenerateSectorKeyFromData)
* [ReturnMoveStorage](#ReturnMoveStorage)
@ -116,6 +117,7 @@
* [SealingAbort](#SealingAbort)
* [SealingSchedDiag](#SealingSchedDiag)
* [Sector](#Sector)
* [SectorAbortUpgrade](#SectorAbortUpgrade)
* [SectorAddPieceToAny](#SectorAddPieceToAny)
* [SectorCommitFlush](#SectorCommitFlush)
* [SectorCommitPending](#SectorCommitPending)
@ -2054,6 +2056,30 @@ Response: `{}`
### ReturnFetch
Perms: admin
Inputs:
```json
[
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
},
{
"Code": 0,
"Message": "string value"
}
]
```
Response: `{}`
### ReturnFinalizeReplicaUpdate
Perms: admin
Inputs:
@ -2474,6 +2500,21 @@ Response: `{}`
## Sector
### SectorAbortUpgrade
SectorAbortUpgrade can be called on sectors that are in the process of being upgraded to abort it
Perms: admin
Inputs:
```json
[
9
]
```
Response: `{}`
### SectorAddPieceToAny
Add piece to an open sector. If no sectors with enough space are open,
either a new sector will be created, or this call will block until more

View File

@ -10,6 +10,7 @@
* [Add](#Add)
* [AddPiece](#AddPiece)
* [Finalize](#Finalize)
* [FinalizeReplicaUpdate](#FinalizeReplicaUpdate)
* [FinalizeSector](#FinalizeSector)
* [Generate](#Generate)
* [GenerateSectorKeyFromData](#GenerateSectorKeyFromData)
@ -1112,6 +1113,41 @@ Response:
## Finalize
### FinalizeReplicaUpdate
Perms: admin
Inputs:
```json
[
{
"ID": {
"Miner": 1000,
"Number": 9
},
"ProofType": 8
},
[
{
"Offset": 1024,
"Size": 1024
}
]
]
```
Response:
```json
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
}
```
### FinalizeSector

View File

@ -1580,6 +1580,7 @@ COMMANDS:
terminate Terminate sector on-chain then remove (WARNING: This means losing power and collateral for the removed sector)
remove Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector (use 'terminate' for lower penalty))
snap-up Mark a committed capacity sector to be filled with deals
abort-upgrade Abort the attempted (SnapDeals) upgrade of a CC sector, reverting it to as before
mark-for-upgrade Mark a committed capacity sector for replacement by a sector with deals
seal Manually start sealing a sector (filling any unused space with junk)
set-seal-delay Set the time, in minutes, that a new sector waits for deals before sealing starts
@ -1815,6 +1816,19 @@ OPTIONS:
```
### lotus-miner sectors abort-upgrade
```
NAME:
lotus-miner sectors abort-upgrade - Abort the attempted (SnapDeals) upgrade of a CC sector, reverting it to as before
USAGE:
lotus-miner sectors abort-upgrade [command options] <sectorNum>
OPTIONS:
--help, -h show help (default: false)
```
### lotus-miner sectors mark-for-upgrade
```
NAME:

View File

@ -163,11 +163,11 @@
#HotStoreType = "badger"
# MarkSetType specifies the type of the markset.
# It can be "map" (default) for in memory marking or "badger" for on-disk marking.
# It can be "map" for in memory marking or "badger" (default) for on-disk marking.
#
# type: string
# env var: LOTUS_CHAINSTORE_SPLITSTORE_MARKSETTYPE
#MarkSetType = "map"
#MarkSetType = "badger"
# HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond
# the compaction boundary; default is 0.

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit c03fc07aadf76b464644c785f59292b93542c0ee
Subproject commit 5ec5d805c01ea85224f6448dd6c6fa0a2a73c028

View File

@ -55,6 +55,25 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return nil
}
// temporary hack to make the check work with snapdeals
// will go away in https://github.com/filecoin-project/lotus/pull/7971
if lp.Sealed == "" || lp.Cache == "" {
// maybe it's update
lockedUpdate, err := m.index.StorageTryLock(ctx, sector.ID, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone)
if err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
if lockedUpdate {
lp, _, err = m.localStore.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
bad[sector.ID] = fmt.Sprintf("acquire sector failed: %s", err)
return nil
}
lp.Sealed, lp.Cache = lp.Update, lp.UpdateCache
}
}
if lp.Sealed == "" || lp.Cache == "" {
log.Warnw("CheckProvable Sector FAULT: cache and/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache)
bad[sector.ID] = fmt.Sprintf("cache and/or sealed paths not found, cache %q, sealed %q", lp.Cache, lp.Sealed)

View File

@ -769,7 +769,7 @@ func (sb *Sealer) ReleaseSealed(ctx context.Context, sector storage.SectorRef) e
return xerrors.Errorf("not supported at this layer")
}
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
func (sb *Sealer) freeUnsealed(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
@ -834,6 +834,19 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
return nil
}
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
return err
}
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
@ -843,6 +856,43 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef,
return ffi.ClearCache(uint64(ssize), paths.Cache)
}
func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
return err
}
{
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
if err := ffi.ClearCache(uint64(ssize), paths.Cache); err != nil {
return xerrors.Errorf("clear cache: %w", err)
}
}
{
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdateCache, 0, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
if err := ffi.ClearCache(uint64(ssize), paths.UpdateCache); err != nil {
return xerrors.Errorf("clear cache: %w", err)
}
}
return nil
}
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
// This call is meant to mark storage as 'freeable'. Given that unsealing is
// very expensive, we don't remove data as soon as we can - instead we only

View File

@ -146,7 +146,7 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
go m.sched.runSched()
localTasks := []sealtasks.TaskType{
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch,
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate,
}
if sc.AllowAddPiece {
localTasks = append(localTasks, sealtasks.TTAddPiece)
@ -577,6 +577,74 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
return nil
}
func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
fts := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
if err != nil {
return xerrors.Errorf("finding unsealed sector: %w", err)
}
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
fts = storiface.FTNone
}
}
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}
for _, store := range sealedStores {
if store.CanSeal {
pathType = storiface.PathSealing
break
}
}
}
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|fts, pathType, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed))
return err
})
if err != nil {
return err
}
fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathStorage)
moveUnsealed := fts
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
}
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed))
return err
})
if err != nil {
return xerrors.Errorf("moving sector to storage: %w", err)
}
return nil
}
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
return nil
}
@ -875,6 +943,10 @@ func (m *Manager) ReturnProveReplicaUpdate2(ctx context.Context, callID storifac
return m.returnResult(ctx, callID, proof, err)
}
func (m *Manager) ReturnFinalizeReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}
func (m *Manager) ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}

View File

@ -477,6 +477,10 @@ func (mgr *SectorMgr) FinalizeSector(context.Context, storage.SectorRef, []stora
return nil
}
func (mgr *SectorMgr) FinalizeReplicaUpdate(context.Context, storage.SectorRef, []storage.Range) error {
return nil
}
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
return nil
}
@ -577,6 +581,10 @@ func (mgr *SectorMgr) ReturnGenerateSectorKeyFromData(ctx context.Context, callI
panic("not supported")
}
func (mgr *SectorMgr) ReturnFinalizeReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported")
}
func (m mockVerifProver) VerifySeal(svi proof.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {

View File

@ -119,6 +119,10 @@ func (s *schedTestWorker) GenerateSectorKeyFromData(ctx context.Context, sector
panic("implement me")
}
func (s *schedTestWorker) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
panic("implement me")
}

View File

@ -14,10 +14,11 @@ const (
TTFetch TaskType = "seal/v0/fetch"
TTUnseal TaskType = "seal/v0/unseal"
TTReplicaUpdate TaskType = "seal/v0/replicaupdate"
TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1"
TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2"
TTRegenSectorKey TaskType = "seal/v0/regensectorkey"
TTReplicaUpdate TaskType = "seal/v0/replicaupdate"
TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1"
TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2"
TTRegenSectorKey TaskType = "seal/v0/regensectorkey"
TTFinalizeReplicaUpdate TaskType = "seal/v0/finalize/replicaupdate"
)
var order = map[TaskType]int{
@ -48,10 +49,11 @@ var shortNames = map[TaskType]string{
TTFetch: "GET",
TTUnseal: "UNS",
TTReplicaUpdate: "RU",
TTProveReplicaUpdate1: "PR1",
TTProveReplicaUpdate2: "PR2",
TTRegenSectorKey: "GSK",
TTReplicaUpdate: "RU",
TTProveReplicaUpdate1: "PR1",
TTProveReplicaUpdate2: "PR2",
TTRegenSectorKey: "GSK",
TTFinalizeReplicaUpdate: "FRU",
}
func (a TaskType) MuchLess(b TaskType) (bool, bool) {

View File

@ -120,6 +120,7 @@ type WorkerCalls interface {
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error)
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (CallID, error)
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error)
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (CallID, error)
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error)
@ -182,6 +183,7 @@ type WorkerReturn interface {
ReturnProveReplicaUpdate1(ctx context.Context, callID CallID, proofs storage.ReplicaVanillaProofs, err *CallError) error
ReturnProveReplicaUpdate2(ctx context.Context, callID CallID, proof storage.ReplicaUpdateProof, err *CallError) error
ReturnGenerateSectorKeyFromData(ctx context.Context, callID CallID, err *CallError) error
ReturnFinalizeReplicaUpdate(ctx context.Context, callID CallID, err *CallError) error
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error

View File

@ -87,6 +87,10 @@ func (t *testExec) GenerateSectorKeyFromData(ctx context.Context, sector storage
panic("implement me")
}
func (t *testExec) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
panic("implement me")
}
func (t *testExec) NewSector(ctx context.Context, sector storage.SectorRef) error {
panic("implement me")
}

View File

@ -162,20 +162,21 @@ func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
type ReturnType string
const (
AddPiece ReturnType = "AddPiece"
SealPreCommit1 ReturnType = "SealPreCommit1"
SealPreCommit2 ReturnType = "SealPreCommit2"
SealCommit1 ReturnType = "SealCommit1"
SealCommit2 ReturnType = "SealCommit2"
FinalizeSector ReturnType = "FinalizeSector"
ReplicaUpdate ReturnType = "ReplicaUpdate"
ProveReplicaUpdate1 ReturnType = "ProveReplicaUpdate1"
ProveReplicaUpdate2 ReturnType = "ProveReplicaUpdate2"
GenerateSectorKey ReturnType = "GenerateSectorKey"
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
MoveStorage ReturnType = "MoveStorage"
UnsealPiece ReturnType = "UnsealPiece"
Fetch ReturnType = "Fetch"
AddPiece ReturnType = "AddPiece"
SealPreCommit1 ReturnType = "SealPreCommit1"
SealPreCommit2 ReturnType = "SealPreCommit2"
SealCommit1 ReturnType = "SealCommit1"
SealCommit2 ReturnType = "SealCommit2"
FinalizeSector ReturnType = "FinalizeSector"
FinalizeReplicaUpdate ReturnType = "FinalizeReplicaUpdate"
ReplicaUpdate ReturnType = "ReplicaUpdate"
ProveReplicaUpdate1 ReturnType = "ProveReplicaUpdate1"
ProveReplicaUpdate2 ReturnType = "ProveReplicaUpdate2"
GenerateSectorKey ReturnType = "GenerateSectorKey"
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
MoveStorage ReturnType = "MoveStorage"
UnsealPiece ReturnType = "UnsealPiece"
Fetch ReturnType = "Fetch"
)
// in: func(WorkerReturn, context.Context, CallID, err string)
@ -213,20 +214,21 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor
}
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
SealCommit1: rfunc(storiface.WorkerReturn.ReturnSealCommit1),
SealCommit2: rfunc(storiface.WorkerReturn.ReturnSealCommit2),
FinalizeSector: rfunc(storiface.WorkerReturn.ReturnFinalizeSector),
ReleaseUnsealed: rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
ReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnReplicaUpdate),
ProveReplicaUpdate1: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate1),
ProveReplicaUpdate2: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate2),
GenerateSectorKey: rfunc(storiface.WorkerReturn.ReturnGenerateSectorKeyFromData),
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
SealCommit1: rfunc(storiface.WorkerReturn.ReturnSealCommit1),
SealCommit2: rfunc(storiface.WorkerReturn.ReturnSealCommit2),
FinalizeSector: rfunc(storiface.WorkerReturn.ReturnFinalizeSector),
ReleaseUnsealed: rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
ReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnReplicaUpdate),
ProveReplicaUpdate1: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate1),
ProveReplicaUpdate2: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate2),
GenerateSectorKey: rfunc(storiface.WorkerReturn.ReturnGenerateSectorKeyFromData),
FinalizeReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnFinalizeReplicaUpdate),
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
}
func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
@ -456,6 +458,27 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorR
})
}
func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
return l.asyncCall(ctx, sector, FinalizeReplicaUpdate, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
if err := sb.FinalizeReplicaUpdate(ctx, sector, keepUnsealed); err != nil {
return nil, xerrors.Errorf("finalizing sector: %w", err)
}
if len(keepUnsealed) == 0 {
if err := l.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); err != nil {
return nil, xerrors.Errorf("removing unsealed data: %w", err)
}
}
return nil, err
})
}
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) {
return storiface.UndefCall, xerrors.Errorf("implement me")
}

View File

@ -215,4 +215,8 @@ func (t *trackedWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.
})
}
func (t *trackedWorker) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeReplicaUpdate, func() (storiface.CallID, error) { return t.Worker.FinalizeReplicaUpdate(ctx, sector, keepUnsealed) })
}
var _ Worker = &trackedWorker{}

View File

@ -20,6 +20,7 @@ import (
// We should implement some wait-for-api logic
type ErrApi struct{ error }
type ErrNoDeals struct{ error }
type ErrInvalidDeals struct{ error }
type ErrInvalidPiece struct{ error }
type ErrExpiredDeals struct{ error }
@ -38,12 +39,14 @@ type ErrCommitWaitFailed struct{ error }
type ErrBadRU struct{ error }
type ErrBadPR struct{ error }
func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI) error {
func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI, mustHaveDeals bool) error {
tok, height, err := api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
}
dealCount := 0
for i, p := range si.Pieces {
// if no deal is associated with the piece, ensure that we added it as
// filler (i.e. ensure that it has a zero PieceCID)
@ -55,6 +58,8 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
continue
}
dealCount++
proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, tok)
if err != nil {
return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)}
@ -77,13 +82,17 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
}
}
if mustHaveDeals && dealCount <= 0 {
return &ErrNoDeals{(xerrors.Errorf("sector %d must have deals, but does not", si.SectorNumber))}
}
return nil
}
// checkPrecommit checks that data commitment generated in the sealing process
// matches pieces, and that the seal ticket isn't expired
func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tok TipSetToken, height abi.ChainEpoch, api SealingAPI) (err error) {
if err := checkPieces(ctx, maddr, si, api); err != nil {
if err := checkPieces(ctx, maddr, si, api, false); err != nil {
return err
}
@ -184,7 +193,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")}
}
if err := checkPieces(ctx, m.maddr, si, m.Api); err != nil {
if err := checkPieces(ctx, m.maddr, si, m.Api, false); err != nil {
return err
}
@ -194,7 +203,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
// check that sector info is good after running a replica update
func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInfo, tok TipSetToken, api SealingAPI) error {
if err := checkPieces(ctx, maddr, si, api); err != nil {
if err := checkPieces(ctx, maddr, si, api, true); err != nil {
return err
}
if !si.CCUpdate {

View File

@ -137,27 +137,32 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
SnapDealsWaitDeals: planOne(
on(SectorAddPiece{}, SnapDealsAddPiece),
on(SectorStartPacking{}, SnapDealsPacking),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
SnapDealsAddPiece: planOne(
on(SectorPieceAdded{}, SnapDealsWaitDeals),
apply(SectorStartPacking{}),
apply(SectorAddPiece{}),
on(SectorAddPieceFailed{}, SnapDealsAddPieceFailed),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
SnapDealsPacking: planOne(
on(SectorPacked{}, UpdateReplica),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
UpdateReplica: planOne(
on(SectorReplicaUpdate{}, ProveReplicaUpdate),
on(SectorUpdateReplicaFailed{}, ReplicaUpdateFailed),
on(SectorDealsExpired{}, SnapDealsDealsExpired),
on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
ProveReplicaUpdate: planOne(
on(SectorProveReplicaUpdate{}, SubmitReplicaUpdate),
on(SectorProveReplicaUpdateFailed{}, ReplicaUpdateFailed),
on(SectorDealsExpired{}, SnapDealsDealsExpired),
on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
SubmitReplicaUpdate: planOne(
on(SectorReplicaUpdateSubmitted{}, ReplicaUpdateWait),
@ -169,7 +174,14 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorAbortUpgrade{}, AbortUpgrade),
),
FinalizeReplicaUpdate: planOne(
on(SectorFinalized{}, Proving),
on(SectorFinalized{}, UpdateActivating),
),
UpdateActivating: planOne(
on(SectorUpdateActive{}, ReleaseSectorKey),
),
ReleaseSectorKey: planOne(
on(SectorKeyReleased{}, Proving),
on(SectorReleaseKeyFailed{}, ReleaseSectorKeyFailed),
),
// Sealing errors
@ -231,6 +243,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryWaitDeals{}, SnapDealsWaitDeals),
apply(SectorStartPacking{}),
apply(SectorAddPiece{}),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
SnapDealsDealsExpired: planOne(
on(SectorAbortUpgrade{}, AbortUpgrade),
@ -249,6 +262,10 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryProveReplicaUpdate{}, ProveReplicaUpdate),
on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs),
on(SectorDealsExpired{}, SnapDealsDealsExpired),
on(SectorAbortUpgrade{}, AbortUpgrade),
),
ReleaseSectorKeyFailed: planOne(
on(SectorUpdateActive{}, ReleaseSectorKey),
),
// Post-seal
@ -477,6 +494,10 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleReplicaUpdateWait, processed, nil
case FinalizeReplicaUpdate:
return m.handleFinalizeReplicaUpdate, processed, nil
case UpdateActivating:
return m.handleUpdateActivating, processed, nil
case ReleaseSectorKey:
return m.handleReleaseSectorKey, processed, nil
// Handled failure modes
case AddPieceFailed:
@ -513,6 +534,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleSnapDealsRecoverDealIDs, processed, nil
case ReplicaUpdateFailed:
return m.handleSubmitReplicaUpdateFailed, processed, nil
case ReleaseSectorKeyFailed:
return m.handleReleaseSectorKeyFailed, 0, err
case AbortUpgrade:
return m.handleAbortUpgrade, processed, nil

View File

@ -335,6 +335,14 @@ type SectorReplicaUpdateLanded struct{}
func (evt SectorReplicaUpdateLanded) apply(state *SectorInfo) {}
type SectorUpdateActive struct{}
func (evt SectorUpdateActive) apply(state *SectorInfo) {}
type SectorKeyReleased struct{}
func (evt SectorKeyReleased) apply(state *SectorInfo) {}
// Failed state recovery
type SectorRetrySealPreCommit1 struct{}
@ -445,6 +453,13 @@ type SectorSubmitReplicaUpdateFailed struct{}
func (evt SectorSubmitReplicaUpdateFailed) apply(state *SectorInfo) {}
type SectorReleaseKeyFailed struct{ error }
func (evt SectorReleaseKeyFailed) FormatError(xerrors.Printer) (next error) {
return evt.error
}
func (evt SectorReleaseKeyFailed) apply(state *SectorInfo) {}
// Faults
type SectorFaulty struct{}

View File

@ -545,6 +545,13 @@ func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
return m.sectors.Send(uint64(sid), SectorStartPacking{})
}
func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error {
m.startupWait.Wait()
log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")})
}
func proposalCID(deal api.PieceDealInfo) cid.Cid {
pc, err := deal.DealProposal.Cid()
if err != nil {

View File

@ -52,11 +52,14 @@ var ExistSectorStateList = map[SectorState]struct{}{
ProveReplicaUpdate: {},
SubmitReplicaUpdate: {},
ReplicaUpdateWait: {},
UpdateActivating: {},
ReleaseSectorKey: {},
FinalizeReplicaUpdate: {},
SnapDealsAddPieceFailed: {},
SnapDealsDealsExpired: {},
SnapDealsRecoverDealIDs: {},
ReplicaUpdateFailed: {},
ReleaseSectorKeyFailed: {},
AbortUpgrade: {},
}
@ -104,6 +107,8 @@ const (
SubmitReplicaUpdate SectorState = "SubmitReplicaUpdate"
ReplicaUpdateWait SectorState = "ReplicaUpdateWait"
FinalizeReplicaUpdate SectorState = "FinalizeReplicaUpdate"
UpdateActivating SectorState = "UpdateActivating"
ReleaseSectorKey SectorState = "ReleaseSectorKey"
// error modes
FailedUnrecoverable SectorState = "FailedUnrecoverable"
@ -124,6 +129,7 @@ const (
SnapDealsRecoverDealIDs SectorState = "SnapDealsRecoverDealIDs"
AbortUpgrade SectorState = "AbortUpgrade"
ReplicaUpdateFailed SectorState = "ReplicaUpdateFailed"
ReleaseSectorKeyFailed SectorState = "ReleaseSectorKeyFailed"
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
@ -153,7 +159,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState {
return sstProving
}
return sstSealing
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
case Proving, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving
}

View File

@ -255,6 +255,16 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect
return ctx.Send(SectorRetrySubmitReplicaUpdate{})
}
func (m *Sealing) handleReleaseSectorKeyFailed(ctx statemachine.Context, sector SectorInfo) error {
// not much we can do, wait for a bit and try again
if err := failedCooldown(ctx, sector); err != nil {
return err
}
return ctx.Send(SectorUpdateActive{})
}
func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
tok, _, err := m.Api.ChainHead(ctx.Context())
if err != nil {
@ -489,7 +499,7 @@ func (m *Sealing) HandleRecoverDealIDs(ctx statemachine.Context, sector SectorIn
}
func (m *Sealing) handleSnapDealsRecoverDealIDs(ctx statemachine.Context, sector SectorInfo) error {
return m.handleRecoverDealIDsOrFailWith(ctx, sector, SectorAbortUpgrade{})
return m.handleRecoverDealIDsOrFailWith(ctx, sector, SectorAbortUpgrade{xerrors.New("failed recovering deal ids")})
}
func recoveryPiecesToFix(ctx context.Context, api SealingAPI, sector SectorInfo, maddr address.Address) ([]int, int, error) {

View File

@ -2,17 +2,21 @@ package sealing
import (
"bytes"
"context"
"time"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode"
statemachine "github.com/filecoin-project/go-statemachine"
api "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"golang.org/x/xerrors"
)
func (m *Sealing) handleReplicaUpdate(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state
return handleErrors(ctx, err, sector)
}
out, err := m.sealer.ReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.pieceInfos())
@ -52,7 +56,7 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect
return ctx.Send(SectorProveReplicaUpdateFailed{xerrors.Errorf("prove replica update (1) failed: %w", err)})
}
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state
return handleErrors(ctx, err, sector)
}
@ -74,10 +78,6 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec
return nil
}
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api); err != nil { // Sanity check state
return handleErrors(ctx, err, sector)
}
if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, tok, m.Api); err != nil {
return ctx.Send(SectorSubmitReplicaUpdateFailed{})
}
@ -211,16 +211,76 @@ func (m *Sealing) handleReplicaUpdateWait(ctx statemachine.Context, sector Secto
}
if !si.SealedCID.Equals(*sector.UpdateSealed) {
log.Errorf("mismatch of expected onchain sealed cid after replica update, expected %s got %s", sector.UpdateSealed, si.SealedCID)
return ctx.Send(SectorAbortUpgrade{})
return ctx.Send(SectorAbortUpgrade{xerrors.Errorf("mismatch of expected onchain sealed cid after replica update, expected %s got %s", sector.UpdateSealed, si.SealedCID)})
}
return ctx.Send(SectorReplicaUpdateLanded{})
}
func (m *Sealing) handleFinalizeReplicaUpdate(ctx statemachine.Context, sector SectorInfo) error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting sealing config: %w", err)
}
if err := m.sealer.FinalizeReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
return ctx.Send(SectorFinalized{})
}
func (m *Sealing) handleUpdateActivating(ctx statemachine.Context, sector SectorInfo) error {
try := func() error {
mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.ReplicaUpdateMessage)
if err != nil {
return err
}
tok, _, err := m.Api.ChainHead(ctx.Context())
if err != nil {
return err
}
nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok)
if err != nil {
return err
}
lb := policy.GetWinningPoStSectorSetLookback(nv)
targetHeight := mw.Height + lb + InteractivePoRepConfidence
return m.events.ChainAt(func(context.Context, TipSetToken, abi.ChainEpoch) error {
return ctx.Send(SectorUpdateActive{})
}, func(ctx context.Context, ts TipSetToken) error {
log.Warn("revert in handleUpdateActivating")
return nil
}, InteractivePoRepConfidence, targetHeight)
}
for {
err := try()
if err == nil {
break
}
log.Errorw("error in handleUpdateActivating", "error", err)
// likely an API issue, sleep for a bit and retry
time.Sleep(time.Minute)
}
return nil
}
func (m *Sealing) handleReleaseSectorKey(ctx statemachine.Context, sector SectorInfo) error {
if err := m.sealer.ReleaseSectorKey(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
return ctx.Send(SectorReleaseKeyFailed{err})
}
return ctx.Send(SectorKeyReleased{})
}
func handleErrors(ctx statemachine.Context, err error, sector SectorInfo) error {
switch err.(type) {
case *ErrApi:

View File

@ -198,7 +198,7 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e
}
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, false); err != nil { // Sanity check state
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)

2
go.mod
View File

@ -51,7 +51,7 @@ require (
github.com/filecoin-project/specs-actors/v5 v5.0.4
github.com/filecoin-project/specs-actors/v6 v6.0.1
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1
github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9
github.com/filecoin-project/specs-storage v0.2.0
github.com/filecoin-project/test-vectors/schema v0.0.5
github.com/gbrlsnchs/jwt/v3 v3.0.1
github.com/gdamore/tcell/v2 v2.2.0

4
go.sum
View File

@ -382,8 +382,8 @@ github.com/filecoin-project/specs-actors/v7 v7.0.0-20211117170924-fd07a4c7dff9/g
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211222192039-c83bea50c402/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1 h1:FuDaXIbcw2hRsFI8SDTmsGGCE+NumpF6aiBoU/2X5W4=
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M=
github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9 h1:oUYOvF7EvdXS0Zmk9mNkaB6Bu0l+WXBYPzVodKMiLug=
github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU=
github.com/filecoin-project/specs-storage v0.2.0 h1:Y4UDv0apRQ3zI2GiPPubi8JblpUZZphEdaJUxCutfyg=
github.com/filecoin-project/specs-storage v0.2.0/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU=
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=

View File

@ -30,23 +30,14 @@ func TestCCUpgrade(t *testing.T) {
//stm: @MINER_SECTOR_LIST_001
kit.QuietMiningLogs()
for _, height := range []abi.ChainEpoch{
-1, // before
162, // while sealing
560, // after upgrade deal
} {
height := height // make linters happy by copying
t.Run(fmt.Sprintf("upgrade-%d", height), func(t *testing.T) {
runTestCCUpgrade(t, height)
})
}
runTestCCUpgrade(t)
}
func runTestCCUpgrade(t *testing.T, upgradeHeight abi.ChainEpoch) *kit.TestFullNode {
func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
ctx := context.Background()
blockTime := 1 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15))
client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC())
ens.InterconnectAll().BeginMiningMustPost(blockTime)
maddr, err := miner.ActorAddress(ctx)
@ -72,7 +63,6 @@ func runTestCCUpgrade(t *testing.T, upgradeHeight abi.ChainEpoch) *kit.TestFullN
}
waitForSectorActive(ctx, t, CCUpgrade, client, maddr)
//stm: @SECTOR_CC_UPGRADE_001
err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
require.NoError(t, err)
@ -91,6 +81,11 @@ func runTestCCUpgrade(t *testing.T, upgradeHeight abi.ChainEpoch) *kit.TestFullN
status, err := miner.SectorsStatus(ctx, CCUpgrade, true)
require.NoError(t, err)
assert.Equal(t, 1, len(status.Deals))
miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{
CCUpgrade: {},
})
return client
}
@ -137,7 +132,7 @@ func TestCCUpgradeAndPoSt(t *testing.T) {
kit.QuietMiningLogs()
t.Run("upgrade and then post", func(t *testing.T) {
ctx := context.Background()
n := runTestCCUpgrade(t, 100)
n := runTestCCUpgrade(t)
ts, err := n.ChainHead(ctx)
require.NoError(t, err)
start := ts.Height()

View File

@ -87,7 +87,10 @@ type TestMiner struct {
func (tm *TestMiner) PledgeSectors(ctx context.Context, n, existing int, blockNotif <-chan struct{}) {
toCheck := tm.StartPledge(ctx, n, existing, blockNotif)
tm.WaitSectorsProving(ctx, toCheck)
}
func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.SectorNumber]struct{}) {
for len(toCheck) > 0 {
tm.FlushSealingBatches(ctx)
@ -105,9 +108,8 @@ func (tm *TestMiner) PledgeSectors(ctx context.Context, n, existing int, blockNo
}
build.Clock.Sleep(100 * time.Millisecond)
fmt.Printf("WaitSeal: %d %+v\n", len(toCheck), states)
fmt.Printf("WaitSectorsProving: %d %+v\n", len(toCheck), states)
}
}
func (tm *TestMiner) StartPledge(ctx context.Context, n, existing int, blockNotif <-chan struct{}) map[abi.SectorNumber]struct{} {

View File

@ -39,6 +39,7 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
require.NoError(t, err)
srv, maddr := CreateRPCServer(t, handler, l)
fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
require.NoError(t, err)
@ -54,7 +55,9 @@ func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
srv, maddr := CreateRPCServer(t, handler, m.RemoteListener)
fmt.Println("creating RPC server for", m.ActorAddr, "at: ", srv.Listener.Addr().String())
fmt.Printf("creating RPC server for %s at %s\n", m.ActorAddr, srv.Listener.Addr().String())
fmt.Printf("SP RPC ENV FOR CLI DEBUGGING `export MINER_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0"
cl, stop, err := client.NewStorageMinerRPCV0(context.Background(), url, nil)
require.NoError(t, err)

View File

@ -85,7 +85,7 @@ func DefaultFullNode() *FullNode {
Splitstore: Splitstore{
ColdStoreType: "universal",
HotStoreType: "badger",
MarkSetType: "map",
MarkSetType: "badger",
HotStoreFullGCFrequency: 20,
},

View File

@ -810,7 +810,7 @@ Only currently supported value is "badger".`,
Type: "string",
Comment: `MarkSetType specifies the type of the markset.
It can be "map" (default) for in memory marking or "badger" for on-disk marking.`,
It can be "map" for in memory marking or "badger" (default) for on-disk marking.`,
},
{
Name: "HotStoreMessageRetention",

View File

@ -363,7 +363,7 @@ type Splitstore struct {
// Only currently supported value is "badger".
HotStoreType string
// MarkSetType specifies the type of the markset.
// It can be "map" (default) for in memory marking or "badger" for on-disk marking.
// It can be "map" for in memory marking or "badger" (default) for on-disk marking.
MarkSetType string
// HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond

View File

@ -391,6 +391,10 @@ func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.Sect
return sm.Miner.MarkForUpgrade(ctx, id, snap)
}
func (sm *StorageMinerAPI) SectorAbortUpgrade(ctx context.Context, number abi.SectorNumber) error {
return sm.Miner.SectorAbortUpgrade(number)
}
func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
return sm.Miner.CommitFlush(ctx)
}

View File

@ -86,6 +86,10 @@ func (m *Miner) IsMarkedForUpgrade(id abi.SectorNumber) bool {
return m.sealing.IsMarkedForUpgrade(id)
}
func (m *Miner) SectorAbortUpgrade(sectorNum abi.SectorNumber) error {
return m.sealing.AbortUpgrade(sectorNum)
}
func (m *Miner) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storage.Data, d api.PieceDealInfo) (api.SectorOffset, error) {
return m.sealing.SectorAddPieceToAny(ctx, size, r, d)
}

View File

@ -1,4 +1,54 @@
{
"v28-empty-sector-update-merkletree-poseidon_hasher-8-0-0-61fa69f38b9cc771ba27b670124714b4ea77fbeae05e377fb859c4a43b73a30c.params": {
"cid": "Qma5WL6abSqYg9uUQAZ3EHS286bsNsha7oAGsJBD48Bq2q",
"digest": "c3ad7bb549470b82ad52ed070aebb4f4",
"sector_size": 536870912
},
"v28-empty-sector-update-merkletree-poseidon_hasher-8-0-0-61fa69f38b9cc771ba27b670124714b4ea77fbeae05e377fb859c4a43b73a30c.vk": {
"cid": "QmUa7f9JtJMsqJJ3s3ZXk6WyF4xJLE8FiqYskZGgk8GCDv",
"digest": "994c5b7d450ca9da348c910689f2dc7f",
"sector_size": 536870912
},
"v28-empty-sector-update-merkletree-poseidon_hasher-8-0-0-92180959e1918d26350b8e6cfe217bbdd0a2d8de51ebec269078b364b715ad63.params": {
"cid": "QmQiT4qBGodrVNEgVTDXxBNDdPbaD8Ag7Sx3ZTq1zHX79S",
"digest": "5aedd2cf3e5c0a15623d56a1b43110ad",
"sector_size": 8388608
},
"v28-empty-sector-update-merkletree-poseidon_hasher-8-0-0-92180959e1918d26350b8e6cfe217bbdd0a2d8de51ebec269078b364b715ad63.vk": {
"cid": "QmdcpKUQvHM8RFRVKbk1yHfEqMcBzhtFWKRp9SNEmWq37i",
"digest": "abd80269054d391a734febdac0d2e687",
"sector_size": 8388608
},
"v28-empty-sector-update-merkletree-poseidon_hasher-8-0-0-fb9e095bebdd77511c0269b967b4d87ba8b8a525edaa0e165de23ba454510194.params": {
"cid": "QmYM6Hg7mjmvA3ZHTsqkss1fkdyDju5dDmLiBZGJ5pz9y9",
"digest": "311f92a3e75036ced01b1c0025f1fa0c",
"sector_size": 2048
},
"v28-empty-sector-update-merkletree-poseidon_hasher-8-0-0-fb9e095bebdd77511c0269b967b4d87ba8b8a525edaa0e165de23ba454510194.vk": {
"cid": "QmaQsTLL3nc5dw6wAvaioJSBfd1jhQrA2o6ucFf7XeV74P",
"digest": "eadad9784969890d30f2749708c79771",
"sector_size": 2048
},
"v28-empty-sector-update-merkletree-poseidon_hasher-8-8-0-3b7f44a9362e3985369454947bc94022e118211e49fd672d52bec1cbfd599d18.params": {
"cid": "QmNPc75iEfcahCwNKdqnWLtxnjspUGGR4iscjiz3wP3RtS",
"digest": "1b3cfd761a961543f9eb273e435a06a2",
"sector_size": 34359738368
},
"v28-empty-sector-update-merkletree-poseidon_hasher-8-8-0-3b7f44a9362e3985369454947bc94022e118211e49fd672d52bec1cbfd599d18.vk": {
"cid": "QmdFFUe1gcz9MMHc6YW8aoV48w4ckvcERjt7PkydQAMfCN",
"digest": "3a6941983754737fde880d29c7094905",
"sector_size": 34359738368
},
"v28-empty-sector-update-merkletree-poseidon_hasher-8-8-2-102e1444a7e9a97ebf1e3d6855dcc77e66c011ea66f936d9b2c508f87f2f83a7.params": {
"cid": "QmUB6xTVjzBQGuDNeyJMrrJ1byk58vhPm8eY2Lv9pgwanp",
"digest": "1a392e7b759fb18e036c7559b5ece816",
"sector_size": 68719476736
},
"v28-empty-sector-update-merkletree-poseidon_hasher-8-8-2-102e1444a7e9a97ebf1e3d6855dcc77e66c011ea66f936d9b2c508f87f2f83a7.vk": {
"cid": "Qmd794Jty7k26XJ8Eg4NDEks65Qk8G4GVfGkwqvymv8HAg",
"digest": "80e366df2f1011953c2d01c7b7c9ee8e",
"sector_size": 68719476736
},
"v28-proof-of-spacetime-fallback-merkletree-poseidon_hasher-8-0-0-0170db1f394b35d995252228ee359194b13199d259380541dc529fb0099096b0.params": {
"cid": "QmVxjFRyhmyQaZEtCh7nk2abc7LhFkzhnRX4rcHqCCpikR",
"digest": "7610b9f82bfc88405b7a832b651ce2f6",

View File

@ -2941,4 +2941,4 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=