pair: work from pair with placer

This commit is contained in:
frrist 2020-06-30 13:22:58 -07:00
parent c16ea42fad
commit 8c54c09ab7
2 changed files with 93 additions and 63 deletions

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"sync" "sync"
"time" "time"
@ -258,35 +259,47 @@ create index if not exists receipts_msg_state_index
create table if not exists miner_sectors create table if not exists miner_sectors
( (
miner text not null, miner_id text not null,
sectorid bigint not null, sector_id bigint not null,
activation bigint not null, activation_epoch bigint not null,
dealweight bigint not null, expiration_epoch bigint not null,
verifieddealweight bigint not null, deal_weight text not null,
expiration bigint not null, verified_deal_weight text not null,
sealcid text not null, seal_cid text not null,
sealrandepoch bigint not null, seal_rand_epoch bigint not null,
constraint miner_sectors_pk constraint miner_sectors_pk
primary key (miner, sectorid) primary key (miner_id, sector_id)
); );
create index if not exists miner_sectors_miner_sectorid_index create index if not exists miner_sectors_miner_sectorid_index
on miner_sectors (miner, sectorid); on miner_sectors (miner_id, sector_id);
create table if not exists miner_heads create table if not exists miner_info
( (
addr text not null, miner_id text not null,
owner text not null, owner_addr text not null,
worker text not null, worker_addr text not null,
peerid text, peer_id text,
sectorsize text not null, sector_size text not null,
windowpostpartitionsectors bigint not null,
precommit_deposits text not null,
locked_funds text not null,
next_deadline_process_faults bigint not null,
constraint miner_info_pk
primary key (miner_id)
);
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
create table if not exists miner_sectors_heads
(
miner_id text not null,
miner_sectors_cid text not null,
state_root text not null,
constraint miner_sectors_heads_pk
primary key (miner_id,miner_sectors_cid)
precommitdeposits text not null,
lockedfunds text not null,
nextdeadlineprocessfaults bigint not null,
constraint miner_heads_pk
primary key (addr)
); );
/* /*
@ -473,7 +486,7 @@ type storeSectorsAPI interface {
StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error)
} }
func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, sectorApi storeSectorsAPI) error { func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*minerStateInfo, sectorApi storeSectorsAPI) error {
tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return err
@ -483,7 +496,7 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
stmt, err := tx.Prepare(`copy ms (miner, sectorid, activation, dealweight, verifieddealweight, expiration, sealcid, sealrandepoch) from STDIN `) stmt, err := tx.Prepare(`copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN`)
if err != nil { if err != nil {
return err return err
} }
@ -500,9 +513,9 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s
miner.addr.String(), miner.addr.String(),
uint64(sector.ID), uint64(sector.ID),
int64(sector.Info.ActivationEpoch), int64(sector.Info.ActivationEpoch),
sector.Info.DealWeight.Uint64(),
sector.Info.VerifiedDealWeight.Uint64(),
int64(sector.Info.Info.Expiration), int64(sector.Info.Info.Expiration),
sector.Info.DealWeight.String(),
sector.Info.VerifiedDealWeight.String(),
sector.Info.Info.SealedCID.String(), sector.Info.Info.SealedCID.String(),
int64(sector.Info.Info.SealRandEpoch), int64(sector.Info.Info.SealRandEpoch),
); err != nil { ); err != nil {
@ -523,22 +536,17 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s
return tx.Commit() return tx.Commit()
} }
func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*newMinerInfo) error { func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) error {
tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return err
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`create temp table mi (like miner_info excluding constraints) on commit drop;`); err != nil {
create temp table mh (like miner_heads excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
stmt, err := tx.Prepare(`copy mh (addr, owner, worker, peerid, sectorsize, windowpostpartitionsectors, precommitdeposits, lockedfunds, nextdeadlineprocessfaults) from STDIN`) stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN`)
if err != nil { if err != nil {
return err return err
} }
@ -560,14 +568,11 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*newMinerInfo) er
miner.info.Worker.String(), miner.info.Worker.String(),
pid, pid,
miner.info.SectorSize.ShortString(), miner.info.SectorSize.ShortString(),
miner.info.WindowPoStPartitionSectors,
miner.state.PreCommitDeposits.String(), miner.state.PreCommitDeposits.String(),
miner.state.LockedFunds.String(), miner.state.LockedFunds.String(),
miner.state.NextDeadlineToProcessFaults, miner.state.NextDeadlineToProcessFaults,
); err != nil { ); err != nil {
log.Errorw("failed to store miner state", miner.state) log.Errorw("failed to store miner state", "state", miner.state, "info", miner.info, "error", err)
log.Errorw("failed to store miner info", miner.info)
return err return err
} }
@ -577,7 +582,46 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*newMinerInfo) er
return err return err
} }
if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil { if _, err := tx.Exec(`insert into miner_info select * from mi on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table msh (like miner_sectors_heads excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy msh (miner_id, miner_sectors_cid, state_root) from STDIN`)
if err != nil {
return err
}
for _, miners := range minerTips {
for _, miner := range miners {
if _, err := stmt.Exec(
miner.addr.String(),
miner.state.Sectors.String(),
miner.stateroot.String(),
); err != nil {
log.Errorw("failed to store miners sectors head", "state", miner.state, "info", miner.info, "error", err)
return err
}
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_sectors_heads select * from msh on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err) return xerrors.Errorf("actor put: %w", err)
} }

View File

@ -53,24 +53,7 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int)
}() }()
} }
type minerKey struct { type minerStateInfo struct {
addr address.Address
act types.Actor
stateroot cid.Cid
tsKey types.TipSetKey
}
type minerInfo struct {
state miner.State
info miner.MinerInfo
rawPower big.Int
qalPower big.Int
ssize uint64
psize uint64
}
type newMinerInfo struct {
// common // common
addr address.Address addr address.Address
act types.Actor act types.Actor
@ -281,7 +264,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}) })
// map of tipset to all miners that had a head-change at that tipset. // map of tipset to all miners that had a head-change at that tipset.
minerTips := make(map[types.TipSetKey][]*newMinerInfo, len(changes)) minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes))
// heads we've seen, im being paranoid // heads we've seen, im being paranoid
headsSeen := make(map[cid.Cid]struct{}, len(actors)) headsSeen := make(map[cid.Cid]struct{}, len(actors))
@ -324,7 +307,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
} }
minerChanges++ minerChanges++
minerTips[c.tsKey] = append(minerTips[c.tsKey], &newMinerInfo{ minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
addr: addr, addr: addr,
act: actor, act: actor,
stateroot: c.stateroot, stateroot: c.stateroot,
@ -344,7 +327,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges) log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges)
// extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their // extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their
// claims from the power actor state. This ensures we only fetch the power actors state once for each tipset. // claims from the power actor state. This ensures we only fetch the power actors state once for each tipset.
parmap.Par(50, parmap.KVMapArr(minerTips), func(it func() (types.TipSetKey, []*newMinerInfo)) { parmap.Par(50, parmap.KVMapArr(minerTips), func(it func() (types.TipSetKey, []*minerStateInfo)) {
tsKey, minerInfo := it() tsKey, minerInfo := it()
// get the power actors claims map // get the power actors claims map
@ -421,16 +404,13 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
return return
} }
// TODO re-enable when ready to fill miner metadata, the contents of storeMiners is commented out too.
log.Info("Storing miners") log.Info("Storing miners")
if err := st.storeMiners(minerTips); err != nil { if err := st.storeMiners(minerTips); err != nil {
log.Error(err) log.Error(err)
return return
} }
log.Info("Storing miner sectors") log.Info("Storing miner sectors")
sectorStart := time.Now() sectorStart := time.Now()
if err := st.storeSectors(minerTips, api); err != nil { if err := st.storeSectors(minerTips, api); err != nil {
log.Error(err) log.Error(err)
@ -438,6 +418,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
} }
log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String())
log.Info("Storing miner sectors heads")
if err := st.storeMinerSectorsHeads(minerTips); err != nil {
log.Error(err)
return
}
log.Infof("Storing messages") log.Infof("Storing messages")
if err := st.storeMessages(msgs); err != nil { if err := st.storeMessages(msgs); err != nil {