Merge branch 'filecoin-project:master' into master

This commit is contained in:
swift-mx 2021-08-11 19:25:27 +08:00 committed by GitHub
commit d6562bccfc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 915 additions and 417 deletions

View File

@ -113,6 +113,11 @@ type FullNode interface {
// will be returned.
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) //perm:read
// ChainGetTipSetAfterHeight looks back for a tipset at the specified epoch.
// If there are no blocks at the specified epoch, the first non-nil tipset at a later epoch
// will be returned.
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) //perm:read
// ChainReadObj reads ipld nodes referenced by the specified CID from chain
// blockstore and returns raw bytes.
ChainReadObj(context.Context, cid.Cid) ([]byte, error) //perm:read

View File

@ -35,6 +35,7 @@ type Gateway interface {
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainNotify(context.Context) (<-chan []*HeadChange, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)

View File

@ -343,6 +343,21 @@ func (mr *MockFullNodeMockRecorder) ChainGetTipSet(arg0, arg1 interface{}) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainGetTipSet", reflect.TypeOf((*MockFullNode)(nil).ChainGetTipSet), arg0, arg1)
}
// ChainGetTipSetAfterHeight mocks base method.
func (m *MockFullNode) ChainGetTipSetAfterHeight(arg0 context.Context, arg1 abi.ChainEpoch, arg2 types.TipSetKey) (*types.TipSet, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainGetTipSetAfterHeight", arg0, arg1, arg2)
ret0, _ := ret[0].(*types.TipSet)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChainGetTipSetAfterHeight indicates an expected call of ChainGetTipSetAfterHeight.
func (mr *MockFullNodeMockRecorder) ChainGetTipSetAfterHeight(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainGetTipSetAfterHeight", reflect.TypeOf((*MockFullNode)(nil).ChainGetTipSetAfterHeight), arg0, arg1, arg2)
}
// ChainGetTipSetByHeight mocks base method.
func (m *MockFullNode) ChainGetTipSetByHeight(arg0 context.Context, arg1 abi.ChainEpoch, arg2 types.TipSetKey) (*types.TipSet, error) {
m.ctrl.T.Helper()

View File

@ -132,6 +132,8 @@ type FullNodeStruct struct {
ChainGetTipSet func(p0 context.Context, p1 types.TipSetKey) (*types.TipSet, error) `perm:"read"`
ChainGetTipSetAfterHeight func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) `perm:"read"`
ChainGetTipSetByHeight func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) `perm:"read"`
ChainHasObj func(p0 context.Context, p1 cid.Cid) (bool, error) `perm:"read"`
@ -474,6 +476,8 @@ type GatewayStruct struct {
ChainGetTipSet func(p0 context.Context, p1 types.TipSetKey) (*types.TipSet, error) ``
ChainGetTipSetAfterHeight func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) ``
ChainGetTipSetByHeight func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) ``
ChainHasObj func(p0 context.Context, p1 cid.Cid) (bool, error) ``
@ -1171,6 +1175,17 @@ func (s *FullNodeStub) ChainGetTipSet(p0 context.Context, p1 types.TipSetKey) (*
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ChainGetTipSetAfterHeight(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) {
if s.Internal.ChainGetTipSetAfterHeight == nil {
return nil, ErrNotSupported
}
return s.Internal.ChainGetTipSetAfterHeight(p0, p1, p2)
}
func (s *FullNodeStub) ChainGetTipSetAfterHeight(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) {
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ChainGetTipSetByHeight(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) {
if s.Internal.ChainGetTipSetByHeight == nil {
return nil, ErrNotSupported
@ -2997,6 +3012,17 @@ func (s *GatewayStub) ChainGetTipSet(p0 context.Context, p1 types.TipSetKey) (*t
return nil, ErrNotSupported
}
func (s *GatewayStruct) ChainGetTipSetAfterHeight(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) {
if s.Internal.ChainGetTipSetAfterHeight == nil {
return nil, ErrNotSupported
}
return s.Internal.ChainGetTipSetAfterHeight(p0, p1, p2)
}
func (s *GatewayStub) ChainGetTipSetAfterHeight(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) {
return nil, ErrNotSupported
}
func (s *GatewayStruct) ChainGetTipSetByHeight(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) {
if s.Internal.ChainGetTipSetByHeight == nil {
return nil, ErrNotSupported

View File

@ -21,15 +21,26 @@ type MarkSet interface {
SetConcurrent()
}
type MarkSetVisitor interface {
MarkSet
ObjectVisitor
}
type MarkSetEnv interface {
// Create creates a new markset within the environment.
// name is a unique name for this markset, mapped to the filesystem in disk-backed environments
// sizeHint is a hint about the expected size of the markset
Create(name string, sizeHint int64) (MarkSet, error)
// CreateVisitor is like Create, but returns a wider interface that supports atomic visits.
// It may not be supported by some markset types (e.g. bloom).
CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error)
// SupportsVisitor returns true if the marksets created by this environment support the visitor interface.
SupportsVisitor() bool
Close() error
}
func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
switch mtype {
case "bloom":
return NewBloomMarkSetEnv()
case "map":
return NewMapMarkSetEnv()
case "badger":

View File

@ -27,12 +27,14 @@ type BadgerMarkSet struct {
writing map[int]map[string]struct{}
writers int
seqno int
version int
db *badger.DB
path string
}
var _ MarkSet = (*BadgerMarkSet)(nil)
var _ MarkSetVisitor = (*BadgerMarkSet)(nil)
var badgerMarkSetBatchSize = 16384
@ -46,9 +48,241 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
return &BadgerMarkSetEnv{path: msPath}, nil
}
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet, error) {
name += ".tmp"
path := filepath.Join(e.path, name)
db, err := openTransientBadgerDB(path)
if err != nil {
return nil, xerrors.Errorf("error creating badger db: %w", err)
}
ms := &BadgerMarkSet{
pend: make(map[string]struct{}),
writing: make(map[int]map[string]struct{}),
db: db,
path: path,
}
ms.cond.L = &ms.mx
return ms, nil
}
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return e.create(name, sizeHint)
}
func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return e.create(name, sizeHint)
}
func (e *BadgerMarkSetEnv) SupportsVisitor() bool { return true }
func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
}
func (s *BadgerMarkSet) Mark(c cid.Cid) error {
s.mx.Lock()
if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}
write, seqno := s.put(string(c.Hash()))
s.mx.Unlock()
if write {
return s.write(seqno)
}
return nil
}
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
s.mx.RLock()
defer s.mx.RUnlock()
key := c.Hash()
pendKey := string(key)
has, err := s.tryPending(pendKey)
if has || err != nil {
return has, err
}
return s.tryDB(key)
}
func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
key := c.Hash()
pendKey := string(key)
s.mx.RLock()
has, err := s.tryPending(pendKey)
if has || err != nil {
s.mx.RUnlock()
return false, err
}
has, err = s.tryDB(key)
if has || err != nil {
s.mx.RUnlock()
return false, err
}
// we need to upgrade the lock to exclusive in order to write; take the version count to see
// if there was another write while we were upgrading
version := s.version
s.mx.RUnlock()
s.mx.Lock()
// we have to do the check dance again
has, err = s.tryPending(pendKey)
if has || err != nil {
s.mx.Unlock()
return false, err
}
if version != s.version {
// something was written to the db, we need to check it
has, err = s.tryDB(key)
if has || err != nil {
s.mx.Unlock()
return false, err
}
}
write, seqno := s.put(pendKey)
s.mx.Unlock()
if write {
err = s.write(seqno)
}
return true, err
}
// reader holds the (r)lock
func (s *BadgerMarkSet) tryPending(key string) (has bool, err error) {
if s.pend == nil {
return false, errMarkSetClosed
}
if _, ok := s.pend[key]; ok {
return true, nil
}
for _, wr := range s.writing {
if _, ok := wr[key]; ok {
return true, nil
}
}
return false, nil
}
func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) {
err = s.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
return err
})
switch err {
case nil:
return true, nil
case badger.ErrKeyNotFound:
return false, nil
default:
return false, err
}
}
// writer holds the exclusive lock
func (s *BadgerMarkSet) put(key string) (write bool, seqno int) {
s.pend[key] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize {
return false, 0
}
seqno = s.seqno
s.seqno++
s.writing[seqno] = s.pend
s.pend = make(map[string]struct{})
return true, seqno
}
func (s *BadgerMarkSet) write(seqno int) (err error) {
s.mx.Lock()
if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}
pend := s.writing[seqno]
s.writers++
s.mx.Unlock()
defer func() {
s.mx.Lock()
defer s.mx.Unlock()
if err == nil {
delete(s.writing, seqno)
s.version++
}
s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
}()
empty := []byte{} // not nil
batch := s.db.NewWriteBatch()
defer batch.Cancel()
for k := range pend {
if err = batch.Set([]byte(k), empty); err != nil {
return xerrors.Errorf("error setting batch: %w", err)
}
}
err = batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}
return nil
}
func (s *BadgerMarkSet) Close() error {
s.mx.Lock()
defer s.mx.Unlock()
if s.pend == nil {
return nil
}
for s.writers > 0 {
s.cond.Wait()
}
s.pend = nil
db := s.db
s.db = nil
return closeTransientBadgerDB(db, s.path)
}
func (s *BadgerMarkSet) SetConcurrent() {}
func openTransientBadgerDB(path string) (*badger.DB, error) {
// clean up first
err := os.RemoveAll(path)
if err != nil {
@ -76,140 +310,16 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
}
db, err := badger.Open(opts)
if err != nil {
return nil, xerrors.Errorf("error creating badger markset: %w", err)
}
ms := &BadgerMarkSet{
pend: make(map[string]struct{}),
writing: make(map[int]map[string]struct{}),
db: db,
path: path,
}
ms.cond.L = &ms.mx
return ms, nil
return badger.Open(opts)
}
func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
}
func (s *BadgerMarkSet) Mark(c cid.Cid) error {
s.mx.Lock()
if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}
s.pend[string(c.Hash())] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize {
s.mx.Unlock()
return nil
}
pend := s.pend
seqno := s.seqno
s.seqno++
s.writing[seqno] = pend
s.pend = make(map[string]struct{})
s.writers++
s.mx.Unlock()
defer func() {
s.mx.Lock()
defer s.mx.Unlock()
delete(s.writing, seqno)
s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
}()
empty := []byte{} // not nil
batch := s.db.NewWriteBatch()
defer batch.Cancel()
for k := range pend {
if err := batch.Set([]byte(k), empty); err != nil {
return err
}
}
err := batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}
return nil
}
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
s.mx.RLock()
defer s.mx.RUnlock()
if s.pend == nil {
return false, errMarkSetClosed
}
key := c.Hash()
pendKey := string(key)
_, ok := s.pend[pendKey]
if ok {
return true, nil
}
for _, wr := range s.writing {
_, ok := wr[pendKey]
if ok {
return true, nil
}
}
err := s.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
return err
})
switch err {
case nil:
return true, nil
case badger.ErrKeyNotFound:
return false, nil
default:
return false, xerrors.Errorf("error checking badger markset: %w", err)
}
}
func (s *BadgerMarkSet) Close() error {
s.mx.Lock()
defer s.mx.Unlock()
if s.pend == nil {
return nil
}
for s.writers > 0 {
s.cond.Wait()
}
s.pend = nil
db := s.db
s.db = nil
func closeTransientBadgerDB(db *badger.DB, path string) error {
err := db.Close()
if err != nil {
return xerrors.Errorf("error closing badger markset: %w", err)
}
err = os.RemoveAll(s.path)
err = os.RemoveAll(path)
if err != nil {
return xerrors.Errorf("error deleting badger markset: %w", err)
}
@ -217,8 +327,6 @@ func (s *BadgerMarkSet) Close() error {
return nil
}
func (s *BadgerMarkSet) SetConcurrent() {}
// badger logging through go-log
type badgerLogger struct {
*zap.SugaredLogger

View File

@ -1,107 +0,0 @@
package splitstore
import (
"crypto/rand"
"crypto/sha256"
"sync"
"golang.org/x/xerrors"
bbloom "github.com/ipfs/bbloom"
cid "github.com/ipfs/go-cid"
)
const (
BloomFilterMinSize = 10_000_000
BloomFilterProbability = 0.01
)
type BloomMarkSetEnv struct{}
var _ MarkSetEnv = (*BloomMarkSetEnv)(nil)
type BloomMarkSet struct {
salt []byte
mx sync.RWMutex
bf *bbloom.Bloom
ts bool
}
var _ MarkSet = (*BloomMarkSet)(nil)
func NewBloomMarkSetEnv() (*BloomMarkSetEnv, error) {
return &BloomMarkSetEnv{}, nil
}
func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
size := int64(BloomFilterMinSize)
for size < sizeHint {
size += BloomFilterMinSize
}
salt := make([]byte, 4)
_, err := rand.Read(salt)
if err != nil {
return nil, xerrors.Errorf("error reading salt: %w", err)
}
bf, err := bbloom.New(float64(size), BloomFilterProbability)
if err != nil {
return nil, xerrors.Errorf("error creating bloom filter: %w", err)
}
return &BloomMarkSet{salt: salt, bf: bf}, nil
}
func (e *BloomMarkSetEnv) Close() error {
return nil
}
func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte {
hash := cid.Hash()
key := make([]byte, len(s.salt)+len(hash))
n := copy(key, s.salt)
copy(key[n:], hash)
rehash := sha256.Sum256(key)
return rehash[:]
}
func (s *BloomMarkSet) Mark(cid cid.Cid) error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
if s.bf == nil {
return errMarkSetClosed
}
s.bf.Add(s.saltedKey(cid))
return nil
}
func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) {
if s.ts {
s.mx.RLock()
defer s.mx.RUnlock()
}
if s.bf == nil {
return false, errMarkSetClosed
}
return s.bf.Has(s.saltedKey(cid)), nil
}
func (s *BloomMarkSet) Close() error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.bf = nil
return nil
}
func (s *BloomMarkSet) SetConcurrent() {
s.ts = true
}

