diff --git a/chain/badtscache.go b/chain/badtscache.go index 82020565a..103237307 100644 --- a/chain/badtscache.go +++ b/chain/badtscache.go @@ -30,7 +30,7 @@ func (bbr BadBlockReason) Linked(reason string, i ...interface{}) BadBlockReason if bbr.OriginalReason != nil { or = bbr.OriginalReason } - return BadBlockReason{Reason: reason, OriginalReason: or} + return BadBlockReason{Reason: fmt.Sprintf(reason, i...), OriginalReason: or} } func (bbr BadBlockReason) String() string { diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 30699fa0b..c89bd49e0 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -2,18 +2,24 @@ package state import ( "context" + + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-amt-ipld/v2" - "github.com/filecoin-project/lotus/api/apibstore" - "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" - "github.com/ipfs/go-cid" - cbor "github.com/ipfs/go-ipld-cbor" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/util/adt" + + "github.com/filecoin-project/lotus/api/apibstore" + "github.com/filecoin-project/lotus/chain/types" ) -// UserData is the data returned from the DiffFunc +// UserData is the data returned from the DiffTipSetKeyFunc type UserData interface{} // ChainAPI abstracts out calls made by this class to external APIs @@ -35,22 +41,22 @@ func NewStatePredicates(api ChainAPI) *StatePredicates { } } -// DiffFunc check if there's a change form oldState to newState, and returns +// DiffTipSetKeyFunc check if there's a change form oldState to newState, and returns // - changed: was there a change // - user: user-defined data representing the state change // - err -type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) +type DiffTipSetKeyFunc func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error) -type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) +type DiffActorStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) // OnActorStateChanged calls diffStateFunc when the state changes for the given actor -func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc { - return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) { - oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key()) +func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffActorStateFunc) DiffTipSetKeyFunc { + return func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error) { + oldActor, err := sp.api.StateGetActor(ctx, addr, oldState) if err != nil { return false, nil, err } - newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key()) + newActor, err := sp.api.StateGetActor(ctx, addr, newState) if err != nil { return false, nil, err } @@ -65,7 +71,7 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) // OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor -func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { +func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffTipSetKeyFunc { return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { var oldState market.State if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil { @@ -142,3 +148,123 @@ func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDeal return false, nil, nil } } + +type DiffMinerActorStateFunc func(ctx context.Context, oldState *miner.State, newState *miner.State) (changed bool, user UserData, err error) + +func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMinerActorState DiffMinerActorStateFunc) DiffTipSetKeyFunc { + return sp.OnActorStateChanged(minerAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { + var oldState miner.State + if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil { + return false, nil, err + } + var newState miner.State + if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil { + return false, nil, err + } + return diffMinerActorState(ctx, &oldState, &newState) + }) +} + +type MinerSectorChanges struct { + Added []miner.SectorOnChainInfo + Extended []SectorExtensions + Removed []miner.SectorOnChainInfo +} + +type SectorExtensions struct { + From miner.SectorOnChainInfo + To miner.SectorOnChainInfo +} + +func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc { + return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) { + ctxStore := &contextStore{ + ctx: context.TODO(), + cst: sp.cst, + } + + sectorChanges := &MinerSectorChanges{ + Added: []miner.SectorOnChainInfo{}, + Extended: []SectorExtensions{}, + Removed: []miner.SectorOnChainInfo{}, + } + + // no sector changes + if oldState.Sectors.Equals(newState.Sectors) { + return false, nil, nil + } + + oldSectors, err := adt.AsArray(ctxStore, oldState.Sectors) + if err != nil { + return false, nil, err + } + + newSectors, err := adt.AsArray(ctxStore, newState.Sectors) + if err != nil { + return false, nil, err + } + + var osi miner.SectorOnChainInfo + + // find all sectors that were extended or removed + if err := oldSectors.ForEach(&osi, func(i int64) error { + var nsi miner.SectorOnChainInfo + found, err := newSectors.Get(uint64(osi.SectorNumber), &nsi) + if err != nil { + return err + } + if !found { + sectorChanges.Removed = append(sectorChanges.Removed, osi) + return nil + } + + if nsi.Expiration != osi.Expiration { + sectorChanges.Extended = append(sectorChanges.Extended, SectorExtensions{ + From: osi, + To: nsi, + }) + } + + // we don't update miners state filed with `newSectors.Root()` so this operation is safe. + if err := newSectors.Delete(uint64(osi.SectorNumber)); err != nil { + return err + } + return nil + }); err != nil { + return false, nil, err + } + + // all sectors that remain in newSectors are new + var nsi miner.SectorOnChainInfo + if err := newSectors.ForEach(&nsi, func(i int64) error { + sectorChanges.Added = append(sectorChanges.Added, nsi) + return nil + }); err != nil { + return false, nil, err + } + + // nothing changed + if len(sectorChanges.Added)+len(sectorChanges.Extended)+len(sectorChanges.Removed) == 0 { + return false, nil, nil + } + + return true, sectorChanges, nil + } +} + +type contextStore struct { + ctx context.Context + cst *cbor.BasicIpldStore +} + +func (cs *contextStore) Context() context.Context { + return cs.ctx +} + +func (cs *contextStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { + return cs.cst.Get(ctx, c, out) +} + +func (cs *contextStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { + return cs.cst.Put(ctx, v) +} diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index c8f115972..1573b84d8 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -4,23 +4,26 @@ import ( "context" "testing" - "github.com/filecoin-project/specs-actors/actors/crypto" - "github.com/ipfs/go-hamt-ipld" - - "github.com/filecoin-project/go-amt-ipld/v2" - "github.com/filecoin-project/specs-actors/actors/builtin/market" - ds "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" - bstore "github.com/ipfs/go-ipfs-blockstore" - cbornode "github.com/ipfs/go-ipld-cbor" + "github.com/stretchr/testify/require" "golang.org/x/xerrors" "github.com/ipfs/go-cid" - "github.com/stretchr/testify/require" + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + "github.com/ipfs/go-hamt-ipld" + bstore "github.com/ipfs/go-ipfs-blockstore" + cbornode "github.com/ipfs/go-ipld-cbor" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/go-amt-ipld/v2" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/crypto" + tutils "github.com/filecoin-project/specs-actors/support/testing" + + "github.com/filecoin-project/lotus/chain/types" ) var dummyCid cid.Cid @@ -104,12 +107,12 @@ func TestPredicates(t *testing.T) { diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) // Diff a state against itself: expect no change - changed, _, err := diffFn(ctx, oldState, oldState) + changed, _, err := diffFn(ctx, oldState.Key(), oldState.Key()) require.NoError(t, err) require.False(t, changed) // Diff old state against new state - changed, val, err := diffFn(ctx, oldState, newState) + changed, val, err := diffFn(ctx, oldState.Key(), newState.Key()) require.NoError(t, err) require.True(t, changed) @@ -130,7 +133,7 @@ func TestPredicates(t *testing.T) { // Diff with non-existent deal. noDeal := []abi.DealID{3} diffNoDealFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(noDeal))) - changed, _, err = diffNoDealFn(ctx, oldState, newState) + changed, _, err = diffNoDealFn(ctx, oldState.Key(), newState.Key()) require.NoError(t, err) require.False(t, changed) @@ -141,7 +144,7 @@ func TestPredicates(t *testing.T) { t.Fatal("No state change so this should not be called") return false, nil, nil }) - changed, _, err = actorDiffFn(ctx, oldState, oldState) + changed, _, err = actorDiffFn(ctx, oldState.Key(), oldState.Key()) require.NoError(t, err) require.False(t, changed) @@ -156,6 +159,87 @@ func TestPredicates(t *testing.T) { require.False(t, changed) } +func TestMinerSectorChange(t *testing.T) { + ctx := context.Background() + bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + store := cbornode.NewCborStore(bs) + + nextID := uint64(0) + nextIDAddrF := func() address.Address { + defer func() { nextID++ }() + return tutils.NewIDAddr(t, nextID) + } + + owner, worker := nextIDAddrF(), nextIDAddrF() + si0 := newSectorOnChainInfo(0, tutils.MakeCID("0"), big.NewInt(0), abi.ChainEpoch(0), abi.ChainEpoch(10)) + si1 := newSectorOnChainInfo(1, tutils.MakeCID("1"), big.NewInt(1), abi.ChainEpoch(1), abi.ChainEpoch(11)) + si2 := newSectorOnChainInfo(2, tutils.MakeCID("2"), big.NewInt(2), abi.ChainEpoch(2), abi.ChainEpoch(11)) + oldMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si0, si1, si2}) + + si3 := newSectorOnChainInfo(3, tutils.MakeCID("3"), big.NewInt(3), abi.ChainEpoch(3), abi.ChainEpoch(12)) + // 0 delete + // 1 extend + // 2 same + // 3 added + si1Ext := si1 + si1Ext.Expiration++ + newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3}) + + minerAddr := nextIDAddrF() + oldState, err := mockTipset(minerAddr, 1) + require.NoError(t, err) + newState, err := mockTipset(minerAddr, 2) + require.NoError(t, err) + + api := newMockAPI(bs) + api.setActor(oldState.Key(), &types.Actor{Head: oldMinerC}) + api.setActor(newState.Key(), &types.Actor{Head: newMinerC}) + + preds := NewStatePredicates(api) + + minerDiffFn := preds.OnMinerActorChange(minerAddr, preds.OnMinerSectorChange()) + change, val, err := minerDiffFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.True(t, change) + require.NotNil(t, val) + + sectorChanges, ok := val.(*MinerSectorChanges) + require.True(t, ok) + + require.Equal(t, len(sectorChanges.Added), 1) + require.Equal(t, sectorChanges.Added[0], si3) + + require.Equal(t, len(sectorChanges.Removed), 1) + require.Equal(t, sectorChanges.Removed[0], si0) + + require.Equal(t, len(sectorChanges.Extended), 1) + require.Equal(t, sectorChanges.Extended[0].From, si1) + require.Equal(t, sectorChanges.Extended[0].To, si1Ext) + + change, val, err = minerDiffFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, change) + require.Nil(t, val) + + change, val, err = minerDiffFn(ctx, newState.Key(), oldState.Key()) + require.NoError(t, err) + require.True(t, change) + require.NotNil(t, val) + + sectorChanges, ok = val.(*MinerSectorChanges) + require.True(t, ok) + + require.Equal(t, len(sectorChanges.Added), 1) + require.Equal(t, sectorChanges.Added[0], si0) + + require.Equal(t, len(sectorChanges.Removed), 1) + require.Equal(t, sectorChanges.Removed[0], si3) + + require.Equal(t, len(sectorChanges.Extended), 1) + require.Equal(t, sectorChanges.Extended[0].To, si1) + require.Equal(t, sectorChanges.Extended[0].From, si1Ext) +} + func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) { return types.NewTipSet([]*types.BlockHeader{{ Miner: miner, @@ -170,7 +254,7 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) } func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { - rootCid := createAMT(ctx, t, store, deals) + rootCid := createDealAMT(ctx, t, store, deals) state := createEmptyMarketState(t, store) state.States = rootCid @@ -188,7 +272,7 @@ func createEmptyMarketState(t *testing.T, store *cbornode.BasicIpldStore) *marke return market.ConstructState(emptyArrayCid, emptyMap, emptyMap) } -func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { +func createDealAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { root := amt.NewAMT(store) for dealID, dealState := range deals { err := root.Set(ctx, uint64(dealID), dealState) @@ -198,3 +282,77 @@ func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore require.NoError(t, err) return rootCid } + +func createMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid { + rootCid := createSectorsAMT(ctx, t, store, sectors) + + state := createEmptyMinerState(ctx, t, store, owner, worker) + state.Sectors = rootCid + + stateC, err := store.Put(ctx, state) + require.NoError(t, err) + return stateC +} + +func createEmptyMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address) *miner.State { + emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO()) + require.NoError(t, err) + emptyMap, err := store.Put(context.TODO(), hamt.NewNode(store, hamt.UseTreeBitWidth(5))) + require.NoError(t, err) + + emptyDeadlines := miner.ConstructDeadlines() + emptyDeadlinesCid, err := store.Put(context.Background(), emptyDeadlines) + require.NoError(t, err) + + minerInfo := emptyMap + + state, err := miner.ConstructState(minerInfo, 123, emptyArrayCid, emptyMap, emptyDeadlinesCid) + require.NoError(t, err) + return state + +} + +func createSectorsAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, sectors []miner.SectorOnChainInfo) cid.Cid { + root := amt.NewAMT(store) + for _, sector := range sectors { + sector := sector + err := root.Set(ctx, uint64(sector.SectorNumber), §or) + require.NoError(t, err) + } + rootCid, err := root.Flush(ctx) + require.NoError(t, err) + return rootCid +} + +// returns a unique SectorOnChainInfo with each invocation with SectorNumber set to `sectorNo`. +func newSectorOnChainInfo(sectorNo abi.SectorNumber, sealed cid.Cid, weight big.Int, activation, expiration abi.ChainEpoch) miner.SectorOnChainInfo { + info := newSectorPreCommitInfo(sectorNo, sealed, expiration) + return miner.SectorOnChainInfo{ + SectorNumber: info.SectorNumber, + SealProof: info.SealProof, + SealedCID: info.SealedCID, + DealIDs: info.DealIDs, + Expiration: info.Expiration, + + Activation: activation, + DealWeight: weight, + VerifiedDealWeight: weight, + InitialPledge: big.Zero(), + } +} + +const ( + sectorSealRandEpochValue = abi.ChainEpoch(1) +) + +// returns a unique SectorPreCommitInfo with each invocation with SectorNumber set to `sectorNo`. +func newSectorPreCommitInfo(sectorNo abi.SectorNumber, sealed cid.Cid, expiration abi.ChainEpoch) *miner.SectorPreCommitInfo { + return &miner.SectorPreCommitInfo{ + SealProof: abi.RegisteredSealProof_StackedDrg32GiBV1, + SectorNumber: sectorNo, + SealedCID: sealed, + SealRandEpoch: sectorSealRandEpochValue, + DealIDs: nil, + Expiration: expiration, + } +} diff --git a/chain/store/store.go b/chain/store/store.go index 985bfa482..fa31f6b08 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -21,6 +21,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/metrics" "go.opencensus.io/stats" "go.opencensus.io/trace" @@ -325,6 +326,14 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo continue } + journal.Add("sync", map[string]interface{}{ + "op": "headChange", + "from": r.old.Key(), + "to": r.new.Key(), + "rev": len(revert), + "apply": len(apply), + }) + // reverse the apply array for i := len(apply)/2 - 1; i >= 0; i-- { opp := len(apply) - 1 - i diff --git a/chain/sync.go b/chain/sync.go index 19a0393b3..bcfa71267 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -536,7 +536,7 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet) futures = append(futures, async.Err(func() error { if err := syncer.ValidateBlock(ctx, b); err != nil { if isPermanent(err) { - syncer.bad.Add(b.Cid(), BadBlockReason{Reason: err.Error()}) + syncer.bad.Add(b.Cid(), NewBadBlockReason([]cid.Cid{b.Cid()}, err.Error())) } return xerrors.Errorf("validating block %s: %w", b.Cid(), err) } diff --git a/cli/client.go b/cli/client.go index a81e35812..5e8d61e83 100644 --- a/cli/client.go +++ b/cli/client.go @@ -418,8 +418,8 @@ var clientRetrieveCmd = &cli.Command{ ArgsUsage: "[dataCid outputPath]", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "address", - Usage: "address to use for transactions", + Name: "from", + Usage: "address to send transactions from", }, &cli.BoolFlag{ Name: "car", @@ -444,8 +444,8 @@ var clientRetrieveCmd = &cli.Command{ ctx := ReqContext(cctx) var payer address.Address - if cctx.String("address") != "" { - payer, err = address.NewFromString(cctx.String("address")) + if cctx.String("from") != "" { + payer, err = address.NewFromString(cctx.String("from")) } else { payer, err = fapi.WalletDefaultAddress(ctx) } @@ -531,7 +531,7 @@ var clientQueryAskCmd = &cli.Command{ }, Action: func(cctx *cli.Context) error { if cctx.NArg() != 1 { - fmt.Println("Usage: query-ask [address]") + fmt.Println("Usage: query-ask [minerAddress]") return nil } diff --git a/cli/multisig.go b/cli/multisig.go index 5861290f3..543e4ac10 100644 --- a/cli/multisig.go +++ b/cli/multisig.go @@ -58,7 +58,7 @@ var msigCreateCmd = &cli.Command{ Value: "0", }, &cli.StringFlag{ - Name: "sender", + Name: "from", Usage: "account to send the create message from", }, }, @@ -85,7 +85,7 @@ var msigCreateCmd = &cli.Command{ // get the address we're going to use to create the multisig (can be one of the above, as long as they have funds) var sendAddr address.Address - if send := cctx.String("sender"); send == "" { + if send := cctx.String("from"); send == "" { defaddr, err := api.WalletDefaultAddress(ctx) if err != nil { return err diff --git a/cli/state.go b/cli/state.go index d71fd80e0..03fa51a3e 100644 --- a/cli/state.go +++ b/cli/state.go @@ -790,8 +790,8 @@ var stateComputeStateCmd = &cli.Command{ Usage: "Perform state computations", Flags: []cli.Flag{ &cli.Uint64Flag{ - Name: "height", - Usage: "set the height to compute state at", + Name: "vm-height", + Usage: "set the height that the vm will see", }, &cli.BoolFlag{ Name: "apply-mpool-messages", @@ -820,7 +820,7 @@ var stateComputeStateCmd = &cli.Command{ return err } - h := abi.ChainEpoch(cctx.Uint64("height")) + h := abi.ChainEpoch(cctx.Uint64("vm-height")) if h == 0 { if ts == nil { head, err := api.ChainHead(ctx) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 4353d76be..726a7a187 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -3,20 +3,18 @@ package main import ( "context" "database/sql" - "fmt" - "github.com/filecoin-project/specs-actors/actors/util/adt" - "github.com/libp2p/go-libp2p-core/peer" "sync" "time" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" - miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/ipfs/go-cid" _ "github.com/lib/pq" + "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" ) @@ -25,8 +23,7 @@ type storage struct { headerLk sync.Mutex - // stateful miner data - minerSectors map[cid.Cid]struct{} + genesisTs *types.TipSet } func openStorage(dbSource string) (*storage, error) { @@ -37,10 +34,7 @@ func openStorage(dbSource string) (*storage, error) { db.SetMaxOpenConns(1350) - ms := make(map[cid.Cid]struct{}) - ms[cid.Undef] = struct{}{} - - st := &storage{db: db, minerSectors: ms} + st := &storage{db: db} return st, st.setup() } @@ -313,6 +307,19 @@ create table if not exists miner_sectors_heads ); +create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED'); + +create table if not exists miner_sector_events +( + miner_id text not null, + sector_id bigint not null, + state_root text not null, + event miner_sector_event_type not null, + + constraint miner_sector_events_pk + primary key (sector_id, event, miner_id, state_root) +) + /* create or replace function miner_tips(epoch bigint) returns table (head text, @@ -600,12 +607,6 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) return tx.Commit() } -type minerSectorUpdate struct { - minerState *minerStateInfo - tskey types.TipSetKey - oldSector cid.Cid -} - func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { tx, err := st.db.Begin() if err != nil { @@ -621,26 +622,8 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner return err } - var updateMiners []*minerSectorUpdate - for tsk, miners := range minerTips { + for _, miners := range minerTips { for _, miner := range miners { - sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr) - if err != nil { - panic(err) - } - if sectorCID == cid.Undef { - continue - } - if _, found := st.minerSectors[sectorCID]; !found { - // schedule miner table update - updateMiners = append(updateMiners, &minerSectorUpdate{ - minerState: miner, - tskey: tsk, - oldSector: sectorCID, - }) - } - st.minerSectors[sectorCID] = struct{}{} - log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String()) if _, err := stmt.Exec( miner.addr.String(), miner.state.Sectors.String(), @@ -660,94 +643,153 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner return xerrors.Errorf("actor put: %w", err) } - if err := tx.Commit(); err != nil { + return tx.Commit() +} + +type sectorUpdate struct { + terminationEpoch abi.ChainEpoch + terminated bool + + expirationEpoch abi.ChainEpoch + + sectorID abi.SectorNumber + minerID address.Address +} + +func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { + log.Debugw("updating miners constant sector table", "#tipsets", len(minerTips)) + pred := state.NewStatePredicates(api) + + eventTx, err := st.db.Begin() + if err != nil { return err } - return st.updateMinerSectors(updateMiners, api) -} -type deletedSector struct { - deletedSector miner_spec.SectorOnChainInfo - miner address.Address - tskey types.TipSetKey -} + if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } -func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error { - log.Info("updating miners constant sector table") - var deletedSectors []*deletedSector - for _, miner := range miners { - s := &apiIpldStore{context.TODO(), api} - newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors) - if err != nil { - log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors) - return err + eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `) + if err != nil { + return err + } + + var sectorUpdates []sectorUpdate + // TODO consider performing the miner sector diffing in parallel and performing the database update after. + for _, miners := range minerTips { + for _, miner := range miners { + // special case genesis miners + if miner.tsKey == st.genesisTs.Key() { + sectors, err := api.StateMinerSectors(context.TODO(), miner.addr, nil, true, miner.tsKey) + if err != nil { + log.Debugw("failed to get miner info for genesis", "miner", miner.addr.String()) + continue + } + + for _, sector := range sectors { + if _, err := eventStmt.Exec(sector.Info.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + } + } else { + sectorDiffFn := pred.OnMinerActorChange(miner.addr, pred.OnMinerSectorChange()) + changed, val, err := sectorDiffFn(context.TODO(), miner.parentTsKey, miner.tsKey) + if err != nil { + log.Debugw("error getting miner sector diff", "miner", miner.addr, "error", err) + continue + } + if !changed { + continue + } + changes := val.(*state.MinerSectorChanges) + log.Debugw("sector changes for miner", "miner", miner.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", miner.parentTsKey, "newState", miner.tsKey) + + for _, extended := range changes.Extended { + if _, err := eventStmt.Exec(extended.To.SectorNumber, "EXTENDED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + sectorUpdates = append(sectorUpdates, sectorUpdate{ + terminationEpoch: 0, + terminated: false, + expirationEpoch: extended.To.Expiration, + sectorID: extended.To.SectorNumber, + minerID: miner.addr, + }) + log.Debugw("sector extended", "miner", miner.addr.String(), "sector", extended.To.SectorNumber, "old", extended.To.Expiration, "new", extended.From.Expiration) + } + curTs, err := api.ChainGetTipSet(context.TODO(), miner.tsKey) + if err != nil { + return err + } + + for _, removed := range changes.Removed { + // decide if they were terminated or extended + if removed.Expiration > curTs.Height() { + if _, err := eventStmt.Exec(removed.SectorNumber, "TERMINATED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + log.Debugw("sector terminated", "miner", miner.addr.String(), "sector", removed.SectorNumber, "old", "sectorExpiration", removed.Expiration, "terminationEpoch", curTs.Height()) + sectorUpdates = append(sectorUpdates, sectorUpdate{ + terminationEpoch: curTs.Height(), + terminated: true, + expirationEpoch: removed.Expiration, + sectorID: removed.SectorNumber, + minerID: miner.addr, + }) + } + if _, err := eventStmt.Exec(removed.SectorNumber, "EXPIRED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + log.Debugw("sector removed", "miner", miner.addr.String(), "sector", removed.SectorNumber, "old", "sectorExpiration", removed.Expiration, "currEpoch", curTs.Height()) + } + + for _, added := range changes.Added { + if _, err := eventStmt.Exec(miner.addr.String(), added.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil { + return err + } + } + } } + } + if err := eventStmt.Close(); err != nil { + return err + } - oldSectors, err := adt.AsArray(s, miner.oldSector) - if err != nil { - log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String()) - return err - } + if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } - var oldSecInfo miner_spec.SectorOnChainInfo - var newSecInfo miner_spec.SectorOnChainInfo - // if we cannot find an old sector in the new list then it was removed. - if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error { - found, err := newSectors.Get(uint64(oldSecInfo.SectorNumber), &newSecInfo) - if err != nil { - log.Warnw("new sectors get", "error", err) + if err := eventTx.Commit(); err != nil { + return err + } + + updateTx, err := st.db.Begin() + if err != nil { + return err + } + + updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`) + if err != nil { + return err + } + + for _, update := range sectorUpdates { + if update.terminated { + if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { return err } - if !found { - log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.SectorNumber, "tipset", miner.tskey.String()) - deletedSectors = append(deletedSectors, &deletedSector{ - deletedSector: oldSecInfo, - miner: miner.minerState.addr, - tskey: miner.tskey, - }) + } else { + if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { + return err } - return nil - }); err != nil { - log.Warnw("old sectors foreach", "error", err) - return err - } - if len(deletedSectors) > 0 { - log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors)) - } - } - // now we have all the sectors that were removed, update the database - tx, err := st.db.Begin() - if err != nil { - return err - } - stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`) - if err != nil { - return err - } - for _, ds := range deletedSectors { - ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey) - if err != nil { - log.Warnw("get tipset", "error", err) - return err - } - // TODO validate this shits right - if ts.Height() >= ds.deletedSector.Expiration { - // means it expired, do nothing - log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.SectorNumber) - continue - } - log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.SectorNumber) - // means it was terminated. - if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.SectorNumber)); err != nil { - return err } } - if err := stmt.Close(); err != nil { + if err := updateStmt.Close(); err != nil { return err } - defer log.Info("update miner sectors complete") - return tx.Commit() + + return updateTx.Commit() } func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { @@ -1252,27 +1294,3 @@ func (st *storage) refreshViews() error { func (st *storage) close() error { return st.db.Close() } - -func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) { - queryStr := fmt.Sprintf(` -SELECT miner_sectors_cid -FROM miner_sectors_heads -LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot -WHERE miner_id = '%s' -ORDER BY blocks.height DESC -LIMIT 1; -`, - miner.String()) - - var cidstr string - err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr) - switch { - case err == sql.ErrNoRows: - log.Warnf("no miner with miner_id: %s in table", miner) - return cid.Undef, nil - case err != nil: - return cid.Undef, err - default: - return cid.Decode(cidstr) - } -} diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index a3bb57c43..3627072e8 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -59,6 +59,10 @@ type minerStateInfo struct { act types.Actor stateroot cid.Cid + // calculating changes + tsKey types.TipSetKey + parentTsKey types.TipSetKey + // miner specific state miner.State info *miner.MinerInfo @@ -71,9 +75,10 @@ type minerStateInfo struct { } type actorInfo struct { - stateroot cid.Cid - tsKey types.TipSetKey - state string + stateroot cid.Cid + tsKey types.TipSetKey + parentTsKey types.TipSetKey + state string } func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) { @@ -169,6 +174,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. if len(bh.Parents) == 0 { // genesis case genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh}) + st.genesisTs = genesisTs + aadrs, err := api.StateListActors(ctx, genesisTs.Key()) if err != nil { log.Error(err) @@ -201,9 +208,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. actors[addr] = map[types.Actor]actorInfo{} } actors[addr][*act] = actorInfo{ - stateroot: bh.ParentStateRoot, - tsKey: genesisTs.Key(), - state: string(state), + stateroot: bh.ParentStateRoot, + tsKey: genesisTs.Key(), + parentTsKey: genesisTs.Key(), + state: string(state), } addressToID[addr] = address.Undef alk.Unlock() @@ -237,7 +245,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } ast, err := api.StateReadState(ctx, addr, pts.Key()) - if err != nil { log.Error(err) return @@ -256,9 +263,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } // a change occurred for the actor with address `addr` and state `act` at tipset `pts`. actors[addr][act] = actorInfo{ - stateroot: bh.ParentStateRoot, - state: string(state), - tsKey: pts.Key(), + stateroot: bh.ParentStateRoot, + state: string(state), + tsKey: pts.Key(), + parentTsKey: pts.Parents(), } addressToID[addr] = address.Undef alk.Unlock() @@ -314,6 +322,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. act: actor, stateroot: c.stateroot, + tsKey: c.tsKey, + parentTsKey: c.parentTsKey, + state: miner.State{}, info: nil, @@ -430,6 +441,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } + log.Info("updating miner sectors heads") + if err := st.updateMinerSectors(minerTips, api); err != nil { + log.Error(err) + return + } + log.Infof("Storing messages") if err := st.storeMessages(msgs); err != nil { diff --git a/journal/journal.go b/journal/journal.go new file mode 100644 index 000000000..b664e8fa7 --- /dev/null +++ b/journal/journal.go @@ -0,0 +1,146 @@ +package journal + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + logging "github.com/ipfs/go-log" + "golang.org/x/xerrors" +) + +func InitializeSystemJournal(dir string) error { + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + j, err := OpenFSJournal(dir) + if err != nil { + return err + } + currentJournal = j + return nil +} + +func Add(sys string, val interface{}) { + if currentJournal == nil { + log.Warn("no journal configured") + return + } + currentJournal.AddEntry(sys, val) +} + +var log = logging.Logger("journal") + +var currentJournal Journal + +type Journal interface { + AddEntry(system string, obj interface{}) + Close() error +} + +// fsJournal is a basic journal backed by files on a filesystem +type fsJournal struct { + fi *os.File + fSize int64 + + lk sync.Mutex + + journalDir string + + incoming chan *JournalEntry + journalSizeLimit int64 + + closing chan struct{} +} + +func OpenFSJournal(dir string) (*fsJournal, error) { + fsj := &fsJournal{ + journalDir: dir, + incoming: make(chan *JournalEntry, 32), + journalSizeLimit: 1 << 30, + closing: make(chan struct{}), + } + + if err := fsj.rollJournalFile(); err != nil { + return nil, err + } + + go fsj.runLoop() + + return fsj, nil +} + +type JournalEntry struct { + System string + Timestamp time.Time + Val interface{} +} + +func (fsj *fsJournal) putEntry(je *JournalEntry) error { + b, err := json.Marshal(je) + if err != nil { + return err + } + n, err := fsj.fi.Write(append(b, '\n')) + if err != nil { + return err + } + + fsj.fSize += int64(n) + + if fsj.fSize >= fsj.journalSizeLimit { + fsj.rollJournalFile() + } + + return nil +} + +func (fsj *fsJournal) rollJournalFile() error { + if fsj.fi != nil { + fsj.fi.Close() + } + + nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339)))) + if err != nil { + return xerrors.Errorf("failed to open journal file: %w", err) + } + + fsj.fi = nfi + fsj.fSize = 0 + return nil +} + +func (fsj *fsJournal) runLoop() { + for { + select { + case je := <-fsj.incoming: + if err := fsj.putEntry(je); err != nil { + log.Errorw("failed to write out journal entry", "entry", je, "err", err) + } + case <-fsj.closing: + fsj.fi.Close() + return + } + } +} + +func (fsj *fsJournal) AddEntry(system string, obj interface{}) { + je := &JournalEntry{ + System: system, + Timestamp: time.Now(), + Val: obj, + } + select { + case fsj.incoming <- je: + case <-fsj.closing: + log.Warnw("journal closed but tried to log event", "entry", je) + } +} + +func (fsj *fsJournal) Close() error { + close(fsj.closing) + return nil +} diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index d52cdd6b6..bb8ec5571 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -415,7 +415,7 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a preds.OnDealStateChanged( preds.DealStateChangedForIDs([]abi.DealID{dealID}))) match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { - return dealDiff(ctx, oldTs, newTs) + return dealDiff(ctx, oldTs.Key(), newTs.Key()) } if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, build.SealRandomnessLookbackLimit, match); err != nil { return xerrors.Errorf("failed to set up state changed handler: %w", err) diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 1f8af59f5..32bed3c2f 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -430,7 +430,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID preds.OnDealStateChanged( preds.DealStateChangedForIDs([]abi.DealID{dealID}))) match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { - return dealDiff(ctx, oldTs, newTs) + return dealDiff(ctx, oldTs.Key(), newTs.Key()) } if err := n.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, build.SealRandomnessLookbackLimit, match); err != nil { return xerrors.Errorf("failed to set up state changed handler: %w", err) diff --git a/node/builder.go b/node/builder.go index 7f387d1f1..a438bf5bd 100644 --- a/node/builder.go +++ b/node/builder.go @@ -119,6 +119,7 @@ const ( ExtractApiKey HeadMetricsKey RunPeerTaggerKey + JournalKey SetApiEndpointKey @@ -150,6 +151,7 @@ func defaults() []Option { Override(new(record.Validator), modules.RecordValidator), Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)), Override(new(dtypes.ShutdownChan), make(chan struct{})), + Override(JournalKey, modules.SetupJournal), // Filecoin modules diff --git a/node/modules/core.go b/node/modules/core.go index ca9872d90..84179bd63 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -6,6 +6,7 @@ import ( "errors" "io" "io/ioutil" + "path/filepath" "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log/v2" @@ -18,6 +19,7 @@ import ( "github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/addrutil" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" @@ -93,3 +95,7 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) { func DrandBootstrap() (dtypes.DrandBootstrap, error) { return build.DrandBootstrap() } + +func SetupJournal(lr repo.LockedRepo) error { + return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal")) +} diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 5ee74915f..f6ec64583 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -150,16 +150,16 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, deadline if err != nil { return xerrors.Errorf("checking unrecovered sectors: %w", err) } - + // if all sectors failed to recover, don't declare recoveries sbfCount, err := sbf.Count() if err != nil { return xerrors.Errorf("counting recovered sectors: %w", err) } - + if sbfCount == 0 { - log.Warnw("No recoveries to declare", "deadline", deadline, "faulty", uc) - return nil + log.Warnw("No recoveries to declare", "deadline", deadline, "faulty", uc) + return nil } params := &miner.DeclareFaultsRecoveredParams{