polish: track tipset height for processing

This commit is contained in:
frrist 2020-07-09 14:18:19 -07:00
parent 1e2e62bad6
commit f9d8b051f4
2 changed files with 39 additions and 16 deletions

View File

@ -381,14 +381,14 @@ create table if not exists market_deal_states
( (
deal_id bigint not null, deal_id bigint not null,
state_root text not null,
sector_start_epoch bigint not null, sector_start_epoch bigint not null,
last_update_epoch bigint not null, last_update_epoch bigint not null,
slash_epoch bigint not null, slash_epoch bigint not null,
state_root text not null,
constraint market_deal_states_pk constraint market_deal_states_pk
primary key (deal_id) primary key (deal_id,sector_start_epoch,last_update_epoch,slash_epoch)
); );
@ -948,7 +948,7 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat
return updateTx.Commit() return updateTx.Commit()
} }
func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, api api.FullNode) error { func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return err
@ -956,12 +956,14 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma
if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil { if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil {
return err return err
} }
stmt, err := tx.Prepare(`copy mds (deal_id, state_root, sector_start_epoch, last_update_epoch, slash_epoch) from STDIN`) stmt, err := tx.Prepare(`copy mds (deal_id, sector_start_epoch, last_update_epoch, slash_epoch, state_root) from STDIN`)
if err != nil { if err != nil {
return err return err
} }
for tskey, mt := range marketTips { for _, th := range tipHeights {
dealStates, err := api.StateMarketDeals(context.TODO(), tskey) mt := marketTips[th.tsKey]
log.Infow("store deal state", "height", th.height, "tipset", th.tsKey, "minerTS", mt.tsKey)
dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey)
if err != nil { if err != nil {
return err return err
} }
@ -974,10 +976,10 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma
if _, err := stmt.Exec( if _, err := stmt.Exec(
id, id,
mt.stateroot.String(),
ds.State.SectorStartEpoch, ds.State.SectorStartEpoch,
ds.State.LastUpdatedEpoch, ds.State.LastUpdatedEpoch,
ds.State.SlashEpoch, ds.State.SlashEpoch,
mt.stateroot.String(),
); err != nil { ); err != nil {
return err return err
} }
@ -993,10 +995,9 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma
} }
return tx.Commit() return tx.Commit()
} }
func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, api api.FullNode) error { func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return err
@ -1011,8 +1012,10 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]
return err return err
} }
for tskey, mt := range marketTips { // insert in sorted order (lowest height -> highest height) since dealid is pk of table.
dealStates, err := api.StateMarketDeals(context.TODO(), tskey) for _, th := range tipHeights {
mt := marketTips[th.tsKey]
dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey)
if err != nil { if err != nil {
return err return err
} }

View File

@ -6,8 +6,8 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"math" "math"
"sort"
"sync" "sync"
"time" "time"
@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/builtin/reward"
@ -103,6 +104,11 @@ type actorInfo struct {
state string state string
} }
type tipsetKeyHeight struct {
height abi.ChainEpoch
tsKey types.TipSetKey
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) { func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) {
var alk sync.Mutex var alk sync.Mutex
@ -184,6 +190,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch) log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch)
// relate tipset keys to height so they may be processed in ascending order.
var tipHeights []tipsetKeyHeight
tipsSeen := make(map[types.TipSetKey]struct{})
// map of addresses to changed actors // map of addresses to changed actors
var changes map[string]types.Actor var changes map[string]types.Actor
// collect all actor state that has changes between block headers // collect all actor state that has changes between block headers
@ -291,9 +300,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
parentTsKey: pts.Parents(), parentTsKey: pts.Parents(),
} }
addressToID[addr] = address.Undef addressToID[addr] = address.Undef
if _, ok := tipsSeen[pts.Key()]; !ok {
tipHeights = append(tipHeights, tipsetKeyHeight{
height: pts.Height(),
tsKey: pts.Key(),
})
}
tipsSeen[pts.Key()] = struct{}{}
alk.Unlock() alk.Unlock()
} }
}) })
// sort tipHeights in ascending order.
sort.Slice(tipHeights, func(i, j int) bool {
return tipHeights[i].height < tipHeights[j].height
})
// map of tipset to reward state // map of tipset to reward state
rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes)) rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes))
@ -536,12 +556,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
} }
log.Info("Storing market actor info") log.Info("Storing market actor info")
if err := st.storeMarketActorDealProposals(marketActorChanges, api); err != nil { if err := st.storeMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil {
log.Error(err) log.Error(err)
return return
} }
if err := st.storeMarketActorDealStates(marketActorChanges, api); err != nil { if err := st.storeMarketActorDealStates(marketActorChanges, tipHeights, api); err != nil {
log.Error(err) log.Error(err)
return return
} }