View File

@ -18,17 +18,28 @@ type MapMarkSet struct {
}
var _ MarkSet = (*MapMarkSet)(nil)
var _ MarkSetVisitor = (*MapMarkSet)(nil)
func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil
}
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
func (e *MapMarkSetEnv) create(name string, sizeHint int64) (*MapMarkSet, error) {
return &MapMarkSet{
set: make(map[string]struct{}, sizeHint),
}, nil
}
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return e.create(name, sizeHint)
}
func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return e.create(name, sizeHint)
}
func (e *MapMarkSetEnv) SupportsVisitor() bool { return true }
func (e *MapMarkSetEnv) Close() error {
return nil
}
@ -61,6 +72,25 @@ func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
return ok, nil
}
func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
if s.set == nil {
return false, errMarkSetClosed
}
key := string(c.Hash())
if _, ok := s.set[key]; ok {
return false, nil
}
s.set[key] = struct{}{}
return true, nil
}
func (s *MapMarkSet) Close() error {
if s.ts {
s.mx.Lock()

View File

@ -2,6 +2,7 @@ package splitstore
import (
"io/ioutil"
"os"
"testing"
cid "github.com/ipfs/go-cid"
@ -10,10 +11,7 @@ import (
func TestMapMarkSet(t *testing.T) {
testMarkSet(t, "map")
}
func TestBloomMarkSet(t *testing.T) {
testMarkSet(t, "bloom")
testMarkSetVisitor(t, "map")
}
func TestBadgerMarkSet(t *testing.T) {
@ -23,16 +21,21 @@ func TestBadgerMarkSet(t *testing.T) {
badgerMarkSetBatchSize = bs
})
testMarkSet(t, "badger")
testMarkSetVisitor(t, "badger")
}
func testMarkSet(t *testing.T, lsType string) {
t.Helper()
path, err := ioutil.TempDir("", "sweep-test.*")
path, err := ioutil.TempDir("", "markset.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
env, err := OpenMarkSetEnv(path, lsType)
if err != nil {
t.Fatal(err)
@ -145,3 +148,74 @@ func testMarkSet(t *testing.T, lsType string) {
t.Fatal(err)
}
}
func testMarkSetVisitor(t *testing.T, lsType string) {
t.Helper()
path, err := ioutil.TempDir("", "markset.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
env, err := OpenMarkSetEnv(path, lsType)
if err != nil {
t.Fatal(err)
}
defer env.Close() //nolint:errcheck
visitor, err := env.CreateVisitor("test", 0)
if err != nil {
t.Fatal(err)
}
defer visitor.Close() //nolint:errcheck
makeCid := func(key string) cid.Cid {
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
if err != nil {
t.Fatal(err)
}
return cid.NewCidV1(cid.Raw, h)
}
mustVisit := func(v ObjectVisitor, cid cid.Cid) {
visit, err := v.Visit(cid)
if err != nil {
t.Fatal(err)
}
if !visit {
t.Fatal("object should be visited")
}
}
mustNotVisit := func(v ObjectVisitor, cid cid.Cid) {
visit, err := v.Visit(cid)
if err != nil {
t.Fatal(err)
}
if visit {
t.Fatal("unexpected visit")
}
}
k1 := makeCid("a")
k2 := makeCid("b")
k3 := makeCid("c")
k4 := makeCid("d")
mustVisit(visitor, k1)
mustVisit(visitor, k2)
mustVisit(visitor, k3)
mustVisit(visitor, k4)
mustNotVisit(visitor, k1)
mustNotVisit(visitor, k2)
mustNotVisit(visitor, k3)
mustNotVisit(visitor, k4)
}

View File

@ -142,7 +142,6 @@ type SplitStore struct {
txnViews int
txnViewsWaiting bool
txnActive bool
txnProtect MarkSet
txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{}
@ -174,6 +173,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
return nil, err
}
if !markSetEnv.SupportsVisitor() {
return nil, xerrors.Errorf("markset type does not support atomic visitors")
}
// and now we can make a SplitStore
ss := &SplitStore{
cfg: cfg,

View File

@ -83,7 +83,14 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
write("--")
var coldCnt, missingCnt int64
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch,
visitor, err := s.markSetEnv.CreateVisitor("check", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
defer visitor.Close() //nolint
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk

View File

@ -211,7 +211,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
}
// protect all pending transactional references
func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
func (s *SplitStore) protectTxnRefs(markSet MarkSetVisitor) error {
for {
var txnRefs map[cid.Cid]struct{}
@ -283,30 +283,29 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
// transactionally protect a reference by walking the object and marking.
// concurrent markings are short circuited by checking the markset.
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSetVisitor) error {
if err := s.checkClosing(); err != nil {
return err
}
// Note: cold objects are deleted heaviest first, so the consituents of an object
// cannot be deleted before the object itself.
return s.walkObjectIncomplete(root, cid.NewSet(),
return s.walkObjectIncomplete(root, tmpVisitor(),
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}
mark, err := markSet.Has(c)
visit, err := markSet.Visit(c)
if err != nil {
return xerrors.Errorf("error checking markset: %w", err)
return xerrors.Errorf("error visiting object: %w", err)
}
// it's marked, nothing to do
if mark {
if !visit {
return errStopWalk
}
return markSet.Mark(c)
return nil
},
func(c cid.Cid) error {
if s.txnMissing != nil {
@ -382,7 +381,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
markSet, err := s.markSetEnv.CreateVisitor("live", s.markSetSize)
if err != nil {
return xerrors.Errorf("error creating mark set: %w", err)
}
@ -410,14 +409,23 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
startMark := time.Now()
var count int64
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch,
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}
visit, err := markSet.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}
if !visit {
return errStopWalk
}
count++
return markSet.Mark(c)
return nil
})
if err != nil {
@ -578,12 +586,8 @@ func (s *SplitStore) beginTxnProtect() {
s.txnMissing = make(map[cid.Cid]struct{})
}
func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
func (s *SplitStore) beginTxnMarking(markSet MarkSetVisitor) {
markSet.SetConcurrent()
s.txnLk.Lock()
s.txnProtect = markSet
s.txnLk.Unlock()
}
func (s *SplitStore) endTxnProtect() {
@ -594,21 +598,14 @@ func (s *SplitStore) endTxnProtect() {
return
}
// release markset memory
if s.txnProtect != nil {
_ = s.txnProtect.Close()
}
s.txnActive = false
s.txnProtect = nil
s.txnRefs = nil
s.txnMissing = nil
}
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
f func(cid.Cid) error) error {
visited := cid.NewSet()
walked := cid.NewSet()
visitor ObjectVisitor, f func(cid.Cid) error) error {
var walked *cid.Set
toWalk := ts.Cids()
walkCnt := 0
scanCnt := 0
@ -616,7 +613,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
stopWalk := func(_ cid.Cid) error { return errStopWalk }
walkBlock := func(c cid.Cid) error {
if !visited.Visit(c) {
if !walked.Visit(c) {
return nil
}
@ -640,19 +637,19 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
if inclMsgs < inclState {
// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil {
if err := s.walkObjectIncomplete(hdr.Messages, visitor, f, stopWalk); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, walked, f, stopWalk); err != nil {
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, f, stopWalk); err != nil {
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
} else {
if err := s.walkObject(hdr.Messages, walked, f); err != nil {
if err := s.walkObject(hdr.Messages, visitor, f); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
if err := s.walkObject(hdr.ParentMessageReceipts, visitor, f); err != nil {
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
}
@ -660,7 +657,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
// state is only retained if within the inclState boundary, with the exception of genesis
if hdr.Height >= inclState || hdr.Height == 0 {
if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil {
if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
}
scanCnt++
@ -679,6 +676,10 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
return err
}
// the walk is BFS, so we can reset the walked set in every iteration and avoid building up
// a set that contains all blocks (1M epochs -> 5M blocks -> 200MB worth of memory and growing
// over time)
walked = cid.NewSet()
walking := toWalk
toWalk = nil
for _, c := range walking {
@ -693,8 +694,13 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
return nil
}
func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error {
if !walked.Visit(c) {
func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) error {
visit, err := visitor.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}
if !visit {
return nil
}
@ -716,7 +722,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
}
var links []cid.Cid
err := s.view(c, func(data []byte) error {
err = s.view(c, func(data []byte) error {
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
links = append(links, c)
})
@ -727,7 +733,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
}
for _, c := range links {
err := s.walkObject(c, walked, f)
err := s.walkObject(c, visitor, f)
if err != nil {
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
}
@ -737,8 +743,13 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
}
// like walkObject, but the object may be potentially incomplete (references missing)
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing func(cid.Cid) error) error {
if !walked.Visit(c) {
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) error {
visit, err := visitor.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}
if !visit {
return nil
}
@ -777,7 +788,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing
}
var links []cid.Cid
err := s.view(c, func(data []byte) error {
err = s.view(c, func(data []byte) error {
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
links = append(links, c)
})
@ -788,7 +799,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing
}
for _, c := range links {
err := s.walkObjectIncomplete(c, walked, f, missing)
err := s.walkObjectIncomplete(c, visitor, f, missing)
if err != nil {
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
}
@ -984,7 +995,7 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro
return nil
}
func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSetVisitor) error {
deadCids := make([]cid.Cid, 0, batchSize)
var purgeCnt, liveCnt int
defer func() {
@ -1050,7 +1061,7 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
// have this gem[TM].
// My best guess is that they are parent message receipts or yet to be computed state roots; magik
// thinks the cause may be block validation.
func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
func (s *SplitStore) waitForMissingRefs(markSet MarkSetVisitor) {
s.txnLk.Lock()
missing := s.txnMissing
s.txnMissing = nil
@ -1079,27 +1090,27 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
}
towalk := missing
walked := cid.NewSet()
visitor := tmpVisitor()
missing = make(map[cid.Cid]struct{})
for c := range towalk {
err := s.walkObjectIncomplete(c, walked,
err := s.walkObjectIncomplete(c, visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}
mark, err := markSet.Has(c)
visit, err := markSet.Visit(c)
if err != nil {
return xerrors.Errorf("error checking markset for %s: %w", c, err)
return xerrors.Errorf("error visiting object: %w", err)
}
if mark {
if !visit {
return errStopWalk
}
count++
return markSet.Mark(c)
return nil
},
func(c cid.Cid) error {
missing[c] = struct{}{}

View File

@ -59,7 +59,15 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
count := int64(0)
xcount := int64(0)
missing := int64(0)
err := s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
visitor, err := s.markSetEnv.CreateVisitor("warmup", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
defer visitor.Close() //nolint
err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk

View File

@ -0,0 +1,32 @@
package splitstore
import (
cid "github.com/ipfs/go-cid"
)
// ObjectVisitor is an interface for deduplicating objects during walks
type ObjectVisitor interface {
Visit(cid.Cid) (bool, error)
}
type noopVisitor struct{}
var _ ObjectVisitor = (*noopVisitor)(nil)
func (v *noopVisitor) Visit(_ cid.Cid) (bool, error) {
return true, nil
}
type cidSetVisitor struct {
set *cid.Set
}
var _ ObjectVisitor = (*cidSetVisitor)(nil)
func (v *cidSetVisitor) Visit(c cid.Cid) (bool, error) {
return v.set.Visit(c), nil
}
func tmpVisitor() ObjectVisitor {
return &cidSetVisitor{set: cid.NewSet()}
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -479,12 +479,15 @@ func (s *state{{.v}}) EraseAllUnproven() error {
return dls.UpdateDeadline(s.store, dindx, dl)
})
if err != nil {
return err
}
return s.State.SaveDeadlines(s.store, dls)
{{else}}
// field doesn't exist until v2
return nil
{{end}}
return nil
}
func (d *deadline{{.v}}) LoadPartition(idx uint64) (Partition, error) {

View File

@ -444,8 +444,8 @@ func (s *state0) decodeSectorPreCommitOnChainInfo(val *cbg.Deferred) (SectorPreC
func (s *state0) EraseAllUnproven() error {
// field doesn't exist until v2
return nil
}
func (d *deadline0) LoadPartition(idx uint64) (Partition, error) {

View File

@ -470,10 +470,12 @@ func (s *state2) EraseAllUnproven() error {
return dls.UpdateDeadline(s.store, dindx, dl)
})
if err != nil {
return err
}
return s.State.SaveDeadlines(s.store, dls)
return nil
}
func (d *deadline2) LoadPartition(idx uint64) (Partition, error) {

View File

@ -467,10 +467,12 @@ func (s *state3) EraseAllUnproven() error {
return dls.UpdateDeadline(s.store, dindx, dl)
})
if err != nil {
return err
}
return s.State.SaveDeadlines(s.store, dls)
return nil
}
func (d *deadline3) LoadPartition(idx uint64) (Partition, error) {

View File

@ -467,10 +467,12 @@ func (s *state4) EraseAllUnproven() error {
return dls.UpdateDeadline(s.store, dindx, dl)
})
if err != nil {
return err
}
return s.State.SaveDeadlines(s.store, dls)
return nil
}
func (d *deadline4) LoadPartition(idx uint64) (Partition, error) {

View File

@ -467,10 +467,12 @@ func (s *state5) EraseAllUnproven() error {
return dls.UpdateDeadline(s.store, dindx, dl)
})
if err != nil {
return err
}
return s.State.SaveDeadlines(s.store, dls)
return nil
}
func (d *deadline5) LoadPartition(idx uint64) (Partition, error) {

View File

@ -4,6 +4,7 @@ import (
"sort"
"github.com/filecoin-project/go-state-types/big"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
@ -168,31 +169,31 @@ func SetMinVerifiedDealSize(size abi.StoragePower) {
}
func GetMaxProveCommitDuration(ver actors.Version, t abi.RegisteredSealProof) abi.ChainEpoch {
func GetMaxProveCommitDuration(ver actors.Version, t abi.RegisteredSealProof) (abi.ChainEpoch, error) {
switch ver {
case actors.Version0:
return miner0.MaxSealDuration[t]
return miner0.MaxSealDuration[t], nil
case actors.Version2:
return miner2.MaxProveCommitDuration[t]
return miner2.MaxProveCommitDuration[t], nil
case actors.Version3:
return miner3.MaxProveCommitDuration[t]
return miner3.MaxProveCommitDuration[t], nil
case actors.Version4:
return miner4.MaxProveCommitDuration[t]
return miner4.MaxProveCommitDuration[t], nil
case actors.Version5:
return miner5.MaxProveCommitDuration[t]
return miner5.MaxProveCommitDuration[t], nil
default:
panic("unsupported actors version")
return 0, xerrors.Errorf("unsupported actors version")
}
}
@ -227,31 +228,40 @@ func DealProviderCollateralBounds(
size abi.PaddedPieceSize, verified bool,
rawBytePower, qaPower, baselinePower abi.StoragePower,
circulatingFil abi.TokenAmount, nwVer network.Version,
) (min, max abi.TokenAmount) {
switch actors.VersionForNetwork(nwVer) {
) (min, max abi.TokenAmount, err error) {
v, err := actors.VersionForNetwork(nwVer)
if err != nil {
return big.Zero(), big.Zero(), err
}
switch v {
case actors.Version0:
return market0.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil, nwVer)
min, max := market0.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil, nwVer)
return min, max, nil
case actors.Version2:
return market2.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
min, max := market2.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
return min, max, nil
case actors.Version3:
return market3.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
min, max := market3.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
return min, max, nil
case actors.Version4:
return market4.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
min, max := market4.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
return min, max, nil
case actors.Version5:
return market5.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
min, max := market5.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
return min, max, nil
default:
panic("unsupported actors version")
return big.Zero(), big.Zero(), xerrors.Errorf("unsupported actors version")
}
}
@ -310,8 +320,11 @@ func GetMaxPoStPartitions(nv network.Version, p abi.RegisteredPoStProof) (int, e
if err != nil {
return 0, err
}
maxSectors := uint64(GetAddressedSectorsMax(nv))
return int(maxSectors / sectorsPerPart), nil
maxSectors, err := GetAddressedSectorsMax(nv)
if err != nil {
return 0, err
}
return int(uint64(maxSectors) / sectorsPerPart), nil
}
func GetDefaultSectorSize() abi.SectorSize {
@ -345,82 +358,94 @@ func GetSectorMaxLifetime(proof abi.RegisteredSealProof, nwVer network.Version)
return builtin5.SealProofPoliciesV11[proof].SectorMaxLifetime
}
func GetAddressedSectorsMax(nwVer network.Version) int {
switch actors.VersionForNetwork(nwVer) {
func GetAddressedSectorsMax(nwVer network.Version) (int, error) {
v, err := actors.VersionForNetwork(nwVer)
if err != nil {
return 0, err
}
switch v {
case actors.Version0:
return miner0.AddressedSectorsMax
return miner0.AddressedSectorsMax, nil
case actors.Version2:
return miner2.AddressedSectorsMax
return miner2.AddressedSectorsMax, nil
case actors.Version3:
return miner3.AddressedSectorsMax
return miner3.AddressedSectorsMax, nil
case actors.Version4:
return miner4.AddressedSectorsMax
return miner4.AddressedSectorsMax, nil
case actors.Version5:
return miner5.AddressedSectorsMax
return miner5.AddressedSectorsMax, nil
default:
panic("unsupported network version")
return 0, xerrors.Errorf("unsupported network version")
}
}
func GetDeclarationsMax(nwVer network.Version) int {
switch actors.VersionForNetwork(nwVer) {
func GetDeclarationsMax(nwVer network.Version) (int, error) {
v, err := actors.VersionForNetwork(nwVer)
if err != nil {
return 0, err
}
switch v {
case actors.Version0:
// TODO: Should we instead panic here since the concept doesn't exist yet?
return miner0.AddressedPartitionsMax
// TODO: Should we instead error here since the concept doesn't exist yet?
return miner0.AddressedPartitionsMax, nil
case actors.Version2:
return miner2.DeclarationsMax
return miner2.DeclarationsMax, nil
case actors.Version3:
return miner3.DeclarationsMax
return miner3.DeclarationsMax, nil
case actors.Version4:
return miner4.DeclarationsMax
return miner4.DeclarationsMax, nil
case actors.Version5:
return miner5.DeclarationsMax
return miner5.DeclarationsMax, nil
default:
panic("unsupported network version")
return 0, xerrors.Errorf("unsupported network version")
}
}
func AggregateNetworkFee(nwVer network.Version, aggregateSize int, baseFee abi.TokenAmount) abi.TokenAmount {
switch actors.VersionForNetwork(nwVer) {
func AggregateNetworkFee(nwVer network.Version, aggregateSize int, baseFee abi.TokenAmount) (abi.TokenAmount, error) {
v, err := actors.VersionForNetwork(nwVer)
if err != nil {
return big.Zero(), err
}
switch v {
case actors.Version0:
return big.Zero()
return big.Zero(), nil
case actors.Version2:
return big.Zero()
return big.Zero(), nil
case actors.Version3:
return big.Zero()
return big.Zero(), nil
case actors.Version4:
return big.Zero()
return big.Zero(), nil
case actors.Version5:
return miner5.AggregateNetworkFee(aggregateSize, baseFee)
return miner5.AggregateNetworkFee(aggregateSize, baseFee), nil
default:
panic("unsupported network version")
return big.Zero(), xerrors.Errorf("unsupported network version")
}
}

View File

@ -4,6 +4,7 @@ import (
"sort"
"github.com/filecoin-project/go-state-types/big"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
@ -117,18 +118,18 @@ func SetMinVerifiedDealSize(size abi.StoragePower) {
{{end}}
}
func GetMaxProveCommitDuration(ver actors.Version, t abi.RegisteredSealProof) abi.ChainEpoch {
func GetMaxProveCommitDuration(ver actors.Version, t abi.RegisteredSealProof) (abi.ChainEpoch, error) {
switch ver {
{{range .versions}}
case actors.Version{{.}}:
{{if (eq . 0)}}
return miner{{.}}.MaxSealDuration[t]
return miner{{.}}.MaxSealDuration[t], nil
{{else}}
return miner{{.}}.MaxProveCommitDuration[t]
return miner{{.}}.MaxProveCommitDuration[t], nil
{{end}}
{{end}}
default:
panic("unsupported actors version")
return 0, xerrors.Errorf("unsupported actors version")
}
}
@ -150,18 +151,24 @@ func DealProviderCollateralBounds(
size abi.PaddedPieceSize, verified bool,
rawBytePower, qaPower, baselinePower abi.StoragePower,
circulatingFil abi.TokenAmount, nwVer network.Version,
) (min, max abi.TokenAmount) {
switch actors.VersionForNetwork(nwVer) {
) (min, max abi.TokenAmount, err error) {
v, err := actors.VersionForNetwork(nwVer)
if err != nil {
return big.Zero(), big.Zero(), err
}
switch v {
{{range .versions}}
case actors.Version{{.}}:
{{if (eq . 0)}}
return market{{.}}.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil, nwVer)
min, max := market{{.}}.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil, nwVer)
return min, max, nil
{{else}}
return market{{.}}.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
min, max := market{{.}}.DealProviderCollateralBounds(size, verified, rawBytePower, qaPower, baselinePower, circulatingFil)
return min, max, nil
{{end}}
{{end}}
default:
panic("unsupported actors version")
return big.Zero(), big.Zero(), xerrors.Errorf("unsupported actors version")
}
}
@ -201,8 +208,11 @@ func GetMaxPoStPartitions(nv network.Version, p abi.RegisteredPoStProof) (int, e
if err != nil {
return 0, err
}
maxSectors := uint64(GetAddressedSectorsMax(nv))
return int(maxSectors / sectorsPerPart), nil
maxSectors, err := GetAddressedSectorsMax(nv)
if err != nil {
return 0, err
}
return int(uint64(maxSectors) / sectorsPerPart), nil
}
func GetDefaultSectorSize() abi.SectorSize {
@ -236,44 +246,56 @@ func GetSectorMaxLifetime(proof abi.RegisteredSealProof, nwVer network.Version)
return builtin{{.latestVersion}}.SealProofPoliciesV11[proof].SectorMaxLifetime
}
func GetAddressedSectorsMax(nwVer network.Version) int {
switch actors.VersionForNetwork(nwVer) {
func GetAddressedSectorsMax(nwVer network.Version) (int, error) {
v, err := actors.VersionForNetwork(nwVer)
if err != nil {
return 0, err
}
switch v {
{{range .versions}}
case actors.Version{{.}}:
return miner{{.}}.AddressedSectorsMax
return miner{{.}}.AddressedSectorsMax, nil
{{end}}
default:
panic("unsupported network version")
return 0, xerrors.Errorf("unsupported network version")
}
}
func GetDeclarationsMax(nwVer network.Version) int {
switch actors.VersionForNetwork(nwVer) {
func GetDeclarationsMax(nwVer network.Version) (int, error) {
v, err := actors.VersionForNetwork(nwVer)
if err != nil {
return 0, err
}
switch v {
{{range .versions}}
case actors.Version{{.}}:
{{if (eq . 0)}}
// TODO: Should we instead panic here since the concept doesn't exist yet?
return miner{{.}}.AddressedPartitionsMax
// TODO: Should we instead error here since the concept doesn't exist yet?
return miner{{.}}.AddressedPartitionsMax, nil
{{else}}
return miner{{.}}.DeclarationsMax
return miner{{.}}.DeclarationsMax, nil
{{end}}
{{end}}
default:
panic("unsupported network version")
return 0, xerrors.Errorf("unsupported network version")
}
}
func AggregateNetworkFee(nwVer network.Version, aggregateSize int, baseFee abi.TokenAmount) abi.TokenAmount {
switch actors.VersionForNetwork(nwVer) {
func AggregateNetworkFee(nwVer network.Version, aggregateSize int, baseFee abi.TokenAmount) (abi.TokenAmount, error) {
v, err := actors.VersionForNetwork(nwVer)
if err != nil {
return big.Zero(), err
}
switch v {
{{range .versions}}
case actors.Version{{.}}:
{{if (le . 4)}}
return big.Zero()
return big.Zero(), nil
{{else}}
return miner{{.}}.AggregateNetworkFee(aggregateSize, baseFee)
return miner{{.}}.AggregateNetworkFee(aggregateSize, baseFee), nil
{{end}}
{{end}}
default:
panic("unsupported network version")
return big.Zero(), xerrors.Errorf("unsupported network version")
}
}

View File

@ -21,19 +21,19 @@ const (
)
// Converts a network version into an actors adt version.
func VersionForNetwork(version network.Version) Version {
func VersionForNetwork(version network.Version) (Version, error) {
switch version {
case network.Version0, network.Version1, network.Version2, network.Version3:
return Version0
return Version0, nil
case network.Version4, network.Version5, network.Version6, network.Version6AndAHalf, network.Version7, network.Version8, network.Version9:
return Version2
return Version2, nil
case network.Version10, network.Version11:
return Version3
return Version3, nil
case network.Version12:
return Version4
return Version4, nil
case network.Version13:
return Version5
return Version5, nil
default:
panic(fmt.Sprintf("unsupported network version %d", version))
return -1, fmt.Errorf("unsupported network version %d", version)
}
}

View File

@ -149,7 +149,10 @@ func MakeInitialStateTree(ctx context.Context, bs bstore.Blockstore, template ge
return nil, nil, xerrors.Errorf("making new state tree: %w", err)
}
av := actors.VersionForNetwork(template.NetworkVersion)
av, err := actors.VersionForNetwork(template.NetworkVersion)
if err != nil {
return nil, nil, xerrors.Errorf("getting network version: %w", err)
}
// Create system actor

View File

@ -81,7 +81,10 @@ func mkFakedSigSyscalls(base vm.SyscallBuilder) vm.SyscallBuilder {
func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.SyscallBuilder, sroot cid.Cid, miners []genesis.Miner, nv network.Version) (cid.Cid, error) {
cst := cbor.NewCborStore(cs.StateBlockstore())
av := actors.VersionForNetwork(nv)
av, err := actors.VersionForNetwork(nv)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to get network version: %w", err)
}
csc := func(context.Context, abi.ChainEpoch, *state.StateTree) (abi.TokenAmount, error) {
return big.Zero(), nil
@ -291,7 +294,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
return cid.Undef, xerrors.Errorf("setting power state: %w", err)
}
rewact, err := SetupRewardActor(ctx, cs.StateBlockstore(), big.Zero(), actors.VersionForNetwork(nv))
rewact, err := SetupRewardActor(ctx, cs.StateBlockstore(), big.Zero(), av)
if err != nil {
return cid.Undef, xerrors.Errorf("setup reward actor: %w", err)
}

View File

@ -39,7 +39,10 @@ type ActorPredicate func(vmr.Runtime, rtt.VMActor) error
func ActorsVersionPredicate(ver actors.Version) ActorPredicate {
return func(rt vmr.Runtime, v rtt.VMActor) error {
aver := actors.VersionForNetwork(rt.NetworkVersion())
aver, err := actors.VersionForNetwork(rt.NetworkVersion())
if err != nil {
return xerrors.Errorf("unsupported network version: %w", err)
}
if aver != ver {
return xerrors.Errorf("actor %s is a version %d actor; chain only supports actor version %d at height %d and nver %d", v.Code(), ver, aver, rt.CurrEpoch(), rt.NetworkVersion())
}

View File

@ -54,7 +54,12 @@ func TryCreateAccountActor(rt *Runtime, addr address.Address) (*types.Actor, add
return nil, address.Undef, aerrors.Escalate(err, "registering actor address")
}
act, aerr := makeActor(actors.VersionForNetwork(rt.NetworkVersion()), addr)
av, err := actors.VersionForNetwork(rt.NetworkVersion())
if err != nil {
return nil, address.Undef, aerrors.Escalate(err, "unsupported network version")
}
act, aerr := makeActor(av, addr)
if aerr != nil {
return nil, address.Undef, aerr
}

View File

@ -561,7 +561,15 @@ var sectorsExtendCmd = &cli.Command{
for l, exts := range extensions {
for newExp, numbers := range exts {
scount += len(numbers)
if scount > policy.GetAddressedSectorsMax(nv) || len(p.Extensions) == policy.GetDeclarationsMax(nv) {
addressedMax, err := policy.GetAddressedSectorsMax(nv)
if err != nil {
return xerrors.Errorf("failed to get addressed sectors max")
}
declMax, err := policy.GetDeclarationsMax(nv)
if err != nil {
return xerrors.Errorf("failed to get declarations max")
}
if scount > addressedMax || len(p.Extensions) == declMax {
params = append(params, p)
p = miner3.ExtendSectorExpirationParams{}
scount = len(numbers)

View File

@ -6,6 +6,8 @@ import (
"os"
"sort"
_init "github.com/filecoin-project/lotus/chain/actors/builtin/init"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/fatih/color"
@ -171,6 +173,24 @@ var genesisVerifyCmd = &cli.Command{
}
fmt.Printf("]\n")
}
act, err := stree.GetActor(_init.Address)
if err != nil {
return err
}
ias, err := _init.Load(store, act)
if err != nil {
return err
}
nn, err := ias.NetworkName()
if err != nil {
return err
}
fmt.Println("Network name: ", nn)
return nil
},
}

View File

@ -271,7 +271,7 @@ func (bb *BlockBuilder) StateManager() *stmgr.StateManager {
}
// ActorsVersion returns the actors version for the target block.
func (bb *BlockBuilder) ActorsVersion() actors.Version {
func (bb *BlockBuilder) ActorsVersion() (actors.Version, error) {
return actors.VersionForNetwork(bb.NetworkVersion())
}

View File

@ -145,7 +145,10 @@ func (fs *FundingStage) PackMessages(ctx context.Context, bb *blockbuilder.Block
store := bb.ActorStore()
epoch := bb.Height()
actorsVersion := bb.ActorsVersion()
actorsVersion, err := bb.ActorsVersion()
if err != nil {
return err
}
var accounts, multisigs int
defer func() {

View File

@ -280,7 +280,11 @@ func (stage *ProveCommitStage) packProveCommitsMiner(
// It will drop any pre-commits that have already expired.
func (stage *ProveCommitStage) loadMiner(ctx context.Context, bb *blockbuilder.BlockBuilder, addr address.Address) error {
epoch := bb.Height()
av := bb.ActorsVersion()
av, err := bb.ActorsVersion()
if err != nil {
return err
}
minerState, err := loadMiner(bb.ActorStore(), bb.ParentStateTree(), addr)
if err != nil {
return err
@ -291,7 +295,10 @@ func (stage *ProveCommitStage) loadMiner(ctx context.Context, bb *blockbuilder.B
var total, dropped int
err = minerState.ForEachPrecommittedSector(func(info miner.SectorPreCommitOnChainInfo) error {
total++
msd := policy.GetMaxProveCommitDuration(av, info.Info.SealProof)
msd, err := policy.GetMaxProveCommitDuration(av, info.Info.SealProof)
if err != nil {
return err
}
if epoch > info.PreCommitEpoch+msd {
dropped++
return nil
@ -327,7 +334,10 @@ func (stage *ProveCommitStage) filterProveCommits(
}
nextEpoch := bb.Height()
av := bb.ActorsVersion()
av, err := bb.ActorsVersion()
if err != nil {
return nil, err
}
good := make([]abi.SectorNumber, 0, len(snos))
for _, sno := range snos {
@ -338,7 +348,10 @@ func (stage *ProveCommitStage) filterProveCommits(
if info == nil {
continue
}
msd := policy.GetMaxProveCommitDuration(av, info.Info.SealProof)
msd, err := policy.GetMaxProveCommitDuration(av, info.Info.SealProof)
if err != nil {
return nil, err
}
if nextEpoch > info.PreCommitEpoch+msd {
continue
}

View File

@ -27,6 +27,7 @@
* [ChainGetRandomnessFromBeacon](#ChainGetRandomnessFromBeacon)
* [ChainGetRandomnessFromTickets](#ChainGetRandomnessFromTickets)
* [ChainGetTipSet](#ChainGetTipSet)
* [ChainGetTipSetAfterHeight](#ChainGetTipSetAfterHeight)
* [ChainGetTipSetByHeight](#ChainGetTipSetByHeight)
* [ChainHasObj](#ChainHasObj)
* [ChainHead](#ChainHead)
@ -766,6 +767,38 @@ Response:
}
```
### ChainGetTipSetAfterHeight
ChainGetTipSetAfterHeight looks back for a tipset at the specified epoch.
If there are no blocks at the specified epoch, the first non-nil tipset at a later epoch
will be returned.
Perms: read
Inputs:
```json
[
10101,
[
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
]
]
```
Response:
```json
{
"Cids": null,
"Blocks": null,
"Height": 0
}
```
### ChainGetTipSetByHeight
ChainGetTipSetByHeight looks back for a tipset at the specified epoch.
If there are no blocks at the specified epoch, a tipset at an earlier epoch

View File

@ -343,7 +343,12 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err)
}
aggFee := big.Div(big.Mul(policy.AggregateNetworkFee(nv, len(infos), bf), aggFeeNum), aggFeeDen)
aggFeeRaw, err := policy.AggregateNetworkFee(nv, len(infos), bf)
if err != nil {
log.Errorf("getting aggregate network fee: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate network fee: %s", err)
}
aggFee := big.Div(big.Mul(aggFeeRaw, aggFeeNum), aggFeeDen)
needFunds := big.Add(collateral, aggFee)
needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds)
@ -563,8 +568,18 @@ func (b *CommitBatcher) getCommitCutoff(si SectorInfo) (time.Time, error) {
log.Errorf("getting precommit info: %s", err)
return time.Now(), err
}
av, err := actors.VersionForNetwork(nv)
if err != nil {
log.Errorf("unsupported network vrsion: %s", err)
return time.Now(), err
}
mpcd, err := policy.GetMaxProveCommitDuration(av, si.SectorType)
if err != nil {
log.Errorf("getting max prove commit duration: %s", err)
return time.Now(), err
}
cutoffEpoch := pci.PreCommitEpoch + policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), si.SectorType)
cutoffEpoch := pci.PreCommitEpoch + mpcd
for _, p := range si.Pieces {
if p.DealInfo == nil {

View File

@ -143,7 +143,14 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se
return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err)
}
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
av, err := actors.VersionForNetwork(nv)
if err != nil {
return nil, 0, allocated, xerrors.Errorf("getTicket: actor version for network error, not proceeding: %w", err)
}
msd, err := policy.GetMaxProveCommitDuration(av, sector.SectorType)
if err != nil {
return nil, 0, allocated, xerrors.Errorf("getTicket: max prove commit duration policy error, not proceeding: %w", err)
}
if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) {
return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector")
@ -223,7 +230,16 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
return nil
}
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
av, err := actors.VersionForNetwork(nv)
if err != nil {
log.Errorf("handlePreCommit1: VersionForNetwork error, not proceeding: %w", err)
return nil
}
msd, err := policy.GetMaxProveCommitDuration(av, sector.SectorType)
if err != nil {
log.Errorf("handlePreCommit1: GetMaxProveCommitDuration error, not proceeding: %w", err)
return nil
}
// if height > PreCommitEpoch + msd, there is no need to recalculate
if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) {
@ -309,7 +325,14 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (
return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get network version: %w", err)})
}
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
av, err := actors.VersionForNetwork(nv)
if err != nil {
return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get actors version: %w", err)})
}
msd, err := policy.GetMaxProveCommitDuration(av, sector.SectorType)
if err != nil {
return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get max prove commit duration: %w", err)})
}
if minExpiration := sector.TicketEpoch + policy.MaxPreCommitRandomnessLookback + msd + miner.MinSectorExpiration; expiration < minExpiration {
expiration = minExpiration

View File

@ -36,6 +36,7 @@ type TargetAPI interface {
ChainGetNode(ctx context.Context, p string) (*api.IpldObject, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainHasObj(context.Context, cid.Cid) (bool, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
@ -163,32 +164,48 @@ func (gw *Node) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types
}
func (gw *Node) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
if err := gw.checkTipSetHeight(ctx, h, tsk); err != nil {
return nil, err
}
return gw.target.ChainGetTipSetByHeight(ctx, h, tsk)
}
func (gw *Node) ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
if err := gw.checkTipSetHeight(ctx, h, tsk); err != nil {
return nil, err
}
return gw.target.ChainGetTipSetAfterHeight(ctx, h, tsk)
}
func (gw *Node) checkTipSetHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) error {
var ts *types.TipSet
if tsk.IsEmpty() {
head, err := gw.target.ChainHead(ctx)
if err != nil {
return nil, err
return err
}
ts = head
} else {
gts, err := gw.target.ChainGetTipSet(ctx, tsk)
if err != nil {
return nil, err
return err
}
ts = gts
}
// Check if the tipset key refers to gw tipset that's too far in the past
if err := gw.checkTipset(ts); err != nil {
return nil, err
return err
}
// Check if the height is too far in the past
if err := gw.checkTipsetHeight(ts, h); err != nil {
return nil, err
return err
}
return gw.target.ChainGetTipSetByHeight(ctx, h, tsk)
return nil
}
func (gw *Node) ChainGetNode(ctx context.Context, p string) (*api.IpldObject, error) {

View File

@ -17,6 +17,7 @@ import (
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -203,9 +204,9 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",
pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg)
// If the maximum number of deals per message has been reached,
// send a publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg {
// If the maximum number of deals per message has been reached or we're not batching, send a
// publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg || p.publishPeriod == 0 {
log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg)
p.publishAllDeals()
return
@ -218,7 +219,7 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
func (p *DealPublisher) waitForMoreDeals() {
// Check if we're already waiting for deals
if !p.publishPeriodStart.IsZero() {
elapsed := time.Since(p.publishPeriodStart)
elapsed := build.Clock.Since(p.publishPeriodStart)
log.Infof("%s elapsed of / %s until publish deals queue is published",
elapsed, p.publishPeriod)
return
@ -227,11 +228,11 @@ func (p *DealPublisher) waitForMoreDeals() {
// Set a timeout to wait for more deals to arrive
log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod)
ctx, cancel := context.WithCancel(p.ctx)
p.publishPeriodStart = time.Now()
p.publishPeriodStart = build.Clock.Now()
p.cancelWaitForMoreDeals = cancel
go func() {
timer := time.NewTimer(p.publishPeriod)
timer := build.Clock.Timer(p.publishPeriod)
select {
case <-ctx.Done():
timer.Stop()

View File

@ -9,12 +9,14 @@ import (
"github.com/filecoin-project/go-state-types/crypto"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/ipfs/go-cid"
"github.com/raulk/clock"
"github.com/stretchr/testify/require"
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
@ -25,7 +27,11 @@ import (
)
func TestDealPublisher(t *testing.T) {
t.Skip("this test randomly fails in various subtests; see issue #6799")
oldClock := build.Clock
t.Cleanup(func() { build.Clock = oldClock })
mc := clock.NewMock()
build.Clock = mc
testCases := []struct {
name string
publishPeriod time.Duration
@ -92,6 +98,7 @@ func TestDealPublisher(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
mc.Set(time.Now())
dpapi := newDPAPI(t)
// Create a deal publisher
@ -116,7 +123,31 @@ func TestDealPublisher(t *testing.T) {
}
// Wait until publish period has elapsed
time.Sleep(2 * tc.publishPeriod)
if tc.publishPeriod > 0 {
// If we expect deals to get stuck in the queue, wait until that happens
if tc.maxDealsPerMsg != 0 && tc.dealCountWithinPublishPeriod%int(tc.maxDealsPerMsg) != 0 {
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
return !dp.publishPeriodStart.IsZero()
}, time.Second, time.Millisecond, "failed to queue deals")
}
// Then wait to send
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
// Advance if necessary.
if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod {
dp.lk.Unlock()
mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1))
dp.lk.Lock()
}
return len(dp.pending) == 0
}, time.Second, time.Millisecond, "failed to send pending messages")
}
// Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
@ -124,6 +155,19 @@ func TestDealPublisher(t *testing.T) {
dealsToPublish = append(dealsToPublish, deal)
}
if tc.publishPeriod > 0 && tc.dealCountAfterPublishPeriod > 0 {
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod {
dp.lk.Unlock()
mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1))
dp.lk.Lock()
}
return len(dp.pending) == 0
}, time.Second, time.Millisecond, "failed to send pending messages")
}
checkPublishedDeals(t, dpapi, dealsToPublish, tc.expectedDealsPerMsg)
})
}
@ -133,7 +177,7 @@ func TestForcePublish(t *testing.T) {
dpapi := newDPAPI(t)
// Create a deal publisher
start := time.Now()
start := build.Clock.Now()
publishPeriod := time.Hour
dp := newDealPublisher(dpapi, nil, PublishMsgConfig{
Period: publishPeriod,
@ -152,7 +196,7 @@ func TestForcePublish(t *testing.T) {
dealsToPublish = append(dealsToPublish, deal)
// Allow a moment for them to be queued
time.Sleep(10 * time.Millisecond)
build.Clock.Sleep(10 * time.Millisecond)
// Should be two deals in the pending deals list
// (deal with cancelled context is ignored)
@ -160,7 +204,7 @@ func TestForcePublish(t *testing.T) {
require.Len(t, pendingInfo.Deals, 2)
require.Equal(t, publishPeriod, pendingInfo.PublishPeriod)
require.True(t, pendingInfo.PublishPeriodStart.After(start))
require.True(t, pendingInfo.PublishPeriodStart.Before(time.Now()))
require.True(t, pendingInfo.PublishPeriodStart.Before(build.Clock.Now()))
// Force publish all pending deals
dp.ForcePublishPendingDeals()

View File

@ -107,8 +107,8 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
}
p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
curTime := time.Now()
for time.Since(curTime) < addPieceRetryTimeout {
curTime := build.Clock.Now()
for build.Clock.Since(curTime) < addPieceRetryTimeout {
if !xerrors.Is(err, sealing.ErrTooManySectorsSealing) {
if err != nil {
log.Errorf("failed to addPiece for deal %d, err: %v", deal.DealID, err)
@ -116,7 +116,7 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
break
}
select {
case <-time.After(addPieceRetryWait):
case <-build.Clock.After(addPieceRetryWait):
p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
case <-ctx.Done():
return nil, xerrors.New("context expired while waiting to retry AddPiece")

View File

@ -50,6 +50,7 @@ type ChainModuleAPI interface {
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
}
@ -266,6 +267,14 @@ func (m *ChainModule) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpo
return m.Chain.GetTipsetByHeight(ctx, h, ts, true)
}
func (m *ChainModule) ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
ts, err := m.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return m.Chain.GetTipsetByHeight(ctx, h, ts, false)
}
func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) {
blk, err := m.ExposedBlockstore.Get(obj)
if err != nil {

View File

@ -30,8 +30,12 @@ func (a *MsigAPI) messageBuilder(ctx context.Context, from address.Address) (mul
if err != nil {
return nil, err
}
av, err := actors.VersionForNetwork(nver)
if err != nil {
return nil, err
}
return multisig.Message(actors.VersionForNetwork(nver), from), nil
return multisig.Message(av, from), nil
}
// TODO: remove gp (gasPrice) from arguments

View File

@ -1332,13 +1332,16 @@ func (m *StateModule) StateDealProviderCollateralBounds(ctx context.Context, siz
return api.DealCollateralBounds{}, xerrors.Errorf("getting reward baseline power: %w", err)
}
min, max := policy.DealProviderCollateralBounds(size,
min, max, err := policy.DealProviderCollateralBounds(size,
verified,
powClaim.RawBytePower,
powClaim.QualityAdjPower,
rewPow,
circ.FilCirculating,
m.StateManager.GetNtwkVersion(ctx, ts.Height()))
if err != nil {
return api.DealCollateralBounds{}, xerrors.Errorf("getting deal provider coll bounds: %w", err)
}
return api.DealCollateralBounds{
Min: types.BigDiv(types.BigMul(min, dealProviderCollateralNum), dealProviderCollateralDen),
Max: max,

View File

@ -88,7 +88,11 @@ func (ca *channelAccessor) messageBuilder(ctx context.Context, from address.Addr
return nil, err
}
return paych.Message(actors.VersionForNetwork(nwVersion), from), nil
av, err := actors.VersionForNetwork(nwVersion)
if err != nil {
return nil, err
}
return paych.Message(av, from), nil
}
func (ca *channelAccessor) getChannelInfo(addr address.Address) (*ChannelInfo, error) {

View File

@ -113,6 +113,7 @@ type fullNodeFilteredAPI interface {
ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error)
ChainGetRandomnessFromBeacon(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)

View File

@ -738,8 +738,12 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv net
}
// Also respect the AddressedPartitionsMax (which is the same as DeclarationsMax (which is all really just MaxPartitionsPerDeadline))
if partitionsPerMsg > policy.GetDeclarationsMax(nv) {
partitionsPerMsg = policy.GetDeclarationsMax(nv)
declMax, err := policy.GetDeclarationsMax(nv)
if err != nil {
return nil, xerrors.Errorf("getting max declarations: %w", err)
}
if partitionsPerMsg > declMax {
partitionsPerMsg = declMax
}
// The number of messages will be: