diff --git a/README.md b/README.md index e2a1bf120..6c1e23efa 100644 --- a/README.md +++ b/README.md @@ -40,9 +40,6 @@ All work is tracked via issues. An attempt at keeping an up-to-date view on rema The lotus Filecoin implementation unfolds into the following packages: - [This repo](https://github.com/filecoin-project/lotus) -- [storage-fsm](https://github.com/filecoin-project/storage-fsm) -- [sector-storage](https://github.com/filecoin-project/sector-storage) -- [specs-storage](https://github.com/filecoin-project/specs-storage) - [go-fil-markets](https://github.com/filecoin-project/go-fil-markets) which has its own [kanban work tracker available here](https://app.zenhub.com/workspaces/markets-shared-components-5daa144a7046a60001c6e253/board) - [spec-actors](https://github.com/filecoin-project/specs-actors) which has its own [kanban work tracker available here](https://app.zenhub.com/workspaces/actors-5ee6f3aa87591f0016c05685/board) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index c036bf1f6..a21df794f 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -3,6 +3,7 @@ package sub import ( "bytes" "context" + "errors" "fmt" "sync" "time" @@ -40,6 +41,9 @@ import ( var log = logging.Logger("sub") +var ErrSoftFailure = errors.New("soft validation failure") +var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power") + func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bserv bserv.BlockService, cmgr connmgr.ConnManager) { for { msg, err := bsub.Next(ctx) @@ -258,16 +262,15 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub stats.Record(ctx, metrics.BlockReceived.M(1)) - recordFailure := func(what string) { - ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, what)) - stats.Record(ctx, metrics.BlockValidationFailure.M(1)) + recordFailureFlagPeer := func(what string) { + recordFailure(ctx, metrics.BlockValidationFailure, what) bv.flagPeer(pid) } blk, what, err := bv.decodeAndCheckBlock(msg) if err != nil { log.Error("got invalid block over pubsub: ", err) - recordFailure(what) + recordFailureFlagPeer(what) return pubsub.ValidationReject } @@ -275,7 +278,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub err = bv.validateMsgMeta(ctx, blk) if err != nil { log.Warnf("error validating message metadata: %s", err) - recordFailure("invalid_block_meta") + recordFailureFlagPeer("invalid_block_meta") return pubsub.ValidationReject } @@ -288,11 +291,12 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub // if we are synced and the miner is unknown, then the block is rejcected. key, err := bv.checkPowerAndGetWorkerKey(ctx, blk.Header) if err != nil { - if bv.isChainNearSynced() { + if err != ErrSoftFailure && bv.isChainNearSynced() { log.Warnf("received block from unknown miner or miner that doesn't meet min power over pubsub; rejecting message") - recordFailure("unknown_miner") + recordFailureFlagPeer("unknown_miner") return pubsub.ValidationReject } + log.Warnf("cannot validate block message; unknown miner or miner that doesn't meet min power in unsynced chain") return pubsub.ValidationIgnore } @@ -300,13 +304,13 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub err = sigs.CheckBlockSignature(ctx, blk.Header, key) if err != nil { log.Errorf("block signature verification failed: %s", err) - recordFailure("signature_verification_failed") + recordFailureFlagPeer("signature_verification_failed") return pubsub.ValidationReject } if blk.Header.ElectionProof.WinCount < 1 { log.Errorf("block is not claiming to be winning") - recordFailure("not_winning") + recordFailureFlagPeer("not_winning") return pubsub.ValidationReject } @@ -473,19 +477,19 @@ func (bv *BlockValidator) checkPowerAndGetWorkerKey(ctx context.Context, bh *typ baseTs := bv.chain.GetHeaviestTipSet() lbts, err := stmgr.GetLookbackTipSetForRound(ctx, bv.stmgr, baseTs, bh.Height) if err != nil { - log.Warnf("failed to load lookback tipset for incoming block") - return address.Undef, err + log.Warnf("failed to load lookback tipset for incoming block: %s", err) + return address.Undef, ErrSoftFailure } hmp, err := stmgr.MinerHasMinPower(ctx, bv.stmgr, bh.Miner, lbts) if err != nil { - log.Warnf("failed to determine if incoming block's miner has minimum power") - return address.Undef, err + log.Warnf("failed to determine if incoming block's miner has minimum power: %s", err) + return address.Undef, ErrSoftFailure } if !hmp { log.Warnf("incoming block's miner does not have minimum power") - return address.Undef, xerrors.New("incoming block's miner does not have minimum power") + return address.Undef, ErrInsufficientPower } return key, nil @@ -541,9 +545,9 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err) ctx, _ = tag.New( ctx, - tag.Insert(metrics.FailureType, "add"), + tag.Upsert(metrics.Local, "false"), ) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "add") switch { case xerrors.Is(err, messagepool.ErrBroadcastAnyway): fallthrough @@ -560,37 +564,41 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs } func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult { + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.Local, "true"), + ) // do some lightweight validation stats.Record(ctx, metrics.MessagePublished.M(1)) m, err := types.DecodeSignedMessage(msg.Message.GetData()) if err != nil { log.Warnf("failed to decode local message: %s", err) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "decode") return pubsub.ValidationIgnore } if m.Size() > 32*1024 { log.Warnf("local message is too large! (%dB)", m.Size()) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "oversize") return pubsub.ValidationIgnore } if m.Message.To == address.Undef { log.Warn("local message has invalid destination address") - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "undef-addr") return pubsub.ValidationIgnore } if !m.Message.Value.LessThan(types.TotalFilecoinInt) { log.Warnf("local messages has too high value: %s", m.Message.Value) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "value-too-high") return pubsub.ValidationIgnore } if err := mv.mpool.VerifyMsgSig(m); err != nil { log.Warnf("signature verification failed for local message: %s", err) - stats.Record(ctx, metrics.MessageValidationFailure.M(1)) + recordFailure(ctx, metrics.MessageValidationFailure, "verify-sig") return pubsub.ValidationIgnore } @@ -613,3 +621,11 @@ func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, // Do nothing... everything happens in validate } } + +func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType string) { + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.FailureType, failureType), + ) + stats.Record(ctx, metric.M(1)) +} diff --git a/chain/vm/runtime.go b/chain/vm/runtime.go index d39c14beb..99333fc04 100644 --- a/chain/vm/runtime.go +++ b/chain/vm/runtime.go @@ -34,7 +34,6 @@ type Runtime struct { vm *VM state *state.StateTree - msg *types.Message vmsg vmr.Message height abi.ChainEpoch cst cbor.IpldStore diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 7be5417b7..f51cbff29 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -97,7 +97,6 @@ func (vm *VM) makeRuntime(ctx context.Context, msg *types.Message, origin addres ctx: ctx, vm: vm, state: vm.cstate, - msg: msg, origin: origin, originNonce: originNonce, height: vm.blockHeight, diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go index b230d9cae..5cb0f3507 100644 --- a/cmd/lotus-chainwatch/main.go +++ b/cmd/lotus-chainwatch/main.go @@ -26,6 +26,11 @@ func main() { EnvVars: []string{"LOTUS_PATH"}, Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME }, + &cli.StringFlag{ + Name: "api", + EnvVars: []string{"FULLNODE_API_INFO"}, + Value: "", + }, &cli.StringFlag{ Name: "db", EnvVars: []string{"LOTUS_DB"}, diff --git a/cmd/lotus-chainwatch/processor/market.go b/cmd/lotus-chainwatch/processor/market.go index 426005ac3..e50ec3076 100644 --- a/cmd/lotus-chainwatch/processor/market.go +++ b/cmd/lotus-chainwatch/processor/market.go @@ -96,12 +96,6 @@ func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTip log.Fatalw("Failed to persist market actors", "error", err) } - // we persist the dealID <--> minerID,sectorID here since the dealID needs to be stored above first - if err := p.storePreCommitDealInfo(p.sectorDealEvents); err != nil { - close(p.sectorDealEvents) - return err - } - if err := p.updateMarket(ctx, marketChanges); err != nil { log.Fatalw("Failed to update market actors", "error", err) } @@ -272,48 +266,6 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip } -func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err) - } - - stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`) - if err != nil { - return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err) - } - - for sde := range dealEvents { - for _, did := range sde.DealIDs { - if _, err := stmt.Exec( - uint64(did), - sde.MinerID.String(), - sde.SectorID, - ); err != nil { - return err - } - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("Failed to close miner sector deals statement: %w", err) - } - - if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil { - return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("Failed to commit miner deal sector table: %w", err) - } - return nil - -} - func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error { start := time.Now() defer func() { diff --git a/cmd/lotus-chainwatch/processor/messages.go b/cmd/lotus-chainwatch/processor/messages.go index 2e88d8aae..e3d23f219 100644 --- a/cmd/lotus-chainwatch/processor/messages.go +++ b/cmd/lotus-chainwatch/processor/messages.go @@ -3,7 +3,6 @@ package processor import ( "context" "sync" - "time" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" @@ -120,10 +119,6 @@ func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[c } func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error { - start := time.Now() - defer func() { - log.Debugw("Persisted Receipts", "duration", time.Since(start).String()) - }() tx, err := p.db.Begin() if err != nil { return err @@ -164,10 +159,6 @@ create temp table recs (like receipts excluding constraints) on commit drop; } func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { - start := time.Now() - defer func() { - log.Debugw("Persisted Message Inclusions", "duration", time.Since(start).String()) - }() tx, err := p.db.Begin() if err != nil { return err @@ -206,10 +197,6 @@ create temp table mi (like block_messages excluding constraints) on commit drop; } func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error { - start := time.Now() - defer func() { - log.Debugw("Persisted Messages", "duration", time.Since(start).String()) - }() tx, err := p.db.Begin() if err != nil { return err diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go index 6e4d40dec..7973d3c42 100644 --- a/cmd/lotus-chainwatch/processor/miner.go +++ b/cmd/lotus-chainwatch/processor/miner.go @@ -271,7 +271,11 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) preCommitEvents := make(chan *MinerSectorsEvent, 8) sectorEvents := make(chan *MinerSectorsEvent, 8) partitionEvents := make(chan *MinerSectorsEvent, 8) - p.sectorDealEvents = make(chan *SectorDealEvent, 8) + dealEvents := make(chan *SectorDealEvent, 8) + + grp.Go(func() error { + return p.storePreCommitDealInfo(dealEvents) + }) grp.Go(func() error { return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents) @@ -280,9 +284,9 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) grp.Go(func() error { defer func() { close(preCommitEvents) - close(p.sectorDealEvents) + close(dealEvents) }() - return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, p.sectorDealEvents) + return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, dealEvents) }) grp.Go(func() error { @@ -911,6 +915,48 @@ func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []mine return tx.Commit() } +func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err) + } + + stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`) + if err != nil { + return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err) + } + + for sde := range dealEvents { + for _, did := range sde.DealIDs { + if _, err := stmt.Exec( + uint64(did), + sde.MinerID.String(), + sde.SectorID, + ); err != nil { + return err + } + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("Failed to close miner sector deals statement: %w", err) + } + + if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil { + return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("Failed to commit miner deal sector table: %w", err) + } + return nil + +} + func (p *Processor) storeMinersPower(miners []minerActorInfo) error { start := time.Now() defer func() { diff --git a/cmd/lotus-chainwatch/processor/mpool.go b/cmd/lotus-chainwatch/processor/mpool.go index 1f5826170..0a6445d78 100644 --- a/cmd/lotus-chainwatch/processor/mpool.go +++ b/cmd/lotus-chainwatch/processor/mpool.go @@ -47,8 +47,6 @@ func (p *Processor) subMpool(ctx context.Context) { msgs[v.Message.Message.Cid()] = &v.Message.Message } - log.Debugf("Processing %d mpool updates", len(msgs)) - err := p.storeMessages(msgs) if err != nil { log.Error(err) diff --git a/cmd/lotus-chainwatch/processor/power.go b/cmd/lotus-chainwatch/processor/power.go index daf17a884..6fa03e943 100644 --- a/cmd/lotus-chainwatch/processor/power.go +++ b/cmd/lotus-chainwatch/processor/power.go @@ -7,6 +7,7 @@ import ( "golang.org/x/xerrors" + "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/util/smoothing" @@ -15,7 +16,19 @@ import ( type powerActorInfo struct { common actorInfo - epochSmoothingEstimate *smoothing.FilterEstimate + totalRawBytes big.Int + totalRawBytesCommitted big.Int + totalQualityAdjustedBytes big.Int + totalQualityAdjustedBytesCommitted big.Int + totalPledgeCollateral big.Int + + newRawBytes big.Int + newQualityAdjustedBytes big.Int + newPledgeCollateral big.Int + newQAPowerSmoothed *smoothing.FilterEstimate + + minerCount int64 + minerCountAboveMinimumPower int64 } func (p *Processor) setupPower() error { @@ -25,13 +38,27 @@ func (p *Processor) setupPower() error { } if _, err := tx.Exec(` -create table if not exists power_smoothing_estimates +create table if not exists chain_power ( - state_root text not null - constraint power_smoothing_estimates_pk - primary key, - position_estimate text not null, - velocity_estimate text not null + state_root text not null + constraint power_smoothing_estimates_pk + primary key, + + new_raw_bytes_power text not null, + new_qa_bytes_power text not null, + new_pledge_collateral text not null, + + total_raw_bytes_power text not null, + total_raw_bytes_committed text not null, + total_qa_bytes_power text not null, + total_qa_bytes_committed text not null, + total_pledge_collateral text not null, + + qa_smoothed_position_estimate text not null, + qa_smoothed_velocity_estimate text not null, + + miner_count int not null, + minimum_consensus_miner_count int not null ); `); err != nil { return err @@ -60,8 +87,8 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) }() var out []powerActorInfo - for tipset, powers := range powerTips { - for _, act := range powers { + for tipset, powerStates := range powerTips { + for _, act := range powerStates { var pw powerActorInfo pw.common = act @@ -80,7 +107,19 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err) } - pw.epochSmoothingEstimate = powerActorState.ThisEpochQAPowerSmoothed + pw.totalRawBytes = powerActorState.TotalRawBytePower + pw.totalRawBytesCommitted = powerActorState.TotalBytesCommitted + pw.totalQualityAdjustedBytes = powerActorState.TotalQualityAdjPower + pw.totalQualityAdjustedBytesCommitted = powerActorState.TotalQABytesCommitted + pw.totalPledgeCollateral = powerActorState.TotalPledgeCollateral + + pw.newRawBytes = powerActorState.ThisEpochRawBytePower + pw.newQualityAdjustedBytes = powerActorState.ThisEpochQualityAdjPower + pw.newPledgeCollateral = powerActorState.ThisEpochPledgeCollateral + pw.newQAPowerSmoothed = powerActorState.ThisEpochQAPowerSmoothed + + pw.minerCount = powerActorState.MinerCount + pw.minerCountAboveMinimumPower = powerActorState.MinerAboveMinPowerCount out = append(out, pw) } } @@ -88,46 +127,59 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) return out, nil } -func (p *Processor) persistPowerActors(ctx context.Context, powers []powerActorInfo) error { +func (p *Processor) persistPowerActors(ctx context.Context, powerStates []powerActorInfo) error { // NB: use errgroup when there is more than a single store operation - return p.storePowerSmoothingEstimates(powers) + return p.storePowerSmoothingEstimates(powerStates) } -func (p *Processor) storePowerSmoothingEstimates(powers []powerActorInfo) error { +func (p *Processor) storePowerSmoothingEstimates(powerStates []powerActorInfo) error { tx, err := p.db.Begin() if err != nil { - return xerrors.Errorf("begin power_smoothing_estimates tx: %w", err) + return xerrors.Errorf("begin chain_power tx: %w", err) } - if _, err := tx.Exec(`create temp table rse (like power_smoothing_estimates) on commit drop`); err != nil { - return xerrors.Errorf("prep power_smoothing_estimates: %w", err) + if _, err := tx.Exec(`create temp table cp (like chain_power) on commit drop`); err != nil { + return xerrors.Errorf("prep chain_power: %w", err) } - stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`) + stmt, err := tx.Prepare(`copy cp (state_root, new_raw_bytes_power, new_qa_bytes_power, new_pledge_collateral, total_raw_bytes_power, total_raw_bytes_committed, total_qa_bytes_power, total_qa_bytes_committed, total_pledge_collateral, qa_smoothed_position_estimate, qa_smoothed_velocity_estimate, miner_count, minimum_consensus_miner_count) from stdin;`) if err != nil { - return xerrors.Errorf("prepare tmp power_smoothing_estimates: %w", err) + return xerrors.Errorf("prepare tmp chain_power: %w", err) } - for _, powerState := range powers { + for _, ps := range powerStates { if _, err := stmt.Exec( - powerState.common.stateroot.String(), - powerState.epochSmoothingEstimate.PositionEstimate.String(), - powerState.epochSmoothingEstimate.VelocityEstimate.String(), + ps.common.stateroot.String(), + ps.newRawBytes.String(), + ps.newQualityAdjustedBytes.String(), + ps.newPledgeCollateral.String(), + + ps.totalRawBytes.String(), + ps.totalRawBytesCommitted.String(), + ps.totalQualityAdjustedBytes.String(), + ps.totalQualityAdjustedBytesCommitted.String(), + ps.totalPledgeCollateral.String(), + + ps.newQAPowerSmoothed.PositionEstimate.String(), + ps.newQAPowerSmoothed.VelocityEstimate.String(), + + ps.minerCount, + ps.minerCountAboveMinimumPower, ); err != nil { return xerrors.Errorf("failed to store smoothing estimate: %w", err) } } if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared power_smoothing_estimates: %w", err) + return xerrors.Errorf("close prepared chain_power: %w", err) } - if _, err := tx.Exec(`insert into power_smoothing_estimates select * from rse on conflict do nothing`); err != nil { - return xerrors.Errorf("insert power_smoothing_estimates from tmp: %w", err) + if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert chain_power from tmp: %w", err) } if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit power_smoothing_estimates tx: %w", err) + return xerrors.Errorf("commit chain_power tx: %w", err) } return nil diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go index 9b12a2cf7..99548aeac 100644 --- a/cmd/lotus-chainwatch/processor/processor.go +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -35,9 +35,6 @@ type Processor struct { // number of blocks processed at a time batch int - - // process communication channels - sectorDealEvents chan *SectorDealEvent } type ActorTips map[types.TipSetKey][]actorInfo @@ -152,7 +149,6 @@ func (p *Processor) Start(ctx context.Context) { log.Errorf("Failed to handle market changes: %w", err) return } - log.Info("Processed Market Changes") }() grp.Add(1) @@ -162,7 +158,6 @@ func (p *Processor) Start(ctx context.Context) { log.Errorf("Failed to handle miner changes: %w", err) return } - log.Info("Processed Miner Changes") }() grp.Add(1) @@ -172,7 +167,6 @@ func (p *Processor) Start(ctx context.Context) { log.Errorf("Failed to handle reward changes: %w", err) return } - log.Info("Processed Reward Changes") }() grp.Add(1) @@ -182,7 +176,6 @@ func (p *Processor) Start(ctx context.Context) { log.Errorf("Failed to handle power actor changes: %w", err) return } - log.Info("Processes Power Changes") }() grp.Add(1) @@ -192,7 +185,6 @@ func (p *Processor) Start(ctx context.Context) { log.Errorf("Failed to handle message changes: %w", err) return } - log.Info("Processed Message Changes") }() grp.Add(1) @@ -202,7 +194,6 @@ func (p *Processor) Start(ctx context.Context) { log.Errorf("Failed to handle common actor changes: %w", err) return } - log.Info("Processed CommonActor Changes") }() grp.Wait() @@ -214,7 +205,7 @@ func (p *Processor) Start(ctx context.Context) { if err := p.refreshViews(); err != nil { log.Errorw("Failed to refresh views", "error", err) } - log.Infow("Processed Batch", "duration", time.Since(loopStart).String()) + log.Infow("Processed Batch Complete", "duration", time.Since(loopStart).String()) } } }() diff --git a/cmd/lotus-chainwatch/processor/reward.go b/cmd/lotus-chainwatch/processor/reward.go index 230b3c6c1..7068c1a93 100644 --- a/cmd/lotus-chainwatch/processor/reward.go +++ b/cmd/lotus-chainwatch/processor/reward.go @@ -5,7 +5,6 @@ import ( "context" "time" - "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/abi/big" @@ -13,20 +12,23 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/util/smoothing" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" ) type rewardActorInfo struct { common actorInfo - // expected power in bytes during this epoch - baselinePower big.Int + cumSumBaselinePower big.Int + cumSumRealizedPower big.Int - // base reward in attofil for each block found during this epoch - baseBlockReward big.Int + effectiveNetworkTime int64 + effectiveBaselinePower big.Int - epochSmoothingEstimate *smoothing.FilterEstimate + newBaselinePower big.Int + newBaseReward big.Int + newSmoothingEstimate *smoothing.FilterEstimate + + totalMinedReward big.Int } func (p *Processor) setupRewards() error { @@ -36,34 +38,23 @@ func (p *Processor) setupRewards() error { } if _, err := tx.Exec(` -/* -* captures base block reward per miner per state root and does not -* include penalties or gas reward -*/ -create table if not exists base_block_rewards -( - state_root text not null - constraint block_rewards_pk - primary key, - base_block_reward numeric not null -); - /* captures chain-specific power state for any given stateroot */ -create table if not exists chain_power +create table if not exists chain_reward ( state_root text not null - constraint chain_power_pk + constraint chain_reward_pk primary key, - baseline_power text not null -); + cum_sum_baseline text not null, + cum_sum_realized text not null, + effective_network_time int not null, + effective_baseline_power text not null, -create table if not exists reward_smoothing_estimates -( - state_root text not null - constraint reward_smoothing_estimates_pk - primary key, - position_estimate text not null, - velocity_estimate text not null + new_baseline_power text not null, + new_reward numeric not null, + new_reward_smoothed_position_estimate text not null, + new_reward_smoothed_velocity_estimate text not null, + + total_mined_reward text not null ); `); err != nil { return err @@ -113,9 +104,14 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip return nil, xerrors.Errorf("unmarshal state (@ %s): %w", rw.common.stateroot.String(), err) } - rw.baseBlockReward = rewardActorState.ThisEpochReward - rw.baselinePower = rewardActorState.ThisEpochBaselinePower - rw.epochSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed + rw.cumSumBaselinePower = rewardActorState.CumsumBaseline + rw.cumSumRealizedPower = rewardActorState.CumsumRealized + rw.effectiveNetworkTime = int64(rewardActorState.EffectiveNetworkTime) + rw.effectiveBaselinePower = rewardActorState.EffectiveBaselinePower + rw.newBaselinePower = rewardActorState.ThisEpochBaselinePower + rw.newBaseReward = rewardActorState.ThisEpochReward + rw.newSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed + rw.totalMinedReward = rewardActorState.TotalMined out = append(out, rw) } } @@ -145,8 +141,14 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip return nil, err } - rw.baseBlockReward = rewardActorState.ThisEpochReward - rw.baselinePower = rewardActorState.ThisEpochBaselinePower + rw.cumSumBaselinePower = rewardActorState.CumsumBaseline + rw.cumSumRealizedPower = rewardActorState.CumsumRealized + rw.effectiveNetworkTime = int64(rewardActorState.EffectiveNetworkTime) + rw.effectiveBaselinePower = rewardActorState.EffectiveBaselinePower + rw.newBaselinePower = rewardActorState.ThisEpochBaselinePower + rw.newBaseReward = rewardActorState.ThisEpochReward + rw.newSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed + rw.totalMinedReward = rewardActorState.TotalMined out = append(out, rw) } @@ -159,149 +161,47 @@ func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardAct log.Debugw("Persisted Reward Actors", "duration", time.Since(start).String()) }() - grp, ctx := errgroup.WithContext(ctx) //nolint - - grp.Go(func() error { - if err := p.storeChainPower(rewards); err != nil { - return err - } - return nil - }) - - grp.Go(func() error { - if err := p.storeBaseBlockReward(rewards); err != nil { - return err - } - return nil - }) - - grp.Go(func() error { - if err := p.storeRewardSmoothingEstimates(rewards); err != nil { - return err - } - return nil - }) - - return grp.Wait() -} - -func (p *Processor) storeChainPower(rewards []rewardActorInfo) error { tx, err := p.db.Begin() if err != nil { - return xerrors.Errorf("begin chain_power tx: %w", err) + return xerrors.Errorf("begin chain_reward tx: %w", err) } - if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil { - return xerrors.Errorf("prep chain_power temp: %w", err) + if _, err := tx.Exec(`create temp table cr (like chain_reward excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep chain_reward temp: %w", err) } - stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`) + stmt, err := tx.Prepare(`copy cr ( state_root, cum_sum_baseline, cum_sum_realized, effective_network_time, effective_baseline_power, new_baseline_power, new_reward, new_reward_smoothed_position_estimate, new_reward_smoothed_velocity_estimate, total_mined_reward) from STDIN`) if err != nil { - return xerrors.Errorf("prepare tmp chain_power: %w", err) + return xerrors.Errorf("prepare tmp chain_reward: %w", err) } for _, rewardState := range rewards { if _, err := stmt.Exec( rewardState.common.stateroot.String(), - rewardState.baselinePower.String(), + rewardState.cumSumBaselinePower.String(), + rewardState.cumSumRealizedPower.String(), + rewardState.effectiveNetworkTime, + rewardState.effectiveBaselinePower.String(), + rewardState.newBaselinePower.String(), + rewardState.newBaseReward.String(), + rewardState.newSmoothingEstimate.PositionEstimate.String(), + rewardState.newSmoothingEstimate.VelocityEstimate.String(), + rewardState.totalMinedReward.String(), ); err != nil { log.Errorw("failed to store chain power", "state_root", rewardState.common.stateroot, "error", err) } } if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared chain_power: %w", err) + return xerrors.Errorf("close prepared chain_reward: %w", err) } - if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { - return xerrors.Errorf("insert chain_power from tmp: %w", err) + if _, err := tx.Exec(`insert into chain_reward select * from cr on conflict do nothing`); err != nil { + return xerrors.Errorf("insert chain_reward from tmp: %w", err) } if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit chain_power tx: %w", err) - } - - return nil -} - -func (p *Processor) storeBaseBlockReward(rewards []rewardActorInfo) error { - tx, err := p.db.Begin() - if err != nil { - return xerrors.Errorf("begin base_block_reward tx: %w", err) - } - - if _, err := tx.Exec(`create temp table bbr (like base_block_rewards excluding constraints) on commit drop`); err != nil { - return xerrors.Errorf("prep base_block_reward temp: %w", err) - } - - stmt, err := tx.Prepare(`copy bbr (state_root, base_block_reward) from STDIN`) - if err != nil { - return xerrors.Errorf("prepare tmp base_block_reward: %w", err) - } - - for _, rewardState := range rewards { - baseBlockReward := big.Div(rewardState.baseBlockReward, big.NewIntUnsigned(build.BlocksPerEpoch)) - if _, err := stmt.Exec( - rewardState.common.stateroot.String(), - baseBlockReward.String(), - ); err != nil { - log.Errorw("failed to store base block reward", "state_root", rewardState.common.stateroot, "error", err) - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared base_block_reward: %w", err) - } - - if _, err := tx.Exec(`insert into base_block_rewards select * from bbr on conflict do nothing`); err != nil { - return xerrors.Errorf("insert base_block_reward from tmp: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit base_block_reward tx: %w", err) - } - - return nil -} - -func (p *Processor) storeRewardSmoothingEstimates(rewards []rewardActorInfo) error { - tx, err := p.db.Begin() - if err != nil { - return xerrors.Errorf("begin reward_smoothing_estimates tx: %w", err) - } - - if _, err := tx.Exec(`create temp table rse (like reward_smoothing_estimates) on commit drop`); err != nil { - return xerrors.Errorf("prep reward_smoothing_estimates: %w", err) - } - - stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`) - if err != nil { - return xerrors.Errorf("prepare tmp reward_smoothing_estimates: %w", err) - } - - for _, rewardState := range rewards { - if rewardState.epochSmoothingEstimate == nil { - continue - } - if _, err := stmt.Exec( - rewardState.common.stateroot.String(), - rewardState.epochSmoothingEstimate.PositionEstimate.String(), - rewardState.epochSmoothingEstimate.VelocityEstimate.String(), - ); err != nil { - return xerrors.Errorf("failed to store smoothing estimate: %w", err) - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared reward_smoothing_estimates: %w", err) - } - - if _, err := tx.Exec(`insert into reward_smoothing_estimates select * from rse on conflict do nothing`); err != nil { - return xerrors.Errorf("insert reward_smoothing_estimates from tmp: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit reward_smoothing_estimates tx: %w", err) + return xerrors.Errorf("commit chain_reward tx: %w", err) } return nil diff --git a/cmd/lotus-chainwatch/run.go b/cmd/lotus-chainwatch/run.go index 8bdcfcfe3..64f242755 100644 --- a/cmd/lotus-chainwatch/run.go +++ b/cmd/lotus-chainwatch/run.go @@ -2,20 +2,25 @@ package main import ( "database/sql" + "fmt" "net/http" _ "net/http/pprof" "os" + "strings" _ "github.com/lib/pq" - lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/go-jsonrpc" logging "github.com/ipfs/go-log/v2" "github.com/urfave/cli/v2" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/api" + lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor" "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/scheduler" "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer" + "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" ) var runCmd = &cli.Command{ @@ -39,9 +44,24 @@ var runCmd = &cli.Command{ return err } - api, closer, err := lcli.GetFullNodeAPI(cctx) - if err != nil { - return err + var api api.FullNode + var closer jsonrpc.ClientCloser + var err error + if tokenMaddr := cctx.String("api"); tokenMaddr != "" { + toks := strings.Split(tokenMaddr, ":") + if len(toks) != 2 { + return fmt.Errorf("invalid api tokens, expected :, got: %s", tokenMaddr) + } + + api, closer, err = util.GetFullNodeAPIUsingCredentials(cctx.Context, toks[1], toks[0]) + if err != nil { + return err + } + } else { + api, closer, err = lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } } defer closer() ctx := lcli.ReqContext(cctx) @@ -70,7 +90,7 @@ var runCmd = &cli.Command{ } db.SetMaxOpenConns(1350) - sync := syncer.NewSyncer(db, api) + sync := syncer.NewSyncer(db, api, 1400) sync.Start(ctx) proc := processor.NewProcessor(ctx, db, api, maxBatch) diff --git a/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go b/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go index b6a24507f..145e84229 100644 --- a/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go +++ b/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go @@ -3,7 +3,6 @@ package scheduler import ( "context" "database/sql" - "time" "golang.org/x/xerrors" ) @@ -24,9 +23,9 @@ func setupTopMinerByBaseRewardSchema(ctx context.Context, db *sql.DB) error { with total_rewards_by_miner as ( select b.miner, - sum(bbr.base_block_reward * b.win_count) as total_reward + sum(cr.new_reward * b.win_count) as total_reward from blocks b - inner join base_block_rewards bbr on b.parentstateroot = bbr.state_root + inner join chain_reward cr on b.parentstateroot = cr.state_root group by 1 ) select rank() over (order by total_reward desc), @@ -43,17 +42,17 @@ func setupTopMinerByBaseRewardSchema(ctx context.Context, db *sql.DB) error { b."timestamp"as current_timestamp, max(b.height) as current_height from blocks b - join base_block_rewards bbr on b.parentstateroot = bbr.state_root - where bbr.base_block_reward is not null + join chain_reward cr on b.parentstateroot = cr.state_root + where cr.new_reward is not null group by 1 order by 1 desc limit 1; `); err != nil { - return xerrors.Errorf("create top_miner_by_base_reward views: %w", err) + return xerrors.Errorf("create top_miners_by_base_reward views: %w", err) } if err := tx.Commit(); err != nil { - return xerrors.Errorf("committing top_miner_by_base_reward views; %w", err) + return xerrors.Errorf("committing top_miners_by_base_reward views; %w", err) } return nil } @@ -65,11 +64,6 @@ func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error { default: } - t := time.Now() - defer func() { - log.Debugw("refresh top_miners_by_base_reward", "duration", time.Since(t).String()) - }() - _, err := db.Exec("refresh materialized view top_miners_by_base_reward;") if err != nil { return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err) diff --git a/cmd/lotus-chainwatch/scheduler/scheduler.go b/cmd/lotus-chainwatch/scheduler/scheduler.go index 936b61ce7..6782bc16d 100644 --- a/cmd/lotus-chainwatch/scheduler/scheduler.go +++ b/cmd/lotus-chainwatch/scheduler/scheduler.go @@ -40,7 +40,7 @@ func (s *Scheduler) Start(ctx context.Context) { go func() { // run once on start after schema has initialized - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Minute) if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil { log.Errorw("failed to refresh top miner", "error", err) } diff --git a/cmd/lotus-chainwatch/syncer/sync.go b/cmd/lotus-chainwatch/syncer/sync.go index 69195b536..7b8153777 100644 --- a/cmd/lotus-chainwatch/syncer/sync.go +++ b/cmd/lotus-chainwatch/syncer/sync.go @@ -23,14 +23,17 @@ var log = logging.Logger("syncer") type Syncer struct { db *sql.DB + lookbackLimit uint64 + headerLk sync.Mutex node api.FullNode } -func NewSyncer(db *sql.DB, node api.FullNode) *Syncer { +func NewSyncer(db *sql.DB, node api.FullNode, lookbackLimit uint64) *Syncer { return &Syncer{ - db: db, - node: node, + db: db, + node: node, + lookbackLimit: lookbackLimit, } } @@ -148,40 +151,34 @@ create index if not exists state_heights_parentstateroot_index } func (s *Syncer) Start(ctx context.Context) { + if err := logging.SetLogLevel("syncer", "info"); err != nil { + log.Fatal(err) + } log.Debug("Starting Syncer") if err := s.setupSchemas(); err != nil { log.Fatal(err) } - // doing the initial sync here lets us avoid the HCCurrent case in the switch - head, err := s.node.ChainHead(ctx) - if err != nil { - log.Fatalw("Failed to get chain head form lotus", "error", err) - } - - unsynced, err := s.unsyncedBlocks(ctx, head, time.Unix(0, 0)) - if err != nil { - log.Fatalw("failed to gather unsynced blocks", "error", err) - } - - if err := s.storeHeaders(unsynced, true, time.Now()); err != nil { - log.Fatalw("failed to store unsynced blocks", "error", err) - } - // continue to keep the block headers table up to date. notifs, err := s.node.ChainNotify(ctx) if err != nil { log.Fatal(err) } - lastSynced := time.Now() + // we need to ensure that on a restart we don't reprocess the whole flarping chain + blkCID, height, err := s.mostRecentlySyncedBlockHeight() + if err != nil { + log.Fatalw("failed to find most recently synced block", "error", err) + } + log.Infow("Found starting point for syncing", "blockCID", blkCID.String(), "height", height) + sinceEpoch := uint64(height) go func() { for notif := range notifs { for _, change := range notif { switch change.Type { case store.HCApply: - unsynced, err := s.unsyncedBlocks(ctx, change.Val, lastSynced) + unsynced, err := s.unsyncedBlocks(ctx, change.Val, sinceEpoch) if err != nil { log.Errorw("failed to gather unsynced blocks", "error", err) } @@ -194,13 +191,13 @@ func (s *Syncer) Start(ctx context.Context) { continue } - if err := s.storeHeaders(unsynced, true, lastSynced); err != nil { + if err := s.storeHeaders(unsynced, true, time.Now()); err != nil { // so this is pretty bad, need some kind of retry.. // for now just log an error and the blocks will be attempted again on next notifi log.Errorw("failed to store unsynced blocks", "error", err) } - lastSynced = time.Now() + sinceEpoch = uint64(change.Val.Height()) case store.HCRevert: log.Debug("revert todo") } @@ -209,12 +206,8 @@ func (s *Syncer) Start(ctx context.Context) { }() } -func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since time.Time) (map[cid.Cid]*types.BlockHeader, error) { - // get a list of blocks we have already synced in the past 3 mins. This ensures we aren't returning the entire - // table every time. - lookback := since.Add(-(time.Minute * 3)) - log.Debugw("Gathering unsynced blocks", "since", lookback.String()) - hasList, err := s.syncedBlocks(lookback) +func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since uint64) (map[cid.Cid]*types.BlockHeader, error) { + hasList, err := s.syncedBlocks(since, s.lookbackLimit) if err != nil { return nil, err } @@ -257,9 +250,8 @@ func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since t return toSync, nil } -func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error) { - // timestamp is used to return a configurable amount of rows based on when they were last added. - rws, err := s.db.Query(`select cid FROM blocks_synced where synced_at > $1`, timestamp.Unix()) +func (s *Syncer) syncedBlocks(since, limit uint64) (map[cid.Cid]struct{}, error) { + rws, err := s.db.Query(`select bs.cid FROM blocks_synced bs left join blocks b on b.cid = bs.cid where b.height <= $1 and bs.processed_at is not null limit $2`, since, limit) if err != nil { return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err) } @@ -281,14 +273,43 @@ func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error) return out, nil } +func (s *Syncer) mostRecentlySyncedBlockHeight() (cid.Cid, int64, error) { + rw := s.db.QueryRow(` +select blocks_synced.cid, b.height +from blocks_synced +left join blocks b on blocks_synced.cid = b.cid +where processed_at is not null +order by height desc +limit 1 +`) + + var c string + var h int64 + if err := rw.Scan(&c, &h); err != nil { + if err == sql.ErrNoRows { + return cid.Undef, 0, nil + } + return cid.Undef, -1, err + } + + ci, err := cid.Parse(c) + if err != nil { + return cid.Undef, -1, err + } + + return ci, h, nil +} + func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error { supply, err := s.node.StateCirculatingSupply(ctx, tipset.Key()) if err != nil { return err } - ceInsert := `insert into chain_economics (parent_state_root, circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil)` + - `values ('%s', '%s', '%s', '%s', '%s', '%s');` + ceInsert := `insert into chain_economics (parent_state_root, circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) ` + + `values ('%s', '%s', '%s', '%s', '%s', '%s') on conflict on constraint chain_economics_pk do ` + + `update set (circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) = ('%[2]s', '%[3]s', '%[4]s', '%[5]s', '%[6]s') ` + + `where chain_economics.parent_state_root = '%[1]s';` if _, err := s.db.Exec(fmt.Sprintf(ceInsert, tipset.ParentState().String(), diff --git a/cmd/lotus-chainwatch/util/api.go b/cmd/lotus-chainwatch/util/api.go new file mode 100644 index 000000000..cfda833e0 --- /dev/null +++ b/cmd/lotus-chainwatch/util/api.go @@ -0,0 +1,34 @@ +package util + +import ( + "context" + "net/http" + + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/client" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +func GetFullNodeAPIUsingCredentials(ctx context.Context, listenAddr, token string) (api.FullNode, jsonrpc.ClientCloser, error) { + parsedAddr, err := ma.NewMultiaddr(listenAddr) + if err != nil { + return nil, nil, err + } + + _, addr, err := manet.DialArgs(parsedAddr) + if err != nil { + return nil, nil, err + } + + return client.NewFullNodeRPC(ctx, apiURI(addr), apiHeaders(token)) +} +func apiURI(addr string) string { + return "ws://" + addr + "/rpc/v0" +} +func apiHeaders(token string) http.Header { + headers := http.Header{} + headers.Add("Authorization", "Bearer "+token) + return headers +} diff --git a/cmd/lotus/debug_advance.go b/cmd/lotus/debug_advance.go index 972107370..0c168b3be 100644 --- a/cmd/lotus/debug_advance.go +++ b/cmd/lotus/debug_advance.go @@ -60,7 +60,9 @@ func init() { } } - // TODO: beacon + + mbi, err := api.MinerGetBaseInfo(ctx, addr, head.Height()+1, head.Key()) + ep := &types.ElectionProof{} ep.WinCount = ep.ComputeWinCount(types.NewInt(1), types.NewInt(1)) for ep.WinCount == 0 { @@ -75,7 +77,7 @@ func init() { uts := head.MinTimestamp() + uint64(build.BlockDelaySecs) nheight := head.Height() + 1 blk, err := api.MinerCreateBlock(ctx, &lapi.BlockTemplate{ - addr, head.Key(), ticket, ep, nil, msgs, nheight, uts, gen.ValidWpostForTesting, + addr, head.Key(), ticket, ep, mbi.BeaconEntries, msgs, nheight, uts, gen.ValidWpostForTesting, }) if err != nil { return xerrors.Errorf("creating block: %w", err) diff --git a/conformance/runner_test.go b/conformance/runner_test.go index 1e72f0c31..5443cfc6a 100644 --- a/conformance/runner_test.go +++ b/conformance/runner_test.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/base64" "encoding/json" "io/ioutil" "os" @@ -195,6 +196,9 @@ func executeMessageVector(t *testing.T, vector *schema.TestVector) { if expected, actual := receipt.GasUsed, ret.GasUsed; expected != actual { t.Errorf("gas used of msg %d did not match; expected: %d, got: %d", i, expected, actual) } + if expected, actual := []byte(receipt.ReturnValue), ret.Return; !bytes.Equal(expected, actual) { + t.Errorf("return value of msg %d did not match; expected: %s, got: %s", i, base64.StdEncoding.EncodeToString(expected), base64.StdEncoding.EncodeToString(actual)) + } } // Once all messages are applied, assert that the final state root matches diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 31fc0faf6..e48679cc7 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -224,15 +224,23 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { // Caller should NOT hold m.unsealedInfoMap.lk func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { + // locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else + m.unsealedInfoMap.lk.Lock() + defer m.unsealedInfoMap.lk.Unlock() + + // cannot send SectorStartPacking to sectors that have already been packed, otherwise it will cause the state machine to exit + if _, ok := m.unsealedInfoMap.infos[sectorID]; !ok { + log.Warnf("call start packing, but sector %v not in unsealedInfoMap.infos, maybe have called", sectorID) + return nil + } log.Infof("Starting packing sector %d", sectorID) err := m.sectors.Send(uint64(sectorID), SectorStartPacking{}) if err != nil { return err } + log.Infof("send Starting packing event success sector %d", sectorID) - m.unsealedInfoMap.lk.Lock() delete(m.unsealedInfoMap.infos, sectorID) - m.unsealedInfoMap.lk.Unlock() return nil } diff --git a/metrics/metrics.go b/metrics/metrics.go index e00208d5d..a6732e8ea 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -19,6 +19,7 @@ var ( Commit, _ = tag.NewKey("commit") PeerID, _ = tag.NewKey("peer_id") FailureType, _ = tag.NewKey("failure_type") + Local, _ = tag.NewKey("local") MessageFrom, _ = tag.NewKey("message_from") MessageTo, _ = tag.NewKey("message_to") MessageNonce, _ = tag.NewKey("message_nonce") @@ -30,7 +31,7 @@ var ( LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) - MessagePublished = stats.Int64("message/pubished", "Counter for total locally published messages", stats.UnitDimensionless) + MessagePublished = stats.Int64("message/published", "Counter for total locally published messages", stats.UnitDimensionless) MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless) MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless) MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless) @@ -82,6 +83,10 @@ var ( Measure: BlockValidationDurationMilliseconds, Aggregation: defaultMillisecondsDistribution, } + MessagePublishedView = &view.View{ + Measure: MessagePublished, + Aggregation: view.Count(), + } MessageReceivedView = &view.View{ Measure: MessageReceived, Aggregation: view.Count(), @@ -89,7 +94,7 @@ var ( MessageValidationFailureView = &view.View{ Measure: MessageValidationFailure, Aggregation: view.Count(), - TagKeys: []tag.Key{FailureType}, + TagKeys: []tag.Key{FailureType, Local}, } MessageValidationSuccessView = &view.View{ Measure: MessageValidationSuccess, @@ -99,6 +104,34 @@ var ( Measure: PeerCount, Aggregation: view.LastValue(), } + PubsubPublishMessageView = &view.View{ + Measure: PubsubPublishMessage, + Aggregation: view.Count(), + } + PubsubDeliverMessageView = &view.View{ + Measure: PubsubDeliverMessage, + Aggregation: view.Count(), + } + PubsubRejectMessageView = &view.View{ + Measure: PubsubRejectMessage, + Aggregation: view.Count(), + } + PubsubDuplicateMessageView = &view.View{ + Measure: PubsubDuplicateMessage, + Aggregation: view.Count(), + } + PubsubRecvRPCView = &view.View{ + Measure: PubsubRecvRPC, + Aggregation: view.Count(), + } + PubsubSendRPCView = &view.View{ + Measure: PubsubSendRPC, + Aggregation: view.Count(), + } + PubsubDropRPCView = &view.View{ + Measure: PubsubDropRPC, + Aggregation: view.Count(), + } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes @@ -110,10 +143,18 @@ var DefaultViews = append([]*view.View{ BlockValidationFailureView, BlockValidationSuccessView, BlockValidationDurationView, + MessagePublishedView, MessageReceivedView, MessageValidationFailureView, MessageValidationSuccessView, PeerCountView, + PubsubPublishMessageView, + PubsubDeliverMessageView, + PubsubRejectMessageView, + PubsubDuplicateMessageView, + PubsubRecvRPCView, + PubsubSendRPCView, + PubsubDropRPCView, }, rpcmetrics.DefaultViews...) diff --git a/node/modules/core.go b/node/modules/core.go index 0305c737e..d73e4e25d 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -94,7 +94,13 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) { } func DrandBootstrap(d dtypes.DrandConfig) (dtypes.DrandBootstrap, error) { - return addrutil.ParseAddresses(context.TODO(), d.Relays) + // TODO: retry resolving, don't fail if at least one resolve succeeds + addrs, err := addrutil.ParseAddresses(context.TODO(), d.Relays) + if err != nil { + log.Errorf("reoslving drand relays addresses: %+v", err) + return nil, nil + } + return addrs, nil } func SetupJournal(lr repo.LockedRepo) error { diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index f847cf7ad..c7fb5123a 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -235,7 +235,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { PublishThreshold: -1000, GraylistThreshold: -2500, AcceptPXThreshold: 1000, - OpportunisticGraftThreshold: 5, + OpportunisticGraftThreshold: 3.5, }, ), pubsub.WithPeerScoreInspect(in.Sk.Update, 10*time.Second), @@ -251,6 +251,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { pubsub.GossipSubDout = 0 pubsub.GossipSubDlazy = 1024 pubsub.GossipSubGossipFactor = 0.5 + pubsub.GossipSubPruneBackoff = 5 * time.Minute // turn on PX options = append(options, pubsub.WithPeerExchange(true)) }