fix: update miner_head table with new miner fields (#2142)

This commit is contained in:
Frrist 2020-06-30 10:26:41 -07:00 committed by GitHub
parent 6fe39ef065
commit c16ea42fad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 66 deletions

View File

@ -18,6 +18,9 @@ var log = logging.Logger("chainwatch")
func main() { func main() {
_ = logging.SetLogLevel("*", "INFO") _ = logging.SetLogLevel("*", "INFO")
if err := logging.SetLogLevel("rpc", "error"); err != nil {
panic(err)
}
log.Info("Starting chainwatch") log.Info("Starting chainwatch")

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/libp2p/go-libp2p-core/peer"
"sync" "sync"
"time" "time"
@ -257,7 +258,6 @@ create index if not exists receipts_msg_state_index
create table if not exists miner_sectors create table if not exists miner_sectors
( (
stateroot text not null,
miner text not null, miner text not null,
sectorid bigint not null, sectorid bigint not null,
activation bigint not null, activation bigint not null,
@ -267,36 +267,29 @@ create table if not exists miner_sectors
sealcid text not null, sealcid text not null,
sealrandepoch bigint not null, sealrandepoch bigint not null,
constraint miner_sectors_pk constraint miner_sectors_pk
primary key (stateroot, miner, sectorid) primary key (miner, sectorid)
); );
create index if not exists miner_sectors_stateroot_miner_sectorid_index create index if not exists miner_sectors_miner_sectorid_index
on miner_sectors (stateroot, miner, sectorid); on miner_sectors (miner, sectorid);
/*
create table if not exists miner_heads create table if not exists miner_heads
( (
head text not null,
addr text not null, addr text not null,
stateroot text not null,
sectorset text not null,
setsize decimal not null,
provingset text not null,
provingsize decimal not null,
owner text not null, owner text not null,
worker text not null, worker text not null,
peerid text not null, peerid text,
sectorsize bigint not null, sectorsize text not null,
power decimal not null, windowpostpartitionsectors bigint not null,
active bool,
ppe bigint not null, precommitdeposits text not null,
slashed_at bigint not null, lockedfunds text not null,
nextdeadlineprocessfaults bigint not null,
constraint miner_heads_pk constraint miner_heads_pk
primary key (head, addr) primary key (addr)
); );
create index if not exists miner_heads_stateroot_index /*
on miner_heads (stateroot);
create or replace function miner_tips(epoch bigint) create or replace function miner_tips(epoch bigint)
returns table (head text, returns table (head text,
addr text, addr text,
@ -490,7 +483,7 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
stmt, err := tx.Prepare(`copy ms (stateroot, miner, sectorid, activation, dealweight, verifieddealweight, expiration, sealcid, sealrandepoch) from STDIN `) stmt, err := tx.Prepare(`copy ms (miner, sectorid, activation, dealweight, verifieddealweight, expiration, sealcid, sealrandepoch) from STDIN `)
if err != nil { if err != nil {
return err return err
} }
@ -504,7 +497,6 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s
for _, sector := range sectors { for _, sector := range sectors {
if _, err := stmt.Exec( if _, err := stmt.Exec(
miner.stateroot.String(),
miner.addr.String(), miner.addr.String(),
uint64(sector.ID), uint64(sector.ID),
int64(sector.Info.ActivationEpoch), int64(sector.Info.ActivationEpoch),
@ -531,54 +523,65 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s
return tx.Commit() return tx.Commit()
} }
func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*newMinerInfo) error {
/*tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return err
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`
create temp table mh (like miner_heads excluding constraints) on commit drop; create temp table mh (like miner_heads excluding constraints) on commit drop;
`); err != nil { `); err != nil {
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, ppe) from STDIN`) stmt, err := tx.Prepare(`copy mh (addr, owner, worker, peerid, sectorsize, windowpostpartitionsectors, precommitdeposits, lockedfunds, nextdeadlineprocessfaults) from STDIN`)
if err != nil { if err != nil {
return err return err
} }
for k, i := range miners { for ts, miners := range minerTips {
for _, miner := range miners {
var pid string
if len(miner.info.PeerId) != 0 {
peerid, err := peer.IDFromBytes(miner.info.PeerId)
if err != nil {
// this should "never happen", but if it does we should still store info about the miner.
log.Warnw("failed to decode peerID", "peerID (bytes)", miner.info.PeerId, "miner", miner.addr, "tipset", ts.String())
} else {
pid = peerid.String()
}
}
if _, err := stmt.Exec( if _, err := stmt.Exec(
k.act.Head.String(), miner.addr.String(),
k.addr.String(), miner.info.Owner.String(),
k.stateroot.String(), miner.info.Worker.String(),
i.state.Sectors.String(), pid,
fmt.Sprint(i.ssize), miner.info.SectorSize.ShortString(),
i.state.ProvingSet.String(), miner.info.WindowPoStPartitionSectors,
fmt.Sprint(i.psize),
i.info.Owner.String(), miner.state.PreCommitDeposits.String(),
i.info.Worker.String(), miner.state.LockedFunds.String(),
i.info.PeerId.String(), miner.state.NextDeadlineToProcessFaults,
i.info.SectorSize,
i.power.String(), // TODO: SPA
i.state.PoStState.ProvingPeriodStart,
); err != nil { ); err != nil {
log.Errorw("failed to store miner state", miner.state)
log.Errorw("failed to store miner info", miner.info)
return err return err
} }
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
} }
}
if err := stmt.Close(); err != nil {
return err
}
return tx.Commit()*/ if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil {
return nil return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
} }
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {

View File

@ -123,7 +123,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
allToSync[bh.Cid()] = bh allToSync[bh.Cid()] = bh
if len(allToSync)%500 == 10 { if len(allToSync)%500 == 10 {
log.Infof("to visit: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height) log.Debugf("to visit: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height)
} }
if len(bh.Parents) == 0 { if len(bh.Parents) == 0 {
@ -276,7 +276,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
tsKey: pts.Key(), tsKey: pts.Key(),
} }
addressToID[addr] = address.Undef addressToID[addr] = address.Undef
//
alk.Unlock() alk.Unlock()
} }
}) })
@ -423,14 +422,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
} }
// TODO re-enable when ready to fill miner metadata, the contents of storeMiners is commented out too. // TODO re-enable when ready to fill miner metadata, the contents of storeMiners is commented out too.
/* log.Info("Storing miners")
log.Info("Storing miners")
if err := st.storeMiners(miners); err != nil { if err := st.storeMiners(minerTips); err != nil {
log.Error(err) log.Error(err)
return return
} }
*/
log.Info("Storing miner sectors") log.Info("Storing miner sectors")