Merge remote-tracking branch 'origin/master' into next
commit 8ce35e30dd
@@ -1073,17 +1073,14 @@ func extractSyncState(ctx context.Context) *SyncerState {
// collectHeaders collects the headers from the blocks between any two tipsets.
//
- // `from` is the heaviest/projected/target tipset we have learned about, and
- // `to` is usually an anchor tipset we already have in our view of the chain
+ // `incoming` is the heaviest/projected/target tipset we have learned about, and
+ // `known` is usually an anchor tipset we already have in our view of the chain
// (which could be the genesis).
//
// collectHeaders checks if portions of the chain are in our ChainStore; falling
// down to the network to retrieve the missing parts. If during the process, any
// portion we receive is in our denylist (bad list), we short-circuit.
//
- // {hint/naming}: `from` and `to` is in inverse order. `from` is the highest,
- // and `to` is the lowest. This method traverses the chain backwards.
- //
// {hint/usage}: This is used by collectChain, which is in turn called from the
// main Sync method (Syncer#Sync), so it's a pretty central method.
//
@@ -1093,7 +1090,7 @@ func extractSyncState(ctx context.Context) *SyncerState {
// bad.
// 2. Check the consistency of beacon entries in the from tipset. We check
//    total equality of the BeaconEntries in each block.
- // 3. Travers the chain backwards, for each tipset:
+ // 3. Traverse the chain backwards, for each tipset:
//    3a. Load it from the chainstore; if found, it move on to its parent.
//    3b. Query our peers via BlockSync in batches, requesting up to a
//        maximum of 500 tipsets every time.
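The batching described in step 3b caps each BlockSync request. Below is a self-contained sketch of that window arithmetic only; the 500-tipset cap is taken from the comment above, and requestWindow is an illustrative name, not a lotus function.

package main

import "fmt"

// requestWindow returns how many epochs of headers to ask for in the next
// batch: everything still missing down to untilHeight, capped at maxLen
// (500 per the comment above).
func requestWindow(currentHeight, untilHeight, maxLen uint64) uint64 {
	if currentHeight < untilHeight {
		return 0 // nothing left to fetch
	}
	want := currentHeight - untilHeight + 1
	if want > maxLen {
		return maxLen
	}
	return want
}

func main() {
	fmt.Println(requestWindow(1200, 1001, 500)) // 200: small gap, one batch
	fmt.Println(requestWindow(2000, 1001, 500)) // 500: large gap, capped
}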
@@ -1104,40 +1101,40 @@ func extractSyncState(ctx context.Context) *SyncerState {
//
// All throughout the process, we keep checking if the received blocks are in
// the deny list, and short-circuit the process if so.
- func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
+ func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "collectHeaders")
defer span.End()
ss := extractSyncState(ctx)

span.AddAttributes(
- trace.Int64Attribute("fromHeight", int64(from.Height())),
- trace.Int64Attribute("toHeight", int64(to.Height())),
+ trace.Int64Attribute("incomingHeight", int64(incoming.Height())),
+ trace.Int64Attribute("knownHeight", int64(known.Height())),
)

// Check if the parents of the from block are in the denylist.
// i.e. if a fork of the chain has been requested that we know to be bad.
- for _, pcid := range from.Parents().Cids() {
+ for _, pcid := range incoming.Parents().Cids() {
if reason, ok := syncer.bad.Has(pcid); ok {
newReason := reason.Linked("linked to %s", pcid)
- for _, b := range from.Cids() {
+ for _, b := range incoming.Cids() {
syncer.bad.Add(b, newReason)
}
- return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), pcid, reason)
+ return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), pcid, reason)
}
}

{
// ensure consistency of beacon entires
- targetBE := from.Blocks()[0].BeaconEntries
+ targetBE := incoming.Blocks()[0].BeaconEntries
sorted := sort.SliceIsSorted(targetBE, func(i, j int) bool {
return targetBE[i].Round < targetBE[j].Round
})
if !sorted {
- syncer.bad.Add(from.Cids()[0], NewBadBlockReason(from.Cids(), "wrong order of beacon entires"))
+ syncer.bad.Add(incoming.Cids()[0], NewBadBlockReason(incoming.Cids(), "wrong order of beacon entires"))
return nil, xerrors.Errorf("wrong order of beacon entires")
}

- for _, bh := range from.Blocks()[1:] {
+ for _, bh := range incoming.Blocks()[1:] {
if len(targetBE) != len(bh.BeaconEntries) {
// cannot mark bad, I think @Kubuxu
return nil, xerrors.Errorf("tipset contained different number for beacon entires")
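The ordering check above is plain sort.SliceIsSorted over beacon rounds. Here is a tiny self-contained illustration of the same predicate; beaconEntry is a stand-in struct, not the lotus BeaconEntry type.

package main

import (
	"fmt"
	"sort"
)

// beaconEntry only carries the Round number, which is all the ordering
// check looks at.
type beaconEntry struct {
	Round uint64
}

func main() {
	entries := []beaconEntry{{Round: 4}, {Round: 7}, {Round: 6}}

	// True only if rounds are in non-decreasing order, mirroring the check
	// applied to incoming.Blocks()[0].BeaconEntries above.
	sorted := sort.SliceIsSorted(entries, func(i, j int) bool {
		return entries[i].Round < entries[j].Round
	})
	fmt.Println("sorted:", sorted) // sorted: false (7 is followed by 6)
}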
@@ -1152,12 +1149,12 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
}
}

- blockSet := []*types.TipSet{from}
+ blockSet := []*types.TipSet{incoming}

- at := from.Parents()
+ at := incoming.Parents()

// we want to sync all the blocks until the height above the block we have
- untilHeight := to.Height() + 1
+ untilHeight := known.Height() + 1

ss.SetHeight(blockSet[len(blockSet)-1].Height())

@@ -1172,7 +1169,7 @@ loop:
syncer.bad.Add(b, newReason)
}

- return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason)
+ return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason)
}
}

@@ -1221,7 +1218,7 @@ loop:
syncer.bad.Add(b, newReason)
}

- return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason)
+ return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason)
}
}
blockSet = append(blockSet, b)
@@ -1233,23 +1230,23 @@ loop:
at = blks[len(blks)-1].Parents()
}

- if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) {
- last := blockSet[len(blockSet)-1]
- if last.Parents() == to.Parents() {
+ // base is the tipset in the candidate chain at the height equal to our known tipset height.
+ if base := blockSet[len(blockSet)-1]; !types.CidArrsEqual(base.Parents().Cids(), known.Cids()) {
+ if base.Parents() == known.Parents() {
// common case: receiving a block thats potentially part of the same tipset as our best block
return blockSet, nil
}

// We have now ascertained that this is *not* a 'fast forward'

- log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
- fork, err := syncer.syncFork(ctx, last, to)
+ log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height())
+ fork, err := syncer.syncFork(ctx, base, known)
if err != nil {
if xerrors.Is(err, ErrForkTooLong) {
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
log.Warn("adding forked chain to our bad tipset cache")
- for _, b := range from.Blocks() {
- syncer.bad.Add(b.Cid(), NewBadBlockReason(from.Cids(), "fork past finality"))
+ for _, b := range incoming.Blocks() {
+ syncer.bad.Add(b.Cid(), NewBadBlockReason(incoming.Cids(), "fork past finality"))
}
}
return nil, xerrors.Errorf("failed to sync fork: %w", err)
@@ -1269,13 +1266,13 @@ var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
// If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the
// denylist. Else, we find the common ancestor, and add the missing chain
// fragment until the fork point to the returned []TipSet.
- func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
- tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), int(build.ForkLengthThreshold))
+ func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
+ tips, err := syncer.Bsync.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold))
if err != nil {
return nil, err
}

- nts, err := syncer.store.LoadTipSet(to.Parents())
+ nts, err := syncer.store.LoadTipSet(known.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
}
@@ -1285,7 +1282,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
if !syncer.Genesis.Equals(nts) {
return nil, xerrors.Errorf("somehow synced chain that linked back to a different genesis (bad genesis: %s)", nts.Key())
}
- return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync")
+ return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync; incoming: %s")
}

if nts.Equals(tips[cur]) {
@@ -43,7 +43,7 @@ func TestBigIntSerializationRoundTrip(t *testing.T) {

func TestFilRoundTrip(t *testing.T) {
testValues := []string{
- "0", "1", "1.001", "100.10001", "101100", "5000.01", "5000",
+ "0 FIL", "1 FIL", "1.001 FIL", "100.10001 FIL", "101100 FIL", "5000.01 FIL", "5000 FIL",
}

for _, v := range testValues {
@@ -13,9 +13,9 @@ type FIL BigInt
func (f FIL) String() string {
r := new(big.Rat).SetFrac(f.Int, big.NewInt(int64(build.FilecoinPrecision)))
if r.Sign() == 0 {
- return "0"
+ return "0 FIL"
}
- return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".")
+ return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + " FIL"
}

func (f FIL) Format(s fmt.State, ch rune) {
@@ -28,14 +28,35 @@ func (f FIL) Format(s fmt.State, ch rune) {
}

func ParseFIL(s string) (FIL, error) {
+ suffix := strings.TrimLeft(s, ".1234567890")
+ s = s[:len(s)-len(suffix)]
+ var attofil bool
+ if suffix != "" {
+ norm := strings.ToLower(strings.TrimSpace(suffix))
+ switch norm {
+ case "", "fil":
+ case "attofil", "afil":
+ attofil = true
+ default:
+ return FIL{}, fmt.Errorf("unrecognized suffix: %q", suffix)
+ }
+ }
+
r, ok := new(big.Rat).SetString(s)
if !ok {
return FIL{}, fmt.Errorf("failed to parse %q as a decimal number", s)
}

+ if !attofil {
r = r.Mul(r, big.NewRat(int64(build.FilecoinPrecision), 1))
+ }

if !r.IsInt() {
- return FIL{}, fmt.Errorf("invalid FIL value: %q", s)
+ var pref string
+ if attofil {
+ pref = "atto"
+ }
+ return FIL{}, fmt.Errorf("invalid %sFIL value: %q", pref, s)
}

return FIL{r.Num()}, nil
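For context, a self-contained re-statement of the new parsing logic using only the standard library; filecoinPrecision stands in for build.FilecoinPrecision (10^18 attoFIL per FIL is assumed here), and parseFIL returns a bare *big.Int instead of the FIL wrapper.

package main

import (
	"fmt"
	"math/big"
	"strings"
)

// filecoinPrecision stands in for build.FilecoinPrecision; 10^18 attoFIL
// per FIL is assumed here.
const filecoinPrecision = 1_000_000_000_000_000_000

// parseFIL mirrors the suffix handling added above: strip the unit suffix,
// parse the remainder as a decimal, and scale to attoFIL unless the suffix
// already denoted attoFIL.
func parseFIL(s string) (*big.Int, error) {
	suffix := strings.TrimLeft(s, ".1234567890")
	s = s[:len(s)-len(suffix)]
	var attofil bool
	switch strings.ToLower(strings.TrimSpace(suffix)) {
	case "", "fil":
	case "attofil", "afil":
		attofil = true
	default:
		return nil, fmt.Errorf("unrecognized suffix: %q", suffix)
	}

	r, ok := new(big.Rat).SetString(s)
	if !ok {
		return nil, fmt.Errorf("failed to parse %q as a decimal number", s)
	}
	if !attofil {
		r = r.Mul(r, big.NewRat(filecoinPrecision, 1))
	}
	if !r.IsInt() {
		return nil, fmt.Errorf("%q has more precision than attoFIL allows", s)
	}
	return r.Num(), nil
}

func main() {
	for _, v := range []string{"1.001 FIL", "5000 afil", "0"} {
		n, err := parseFIL(v)
		fmt.Println(v, "->", n, err)
	}
}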
@@ -51,7 +51,7 @@ func main() {

if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err)
- return
+ os.Exit(1)
}
}

@@ -294,6 +294,30 @@ create table if not exists miner_info
primary key (miner_id)
);

+ /*
+ * captures chain-specific power state for any given stateroot
+ */
+ create table if not exists chain_power
+ (
+ state_root text not null
+ constraint chain_power_pk
+ primary key,
+ baseline_power text not null
+ );
+
+ /*
+ * captures miner-specific power state for any given stateroot
+ */
+ create table if not exists miner_power
+ (
+ miner_id text not null,
+ state_root text not null,
+ raw_bytes_power text not null,
+ quality_adjusted_power text not null,
+ constraint miner_power_pk
+ primary key (miner_id, state_root)
+ );
+
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
create table if not exists miner_sectors_heads
(
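The two new tables key power readings by state root. A self-contained sketch of how a consumer might join them to compute each miner's share of the chain baseline follows; the connection string, the lib/pq driver choice, and the numeric casts are assumptions for illustration, since the power columns are stored as text.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // Postgres driver; the schema above targets Postgres.
)

func main() {
	// Placeholder DSN; point it at the database chainwatch writes to.
	db, err := sql.Open("postgres", "postgres://localhost/chainwatch?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Power columns are stored as text, so cast to numeric in SQL before dividing.
	rows, err := db.Query(`
		select mp.miner_id,
		       mp.quality_adjusted_power::numeric / nullif(cp.baseline_power::numeric, 0) as share
		from miner_power mp
		join chain_power cp on cp.state_root = mp.state_root`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var miner string
		var share sql.NullFloat64
		if err := rows.Scan(&miner, &share); err != nil {
			log.Fatal(err)
		}
		fmt.Println(miner, share.Float64)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}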
@@ -500,6 +524,46 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI
return nil
}

+ // storeChainPower captures reward actor state as it relates to power captured on-chain
+ func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateInfo) error {
+ tx, err := st.db.Begin()
+ if err != nil {
+ return xerrors.Errorf("begin chain_power tx: %w", err)
+ }
+
+ if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil {
+ return xerrors.Errorf("prep chain_power temp: %w", err)
+ }
+
+ stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`)
+ if err != nil {
+ return xerrors.Errorf("prepare tmp chain_power: %w", err)
+ }
+
+ for _, rewardState := range rewardTips {
+ if _, err := stmt.Exec(
+ rewardState.stateroot.String(),
+ rewardState.baselinePower.String(),
+ ); err != nil {
+ log.Errorw("failed to store chain power", "state_root", rewardState.stateroot, "error", err)
+ }
+ }
+
+ if err := stmt.Close(); err != nil {
+ return xerrors.Errorf("close prepared chain_power: %w", err)
+ }
+
+ if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil {
+ return xerrors.Errorf("insert chain_power from tmp: %w", err)
+ }
+
+ if err := tx.Commit(); err != nil {
+ return xerrors.Errorf("commit chain_power tx: %w", err)
+ }
+
+ return nil
+ }
+
type storeSectorsAPI interface {
StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error)
}
@@ -607,6 +671,50 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo)
return tx.Commit()
}

+ // storeMinerPower captures miner actor state as it relates to power per miner captured on-chain
+ func (st *storage) storeMinerPower(minerTips map[types.TipSetKey][]*minerStateInfo) error {
+ tx, err := st.db.Begin()
+ if err != nil {
+ return xerrors.Errorf("begin miner_power tx: %w", err)
+ }
+
+ if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil {
+ return xerrors.Errorf("prep miner_power temp: %w", err)
+ }
+
+ stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`)
+ if err != nil {
+ return xerrors.Errorf("prepare tmp miner_power: %w", err)
+ }
+
+ for _, miners := range minerTips {
+ for _, minerInfo := range miners {
+ if _, err := stmt.Exec(
+ minerInfo.addr.String(),
+ minerInfo.stateroot.String(),
+ minerInfo.rawPower.String(),
+ minerInfo.qalPower.String(),
+ ); err != nil {
+ log.Errorw("failed to store miner power", "miner", minerInfo.addr, "stateroot", minerInfo.stateroot, "error", err)
+ }
+ }
+ }
+
+ if err := stmt.Close(); err != nil {
+ return xerrors.Errorf("close prepared miner_power: %w", err)
+ }
+
+ if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil {
+ return xerrors.Errorf("insert miner_power from tmp: %w", err)
+ }
+
+ if err := tx.Commit(); err != nil {
+ return xerrors.Errorf("commit miner_power tx: %w", err)
+ }
+
+ return nil
+ }
+
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
tx, err := st.db.Begin()
if err != nil {
@@ -13,12 +13,14 @@ import (
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
+ "golang.org/x/xerrors"

"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
+ "github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/specs-actors/actors/util/adt"

"github.com/filecoin-project/lotus/api"
@@ -53,6 +55,11 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int)
}()
}

+ type rewardStateInfo struct {
+ stateroot cid.Cid
+ baselinePower big.Int
+ }
+
type minerStateInfo struct {
// common
addr address.Address
@@ -273,6 +280,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}
})

+ // map of tipset to reward state
+ rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes))
// map of tipset to all miners that had a head-change at that tipset.
minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes))
// heads we've seen, im being paranoid
@@ -302,16 +311,22 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
alk.Unlock()
})

- log.Infof("Getting miner info")
+ log.Infof("Getting actor change info")

minerChanges := 0
for addr, m := range actors {
for actor, c := range m {
- if actor.Code != builtin.StorageMinerActorCodeID {
+ // reward actor
+ if actor.Code == builtin.RewardActorCodeID {
+ rewardTips[c.tsKey] = &rewardStateInfo{
+ stateroot: c.stateroot,
+ baselinePower: big.Zero(),
+ }
continue
}

- // only want miner actors with head change events
+ // miner actors with head change events
+ if actor.Code == builtin.StorageMinerActorCodeID {
if _, found := headsSeen[actor.Head]; found {
continue
}
@@ -334,7 +349,35 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.

headsSeen[actor.Head] = struct{}{}
}
+ continue
}
+ }
+
+ rewardProcessingStartedAt := time.Now()
+ parmap.Par(50, parmap.KVMapArr(rewardTips), func(it func() (types.TipSetKey, *rewardStateInfo)) {
+ tsKey, rewardInfo := it()
+ // get reward actor states at each tipset once for all updates
+ rewardActor, err := api.StateGetActor(ctx, builtin.RewardActorAddr, tsKey)
+ if err != nil {
+ log.Error(xerrors.Errorf("get reward state (@ %s): %w", rewardInfo.stateroot.String(), err))
+ return
+ }
+
+ rewardStateRaw, err := api.ChainReadObj(ctx, rewardActor.Head)
+ if err != nil {
+ log.Error(xerrors.Errorf("read state obj (@ %s): %w", rewardInfo.stateroot.String(), err))
+ return
+ }
+
+ var rewardActorState reward.State
+ if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil {
+ log.Error(xerrors.Errorf("unmarshal state (@ %s): %w", rewardInfo.stateroot.String(), err))
+ return
+ }
+
+ rewardInfo.baselinePower = rewardActorState.BaselinePower
+ })
+ log.Infow("Completed Reward Processing", "duration", time.Since(rewardProcessingStartedAt).String(), "processed", len(rewardTips))

minerProcessingStartedAt := time.Now()
log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges)
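parmap.Par above is a lotus-internal helper for bounded fan-out. A rough, self-contained equivalent of that pattern follows; parFor and width are illustrative names only.

package main

import (
	"fmt"
	"sync"
)

// parFor runs fn over every key with at most width goroutines in flight,
// mirroring what parmap.Par(50, parmap.KVMapArr(rewardTips), ...) does above.
func parFor(width int, keys []string, fn func(string)) {
	sem := make(chan struct{}, width)
	var wg sync.WaitGroup
	for _, k := range keys {
		wg.Add(1)
		sem <- struct{}{} // block while width workers are already running
		go func(k string) {
			defer wg.Done()
			defer func() { <-sem }()
			fn(k)
		}(k)
	}
	wg.Wait()
}

func main() {
	stateRoots := []string{"rootA", "rootB", "rootC", "rootD"}
	parFor(2, stateRoots, func(root string) {
		// syncHead would fetch the reward actor state here; we just print.
		fmt.Println("processing", root)
	})
}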
@@ -415,25 +458,35 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}

log.Info("Storing actors")

if err := st.storeActors(actors); err != nil {
log.Error(err)
return
}

+ chainPowerStartedAt := time.Now()
+ if err := st.storeChainPower(rewardTips); err != nil {
+ log.Error(err)
+ }
+ log.Infow("Stored chain power", "duration", time.Since(chainPowerStartedAt).String())
+
log.Info("Storing miners")
if err := st.storeMiners(minerTips); err != nil {
log.Error(err)
return
}

- log.Info("Storing miner sectors")
+ minerPowerStartedAt := time.Now()
+ if err := st.storeMinerPower(minerTips); err != nil {
+ log.Error(err)
+ }
+ log.Infow("Stored miner power", "duration", time.Since(minerPowerStartedAt).String())
+
sectorStart := time.Now()
if err := st.storeSectors(minerTips, api); err != nil {
log.Error(err)
return
}
- log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String())
+ log.Infow("Stored miner sectors", "duration", time.Since(sectorStart).String())

log.Info("Storing miner sectors heads")
if err := st.storeMinerSectorsHeads(minerTips, api); err != nil {
@@ -3,6 +3,7 @@ package main
import (
"bytes"
"fmt"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/urfave/cli/v2"
@@ -199,7 +200,7 @@ var verifRegListVerifiersCmd = &cli.Command{
return err
}

- vh, err := hamt.LoadNode(ctx, cst, st.Verifiers)
+ vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5))
if err != nil {
return err
}
@@ -251,7 +252,7 @@ var verifRegListClientsCmd = &cli.Command{
return err
}

- vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients)
+ vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients, hamt.UseTreeBitWidth(5))
if err != nil {
return err
}
@@ -346,7 +347,7 @@ var verifRegCheckVerifierCmd = &cli.Command{
return err
}

- vh, err := hamt.LoadNode(ctx, cst, st.Verifiers)
+ vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5))
if err != nil {
return err
}
@@ -359,10 +359,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string,
}*/
}

- log.Infof("Setting next sector ID to %d", maxSectorID+1)
-
buf := make([]byte, binary.MaxVarintLen64)
- size := binary.PutUvarint(buf, uint64(maxSectorID+1))
+ size := binary.PutUvarint(buf, uint64(maxSectorID))
return mds.Put(datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size])
}

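The sector counter above is persisted as an unsigned varint. A minimal round trip with encoding/binary is shown below; the counter value is made up for illustration.

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Same encoding the migration writes under StorageCounterDSPrefix.
	maxSectorID := uint64(41)

	buf := make([]byte, binary.MaxVarintLen64)
	size := binary.PutUvarint(buf, maxSectorID)

	// Reading it back yields the stored ID and the number of bytes consumed.
	id, n := binary.Uvarint(buf[:size])
	fmt.Println(id, n) // 41 1
}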
@@ -18,4 +18,6 @@ func SetupLogLevels() {
_ = logging.SetLogLevel("stores", "DEBUG")
_ = logging.SetLogLevel("nat", "INFO")
}
+ // Always mute RtRefreshManager because it breaks terminals
+ _ = logging.SetLogLevel("dht/RtRefreshManager", "FATAL")
}
@@ -176,17 +176,18 @@ func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error)
}

func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
- pc, err := api.StatePledgeCollateral(ctx, tipset.Key())
- if err != nil {
- return err
- }
-
attoFil := types.NewInt(build.FilecoinPrecision).Int

- pcFil := new(big.Rat).SetFrac(pc.Int, attoFil)
- pcFilFloat, _ := pcFil.Float64()
- p := NewPoint("chain.pledge_collateral", pcFilFloat)
- pl.AddPoint(p)
+ //TODO: StatePledgeCollateral API is not implemented and is commented out - re-enable this block once the API is implemented again.
+ //pc, err := api.StatePledgeCollateral(ctx, tipset.Key())
+ //if err != nil {
+ //return err
+ //}
+
+ //pcFil := new(big.Rat).SetFrac(pc.Int, attoFil)
+ //pcFilFloat, _ := pcFil.Float64()
+ //p := NewPoint("chain.pledge_collateral", pcFilFloat)
+ //pl.AddPoint(p)

netBal, err := api.WalletBalance(ctx, builtin.RewardActorAddr)
if err != nil {
@@ -195,7 +196,7 @@ func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointLis

netBalFil := new(big.Rat).SetFrac(netBal.Int, attoFil)
netBalFilFloat, _ := netBalFil.Float64()
- p = NewPoint("network.balance", netBalFilFloat)
+ p := NewPoint("network.balance", netBalFilFloat)
pl.AddPoint(p)

totalPower, err := api.StateMinerPower(ctx, address.Address{}, tipset.Key())