diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 5b0f089a2..6bc81fbe2 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -361,13 +361,14 @@ create table if not exists market_deal_proposals piece_cid text not null, padded_piece_size bigint not null, unpadded_piece_size bigint not null, - verified_deal bool not null, + is_verified bool not null, client_id text not null, provider_id text not null, start_epoch bigint not null, end_epoch bigint not null, + slashed_epoch bigint, storage_price_per_epoch text not null, provider_collateral text not null, @@ -387,8 +388,10 @@ create table if not exists market_deal_states state_root text not null, + unique (deal_id, sector_start_epoch, last_update_epoch, slash_epoch), + constraint market_deal_states_pk - primary key (deal_id,sector_start_epoch,last_update_epoch,slash_epoch) + primary key (deal_id, state_root) ); @@ -949,6 +952,10 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat } func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error { + start := time.Now() + defer func() { + log.Infow("Stored Market Deal States", "duration", time.Since(start).String()) + }() tx, err := st.db.Begin() if err != nil { return err @@ -962,7 +969,6 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma } for _, th := range tipHeights { mt := marketTips[th.tsKey] - log.Infow("store deal state", "height", th.height, "tipset", th.tsKey, "minerTS", mt.tsKey) dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey) if err != nil { return err @@ -998,6 +1004,10 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma } func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error { + start := time.Now() + defer func() { + log.Infow("Stored Market Deal Proposals", "duration", time.Since(start).String()) + }() tx, err := st.db.Begin() if err != nil { return err @@ -1007,7 +1017,7 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey] return xerrors.Errorf("prep temp: %w", err) } - stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, verified_deal, client_id, provider_id, start_epoch, end_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`) + stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, is_verified, client_id, provider_id, start_epoch, end_epoch, slashed_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`) if err != nil { return err } @@ -1037,6 +1047,7 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey] ds.Proposal.Provider.String(), ds.Proposal.StartEpoch, ds.Proposal.EndEpoch, + nil, // slashed_epoch ds.Proposal.StoragePricePerEpoch.String(), ds.Proposal.ProviderCollateral.String(), ds.Proposal.ClientCollateral.String(), @@ -1057,6 +1068,55 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey] } +func (st *storage) updateMarketActorDealProposals(marketTip map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error { + start := time.Now() + defer func() { + log.Infow("Updated Market Deal Proposals", "duration", time.Since(start).String()) + }() + pred := state.NewStatePredicates(api) + + tx, err := st.db.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare(`update market_deal_proposals set slashed_epoch=$1 where deal_id=$2`) + if err != nil { + return err + } + + for _, th := range tipHeights { + mt := marketTip[th.tsKey] + stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged())) + + changed, val, err := stateDiff(context.TODO(), mt.parentTsKey, mt.tsKey) + if err != nil { + log.Warnw("error getting market deal state diff", "error", err) + } + if !changed { + continue + } + changes, ok := val.(*state.MarketDealStateChanges) + if !ok { + return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val) + } + + for _, modified := range changes.Modified { + if modified.From.SlashEpoch != modified.To.SlashEpoch { + if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil { + return err + } + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + return tx.Commit() +} + func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { st.headerLk.Lock() defer st.headerLk.Unlock() diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 45dcde1b8..7d48ce1c9 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -555,17 +555,24 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } - log.Info("Storing market actor info") + log.Info("Storing market actor deal proposal info") if err := st.storeMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil { log.Error(err) return } + log.Info("Storing market actor deal state info") if err := st.storeMarketActorDealStates(marketActorChanges, tipHeights, api); err != nil { log.Error(err) return } + log.Info("Updating market actor deal proposal info") + if err := st.updateMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil { + log.Error(err) + return + } + log.Infof("Storing messages") if err := st.storeMessages(msgs); err != nil {