Use new indexer pubsub message encoding
This commit is contained in:
parent
681ce94a34
commit
9481fa0a4b
@ -1,12 +1,14 @@
|
|||||||
package sub
|
package sub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
address "github.com/filecoin-project/go-address"
|
address "github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-legs/dtsync"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
"github.com/filecoin-project/lotus/chain/consensus"
|
"github.com/filecoin-project/lotus/chain/consensus"
|
||||||
@ -25,7 +27,6 @@ import (
|
|||||||
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
|
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"github.com/multiformats/go-varint"
|
|
||||||
"go.opencensus.io/stats"
|
"go.opencensus.io/stats"
|
||||||
"go.opencensus.io/tag"
|
"go.opencensus.io/tag"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -491,18 +492,20 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
|
|||||||
return pubsub.ValidationReject
|
return pubsub.ValidationReject
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode CID and originator addresses from message.
|
idxrMsg := dtsync.Message{}
|
||||||
minerID, msgCid, err := decodeIndexerMessage(msg.Data)
|
err := idxrMsg.UnmarshalCBOR(bytes.NewBuffer(msg.Data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorw("Could not decode pubsub message", "err", err)
|
log.Errorw("Could not decode indexer pubsub message", "err", err)
|
||||||
return pubsub.ValidationReject
|
return pubsub.ValidationReject
|
||||||
}
|
}
|
||||||
|
if len(idxrMsg.ExtraData) == 0 {
|
||||||
if minerID == "" {
|
|
||||||
log.Debugw("ignoring messsage missing miner id", "peer", originPeer)
|
log.Debugw("ignoring messsage missing miner id", "peer", originPeer)
|
||||||
return pubsub.ValidationIgnore
|
return pubsub.ValidationIgnore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
minerID := string(idxrMsg.ExtraData)
|
||||||
|
msgCid := idxrMsg.Cid
|
||||||
|
|
||||||
var msgInfo *peerMsgInfo
|
var msgInfo *peerMsgInfo
|
||||||
val, ok := v.peerCache.Get(minerID)
|
val, ok := v.peerCache.Get(minerID)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -584,55 +587,6 @@ func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeIndexerMessage(data []byte) (string, cid.Cid, error) {
|
|
||||||
n, msgCid, err := cid.CidFromBytes(data)
|
|
||||||
if err != nil {
|
|
||||||
return "", cid.Undef, err
|
|
||||||
}
|
|
||||||
if n > len(data) {
|
|
||||||
return "", cid.Undef, xerrors.New("bad cid length encoding")
|
|
||||||
}
|
|
||||||
data = data[n:]
|
|
||||||
|
|
||||||
var minerID string
|
|
||||||
|
|
||||||
if len(data) != 0 {
|
|
||||||
addrCount, n, err := varint.FromUvarint(data)
|
|
||||||
if err != nil {
|
|
||||||
return "", cid.Undef, xerrors.Errorf("cannot read number of multiaddrs: %w", err)
|
|
||||||
}
|
|
||||||
if n > len(data) {
|
|
||||||
return "", cid.Undef, xerrors.New("bad multiaddr count encoding")
|
|
||||||
}
|
|
||||||
data = data[n:]
|
|
||||||
|
|
||||||
if addrCount != 0 {
|
|
||||||
// Read multiaddrs if there is any more data in message data. This allows
|
|
||||||
// backward-compatability with publishers that do not supply address data.
|
|
||||||
for i := 0; i < int(addrCount); i++ {
|
|
||||||
val, n, err := varint.FromUvarint(data)
|
|
||||||
if err != nil {
|
|
||||||
return "", cid.Undef, xerrors.Errorf("cannot read multiaddrs length: %w", err)
|
|
||||||
}
|
|
||||||
if n > len(data) {
|
|
||||||
return "", cid.Undef, xerrors.New("bad multiaddr length encoding")
|
|
||||||
}
|
|
||||||
data = data[n:]
|
|
||||||
|
|
||||||
if len(data) < int(val) {
|
|
||||||
return "", cid.Undef, xerrors.New("bad multiaddr encoding")
|
|
||||||
}
|
|
||||||
data = data[val:]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(data) != 0 {
|
|
||||||
minerID = string(data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return minerID, msgCid, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerID string, peerID peer.ID) error {
|
func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerID string, peerID peer.ID) error {
|
||||||
// Get miner info from lotus
|
// Get miner info from lotus
|
||||||
minerAddress, err := address.NewFromString(minerID)
|
minerAddress, err := address.NewFromString(minerID)
|
||||||
|
1
go.mod
1
go.mod
@ -39,6 +39,7 @@ require (
|
|||||||
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220203152434-8790cca614d3
|
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220203152434-8790cca614d3
|
||||||
github.com/filecoin-project/go-indexer-core v0.2.8
|
github.com/filecoin-project/go-indexer-core v0.2.8
|
||||||
github.com/filecoin-project/go-jsonrpc v0.1.5
|
github.com/filecoin-project/go-jsonrpc v0.1.5
|
||||||
|
github.com/filecoin-project/go-legs v0.3.0
|
||||||
github.com/filecoin-project/go-padreader v0.0.1
|
github.com/filecoin-project/go-padreader v0.0.1
|
||||||
github.com/filecoin-project/go-paramfetch v0.0.3-0.20220111000201-e42866db1a53
|
github.com/filecoin-project/go-paramfetch v0.0.3-0.20220111000201-e42866db1a53
|
||||||
github.com/filecoin-project/go-state-types v0.1.3
|
github.com/filecoin-project/go-state-types v0.1.3
|
||||||
|
3
go.sum
3
go.sum
@ -350,8 +350,9 @@ github.com/filecoin-project/go-indexer-core v0.2.8 h1:h1SRdZKTVcaXlzex3UevHh4OWD
|
|||||||
github.com/filecoin-project/go-indexer-core v0.2.8/go.mod h1:IagNfTdFuX4057kla43PjRCn3yBuUiZgIxuA0hTUamY=
|
github.com/filecoin-project/go-indexer-core v0.2.8/go.mod h1:IagNfTdFuX4057kla43PjRCn3yBuUiZgIxuA0hTUamY=
|
||||||
github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3GYGtYKiNQ+vGk=
|
github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3GYGtYKiNQ+vGk=
|
||||||
github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
|
github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
|
||||||
github.com/filecoin-project/go-legs v0.2.7 h1:+b1BQv4QKkRNsDUE8Z4sEhLXhfVQ+iGpHhANpYqxJlA=
|
|
||||||
github.com/filecoin-project/go-legs v0.2.7/go.mod h1:NrdELuDbtAH8/xqRMgyOYms67aliQajExInLS6g8zFM=
|
github.com/filecoin-project/go-legs v0.2.7/go.mod h1:NrdELuDbtAH8/xqRMgyOYms67aliQajExInLS6g8zFM=
|
||||||
|
github.com/filecoin-project/go-legs v0.3.0 h1:1rDNdPdXbgetmmvRcYZV5YIplIO8LtBVQ7ZttKCrTrM=
|
||||||
|
github.com/filecoin-project/go-legs v0.3.0/go.mod h1:x6nwM+DuN7NzlPndOoJuiHYCX+pze6+efPRx17nIA7M=
|
||||||
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=
|
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=
|
||||||
github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs=
|
github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs=
|
||||||
github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=
|
github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=
|
||||||
|
Loading…
Reference in New Issue
Block a user