feat: add test and fixes for EthSubscribe (#9659)
This commit is contained in:
parent
cd180f1325
commit
5b4ec7dbea
@ -302,6 +302,7 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
|
||||
if len(m.filters) == 0 && m.EventIndex == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
tse := &TipSetEvents{
|
||||
msgTs: from,
|
||||
rctTs: to,
|
||||
@ -360,7 +361,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
|
||||
currentHeight := m.currentHeight
|
||||
m.mu.Unlock()
|
||||
|
||||
if m.EventIndex == nil && minHeight < currentHeight {
|
||||
if m.EventIndex == nil && minHeight != -1 && minHeight < currentHeight {
|
||||
return nil, xerrors.Errorf("historic event index disabled")
|
||||
}
|
||||
|
||||
@ -379,7 +380,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
|
||||
maxResults: m.MaxFilterResults,
|
||||
}
|
||||
|
||||
if m.EventIndex != nil && minHeight < currentHeight {
|
||||
if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight {
|
||||
// Filter needs historic events
|
||||
if err := m.EventIndex.PrefillFilter(ctx, f); err != nil {
|
||||
return nil, err
|
||||
|
@ -592,3 +592,117 @@ func TestEthGetLogsAll(t *testing.T) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func TestEthSubscribeLogs(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
blockTime := 100 * time.Millisecond
|
||||
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
|
||||
ens.InterconnectAll().BeginMining(blockTime)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
|
||||
// install contract
|
||||
contractHex, err := os.ReadFile("contracts/events.bin")
|
||||
require.NoError(err)
|
||||
|
||||
contract, err := hex.DecodeString(string(contractHex))
|
||||
require.NoError(err)
|
||||
|
||||
fromAddr, err := client.WalletDefaultAddress(ctx)
|
||||
require.NoError(err)
|
||||
|
||||
result := client.EVM().DeployContract(ctx, fromAddr, contract)
|
||||
|
||||
idAddr, err := address.NewIDAddress(result.ActorID)
|
||||
require.NoError(err)
|
||||
t.Logf("actor ID address is %s", idAddr)
|
||||
|
||||
// install filter
|
||||
respCh, err := client.EthSubscribe(ctx, "logs", nil)
|
||||
require.NoError(err)
|
||||
|
||||
subResponses := []api.EthSubscriptionResponse{}
|
||||
go func() {
|
||||
for resp := range respCh {
|
||||
subResponses = append(subResponses, resp)
|
||||
}
|
||||
}()
|
||||
|
||||
const iterations = 10
|
||||
|
||||
type msgInTipset struct {
|
||||
msg api.Message
|
||||
ts *types.TipSet
|
||||
}
|
||||
|
||||
msgChan := make(chan msgInTipset, iterations)
|
||||
|
||||
waitAllCh := make(chan struct{})
|
||||
go func() {
|
||||
headChangeCh, err := client.ChainNotify(ctx)
|
||||
require.NoError(err)
|
||||
<-headChangeCh // skip hccurrent
|
||||
|
||||
count := 0
|
||||
for {
|
||||
select {
|
||||
case headChanges := <-headChangeCh:
|
||||
for _, change := range headChanges {
|
||||
if change.Type == store.HCApply || change.Type == store.HCRevert {
|
||||
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
|
||||
require.NoError(err)
|
||||
|
||||
count += len(msgs)
|
||||
for _, m := range msgs {
|
||||
select {
|
||||
case msgChan <- msgInTipset{msg: m, ts: change.Val}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
if count == iterations {
|
||||
close(msgChan)
|
||||
close(waitAllCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(blockTime * 6)
|
||||
|
||||
for i := 0; i < iterations; i++ {
|
||||
// log a four topic event with data
|
||||
ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil)
|
||||
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-waitAllCh:
|
||||
case <-time.After(time.Minute):
|
||||
t.Errorf("timeout to wait for pack messages")
|
||||
}
|
||||
|
||||
if len(subResponses) > 0 {
|
||||
ok, err := client.EthUnsubscribe(ctx, subResponses[0].SubscriptionID)
|
||||
require.NoError(err)
|
||||
require.True(ok, "unsubscribed")
|
||||
}
|
||||
|
||||
received := make(map[api.EthHash]msgInTipset)
|
||||
for m := range msgChan {
|
||||
eh, err := api.NewEthHashFromCid(m.msg.Cid)
|
||||
require.NoError(err)
|
||||
received[eh] = m
|
||||
}
|
||||
require.Equal(iterations, len(received), "all messages on chain")
|
||||
|
||||
// expect to have seen all logs
|
||||
require.Equal(len(received), len(subResponses))
|
||||
}
|
||||
|
@ -157,7 +157,6 @@ var ChainNode = Options(
|
||||
Override(new(messagesigner.MpoolNonceAPI), From(new(*messagepool.MessagePool))),
|
||||
Override(new(full.ChainModuleAPI), From(new(full.ChainModule))),
|
||||
Override(new(full.EthModuleAPI), From(new(full.EthModule))),
|
||||
Override(new(full.EthEventAPI), From(new(full.EthEvent))),
|
||||
Override(new(full.GasModuleAPI), From(new(full.GasModule))),
|
||||
Override(new(full.MpoolModuleAPI), From(new(full.MpoolModule))),
|
||||
Override(new(full.StateModuleAPI), From(new(full.StateModule))),
|
||||
|
@ -96,7 +96,6 @@ type EthModule struct {
|
||||
var _ EthModuleAPI = (*EthModule)(nil)
|
||||
|
||||
type EthEvent struct {
|
||||
EthModuleAPI
|
||||
Chain *store.ChainStore
|
||||
EventFilterManager *filter.EventFilterManager
|
||||
TipSetFilterManager *filter.TipSetFilterManager
|
||||
@ -169,7 +168,7 @@ func (a *EthModule) EthGetBlockByHash(ctx context.Context, blkHash api.EthHash,
|
||||
if err != nil {
|
||||
return api.EthBlock{}, xerrors.Errorf("error loading tipset %s: %w", ts, err)
|
||||
}
|
||||
return a.newEthBlockFromFilecoinTipSet(ctx, ts, fullTxInfo)
|
||||
return newEthBlockFromFilecoinTipSet(ctx, ts, fullTxInfo, a.Chain, a.ChainAPI, a.StateAPI)
|
||||
}
|
||||
|
||||
func (a *EthModule) EthGetBlockByNumber(ctx context.Context, blkNum string, fullTxInfo bool) (api.EthBlock, error) {
|
||||
@ -183,7 +182,7 @@ func (a *EthModule) EthGetBlockByNumber(ctx context.Context, blkNum string, full
|
||||
if err != nil {
|
||||
return api.EthBlock{}, xerrors.Errorf("error loading tipset %s: %w", ts, err)
|
||||
}
|
||||
return a.newEthBlockFromFilecoinTipSet(ctx, ts, fullTxInfo)
|
||||
return newEthBlockFromFilecoinTipSet(ctx, ts, fullTxInfo, a.Chain, a.ChainAPI, a.StateAPI)
|
||||
}
|
||||
|
||||
func (a *EthModule) EthGetTransactionByHash(ctx context.Context, txHash *api.EthHash) (*api.EthTx, error) {
|
||||
@ -199,7 +198,7 @@ func (a *EthModule) EthGetTransactionByHash(ctx context.Context, txHash *api.Eth
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
tx, err := a.newEthTxFromFilecoinMessageLookup(ctx, msgLookup)
|
||||
tx, err := newEthTxFromFilecoinMessageLookup(ctx, msgLookup, a.Chain, a.ChainAPI, a.StateAPI)
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
@ -226,7 +225,7 @@ func (a *EthModule) EthGetTransactionReceipt(ctx context.Context, txHash api.Eth
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
tx, err := a.newEthTxFromFilecoinMessageLookup(ctx, msgLookup)
|
||||
tx, err := newEthTxFromFilecoinMessageLookup(ctx, msgLookup, a.Chain, a.ChainAPI, a.StateAPI)
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
@ -244,7 +243,7 @@ func (a *EthModule) EthGetTransactionReceipt(ctx context.Context, txHash api.Eth
|
||||
}
|
||||
}
|
||||
|
||||
receipt, err := a.newEthTxReceipt(ctx, tx, msgLookup, replay, events)
|
||||
receipt, err := newEthTxReceipt(ctx, tx, msgLookup, replay, events, a.StateAPI)
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
@ -454,7 +453,7 @@ func (a *EthModule) EthFeeHistory(ctx context.Context, blkCount api.EthUint64, n
|
||||
for ts.Height() >= abi.ChainEpoch(oldestBlkHeight) {
|
||||
// Unfortunately we need to rebuild the full message view so we can
|
||||
// totalize gas used in the tipset.
|
||||
block, err := a.newEthBlockFromFilecoinTipSet(ctx, ts, false)
|
||||
block, err := newEthBlockFromFilecoinTipSet(ctx, ts, false, a.Chain, a.ChainAPI, a.StateAPI)
|
||||
if err != nil {
|
||||
return api.EthFeeHistory{}, fmt.Errorf("cannot create eth block: %v", err)
|
||||
}
|
||||
@ -675,285 +674,6 @@ func (a *EthModule) EthCall(ctx context.Context, tx api.EthCall, blkParam string
|
||||
return api.EthBytes{}, nil
|
||||
}
|
||||
|
||||
func (a *EthModule) newEthBlockFromFilecoinTipSet(ctx context.Context, ts *types.TipSet, fullTxInfo bool) (api.EthBlock, error) {
|
||||
parent, err := a.Chain.LoadTipSet(ctx, ts.Parents())
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
parentKeyCid, err := parent.Key().Cid()
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
parentBlkHash, err := api.NewEthHashFromCid(parentKeyCid)
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
|
||||
blkCid, err := ts.Key().Cid()
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
blkHash, err := api.NewEthHashFromCid(blkCid)
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
|
||||
blkMsgs, err := a.Chain.BlockMsgsForTipset(ctx, ts)
|
||||
if err != nil {
|
||||
return api.EthBlock{}, xerrors.Errorf("error loading messages for tipset: %v: %w", ts, err)
|
||||
}
|
||||
|
||||
block := api.NewEthBlock()
|
||||
|
||||
// this seems to be a very expensive way to get gasUsed of the block. may need to find an efficient way to do it
|
||||
gasUsed := int64(0)
|
||||
for _, blkMsg := range blkMsgs {
|
||||
for _, msg := range append(blkMsg.BlsMessages, blkMsg.SecpkMessages...) {
|
||||
msgLookup, err := a.StateAPI.StateSearchMsg(ctx, types.EmptyTSK, msg.Cid(), api.LookbackNoLimit, true)
|
||||
if err != nil || msgLookup == nil {
|
||||
return api.EthBlock{}, nil
|
||||
}
|
||||
gasUsed += msgLookup.Receipt.GasUsed
|
||||
|
||||
if fullTxInfo {
|
||||
tx, err := a.newEthTxFromFilecoinMessageLookup(ctx, msgLookup)
|
||||
if err != nil {
|
||||
return api.EthBlock{}, nil
|
||||
}
|
||||
block.Transactions = append(block.Transactions, tx)
|
||||
} else {
|
||||
hash, err := api.NewEthHashFromCid(msg.Cid())
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
block.Transactions = append(block.Transactions, hash.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
block.Hash = blkHash
|
||||
block.Number = api.EthUint64(ts.Height())
|
||||
block.ParentHash = parentBlkHash
|
||||
block.Timestamp = api.EthUint64(ts.Blocks()[0].Timestamp)
|
||||
block.BaseFeePerGas = api.EthBigInt{Int: ts.Blocks()[0].ParentBaseFee.Int}
|
||||
block.GasUsed = api.EthUint64(gasUsed)
|
||||
return block, nil
|
||||
}
|
||||
|
||||
// lookupEthAddress makes its best effort at finding the Ethereum address for a
|
||||
// Filecoin address. It does the following:
|
||||
//
|
||||
// 1. If the supplied address is an f410 address, we return its payload as the EthAddress.
|
||||
// 2. Otherwise (f0, f1, f2, f3), we look up the actor on the state tree. If it has a predictable address, we return it if it's f410 address.
|
||||
// 3. Otherwise, we fall back to returning a masked ID Ethereum address. If the supplied address is an f0 address, we
|
||||
// use that ID to form the masked ID address.
|
||||
// 4. Otherwise, we fetch the actor's ID from the state tree and form the masked ID with it.
|
||||
func (a *EthModule) lookupEthAddress(ctx context.Context, addr address.Address) (api.EthAddress, error) {
|
||||
// Attempt to convert directly.
|
||||
if ethAddr, ok, err := api.TryEthAddressFromFilecoinAddress(addr, false); err != nil {
|
||||
return api.EthAddress{}, err
|
||||
} else if ok {
|
||||
return ethAddr, nil
|
||||
}
|
||||
|
||||
// Lookup on the target actor.
|
||||
actor, err := a.StateAPI.StateGetActor(ctx, addr, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return api.EthAddress{}, err
|
||||
}
|
||||
if actor.Address != nil {
|
||||
if ethAddr, ok, err := api.TryEthAddressFromFilecoinAddress(*actor.Address, false); err != nil {
|
||||
return api.EthAddress{}, err
|
||||
} else if ok {
|
||||
return ethAddr, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we already have an ID addr, and use it if possible.
|
||||
if ethAddr, ok, err := api.TryEthAddressFromFilecoinAddress(addr, true); err != nil {
|
||||
return api.EthAddress{}, err
|
||||
} else if ok {
|
||||
return ethAddr, nil
|
||||
}
|
||||
|
||||
// Otherwise, resolve the ID addr.
|
||||
idAddr, err := a.StateAPI.StateLookupID(ctx, addr, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return api.EthAddress{}, err
|
||||
}
|
||||
return api.EthAddressFromFilecoinAddress(idAddr)
|
||||
}
|
||||
|
||||
func (a *EthModule) newEthTxFromFilecoinMessageLookup(ctx context.Context, msgLookup *api.MsgLookup) (api.EthTx, error) {
|
||||
if msgLookup == nil {
|
||||
return api.EthTx{}, fmt.Errorf("msg does not exist")
|
||||
}
|
||||
cid := msgLookup.Message
|
||||
txHash, err := api.NewEthHashFromCid(cid)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
ts, err := a.Chain.LoadTipSet(ctx, msgLookup.TipSet)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
// This tx is located in the parent tipset
|
||||
parentTs, err := a.Chain.LoadTipSet(ctx, ts.Parents())
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
parentTsCid, err := parentTs.Key().Cid()
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
blkHash, err := api.NewEthHashFromCid(parentTsCid)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
msg, err := a.ChainAPI.ChainGetMessage(ctx, msgLookup.Message)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
fromEthAddr, err := a.lookupEthAddress(ctx, msg.From)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
toEthAddr, err := a.lookupEthAddress(ctx, msg.To)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
toAddr := &toEthAddr
|
||||
input := msg.Params
|
||||
// Check to see if we need to decode as contract deployment.
|
||||
// We don't need to resolve the to address, because there's only one form (an ID).
|
||||
if msg.To == builtintypes.EthereumAddressManagerActorAddr {
|
||||
switch msg.Method {
|
||||
case builtintypes.MethodsEAM.Create:
|
||||
toAddr = nil
|
||||
var params eam.CreateParams
|
||||
err = params.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||
input = params.Initcode
|
||||
case builtintypes.MethodsEAM.Create2:
|
||||
toAddr = nil
|
||||
var params eam.Create2Params
|
||||
err = params.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||
input = params.Initcode
|
||||
}
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
}
|
||||
// Otherwise, try to decode as a cbor byte array.
|
||||
// TODO: Actually check if this is an ethereum call. This code will work for demo purposes, but is not correct.
|
||||
if toAddr != nil {
|
||||
if decodedParams, err := cbg.ReadByteArray(bytes.NewReader(msg.Params), uint64(len(msg.Params))); err == nil {
|
||||
input = decodedParams
|
||||
}
|
||||
}
|
||||
|
||||
tx := api.EthTx{
|
||||
ChainID: api.EthUint64(build.Eip155ChainId),
|
||||
Hash: txHash,
|
||||
BlockHash: blkHash,
|
||||
BlockNumber: api.EthUint64(parentTs.Height()),
|
||||
From: fromEthAddr,
|
||||
To: toAddr,
|
||||
Value: api.EthBigInt(msg.Value),
|
||||
Type: api.EthUint64(2),
|
||||
Gas: api.EthUint64(msg.GasLimit),
|
||||
MaxFeePerGas: api.EthBigInt(msg.GasFeeCap),
|
||||
MaxPriorityFeePerGas: api.EthBigInt(msg.GasPremium),
|
||||
V: api.EthBytes{},
|
||||
R: api.EthBytes{},
|
||||
S: api.EthBytes{},
|
||||
Input: input,
|
||||
}
|
||||
return tx, nil
|
||||
}
|
||||
|
||||
func (a *EthModule) newEthTxReceipt(ctx context.Context, tx api.EthTx, lookup *api.MsgLookup, replay *api.InvocResult, events []types.Event) (api.EthTxReceipt, error) {
|
||||
receipt := api.EthTxReceipt{
|
||||
TransactionHash: tx.Hash,
|
||||
TransactionIndex: tx.TransactionIndex,
|
||||
BlockHash: tx.BlockHash,
|
||||
BlockNumber: tx.BlockNumber,
|
||||
From: tx.From,
|
||||
To: tx.To,
|
||||
StateRoot: api.EmptyEthHash,
|
||||
LogsBloom: []byte{0},
|
||||
}
|
||||
|
||||
if receipt.To == nil && lookup.Receipt.ExitCode.IsSuccess() {
|
||||
// Create and Create2 return the same things.
|
||||
var ret eam.CreateReturn
|
||||
if err := ret.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
|
||||
return api.EthTxReceipt{}, xerrors.Errorf("failed to parse contract creation result: %w", err)
|
||||
}
|
||||
addr := api.EthAddress(ret.EthAddress)
|
||||
receipt.ContractAddress = &addr
|
||||
}
|
||||
|
||||
if lookup.Receipt.ExitCode.IsSuccess() {
|
||||
receipt.Status = 1
|
||||
}
|
||||
if lookup.Receipt.ExitCode.IsError() {
|
||||
receipt.Status = 0
|
||||
}
|
||||
|
||||
if len(events) > 0 {
|
||||
receipt.Logs = make([]api.EthLog, 0, len(events))
|
||||
for i, evt := range events {
|
||||
l := api.EthLog{
|
||||
Removed: false,
|
||||
LogIndex: api.EthUint64(i),
|
||||
TransactionIndex: tx.TransactionIndex,
|
||||
TransactionHash: tx.Hash,
|
||||
BlockHash: tx.BlockHash,
|
||||
BlockNumber: tx.BlockNumber,
|
||||
}
|
||||
|
||||
for _, entry := range evt.Entries {
|
||||
hash := api.EthHashData(entry.Value)
|
||||
if entry.Key == api.EthTopic1 || entry.Key == api.EthTopic2 || entry.Key == api.EthTopic3 || entry.Key == api.EthTopic4 {
|
||||
l.Topics = append(l.Topics, hash)
|
||||
} else {
|
||||
l.Data = append(l.Data, hash)
|
||||
}
|
||||
}
|
||||
|
||||
addr, err := address.NewIDAddress(uint64(evt.Emitter))
|
||||
if err != nil {
|
||||
return api.EthTxReceipt{}, xerrors.Errorf("failed to create ID address: %w", err)
|
||||
}
|
||||
|
||||
l.Address, err = a.lookupEthAddress(ctx, addr)
|
||||
if err != nil {
|
||||
return api.EthTxReceipt{}, xerrors.Errorf("failed to resolve Ethereum address: %w", err)
|
||||
}
|
||||
|
||||
receipt.Logs = append(receipt.Logs, l)
|
||||
}
|
||||
}
|
||||
|
||||
receipt.GasUsed = api.EthUint64(lookup.Receipt.GasUsed)
|
||||
|
||||
// TODO: handle CumulativeGasUsed
|
||||
receipt.CumulativeGasUsed = api.EmptyEthInt
|
||||
|
||||
effectiveGasPrice := big.Div(replay.GasCost.TotalCost, big.NewInt(lookup.Receipt.GasUsed))
|
||||
receipt.EffectiveGasPrice = api.EthBigInt(effectiveGasPrice)
|
||||
|
||||
return receipt, nil
|
||||
}
|
||||
|
||||
func (e *EthEvent) EthGetLogs(ctx context.Context, filterSpec *api.EthFilterSpec) (*api.EthFilterResult, error) {
|
||||
if e.EventFilterManager == nil {
|
||||
return nil, api.ErrNotSupported
|
||||
@ -1406,9 +1126,11 @@ func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) {
|
||||
}
|
||||
|
||||
type EthSubscriptionManager struct {
|
||||
EthModuleAPI
|
||||
mu sync.Mutex
|
||||
subs map[string]*ethSubscription
|
||||
Chain *store.ChainStore
|
||||
StateAPI StateAPI
|
||||
ChainAPI ChainAPI
|
||||
mu sync.Mutex
|
||||
subs map[string]*ethSubscription
|
||||
}
|
||||
|
||||
func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) {
|
||||
@ -1420,11 +1142,13 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
|
||||
ctx, quit := context.WithCancel(ctx)
|
||||
|
||||
sub := ðSubscription{
|
||||
EthModuleAPI: e.EthModuleAPI,
|
||||
id: id.String(),
|
||||
in: make(chan interface{}, 200),
|
||||
out: make(chan api.EthSubscriptionResponse),
|
||||
quit: quit,
|
||||
Chain: e.Chain,
|
||||
StateAPI: e.StateAPI,
|
||||
ChainAPI: e.ChainAPI,
|
||||
id: id.String(),
|
||||
in: make(chan interface{}, 200),
|
||||
out: make(chan api.EthSubscriptionResponse, 20),
|
||||
quit: quit,
|
||||
}
|
||||
|
||||
e.mu.Lock()
|
||||
@ -1454,10 +1178,12 @@ func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id string
|
||||
}
|
||||
|
||||
type ethSubscription struct {
|
||||
EthModuleAPI
|
||||
id string
|
||||
in chan interface{}
|
||||
out chan api.EthSubscriptionResponse
|
||||
Chain *store.ChainStore
|
||||
StateAPI StateAPI
|
||||
ChainAPI ChainAPI
|
||||
id string
|
||||
in chan interface{}
|
||||
out chan api.EthSubscriptionResponse
|
||||
|
||||
mu sync.Mutex
|
||||
filters []filter.Filter
|
||||
@ -1487,19 +1213,7 @@ func (e *ethSubscription) start(ctx context.Context) {
|
||||
case *filter.CollectedEvent:
|
||||
resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt})
|
||||
case *types.TipSet:
|
||||
// Sadly convoluted since the logic for conversion to eth block is long and buried away
|
||||
// in unexported methods of EthModule
|
||||
tsCid, err := vt.Key().Cid()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
hash, err := api.NewEthHashFromCid(tsCid)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
eb, err := e.EthGetBlockByHash(ctx, hash, true)
|
||||
eb, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.ChainAPI, e.StateAPI)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
@ -1532,3 +1246,282 @@ func (e *ethSubscription) stop() {
|
||||
e.quit = nil
|
||||
}
|
||||
}
|
||||
|
||||
func newEthBlockFromFilecoinTipSet(ctx context.Context, ts *types.TipSet, fullTxInfo bool, cs *store.ChainStore, ca ChainAPI, sa StateAPI) (api.EthBlock, error) {
|
||||
parent, err := cs.LoadTipSet(ctx, ts.Parents())
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
parentKeyCid, err := parent.Key().Cid()
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
parentBlkHash, err := api.NewEthHashFromCid(parentKeyCid)
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
|
||||
blkCid, err := ts.Key().Cid()
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
blkHash, err := api.NewEthHashFromCid(blkCid)
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
|
||||
blkMsgs, err := cs.BlockMsgsForTipset(ctx, ts)
|
||||
if err != nil {
|
||||
return api.EthBlock{}, xerrors.Errorf("error loading messages for tipset: %v: %w", ts, err)
|
||||
}
|
||||
|
||||
block := api.NewEthBlock()
|
||||
|
||||
// this seems to be a very expensive way to get gasUsed of the block. may need to find an efficient way to do it
|
||||
gasUsed := int64(0)
|
||||
for _, blkMsg := range blkMsgs {
|
||||
for _, msg := range append(blkMsg.BlsMessages, blkMsg.SecpkMessages...) {
|
||||
msgLookup, err := sa.StateSearchMsg(ctx, types.EmptyTSK, msg.Cid(), api.LookbackNoLimit, true)
|
||||
if err != nil || msgLookup == nil {
|
||||
return api.EthBlock{}, nil
|
||||
}
|
||||
gasUsed += msgLookup.Receipt.GasUsed
|
||||
|
||||
if fullTxInfo {
|
||||
tx, err := newEthTxFromFilecoinMessageLookup(ctx, msgLookup, cs, ca, sa)
|
||||
if err != nil {
|
||||
return api.EthBlock{}, nil
|
||||
}
|
||||
block.Transactions = append(block.Transactions, tx)
|
||||
} else {
|
||||
hash, err := api.NewEthHashFromCid(msg.Cid())
|
||||
if err != nil {
|
||||
return api.EthBlock{}, err
|
||||
}
|
||||
block.Transactions = append(block.Transactions, hash.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
block.Hash = blkHash
|
||||
block.Number = api.EthUint64(ts.Height())
|
||||
block.ParentHash = parentBlkHash
|
||||
block.Timestamp = api.EthUint64(ts.Blocks()[0].Timestamp)
|
||||
block.BaseFeePerGas = api.EthBigInt{Int: ts.Blocks()[0].ParentBaseFee.Int}
|
||||
block.GasUsed = api.EthUint64(gasUsed)
|
||||
return block, nil
|
||||
}
|
||||
|
||||
// lookupEthAddress makes its best effort at finding the Ethereum address for a
|
||||
// Filecoin address. It does the following:
|
||||
//
|
||||
// 1. If the supplied address is an f410 address, we return its payload as the EthAddress.
|
||||
// 2. Otherwise (f0, f1, f2, f3), we look up the actor on the state tree. If it has a predictable address, we return it if it's f410 address.
|
||||
// 3. Otherwise, we fall back to returning a masked ID Ethereum address. If the supplied address is an f0 address, we
|
||||
// use that ID to form the masked ID address.
|
||||
// 4. Otherwise, we fetch the actor's ID from the state tree and form the masked ID with it.
|
||||
func lookupEthAddress(ctx context.Context, addr address.Address, sa StateAPI) (api.EthAddress, error) {
|
||||
// Attempt to convert directly.
|
||||
if ethAddr, ok, err := api.TryEthAddressFromFilecoinAddress(addr, false); err != nil {
|
||||
return api.EthAddress{}, err
|
||||
} else if ok {
|
||||
return ethAddr, nil
|
||||
}
|
||||
|
||||
// Lookup on the target actor.
|
||||
actor, err := sa.StateGetActor(ctx, addr, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return api.EthAddress{}, err
|
||||
}
|
||||
if actor.Address != nil {
|
||||
if ethAddr, ok, err := api.TryEthAddressFromFilecoinAddress(*actor.Address, false); err != nil {
|
||||
return api.EthAddress{}, err
|
||||
} else if ok {
|
||||
return ethAddr, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we already have an ID addr, and use it if possible.
|
||||
if ethAddr, ok, err := api.TryEthAddressFromFilecoinAddress(addr, true); err != nil {
|
||||
return api.EthAddress{}, err
|
||||
} else if ok {
|
||||
return ethAddr, nil
|
||||
}
|
||||
|
||||
// Otherwise, resolve the ID addr.
|
||||
idAddr, err := sa.StateLookupID(ctx, addr, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return api.EthAddress{}, err
|
||||
}
|
||||
return api.EthAddressFromFilecoinAddress(idAddr)
|
||||
}
|
||||
|
||||
func newEthTxFromFilecoinMessageLookup(ctx context.Context, msgLookup *api.MsgLookup, cs *store.ChainStore, ca ChainAPI, sa StateAPI) (api.EthTx, error) {
|
||||
if msgLookup == nil {
|
||||
return api.EthTx{}, fmt.Errorf("msg does not exist")
|
||||
}
|
||||
cid := msgLookup.Message
|
||||
txHash, err := api.NewEthHashFromCid(cid)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
ts, err := cs.LoadTipSet(ctx, msgLookup.TipSet)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
// This tx is located in the parent tipset
|
||||
parentTs, err := cs.LoadTipSet(ctx, ts.Parents())
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
parentTsCid, err := parentTs.Key().Cid()
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
blkHash, err := api.NewEthHashFromCid(parentTsCid)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
msg, err := ca.ChainGetMessage(ctx, msgLookup.Message)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
fromEthAddr, err := lookupEthAddress(ctx, msg.From, sa)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
toEthAddr, err := lookupEthAddress(ctx, msg.To, sa)
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
|
||||
toAddr := &toEthAddr
|
||||
input := msg.Params
|
||||
// Check to see if we need to decode as contract deployment.
|
||||
// We don't need to resolve the to address, because there's only one form (an ID).
|
||||
if msg.To == builtintypes.EthereumAddressManagerActorAddr {
|
||||
switch msg.Method {
|
||||
case builtintypes.MethodsEAM.Create:
|
||||
toAddr = nil
|
||||
var params eam.CreateParams
|
||||
err = params.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||
input = params.Initcode
|
||||
case builtintypes.MethodsEAM.Create2:
|
||||
toAddr = nil
|
||||
var params eam.Create2Params
|
||||
err = params.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||
input = params.Initcode
|
||||
}
|
||||
if err != nil {
|
||||
return api.EthTx{}, err
|
||||
}
|
||||
}
|
||||
// Otherwise, try to decode as a cbor byte array.
|
||||
// TODO: Actually check if this is an ethereum call. This code will work for demo purposes, but is not correct.
|
||||
if toAddr != nil {
|
||||
if decodedParams, err := cbg.ReadByteArray(bytes.NewReader(msg.Params), uint64(len(msg.Params))); err == nil {
|
||||
input = decodedParams
|
||||
}
|
||||
}
|
||||
|
||||
tx := api.EthTx{
|
||||
ChainID: api.EthUint64(build.Eip155ChainId),
|
||||
Hash: txHash,
|
||||
BlockHash: blkHash,
|
||||
BlockNumber: api.EthUint64(parentTs.Height()),
|
||||
From: fromEthAddr,
|
||||
To: toAddr,
|
||||
Value: api.EthBigInt(msg.Value),
|
||||
Type: api.EthUint64(2),
|
||||
Gas: api.EthUint64(msg.GasLimit),
|
||||
MaxFeePerGas: api.EthBigInt(msg.GasFeeCap),
|
||||
MaxPriorityFeePerGas: api.EthBigInt(msg.GasPremium),
|
||||
V: api.EthBytes{},
|
||||
R: api.EthBytes{},
|
||||
S: api.EthBytes{},
|
||||
Input: input,
|
||||
}
|
||||
return tx, nil
|
||||
}
|
||||
|
||||
func newEthTxReceipt(ctx context.Context, tx api.EthTx, lookup *api.MsgLookup, replay *api.InvocResult, events []types.Event, sa StateAPI) (api.EthTxReceipt, error) {
|
||||
receipt := api.EthTxReceipt{
|
||||
TransactionHash: tx.Hash,
|
||||
TransactionIndex: tx.TransactionIndex,
|
||||
BlockHash: tx.BlockHash,
|
||||
BlockNumber: tx.BlockNumber,
|
||||
From: tx.From,
|
||||
To: tx.To,
|
||||
StateRoot: api.EmptyEthHash,
|
||||
LogsBloom: []byte{0},
|
||||
}
|
||||
|
||||
if receipt.To == nil && lookup.Receipt.ExitCode.IsSuccess() {
|
||||
// Create and Create2 return the same things.
|
||||
var ret eam.CreateReturn
|
||||
if err := ret.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
|
||||
return api.EthTxReceipt{}, xerrors.Errorf("failed to parse contract creation result: %w", err)
|
||||
}
|
||||
addr := api.EthAddress(ret.EthAddress)
|
||||
receipt.ContractAddress = &addr
|
||||
}
|
||||
|
||||
if lookup.Receipt.ExitCode.IsSuccess() {
|
||||
receipt.Status = 1
|
||||
}
|
||||
if lookup.Receipt.ExitCode.IsError() {
|
||||
receipt.Status = 0
|
||||
}
|
||||
|
||||
if len(events) > 0 {
|
||||
receipt.Logs = make([]api.EthLog, 0, len(events))
|
||||
for i, evt := range events {
|
||||
l := api.EthLog{
|
||||
Removed: false,
|
||||
LogIndex: api.EthUint64(i),
|
||||
TransactionIndex: tx.TransactionIndex,
|
||||
TransactionHash: tx.Hash,
|
||||
BlockHash: tx.BlockHash,
|
||||
BlockNumber: tx.BlockNumber,
|
||||
}
|
||||
|
||||
for _, entry := range evt.Entries {
|
||||
hash := api.EthHashData(entry.Value)
|
||||
if entry.Key == api.EthTopic1 || entry.Key == api.EthTopic2 || entry.Key == api.EthTopic3 || entry.Key == api.EthTopic4 {
|
||||
l.Topics = append(l.Topics, hash)
|
||||
} else {
|
||||
l.Data = append(l.Data, hash)
|
||||
}
|
||||
}
|
||||
|
||||
addr, err := address.NewIDAddress(uint64(evt.Emitter))
|
||||
if err != nil {
|
||||
return api.EthTxReceipt{}, xerrors.Errorf("failed to create ID address: %w", err)
|
||||
}
|
||||
|
||||
l.Address, err = lookupEthAddress(ctx, addr, sa)
|
||||
if err != nil {
|
||||
return api.EthTxReceipt{}, xerrors.Errorf("failed to resolve Ethereum address: %w", err)
|
||||
}
|
||||
|
||||
receipt.Logs = append(receipt.Logs, l)
|
||||
}
|
||||
}
|
||||
|
||||
receipt.GasUsed = api.EthUint64(lookup.Receipt.GasUsed)
|
||||
|
||||
// TODO: handle CumulativeGasUsed
|
||||
receipt.CumulativeGasUsed = api.EmptyEthInt
|
||||
|
||||
effectiveGasPrice := big.Div(replay.GasCost.TotalCost, big.NewInt(lookup.Receipt.GasUsed))
|
||||
receipt.EffectiveGasPrice = api.EthBigInt(effectiveGasPrice)
|
||||
|
||||
return receipt, nil
|
||||
}
|
||||
|
@ -31,10 +31,9 @@ type EventAPI struct {
|
||||
|
||||
var _ events.EventAPI = &EventAPI{}
|
||||
|
||||
func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.EthModuleAPI) (*full.EthEvent, error) {
|
||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, em full.EthModuleAPI) (*full.EthEvent, error) {
|
||||
func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.EthEvent, error) {
|
||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.EthEvent, error) {
|
||||
ee := &full.EthEvent{
|
||||
EthModuleAPI: em,
|
||||
Chain: cs,
|
||||
MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange),
|
||||
}
|
||||
@ -46,7 +45,9 @@ func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecy
|
||||
}
|
||||
|
||||
ee.SubManager = &full.EthSubscriptionManager{
|
||||
EthModuleAPI: em,
|
||||
Chain: cs,
|
||||
StateAPI: stateapi,
|
||||
ChainAPI: chainapi,
|
||||
}
|
||||
ee.FilterStore = filter.NewMemFilterStore(cfg.MaxFilters)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user