lotus/chain/sub/incoming.go

81 lines
2.1 KiB
Go
Raw Normal View History

2019-07-08 14:07:09 +00:00
package sub
import (
"context"
2019-07-08 14:07:09 +00:00
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/types"
2019-07-08 14:07:09 +00:00
)
// log is the logger shared by the handlers in this package; it is created
// under the name "sub".
var log = logging.Logger("sub")
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer) {
for {
msg, err := bsub.Next(ctx)
if err != nil {
2019-09-17 14:23:08 +00:00
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingBlocks loop")
return
}
log.Error("error from block subscription: ", err)
2019-07-08 14:07:09 +00:00
continue
}
blk, err := types.DecodeBlockMsg(msg.GetData())
2019-07-08 14:07:09 +00:00
if err != nil {
log.Error("got invalid block over pubsub: ", err)
continue
}
go func() {
2019-08-27 18:45:21 +00:00
log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
2019-07-08 14:07:09 +00:00
if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s", err)
2019-07-08 14:07:09 +00:00
return
}
smsgs, err := s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages)
if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s", err)
return
}
2019-11-07 00:18:06 +00:00
log.Debugw("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom())
s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,
SecpkMessages: smsgs,
2019-07-08 14:07:09 +00:00
})
}()
}
}
func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub *pubsub.Subscription) {
for {
msg, err := msub.Next(ctx)
if err != nil {
2019-09-17 14:23:08 +00:00
log.Warn("error from message subscription: ", err)
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingMessages loop")
return
}
2019-07-08 14:07:09 +00:00
continue
}
m, err := types.DecodeSignedMessage(msg.GetData())
2019-07-08 14:07:09 +00:00
if err != nil {
log.Errorf("got incorrectly formatted Message: %s", err)
continue
}
if err := mpool.Add(m); err != nil {
log.Errorf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %+v", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
2019-07-08 14:07:09 +00:00
continue
}
}
}