From 42bd4eccbeb9608f6cceb9644516af7b3c4a2a2b Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 22 Jun 2020 15:38:36 -0700 Subject: [PATCH 01/11] implement a persistent journal for lotus node operations --- chain/store/store.go | 9 +++++++++ node/builder.go | 2 ++ node/modules/core.go | 6 ++++++ 3 files changed, 17 insertions(+) diff --git a/chain/store/store.go b/chain/store/store.go index 4dabb96f7..f2ebf2b41 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/metrics" "go.opencensus.io/stats" "go.opencensus.io/trace" @@ -324,6 +325,14 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo continue } + journal.Add("sync", map[string]interface{}{ + "op": "headChange", + "from": r.old.Key(), + "to": r.new.Key(), + "rev": len(revert), + "apply": len(apply), + }) + // reverse the apply array for i := len(apply)/2 - 1; i >= 0; i-- { opp := len(apply) - 1 - i diff --git a/node/builder.go b/node/builder.go index 2ffe88921..5246028e7 100644 --- a/node/builder.go +++ b/node/builder.go @@ -119,6 +119,7 @@ const ( ExtractApiKey HeadMetricsKey RunPeerTaggerKey + JournalKey SetApiEndpointKey @@ -150,6 +151,7 @@ func defaults() []Option { Override(new(record.Validator), modules.RecordValidator), Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)), Override(new(dtypes.ShutdownChan), make(chan struct{})), + Override(JournalKey, modules.SetupJournal), // Filecoin modules diff --git a/node/modules/core.go b/node/modules/core.go index ca9872d90..84179bd63 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -6,6 +6,7 @@ import ( "errors" "io" "io/ioutil" + "path/filepath" "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log/v2" @@ -18,6 +19,7 @@ import ( "github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/addrutil" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" @@ -93,3 +95,7 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) { func DrandBootstrap() (dtypes.DrandBootstrap, error) { return build.DrandBootstrap() } + +func SetupJournal(lr repo.LockedRepo) error { + return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal")) +} From ccf64a853421085c890ef2794843bf3992404fae Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 23 Jun 2020 14:34:15 -0700 Subject: [PATCH 02/11] add the real code --- journal/journal.go | 142 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 journal/journal.go diff --git a/journal/journal.go b/journal/journal.go new file mode 100644 index 000000000..faf0fff9d --- /dev/null +++ b/journal/journal.go @@ -0,0 +1,142 @@ +package journal + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + logging "github.com/ipfs/go-log" + "golang.org/x/xerrors" +) + +func InitializeSystemJournal(dir string) error { + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + j, err := OpenFSJournal(dir) + if err != nil { + return err + } + currentJournal = j + return nil +} + +func Add(sys string, val interface{}) { + currentJournal.AddEntry(sys, val) +} + +var log = logging.Logger("journal") + +var currentJournal Journal + +type Journal interface { + AddEntry(system string, obj interface{}) + Close() error +} + +// fsJournal is a basic journal backed by files on a filesystem +type fsJournal struct { + fi *os.File + fSize int64 + + lk sync.Mutex + + journalDir string + + incoming chan *JournalEntry + journalSizeLimit int64 + + closing chan struct{} +} + +func OpenFSJournal(dir string) (*fsJournal, error) { + fsj := &fsJournal{ + journalDir: dir, + incoming: make(chan *JournalEntry, 32), + journalSizeLimit: 1 << 30, + closing: make(chan struct{}), + } + + if err := fsj.rollJournalFile(); err != nil { + return nil, err + } + + go fsj.runLoop() + + return fsj, nil +} + +type JournalEntry struct { + System string + Timestamp time.Time + Val interface{} +} + +func (fsj *fsJournal) putEntry(je *JournalEntry) error { + b, err := json.Marshal(je) + if err != nil { + return err + } + n, err := fsj.fi.Write(append(b, '\n')) + if err != nil { + return err + } + + fsj.fSize += int64(n) + + if fsj.fSize >= fsj.journalSizeLimit { + fsj.rollJournalFile() + } + + return nil +} + +func (fsj *fsJournal) rollJournalFile() error { + if fsj.fi != nil { + fsj.fi.Close() + } + + nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%d.ndjson", time.Now().Unix()))) + if err != nil { + return xerrors.Errorf("failed to open journal file: %w", err) + } + + fsj.fi = nfi + fsj.fSize = 0 + return nil +} + +func (fsj *fsJournal) runLoop() { + for { + select { + case je := <-fsj.incoming: + if err := fsj.putEntry(je); err != nil { + log.Errorw("failed to write out journal entry", "entry", je, "err", err) + } + case <-fsj.closing: + fsj.fi.Close() + return + } + } +} + +func (fsj *fsJournal) AddEntry(system string, obj interface{}) { + je := &JournalEntry{ + System: system, + Timestamp: time.Now(), + Val: obj, + } + select { + case fsj.incoming <- je: + case <-fsj.closing: + log.Warnw("journal closed but tried to log event", "entry", je) + } +} + +func (fsj *fsJournal) Close() error { + close(fsj.closing) + return nil +} From 18f7b9bb1dfeaaad1842cb74484d7f01fbe9a3ae Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 6 Jul 2020 09:43:19 -0700 Subject: [PATCH 03/11] use nicer timestamps --- journal/journal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/journal/journal.go b/journal/journal.go index faf0fff9d..fb0ea52f6 100644 --- a/journal/journal.go +++ b/journal/journal.go @@ -99,7 +99,7 @@ func (fsj *fsJournal) rollJournalFile() error { fsj.fi.Close() } - nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%d.ndjson", time.Now().Unix()))) + nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339)))) if err != nil { return xerrors.Errorf("failed to open journal file: %w", err) } From 3c6e46cd7055a446b48cb657707795fc6460c61c Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 1 Jul 2020 18:20:54 -0700 Subject: [PATCH 04/11] feat: add miner sector predicate and test -polish: OnActorStateChanged operate over TipSetKey - it was calling key() interally and tipsetkeys are cheaper to get than the full tipset -polish: improve predicate method names --- chain/events/state/predicates.go | 151 +++++++++++++++++++-- chain/events/state/predicates_test.go | 182 +++++++++++++++++++++++--- 2 files changed, 304 insertions(+), 29 deletions(-) diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 3245d5c03..7fecaf15a 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -3,18 +3,23 @@ package state import ( "context" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-amt-ipld/v2" - "github.com/filecoin-project/lotus/api/apibstore" - "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" - "github.com/ipfs/go-cid" - cbor "github.com/ipfs/go-ipld-cbor" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/util/adt" + + "github.com/filecoin-project/lotus/api/apibstore" + "github.com/filecoin-project/lotus/chain/types" ) -// UserData is the data returned from the DiffFunc +// UserData is the data returned from the DiffTipSetKeyFunc type UserData interface{} // ChainAPI abstracts out calls made by this class to external APIs @@ -36,22 +41,22 @@ func NewStatePredicates(api ChainAPI) *StatePredicates { } } -// DiffFunc check if there's a change form oldState to newState, and returns +// DiffTipSetKeyFunc check if there's a change form oldState to newState, and returns // - changed: was there a change // - user: user-defined data representing the state change // - err -type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) +type DiffTipSetKeyFunc func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error) -type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) +type DiffActorStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) // OnActorStateChanged calls diffStateFunc when the state changes for the given actor -func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc { - return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) { - oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key()) +func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffActorStateFunc) DiffTipSetKeyFunc { + return func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error) { + oldActor, err := sp.api.StateGetActor(ctx, addr, oldState) if err != nil { return false, nil, err } - newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key()) + newActor, err := sp.api.StateGetActor(ctx, addr, newState) if err != nil { return false, nil, err } @@ -66,7 +71,7 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) // OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor -func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { +func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffTipSetKeyFunc { return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { var oldState market.State if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil { @@ -135,3 +140,123 @@ func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDeal return false, nil, nil } } + +type DiffMinerActorStateFunc func(ctx context.Context, oldState *miner.State, newState *miner.State) (changed bool, user UserData, err error) + +func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMinerActorState DiffMinerActorStateFunc) DiffTipSetKeyFunc { + return sp.OnActorStateChanged(minerAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { + var oldState miner.State + if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil { + return false, nil, err + } + var newState miner.State + if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil { + return false, nil, err + } + return diffMinerActorState(ctx, &oldState, &newState) + }) +} + +type MinerSectorChanges struct { + Added []miner.SectorOnChainInfo + Extended []SectorExtensions + Removed []miner.SectorOnChainInfo +} + +type SectorExtensions struct { + From miner.SectorOnChainInfo + To miner.SectorOnChainInfo +} + +func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc { + return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) { + ctxStore := &contextStore{ + ctx: context.TODO(), + cst: sp.cst, + } + + sectorChanges := &MinerSectorChanges{ + Added: []miner.SectorOnChainInfo{}, + Extended: []SectorExtensions{}, + Removed: []miner.SectorOnChainInfo{}, + } + + // no sector changes + if oldState.Sectors.Equals(newState.Sectors) { + return false, nil, nil + } + + oldSectors, err := adt.AsArray(ctxStore, oldState.Sectors) + if err != nil { + return false, nil, err + } + + newSectors, err := adt.AsArray(ctxStore, newState.Sectors) + if err != nil { + return false, nil, err + } + + var osi miner.SectorOnChainInfo + + // find all sectors that were extended or removed + if err := oldSectors.ForEach(&osi, func(i int64) error { + var nsi miner.SectorOnChainInfo + found, err := newSectors.Get(uint64(osi.Info.SectorNumber), &nsi) + if err != nil { + return err + } + if !found { + sectorChanges.Removed = append(sectorChanges.Removed, osi) + return nil + } + + if nsi.Info.Expiration != osi.Info.Expiration { + sectorChanges.Extended = append(sectorChanges.Extended, SectorExtensions{ + From: osi, + To: nsi, + }) + } + + // we don't update miners state filed with `newSectors.Root()` so this operation is safe. + if err := newSectors.Delete(uint64(osi.Info.SectorNumber)); err != nil { + return err + } + return nil + }); err != nil { + return false, nil, err + } + + // all sectors that remain in newSectors are new + var nsi miner.SectorOnChainInfo + if err := newSectors.ForEach(&nsi, func(i int64) error { + sectorChanges.Added = append(sectorChanges.Added, nsi) + return nil + }); err != nil { + return false, nil, err + } + + // nothing changed + if len(sectorChanges.Added)+len(sectorChanges.Extended)+len(sectorChanges.Removed) == 0 { + return false, nil, nil + } + + return true, sectorChanges, nil + } +} + +type contextStore struct { + ctx context.Context + cst *cbor.BasicIpldStore +} + +func (cs *contextStore) Context() context.Context { + return cs.ctx +} + +func (cs *contextStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { + return cs.cst.Get(ctx, c, out) +} + +func (cs *contextStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { + return cs.cst.Put(ctx, v) +} diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index 56387f8b5..7ab3dd074 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -4,23 +4,26 @@ import ( "context" "testing" - "github.com/filecoin-project/specs-actors/actors/crypto" - "github.com/ipfs/go-hamt-ipld" - - "github.com/filecoin-project/go-amt-ipld/v2" - "github.com/filecoin-project/specs-actors/actors/builtin/market" - ds "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" - bstore "github.com/ipfs/go-ipfs-blockstore" - cbornode "github.com/ipfs/go-ipld-cbor" + "github.com/stretchr/testify/require" "golang.org/x/xerrors" "github.com/ipfs/go-cid" - "github.com/stretchr/testify/require" + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + "github.com/ipfs/go-hamt-ipld" + bstore "github.com/ipfs/go-ipfs-blockstore" + cbornode "github.com/ipfs/go-ipld-cbor" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/go-amt-ipld/v2" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/crypto" + tutils "github.com/filecoin-project/specs-actors/support/testing" + + "github.com/filecoin-project/lotus/chain/types" ) var dummyCid cid.Cid @@ -112,12 +115,12 @@ func TestPredicates(t *testing.T) { diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) // Diff a state against itself: expect no change - changed, _, err := diffFn(ctx, oldState, oldState) + changed, _, err := diffFn(ctx, oldState.Key(), oldState.Key()) require.NoError(t, err) require.False(t, changed) // Diff old state against new state - changed, val, err := diffFn(ctx, oldState, newState) + changed, val, err := diffFn(ctx, oldState.Key(), newState.Key()) require.NoError(t, err) require.True(t, changed) @@ -142,7 +145,7 @@ func TestPredicates(t *testing.T) { t.Fatal("No state change so this should not be called") return false, nil, nil }) - changed, _, err = actorDiffFn(ctx, oldState, oldState) + changed, _, err = actorDiffFn(ctx, oldState.Key(), oldState.Key()) require.NoError(t, err) require.False(t, changed) @@ -157,6 +160,87 @@ func TestPredicates(t *testing.T) { require.False(t, changed) } +func TestMinerSectorChange(t *testing.T) { + ctx := context.Background() + bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + store := cbornode.NewCborStore(bs) + + nextID := uint64(0) + nextIDAddrF := func() address.Address { + defer func() { nextID++ }() + return tutils.NewIDAddr(t, nextID) + } + + owner, worker := nextIDAddrF(), nextIDAddrF() + si0 := newSectorOnChainInfo(0, tutils.MakeCID("0"), big.NewInt(0), abi.ChainEpoch(0), abi.ChainEpoch(10)) + si1 := newSectorOnChainInfo(1, tutils.MakeCID("1"), big.NewInt(1), abi.ChainEpoch(1), abi.ChainEpoch(11)) + si2 := newSectorOnChainInfo(2, tutils.MakeCID("2"), big.NewInt(2), abi.ChainEpoch(2), abi.ChainEpoch(11)) + oldMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si0, si1, si2}) + + si3 := newSectorOnChainInfo(3, tutils.MakeCID("3"), big.NewInt(3), abi.ChainEpoch(3), abi.ChainEpoch(12)) + // 0 delete + // 1 extend + // 2 same + // 3 added + si1Ext := si1 + si1Ext.Info.Expiration++ + newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3}) + + minerAddr := nextIDAddrF() + oldState, err := mockTipset(minerAddr, 1) + require.NoError(t, err) + newState, err := mockTipset(minerAddr, 2) + require.NoError(t, err) + + api := newMockAPI(bs) + api.setActor(oldState.Key(), &types.Actor{Head: oldMinerC}) + api.setActor(newState.Key(), &types.Actor{Head: newMinerC}) + + preds := NewStatePredicates(api) + + minerDiffFn := preds.OnMinerActorChange(minerAddr, preds.OnMinerSectorChange()) + change, val, err := minerDiffFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.True(t, change) + require.NotNil(t, val) + + sectorChanges, ok := val.(*MinerSectorChanges) + require.True(t, ok) + + require.Equal(t, len(sectorChanges.Added), 1) + require.Equal(t, sectorChanges.Added[0], si3) + + require.Equal(t, len(sectorChanges.Removed), 1) + require.Equal(t, sectorChanges.Removed[0], si0) + + require.Equal(t, len(sectorChanges.Extended), 1) + require.Equal(t, sectorChanges.Extended[0].From, si1) + require.Equal(t, sectorChanges.Extended[0].To, si1Ext) + + change, val, err = minerDiffFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, change) + require.Nil(t, val) + + change, val, err = minerDiffFn(ctx, newState.Key(), oldState.Key()) + require.NoError(t, err) + require.True(t, change) + require.NotNil(t, val) + + sectorChanges, ok = val.(*MinerSectorChanges) + require.True(t, ok) + + require.Equal(t, len(sectorChanges.Added), 1) + require.Equal(t, sectorChanges.Added[0], si0) + + require.Equal(t, len(sectorChanges.Removed), 1) + require.Equal(t, sectorChanges.Removed[0], si3) + + require.Equal(t, len(sectorChanges.Extended), 1) + require.Equal(t, sectorChanges.Extended[0].To, si1) + require.Equal(t, sectorChanges.Extended[0].From, si1Ext) +} + func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) { return types.NewTipSet([]*types.BlockHeader{{ Miner: miner, @@ -171,7 +255,7 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) } func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { - rootCid := createAMT(ctx, t, store, deals) + rootCid := createDealAMT(ctx, t, store, deals) state := createEmptyMarketState(t, store) state.States = rootCid @@ -189,7 +273,7 @@ func createEmptyMarketState(t *testing.T, store *cbornode.BasicIpldStore) *marke return market.ConstructState(emptyArrayCid, emptyMap, emptyMap) } -func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { +func createDealAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { root := amt.NewAMT(store) for dealID, dealState := range deals { err := root.Set(ctx, uint64(dealID), dealState) @@ -199,3 +283,69 @@ func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore require.NoError(t, err) return rootCid } + +func createMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid { + rootCid := createSectorsAMT(ctx, t, store, sectors) + + state := createEmptyMinerState(ctx, t, store, owner, worker) + state.Sectors = rootCid + + stateC, err := store.Put(ctx, state) + require.NoError(t, err) + return stateC +} + +func createEmptyMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address) *miner.State { + emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO()) + require.NoError(t, err) + emptyMap, err := store.Put(context.TODO(), hamt.NewNode(store, hamt.UseTreeBitWidth(5))) + require.NoError(t, err) + + emptyDeadlines := miner.ConstructDeadlines() + emptyDeadlinesCid, err := store.Put(context.Background(), emptyDeadlines) + require.NoError(t, err) + + state, err := miner.ConstructState(emptyArrayCid, emptyMap, emptyDeadlinesCid, owner, worker, abi.PeerID{'1'}, nil, abi.RegisteredSealProof_StackedDrg64GiBV1, 0) + require.NoError(t, err) + return state + +} + +func createSectorsAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, sectors []miner.SectorOnChainInfo) cid.Cid { + root := amt.NewAMT(store) + for _, sector := range sectors { + sector := sector + err := root.Set(ctx, uint64(sector.Info.SectorNumber), §or) + require.NoError(t, err) + } + rootCid, err := root.Flush(ctx) + require.NoError(t, err) + return rootCid +} + +// returns a unique SectorOnChainInfo with each invocation with SectorNumber set to `sectorNo`. +func newSectorOnChainInfo(sectorNo abi.SectorNumber, sealed cid.Cid, weight big.Int, activation, expiration abi.ChainEpoch) miner.SectorOnChainInfo { + info := newSectorPreCommitInfo(sectorNo, sealed, expiration) + return miner.SectorOnChainInfo{ + Info: *info, + ActivationEpoch: activation, + DealWeight: weight, + VerifiedDealWeight: weight, + } +} + +const ( + sectorSealRandEpochValue = abi.ChainEpoch(1) +) + +// returns a unique SectorPreCommitInfo with each invocation with SectorNumber set to `sectorNo`. +func newSectorPreCommitInfo(sectorNo abi.SectorNumber, sealed cid.Cid, expiration abi.ChainEpoch) *miner.SectorPreCommitInfo { + return &miner.SectorPreCommitInfo{ + SealProof: abi.RegisteredSealProof_StackedDrg32GiBV1, + SectorNumber: sectorNo, + SealedCID: sealed, + SealRandEpoch: sectorSealRandEpochValue, + DealIDs: nil, + Expiration: expiration, + } +} From e1c29ca4698735c913f2f244350bdb230ec4dc94 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 1 Jul 2020 21:08:45 -0700 Subject: [PATCH 05/11] polish: wire up miner state predicate --- cmd/lotus-chainwatch/storage.go | 282 +++++++++++++++++--------------- cmd/lotus-chainwatch/sync.go | 37 +++-- 2 files changed, 177 insertions(+), 142 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index e68c586b5..db2b5abfd 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -3,20 +3,18 @@ package main import ( "context" "database/sql" - "fmt" - "github.com/filecoin-project/specs-actors/actors/util/adt" - "github.com/libp2p/go-libp2p-core/peer" "sync" "time" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" - miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/ipfs/go-cid" _ "github.com/lib/pq" + "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" ) @@ -25,8 +23,7 @@ type storage struct { headerLk sync.Mutex - // stateful miner data - minerSectors map[cid.Cid]struct{} + genesisTs *types.TipSet } func openStorage(dbSource string) (*storage, error) { @@ -37,10 +34,7 @@ func openStorage(dbSource string) (*storage, error) { db.SetMaxOpenConns(1350) - ms := make(map[cid.Cid]struct{}) - ms[cid.Undef] = struct{}{} - - st := &storage{db: db, minerSectors: ms} + st := &storage{db: db} return st, st.setup() } @@ -313,6 +307,19 @@ create table if not exists miner_sectors_heads ); +create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED'); + +create table if not exists miner_sector_events +( + miner_id text not null, + sector_id bigint not null, + state_root text not null, + event miner_sector_event_type not null, + + constraint miner_sector_events_pk + primary key (sector_id, event, miner_id, state_root) +) + /* create or replace function miner_tips(epoch bigint) returns table (head text, @@ -600,12 +607,6 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) return tx.Commit() } -type minerSectorUpdate struct { - minerState *minerStateInfo - tskey types.TipSetKey - oldSector cid.Cid -} - func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { tx, err := st.db.Begin() if err != nil { @@ -621,26 +622,8 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner return err } - var updateMiners []*minerSectorUpdate - for tsk, miners := range minerTips { + for _, miners := range minerTips { for _, miner := range miners { - sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr) - if err != nil { - panic(err) - } - if sectorCID == cid.Undef { - continue - } - if _, found := st.minerSectors[sectorCID]; !found { - // schedule miner table update - updateMiners = append(updateMiners, &minerSectorUpdate{ - minerState: miner, - tskey: tsk, - oldSector: sectorCID, - }) - } - st.minerSectors[sectorCID] = struct{}{} - log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String()) if _, err := stmt.Exec( miner.addr.String(), miner.state.Sectors.String(), @@ -660,94 +643,153 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner return xerrors.Errorf("actor put: %w", err) } - if err := tx.Commit(); err != nil { + return tx.Commit() +} + +type sectorUpdate struct { + terminationEpoch abi.ChainEpoch + terminated bool + + expirationEpoch abi.ChainEpoch + + sectorID abi.SectorNumber + minerID address.Address +} + +func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { + log.Debugw("updating miners constant sector table", "#tipsets", len(minerTips)) + pred := state.NewStatePredicates(api) + + eventTx, err := st.db.Begin() + if err != nil { return err } - return st.updateMinerSectors(updateMiners, api) -} -type deletedSector struct { - deletedSector miner_spec.SectorOnChainInfo - miner address.Address - tskey types.TipSetKey -} + if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } -func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error { - log.Info("updating miners constant sector table") - var deletedSectors []*deletedSector - for _, miner := range miners { - s := &apiIpldStore{context.TODO(), api} - newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors) - if err != nil { - log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors) - return err + eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `) + if err != nil { + return err + } + + var sectorUpdates []sectorUpdate + // TODO consider performing the miner sector diffing in parallel and performing the database update after. + for _, miners := range minerTips { + for _, miner := range miners { + // special case genesis miners + if miner.tsKey == st.genesisTs.Key() { + sectors, err := api.StateMinerSectors(context.TODO(), miner.addr, nil, true, miner.tsKey) + if err != nil { + log.Debugw("failed to get miner info for genesis", "miner", miner.addr.String()) + continue + } + + for _, sector := range sectors { + if _, err := eventStmt.Exec(sector.Info.Info.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + } + } else { + sectorDiffFn := pred.OnMinerActorChange(miner.addr, pred.OnMinerSectorChange()) + changed, val, err := sectorDiffFn(context.TODO(), miner.parentTsKey, miner.tsKey) + if err != nil { + log.Debugw("error getting miner sector diff", "miner", miner.addr, "error", err) + continue + } + if !changed { + continue + } + changes := val.(*state.MinerSectorChanges) + log.Debugw("sector changes for miner", "miner", miner.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", miner.parentTsKey, "newState", miner.tsKey) + + for _, extended := range changes.Extended { + if _, err := eventStmt.Exec(extended.To.Info.SectorNumber, "EXTENDED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + sectorUpdates = append(sectorUpdates, sectorUpdate{ + terminationEpoch: 0, + terminated: false, + expirationEpoch: extended.To.Info.Expiration, + sectorID: extended.To.Info.SectorNumber, + minerID: miner.addr, + }) + log.Debugw("sector extended", "miner", miner.addr.String(), "sector", extended.To.Info.SectorNumber, "old", extended.To.Info.Expiration, "new", extended.From.Info.Expiration) + } + curTs, err := api.ChainGetTipSet(context.TODO(), miner.tsKey) + if err != nil { + return err + } + + for _, removed := range changes.Removed { + // decide if they were terminated or extended + if removed.Info.Expiration > curTs.Height() { + if _, err := eventStmt.Exec(removed.Info.SectorNumber, "TERMINATED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + log.Debugw("sector terminated", "miner", miner.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "terminationEpoch", curTs.Height()) + sectorUpdates = append(sectorUpdates, sectorUpdate{ + terminationEpoch: curTs.Height(), + terminated: true, + expirationEpoch: removed.Info.Expiration, + sectorID: removed.Info.SectorNumber, + minerID: miner.addr, + }) + } + if _, err := eventStmt.Exec(removed.Info.SectorNumber, "EXPIRED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + log.Debugw("sector removed", "miner", miner.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height()) + } + + for _, added := range changes.Added { + if _, err := eventStmt.Exec(miner.addr.String(), added.Info.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil { + return err + } + } + } } + } + if err := eventStmt.Close(); err != nil { + return err + } - oldSectors, err := adt.AsArray(s, miner.oldSector) - if err != nil { - log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String()) - return err - } + if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } - var oldSecInfo miner_spec.SectorOnChainInfo - var newSecInfo miner_spec.SectorOnChainInfo - // if we cannot find an old sector in the new list then it was removed. - if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error { - found, err := newSectors.Get(uint64(oldSecInfo.Info.SectorNumber), &newSecInfo) - if err != nil { - log.Warnw("new sectors get", "error", err) + if err := eventTx.Commit(); err != nil { + return err + } + + updateTx, err := st.db.Begin() + if err != nil { + return err + } + + updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`) + if err != nil { + return err + } + + for _, update := range sectorUpdates { + if update.terminated { + if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { return err } - if !found { - log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.Info.SectorNumber, "tipset", miner.tskey.String()) - deletedSectors = append(deletedSectors, &deletedSector{ - deletedSector: oldSecInfo, - miner: miner.minerState.addr, - tskey: miner.tskey, - }) + } else { + if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { + return err } - return nil - }); err != nil { - log.Warnw("old sectors foreach", "error", err) - return err - } - if len(deletedSectors) > 0 { - log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors)) - } - } - // now we have all the sectors that were removed, update the database - tx, err := st.db.Begin() - if err != nil { - return err - } - stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`) - if err != nil { - return err - } - for _, ds := range deletedSectors { - ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey) - if err != nil { - log.Warnw("get tipset", "error", err) - return err - } - // TODO validate this shits right - if ts.Height() >= ds.deletedSector.Info.Expiration { - // means it expired, do nothing - log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber) - continue - } - log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber) - // means it was terminated. - if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.Info.SectorNumber)); err != nil { - return err } } - if err := stmt.Close(); err != nil { + if err := updateStmt.Close(); err != nil { return err } - defer log.Info("update miner sectors complete") - return tx.Commit() + + return updateTx.Commit() } func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { @@ -1252,27 +1294,3 @@ func (st *storage) refreshViews() error { func (st *storage) close() error { return st.db.Close() } - -func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) { - queryStr := fmt.Sprintf(` -SELECT miner_sectors_cid -FROM miner_sectors_heads -LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot -WHERE miner_id = '%s' -ORDER BY blocks.height DESC -LIMIT 1; -`, - miner.String()) - - var cidstr string - err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr) - switch { - case err == sql.ErrNoRows: - log.Warnf("no miner with miner_id: %s in table", miner) - return cid.Undef, nil - case err != nil: - return cid.Undef, err - default: - return cid.Decode(cidstr) - } -} diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 59e77e4a0..bbefb3e2c 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -59,6 +59,10 @@ type minerStateInfo struct { act types.Actor stateroot cid.Cid + // calculating changes + tsKey types.TipSetKey + parentTsKey types.TipSetKey + // miner specific state miner.State info miner.MinerInfo @@ -71,9 +75,10 @@ type minerStateInfo struct { } type actorInfo struct { - stateroot cid.Cid - tsKey types.TipSetKey - state string + stateroot cid.Cid + tsKey types.TipSetKey + parentTsKey types.TipSetKey + state string } func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) { @@ -169,6 +174,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. if len(bh.Parents) == 0 { // genesis case genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh}) + st.genesisTs = genesisTs + aadrs, err := api.StateListActors(ctx, genesisTs.Key()) if err != nil { log.Error(err) @@ -201,9 +208,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. actors[addr] = map[types.Actor]actorInfo{} } actors[addr][*act] = actorInfo{ - stateroot: bh.ParentStateRoot, - tsKey: genesisTs.Key(), - state: string(state), + stateroot: bh.ParentStateRoot, + tsKey: genesisTs.Key(), + parentTsKey: genesisTs.Key(), + state: string(state), } addressToID[addr] = address.Undef alk.Unlock() @@ -237,7 +245,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } ast, err := api.StateReadState(ctx, addr, pts.Key()) - if err != nil { log.Error(err) return @@ -256,9 +263,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } // a change occurred for the actor with address `addr` and state `act` at tipset `pts`. actors[addr][act] = actorInfo{ - stateroot: bh.ParentStateRoot, - state: string(state), - tsKey: pts.Key(), + stateroot: bh.ParentStateRoot, + state: string(state), + tsKey: pts.Key(), + parentTsKey: pts.Parents(), } addressToID[addr] = address.Undef alk.Unlock() @@ -314,6 +322,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. act: actor, stateroot: c.stateroot, + tsKey: c.tsKey, + parentTsKey: c.parentTsKey, + state: miner.State{}, info: miner.MinerInfo{}, @@ -426,6 +437,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } + log.Info("updating miner sectors heads") + if err := st.updateMinerSectors(minerTips, api); err != nil { + log.Error(err) + return + } + log.Infof("Storing messages") if err := st.storeMessages(msgs); err != nil { From 04bd56a068dbef20ea582d252246c1b1362d3265 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 6 Jul 2020 15:37:05 -0700 Subject: [PATCH 06/11] log message if journal is not configured --- journal/journal.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/journal/journal.go b/journal/journal.go index fb0ea52f6..1ba0708ce 100644 --- a/journal/journal.go +++ b/journal/journal.go @@ -25,6 +25,9 @@ func InitializeSystemJournal(dir string) error { } func Add(sys string, val interface{}) { + if currentJournal == nil { + log.Warn("no journal configured") + } currentJournal.AddEntry(sys, val) } From 74aa3e05fabd0dc6b48a5f187e94c6cb482af519 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Jul 2020 02:28:59 +0200 Subject: [PATCH 07/11] journal: Fix panic with no journal set --- journal/journal.go | 1 + 1 file changed, 1 insertion(+) diff --git a/journal/journal.go b/journal/journal.go index 1ba0708ce..b664e8fa7 100644 --- a/journal/journal.go +++ b/journal/journal.go @@ -27,6 +27,7 @@ func InitializeSystemJournal(dir string) error { func Add(sys string, val interface{}) { if currentJournal == nil { log.Warn("no journal configured") + return } currentJournal.AddEntry(sys, val) } From eac8a909e90df0ab27707caaed3f750d3286d646 Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Tue, 7 Jul 2020 17:37:43 -0400 Subject: [PATCH 08/11] Unify all sender flags in CLI under the same name --- cli/client.go | 10 +++++----- cli/multisig.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cli/client.go b/cli/client.go index a2f89f0d9..ebcf9a56b 100644 --- a/cli/client.go +++ b/cli/client.go @@ -385,8 +385,8 @@ var clientRetrieveCmd = &cli.Command{ ArgsUsage: "[dataCid outputPath]", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "address", - Usage: "address to use for transactions", + Name: "from", + Usage: "address to send transactions from", }, &cli.BoolFlag{ Name: "car", @@ -411,8 +411,8 @@ var clientRetrieveCmd = &cli.Command{ ctx := ReqContext(cctx) var payer address.Address - if cctx.String("address") != "" { - payer, err = address.NewFromString(cctx.String("address")) + if cctx.String("from") != "" { + payer, err = address.NewFromString(cctx.String("from")) } else { payer, err = fapi.WalletDefaultAddress(ctx) } @@ -498,7 +498,7 @@ var clientQueryAskCmd = &cli.Command{ }, Action: func(cctx *cli.Context) error { if cctx.NArg() != 1 { - fmt.Println("Usage: query-ask [address]") + fmt.Println("Usage: query-ask [minerAddress]") return nil } diff --git a/cli/multisig.go b/cli/multisig.go index 15910b099..9de881d95 100644 --- a/cli/multisig.go +++ b/cli/multisig.go @@ -58,7 +58,7 @@ var msigCreateCmd = &cli.Command{ Value: "0", }, &cli.StringFlag{ - Name: "sender", + Name: "from", Usage: "account to send the create message from", }, }, @@ -85,7 +85,7 @@ var msigCreateCmd = &cli.Command{ // get the address we're going to use to create the multisig (can be one of the above, as long as they have funds) var sendAddr address.Address - if send := cctx.String("sender"); send == "" { + if send := cctx.String("from"); send == "" { defaddr, err := api.WalletDefaultAddress(ctx) if err != nil { return err From 20c80f977cb0f3d0675e56c5dbca1cb5cbbf2440 Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Tue, 7 Jul 2020 17:40:57 -0400 Subject: [PATCH 09/11] Rename state compute's height flag to vm-height --- cli/state.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cli/state.go b/cli/state.go index dfe0118ca..da4a201d5 100644 --- a/cli/state.go +++ b/cli/state.go @@ -790,8 +790,8 @@ var stateComputeStateCmd = &cli.Command{ Usage: "Perform state computations", Flags: []cli.Flag{ &cli.Uint64Flag{ - Name: "height", - Usage: "set the height to compute state at", + Name: "vm-height", + Usage: "set the height that the vm will see", }, &cli.BoolFlag{ Name: "apply-mpool-messages", @@ -820,7 +820,7 @@ var stateComputeStateCmd = &cli.Command{ return err } - h := abi.ChainEpoch(cctx.Uint64("height")) + h := abi.ChainEpoch(cctx.Uint64("vm-height")) if h == 0 { if ts == nil { head, err := api.ChainHead(ctx) From 74db060cdb3b968f030b392452bf8e050949434a Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Wed, 8 Jul 2020 03:55:56 +0200 Subject: [PATCH 10/11] Fix bad block rason if ValidateBlock fails Signed-off-by: Jakub Sztandera --- chain/badtscache.go | 2 +- chain/sync.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/badtscache.go b/chain/badtscache.go index 82020565a..103237307 100644 --- a/chain/badtscache.go +++ b/chain/badtscache.go @@ -30,7 +30,7 @@ func (bbr BadBlockReason) Linked(reason string, i ...interface{}) BadBlockReason if bbr.OriginalReason != nil { or = bbr.OriginalReason } - return BadBlockReason{Reason: reason, OriginalReason: or} + return BadBlockReason{Reason: fmt.Sprintf(reason, i...), OriginalReason: or} } func (bbr BadBlockReason) String() string { diff --git a/chain/sync.go b/chain/sync.go index 53d2e1bd3..ba4b73567 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -536,7 +536,7 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet) futures = append(futures, async.Err(func() error { if err := syncer.ValidateBlock(ctx, b); err != nil { if isPermanent(err) { - syncer.bad.Add(b.Cid(), BadBlockReason{Reason: err.Error()}) + syncer.bad.Add(b.Cid(), NewBadBlockReason([]cid.Cid{b.Cid()}, err.Error())) } return xerrors.Errorf("validating block %s: %w", b.Cid(), err) } From d16de562805a57ef0a254254f7ec93822617456d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 8 Jul 2020 14:35:53 +0200 Subject: [PATCH 11/11] gofmt --- chain/events/state/predicates_test.go | 10 +++++----- storage/wdpost_run.go | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index 0681fcb8a..1573b84d8 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -328,11 +328,11 @@ func createSectorsAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIp func newSectorOnChainInfo(sectorNo abi.SectorNumber, sealed cid.Cid, weight big.Int, activation, expiration abi.ChainEpoch) miner.SectorOnChainInfo { info := newSectorPreCommitInfo(sectorNo, sealed, expiration) return miner.SectorOnChainInfo{ - SectorNumber: info.SectorNumber, - SealProof: info.SealProof, - SealedCID: info.SealedCID, - DealIDs: info.DealIDs, - Expiration: info.Expiration, + SectorNumber: info.SectorNumber, + SealProof: info.SealProof, + SealedCID: info.SealedCID, + DealIDs: info.DealIDs, + Expiration: info.Expiration, Activation: activation, DealWeight: weight, diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 5ee74915f..f6ec64583 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -150,16 +150,16 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, deadline if err != nil { return xerrors.Errorf("checking unrecovered sectors: %w", err) } - + // if all sectors failed to recover, don't declare recoveries sbfCount, err := sbf.Count() if err != nil { return xerrors.Errorf("counting recovered sectors: %w", err) } - + if sbfCount == 0 { - log.Warnw("No recoveries to declare", "deadline", deadline, "faulty", uc) - return nil + log.Warnw("No recoveries to declare", "deadline", deadline, "faulty", uc) + return nil } params := &miner.DeclareFaultsRecoveredParams{