From cc233c6956827c6ab30582cdd30a27c175a29061 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 20 Apr 2020 09:53:19 -0700 Subject: [PATCH 01/10] add an lru caching blockstore --- lib/cachebs/cachebs.go | 78 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 lib/cachebs/cachebs.go diff --git a/lib/cachebs/cachebs.go b/lib/cachebs/cachebs.go new file mode 100644 index 000000000..2c00afff2 --- /dev/null +++ b/lib/cachebs/cachebs.go @@ -0,0 +1,78 @@ +package cachebs + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + block "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" + bstore "github.com/ipfs/go-ipfs-blockstore" + logging "github.com/ipfs/go-log/v2" +) + +var log = logging.Logger("cachebs") + +type CacheBS struct { + cache *lru.ARCCache + bs bstore.Blockstore +} + +func NewBufferedBstore(base blockstore.Blockstore, size int) *CacheBS { + c, err := lru.NewARC(size) + if err != nil { + panic(err) + } + return &CacheBS{ + cache: c, + bs: base, + } +} + +var _ (bstore.Blockstore) = &CacheBS{} + +func (bs *CacheBS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return bs.bs.AllKeysChan(ctx) +} + +func (bs *CacheBS) DeleteBlock(c cid.Cid) error { + return bs.bs.DeleteBlock(c) +} + +func (bs *CacheBS) Get(c cid.Cid) (block.Block, error) { + v, ok := bs.cache.Get(c) + if ok { + return v.(block.Block), nil + } + + return bs.bs.Get(c) +} + +func (bs *CacheBS) GetSize(c cid.Cid) (int, error) { + return bs.bs.GetSize(c) +} + +func (bs *CacheBS) Put(blk block.Block) error { + bs.cache.Add(blk.Cid(), blk) + + return bs.bs.Put(blk) +} + +func (bs *CacheBS) Has(c cid.Cid) (bool, error) { + if bs.cache.Contains(c) { + return true, nil + } + + return bs.bs.Has(c) +} + +func (bs *CacheBS) HashOnRead(hor bool) { + bs.bs.HashOnRead(hor) +} + +func (bs *CacheBS) PutMany(blks []block.Block) error { + for _, blk := range blks { + bs.cache.Add(blk.Cid(), blk) + } + return bs.bs.PutMany(blks) +} From 94555ea8c103c5750f100223136d38467df83821 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 22 Apr 2020 12:04:03 -0700 Subject: [PATCH 02/10] delete block should delete from cache too --- lib/cachebs/cachebs.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/cachebs/cachebs.go b/lib/cachebs/cachebs.go index 2c00afff2..11152e7c5 100644 --- a/lib/cachebs/cachebs.go +++ b/lib/cachebs/cachebs.go @@ -36,6 +36,8 @@ func (bs *CacheBS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { } func (bs *CacheBS) DeleteBlock(c cid.Cid) error { + bs.cache.Remove(c) + return bs.bs.DeleteBlock(c) } From bac1e3f9012918b3d4704a1a93e17e4553c4ecba Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 29 May 2020 15:15:59 -0700 Subject: [PATCH 03/10] put blocks in cache after get --- lib/cachebs/cachebs.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/cachebs/cachebs.go b/lib/cachebs/cachebs.go index 11152e7c5..5d997137d 100644 --- a/lib/cachebs/cachebs.go +++ b/lib/cachebs/cachebs.go @@ -47,7 +47,13 @@ func (bs *CacheBS) Get(c cid.Cid) (block.Block, error) { return v.(block.Block), nil } - return bs.bs.Get(c) + out, err := bs.bs.Get(c) + if err != nil { + return nil, err + } + + bs.cache.Add(c, out) + return out, nil } func (bs *CacheBS) GetSize(c cid.Cid) (int, error) { From e13a251cc8bc1cd9399a51a10f6ccd2fe752f221 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 8 Jul 2020 18:13:27 -0700 Subject: [PATCH 04/10] feat: add deal state and proposal tracking to cw --- cmd/lotus-chainwatch/storage.go | 160 +++++++++++++++++++++++++++++++- cmd/lotus-chainwatch/sync.go | 76 ++++++++++++--- 2 files changed, 219 insertions(+), 17 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 8a4215bd2..c9e66a6c2 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "strconv" "sync" "time" @@ -330,8 +331,15 @@ create table if not exists miner_sectors_heads primary key (miner_id,miner_sectors_cid) ); - -create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED'); +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN + CREATE TYPE miner_sector_event_type AS ENUM + ( + 'ADDED','EXTENDED', 'EXPIRED', 'TERMINATED' + ); + END IF; +END$$; create table if not exists miner_sector_events ( @@ -342,7 +350,46 @@ create table if not exists miner_sector_events constraint miner_sector_events_pk primary key (sector_id, event, miner_id, state_root) -) +); + +create table if not exists market_deal_proposals +( + deal_id bigint not null, + + state_root text not null, + + piece_cid text not null, + piece_size bigint not null, + verified_deal bool not null, + + client_id text not null, + provider_id text not null, + + start_epoch bigint not null, + end_epoch bigint not null, + storage_price_per_epoch text not null, + + provider_collateral text not null, + client_collateral text not null, + + constraint market_deal_proposal_pk + primary key (deal_id) +); + +create table if not exists market_deal_states +( + deal_id bigint not null, + + state_root text not null, + + sector_start_epoch bigint not null, + last_update_epoch bigint not null, + slash_epoch bigint not null, + + constraint market_deal_states_pk + primary key (deal_id) + +); /* create or replace function miner_tips(epoch bigint) @@ -852,7 +899,7 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat } for _, added := range changes.Added { - if _, err := eventStmt.Exec(miner.addr.String(), added.Info.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil { + if _, err := eventStmt.Exec(added.Info.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil { return err } } @@ -900,6 +947,111 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat return updateTx.Commit() } +func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, api api.FullNode) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil { + return err + } + stmt, err := tx.Prepare(`copy mds (deal_id, state_root, sector_start_epoch, last_update_epoch, slash_epoch) from STDIN`) + if err != nil { + return err + } + for tskey, mt := range marketTips { + dealStates, err := api.StateMarketDeals(context.TODO(), tskey) + if err != nil { + return err + } + + for dealID, ds := range dealStates { + id, err := strconv.ParseUint(dealID, 10, 64) + if err != nil { + return err + } + + if _, err := stmt.Exec( + id, + mt.stateroot.String(), + ds.State.SectorStartEpoch, + ds.State.LastUpdatedEpoch, + ds.State.SlashEpoch, + ); err != nil { + return err + } + + } + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into market_deal_states select * from mds on conflict do nothing`); err != nil { + return err + } + + return tx.Commit() + +} + +func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, api api.FullNode) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table mdp (like market_deal_proposals excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, piece_size, verified_deal, client_id, provider_id, start_epoch, end_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`) + if err != nil { + return err + } + + for tskey, mt := range marketTips { + dealStates, err := api.StateMarketDeals(context.TODO(), tskey) + if err != nil { + return err + } + + for dealID, ds := range dealStates { + id, err := strconv.ParseUint(dealID, 10, 64) + if err != nil { + return err + } + + if _, err := stmt.Exec( + id, + mt.stateroot.String(), + ds.Proposal.PieceCID.String(), + ds.Proposal.PieceSize, + ds.Proposal.VerifiedDeal, + ds.Proposal.Client.String(), + ds.Proposal.Provider.String(), + ds.Proposal.StartEpoch, + ds.Proposal.EndEpoch, + ds.Proposal.StoragePricePerEpoch.String(), + ds.Proposal.ProviderCollateral.String(), + ds.Proposal.ClientCollateral.String(), + ); err != nil { + return err + } + + } + } + if err := stmt.Close(); err != nil { + return err + } + if _, err := tx.Exec(`insert into market_deal_proposals select * from mdp on conflict do nothing`); err != nil { + return err + } + + return tx.Commit() + +} + func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { st.headerLk.Lock() defer st.headerLk.Unlock() diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 442fd9c0c..14b761ddb 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "math" "sync" "time" @@ -81,6 +82,20 @@ type minerStateInfo struct { psize uint64 } +type marketStateInfo struct { + // common + act types.Actor + stateroot cid.Cid + + // calculating changes + // calculating changes + tsKey types.TipSetKey + parentTsKey types.TipSetKey + + // market actor specific + state market.State +} + type actorInfo struct { stateroot cid.Cid tsKey types.TipSetKey @@ -313,23 +328,28 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. log.Infof("Getting actor change info") + // highly likely that the market actor will change at every epoch + marketActorChanges := make(map[types.TipSetKey]*marketStateInfo, len(changes)) + minerChanges := 0 for addr, m := range actors { for actor, c := range m { - // reward actor - if actor.Code == builtin.RewardActorCodeID { - rewardTips[c.tsKey] = &rewardStateInfo{ - stateroot: c.stateroot, - baselinePower: big.Zero(), - } + // only want actors with head change events + if _, found := headsSeen[actor.Head]; found { continue } + headsSeen[actor.Head] = struct{}{} - // miner actors with head change events - if actor.Code == builtin.StorageMinerActorCodeID { - if _, found := headsSeen[actor.Head]; found { - continue + switch actor.Code { + case builtin.StorageMarketActorCodeID: + marketActorChanges[c.tsKey] = &marketStateInfo{ + act: actor, + stateroot: c.stateroot, + tsKey: c.tsKey, + parentTsKey: c.parentTsKey, + state: market.State{}, } + case builtin.StorageMinerActorCodeID: minerChanges++ minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ @@ -346,10 +366,14 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. rawPower: big.Zero(), qalPower: big.Zero(), }) - - headsSeen[actor.Head] = struct{}{} + // reward actor + case builtin.RewardActorCodeID: + rewardTips[c.tsKey] = &rewardStateInfo{ + stateroot: c.stateroot, + baselinePower: big.Zero(), + } } - continue + } } @@ -435,6 +459,21 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. }) log.Infow("Completed Miner Processing", "duration", time.Since(minerProcessingStartedAt).String(), "processed", minerChanges) + log.Info("Getting market actor info") + // TODO: consider taking the min of the array length and using that for concurrency param, e.g: + // concurrency := math.Min(len(marketActorChanges), 50) + parmap.Par(50, parmap.MapArr(marketActorChanges), func(mrktInfo *marketStateInfo) { + astb, err := api.ChainReadObj(ctx, mrktInfo.act.Head) + if err != nil { + log.Error(err) + return + } + if err := mrktInfo.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil { + log.Error(err) + return + } + }) + log.Info("Getting receipts") receipts := fetchParentReceipts(ctx, api, toSync) @@ -496,6 +535,17 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } + log.Info("Storing market actor info") + if err := st.storeMarketActorDealProposals(marketActorChanges, api); err != nil { + log.Error(err) + return + } + + if err := st.storeMarketActorDealStates(marketActorChanges, api); err != nil { + log.Error(err) + return + } + log.Infof("Storing messages") if err := st.storeMessages(msgs); err != nil { From 1e2e62bad66f926048ddbaa4cd96bcb764114fdc Mon Sep 17 00:00:00 2001 From: frrist Date: Thu, 9 Jul 2020 10:49:02 -0700 Subject: [PATCH 05/10] polish: track unpadded piece size in deal prop --- cmd/lotus-chainwatch/storage.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index c9e66a6c2..6efa3d86a 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -359,7 +359,8 @@ create table if not exists market_deal_proposals state_root text not null, piece_cid text not null, - piece_size bigint not null, + padded_piece_size bigint not null, + unpadded_piece_size bigint not null, verified_deal bool not null, client_id text not null, @@ -1005,7 +1006,7 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey] return xerrors.Errorf("prep temp: %w", err) } - stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, piece_size, verified_deal, client_id, provider_id, start_epoch, end_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`) + stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, verified_deal, client_id, provider_id, start_epoch, end_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`) if err != nil { return err } @@ -1027,6 +1028,7 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey] mt.stateroot.String(), ds.Proposal.PieceCID.String(), ds.Proposal.PieceSize, + ds.Proposal.PieceSize.Unpadded(), ds.Proposal.VerifiedDeal, ds.Proposal.Client.String(), ds.Proposal.Provider.String(), From f9d8b051f4c2894d5e08a06b412e00cf7f9160e2 Mon Sep 17 00:00:00 2001 From: frrist Date: Thu, 9 Jul 2020 14:18:19 -0700 Subject: [PATCH 06/10] polish: track tipset height for processing --- cmd/lotus-chainwatch/storage.go | 29 ++++++++++++++++------------- cmd/lotus-chainwatch/sync.go | 26 +++++++++++++++++++++++--- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 6efa3d86a..5b0f089a2 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -381,14 +381,14 @@ create table if not exists market_deal_states ( deal_id bigint not null, - state_root text not null, - sector_start_epoch bigint not null, last_update_epoch bigint not null, slash_epoch bigint not null, - + + state_root text not null, + constraint market_deal_states_pk - primary key (deal_id) + primary key (deal_id,sector_start_epoch,last_update_epoch,slash_epoch) ); @@ -948,7 +948,7 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat return updateTx.Commit() } -func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, api api.FullNode) error { +func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error { tx, err := st.db.Begin() if err != nil { return err @@ -956,12 +956,14 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil { return err } - stmt, err := tx.Prepare(`copy mds (deal_id, state_root, sector_start_epoch, last_update_epoch, slash_epoch) from STDIN`) + stmt, err := tx.Prepare(`copy mds (deal_id, sector_start_epoch, last_update_epoch, slash_epoch, state_root) from STDIN`) if err != nil { return err } - for tskey, mt := range marketTips { - dealStates, err := api.StateMarketDeals(context.TODO(), tskey) + for _, th := range tipHeights { + mt := marketTips[th.tsKey] + log.Infow("store deal state", "height", th.height, "tipset", th.tsKey, "minerTS", mt.tsKey) + dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey) if err != nil { return err } @@ -974,10 +976,10 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma if _, err := stmt.Exec( id, - mt.stateroot.String(), ds.State.SectorStartEpoch, ds.State.LastUpdatedEpoch, ds.State.SlashEpoch, + mt.stateroot.String(), ); err != nil { return err } @@ -993,10 +995,9 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma } return tx.Commit() - } -func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, api api.FullNode) error { +func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error { tx, err := st.db.Begin() if err != nil { return err @@ -1011,8 +1012,10 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey] return err } - for tskey, mt := range marketTips { - dealStates, err := api.StateMarketDeals(context.TODO(), tskey) + // insert in sorted order (lowest height -> highest height) since dealid is pk of table. + for _, th := range tipHeights { + mt := marketTips[th.tsKey] + dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey) if err != nil { return err } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 14b761ddb..45dcde1b8 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -6,8 +6,8 @@ import ( "context" "encoding/json" "fmt" - "github.com/filecoin-project/specs-actors/actors/builtin/market" "math" + "sort" "sync" "time" @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/builtin/reward" @@ -103,6 +104,11 @@ type actorInfo struct { state string } +type tipsetKeyHeight struct { + height abi.ChainEpoch + tsKey types.TipSetKey +} + func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) { var alk sync.Mutex @@ -184,6 +190,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch) + // relate tipset keys to height so they may be processed in ascending order. + var tipHeights []tipsetKeyHeight + tipsSeen := make(map[types.TipSetKey]struct{}) // map of addresses to changed actors var changes map[string]types.Actor // collect all actor state that has changes between block headers @@ -291,9 +300,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. parentTsKey: pts.Parents(), } addressToID[addr] = address.Undef + if _, ok := tipsSeen[pts.Key()]; !ok { + tipHeights = append(tipHeights, tipsetKeyHeight{ + height: pts.Height(), + tsKey: pts.Key(), + }) + } + tipsSeen[pts.Key()] = struct{}{} alk.Unlock() } }) + // sort tipHeights in ascending order. + sort.Slice(tipHeights, func(i, j int) bool { + return tipHeights[i].height < tipHeights[j].height + }) // map of tipset to reward state rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes)) @@ -536,12 +556,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } log.Info("Storing market actor info") - if err := st.storeMarketActorDealProposals(marketActorChanges, api); err != nil { + if err := st.storeMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil { log.Error(err) return } - if err := st.storeMarketActorDealStates(marketActorChanges, api); err != nil { + if err := st.storeMarketActorDealStates(marketActorChanges, tipHeights, api); err != nil { log.Error(err) return } From c9cb39d5a0cd6370294a2e252025892d4ac9c0b8 Mon Sep 17 00:00:00 2001 From: frrist Date: Fri, 10 Jul 2020 16:55:39 -0700 Subject: [PATCH 07/10] polish: update deal proposal table if slashed --- cmd/lotus-chainwatch/storage.go | 68 +++++++++++++++++++++++++++++++-- cmd/lotus-chainwatch/sync.go | 9 ++++- 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 5b0f089a2..6bc81fbe2 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -361,13 +361,14 @@ create table if not exists market_deal_proposals piece_cid text not null, padded_piece_size bigint not null, unpadded_piece_size bigint not null, - verified_deal bool not null, + is_verified bool not null, client_id text not null, provider_id text not null, start_epoch bigint not null, end_epoch bigint not null, + slashed_epoch bigint, storage_price_per_epoch text not null, provider_collateral text not null, @@ -387,8 +388,10 @@ create table if not exists market_deal_states state_root text not null, + unique (deal_id, sector_start_epoch, last_update_epoch, slash_epoch), + constraint market_deal_states_pk - primary key (deal_id,sector_start_epoch,last_update_epoch,slash_epoch) + primary key (deal_id, state_root) ); @@ -949,6 +952,10 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat } func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error { + start := time.Now() + defer func() { + log.Infow("Stored Market Deal States", "duration", time.Since(start).String()) + }() tx, err := st.db.Begin() if err != nil { return err @@ -962,7 +969,6 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma } for _, th := range tipHeights { mt := marketTips[th.tsKey] - log.Infow("store deal state", "height", th.height, "tipset", th.tsKey, "minerTS", mt.tsKey) dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey) if err != nil { return err @@ -998,6 +1004,10 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma } func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error { + start := time.Now() + defer func() { + log.Infow("Stored Market Deal Proposals", "duration", time.Since(start).String()) + }() tx, err := st.db.Begin() if err != nil { return err @@ -1007,7 +1017,7 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey] return xerrors.Errorf("prep temp: %w", err) } - stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, verified_deal, client_id, provider_id, start_epoch, end_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`) + stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, is_verified, client_id, provider_id, start_epoch, end_epoch, slashed_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`) if err != nil { return err } @@ -1037,6 +1047,7 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey] ds.Proposal.Provider.String(), ds.Proposal.StartEpoch, ds.Proposal.EndEpoch, + nil, // slashed_epoch ds.Proposal.StoragePricePerEpoch.String(), ds.Proposal.ProviderCollateral.String(), ds.Proposal.ClientCollateral.String(), @@ -1057,6 +1068,55 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey] } +func (st *storage) updateMarketActorDealProposals(marketTip map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error { + start := time.Now() + defer func() { + log.Infow("Updated Market Deal Proposals", "duration", time.Since(start).String()) + }() + pred := state.NewStatePredicates(api) + + tx, err := st.db.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare(`update market_deal_proposals set slashed_epoch=$1 where deal_id=$2`) + if err != nil { + return err + } + + for _, th := range tipHeights { + mt := marketTip[th.tsKey] + stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged())) + + changed, val, err := stateDiff(context.TODO(), mt.parentTsKey, mt.tsKey) + if err != nil { + log.Warnw("error getting market deal state diff", "error", err) + } + if !changed { + continue + } + changes, ok := val.(*state.MarketDealStateChanges) + if !ok { + return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val) + } + + for _, modified := range changes.Modified { + if modified.From.SlashEpoch != modified.To.SlashEpoch { + if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil { + return err + } + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + return tx.Commit() +} + func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { st.headerLk.Lock() defer st.headerLk.Unlock() diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 45dcde1b8..7d48ce1c9 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -555,17 +555,24 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } - log.Info("Storing market actor info") + log.Info("Storing market actor deal proposal info") if err := st.storeMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil { log.Error(err) return } + log.Info("Storing market actor deal state info") if err := st.storeMarketActorDealStates(marketActorChanges, tipHeights, api); err != nil { log.Error(err) return } + log.Info("Updating market actor deal proposal info") + if err := st.updateMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil { + log.Error(err) + return + } + log.Infof("Storing messages") if err := st.storeMessages(msgs); err != nil { From bb4954672823e593fe9bd353e1ea65499f9e837f Mon Sep 17 00:00:00 2001 From: "erchuan.ma" <450595468@qq.com> Date: Mon, 13 Jul 2020 22:35:51 +0800 Subject: [PATCH 08/10] api/api_full.go: fix method signature method signature not match --- api/api_full.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 9718eebb8..93f9d8fd4 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -52,7 +52,7 @@ type FullNode interface { // the specified block. ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) - // ChainGetParentReceipts returns messages stored in parent tipset of the + // ChainGetParentMessages returns messages stored in parent tipset of the // specified block. ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]Message, error) @@ -305,7 +305,7 @@ type FullNode interface { // MsigGetAvailableBalance returns the portion of a multisig's balance that can be withdrawn or spent MsigGetAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error) - // MsigGetAvailableBalance creates a multisig wallet + // MsigCreate creates a multisig wallet // It takes the following params: , , //, , MsigCreate(context.Context, int64, []address.Address, abi.ChainEpoch, types.BigInt, address.Address, types.BigInt) (cid.Cid, error) From 17642b5ecbbb9489d57bbeda4131ab9a5b78020b Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Mon, 13 Jul 2020 19:50:18 -0400 Subject: [PATCH 09/10] clarify multisig create helptext --- cli/multisig.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cli/multisig.go b/cli/multisig.go index 125942054..ac3f6364d 100644 --- a/cli/multisig.go +++ b/cli/multisig.go @@ -51,7 +51,8 @@ var msigCreateCmd = &cli.Command{ ArgsUsage: "[address1 address2 ...]", Flags: []cli.Flag{ &cli.Int64Flag{ - Name: "required", + Name: "required", + Usage: "number of required approvals (uses number of signers provided if omitted)", }, &cli.StringFlag{ Name: "value", From 21ffe188f452e2eb59452ea6adcf462052de935f Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Wed, 15 Jul 2020 08:17:50 -0300 Subject: [PATCH 10/10] fix: NewTipSet: check mismatch in number of parents --- chain/types/tipset.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chain/types/tipset.go b/chain/types/tipset.go index 09483dc5e..57ab91787 100644 --- a/chain/types/tipset.go +++ b/chain/types/tipset.go @@ -112,6 +112,10 @@ func NewTipSet(blks []*BlockHeader) (*TipSet, error) { return nil, fmt.Errorf("cannot create tipset with mismatching heights") } + if len(blks[0].Parents) != len(b.Parents) { + return nil, fmt.Errorf("cannot create tipset with mismatching number of parents") + } + for i, cid := range b.Parents { if cid != blks[0].Parents[i] { return nil, fmt.Errorf("cannot create tipset with mismatching parents")