From 1b8aa2c2b41b785589880d44a585bbf7b646c617 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2019 20:34:41 -0800 Subject: [PATCH 01/31] fix propagation delay to be a sane number --- build/params_debug.go | 2 ++ build/params_devnet.go | 2 ++ chain/store/store.go | 2 +- chain/sync_manager.go | 2 -- miner/miner.go | 2 +- 5 files changed, 6 insertions(+), 4 deletions(-) diff --git a/build/params_debug.go b/build/params_debug.go index 7186ee478..644a5d7b5 100644 --- a/build/params_debug.go +++ b/build/params_debug.go @@ -7,6 +7,8 @@ import "os" // Seconds const BlockDelay = 6 +const PropagationDelay = 3 + // FallbackPoStDelay is the number of epochs the miner needs to wait after // ElectionPeriodStart before starting fallback post computation // diff --git a/build/params_devnet.go b/build/params_devnet.go index c2c1c3905..6f53d2704 100644 --- a/build/params_devnet.go +++ b/build/params_devnet.go @@ -5,6 +5,8 @@ package build // Seconds const BlockDelay = 30 +const PropagationDelay = 5 + // FallbackPoStDelay is the number of epochs the miner needs to wait after // ElectionPeriodStart before starting fallback post computation // diff --git a/chain/store/store.go b/chain/store/store.go index 9f977b161..9f20626e8 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -290,7 +290,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) span.AddAttributes(trace.BoolAttribute("newHead", true)) - log.Debugf("New heaviest tipset! %s", ts.Cids()) + log.Infof("New heaviest tipset! %s (height=%d)", ts.Cids()) cs.heaviest = ts if err := cs.writeHead(ts); err != nil { diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 0e296d617..104ca674c 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -71,7 +71,6 @@ func (sm *SyncManager) Stop() { } func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) { - log.Info("set peer head!", ts.Height(), ts.Cids()) sm.lk.Lock() defer sm.lk.Unlock() sm.peerHeads[p] = ts @@ -336,7 +335,6 @@ func (sm *SyncManager) syncWorker(id int) { log.Info("sync manager worker shutting down") return } - log.Info("sync worker go time!", ts.Height(), ts.Cids()) ctx := context.WithValue(context.TODO(), syncStateKey{}, ss) err := sm.doSync(ctx, ts) diff --git a/miner/miner.go b/miner/miner.go index ef5ef0b7c..f4e5b9ea8 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -27,7 +27,7 @@ func NewMiner(api api.FullNode, epp gen.ElectionPoStProver) *Miner { epp: epp, waitFunc: func(ctx context.Context) error { // Wait around for half the block time in case other parents come in - time.Sleep(build.BlockDelay * time.Second / 2) + time.Sleep(build.PropagationDelay * time.Second) return nil }, } From 4ad0db0ecd5af627f5406ee5fc85da76ee189261 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2019 23:35:29 -0800 Subject: [PATCH 02/31] update go-amt-ipld code to fix infinite loop issue --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 9cfefdfc4..8c523faef 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/fatih/color v1.7.0 // indirect github.com/filecoin-project/chain-validation v0.0.0-20191106200742-11986803c0f7 github.com/filecoin-project/filecoin-ffi v0.0.0-00010101000000-000000000000 - github.com/filecoin-project/go-amt-ipld v0.0.0-20191122035745-59b9dfc0efc7 + github.com/filecoin-project/go-amt-ipld v0.0.0-20191203073133-f941215342ed github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/go-ole/go-ole v1.2.4 // indirect github.com/google/go-cmp v0.3.1 // indirect diff --git a/go.sum b/go.sum index 7c5401b5f..07e518523 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,8 @@ github.com/filecoin-project/chain-validation v0.0.0-20191106200742-11986803c0f7 github.com/filecoin-project/chain-validation v0.0.0-20191106200742-11986803c0f7/go.mod h1:0/0/QUNqpF/jVzLHFncGeT3NvGPODBhGzQlNgzmoZew= github.com/filecoin-project/go-amt-ipld v0.0.0-20191122035745-59b9dfc0efc7 h1:lKSMm8Go6qI7+Dk3rWCNIh57wBOqVNJ21re/p7D58gc= github.com/filecoin-project/go-amt-ipld v0.0.0-20191122035745-59b9dfc0efc7/go.mod h1:lKjJYPg2kwbav5f78i5YA8kGccnZn18IySbpneXvaQs= +github.com/filecoin-project/go-amt-ipld v0.0.0-20191203073133-f941215342ed h1:Wt4+eF3fda6MKLjK0/zzBOxs5cUwGyucJSAfO4LnX/w= +github.com/filecoin-project/go-amt-ipld v0.0.0-20191203073133-f941215342ed/go.mod h1:KsFPWjF+UUYl6n9A+qbg4bjFgAOneicFZtDH/LQEX2U= github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 h1:aMJGfgqe1QDhAVwxRg5fjCRF533xHidiKsugk7Vvzug= github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543/go.mod h1:mjrHv1cDGJWDlGmC0eDc1E5VJr8DmL9XMUcaFwiuKg8= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= From c1cd332ff73122ea279139f904c82e4791dda26a Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2019 23:46:58 -0800 Subject: [PATCH 03/31] impose a maximum sector ID --- chain/actors/actor_miner.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index 6494f3220..022e49ad5 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -23,6 +23,8 @@ import ( "golang.org/x/xerrors" ) +const MaxSectorID = 32 << 30 // 32 billion sectors should be good enough right? + type StorageMinerActor struct{} type StorageMinerActorState struct { @@ -537,6 +539,9 @@ func SectorIsUnique(ctx context.Context, s types.Storage, sroot cid.Cid, sid uin } func AddToSectorSet(ctx context.Context, blks amt.Blocks, ss cid.Cid, sectorID uint64, commR, commD []byte) (cid.Cid, ActorError) { + if sectorID > MaxSectorID { + return cid.Undef, aerrors.Newf(25, "sector ID out of range: %d", sectorID) + } ssr, err := amt.LoadAMT(blks, ss) if err != nil { return cid.Undef, aerrors.HandleExternalError(err, "could not load sector set node") @@ -557,6 +562,10 @@ func AddToSectorSet(ctx context.Context, blks amt.Blocks, ss cid.Cid, sectorID u } func GetFromSectorSet(ctx context.Context, s types.Storage, ss cid.Cid, sectorID uint64) (bool, []byte, []byte, ActorError) { + if sectorID > MaxSectorID { + return false, nil, nil, aerrors.Newf(25, "sector ID out of range: %d", sectorID) + } + ssr, err := amt.LoadAMT(types.WrapStorage(s), ss) if err != nil { return false, nil, nil, aerrors.HandleExternalError(err, "could not load sector set node") From 84f07cee3505b8f81d35a6a6b6c532299e308250 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2019 23:58:38 -0800 Subject: [PATCH 04/31] fix null block mining --- go.sum | 2 -- miner/miner.go | 5 ++--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/go.sum b/go.sum index 07e518523..934cf3331 100644 --- a/go.sum +++ b/go.sum @@ -78,8 +78,6 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= github.com/filecoin-project/chain-validation v0.0.0-20191106200742-11986803c0f7 h1:Ags/z6ZubzKonQ9PsY9fO439yGdVg07qpdxfv/AEUno= github.com/filecoin-project/chain-validation v0.0.0-20191106200742-11986803c0f7/go.mod h1:0/0/QUNqpF/jVzLHFncGeT3NvGPODBhGzQlNgzmoZew= -github.com/filecoin-project/go-amt-ipld v0.0.0-20191122035745-59b9dfc0efc7 h1:lKSMm8Go6qI7+Dk3rWCNIh57wBOqVNJ21re/p7D58gc= -github.com/filecoin-project/go-amt-ipld v0.0.0-20191122035745-59b9dfc0efc7/go.mod h1:lKjJYPg2kwbav5f78i5YA8kGccnZn18IySbpneXvaQs= github.com/filecoin-project/go-amt-ipld v0.0.0-20191203073133-f941215342ed h1:Wt4+eF3fda6MKLjK0/zzBOxs5cUwGyucJSAfO4LnX/w= github.com/filecoin-project/go-amt-ipld v0.0.0-20191203073133-f941215342ed/go.mod h1:KsFPWjF+UUYl6n9A+qbg4bjFgAOneicFZtDH/LQEX2U= github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 h1:aMJGfgqe1QDhAVwxRg5fjCRF533xHidiKsugk7Vvzug= diff --git a/miner/miner.go b/miner/miner.go index ef5ef0b7c..e04286669 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -232,9 +232,8 @@ func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) } } - return &MiningBase{ - ts: bts, - }, nil + m.lastWork = &MiningBase{ts: bts} + return m.lastWork, nil } func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.TipSet) (bool, error) { From ac11fb6d7fb5f693a042301d02b0b5fc774fd029 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 12:05:12 +0100 Subject: [PATCH 05/31] chainwatch: gather receipts --- cmd/lotus-chainwatch/storage.go | 48 +++++++++++++++++++++++++++++++++ cmd/lotus-chainwatch/sync.go | 45 +++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 35135313f..572b8dde3 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -160,6 +160,26 @@ create table if not exists mpool_messages create unique index if not exists mpool_messages_msg_uindex on mpool_messages (msg); +create table if not exists receipts +( + msg text not null + constraint receipts_messages_cid_fk + references messages, + state text not null + constraint receipts_blocks_parentStateRoot_fk + references blocks (parentStateRoot), + idx int not null, + exit int not null, + gas_used int not null, + return blob, + constraint receipts_pk + primary key (msg, state) +); + +create index if not exists receipts_msg_state_index + on receipts (msg, state); + + create table if not exists miner_heads ( head text not null @@ -342,6 +362,34 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { return tx.Commit() } +func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES (?, ?, ?, ?, ?, ?) on conflict do nothing`) + if err != nil { + return err + } + defer stmt.Close() + + for c, m := range recs { + if _, err := stmt.Exec( + c.msg.String(), + c.state.String(), + c.idx, + m.ExitCode, + m.GasUsed.String(), + m.Return, + ); err != nil { + return err + } + } + + return tx.Commit() +} + func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) error { tx, err := st.db.Begin() if err != nil { diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 5c87ea04d..f9dcf44e3 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -233,6 +233,15 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS return } + log.Infof("Getting parent receipts") + + receipts := fetchParentReceipts(ctx, api, toSync) + + if err := st.storeReceipts(receipts); err != nil { + log.Error(err) + return + } + log.Infof("Resolving addresses") for _, message := range msgs { @@ -290,3 +299,39 @@ func fetchMessages(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*ty return messages, inclusions } + +type mrec struct { + msg cid.Cid + state cid.Cid + idx int +} + +func fetchParentReceipts(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*types.BlockHeader) map[mrec]*types.MessageReceipt { + var lk sync.Mutex + out := map[mrec]*types.MessageReceipt{} + + par(50, maparr(toSync), func(header *types.BlockHeader) { + recs, err := api.ChainGetParentReceipts(ctx, header.Cid()) + if err != nil { + log.Error(err) + return + } + msgs, err := api.ChainGetParentMessages(ctx, header.Cid()) + if err != nil { + log.Error(err) + return + } + + lk.Lock() + for i, r := range recs { + out[mrec{ + msg: msgs[i].Cid, + state: header.ParentStateRoot, + idx: i, + }] = r + } + lk.Unlock() + }) + + return out +} From 550b5de5981c87e6106f04039bb93c8e67e7995b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 13:20:34 +0100 Subject: [PATCH 06/31] Fix bootstrap addrs --- build/bootstrap/bootstrappers.pi | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/build/bootstrap/bootstrappers.pi b/build/bootstrap/bootstrappers.pi index 124dcce60..98e1010e6 100644 --- a/build/bootstrap/bootstrappers.pi +++ b/build/bootstrap/bootstrappers.pi @@ -1,3 +1,2 @@ /ip4/147.75.80.17/tcp/1347/p2p/12D3KooWPWCCqUN3gPEaFAMpAwfh5a6SryBEsFt5R2oK8oW86a4C -/ip6/2604:1380:2000:f400::1/tcp/36137/p2p/12D3KooWNL1fJPBArhsoqwg2wbXgCDTByMyg4ZGp6HjgWr9bgnaJ -/ip4/147.75.80.29/tcp/44397/p2p/12D3KooWNL1fJPBArhsoqwg2wbXgCDTByMyg4ZGp6HjgWr9bgnaJ +/ip4/147.75.80.29/tcp/1347/p2p/12D3KooWNL1fJPBArhsoqwg2wbXgCDTByMyg4ZGp6HjgWr9bgnaJ From 9ad4a00cdacb83cba27ca1555da29a4778e46515 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 2 Dec 2019 22:41:28 -0800 Subject: [PATCH 07/31] fix miner message filter nonce checking --- api/api_full.go | 1 + api/struct.go | 5 +++++ miner/miner.go | 45 ++++++++++++++++++++++++++++++++--------- miner/miner_test.go | 4 ++-- node/impl/full/chain.go | 23 +++++++++++++++++++++ 5 files changed, 67 insertions(+), 11 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 2d5ed580c..726ae0175 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -29,6 +29,7 @@ type FullNode interface { ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error) ChainGetParentReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error) ChainGetParentMessages(context.Context, cid.Cid) ([]Message, error) + ChainGetTipSetMessages(context.Context, types.TipSetKey) ([]Message, error) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error) ChainSetHead(context.Context, *types.TipSet) error diff --git a/api/struct.go b/api/struct.go index f47836f6a..3d797811b 100644 --- a/api/struct.go +++ b/api/struct.go @@ -45,6 +45,7 @@ type FullNodeStruct struct { ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"` ChainGetParentReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"` ChainGetParentMessages func(context.Context, cid.Cid) ([]Message, error) `perm:"read"` + ChainGetTipSetMessages func(context.Context, types.TipSetKey) ([]Message, error) `perm:"read"` ChainGetTipSetByHeight func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) `perm:"read"` ChainReadObj func(context.Context, cid.Cid) ([]byte, error) `perm:"read"` ChainSetHead func(context.Context, *types.TipSet) error `perm:"admin"` @@ -310,6 +311,10 @@ func (c *FullNodeStruct) ChainGetParentMessages(ctx context.Context, b cid.Cid) return c.Internal.ChainGetParentMessages(ctx, b) } +func (c *FullNodeStruct) ChainGetTipSetMessages(ctx context.Context, tsk types.TipSetKey) ([]Message, error) { + return c.Internal.ChainGetTipSetMessages(ctx, tsk) +} + func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) { return c.Internal.ChainNotify(ctx) } diff --git a/miner/miner.go b/miner/miner.go index 9bb166752..90d2c1c95 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -334,14 +334,40 @@ func (m *Miner) computeTicket(ctx context.Context, addr address.Address, base *M }, nil } -func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof) (*types.BlockMsg, error) { +func (m *Miner) actorLookup(ctx context.Context, addr address.Address, ts *types.TipSet) (uint64, *types.BigInt, error) { + // TODO: strong opportunities for some caching in this method + act, err := m.api.StateGetActor(ctx, addr, ts) + if err != nil { + return 0, nil, xerrors.Errorf("looking up actor failed: %w", err) + } + msgs, err := m.api.ChainGetTipSetMessages(ctx, ts.Key()) + if err != nil { + return 0, nil, xerrors.Errorf("failed to get tipset messages: %w", err) + } + + balance := act.Balance + curnonce := act.Nonce + for _, m := range msgs { + if m.Message.From == addr { + if m.Message.Nonce != curnonce { + return 0, nil, xerrors.Errorf("tipset messages had bad nonce: %s had nonce %d, expected %d", m.Cid, m.Message.Nonce, curnonce) + } + curnonce++ + balance = types.BigSub(balance, m.Message.RequiredFunds()) + } + } + + return curnonce, &balance, nil +} + +func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof) (*types.BlockMsg, error) { pending, err := m.api.MpoolPending(context.TODO(), base.ts) if err != nil { return nil, xerrors.Errorf("failed to get pending messages: %w", err) } - msgs, err := selectMessages(context.TODO(), m.api.StateGetActor, base, pending) + msgs, err := selectMessages(context.TODO(), m.actorLookup, base, pending) if err != nil { return nil, xerrors.Errorf("message filtering failed: %w", err) } @@ -354,7 +380,7 @@ func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *type return m.api.MinerCreateBlock(context.TODO(), addr, base.ts, ticket, proof, msgs, nheight, uint64(uts)) } -type actorLookup func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) +type actorLookup func(context.Context, address.Address, *types.TipSet) (uint64, *types.BigInt, error) func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) { out := make([]*types.SignedMessage, 0, len(msgs)) @@ -367,14 +393,15 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs } from := msg.Message.From - act, err := al(ctx, from, base.ts) - if err != nil { - return nil, xerrors.Errorf("failed to check message sender balance: %w", err) - } if _, ok := inclNonces[from]; !ok { - inclNonces[from] = act.Nonce - inclBalances[from] = act.Balance + nonce, balance, err := al(ctx, from, base.ts) + if err != nil { + return nil, xerrors.Errorf("failed to check message sender balance: %w", err) + } + + inclNonces[from] = nonce + inclBalances[from] = *balance } if inclBalances[from].LessThan(msg.Message.RequiredFunds()) { diff --git a/miner/miner_test.go b/miner/miner_test.go index 160433d72..47e9e6d4a 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -33,8 +33,8 @@ func TestMessageFiltering(t *testing.T) { }, } - af := func(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) { - return actors[addr], nil + af := func(ctx context.Context, addr address.Address, ts *types.TipSet) (uint64, *types.BigInt, error) { + return actors[addr].Nonce, &actors[addr].Balance, nil } msgs := []types.Message{ diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 1e39be598..435e6b93e 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -68,6 +68,7 @@ func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api }, nil } +// TODO: Maybe deprecate in favor of just using ChainGetTipSetMessages? func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]api.Message, error) { b, err := a.Chain.GetBlock(bcid) if err != nil { @@ -101,6 +102,28 @@ func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([] return out, nil } +func (a *ChainAPI) ChainGetTipSetMessages(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) { + ts, err := a.Chain.LoadTipSet(tsk.Cids()) + if err != nil { + return nil, xerrors.Errorf("failed to load tipset from key: %w", err) + } + + messages, err := a.Chain.MessagesForTipset(ts) + if err != nil { + return nil, xerrors.Errorf("getting messages for tipset: %w", err) + } + + var out []api.Message + for _, m := range messages { + out = append(out, api.Message{ + Cid: m.Cid(), + Message: m.VMMessage(), + }) + } + + return out, nil +} + func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]*types.MessageReceipt, error) { b, err := a.Chain.GetBlock(bcid) if err != nil { From 5810922441d6e11c048cf10be29a187266545cd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 17:39:47 +0100 Subject: [PATCH 08/31] This helps for some reason --- chain/stmgr/stmgr.go | 1 + miner/miner.go | 19 +++++++++++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index c65dfe596..2fb73785a 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -195,6 +195,7 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.Bl } if applied[m.From] != m.Nonce { + log.Infof("skipping message from %s: nonce check failed: exp %d, was %d", m.From, applied[m.From], m.Nonce) continue } applied[m.From]++ diff --git a/miner/miner.go b/miner/miner.go index 90d2c1c95..03f0f98c8 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -2,6 +2,7 @@ package miner import ( "context" + "sort" "sync" "time" @@ -348,16 +349,26 @@ func (m *Miner) actorLookup(ctx context.Context, addr address.Address, ts *types balance := act.Balance curnonce := act.Nonce + + sort.Slice(msgs, func(i, j int) bool { // TODO: is this actually needed? + return msgs[i].Message.Nonce < msgs[j].Message.Nonce + }) + + max := int64(-2) + for _, m := range msgs { if m.Message.From == addr { - if m.Message.Nonce != curnonce { - return 0, nil, xerrors.Errorf("tipset messages had bad nonce: %s had nonce %d, expected %d", m.Cid, m.Message.Nonce, curnonce) - } - curnonce++ + max = int64(m.Message.Nonce) balance = types.BigSub(balance, m.Message.RequiredFunds()) } } + max++ // next unapplied nonce + + if max != -1 && uint64(max) != curnonce { + return 0, nil, xerrors.Errorf("tipset messages from %s have too low nonce %d, expected %d, h: %d", addr, max, curnonce, ts.Height()) + } + return curnonce, &balance, nil } From f43ff07ab31ec058996e7adaa1d6f799c5b209ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 18:38:37 +0100 Subject: [PATCH 09/31] events: Allow revert on empty cache --- chain/events/tscache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/events/tscache.go b/chain/events/tscache.go index 1bccb4d65..cf1b8e580 100644 --- a/chain/events/tscache.go +++ b/chain/events/tscache.go @@ -62,7 +62,7 @@ func (tsc *tipSetCache) add(ts *types.TipSet) error { func (tsc *tipSetCache) revert(ts *types.TipSet) error { if tsc.len == 0 { - return xerrors.New("tipSetCache.revert: nothing to revert; cache is empty") + return nil // this can happen, and it's fine } if !tsc.cache[tsc.start].Equals(ts) { From 6127d82c7bc424c3459e7423a5bc0f48ea815552 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 18:41:31 +0100 Subject: [PATCH 10/31] events: allow get on empty ts cache --- chain/events/tscache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/chain/events/tscache.go b/chain/events/tscache.go index cf1b8e580..2e58a3f5b 100644 --- a/chain/events/tscache.go +++ b/chain/events/tscache.go @@ -92,7 +92,8 @@ func (tsc *tipSetCache) getNonNull(height uint64) (*types.TipSet, error) { func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) { if tsc.len == 0 { - return nil, xerrors.New("tipSetCache.get: cache is empty") + log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height) + return tsc.storage(context.TODO(), height, nil) } headH := tsc.cache[tsc.start].Height() From 9c6e9212a242692605f5c70397dc5e0f5faa01f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 19:25:56 +0100 Subject: [PATCH 11/31] mining: get pending messages early --- chain/gen/gen.go | 34 +++++++++++++++++++++++++++------- miner/miner.go | 40 ++++++++++++++++++++++++++++++---------- 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index c35ca5ce9..a823c1250 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -261,13 +261,17 @@ func (cg *ChainGen) nextBlockProof(ctx context.Context, pts *types.TipSet, m add VRFProof: vrfout, } - win, eproof, err := IsRoundWinner(ctx, pts, round, m, cg.eppProvs[m], &mca{w: cg.w, sm: cg.sm}) + win, eproofin, err := IsRoundWinner(ctx, pts, round, m, cg.eppProvs[m], &mca{w: cg.w, sm: cg.sm}) if err != nil { return nil, nil, xerrors.Errorf("checking round winner failed: %w", err) } if !win { return nil, tick, nil } + eproof, err := ComputeProof(ctx, cg.eppProvs[m], eproofin) + if err != nil { + return nil, nil, xerrors.Errorf("computing proof: %w", err) + } return eproof, tick, nil } @@ -466,7 +470,14 @@ func (epp *eppProvider) ComputeProof(ctx context.Context, _ sectorbuilder.Sorted return []byte("valid proof"), nil } -func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (bool, *types.EPostProof, error) { +type ProofInput struct { + sectors sectorbuilder.SortedPublicSectorInfo + hvrf []byte + winners []sectorbuilder.EPostCandidate + vrfout []byte +} + +func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (bool, *ProofInput, error) { r, err := a.ChainGetRandomness(ctx, ts.Key(), round-build.EcRandomnessLookback) if err != nil { return false, nil, xerrors.Errorf("chain get randomness: %w", err) @@ -529,16 +540,25 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add return false, nil, nil } - proof, err := epp.ComputeProof(ctx, sectors, hvrf[:], winners) + return true, &ProofInput{ + sectors: sectors, + hvrf: hvrf[:], + winners: winners, + vrfout: vrfout, + }, nil +} + +func ComputeProof(ctx context.Context, epp ElectionPoStProver, pi *ProofInput) (*types.EPostProof, error) { + proof, err := epp.ComputeProof(ctx, pi.sectors, pi.hvrf, pi.winners) if err != nil { - return false, nil, xerrors.Errorf("failed to compute snark for election proof: %w", err) + return nil, xerrors.Errorf("failed to compute snark for election proof: %w", err) } ept := types.EPostProof{ Proof: proof, - PostRand: vrfout, + PostRand: pi.vrfout, } - for _, win := range winners { + for _, win := range pi.winners { ept.Candidates = append(ept.Candidates, types.EPostTicket{ Partial: win.PartialTicket[:], SectorID: win.SectorID, @@ -546,7 +566,7 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add }) } - return true, &ept, nil + return &ept, nil } type SignFunc func(context.Context, address.Address, []byte) (*types.Signature, error) diff --git a/miner/miner.go b/miner/miner.go index 03f0f98c8..461cf7c86 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -265,7 +265,7 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB return nil, xerrors.Errorf("scratching ticket failed: %w", err) } - win, proof, err := gen.IsRoundWinner(ctx, base.ts, int64(base.ts.Height()+base.nullRounds+1), addr, m.epp, m.api) + win, proofin, err := gen.IsRoundWinner(ctx, base.ts, int64(base.ts.Height()+base.nullRounds+1), addr, m.epp, m.api) if err != nil { return nil, xerrors.Errorf("failed to check if we win next round: %w", err) } @@ -275,7 +275,18 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB return nil, nil } - b, err := m.createBlock(base, addr, ticket, proof) + // get pending messages early, + pending, err := m.api.MpoolPending(context.TODO(), base.ts) + if err != nil { + return nil, xerrors.Errorf("failed to get pending messages: %w", err) + } + + proof, err := gen.ComputeProof(ctx, m.epp, proofin) + if err != nil { + return nil, xerrors.Errorf("computing election proof: %w", err) + } + + b, err := m.createBlock(base, addr, ticket, proof, pending) if err != nil { return nil, xerrors.Errorf("failed to create block: %w", err) } @@ -372,12 +383,7 @@ func (m *Miner) actorLookup(ctx context.Context, addr address.Address, ts *types return curnonce, &balance, nil } -func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof) (*types.BlockMsg, error) { - pending, err := m.api.MpoolPending(context.TODO(), base.ts) - if err != nil { - return nil, xerrors.Errorf("failed to get pending messages: %w", err) - } - +func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof, pending []*types.SignedMessage) (*types.BlockMsg, error) { msgs, err := selectMessages(context.TODO(), m.actorLookup, base, pending) if err != nil { return nil, xerrors.Errorf("message filtering failed: %w", err) @@ -393,10 +399,24 @@ func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *type type actorLookup func(context.Context, address.Address, *types.TipSet) (uint64, *types.BigInt, error) +func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) { + for _, msg := range msgs { + if msg.Message.From == from { + out++ + } + } + return out +} + func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) { out := make([]*types.SignedMessage, 0, len(msgs)) inclNonces := make(map[address.Address]uint64) inclBalances := make(map[address.Address]types.BigInt) + + sort.Slice(msgs, func(i, j int) bool { // TODO: is this actually needed? + return msgs[i].Message.Nonce < msgs[j].Message.Nonce + }) + for _, msg := range msgs { if msg.Message.To == address.Undef { log.Warnf("message in mempool had bad 'To' address") @@ -421,12 +441,12 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs } if msg.Message.Nonce > inclNonces[from] { - log.Warnf("message in mempool has too high of a nonce (%d > %d) %s", msg.Message.Nonce, inclNonces[from], msg.Cid()) + log.Warnf("message in mempool has too high of a nonce (%d > %d) %s (%d pending for orig)", msg.Message.Nonce, inclNonces[from], msg.Cid(), countFrom(msgs, from)) continue } if msg.Message.Nonce < inclNonces[from] { - log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid()) + log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s (%d pending for)", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid(), countFrom(msgs, from)) continue } From 96c04fc0a60be9f0072dfe27902cc02bf8d5a256 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 20:33:29 +0100 Subject: [PATCH 12/31] mpool: Make MpoolPending more atomic --- chain/gen/gen.go | 4 +- chain/messagepool/messagepool.go | 111 ++++++++++++++++++------------- node/impl/full/mpool.go | 58 +++++++++++++++- 3 files changed, 122 insertions(+), 51 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index a823c1250..db97dae32 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -472,9 +472,9 @@ func (epp *eppProvider) ComputeProof(ctx context.Context, _ sectorbuilder.Sorted type ProofInput struct { sectors sectorbuilder.SortedPublicSectorInfo - hvrf []byte + hvrf []byte winners []sectorbuilder.EPostCandidate - vrfout []byte + vrfout []byte } func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (bool, *ProofInput, error) { diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 38b7e1fc4..38456a143 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -62,7 +62,7 @@ type MessagePool struct { pending map[address.Address]*msgSet pendingCount int - curTsLk sync.Mutex + curTsLk sync.Mutex // DO NOT LOCK INSIDE lk curTs *types.TipSet api Provider @@ -106,7 +106,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error { } type Provider interface { - SubscribeHeadChanges(func(rev, app []*types.TipSet) error) + SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet PutMessage(m store.ChainMsg) (cid.Cid, error) PubSubPublish(string, []byte) error StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) @@ -124,8 +124,9 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { return &mpoolProvider{sm, ps} } -func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) { +func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { mpp.sm.ChainStore().SubscribeHeadChanges(cb) + return mpp.sm.ChainStore().GetHeaviestTipSet() } func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) { @@ -173,7 +174,7 @@ func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) { go mp.repubLocal() - api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error { + mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error { err := mp.HeadChange(rev, app) if err != nil { log.Errorf("mpool head notif handler error: %+v", err) @@ -257,6 +258,12 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error { } func (mp *MessagePool) Add(m *types.SignedMessage) error { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + return mp.addTs(m, mp.curTs) +} + +func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error { // big messages are bad, anti DOS if m.Size() > 32*1024 { return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig) @@ -275,7 +282,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error { return err } - snonce, err := mp.getStateNonce(m.Message.From) + snonce, err := mp.getStateNonce(m.Message.From, curTs) if err != nil { return xerrors.Errorf("failed to look up actor state nonce: %w", err) } @@ -333,14 +340,17 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { } func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) { + mp.curTsLk.Lock() + defer mp.curTsLk.Lock() + mp.lk.Lock() defer mp.lk.Unlock() - return mp.getNonceLocked(addr) + return mp.getNonceLocked(addr, mp.curTs) } -func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { - stateNonce, err := mp.getStateNonce(addr) // sanity check +func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) (uint64, error) { + stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check if err != nil { return 0, err } @@ -359,22 +369,9 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { return stateNonce, nil } -func (mp *MessagePool) setCurTipset(ts *types.TipSet) { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() - mp.curTs = ts -} - -func (mp *MessagePool) getCurTipset() *types.TipSet { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() - return mp.curTs -} - -func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { +func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) { // TODO: this method probably should be cached - curTs := mp.getCurTipset() act, err := mp.api.StateGetActor(addr, curTs) if err != nil { return 0, err @@ -417,13 +414,16 @@ func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, erro } func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) { + mp.curTsLk.Lock() + defer mp.curTsLk.Lock() + mp.lk.Lock() defer mp.lk.Unlock() if addr.Protocol() == address.ID { log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr) } - nonce, err := mp.getNonceLocked(addr) + nonce, err := mp.getNonceLocked(addr, mp.curTs) if err != nil { return nil, err } @@ -485,15 +485,19 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) { } } -func (mp *MessagePool) Pending() []*types.SignedMessage { +func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + mp.lk.Lock() defer mp.lk.Unlock() + out := make([]*types.SignedMessage, 0) for a := range mp.pending { out = append(out, mp.pendingFor(a)...) } - return out + return out, mp.curTs } func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { @@ -516,6 +520,8 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { } func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() for _, ts := range revert { pts, err := mp.api.LoadTipSet(ts.Parents()) @@ -523,27 +529,14 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) return err } - mp.setCurTipset(pts) - for _, b := range ts.Blocks() { - bmsgs, smsgs, err := mp.api.MessagesForBlock(b) - if err != nil { - return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err) - } - for _, msg := range smsgs { - if err := mp.Add(msg); err != nil { - log.Error(err) // TODO: probably lots of spam in multi-block tsets - } - } + msgs, err := mp.MessagesForBlocks(ts.Blocks()) + if err != nil { + return err + } - for _, msg := range bmsgs { - smsg := mp.RecoverSig(msg) - if smsg != nil { - if err := mp.Add(smsg); err != nil { - log.Error(err) // TODO: probably lots of spam in multi-block tsets - } - } else { - log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid()) - } + for _, msg := range msgs { + if err := mp.addTs(msg, pts); err != nil { + log.Error(err) // TODO: probably lots of spam in multi-block tsets } } } @@ -562,12 +555,38 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) mp.Remove(msg.From, msg.Nonce) } } - mp.setCurTipset(ts) + + mp.curTs = ts } return nil } +func (mp *MessagePool) MessagesForBlocks(blks []*types.BlockHeader) ([]*types.SignedMessage, error) { + out := make([]*types.SignedMessage, 0) + + for _, b := range blks { + bmsgs, smsgs, err := mp.api.MessagesForBlock(b) + if err != nil { + return nil, xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err) + } + for _, msg := range smsgs { + out = append(out, msg) + } + + for _, msg := range bmsgs { + smsg := mp.RecoverSig(msg) + if smsg != nil { + out = append(out, smsg) + } else { + log.Warnf("could not recover signature for bls message %s", msg.Cid()) + } + } + } + + return out, nil +} + func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage { val, ok := mp.blsSigCache.Get(msg.Cid()) if !ok { diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 9ea34a7d1..249c62948 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -3,12 +3,14 @@ package full import ( "context" + "github.com/ipfs/go-cid" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/messagepool" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) @@ -17,13 +19,63 @@ type MpoolAPI struct { WalletAPI + Chain *store.ChainStore + Mpool *messagepool.MessagePool } func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { - // TODO: need to make sure we don't return messages that were already included in the referenced chain - // also need to accept ts == nil just fine, assume nil == chain.Head() - return a.Mpool.Pending(), nil + pending, mpts := a.Mpool.Pending() + + haveCids := map[cid.Cid]struct{}{} + for _, m := range pending { + haveCids[m.Cid()] = struct{}{} + } + + if mpts.Height() > ts.Height() { + return pending, nil + } + + for { + if mpts.Height() == ts.Height() { + if mpts.Equals(ts) { + return pending, nil + } + // different blocks in tipsets + + have, err := a.Mpool.MessagesForBlocks(ts.Blocks()) + if err != nil { + return nil, xerrors.Errorf("getting messages for base ts: %w", err) + } + + for _, m := range have { + haveCids[m.Cid()] = struct{}{} + } + } + + msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks()) + if err != nil { + return nil, xerrors.Errorf(": %w", err) + } + + for _, m := range msgs { + if _, ok := haveCids[m.Cid()]; ok { + continue + } + + haveCids[m.Cid()] = struct{}{} + pending = append(pending, m) + } + + if mpts.Height() >= ts.Height() { + return pending, nil + } + + ts, err = a.Chain.LoadTipSet(ts.Parents()) + if err != nil { + return nil, xerrors.Errorf("loading parent tipset: %w", err) + } + } } func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error { From 2d28bcf0579429109992db8173763eba64a07b85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 20:34:31 +0100 Subject: [PATCH 13/31] Allow calling MpoolPending with nil TS --- node/impl/full/mpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 249c62948..7d2dd5639 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -32,7 +32,7 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types haveCids[m.Cid()] = struct{}{} } - if mpts.Height() > ts.Height() { + if ts == nil || mpts.Height() > ts.Height() { return pending, nil } From 0e4c59d6a738b5ee7533ac4be6d6a03d62cca8e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 20:47:17 +0100 Subject: [PATCH 14/31] miner: Don't deduce balance twice per msg in actorLookup --- miner/miner.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/miner/miner.go b/miner/miner.go index 461cf7c86..6004e343b 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -358,7 +358,6 @@ func (m *Miner) actorLookup(ctx context.Context, addr address.Address, ts *types return 0, nil, xerrors.Errorf("failed to get tipset messages: %w", err) } - balance := act.Balance curnonce := act.Nonce sort.Slice(msgs, func(i, j int) bool { // TODO: is this actually needed? @@ -370,7 +369,6 @@ func (m *Miner) actorLookup(ctx context.Context, addr address.Address, ts *types for _, m := range msgs { if m.Message.From == addr { max = int64(m.Message.Nonce) - balance = types.BigSub(balance, m.Message.RequiredFunds()) } } @@ -380,7 +378,7 @@ func (m *Miner) actorLookup(ctx context.Context, addr address.Address, ts *types return 0, nil, xerrors.Errorf("tipset messages from %s have too low nonce %d, expected %d, h: %d", addr, max, curnonce, ts.Height()) } - return curnonce, &balance, nil + return curnonce, &act.Balance, nil } func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof, pending []*types.SignedMessage) (*types.BlockMsg, error) { From d0448287a9baa39893ed9f17a083ea55031c18b0 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 3 Dec 2019 12:05:54 -0800 Subject: [PATCH 15/31] remove unnecessary code --- api/api_full.go | 1 - api/struct.go | 5 ----- miner/miner.go | 45 +++++------------------------------------ miner/miner_test.go | 4 ++-- node/impl/full/chain.go | 23 --------------------- 5 files changed, 7 insertions(+), 71 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 726ae0175..2d5ed580c 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -29,7 +29,6 @@ type FullNode interface { ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error) ChainGetParentReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error) ChainGetParentMessages(context.Context, cid.Cid) ([]Message, error) - ChainGetTipSetMessages(context.Context, types.TipSetKey) ([]Message, error) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error) ChainSetHead(context.Context, *types.TipSet) error diff --git a/api/struct.go b/api/struct.go index 3d797811b..f47836f6a 100644 --- a/api/struct.go +++ b/api/struct.go @@ -45,7 +45,6 @@ type FullNodeStruct struct { ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"` ChainGetParentReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"` ChainGetParentMessages func(context.Context, cid.Cid) ([]Message, error) `perm:"read"` - ChainGetTipSetMessages func(context.Context, types.TipSetKey) ([]Message, error) `perm:"read"` ChainGetTipSetByHeight func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) `perm:"read"` ChainReadObj func(context.Context, cid.Cid) ([]byte, error) `perm:"read"` ChainSetHead func(context.Context, *types.TipSet) error `perm:"admin"` @@ -311,10 +310,6 @@ func (c *FullNodeStruct) ChainGetParentMessages(ctx context.Context, b cid.Cid) return c.Internal.ChainGetParentMessages(ctx, b) } -func (c *FullNodeStruct) ChainGetTipSetMessages(ctx context.Context, tsk types.TipSetKey) ([]Message, error) { - return c.Internal.ChainGetTipSetMessages(ctx, tsk) -} - func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) { return c.Internal.ChainNotify(ctx) } diff --git a/miner/miner.go b/miner/miner.go index 6004e343b..a5dc405c3 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -346,43 +346,8 @@ func (m *Miner) computeTicket(ctx context.Context, addr address.Address, base *M }, nil } -func (m *Miner) actorLookup(ctx context.Context, addr address.Address, ts *types.TipSet) (uint64, *types.BigInt, error) { - // TODO: strong opportunities for some caching in this method - act, err := m.api.StateGetActor(ctx, addr, ts) - if err != nil { - return 0, nil, xerrors.Errorf("looking up actor failed: %w", err) - } - - msgs, err := m.api.ChainGetTipSetMessages(ctx, ts.Key()) - if err != nil { - return 0, nil, xerrors.Errorf("failed to get tipset messages: %w", err) - } - - curnonce := act.Nonce - - sort.Slice(msgs, func(i, j int) bool { // TODO: is this actually needed? - return msgs[i].Message.Nonce < msgs[j].Message.Nonce - }) - - max := int64(-2) - - for _, m := range msgs { - if m.Message.From == addr { - max = int64(m.Message.Nonce) - } - } - - max++ // next unapplied nonce - - if max != -1 && uint64(max) != curnonce { - return 0, nil, xerrors.Errorf("tipset messages from %s have too low nonce %d, expected %d, h: %d", addr, max, curnonce, ts.Height()) - } - - return curnonce, &act.Balance, nil -} - func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof, pending []*types.SignedMessage) (*types.BlockMsg, error) { - msgs, err := selectMessages(context.TODO(), m.actorLookup, base, pending) + msgs, err := selectMessages(context.TODO(), m.api.StateGetActor, base, pending) if err != nil { return nil, xerrors.Errorf("message filtering failed: %w", err) } @@ -395,7 +360,7 @@ func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *type return m.api.MinerCreateBlock(context.TODO(), addr, base.ts, ticket, proof, msgs, nheight, uint64(uts)) } -type actorLookup func(context.Context, address.Address, *types.TipSet) (uint64, *types.BigInt, error) +type actorLookup func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) { for _, msg := range msgs { @@ -424,13 +389,13 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs from := msg.Message.From if _, ok := inclNonces[from]; !ok { - nonce, balance, err := al(ctx, from, base.ts) + act, err := al(ctx, from, base.ts) if err != nil { return nil, xerrors.Errorf("failed to check message sender balance: %w", err) } - inclNonces[from] = nonce - inclBalances[from] = *balance + inclNonces[from] = act.Nonce + inclBalances[from] = act.Balance } if inclBalances[from].LessThan(msg.Message.RequiredFunds()) { diff --git a/miner/miner_test.go b/miner/miner_test.go index 47e9e6d4a..160433d72 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -33,8 +33,8 @@ func TestMessageFiltering(t *testing.T) { }, } - af := func(ctx context.Context, addr address.Address, ts *types.TipSet) (uint64, *types.BigInt, error) { - return actors[addr].Nonce, &actors[addr].Balance, nil + af := func(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) { + return actors[addr], nil } msgs := []types.Message{ diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 435e6b93e..1e39be598 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -68,7 +68,6 @@ func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api }, nil } -// TODO: Maybe deprecate in favor of just using ChainGetTipSetMessages? func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]api.Message, error) { b, err := a.Chain.GetBlock(bcid) if err != nil { @@ -102,28 +101,6 @@ func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([] return out, nil } -func (a *ChainAPI) ChainGetTipSetMessages(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) { - ts, err := a.Chain.LoadTipSet(tsk.Cids()) - if err != nil { - return nil, xerrors.Errorf("failed to load tipset from key: %w", err) - } - - messages, err := a.Chain.MessagesForTipset(ts) - if err != nil { - return nil, xerrors.Errorf("getting messages for tipset: %w", err) - } - - var out []api.Message - for _, m := range messages { - out = append(out, api.Message{ - Cid: m.Cid(), - Message: m.VMMessage(), - }) - } - - return out, nil -} - func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]*types.MessageReceipt, error) { b, err := a.Chain.GetBlock(bcid) if err != nil { From 569bcce8780fb016814c88ed1266f4b04d3ef213 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 3 Dec 2019 12:23:56 -0800 Subject: [PATCH 16/31] sorting is not necessary --- miner/miner.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/miner/miner.go b/miner/miner.go index a5dc405c3..ad06acd01 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -2,7 +2,6 @@ package miner import ( "context" - "sort" "sync" "time" @@ -376,10 +375,6 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs inclNonces := make(map[address.Address]uint64) inclBalances := make(map[address.Address]types.BigInt) - sort.Slice(msgs, func(i, j int) bool { // TODO: is this actually needed? - return msgs[i].Message.Nonce < msgs[j].Message.Nonce - }) - for _, msg := range msgs { if msg.Message.To == address.Undef { log.Warnf("message in mempool had bad 'To' address") From d79f1c180d0da74f705354b6cd02c9e02562bf73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 22:09:39 +0100 Subject: [PATCH 17/31] mpool: Make tests pass --- chain/messagepool/messagepool.go | 6 ++++-- chain/messagepool/messagepool_test.go | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 38456a143..369be86f6 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -341,7 +341,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) { mp.curTsLk.Lock() - defer mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() mp.lk.Lock() defer mp.lk.Unlock() @@ -415,7 +415,7 @@ func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, erro func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) { mp.curTsLk.Lock() - defer mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() mp.lk.Lock() defer mp.lk.Unlock() @@ -534,6 +534,8 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) return err } + mp.curTs = pts + for _, msg := range msgs { if err := mp.addTs(msg, pts); err != nil { log.Error(err) // TODO: probably lots of spam in multi-block tsets diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 1cd8f8a75..8b18465b9 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -52,8 +52,9 @@ func (tma *testMpoolApi) setBlockMessages(h *types.BlockHeader, msgs ...*types.S tma.tipsets = append(tma.tipsets, mock.TipSet(h)) } -func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) { +func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { tma.cb = cb + return nil } func (tma *testMpoolApi) PutMessage(m store.ChainMsg) (cid.Cid, error) { @@ -216,7 +217,8 @@ func TestRevertMessages(t *testing.T) { assertNonce(t, mp, sender, 4) - if len(mp.Pending()) != 3 { + p, _ := mp.Pending() + if len(p) != 3 { t.Fatal("expected three messages in mempool") } From 9bb054a767f66464b046cac3e5b5c1e8fd035300 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 22:27:07 +0100 Subject: [PATCH 18/31] slightly cleaner IsRoundWinner --- chain/gen/gen.go | 26 +++++++++++++------------- miner/miner.go | 4 ++-- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index db97dae32..059db2be5 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -261,11 +261,11 @@ func (cg *ChainGen) nextBlockProof(ctx context.Context, pts *types.TipSet, m add VRFProof: vrfout, } - win, eproofin, err := IsRoundWinner(ctx, pts, round, m, cg.eppProvs[m], &mca{w: cg.w, sm: cg.sm}) + eproofin, err := IsRoundWinner(ctx, pts, round, m, cg.eppProvs[m], &mca{w: cg.w, sm: cg.sm}) if err != nil { return nil, nil, xerrors.Errorf("checking round winner failed: %w", err) } - if !win { + if eproofin == nil { return nil, tick, nil } eproof, err := ComputeProof(ctx, cg.eppProvs[m], eproofin) @@ -477,28 +477,28 @@ type ProofInput struct { vrfout []byte } -func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (bool, *ProofInput, error) { +func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (*ProofInput, error) { r, err := a.ChainGetRandomness(ctx, ts.Key(), round-build.EcRandomnessLookback) if err != nil { - return false, nil, xerrors.Errorf("chain get randomness: %w", err) + return nil, xerrors.Errorf("chain get randomness: %w", err) } mworker, err := a.StateMinerWorker(ctx, miner, ts) if err != nil { - return false, nil, xerrors.Errorf("failed to get miner worker: %w", err) + return nil, xerrors.Errorf("failed to get miner worker: %w", err) } vrfout, err := ComputeVRF(ctx, a.WalletSign, mworker, miner, DSepElectionPost, r) if err != nil { - return false, nil, xerrors.Errorf("failed to compute VRF: %w", err) + return nil, xerrors.Errorf("failed to compute VRF: %w", err) } pset, err := a.StateMinerProvingSet(ctx, miner, ts) if err != nil { - return false, nil, xerrors.Errorf("failed to load proving set for miner: %w", err) + return nil, xerrors.Errorf("failed to load proving set for miner: %w", err) } if len(pset) == 0 { - return false, nil, nil + return nil, nil } var sinfos []ffi.PublicSectorInfo @@ -515,17 +515,17 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add hvrf := sha256.Sum256(vrfout) candidates, err := epp.GenerateCandidates(ctx, sectors, hvrf[:]) if err != nil { - return false, nil, xerrors.Errorf("failed to generate electionPoSt candidates: %w", err) + return nil, xerrors.Errorf("failed to generate electionPoSt candidates: %w", err) } pow, err := a.StateMinerPower(ctx, miner, ts) if err != nil { - return false, nil, xerrors.Errorf("failed to check power: %w", err) + return nil, xerrors.Errorf("failed to check power: %w", err) } ssize, err := a.StateMinerSectorSize(ctx, miner, ts) if err != nil { - return false, nil, xerrors.Errorf("failed to look up miners sector size: %w", err) + return nil, xerrors.Errorf("failed to look up miners sector size: %w", err) } var winners []sectorbuilder.EPostCandidate @@ -537,10 +537,10 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add // no winners, sad if len(winners) == 0 { - return false, nil, nil + return nil, nil } - return true, &ProofInput{ + return &ProofInput{ sectors: sectors, hvrf: hvrf[:], winners: winners, diff --git a/miner/miner.go b/miner/miner.go index ad06acd01..b49c422d5 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -264,12 +264,12 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB return nil, xerrors.Errorf("scratching ticket failed: %w", err) } - win, proofin, err := gen.IsRoundWinner(ctx, base.ts, int64(base.ts.Height()+base.nullRounds+1), addr, m.epp, m.api) + proofin, err := gen.IsRoundWinner(ctx, base.ts, int64(base.ts.Height()+base.nullRounds+1), addr, m.epp, m.api) if err != nil { return nil, xerrors.Errorf("failed to check if we win next round: %w", err) } - if !win { + if proofin == nil { base.nullRounds++ return nil, nil } From a65320c97065cfc6094982a9326ead8044f1ab68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 22:59:08 +0100 Subject: [PATCH 19/31] storageminer: Don't start fPoSt with every head change --- storage/fpost_sched.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/storage/fpost_sched.go b/storage/fpost_sched.go index 9480dc3c2..9dc6992fa 100644 --- a/storage/fpost_sched.go +++ b/storage/fpost_sched.go @@ -102,10 +102,12 @@ func (s *fpostScheduler) update(ctx context.Context, new *types.TipSet) error { return err } - if newEPS != s.activeEPS { - s.abortActivePoSt() + if newEPS == s.activeEPS { + return nil } + s.abortActivePoSt() + if newEPS != Inactive && start { s.doPost(ctx, newEPS, new) } @@ -124,7 +126,7 @@ func (s *fpostScheduler) abortActivePoSt() { log.Warnf("Aborting Fallback PoSt (EPS: %d)", s.activeEPS) - s.activeEPS = 0 + s.activeEPS = Inactive s.abort = nil } From a7738af169a46f3b2078571d84e1c49f40b1d0f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 23:34:15 +0100 Subject: [PATCH 20/31] storageminer: exit fpostScheduler loop cleanly --- storage/fpost_sched.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/storage/fpost_sched.go b/storage/fpost_sched.go index 9480dc3c2..1b5c18b27 100644 --- a/storage/fpost_sched.go +++ b/storage/fpost_sched.go @@ -49,10 +49,17 @@ func (s *fpostScheduler) run(ctx context.Context) { panic(err) } + defer s.abortActivePoSt() + // not fine to panic after this point for { select { - case changes := <-notifs: + case changes, ok := <-notifs: + if !ok { + log.Warn("fpostScheduler notifs channel closed") + return + } + ctx, span := trace.StartSpan(ctx, "fpostScheduler.headChange") var lowest, highest *types.TipSet = s.cur, nil @@ -74,6 +81,8 @@ func (s *fpostScheduler) run(ctx context.Context) { } span.End() + case <-ctx.Done(): + return } } } From 21629d005bc653c7ade2ec9dbc963de1c4bc0731 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 23:57:47 +0100 Subject: [PATCH 21/31] storageminer: separate sectorstore types --- storage/sector_types.go | 107 ++++++++++++++++++ storage/{sealing_utils.go => sector_utils.go} | 0 ...ing_utils_test.go => sector_utils_test.go} | 0 storage/{sealing.go => sectors.go} | 98 ---------------- 4 files changed, 107 insertions(+), 98 deletions(-) create mode 100644 storage/sector_types.go rename storage/{sealing_utils.go => sector_utils.go} (100%) rename storage/{sealing_utils_test.go => sector_utils_test.go} (100%) rename storage/{sealing.go => sectors.go} (72%) diff --git a/storage/sector_types.go b/storage/sector_types.go new file mode 100644 index 000000000..67b394732 --- /dev/null +++ b/storage/sector_types.go @@ -0,0 +1,107 @@ +package storage + +import ( + "context" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/lib/sectorbuilder" +) + +type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error) + +type SealTicket struct { + BlockHeight uint64 + TicketBytes []byte +} + +func (t *SealTicket) SB() sectorbuilder.SealTicket { + out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight} + copy(out.TicketBytes[:], t.TicketBytes) + return out +} + +type SealSeed struct { + BlockHeight uint64 + TicketBytes []byte +} + +func (t *SealSeed) SB() sectorbuilder.SealSeed { + out := sectorbuilder.SealSeed{BlockHeight: t.BlockHeight} + copy(out.TicketBytes[:], t.TicketBytes) + return out +} + +type Piece struct { + DealID uint64 + + Size uint64 + CommP []byte +} + +func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) { + out.Size = p.Size + copy(out.CommP[:], p.CommP) + return out +} + +type SectorInfo struct { + State api.SectorState + SectorID uint64 + + // Packing + + Pieces []Piece + + // PreCommit + CommC []byte + CommD []byte + CommR []byte + CommRLast []byte + Proof []byte + Ticket SealTicket + + PreCommitMessage *cid.Cid + + // PreCommitted + Seed SealSeed + + // Committing + CommitMessage *cid.Cid +} + +func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { + out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces)) + for i, piece := range t.Pieces { + out[i] = piece.ppi() + } + return out +} + +func (t *SectorInfo) deals() []uint64 { + out := make([]uint64, len(t.Pieces)) + for i, piece := range t.Pieces { + out[i] = piece.DealID + } + return out +} + +func (t *SectorInfo) existingPieces() []uint64 { + out := make([]uint64, len(t.Pieces)) + for i, piece := range t.Pieces { + out[i] = piece.Size + } + return out +} + +func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput { + var out sectorbuilder.RawSealPreCommitOutput + + copy(out.CommC[:], t.CommC) + copy(out.CommD[:], t.CommD) + copy(out.CommR[:], t.CommR) + copy(out.CommRLast[:], t.CommRLast) + + return out +} diff --git a/storage/sealing_utils.go b/storage/sector_utils.go similarity index 100% rename from storage/sealing_utils.go rename to storage/sector_utils.go diff --git a/storage/sealing_utils_test.go b/storage/sector_utils_test.go similarity index 100% rename from storage/sealing_utils_test.go rename to storage/sector_utils_test.go diff --git a/storage/sealing.go b/storage/sectors.go similarity index 72% rename from storage/sealing.go rename to storage/sectors.go index 7628fabd8..583fdc71d 100644 --- a/storage/sealing.go +++ b/storage/sectors.go @@ -4,7 +4,6 @@ import ( "context" "io" - cid "github.com/ipfs/go-cid" xerrors "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" @@ -12,68 +11,6 @@ import ( "github.com/filecoin-project/lotus/lib/sectorbuilder" ) -type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error) - -type SealTicket struct { - BlockHeight uint64 - TicketBytes []byte -} - -func (t *SealTicket) SB() sectorbuilder.SealTicket { - out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight} - copy(out.TicketBytes[:], t.TicketBytes) - return out -} - -type SealSeed struct { - BlockHeight uint64 - TicketBytes []byte -} - -func (t *SealSeed) SB() sectorbuilder.SealSeed { - out := sectorbuilder.SealSeed{BlockHeight: t.BlockHeight} - copy(out.TicketBytes[:], t.TicketBytes) - return out -} - -type Piece struct { - DealID uint64 - - Size uint64 - CommP []byte -} - -func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) { - out.Size = p.Size - copy(out.CommP[:], p.CommP) - return out -} - -type SectorInfo struct { - State api.SectorState - SectorID uint64 - - // Packing - - Pieces []Piece - - // PreCommit - CommC []byte - CommD []byte - CommR []byte - CommRLast []byte - Proof []byte - Ticket SealTicket - - PreCommitMessage *cid.Cid - - // PreCommitted - Seed SealSeed - - // Committing - CommitMessage *cid.Cid -} - type sectorUpdate struct { newState api.SectorState id uint64 @@ -81,41 +18,6 @@ type sectorUpdate struct { mut func(*SectorInfo) } -func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { - out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces)) - for i, piece := range t.Pieces { - out[i] = piece.ppi() - } - return out -} - -func (t *SectorInfo) deals() []uint64 { - out := make([]uint64, len(t.Pieces)) - for i, piece := range t.Pieces { - out[i] = piece.DealID - } - return out -} - -func (t *SectorInfo) existingPieces() []uint64 { - out := make([]uint64, len(t.Pieces)) - for i, piece := range t.Pieces { - out[i] = piece.Size - } - return out -} - -func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput { - var out sectorbuilder.RawSealPreCommitOutput - - copy(out.CommC[:], t.CommC) - copy(out.CommD[:], t.CommD) - copy(out.CommR[:], t.CommR) - copy(out.CommRLast[:], t.CommRLast) - - return out -} - func (m *Miner) sectorStateLoop(ctx context.Context) error { trackedSectors, err := m.ListSectors() if err != nil { From 4206744d7b538f9c4e4be308abeaa0fbdc4c2165 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 3 Dec 2019 15:04:52 -0800 Subject: [PATCH 22/31] fix log message in chainstore --- chain/store/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/store/store.go b/chain/store/store.go index 9f20626e8..0c2f339b1 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -290,7 +290,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) span.AddAttributes(trace.BoolAttribute("newHead", true)) - log.Infof("New heaviest tipset! %s (height=%d)", ts.Cids()) + log.Infof("New heaviest tipset! %s (height=%d)", ts.Cids(), ts.Height()) cs.heaviest = ts if err := cs.writeHead(ts); err != nil { From dbcb839b6f57579bc4cfd163d43cba0bbee9f8d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 4 Dec 2019 00:42:22 +0100 Subject: [PATCH 23/31] Make sector state transitions more explicit --- api/api_storage.go | 34 +++++---------- storage/sector_states.go | 93 +++++++++++++++++++--------------------- storage/sector_types.go | 4 ++ storage/sectors.go | 89 ++++++++++++++++++++++++++++++-------- 4 files changed, 130 insertions(+), 90 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index 880259dd6..0df3d8658 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -2,8 +2,6 @@ package api import ( "context" - "fmt" - "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) @@ -23,29 +21,19 @@ const ( Committing Proving - SectorNoUpdate = UndefinedSectorState + FailedUnrecoverable ) -func SectorStateStr(s SectorState) string { - switch s { - case UndefinedSectorState: - return "UndefinedSectorState" - case Empty: - return "Empty" - case Packing: - return "Packing" - case Unsealed: - return "Unsealed" - case PreCommitting: - return "PreCommitting" - case PreCommitted: - return "PreCommitted" - case Committing: - return "Committing" - case Proving: - return "Proving" - } - return fmt.Sprintf("", s) +var SectorStates = []string{ + UndefinedSectorState: "UndefinedSectorState", + Empty: "Empty", + Packing: "Packing", + Unsealed: "Unsealed", + PreCommitting: "PreCommitting", + PreCommitted: "PreCommitted", + Committing: "Committing", + Proving: "Proving", + FailedUnrecoverable: "FailedUnrecoverable", } // StorageMiner is a low-level interface to the Filecoin network storage miner node diff --git a/storage/sector_states.go b/storage/sector_states.go index d57f6275d..dfee5f14e 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -12,29 +12,24 @@ import ( "github.com/filecoin-project/lotus/lib/sectorbuilder" ) -type providerHandlerFunc func(ctx context.Context, deal SectorInfo) (func(*SectorInfo), error) +type providerHandlerFunc func(ctx context.Context, deal SectorInfo) *sectorUpdate -func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc, next api.SectorState) { +func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc) { go func() { - mut, err := cb(ctx, sector) + update := cb(ctx, sector) - if err == nil && next == api.SectorNoUpdate { - return + if update == nil { + return // async } select { - case m.sectorUpdated <- sectorUpdate{ - newState: next, - id: sector.SectorID, - err: err, - mut: mut, - }: + case m.sectorUpdated <- *update: case <-m.stop: } }() } -func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate { log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) var allocated uint64 @@ -45,12 +40,12 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize()) if allocated > ubytes { - return nil, xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) + return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)) } fillerSizes, err := fillersFromRem(ubytes - allocated) if err != nil { - return nil, err + return sector.upd().fatal(err) } if len(fillerSizes) > 0 { @@ -59,27 +54,27 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec pieces, err := m.storeGarbage(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...) if err != nil { - return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) + return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)) } - return func(info *SectorInfo) { + return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) { info.Pieces = append(info.Pieces, pieces...) - }, nil + }) } -func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate { log.Infow("performing sector replication...", "sector", sector.SectorID) ticket, err := m.tktFn(ctx) if err != nil { - return nil, err + return sector.upd().fatal(err) } rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos()) if err != nil { - return nil, xerrors.Errorf("seal pre commit failed: %w", err) + return sector.upd().fatal(xerrors.Errorf("seal pre commit failed: %w", err)) } - return func(info *SectorInfo) { + return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) { info.CommC = rspco.CommC[:] info.CommD = rspco.CommD[:] info.CommR = rspco.CommR[:] @@ -88,10 +83,11 @@ func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*Sec BlockHeight: ticket.BlockHeight, TicketBytes: ticket.TicketBytes[:], } - }, nil + }) + } -func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate { params := &actors.SectorPreCommitInfo{ SectorNumber: sector.SectorID, @@ -101,7 +97,7 @@ func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorI } enc, aerr := actors.SerializeParams(params) if aerr != nil { - return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr) + return sector.upd().fatal(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) } msg := &types.Message{ @@ -117,26 +113,26 @@ func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorI log.Info("submitting precommit for sector: ", sector.SectorID) smsg, err := m.api.MpoolPushMessage(ctx, msg) if err != nil { - return nil, xerrors.Errorf("pushing message to mpool: %w", err) + return sector.upd().fatal(xerrors.Errorf("pushing message to mpool: %w", err)) } - return func(info *SectorInfo) { + return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) { mcid := smsg.Cid() info.PreCommitMessage = &mcid - }, nil + }) } -func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate { // would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts log.Info("Sector precommitted: ", sector.SectorID) mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage) if err != nil { - return nil, err + return sector.upd().fatal(err) } if mw.Receipt.ExitCode != 0 { log.Error("sector precommit failed: ", mw.Receipt.ExitCode) - return nil, err + return sector.upd().fatal(err) } log.Info("precommit message landed on chain: ", sector.SectorID) @@ -146,19 +142,18 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error { rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight)) if err != nil { - return xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) + err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) + + m.sectorUpdated <- *sector.upd().fatal(err) + return err } - m.sectorUpdated <- sectorUpdate{ - newState: api.Committing, - id: sector.SectorID, - mut: func(info *SectorInfo) { - info.Seed = SealSeed{ - BlockHeight: randHeight, - TicketBytes: rand, - } - }, - } + m.sectorUpdated <- *sector.upd().to(api.Committing).state(func(info *SectorInfo) { + info.Seed = SealSeed{ + BlockHeight: randHeight, + TicketBytes: rand, + } + }) return nil }, func(ctx context.Context, ts *types.TipSet) error { @@ -169,15 +164,15 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect log.Warn("waitForPreCommitMessage ChainAt errored: ", err) } - return nil, nil + return nil } -func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { +func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate { log.Info("scheduling seal proof computation...") proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) if err != nil { - return nil, xerrors.Errorf("computing seal proof failed: %w", err) + return sector.upd().fatal(xerrors.Errorf("computing seal proof failed: %w", err)) } // TODO: Consider splitting states and persist proof for faster recovery @@ -190,7 +185,7 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector enc, aerr := actors.SerializeParams(params) if aerr != nil { - return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr) + return sector.upd().fatal(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) } msg := &types.Message{ @@ -212,17 +207,17 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector mw, err := m.api.StateWaitMsg(ctx, smsg.Cid()) if err != nil { - return nil, xerrors.Errorf("failed to wait for porep inclusion: %w", err) + return sector.upd().fatal(xerrors.Errorf("failed to wait for porep inclusion: %w", err)) } if mw.Receipt.ExitCode != 0 { log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, smsg.Cid(), sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, params.Proof) - return nil, xerrors.New("UNHANDLED: submitting sector proof failed") + return sector.upd().fatal(xerrors.New("UNHANDLED: submitting sector proof failed")) } - return func(info *SectorInfo) { + return sector.upd().to(api.Proving).state(func(info *SectorInfo) { mcid := smsg.Cid() info.CommitMessage = &mcid info.Proof = proof - }, nil + }) } diff --git a/storage/sector_types.go b/storage/sector_types.go index 67b394732..179a7bbc6 100644 --- a/storage/sector_types.go +++ b/storage/sector_types.go @@ -71,6 +71,10 @@ type SectorInfo struct { CommitMessage *cid.Cid } +func (t *SectorInfo) upd() *sectorUpdate { + return §orUpdate{id: t.SectorID} +} + func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces)) for i, piece := range t.Pieces { diff --git a/storage/sectors.go b/storage/sectors.go index 583fdc71d..7a7312520 100644 --- a/storage/sectors.go +++ b/storage/sectors.go @@ -18,6 +18,33 @@ type sectorUpdate struct { mut func(*SectorInfo) } +func (u *sectorUpdate) fatal(err error) *sectorUpdate { + return §orUpdate{ + newState: api.FailedUnrecoverable, + id: u.id, + err: err, + mut: u.mut, + } +} + +func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate { + return §orUpdate{ + newState: u.newState, + id: u.id, + err: u.err, + mut: m, + } +} + +func (u *sectorUpdate) to(newState api.SectorState) *sectorUpdate { + return §orUpdate{ + newState: newState, + id: u.id, + err: u.err, + mut: u.mut, + } +} + func (m *Miner) sectorStateLoop(ctx context.Context) error { trackedSectors, err := m.ListSectors() if err != nil { @@ -97,9 +124,7 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) { } if err := m.sectors.Begin(sector.SectorID, sector); err != nil { - // We may have re-sent the proposal - log.Errorf("deal tracking failed: %s", err) - m.failSector(sector.SectorID, err) + log.Errorf("sector tracking failed: %s", err) return } @@ -116,7 +141,7 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) { } func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { - log.Infof("Sector %d updated state to %s", update.id, api.SectorStateStr(update.newState)) + log.Infof("Sector %d updated state to %s", update.id, api.SectorStates[update.newState]) var sector SectorInfo err := m.sectors.Mutate(update.id, func(s *SectorInfo) error { s.State = update.newState @@ -127,39 +152,67 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { return nil }) if update.err != nil { - log.Errorf("sector %d failed: %s", update.id, update.err) - m.failSector(update.id, update.err) - return + log.Errorf("sector %d failed: %+v", update.id, update.err) } if err != nil { - m.failSector(update.id, err) + log.Errorf("sector %d error: %+v", update.id, err) return } + /* + + *<- Empty + | | + | v + *<- Packing <- incoming + | | + | v + *<- Unsealed + | | + | v + *<- PreCommitting + | | + | v + *<- PreCommitted + | | + | v + *<- Committing + | | + | v + *<- Proving + | + v + FailedUnrecoverable + + UndefinedSectorState <- ¯\_(ツ)_/¯ + | ^ + *---------------------/ + + */ + switch update.newState { case api.Packing: - m.handleSectorUpdate(ctx, sector, m.finishPacking, api.Unsealed) + m.handleSectorUpdate(ctx, sector, m.handlePacking) case api.Unsealed: - m.handleSectorUpdate(ctx, sector, m.sealPreCommit, api.PreCommitting) + m.handleSectorUpdate(ctx, sector, m.handleUnsealed) case api.PreCommitting: - m.handleSectorUpdate(ctx, sector, m.preCommit, api.PreCommitted) + m.handleSectorUpdate(ctx, sector, m.handlePreCommitting) case api.PreCommitted: - m.handleSectorUpdate(ctx, sector, m.preCommitted, api.SectorNoUpdate) + m.handleSectorUpdate(ctx, sector, m.handlePreCommitted) case api.Committing: - m.handleSectorUpdate(ctx, sector, m.committing, api.Proving) + m.handleSectorUpdate(ctx, sector, m.handleCommitting) case api.Proving: // TODO: track sector health / expiration log.Infof("Proving sector %d", update.id) - case api.SectorNoUpdate: // noop + case api.UndefinedSectorState: + log.Error("sector update with undefined state!") + case api.FailedUnrecoverable: + log.Errorf("sector %d failed unrecoverably", update.id) default: log.Errorf("unexpected sector update state: %d", update.newState) } } -func (m *Miner) failSector(id uint64, err error) { - log.Errorf("sector %d error: %+v", id, err) -} - func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) { if padreader.PaddedSize(size) != size { return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") From 13c39452c1e81cd5b90c0473ed0b312abdeda3ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 4 Dec 2019 00:59:40 +0100 Subject: [PATCH 24/31] storageminer: States for common failure modes --- api/api_storage.go | 25 ++++++++++++++------ build/params_debug.go | 2 ++ cmd/lotus-storage-miner/info.go | 2 +- cmd/lotus-storage-miner/sectors.go | 4 ++-- storage/sector_states.go | 18 +++++++------- storage/sectors.go | 38 +++++++++++++++++++++++------- 6 files changed, 62 insertions(+), 27 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index 0df3d8658..09abbbf7c 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -21,18 +21,29 @@ const ( Committing Proving + SealFailed + PreCommitFailed + SealCommitFailed + CommitFailed + FailedUnrecoverable ) var SectorStates = []string{ UndefinedSectorState: "UndefinedSectorState", - Empty: "Empty", - Packing: "Packing", - Unsealed: "Unsealed", - PreCommitting: "PreCommitting", - PreCommitted: "PreCommitted", - Committing: "Committing", - Proving: "Proving", + Empty: "Empty", + Packing: "Packing", + Unsealed: "Unsealed", + PreCommitting: "PreCommitting", + PreCommitted: "PreCommitted", + Committing: "Committing", + Proving: "Proving", + + SealFailed: "SealFailed", + PreCommitFailed: "PreCommitFailed", + SealCommitFailed: "SealCommitFailed", + CommitFailed: "CommitFailed", + FailedUnrecoverable: "FailedUnrecoverable", } diff --git a/build/params_debug.go b/build/params_debug.go index 644a5d7b5..3f770c48e 100644 --- a/build/params_debug.go +++ b/build/params_debug.go @@ -4,6 +4,8 @@ package build import "os" +var SectorSizes = []uint64{1024} + // Seconds const BlockDelay = 6 diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 917b71bc7..c749cc6dc 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -128,7 +128,7 @@ func sectorsInfo(ctx context.Context, napi api.StorageMiner) (map[string]int, er return nil, err } - out[api.SectorStateStr(st.State)]++ + out[api.SectorStates[st.State]]++ } return out, nil diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index d5ee85d00..23076b155 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -61,7 +61,7 @@ var sectorsStatusCmd = &cli.Command{ } fmt.Printf("SectorID:\t%d\n", status.SectorID) - fmt.Printf("Status:\t%s\n", api.SectorStateStr(status.State)) + fmt.Printf("Status:\t%s\n", api.SectorStates[status.State]) fmt.Printf("CommD:\t\t%x\n", status.CommD) fmt.Printf("CommR:\t\t%x\n", status.CommR) fmt.Printf("Ticket:\t\t%x\n", status.Ticket.TicketBytes) @@ -132,7 +132,7 @@ var sectorsListCmd = &cli.Command{ fmt.Printf("%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n", s, - api.SectorStateStr(st.State), + api.SectorStates[st.State], yesno(inSSet), yesno(inPSet), st.Ticket.BlockHeight, diff --git a/storage/sector_states.go b/storage/sector_states.go index dfee5f14e..ef555a061 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -71,7 +71,7 @@ func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUp rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos()) if err != nil { - return sector.upd().fatal(xerrors.Errorf("seal pre commit failed: %w", err)) + return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err)) } return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) { @@ -97,7 +97,7 @@ func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sec } enc, aerr := actors.SerializeParams(params) if aerr != nil { - return sector.upd().fatal(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) + return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) } msg := &types.Message{ @@ -113,7 +113,7 @@ func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sec log.Info("submitting precommit for sector: ", sector.SectorID) smsg, err := m.api.MpoolPushMessage(ctx, msg) if err != nil { - return sector.upd().fatal(xerrors.Errorf("pushing message to mpool: %w", err)) + return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err)) } return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) { @@ -127,12 +127,12 @@ func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sect log.Info("Sector precommitted: ", sector.SectorID) mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage) if err != nil { - return sector.upd().fatal(err) + return sector.upd().to(api.PreCommitFailed).error(err) } if mw.Receipt.ExitCode != 0 { log.Error("sector precommit failed: ", mw.Receipt.ExitCode) - return sector.upd().fatal(err) + return sector.upd().to(api.PreCommitFailed).error(err) } log.Info("precommit message landed on chain: ", sector.SectorID) @@ -172,7 +172,7 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) if err != nil { - return sector.upd().fatal(xerrors.Errorf("computing seal proof failed: %w", err)) + return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err)) } // TODO: Consider splitting states and persist proof for faster recovery @@ -185,7 +185,7 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector enc, aerr := actors.SerializeParams(params) if aerr != nil { - return sector.upd().fatal(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) + return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)) } msg := &types.Message{ @@ -200,14 +200,14 @@ func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sector smsg, err := m.api.MpoolPushMessage(ctx, msg) if err != nil { - log.Error(xerrors.Errorf("pushing message to mpool: %w", err)) + return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err)) } // TODO: Separate state before this wait, so we persist message cid? mw, err := m.api.StateWaitMsg(ctx, smsg.Cid()) if err != nil { - return sector.upd().fatal(xerrors.Errorf("failed to wait for porep inclusion: %w", err)) + return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err)) } if mw.Receipt.ExitCode != 0 { diff --git a/storage/sectors.go b/storage/sectors.go index 7a7312520..8b50b1616 100644 --- a/storage/sectors.go +++ b/storage/sectors.go @@ -27,6 +27,15 @@ func (u *sectorUpdate) fatal(err error) *sectorUpdate { } } +func (u *sectorUpdate) error(err error) *sectorUpdate { + return §orUpdate{ + newState: u.newState, + id: u.id, + err: err, + mut: u.mut, + } +} + func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate { return §orUpdate{ newState: u.newState, @@ -161,23 +170,23 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { /* - *<- Empty + * Empty | | | v *<- Packing <- incoming | | | v - *<- Unsealed + *<- Unsealed <--> SealFailed | | | v - *<- PreCommitting + * PreCommitting <--> PreCommitFailed + | | ^ + | v | + *<- PreCommitted ------/ | | - | v - *<- PreCommitted - | | - | v + | v v--> SealCommitFailed *<- Committing - | | + | | ^--> CommitFailed | v *<- Proving | @@ -191,6 +200,7 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { */ switch update.newState { + // Happy path case api.Packing: m.handleSectorUpdate(ctx, sector, m.handlePacking) case api.Unsealed: @@ -204,6 +214,18 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { case api.Proving: // TODO: track sector health / expiration log.Infof("Proving sector %d", update.id) + + // Handled failure modes + case api.SealFailed: + log.Warn("sector %d entered unimplemented state 'SealFailed'", update.id) + case api.PreCommitFailed: + log.Warn("sector %d entered unimplemented state 'PreCommitFailed'", update.id) + case api.SealCommitFailed: + log.Warn("sector %d entered unimplemented state 'SealCommitFailed'", update.id) + case api.CommitFailed: + log.Warn("sector %d entered unimplemented state 'CommitFailed'", update.id) + + // Fatal errors case api.UndefinedSectorState: log.Error("sector update with undefined state!") case api.FailedUnrecoverable: From 6467a982bc26269b0d0d7fb3e39d8a79e65a7336 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 4 Dec 2019 01:25:18 +0100 Subject: [PATCH 25/31] storageminer: Wait for sync --- cli/sync.go | 91 +++++++++++++++++---------------- cmd/lotus-storage-miner/init.go | 26 ++++++---- cmd/lotus-storage-miner/run.go | 12 +++++ 3 files changed, 77 insertions(+), 52 deletions(-) diff --git a/cli/sync.go b/cli/sync.go index 533a7bc9f..8543b9505 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -1,6 +1,7 @@ package cli import ( + "context" "fmt" "time" @@ -73,48 +74,52 @@ var syncWaitCmd = &cli.Command{ defer closer() ctx := ReqContext(cctx) - for { - state, err := napi.SyncState(ctx) - if err != nil { - return err - } - - head, err := napi.ChainHead(ctx) - if err != nil { - return err - } - - working := 0 - for i, ss := range state.ActiveSyncs { - switch ss.Stage { - case api.StageSyncComplete: - default: - working = i - case api.StageIdle: - // not complete, not actively working - } - } - - ss := state.ActiveSyncs[working] - - var target []cid.Cid - if ss.Target != nil { - target = ss.Target.Cids() - } - - fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height) - - if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay { - fmt.Println("\nDone!") - return nil - } - - select { - case <-ctx.Done(): - fmt.Println("\nExit by user") - return nil - case <-time.After(1 * time.Second): - } - } + return SyncWait(ctx, napi) }, } + +func SyncWait(ctx context.Context, napi api.FullNode) error { + for { + state, err := napi.SyncState(ctx) + if err != nil { + return err + } + + head, err := napi.ChainHead(ctx) + if err != nil { + return err + } + + working := 0 + for i, ss := range state.ActiveSyncs { + switch ss.Stage { + case api.StageSyncComplete: + default: + working = i + case api.StageIdle: + // not complete, not actively working + } + } + + ss := state.ActiveSyncs[working] + + var target []cid.Cid + if ss.Target != nil { + target = ss.Target.Cids() + } + + fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height) + + if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay { + fmt.Println("\nDone!") + return nil + } + + select { + case <-ctx.Done(): + fmt.Println("\nExit by user") + return nil + case <-time.After(1 * time.Second): + } + } +} diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 5250b8f7f..05889b012 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -80,6 +80,23 @@ var initCmd = &cli.Command{ return xerrors.Errorf("fetching proof parameters: %w", err) } + log.Info("Trying to connect to full node RPC") + + api, closer, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + log.Info("Checking full node sync status") + + if !cctx.Bool("genesis-miner") { + if err := lcli.SyncWait(ctx, api); err != nil { + return xerrors.Errorf("sync wait: %w", err) + } + } + log.Info("Checking if repo exists") repoPath := cctx.String(FlagStorageRepo) @@ -96,15 +113,6 @@ var initCmd = &cli.Command{ return xerrors.Errorf("repo at '%s' is already initialized", cctx.String(FlagStorageRepo)) } - log.Info("Trying to connect to full node RPC") - - api, closer, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config - if err != nil { - return err - } - defer closer() - ctx := lcli.ReqContext(cctx) - log.Info("Checking full node version") v, err := api.Version(ctx) diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 894303b92..2498fe419 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -35,6 +35,10 @@ var runCmd = &cli.Command{ Usage: "Enable use of GPU for mining operations", Value: true, }, + &cli.BoolFlag{ + Name: "nosync", + Usage: "Don't check full-node sync status", + }, }, Action: func(cctx *cli.Context) error { if err := build.GetParams(true, false); err != nil { @@ -61,6 +65,14 @@ var runCmd = &cli.Command{ return xerrors.Errorf("lotus-daemon API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion}) } + log.Info("Checking full node sync status") + + if !cctx.Bool("nosync") { + if err := lcli.SyncWait(ctx, nodeApi); err != nil { + return xerrors.Errorf("sync wait: %w", err) + } + } + storageRepoPath := cctx.String(FlagStorageRepo) r, err := repo.NewFS(storageRepoPath) if err != nil { From d112fd18b1e59459c596c5776283b467713974ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 4 Dec 2019 01:44:29 +0100 Subject: [PATCH 26/31] Persist sector sealing errors --- api/api_storage.go | 1 + chain/blocksync/cbor_gen.go | 2 +- chain/types/cbor_gen.go | 2 +- cmd/lotus-storage-miner/sectors.go | 3 +++ node/impl/storminer.go | 2 ++ storage/cbor_gen.go | 21 +++++++++++++++++++-- storage/sector_types.go | 3 +++ storage/sectors.go | 6 ++++++ 8 files changed, 36 insertions(+), 4 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index 09abbbf7c..f878e5d82 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -82,6 +82,7 @@ type SectorInfo struct { Deals []uint64 Ticket sectorbuilder.SealTicket Seed sectorbuilder.SealSeed + LastErr string } type SealedRef struct { diff --git a/chain/blocksync/cbor_gen.go b/chain/blocksync/cbor_gen.go index 4af61a490..579b3f857 100644 --- a/chain/blocksync/cbor_gen.go +++ b/chain/blocksync/cbor_gen.go @@ -5,7 +5,7 @@ import ( "io" "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" + cid "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) diff --git a/chain/types/cbor_gen.go b/chain/types/cbor_gen.go index d2b60f268..fc7a7888c 100644 --- a/chain/types/cbor_gen.go +++ b/chain/types/cbor_gen.go @@ -5,7 +5,7 @@ import ( "io" "math" - "github.com/ipfs/go-cid" + cid "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 23076b155..774b29bd3 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -70,6 +70,9 @@ var sectorsStatusCmd = &cli.Command{ fmt.Printf("SeedH:\t\t%d\n", status.Seed.BlockHeight) fmt.Printf("Proof:\t\t%x\n", status.Proof) fmt.Printf("Deals:\t\t%v\n", status.Deals) + if status.LastErr != "" { + fmt.Printf("Last Error:\t\t%s\n", status.LastErr) + } return nil }, } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 04f162c30..1eac4a551 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -60,6 +60,8 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid uint64) (api.S Deals: deals, Ticket: info.Ticket.SB(), Seed: info.Seed.SB(), + + LastErr: info.LastErr, }, nil } diff --git a/storage/cbor_gen.go b/storage/cbor_gen.go index 1bc604e4a..41de25822 100644 --- a/storage/cbor_gen.go +++ b/storage/cbor_gen.go @@ -239,7 +239,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{140}); err != nil { + if _, err := w.Write([]byte{141}); err != nil { return err } @@ -337,6 +337,13 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } + // t.t.LastErr (string) (string) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.LastErr)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.LastErr)); err != nil { + return err + } return nil } @@ -351,7 +358,7 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 12 { + if extra != 13 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -552,5 +559,15 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { } } + // t.t.LastErr (string) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.LastErr = string(sval) + } return nil } diff --git a/storage/sector_types.go b/storage/sector_types.go index 179a7bbc6..20dff0161 100644 --- a/storage/sector_types.go +++ b/storage/sector_types.go @@ -69,6 +69,9 @@ type SectorInfo struct { // Committing CommitMessage *cid.Cid + + // Debug + LastErr string } func (t *SectorInfo) upd() *sectorUpdate { diff --git a/storage/sectors.go b/storage/sectors.go index 8b50b1616..929708880 100644 --- a/storage/sectors.go +++ b/storage/sectors.go @@ -2,6 +2,7 @@ package storage import ( "context" + "fmt" "io" xerrors "golang.org/x/xerrors" @@ -154,6 +155,11 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { var sector SectorInfo err := m.sectors.Mutate(update.id, func(s *SectorInfo) error { s.State = update.newState + s.LastErr = "" + if update.err != nil { + s.LastErr = fmt.Sprintf("%+v", update.err) + } + if update.mut != nil { update.mut(s) } From 2d15e925cd2df3ab76d7657622e68fd10fff1350 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 3 Dec 2019 12:00:04 -0800 Subject: [PATCH 27/31] WIP: improve timing of mining --- chain/sync.go | 3 +++ miner/miner.go | 30 +++++++++++++++++++++--------- miner/testminer.go | 4 ++-- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index b98a1fddb..f5d9d8724 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -538,6 +538,9 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err if h.Timestamp > uint64(time.Now().Unix()+build.AllowableClockDrift) { return xerrors.Errorf("block was from the future") } + if h.Timestamp > uint64(time.Now().Unix()) { + log.Warn("Got block from the future, but within threshold", h.Timestamp, time.Now().Unix()) + } if h.Timestamp < baseTs.MinTimestamp()+(build.BlockDelay*(h.Height-baseTs.Height())) { log.Warn("timestamp funtimes: ", h.Timestamp, baseTs.MinTimestamp(), h.Height, baseTs.Height()) diff --git a/miner/miner.go b/miner/miner.go index b49c422d5..0b8d2f9f5 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -19,15 +19,17 @@ import ( var log = logging.Logger("miner") -type waitFunc func(ctx context.Context) error +type waitFunc func(ctx context.Context, baseTime uint64) error func NewMiner(api api.FullNode, epp gen.ElectionPoStProver) *Miner { return &Miner{ api: api, epp: epp, - waitFunc: func(ctx context.Context) error { + waitFunc: func(ctx context.Context, baseTime uint64) error { // Wait around for half the block time in case other parents come in - time.Sleep(build.PropagationDelay * time.Second) + deadline := baseTime + build.PropagationDelay + time.Sleep(time.Until(time.Unix(int64(deadline), 0))) + return nil }, } @@ -141,8 +143,15 @@ eventLoop: addrs := m.addresses m.lk.Unlock() - // Sleep a small amount in order to wait for other blocks to arrive - if err := m.waitFunc(ctx); err != nil { + prebase, err := m.GetBestMiningCandidate(ctx) + if err != nil { + log.Errorf("failed to get best mining candidate: %s", err) + time.Sleep(time.Second * 5) + continue + } + + // Wait until propagation delay period after block we plan to mine on + if err := m.waitFunc(ctx, prebase.ts.MinTimestamp()); err != nil { log.Error(err) return } @@ -159,6 +168,8 @@ eventLoop: } lastBase = *base + log.Infof("Time delta between now and our mining base: %ds", uint64(time.Now().Unix())-base.ts.MinTimestamp()) + blks := make([]*types.BlockMsg, 0) for _, addr := range addrs { @@ -289,10 +300,9 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB if err != nil { return nil, xerrors.Errorf("failed to create block: %w", err) } - log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height) - dur := time.Now().Sub(start) - log.Infof("Creating block took %s", dur) + dur := time.Since(start) + log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur) if dur > time.Second*build.BlockDelay { log.Warn("CAUTION: block production took longer than the block delay. Your computer may not be fast enough to keep up") } @@ -374,6 +384,7 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs out := make([]*types.SignedMessage, 0, len(msgs)) inclNonces := make(map[address.Address]uint64) inclBalances := make(map[address.Address]types.BigInt) + inclCount := make(map[address.Address]int) for _, msg := range msgs { if msg.Message.To == address.Undef { @@ -399,7 +410,7 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs } if msg.Message.Nonce > inclNonces[from] { - log.Warnf("message in mempool has too high of a nonce (%d > %d) %s (%d pending for orig)", msg.Message.Nonce, inclNonces[from], msg.Cid(), countFrom(msgs, from)) + log.Warnf("message in mempool has too high of a nonce (%d > %d, from %s, inclcount %d) %s (%d pending for orig)", msg.Message.Nonce, inclNonces[from], from, inclCount[from], msg.Cid(), countFrom(msgs, from)) continue } @@ -410,6 +421,7 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs inclNonces[from] = msg.Message.Nonce + 1 inclBalances[from] = types.BigSub(inclBalances[from], msg.Message.RequiredFunds()) + inclCount[from]++ out = append(out, msg) } diff --git a/miner/testminer.go b/miner/testminer.go index 594f2175e..1f93eb04f 100644 --- a/miner/testminer.go +++ b/miner/testminer.go @@ -23,8 +23,8 @@ func NewTestMiner(nextCh <-chan struct{}, addr address.Address) func(api.FullNod } } -func chanWaiter(next <-chan struct{}) func(ctx context.Context) error { - return func(ctx context.Context) error { +func chanWaiter(next <-chan struct{}) func(ctx context.Context, _ uint64) error { + return func(ctx context.Context, _ uint64) error { select { case <-ctx.Done(): return ctx.Err() From 034f0cc479ec831c162df031c0cd59dcae05204a Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 3 Dec 2019 19:56:29 -0800 Subject: [PATCH 28/31] track sync errors in status --- api/api_full.go | 1 + chain/sync.go | 9 +++++++-- chain/syncstate.go | 45 ++++++++++++++++++++++++++++++++++++--------- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 2d5ed580c..5fe71856e 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -277,6 +277,7 @@ const ( StagePersistHeaders StageMessages StageSyncComplete + StageSyncErrored ) type MpoolChange int diff --git a/chain/sync.go b/chain/sync.go index f5d9d8724..ec2527d49 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1115,6 +1115,7 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet()) if err != nil { + ss.Error(err) return err } @@ -1131,14 +1132,18 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error toPersist = append(toPersist, ts.Blocks()...) } if err := syncer.store.PersistBlockHeaders(toPersist...); err != nil { - return xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err) + err = xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err) + ss.Error(err) + return err } toPersist = nil ss.SetStage(api.StageMessages) if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil { - return xerrors.Errorf("collectChain syncMessages: %w", err) + err = xerrors.Errorf("collectChain syncMessages: %w", err) + ss.Error(err) + return err } ss.SetStage(api.StageSyncComplete) diff --git a/chain/syncstate.go b/chain/syncstate.go index 45aeba90c..622598193 100644 --- a/chain/syncstate.go +++ b/chain/syncstate.go @@ -3,6 +3,7 @@ package chain import ( "fmt" "sync" + "time" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" @@ -18,17 +19,22 @@ func SyncStageString(v api.SyncStateStage) string { return "message sync" case api.StageSyncComplete: return "complete" + case api.StageSyncErrored: + return "error" default: return fmt.Sprintf("", v) } } type SyncerState struct { - lk sync.Mutex - Target *types.TipSet - Base *types.TipSet - Stage api.SyncStateStage - Height uint64 + lk sync.Mutex + Target *types.TipSet + Base *types.TipSet + Stage api.SyncStateStage + Height uint64 + Message string + Start time.Time + End time.Time } func (ss *SyncerState) SetStage(v api.SyncStateStage) { @@ -39,6 +45,9 @@ func (ss *SyncerState) SetStage(v api.SyncStateStage) { ss.lk.Lock() defer ss.lk.Unlock() ss.Stage = v + if v == api.StageSyncComplete { + ss.End = time.Now() + } } func (ss *SyncerState) Init(base, target *types.TipSet) { @@ -52,6 +61,9 @@ func (ss *SyncerState) Init(base, target *types.TipSet) { ss.Base = base ss.Stage = api.StageHeaders ss.Height = 0 + ss.Message = "" + ss.Start = time.Now() + ss.End = time.Time{} } func (ss *SyncerState) SetHeight(h uint64) { @@ -64,13 +76,28 @@ func (ss *SyncerState) SetHeight(h uint64) { ss.Height = h } +func (ss *SyncerState) Error(err error) { + if ss == nil { + return + } + + ss.lk.Lock() + defer ss.lk.Unlock() + ss.Message = err.Error() + ss.Stage = api.StageSyncErrored + ss.End = time.Now() +} + func (ss *SyncerState) Snapshot() SyncerState { ss.lk.Lock() defer ss.lk.Unlock() return SyncerState{ - Base: ss.Base, - Target: ss.Target, - Stage: ss.Stage, - Height: ss.Height, + Base: ss.Base, + Target: ss.Target, + Stage: ss.Stage, + Height: ss.Height, + Message: ss.Message, + Start: ss.Start, + End: ss.End, } } From c302051bc27c8c5ae4cf331b1d2ac0c06e834e43 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 3 Dec 2019 20:59:41 -0800 Subject: [PATCH 29/31] add duration tracking to sync status --- api/api_full.go | 5 +++++ chain/sync.go | 6 +++++- cli/sync.go | 18 +++++++++++++++--- node/impl/full/sync.go | 11 +++++++---- 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 5fe71856e..a8a71f799 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -2,6 +2,7 @@ package api import ( "context" + "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-filestore" @@ -263,6 +264,10 @@ type ActiveSync struct { Stage SyncStateStage Height uint64 + + Start time.Time + End time.Time + Message string } type SyncState struct { diff --git a/chain/sync.go b/chain/sync.go index ec2527d49..44f3fd249 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -144,7 +144,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight targetWeight := fts.TipSet().Blocks()[0].ParentWeight if targetWeight.LessThan(bestPweight) { - log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now") + var miners []string + for _, blk := range fts.TipSet().Blocks() { + miners = append(miners, blk.Miner.String()) + } + log.Warnf("incoming tipset from %s does not appear to be better than our best chain, ignoring for now", miners) return } diff --git a/cli/sync.go b/cli/sync.go index 8543b9505..1b051769a 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -26,14 +26,14 @@ var syncStatusCmd = &cli.Command{ Name: "status", Usage: "check sync status", Action: func(cctx *cli.Context) error { - api, closer, err := GetFullNodeAPI(cctx) + apic, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } defer closer() ctx := ReqContext(cctx) - state, err := api.SyncState(ctx) + state, err := apic.SyncState(ctx) if err != nil { return err } @@ -43,6 +43,7 @@ var syncStatusCmd = &cli.Command{ fmt.Printf("worker %d:\n", i) var base, target []cid.Cid var heightDiff int64 + var theight uint64 if ss.Base != nil { base = ss.Base.Cids() heightDiff = int64(ss.Base.Height()) @@ -50,14 +51,25 @@ var syncStatusCmd = &cli.Command{ if ss.Target != nil { target = ss.Target.Cids() heightDiff = int64(ss.Target.Height()) - heightDiff + theight = ss.Target.Height() } else { heightDiff = 0 } fmt.Printf("\tBase:\t%s\n", base) - fmt.Printf("\tTarget:\t%s\n", target) + fmt.Printf("\tTarget:\t%s (%d)\n", target, theight) fmt.Printf("\tHeight diff:\t%d\n", heightDiff) fmt.Printf("\tStage: %s\n", chain.SyncStageString(ss.Stage)) fmt.Printf("\tHeight: %d\n", ss.Height) + if ss.End.IsZero() { + if !ss.Start.IsZero() { + fmt.Printf("\tElapsed: %s\n", time.Since(ss.Start)) + } + } else { + fmt.Printf("\tElapsed: %s\n", ss.End.Sub(ss.Start)) + } + if ss.Stage == api.StageSyncErrored { + fmt.Printf("\tError: %s\n", ss.Message) + } } return nil }, diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go index c84957ad7..0ebdad536 100644 --- a/node/impl/full/sync.go +++ b/node/impl/full/sync.go @@ -26,10 +26,13 @@ func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) { for _, ss := range states { out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{ - Base: ss.Base, - Target: ss.Target, - Stage: ss.Stage, - Height: ss.Height, + Base: ss.Base, + Target: ss.Target, + Stage: ss.Stage, + Height: ss.Height, + Start: ss.Start, + End: ss.End, + Message: ss.Message, }) } return out, nil From fd7807d746568fa9c3f0f86fbf55ee8fa70e93e7 Mon Sep 17 00:00:00 2001 From: ognots Date: Tue, 3 Dec 2019 20:13:27 -0500 Subject: [PATCH 30/31] macos build and linux and darwin bin artifacts - adds a new job to run build in macos and produce lotus and lotus-storage-miner binaries - persist linux ad darwin lotus and lotus-storage-miner binaries as CircleCI artifacts - create CircleCI cache for go mod and cargo --- .circleci/config.yml | 99 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 91 insertions(+), 8 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 30dc273b8..cfc6e995a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,7 +6,7 @@ executors: golang: docker: - image: circleci/golang:1.13 - resource_class: 2xlarge + resource_class: 2xlarge commands: install-deps: @@ -14,25 +14,37 @@ commands: - go/install-ssh - go/install: {package: git} prepare: + parameters: + linux: + default: true + description: is a linux build environment? + type: boolean + darwin: + default: false + description: is a darwin build environment? + type: boolean steps: - checkout - - run: sudo apt-get update - - run: sudo apt-get install ocl-icd-opencl-dev + - when: + condition: << parameters.linux >> + steps: + - run: sudo apt-get update + - run: sudo apt-get install ocl-icd-opencl-dev - run: git submodule sync - run: git submodule update --init download-params: steps: - restore_cache: name: Restore parameters cache - keys: - - 'v15-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-{{ checksum "build/paramfetch.go" }}' - - 'v15-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-' + keys: + - 'v19-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-{{ checksum "build/paramfetch.go" }}' + - 'v19-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-' paths: - /var/tmp/filecoin-proof-parameters/ - run: ./lotus fetch-params --include-test-params - save_cache: name: Save parameters cache - key: 'v15-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-{{ checksum "build/paramfetch.go" }}' + key: 'v19-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-{{ checksum "build/paramfetch.go" }}' paths: - /var/tmp/filecoin-proof-parameters/ @@ -54,8 +66,15 @@ jobs: - go/mod-download - run: sudo apt-get update - run: sudo apt-get install npm + - restore_cache: + name: restore go mod cache + key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }} - run: command: make buildall + - store_artifacts: + path: lotus + - store_artifacts: + path: lotus-storage-miner test: description: | @@ -95,6 +114,9 @@ jobs: - install-deps - prepare - go/mod-download + - restore_cache: + name: restore go mod cache + key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }} - run: command: make deps lotus no_output_timeout: 30m @@ -123,6 +145,63 @@ jobs: shell: /bin/bash -eo pipefail command: | bash <(curl -s https://codecov.io/bash) + - save_cache: + name: save go mod cache + key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }} + paths: + - "~/go/pkg" + - "~/go/src/github.com" + - "~/go/src/golang.org" + + build_macos: + description: build darwin lotus binary + macos: + xcode: "10.0.0" + working_directory: ~/go/src/github.com/filecoin-project/lotus + steps: + - prepare: + linux: false + darwin: true + - run: + name: Install go + command: | + curl -O https://dl.google.com/go/go1.13.4.darwin-amd64.pkg && \ + sudo installer -pkg go1.13.4.darwin-amd64.pkg -target / + - run: + name: Install pkg-config + command: HOMEBREW_NO_AUTO_UPDATE=1 brew install pkg-config + - run: go version + - run: + name: Install Rust + command: | + curl https://sh.rustup.rs -sSf | sh -s -- -y + - run: + name: Install jq + command: | + mkdir $HOME/.bin + curl --location https://github.com/stedolan/jq/releases/download/jq-1.6/jq-osx-amd64 --output $HOME/.bin/jq + chmod +x $HOME/.bin/jq + - restore_cache: + name: restore go mod and cargo cache + key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }} + - install-deps + - go/mod-download + - run: + command: make build + no_output_timeout: 30m + - store_artifacts: + path: lotus + - store_artifacts: + path: lotus-storage-miner + - save_cache: + name: save go mod and cargo cache + key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }} + paths: + - "~/go/pkg" + - "~/go/src/github.com" + - "~/go/src/golang.org" + - "~/.rustup" + - "~/.cargo" lint: &lint description: | @@ -177,4 +256,8 @@ workflows: - test: codecov-upload: true - mod-tidy-check - - build-all \ No newline at end of file + - build-all + - build_macos: + filters: + branches: + only: master From 99dfb8df68a2548df47cb76afa1b575b42caf87a Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 3 Dec 2019 22:18:02 -0800 Subject: [PATCH 31/31] dont add too many messages to a block --- miner/miner.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/miner/miner.go b/miner/miner.go index 0b8d2f9f5..df2b3cdaa 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -17,6 +17,8 @@ import ( "golang.org/x/xerrors" ) +const MaxMessagesPerBlock = 4000 + var log = logging.Logger("miner") type waitFunc func(ctx context.Context, baseTime uint64) error @@ -424,6 +426,9 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs inclCount[from]++ out = append(out, msg) + if len(out) >= MaxMessagesPerBlock { + break + } } return out, nil }