From 9481fa0a4b5ed18575947fa413d5de936f6437f3 Mon Sep 17 00:00:00 2001 From: gammazero Date: Wed, 9 Feb 2022 16:40:27 -0800 Subject: [PATCH] Use new indexer pubsub message encoding --- chain/sub/incoming.go | 64 ++++++------------------------------------- go.mod | 1 + go.sum | 3 +- 3 files changed, 12 insertions(+), 56 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index acec90c72..4277503fd 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -1,12 +1,14 @@ package sub import ( + "bytes" "context" "encoding/binary" "sync" "time" address "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-legs/dtsync" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/consensus" @@ -25,7 +27,6 @@ import ( connmgr "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/multiformats/go-varint" "go.opencensus.io/stats" "go.opencensus.io/tag" "golang.org/x/xerrors" @@ -491,18 +492,20 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg return pubsub.ValidationReject } - // Decode CID and originator addresses from message. - minerID, msgCid, err := decodeIndexerMessage(msg.Data) + idxrMsg := dtsync.Message{} + err := idxrMsg.UnmarshalCBOR(bytes.NewBuffer(msg.Data)) if err != nil { - log.Errorw("Could not decode pubsub message", "err", err) + log.Errorw("Could not decode indexer pubsub message", "err", err) return pubsub.ValidationReject } - - if minerID == "" { + if len(idxrMsg.ExtraData) == 0 { log.Debugw("ignoring messsage missing miner id", "peer", originPeer) return pubsub.ValidationIgnore } + minerID := string(idxrMsg.ExtraData) + msgCid := idxrMsg.Cid + var msgInfo *peerMsgInfo val, ok := v.peerCache.Get(minerID) if !ok { @@ -584,55 +587,6 @@ func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid return false } -func decodeIndexerMessage(data []byte) (string, cid.Cid, error) { - n, msgCid, err := cid.CidFromBytes(data) - if err != nil { - return "", cid.Undef, err - } - if n > len(data) { - return "", cid.Undef, xerrors.New("bad cid length encoding") - } - data = data[n:] - - var minerID string - - if len(data) != 0 { - addrCount, n, err := varint.FromUvarint(data) - if err != nil { - return "", cid.Undef, xerrors.Errorf("cannot read number of multiaddrs: %w", err) - } - if n > len(data) { - return "", cid.Undef, xerrors.New("bad multiaddr count encoding") - } - data = data[n:] - - if addrCount != 0 { - // Read multiaddrs if there is any more data in message data. This allows - // backward-compatability with publishers that do not supply address data. - for i := 0; i < int(addrCount); i++ { - val, n, err := varint.FromUvarint(data) - if err != nil { - return "", cid.Undef, xerrors.Errorf("cannot read multiaddrs length: %w", err) - } - if n > len(data) { - return "", cid.Undef, xerrors.New("bad multiaddr length encoding") - } - data = data[n:] - - if len(data) < int(val) { - return "", cid.Undef, xerrors.New("bad multiaddr encoding") - } - data = data[val:] - } - } - if len(data) != 0 { - minerID = string(data) - } - } - - return minerID, msgCid, nil -} - func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerID string, peerID peer.ID) error { // Get miner info from lotus minerAddress, err := address.NewFromString(minerID) diff --git a/go.mod b/go.mod index 2d3e7d232..23ab5a0c2 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( github.com/filecoin-project/go-fil-markets v1.19.1-0.20220203152434-8790cca614d3 github.com/filecoin-project/go-indexer-core v0.2.8 github.com/filecoin-project/go-jsonrpc v0.1.5 + github.com/filecoin-project/go-legs v0.3.0 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.3-0.20220111000201-e42866db1a53 github.com/filecoin-project/go-state-types v0.1.3 diff --git a/go.sum b/go.sum index b2ebceb1c..22cf52fe8 100644 --- a/go.sum +++ b/go.sum @@ -350,8 +350,9 @@ github.com/filecoin-project/go-indexer-core v0.2.8 h1:h1SRdZKTVcaXlzex3UevHh4OWD github.com/filecoin-project/go-indexer-core v0.2.8/go.mod h1:IagNfTdFuX4057kla43PjRCn3yBuUiZgIxuA0hTUamY= github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3GYGtYKiNQ+vGk= github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= -github.com/filecoin-project/go-legs v0.2.7 h1:+b1BQv4QKkRNsDUE8Z4sEhLXhfVQ+iGpHhANpYqxJlA= github.com/filecoin-project/go-legs v0.2.7/go.mod h1:NrdELuDbtAH8/xqRMgyOYms67aliQajExInLS6g8zFM= +github.com/filecoin-project/go-legs v0.3.0 h1:1rDNdPdXbgetmmvRcYZV5YIplIO8LtBVQ7ZttKCrTrM= +github.com/filecoin-project/go-legs v0.3.0/go.mod h1:x6nwM+DuN7NzlPndOoJuiHYCX+pze6+efPRx17nIA7M= github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak= github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs= github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=