From 1dc6a2fea6ce5726dd6a3954c8e7972e956448f6 Mon Sep 17 00:00:00 2001 From: gammazero Date: Tue, 8 Feb 2022 02:53:25 -0800 Subject: [PATCH 01/11] Add indexer pubsub message authentication and rate limiting --- chain/sub/incoming.go | 199 +++++++++++++++++++++++++++-- chain/sub/ratelimit/queue.go | 89 +++++++++++++ chain/sub/ratelimit/window.go | 70 ++++++++++ chain/sub/ratelimit/window_test.go | 61 +++++++++ node/modules/services.go | 5 +- 5 files changed, 413 insertions(+), 11 deletions(-) create mode 100644 chain/sub/ratelimit/queue.go create mode 100644 chain/sub/ratelimit/window.go create mode 100644 chain/sub/ratelimit/window_test.go diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index dd18f25d0..c0ac42a59 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -2,15 +2,17 @@ package sub import ( "context" - "fmt" + "sync" "time" address "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/sub/ratelimit" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/impl/client" @@ -22,6 +24,7 @@ import ( connmgr "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/multiformats/go-varint" "go.opencensus.io/stats" "go.opencensus.io/tag" "golang.org/x/xerrors" @@ -168,12 +171,12 @@ func fetchCids( cidIndex := make(map[cid.Cid]int) for i, c := range cids { if c.Prefix() != msgCidPrefix { - return fmt.Errorf("invalid msg CID: %s", c) + return xerrors.Errorf("invalid msg CID: %s", c) } cidIndex[c] = i } if len(cids) != len(cidIndex) { - return fmt.Errorf("duplicate CIDs in fetchCids input") + return xerrors.Errorf("duplicate CIDs in fetchCids input") } for block := range bserv.GetBlocks(ctx, cids) { @@ -196,7 +199,7 @@ func fetchCids( if len(cidIndex) > 0 { err := ctx.Err() if err == nil { - err = fmt.Errorf("failed to fetch %d messages for unknown reasons", len(cidIndex)) + err = xerrors.Errorf("failed to fetch %d messages for unknown reasons", len(cidIndex)) } return err } @@ -445,23 +448,199 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType stats.Record(ctx, metric.M(1)) } -type IndexerMessageValidator struct { - self peer.ID +type peerMsgInfo struct { + peerID peer.ID + lastCid cid.Cid + rateLimit *ratelimit.Window + mutex sync.Mutex } -func NewIndexerMessageValidator(self peer.ID) *IndexerMessageValidator { - return &IndexerMessageValidator{self: self} +type IndexerMessageValidator struct { + self peer.ID + + peerCache *lru.TwoQueueCache + fullNode api.FullNode +} + +func NewIndexerMessageValidator(self peer.ID, fullNode api.FullNode) *IndexerMessageValidator { + peerCache, _ := lru.New2Q(1024) + + return &IndexerMessageValidator{ + self: self, + peerCache: peerCache, + fullNode: fullNode, + } } func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { // This chain-node should not be publishing its own messages. These are - // relayed from miner-nodes or index publishers. If a node appears to be - // local, reject it. + // relayed from market-nodes. If a node appears to be local, reject it. if pid == v.self { log.Warnf("refusing to relay indexer message from self") stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) return pubsub.ValidationReject } + originPeer := msg.GetFrom() + if originPeer == v.self { + log.Warnf("refusing to relay indexer message originating from self") + stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) + return pubsub.ValidationReject + } + + // Decode CID and originator addresses from message. + minerID, msgCid, err := decodeIndexerMessage(msg.Data) + if err != nil { + log.Errorw("Could not decode pubsub message", "err", err) + return pubsub.ValidationReject + } + + if minerID == "" { + log.Warnw("ignoring messsage missing miner id", "peer", originPeer) + return pubsub.ValidationIgnore + } + + var msgInfo *peerMsgInfo + val, ok := v.peerCache.Get(minerID) + if !ok { + msgInfo = &peerMsgInfo{} + } else { + msgInfo = val.(*peerMsgInfo) + } + + // Lock this peer's message info. + msgInfo.mutex.Lock() + defer msgInfo.mutex.Unlock() + + if !ok || originPeer != msgInfo.peerID { + // Check that the message was signed by an authenticated peer. + err = v.authenticateMessage(ctx, minerID, originPeer) + if err != nil { + log.Warnw("cannot authenticate messsage", "err", err, "peer", originPeer, "minerID", minerID) + stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) + return pubsub.ValidationReject + } + msgInfo.peerID = originPeer + if !ok { + // Add msgInfo to cache only after being authenticated. If two + // messages from the same peer are handled concurrently, there is a + // small chance that one msgInfo could replace the other here when + // the info is first cached. This is OK, so no need to prevent it. + v.peerCache.Add(minerID, msgInfo) + } + } + + // See if message needs to be ignored due to rate limiting. + if v.rateLimitPeer(msgInfo, msgCid) { + return pubsub.ValidationIgnore + } + stats.Record(ctx, metrics.IndexerMessageValidationSuccess.M(1)) return pubsub.ValidationAccept } + +func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid.Cid) bool { + const ( + msgLimit = 5 + msgTimeLimit = 10 * time.Second + repeatTimeLimit = 2 * time.Hour + ) + + timeWindow := msgInfo.rateLimit + + // Check overall message rate. + if timeWindow == nil { + timeWindow = ratelimit.NewWindow(msgLimit, msgTimeLimit) + msgInfo.rateLimit = timeWindow + } else if msgInfo.lastCid == msgCid { + // Check if this is a repeat of the previous message data. + if time.Since(timeWindow.Newest()) < repeatTimeLimit { + log.Warnw("ignoring repeated indexer message", "sender", msgInfo.peerID) + return true + } + } + + err := timeWindow.Add() + if err != nil { + log.Warnw("ignoring indexer message", "sender", msgInfo.peerID, "err", err) + return true + } + + msgInfo.lastCid = msgCid + + return false +} + +func decodeIndexerMessage(data []byte) (string, cid.Cid, error) { + n, msgCid, err := cid.CidFromBytes(data) + if err != nil { + return "", cid.Undef, err + } + if n > len(data) { + return "", cid.Undef, xerrors.New("bad cid length encoding") + } + data = data[n:] + + var minerID string + + if len(data) != 0 { + addrCount, n, err := varint.FromUvarint(data) + if err != nil { + return "", cid.Undef, xerrors.Errorf("cannot read number of multiaddrs: %w", err) + } + if n > len(data) { + return "", cid.Undef, xerrors.New("bad multiaddr count encoding") + } + data = data[n:] + + if addrCount != 0 { + // Read multiaddrs if there is any more data in message data. This allows + // backward-compatability with publishers that do not supply address data. + for i := 0; i < int(addrCount); i++ { + val, n, err := varint.FromUvarint(data) + if err != nil { + return "", cid.Undef, xerrors.Errorf("cannot read multiaddrs length: %w", err) + } + if n > len(data) { + return "", cid.Undef, xerrors.New("bad multiaddr length encoding") + } + data = data[n:] + + if len(data) < int(val) { + return "", cid.Undef, xerrors.New("bad multiaddr encoding") + } + data = data[val:] + } + } + if len(data) != 0 { + minerID = string(data) + } + } + + return minerID, msgCid, nil +} + +func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerID string, peerID peer.ID) error { + // Get miner info from lotus + minerAddress, err := address.NewFromString(minerID) + if err != nil { + return xerrors.Errorf("invalid miner id: %w", err) + } + + ts, err := v.fullNode.ChainHead(ctx) + if err != nil { + return err + } + + minerInfo, err := v.fullNode.StateMinerInfo(ctx, minerAddress, ts.Key()) + if err != nil { + return err + } + + if minerInfo.PeerId == nil { + return xerrors.New("no peer id for miner") + } + if *minerInfo.PeerId != peerID { + return xerrors.New("message not signed by peer in miner info") + } + return nil +} diff --git a/chain/sub/ratelimit/queue.go b/chain/sub/ratelimit/queue.go new file mode 100644 index 000000000..d5ed001fd --- /dev/null +++ b/chain/sub/ratelimit/queue.go @@ -0,0 +1,89 @@ +package ratelimit + +import "errors" + +var ErrRate = errors.New("rate exceeded") + +type queue struct { + buf []int64 + count int + head int + tail int +} + +// cap returns the queue capacity +func (q *queue) cap() int { + return len(q.buf) +} + +// len returns the number of items in the queue +func (q *queue) len() int { + return q.count +} + +// push adds an element to the end of the queue. +func (q *queue) push(elem int64) error { + if q.count == len(q.buf) { + return ErrRate + } + + q.buf[q.tail] = elem + // Calculate new tail position. + q.tail = q.next(q.tail) + q.count++ + return nil +} + +// pop removes and returns the element from the front of the queue. +func (q *queue) pop() int64 { + if q.count <= 0 { + panic("pop from empty queue") + } + ret := q.buf[q.head] + + // Calculate new head position. + q.head = q.next(q.head) + q.count-- + + return ret +} + +// front returns the element at the front of the queue. This is the element +// that would be returned by pop(). This call panics if the queue is empty. +func (q *queue) front() int64 { + if q.count <= 0 { + panic("front() called when empty") + } + return q.buf[q.head] +} + +// back returns the element at the back of the queue. This call panics if the +// queue is empty. +func (q *queue) back() int64 { + if q.count <= 0 { + panic("back() called when empty") + } + return q.buf[q.prev(q.tail)] +} + +// prev returns the previous buffer position wrapping around buffer. +func (q *queue) prev(i int) int { + if i == 0 { + return len(q.buf) - 1 + } + return (i - 1) % len(q.buf) +} + +// next returns the next buffer position wrapping around buffer. +func (q *queue) next(i int) int { + return (i + 1) % len(q.buf) +} + +// truncate pops values that are less than or equal the specified threshold. +func (q *queue) truncate(threshold int64) { + for q.count != 0 && q.buf[q.head] <= threshold { + // pop() without returning a value + q.head = q.next(q.head) + q.count-- + } +} diff --git a/chain/sub/ratelimit/window.go b/chain/sub/ratelimit/window.go new file mode 100644 index 000000000..f0429d3d3 --- /dev/null +++ b/chain/sub/ratelimit/window.go @@ -0,0 +1,70 @@ +package ratelimit + +import "time" + +// Window is a time windows for counting events within a span of time. The +// windows slides forward in time so that it spans from the most recent event +// to size time in the past. +type Window struct { + q *queue + size int64 +} + +// NewWindow creates a new Window that limits the number of events to maximum +// count of events withing a duration of time. The capacity sets the maximum +// number of events, and size sets the span of time over which the events are +// counted. +func NewWindow(capacity int, size time.Duration) *Window { + return &Window{ + q: &queue{ + buf: make([]int64, capacity), + }, + size: int64(size), + } +} + +// Add attempts to append a new timestamp into the current window. Previously +// added values that are not not within `size` difference from the value being +// added are first removed. Add fails if adding the value would cause the +// window to exceed capacity. +func (w *Window) Add() error { + now := time.Now().UnixNano() + if w.Len() != 0 { + w.q.truncate(now - w.size) + } + return w.q.push(now) +} + +// Cap returns the maximum number of items the window can hold. +func (w *Window) Cap() int { + return w.q.cap() +} + +// Len returns the number of elements currently in the window. +func (w *Window) Len() int { + return w.q.len() +} + +// Span returns the distance from the first to the last item in the window. +func (w *Window) Span() time.Duration { + if w.q.len() < 2 { + return 0 + } + return time.Duration(w.q.back() - w.q.front()) +} + +// Oldest returns the oldest timestamp in the window. +func (w *Window) Oldest() time.Time { + if w.q.len() == 0 { + return time.Time{} + } + return time.Unix(0, w.q.front()) +} + +// Newest returns the newest timestamp in the window. +func (w *Window) Newest() time.Time { + if w.q.len() == 0 { + return time.Time{} + } + return time.Unix(0, w.q.back()) +} diff --git a/chain/sub/ratelimit/window_test.go b/chain/sub/ratelimit/window_test.go new file mode 100644 index 000000000..f656d6a0b --- /dev/null +++ b/chain/sub/ratelimit/window_test.go @@ -0,0 +1,61 @@ +package ratelimit + +import ( + "testing" + "time" +) + +func TestWindow(t *testing.T) { + const ( + maxEvents = 3 + timeLimit = 100 * time.Millisecond + ) + w := NewWindow(maxEvents, timeLimit) + if w.Len() != 0 { + t.Fatal("q.Len() =", w.Len(), "expect 0") + } + if w.Cap() != maxEvents { + t.Fatal("q.Cap() =", w.Cap(), "expect 3") + } + if !w.Newest().IsZero() { + t.Fatal("expected newest to be zero time with empty window") + } + if !w.Oldest().IsZero() { + t.Fatal("expected oldest to be zero time with empty window") + } + if w.Span() != 0 { + t.Fatal("expected span to be zero time with empty window") + } + + var err error + for i := 0; i < maxEvents; i++ { + err = w.Add() + if err != nil { + t.Fatalf("cannot add event %d", i) + } + } + if w.Len() != maxEvents { + t.Fatalf("q.Len() is %d, expected %d", w.Len(), maxEvents) + } + if err = w.Add(); err == nil { + t.Fatalf("add event %d within time limit should have failed", maxEvents+1) + } + + time.Sleep(timeLimit) + if err = w.Add(); err != nil { + t.Fatalf("cannot add event after time limit: %s", err) + } + + prev := w.Newest() + time.Sleep(timeLimit) + err = w.Add() + if err != nil { + t.Fatalf("cannot add event") + } + if w.Newest().Before(prev) { + t.Fatal("newest is before previous value") + } + if w.Oldest().Before(prev) { + t.Fatal("oldest is before previous value") + } +} diff --git a/node/modules/services.go b/node/modules/services.go index d0095075d..320a46d94 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/go-fil-markets/discovery" discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/beacon" @@ -201,7 +202,9 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host) error { topicName := build.IndexerIngestTopic(nn) - v := sub.NewIndexerMessageValidator(h.ID()) + // TODO: How do this get set? + var fullNode api.FullNode + v := sub.NewIndexerMessageValidator(h.ID(), fullNode) if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err) From b2805823ce70fd5d7a194a00bdf975b0ab5ab892 Mon Sep 17 00:00:00 2001 From: gammazero Date: Tue, 8 Feb 2022 04:55:59 -0800 Subject: [PATCH 02/11] Pass to validator the interfaces needed to get miner info --- chain/sub/incoming.go | 20 ++++++++++++-------- node/modules/services.go | 8 +++----- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index c0ac42a59..e39d0177f 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -6,7 +6,7 @@ import ( "time" address "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/api" + //"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/consensus" @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/impl/client" + "github.com/filecoin-project/lotus/node/impl/full" lru "github.com/hashicorp/golang-lru" blocks "github.com/ipfs/go-block-format" bserv "github.com/ipfs/go-blockservice" @@ -459,16 +460,18 @@ type IndexerMessageValidator struct { self peer.ID peerCache *lru.TwoQueueCache - fullNode api.FullNode + chainApi full.ChainModuleAPI + stateApi full.StateModuleAPI } -func NewIndexerMessageValidator(self peer.ID, fullNode api.FullNode) *IndexerMessageValidator { +func NewIndexerMessageValidator(self peer.ID, chainApi full.ChainModuleAPI, stateApi full.StateModuleAPI) *IndexerMessageValidator { peerCache, _ := lru.New2Q(1024) return &IndexerMessageValidator{ self: self, peerCache: peerCache, - fullNode: fullNode, + chainApi: chainApi, + stateApi: stateApi, } } @@ -512,7 +515,7 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg defer msgInfo.mutex.Unlock() if !ok || originPeer != msgInfo.peerID { - // Check that the message was signed by an authenticated peer. + // Check that the miner ID maps to the peer that sent the message. err = v.authenticateMessage(ctx, minerID, originPeer) if err != nil { log.Warnw("cannot authenticate messsage", "err", err, "peer", originPeer, "minerID", minerID) @@ -626,12 +629,12 @@ func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, miner return xerrors.Errorf("invalid miner id: %w", err) } - ts, err := v.fullNode.ChainHead(ctx) + ts, err := v.chainApi.ChainHead(ctx) if err != nil { return err } - minerInfo, err := v.fullNode.StateMinerInfo(ctx, minerAddress, ts.Key()) + minerInfo, err := v.stateApi.StateMinerInfo(ctx, minerAddress, ts.Key()) if err != nil { return err } @@ -640,7 +643,8 @@ func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, miner return xerrors.New("no peer id for miner") } if *minerInfo.PeerId != peerID { - return xerrors.New("message not signed by peer in miner info") + return xerrors.New("miner id does not map to peer that sent message") } + return nil } diff --git a/node/modules/services.go b/node/modules/services.go index 320a46d94..95a2399d2 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -20,7 +20,6 @@ import ( "github.com/filecoin-project/go-fil-markets/discovery" discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/beacon" @@ -36,6 +35,7 @@ import ( "github.com/filecoin-project/lotus/lib/peermgr" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/node/hello" + "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" @@ -199,12 +199,10 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) } -func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host) error { +func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host, chainModule full.ChainModule, stateModule full.StateModule) error { topicName := build.IndexerIngestTopic(nn) - // TODO: How do this get set? - var fullNode api.FullNode - v := sub.NewIndexerMessageValidator(h.ID(), fullNode) + v := sub.NewIndexerMessageValidator(h.ID(), &chainModule, &stateModule) if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err) From a62e0270025a17facc64ce416c5901e0a298ab9c Mon Sep 17 00:00:00 2001 From: gammazero Date: Wed, 9 Feb 2022 10:29:49 -0800 Subject: [PATCH 03/11] review changes --- chain/sub/incoming.go | 5 ++--- chain/sub/ratelimit/queue.go | 4 ++-- chain/sub/ratelimit/window_test.go | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index e39d0177f..222ad3dea 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -6,7 +6,6 @@ import ( "time" address "github.com/filecoin-project/go-address" - //"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/consensus" @@ -465,7 +464,7 @@ type IndexerMessageValidator struct { } func NewIndexerMessageValidator(self peer.ID, chainApi full.ChainModuleAPI, stateApi full.StateModuleAPI) *IndexerMessageValidator { - peerCache, _ := lru.New2Q(1024) + peerCache, _ := lru.New2Q(8192) return &IndexerMessageValidator{ self: self, @@ -498,7 +497,7 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg } if minerID == "" { - log.Warnw("ignoring messsage missing miner id", "peer", originPeer) + log.Debugw("ignoring messsage missing miner id", "peer", originPeer) return pubsub.ValidationIgnore } diff --git a/chain/sub/ratelimit/queue.go b/chain/sub/ratelimit/queue.go index d5ed001fd..49f9bef66 100644 --- a/chain/sub/ratelimit/queue.go +++ b/chain/sub/ratelimit/queue.go @@ -2,7 +2,7 @@ package ratelimit import "errors" -var ErrRate = errors.New("rate exceeded") +var ErrRateLimitExceeded = errors.New("rate limit exceeded") type queue struct { buf []int64 @@ -24,7 +24,7 @@ func (q *queue) len() int { // push adds an element to the end of the queue. func (q *queue) push(elem int64) error { if q.count == len(q.buf) { - return ErrRate + return ErrRateLimitExceeded } q.buf[q.tail] = elem diff --git a/chain/sub/ratelimit/window_test.go b/chain/sub/ratelimit/window_test.go index f656d6a0b..c86b65ef7 100644 --- a/chain/sub/ratelimit/window_test.go +++ b/chain/sub/ratelimit/window_test.go @@ -37,8 +37,8 @@ func TestWindow(t *testing.T) { if w.Len() != maxEvents { t.Fatalf("q.Len() is %d, expected %d", w.Len(), maxEvents) } - if err = w.Add(); err == nil { - t.Fatalf("add event %d within time limit should have failed", maxEvents+1) + if err = w.Add(); err != ErrRateLimitExceeded { + t.Fatalf("add event %d within time limit should have failed with err: %s", maxEvents+1, ErrRateLimitExceeded) } time.Sleep(timeLimit) From 3ff209d95d76def5fc648b8d08ab4366c925bb46 Mon Sep 17 00:00:00 2001 From: gammazero Date: Wed, 9 Feb 2022 11:06:56 -0800 Subject: [PATCH 04/11] Add replay rejection --- chain/sub/incoming.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 222ad3dea..f8578bee8 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -1,6 +1,7 @@ package sub import ( + "bytes" "context" "sync" "time" @@ -451,6 +452,7 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType type peerMsgInfo struct { peerID peer.ID lastCid cid.Cid + lastSeqno []byte rateLimit *ratelimit.Window mutex sync.Mutex } @@ -513,6 +515,17 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg msgInfo.mutex.Lock() defer msgInfo.mutex.Unlock() + if ok { + // Reject replayed messages. + seqno := msg.Message.GetSeqno() + if bytes.Equal(msgInfo.lastSeqno, seqno) { + log.Warnf("rejecting replayed indexer message") + stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) + return pubsub.ValidationReject + } + msgInfo.lastSeqno = seqno + } + if !ok || originPeer != msgInfo.peerID { // Check that the miner ID maps to the peer that sent the message. err = v.authenticateMessage(ctx, minerID, originPeer) From 681ce94a34f71a970e8569b66bab0e637b1eafe7 Mon Sep 17 00:00:00 2001 From: gammazero Date: Wed, 9 Feb 2022 16:21:05 -0800 Subject: [PATCH 05/11] Correctly handle seqno check --- chain/sub/incoming.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index f8578bee8..acec90c72 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -1,8 +1,8 @@ package sub import ( - "bytes" "context" + "encoding/binary" "sync" "time" @@ -452,7 +452,7 @@ func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType type peerMsgInfo struct { peerID peer.ID lastCid cid.Cid - lastSeqno []byte + lastSeqno uint64 rateLimit *ratelimit.Window mutex sync.Mutex } @@ -517,11 +517,10 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg if ok { // Reject replayed messages. - seqno := msg.Message.GetSeqno() - if bytes.Equal(msgInfo.lastSeqno, seqno) { - log.Warnf("rejecting replayed indexer message") - stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) - return pubsub.ValidationReject + seqno := binary.BigEndian.Uint64(msg.Message.GetSeqno()) + if seqno <= msgInfo.lastSeqno { + log.Debugf("ignoring replayed indexer message") + return pubsub.ValidationIgnore } msgInfo.lastSeqno = seqno } From 9481fa0a4b5ed18575947fa413d5de936f6437f3 Mon Sep 17 00:00:00 2001 From: gammazero Date: Wed, 9 Feb 2022 16:40:27 -0800 Subject: [PATCH 06/11] Use new indexer pubsub message encoding --- chain/sub/incoming.go | 64 ++++++------------------------------------- go.mod | 1 + go.sum | 3 +- 3 files changed, 12 insertions(+), 56 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index acec90c72..4277503fd 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -1,12 +1,14 @@ package sub import ( + "bytes" "context" "encoding/binary" "sync" "time" address "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-legs/dtsync" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/consensus" @@ -25,7 +27,6 @@ import ( connmgr "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/multiformats/go-varint" "go.opencensus.io/stats" "go.opencensus.io/tag" "golang.org/x/xerrors" @@ -491,18 +492,20 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg return pubsub.ValidationReject } - // Decode CID and originator addresses from message. - minerID, msgCid, err := decodeIndexerMessage(msg.Data) + idxrMsg := dtsync.Message{} + err := idxrMsg.UnmarshalCBOR(bytes.NewBuffer(msg.Data)) if err != nil { - log.Errorw("Could not decode pubsub message", "err", err) + log.Errorw("Could not decode indexer pubsub message", "err", err) return pubsub.ValidationReject } - - if minerID == "" { + if len(idxrMsg.ExtraData) == 0 { log.Debugw("ignoring messsage missing miner id", "peer", originPeer) return pubsub.ValidationIgnore } + minerID := string(idxrMsg.ExtraData) + msgCid := idxrMsg.Cid + var msgInfo *peerMsgInfo val, ok := v.peerCache.Get(minerID) if !ok { @@ -584,55 +587,6 @@ func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid return false } -func decodeIndexerMessage(data []byte) (string, cid.Cid, error) { - n, msgCid, err := cid.CidFromBytes(data) - if err != nil { - return "", cid.Undef, err - } - if n > len(data) { - return "", cid.Undef, xerrors.New("bad cid length encoding") - } - data = data[n:] - - var minerID string - - if len(data) != 0 { - addrCount, n, err := varint.FromUvarint(data) - if err != nil { - return "", cid.Undef, xerrors.Errorf("cannot read number of multiaddrs: %w", err) - } - if n > len(data) { - return "", cid.Undef, xerrors.New("bad multiaddr count encoding") - } - data = data[n:] - - if addrCount != 0 { - // Read multiaddrs if there is any more data in message data. This allows - // backward-compatability with publishers that do not supply address data. - for i := 0; i < int(addrCount); i++ { - val, n, err := varint.FromUvarint(data) - if err != nil { - return "", cid.Undef, xerrors.Errorf("cannot read multiaddrs length: %w", err) - } - if n > len(data) { - return "", cid.Undef, xerrors.New("bad multiaddr length encoding") - } - data = data[n:] - - if len(data) < int(val) { - return "", cid.Undef, xerrors.New("bad multiaddr encoding") - } - data = data[val:] - } - } - if len(data) != 0 { - minerID = string(data) - } - } - - return minerID, msgCid, nil -} - func (v *IndexerMessageValidator) authenticateMessage(ctx context.Context, minerID string, peerID peer.ID) error { // Get miner info from lotus minerAddress, err := address.NewFromString(minerID) diff --git a/go.mod b/go.mod index 2d3e7d232..23ab5a0c2 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( github.com/filecoin-project/go-fil-markets v1.19.1-0.20220203152434-8790cca614d3 github.com/filecoin-project/go-indexer-core v0.2.8 github.com/filecoin-project/go-jsonrpc v0.1.5 + github.com/filecoin-project/go-legs v0.3.0 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.3-0.20220111000201-e42866db1a53 github.com/filecoin-project/go-state-types v0.1.3 diff --git a/go.sum b/go.sum index b2ebceb1c..22cf52fe8 100644 --- a/go.sum +++ b/go.sum @@ -350,8 +350,9 @@ github.com/filecoin-project/go-indexer-core v0.2.8 h1:h1SRdZKTVcaXlzex3UevHh4OWD github.com/filecoin-project/go-indexer-core v0.2.8/go.mod h1:IagNfTdFuX4057kla43PjRCn3yBuUiZgIxuA0hTUamY= github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3GYGtYKiNQ+vGk= github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= -github.com/filecoin-project/go-legs v0.2.7 h1:+b1BQv4QKkRNsDUE8Z4sEhLXhfVQ+iGpHhANpYqxJlA= github.com/filecoin-project/go-legs v0.2.7/go.mod h1:NrdELuDbtAH8/xqRMgyOYms67aliQajExInLS6g8zFM= +github.com/filecoin-project/go-legs v0.3.0 h1:1rDNdPdXbgetmmvRcYZV5YIplIO8LtBVQ7ZttKCrTrM= +github.com/filecoin-project/go-legs v0.3.0/go.mod h1:x6nwM+DuN7NzlPndOoJuiHYCX+pze6+efPRx17nIA7M= github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak= github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs= github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ= From 3f3d61b0434a050f3e4ef98fd341eb1ff3767cff Mon Sep 17 00:00:00 2001 From: gammazero Date: Thu, 10 Feb 2022 08:41:18 -0800 Subject: [PATCH 07/11] Service creation takes interface, not implementation --- node/modules/services.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/modules/services.go b/node/modules/services.go index 95a2399d2..6c9a23d49 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -199,10 +199,10 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) } -func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host, chainModule full.ChainModule, stateModule full.StateModule) error { +func RelayIndexerMessages(lc fx.Lifecycle, ps *pubsub.PubSub, nn dtypes.NetworkName, h host.Host, chainModule full.ChainModuleAPI, stateModule full.StateModuleAPI) error { topicName := build.IndexerIngestTopic(nn) - v := sub.NewIndexerMessageValidator(h.ID(), &chainModule, &stateModule) + v := sub.NewIndexerMessageValidator(h.ID(), chainModule, stateModule) if err := ps.RegisterTopicValidator(topicName, v.Validate); err != nil { return xerrors.Errorf("failed to register validator for topic %s, err: %w", topicName, err) From 1e37185d741465a2d3bea5f4083cab029ff1136a Mon Sep 17 00:00:00 2001 From: gammazero Date: Thu, 10 Feb 2022 08:44:40 -0800 Subject: [PATCH 08/11] Ignore, not regect, indexer messages from self. --- chain/sub/incoming.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 4277503fd..be2c75641 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -481,15 +481,15 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg // This chain-node should not be publishing its own messages. These are // relayed from market-nodes. If a node appears to be local, reject it. if pid == v.self { - log.Warnf("refusing to relay indexer message from self") + log.Debug("ignoring indexer message from self") stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) - return pubsub.ValidationReject + return pubsub.ValidationIgnore } originPeer := msg.GetFrom() if originPeer == v.self { - log.Warnf("refusing to relay indexer message originating from self") + log.Debug("ignoring indexer message originating from self") stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1)) - return pubsub.ValidationReject + return pubsub.ValidationIgnore } idxrMsg := dtsync.Message{} From b72cd90c19125ef6a0a7ced3fcff4a622f6b5b64 Mon Sep 17 00:00:00 2001 From: gammazero Date: Thu, 10 Feb 2022 09:21:21 -0800 Subject: [PATCH 09/11] Add indexer message scoring --- node/modules/lp2p/pubsub.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 14f100450..13d32bbcb 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -114,6 +114,22 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), } + ingestTopicParams := &pubsub.TopicScoreParams{ + // expected ~0.5 confirmed deals / min. sampled + TopicWeight: 1, + + TimeInMeshWeight: 0.00027, // ~1/3600 + TimeInMeshQuantum: time.Second, + TimeInMeshCap: 1, + + FirstMessageDeliveriesWeight: 5, + FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + FirstMessageDeliveriesCap: 100, // allowing for burstiness + + InvalidMessageDeliveriesWeight: -1000, + InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + } + topicParams := map[string]*pubsub.TopicScoreParams{ build.BlocksTopic(in.Nn): { // expected 10 blocks/min @@ -208,6 +224,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { drandTopics = append(drandTopics, topic) } + // Index ingestion whitelist + topicParams[build.IndexerIngestTopic(in.Nn)] = ingestTopicParams + // IP colocation whitelist var ipcoloWhitelist []*net.IPNet for _, cidr := range in.Cfg.IPColocationWhitelist { From eedcf910eb3aaa0b221c81df79a4f9a2f141c44f Mon Sep 17 00:00:00 2001 From: gammazero Date: Thu, 10 Feb 2022 10:53:07 -0800 Subject: [PATCH 10/11] change scoring based on review --- node/modules/lp2p/pubsub.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index 13d32bbcb..a59ff94db 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -116,13 +116,13 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { ingestTopicParams := &pubsub.TopicScoreParams{ // expected ~0.5 confirmed deals / min. sampled - TopicWeight: 1, + TopicWeight: 0.1, TimeInMeshWeight: 0.00027, // ~1/3600 TimeInMeshQuantum: time.Second, TimeInMeshCap: 1, - FirstMessageDeliveriesWeight: 5, + FirstMessageDeliveriesWeight: 0.5, FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), FirstMessageDeliveriesCap: 100, // allowing for burstiness From c1b2080f4e61db91ef4914ce45749fa133fab29f Mon Sep 17 00:00:00 2001 From: gammazero Date: Thu, 10 Feb 2022 11:25:30 -0800 Subject: [PATCH 11/11] spelling in comment --- chain/sub/ratelimit/window.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sub/ratelimit/window.go b/chain/sub/ratelimit/window.go index f0429d3d3..0756e8998 100644 --- a/chain/sub/ratelimit/window.go +++ b/chain/sub/ratelimit/window.go @@ -11,7 +11,7 @@ type Window struct { } // NewWindow creates a new Window that limits the number of events to maximum -// count of events withing a duration of time. The capacity sets the maximum +// count of events within a duration of time. The capacity sets the maximum // number of events, and size sets the span of time over which the events are // counted. func NewWindow(capacity int, size time.Duration) *Window {