utility to retrieve miner worker key from the chain
This commit is contained in:
parent
5bd0e92068
commit
6e0dac06f4
@ -1,13 +1,17 @@
|
|||||||
package sub
|
package sub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
address "github.com/filecoin-project/go-address"
|
||||||
|
miner "github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
|
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
@ -18,7 +22,11 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
|
"github.com/filecoin-project/lotus/chain/state"
|
||||||
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/bufbstore"
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -87,15 +95,21 @@ type BlockValidator struct {
|
|||||||
recvBlocks *blockReceiptCache
|
recvBlocks *blockReceiptCache
|
||||||
|
|
||||||
blacklist func(peer.ID)
|
blacklist func(peer.ID)
|
||||||
|
|
||||||
|
// necessary for block validation
|
||||||
|
chain *store.ChainStore
|
||||||
|
stmgr *stmgr.StateManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockValidator(blacklist func(peer.ID)) *BlockValidator {
|
func NewBlockValidator(chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator {
|
||||||
p, _ := lru.New2Q(4096)
|
p, _ := lru.New2Q(4096)
|
||||||
return &BlockValidator{
|
return &BlockValidator{
|
||||||
peers: p,
|
peers: p,
|
||||||
killThresh: 10,
|
killThresh: 10,
|
||||||
blacklist: blacklist,
|
blacklist: blacklist,
|
||||||
recvBlocks: newBlockReceiptCache(),
|
recvBlocks: newBlockReceiptCache(),
|
||||||
|
chain: chain,
|
||||||
|
stmgr: stmgr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,6 +133,8 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
|
|||||||
|
|
||||||
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
||||||
stats.Record(ctx, metrics.BlockReceived.M(1))
|
stats.Record(ctx, metrics.BlockReceived.M(1))
|
||||||
|
|
||||||
|
// make sure the block can be decoded
|
||||||
blk, err := types.DecodeBlockMsg(msg.GetData())
|
blk, err := types.DecodeBlockMsg(msg.GetData())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("got invalid block over pubsub: ", err)
|
log.Error("got invalid block over pubsub: ", err)
|
||||||
@ -128,6 +144,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
|
|||||||
return pubsub.ValidationReject
|
return pubsub.ValidationReject
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check the message limit constraints
|
||||||
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
|
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
|
||||||
log.Warnf("received block with too many messages over pubsub")
|
log.Warnf("received block with too many messages over pubsub")
|
||||||
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages"))
|
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages"))
|
||||||
@ -136,17 +153,72 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
|
|||||||
return pubsub.ValidationReject
|
return pubsub.ValidationReject
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// we want to ensure that it is a block from a known miner; we reject blocks from unknown miners
|
||||||
|
// to prevent spam attacks.
|
||||||
|
// the logic works as follows: we lookup the miner in the chain for its key.
|
||||||
|
// if we can find it then it's a known miner and we can validate the signature.
|
||||||
|
// if we can't find it, we check whether we are (near) synced in the chain.
|
||||||
|
// if we are not synced we cannot validate the block and we must ignore it.
|
||||||
|
// if we are synced and the miner is uknown, then the block is rejcected.
|
||||||
|
|
||||||
|
// XXX Implement me
|
||||||
|
|
||||||
|
// it's a good block! make sure we've only seen it once
|
||||||
if bv.recvBlocks.add(blk.Header.Cid()) > 0 {
|
if bv.recvBlocks.add(blk.Header.Cid()) > 0 {
|
||||||
// TODO: once these changes propagate to the network, we can consider
|
// TODO: once these changes propagate to the network, we can consider
|
||||||
// dropping peers who send us the same block multiple times
|
// dropping peers who send us the same block multiple times
|
||||||
return pubsub.ValidationIgnore
|
return pubsub.ValidationIgnore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// all good, accept the block
|
||||||
msg.ValidatorData = blk
|
msg.ValidatorData = blk
|
||||||
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
|
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
|
||||||
return pubsub.ValidationAccept
|
return pubsub.ValidationAccept
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bv *BlockValidator) getMinerWorkerKey(ctx context.Context, msg *types.BlockMsg) (address.Address, error) {
|
||||||
|
addr := msg.Header.Miner
|
||||||
|
|
||||||
|
// TODO cache those, all this is expensive!
|
||||||
|
|
||||||
|
// TODO I have a feeling all this can be simplified by cleverer DI to use the API
|
||||||
|
ts := bv.chain.GetHeaviestTipSet()
|
||||||
|
st, _, err := bv.stmgr.TipSetState(ctx, ts)
|
||||||
|
if err != nil {
|
||||||
|
return address.Undef, err
|
||||||
|
}
|
||||||
|
buf := bufbstore.NewBufferedBstore(bv.chain.Blockstore())
|
||||||
|
cst := cbor.NewCborStore(buf)
|
||||||
|
state, err := state.LoadStateTree(cst, st)
|
||||||
|
if err != nil {
|
||||||
|
return address.Undef, err
|
||||||
|
}
|
||||||
|
act, err := state.GetActor(addr)
|
||||||
|
if err != nil {
|
||||||
|
return address.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
blk, err := bv.chain.Blockstore().Get(act.Head)
|
||||||
|
if err != nil {
|
||||||
|
return address.Undef, err
|
||||||
|
}
|
||||||
|
aso := blk.RawData()
|
||||||
|
|
||||||
|
var mst miner.State
|
||||||
|
err = mst.UnmarshalCBOR(bytes.NewReader(aso))
|
||||||
|
if err != nil {
|
||||||
|
return address.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
worker := mst.Info.Worker
|
||||||
|
key, err := bv.stmgr.ResolveToKeyAddress(ctx, worker, ts)
|
||||||
|
if err != nil {
|
||||||
|
return address.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return key, nil
|
||||||
|
}
|
||||||
|
|
||||||
type blockReceiptCache struct {
|
type blockReceiptCache struct {
|
||||||
blocks *lru.TwoQueueCache
|
blocks *lru.TwoQueueCache
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/beacon/drand"
|
"github.com/filecoin-project/lotus/chain/beacon/drand"
|
||||||
"github.com/filecoin-project/lotus/chain/blocksync"
|
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/sub"
|
"github.com/filecoin-project/lotus/chain/sub"
|
||||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||||
@ -58,7 +59,7 @@ func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) {
|
|||||||
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
|
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, h host.Host, nn dtypes.NetworkName) {
|
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
blocksub, err := ps.Subscribe(build.BlocksTopic(nn))
|
blocksub, err := ps.Subscribe(build.BlocksTopic(nn))
|
||||||
@ -66,10 +67,12 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
v := sub.NewBlockValidator(func(p peer.ID) {
|
v := sub.NewBlockValidator(
|
||||||
ps.BlacklistPeer(p)
|
chain, stmgr,
|
||||||
h.ConnManager().TagPeer(p, "badblock", -1000)
|
func(p peer.ID) {
|
||||||
})
|
ps.BlacklistPeer(p)
|
||||||
|
h.ConnManager().TagPeer(p, "badblock", -1000)
|
||||||
|
})
|
||||||
|
|
||||||
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
|
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
Loading…
Reference in New Issue
Block a user