package full import ( "context" "encoding/json" "sync" "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/zyedidia/generic/queue" "golang.org/x/xerrors" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/lotus/chain/events/filter" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" ) type filterEventCollector interface { TakeCollectedEvents(context.Context) []*filter.CollectedEvent } type filterMessageCollector interface { TakeCollectedMessages(context.Context) []*types.SignedMessage } type filterTipSetCollector interface { TakeCollectedTipSets(context.Context) []types.TipSetKey } func ethLogFromEvent(entries []types.EventEntry) (data []byte, topics []ethtypes.EthHash, ok bool) { var ( topicsFound [4]bool topicsFoundCount int dataFound bool ) // Topics must be non-nil, even if empty. So we might as well pre-allocate for 4 (the max). topics = make([]ethtypes.EthHash, 0, 4) for _, entry := range entries { // Drop events with non-raw topics. Built-in actors emit CBOR, and anything else would be // invalid anyway. if entry.Codec != cid.Raw { return nil, nil, false } // Check if the key is t1..t4 if len(entry.Key) == 2 && "t1" <= entry.Key && entry.Key <= "t4" { // '1' - '1' == 0, etc. idx := int(entry.Key[1] - '1') // Drop events with mis-sized topics. if len(entry.Value) != 32 { log.Warnw("got an EVM event topic with an invalid size", "key", entry.Key, "size", len(entry.Value)) return nil, nil, false } // Drop events with duplicate topics. if topicsFound[idx] { log.Warnw("got a duplicate EVM event topic", "key", entry.Key) return nil, nil, false } topicsFound[idx] = true topicsFoundCount++ // Extend the topics array for len(topics) <= idx { topics = append(topics, ethtypes.EthHash{}) } copy(topics[idx][:], entry.Value) } else if entry.Key == "d" { // Drop events with duplicate data fields. if dataFound { log.Warnw("got duplicate EVM event data") return nil, nil, false } dataFound = true data = entry.Value } else { // Skip entries we don't understand (makes it easier to extend things). // But we warn for now because we don't expect them. log.Warnw("unexpected event entry", "key", entry.Key) } } // Drop events with skipped topics. if len(topics) != topicsFoundCount { log.Warnw("EVM event topic length mismatch", "expected", len(topics), "actual", topicsFoundCount) return nil, nil, false } return data, topics, true } func ethFilterResultFromEvents(ctx context.Context, evs []*filter.CollectedEvent, sa StateAPI) (*ethtypes.EthFilterResult, error) { res := ðtypes.EthFilterResult{} for _, ev := range evs { log := ethtypes.EthLog{ Removed: ev.Reverted, LogIndex: ethtypes.EthUint64(ev.EventIdx), TransactionIndex: ethtypes.EthUint64(ev.MsgIdx), BlockNumber: ethtypes.EthUint64(ev.Height), } var ( err error ok bool ) log.Data, log.Topics, ok = ethLogFromEvent(ev.Entries) if !ok { continue } log.Address, err = ethtypes.EthAddressFromFilecoinAddress(ev.EmitterAddr) if err != nil { return nil, err } log.TransactionHash, err = ethTxHashFromMessageCid(ctx, ev.MsgCid, sa) if err != nil { return nil, err } if log.TransactionHash == ethtypes.EmptyEthHash { // We've garbage collected the message, ignore the events and continue. continue } c, err := ev.TipSetKey.Cid() if err != nil { return nil, err } log.BlockHash, err = ethtypes.EthHashFromCid(c) if err != nil { return nil, err } res.Results = append(res.Results, log) } return res, nil } func ethFilterResultFromTipSets(tsks []types.TipSetKey) (*ethtypes.EthFilterResult, error) { res := ðtypes.EthFilterResult{} for _, tsk := range tsks { c, err := tsk.Cid() if err != nil { return nil, err } hash, err := ethtypes.EthHashFromCid(c) if err != nil { return nil, err } res.Results = append(res.Results, hash) } return res, nil } func ethFilterResultFromMessages(cs []*types.SignedMessage) (*ethtypes.EthFilterResult, error) { res := ðtypes.EthFilterResult{} for _, c := range cs { hash, err := ethTxHashFromSignedMessage(c) if err != nil { return nil, err } res.Results = append(res.Results, hash) } return res, nil } type EthSubscriptionManager struct { Chain *store.ChainStore StateAPI StateAPI ChainAPI ChainAPI mu sync.Mutex subs map[ethtypes.EthSubscriptionID]*ethSubscription } func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback, dropFilter func(context.Context, filter.Filter) error) (*ethSubscription, error) { // nolint rawid, err := uuid.NewRandom() if err != nil { return nil, xerrors.Errorf("new uuid: %w", err) } id := ethtypes.EthSubscriptionID{} copy(id[:], rawid[:]) // uuid is 16 bytes ctx, quit := context.WithCancel(ctx) sub := ðSubscription{ Chain: e.Chain, StateAPI: e.StateAPI, ChainAPI: e.ChainAPI, uninstallFilter: dropFilter, id: id, in: make(chan interface{}, 200), out: out, quit: quit, toSend: queue.New[[]byte](), sendCond: make(chan struct{}, 1), } e.mu.Lock() if e.subs == nil { e.subs = make(map[ethtypes.EthSubscriptionID]*ethSubscription) } e.subs[sub.id] = sub e.mu.Unlock() go sub.start(ctx) go sub.startOut(ctx) return sub, nil } func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) error { e.mu.Lock() defer e.mu.Unlock() sub, ok := e.subs[id] if !ok { return xerrors.Errorf("subscription not found") } sub.stop() delete(e.subs, id) return nil } type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error const maxSendQueue = 20000 type ethSubscription struct { Chain *store.ChainStore StateAPI StateAPI ChainAPI ChainAPI uninstallFilter func(context.Context, filter.Filter) error id ethtypes.EthSubscriptionID in chan interface{} out ethSubscriptionCallback mu sync.Mutex filters []filter.Filter quit func() sendLk sync.Mutex sendQueueLen int toSend *queue.Queue[[]byte] sendCond chan struct{} lastSentTipset *types.TipSetKey } func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { e.mu.Lock() defer e.mu.Unlock() f.SetSubChannel(e.in) e.filters = append(e.filters, f) } // startOut processes the final subscription queue. It's here in case the subscriber // is slow, and we need to buffer the messages. func (e *ethSubscription) startOut(ctx context.Context) { for { select { case <-ctx.Done(): return case <-e.sendCond: e.sendLk.Lock() for !e.toSend.Empty() { front := e.toSend.Dequeue() e.sendQueueLen-- e.sendLk.Unlock() if err := e.out(ctx, front); err != nil { log.Warnw("error sending subscription response, killing subscription", "sub", e.id, "error", err) e.stop() return } e.sendLk.Lock() } e.sendLk.Unlock() } } } func (e *ethSubscription) send(ctx context.Context, v interface{}) { resp := ethtypes.EthSubscriptionResponse{ SubscriptionID: e.id, Result: v, } outParam, err := json.Marshal(resp) if err != nil { log.Warnw("marshaling subscription response", "sub", e.id, "error", err) return } e.sendLk.Lock() defer e.sendLk.Unlock() e.toSend.Enqueue(outParam) e.sendQueueLen++ if e.sendQueueLen > maxSendQueue { log.Warnw("subscription send queue full, killing subscription", "sub", e.id) e.stop() return } select { case e.sendCond <- struct{}{}: default: // already signalled, and we're holding the lock so we know that the event will be processed } } func (e *ethSubscription) start(ctx context.Context) { for ctx.Err() == nil { select { case <-ctx.Done(): return case v := <-e.in: switch vt := v.(type) { case *filter.CollectedEvent: evs, err := ethFilterResultFromEvents(ctx, []*filter.CollectedEvent{vt}, e.StateAPI) if err != nil { continue } for _, r := range evs.Results { e.send(ctx, r) } case *types.TipSet: // Skip processing for tipset at epoch 0 as it has no parent if vt.Height() == 0 { continue } // Check if the parent has already been processed parentTipSetKey := vt.Parents() if e.lastSentTipset != nil && (*e.lastSentTipset) == parentTipSetKey { continue } parentTipSet, loadErr := e.Chain.LoadTipSet(ctx, parentTipSetKey) if loadErr != nil { log.Warnw("failed to load parent tipset", "tipset", parentTipSetKey, "error", loadErr) continue } ethBlock, ethBlockErr := newEthBlockFromFilecoinTipSet(ctx, parentTipSet, true, e.Chain, e.StateAPI) if ethBlockErr != nil { continue } e.send(ctx, ethBlock) e.lastSentTipset = &parentTipSetKey case *types.SignedMessage: // mpool txid evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt}) if err != nil { continue } for _, r := range evs.Results { e.send(ctx, r) } default: log.Warnf("unexpected subscription value type: %T", vt) } } } } func (e *ethSubscription) stop() { e.mu.Lock() if e.quit == nil { e.mu.Unlock() return } if e.quit != nil { e.quit() e.quit = nil e.mu.Unlock() for _, f := range e.filters { // note: the context in actually unused in uninstallFilter if err := e.uninstallFilter(context.TODO(), f); err != nil { // this will leave the filter a zombie, collecting events up to the maximum allowed log.Warnf("failed to remove filter when unsubscribing: %v", err) } } } }