Merge pull request #4095 from filecoin-project/feat/daemon-lite

lotus-lite: replace wallet / msig calls with thin client to gateway
This commit is contained in:
Łukasz Magiera 2020-10-09 19:22:43 +02:00 committed by GitHub
commit 110f033c83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 1228 additions and 248 deletions

View File

@ -367,6 +367,10 @@ type FullNode interface {
// StateWaitMsg looks back in the chain for a message. If not found, it blocks until the // StateWaitMsg looks back in the chain for a message. If not found, it blocks until the
// message arrives on chain, and gets to the indicated confidence depth. // message arrives on chain, and gets to the indicated confidence depth.
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*MsgLookup, error) StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*MsgLookup, error)
// StateWaitMsgLimited looks back up to limit epochs in the chain for a message.
// If not found, it blocks until the message arrives on chain, and gets to the
// indicated confidence depth.
StateWaitMsgLimited(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch) (*MsgLookup, error)
// StateListMiners returns the addresses of every miner that has claimed power in the Power Actor // StateListMiners returns the addresses of every miner that has claimed power in the Power Actor
StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error) StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error)
// StateListActors returns the addresses of every actor in the state // StateListActors returns the addresses of every actor in the state

24
api/api_gateway.go Normal file
View File

@ -0,0 +1,24 @@
package api
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
type GatewayAPI interface {
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*MsgLookup, error)
}

View File

@ -190,6 +190,7 @@ type FullNodeStruct struct {
StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error) `perm:"read"` StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error) `perm:"read"`
StateMsgGasCost func(context.Context, cid.Cid, types.TipSetKey) (*api.MsgGasCost, error) `perm:"read"` StateMsgGasCost func(context.Context, cid.Cid, types.TipSetKey) (*api.MsgGasCost, error) `perm:"read"`
StateWaitMsg func(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) `perm:"read"` StateWaitMsg func(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) `perm:"read"`
StateWaitMsgLimited func(context.Context, cid.Cid, uint64, abi.ChainEpoch) (*api.MsgLookup, error) `perm:"read"`
StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"` StateSearchMsg func(context.Context, cid.Cid) (*api.MsgLookup, error) `perm:"read"`
StateListMiners func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"` StateListMiners func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"`
StateListActors func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"` StateListActors func(context.Context, types.TipSetKey) ([]address.Address, error) `perm:"read"`
@ -361,6 +362,23 @@ type WorkerStruct struct {
} }
} }
type GatewayStruct struct {
Internal struct {
// TODO: does the gateway need perms?
ChainGetTipSet func(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight func(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainHead func(ctx context.Context) (*types.TipSet, error)
GasEstimateMessageGas func(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
MpoolPush func(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested func(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
StateAccountKey func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateGetActor func(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateLookupID func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateWaitMsg func(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
}
}
// CommonStruct // CommonStruct
func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]auth.Permission, error) { func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]auth.Permission, error) {
@ -854,6 +872,10 @@ func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid, confide
return c.Internal.StateWaitMsg(ctx, msgc, confidence) return c.Internal.StateWaitMsg(ctx, msgc, confidence)
} }
func (c *FullNodeStruct) StateWaitMsgLimited(ctx context.Context, msgc cid.Cid, confidence uint64, limit abi.ChainEpoch) (*api.MsgLookup, error) {
return c.Internal.StateWaitMsgLimited(ctx, msgc, confidence, limit)
}
func (c *FullNodeStruct) StateSearchMsg(ctx context.Context, msgc cid.Cid) (*api.MsgLookup, error) { func (c *FullNodeStruct) StateSearchMsg(ctx context.Context, msgc cid.Cid) (*api.MsgLookup, error) {
return c.Internal.StateSearchMsg(ctx, msgc) return c.Internal.StateSearchMsg(ctx, msgc)
} }
@ -1372,7 +1394,52 @@ func (w *WorkerStruct) Closing(ctx context.Context) (<-chan struct{}, error) {
return w.Internal.Closing(ctx) return w.Internal.Closing(ctx)
} }
func (g GatewayStruct) ChainHead(ctx context.Context) (*types.TipSet, error) {
return g.Internal.ChainHead(ctx)
}
func (g GatewayStruct) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
return g.Internal.ChainGetTipSet(ctx, tsk)
}
func (g GatewayStruct) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
return g.Internal.ChainGetTipSetByHeight(ctx, h, tsk)
}
func (g GatewayStruct) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) {
return g.Internal.GasEstimateMessageGas(ctx, msg, spec, tsk)
}
func (g GatewayStruct) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) {
return g.Internal.MpoolPush(ctx, sm)
}
func (g GatewayStruct) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) {
return g.Internal.MsigGetAvailableBalance(ctx, addr, tsk)
}
func (g GatewayStruct) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) {
return g.Internal.MsigGetVested(ctx, addr, start, end)
}
func (g GatewayStruct) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
return g.Internal.StateAccountKey(ctx, addr, tsk)
}
func (g GatewayStruct) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) {
return g.Internal.StateGetActor(ctx, actor, ts)
}
func (g GatewayStruct) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
return g.Internal.StateLookupID(ctx, addr, tsk)
}
func (g GatewayStruct) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
return g.Internal.StateWaitMsg(ctx, msg, confidence)
}
var _ api.Common = &CommonStruct{} var _ api.Common = &CommonStruct{}
var _ api.FullNode = &FullNodeStruct{} var _ api.FullNode = &FullNodeStruct{}
var _ api.StorageMiner = &StorageMinerStruct{} var _ api.StorageMiner = &StorageMinerStruct{}
var _ api.WorkerAPI = &WorkerStruct{} var _ api.WorkerAPI = &WorkerStruct{}
var _ api.GatewayAPI = &GatewayStruct{}

View File

@ -82,3 +82,17 @@ func NewWorkerRPC(ctx context.Context, addr string, requestHeader http.Header) (
return &res, closer, err return &res, closer, err
} }
// NewGatewayRPC creates a new http jsonrpc client for a gateway node.
func NewGatewayRPC(ctx context.Context, addr string, requestHeader http.Header, opts ...jsonrpc.Option) (api.GatewayAPI, jsonrpc.ClientCloser, error) {
var res apistruct.GatewayStruct
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
[]interface{}{
&res.Internal,
},
requestHeader,
opts...,
)
return &res, closer, err
}

View File

@ -12,10 +12,7 @@ import (
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/impl"
) )
@ -37,11 +34,7 @@ func TestCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration) {
func testCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, upgradeHeight abi.ChainEpoch) { func testCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, upgradeHeight abi.ChainEpoch) {
ctx := context.Background() ctx := context.Background()
n, sn := b(t, 1, OneMiner, node.Override(new(stmgr.UpgradeSchedule), stmgr.UpgradeSchedule{{ n, sn := b(t, []FullNodeOpts{FullNodeWithUpgradeAt(upgradeHeight)}, OneMiner)
Network: build.ActorUpgradeNetworkVersion,
Height: upgradeHeight,
Migration: stmgr.UpgradeActorsV2,
}}))
client := n[0].FullNode.(*impl.FullNodeAPI) client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0] miner := sn[0]

View File

@ -48,7 +48,7 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport
_ = os.Setenv("BELLMAN_NO_GPU", "1") _ = os.Setenv("BELLMAN_NO_GPU", "1")
ctx := context.Background() ctx := context.Background()
n, sn := b(t, 1, OneMiner) n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI) client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0] miner := sn[0]
@ -85,7 +85,7 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
_ = os.Setenv("BELLMAN_NO_GPU", "1") _ = os.Setenv("BELLMAN_NO_GPU", "1")
ctx := context.Background() ctx := context.Background()
n, sn := b(t, 1, OneMiner) n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI) client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0] miner := sn[0]
@ -149,7 +149,7 @@ func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Durati
_ = os.Setenv("BELLMAN_NO_GPU", "1") _ = os.Setenv("BELLMAN_NO_GPU", "1")
ctx := context.Background() ctx := context.Background()
n, sn := b(t, 1, OneMiner) n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI) client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0] miner := sn[0]
@ -204,7 +204,7 @@ func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
_ = os.Setenv("BELLMAN_NO_GPU", "1") _ = os.Setenv("BELLMAN_NO_GPU", "1")
ctx := context.Background() ctx := context.Background()
n, sn := b(t, 1, OneMiner) n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI) client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0] miner := sn[0]

View File

@ -25,7 +25,7 @@ var log = logging.Logger("apitest")
func (ts *testSuite) testMining(t *testing.T) { func (ts *testSuite) testMining(t *testing.T) {
ctx := context.Background() ctx := context.Background()
apis, sn := ts.makeNodes(t, 1, OneMiner) apis, sn := ts.makeNodes(t, OneFull, OneMiner)
api := apis[0] api := apis[0]
newHeads, err := api.ChainNotify(ctx) newHeads, err := api.ChainNotify(ctx)
@ -54,7 +54,7 @@ func (ts *testSuite) testMiningReal(t *testing.T) {
}() }()
ctx := context.Background() ctx := context.Background()
apis, sn := ts.makeNodes(t, 1, OneMiner) apis, sn := ts.makeNodes(t, OneFull, OneMiner)
api := apis[0] api := apis[0]
newHeads, err := api.ChainNotify(ctx) newHeads, err := api.ChainNotify(ctx)
@ -93,7 +93,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
// test making a deal with a fresh miner, and see if it starts to mine // test making a deal with a fresh miner, and see if it starts to mine
ctx := context.Background() ctx := context.Background()
n, sn := b(t, 1, []StorageMiner{ n, sn := b(t, OneFull, []StorageMiner{
{Full: 0, Preseal: PresealGenesis}, {Full: 0, Preseal: PresealGenesis},
{Full: 0, Preseal: 0}, // TODO: Add support for miners on non-first full node {Full: 0, Preseal: 0}, // TODO: Add support for miners on non-first full node
}) })

View File

@ -33,7 +33,7 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
_ = os.Setenv("BELLMAN_NO_GPU", "1") _ = os.Setenv("BELLMAN_NO_GPU", "1")
ctx := context.Background() ctx := context.Background()
n, sn := b(t, 2, OneMiner) n, sn := b(t, TwoFull, OneMiner)
paymentCreator := n[0] paymentCreator := n[0]
paymentReceiver := n[1] paymentReceiver := n[1]

View File

@ -4,11 +4,14 @@ import (
"context" "context"
"testing" "testing"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
@ -35,17 +38,27 @@ var PresealGenesis = -1
const GenesisPreseals = 2 const GenesisPreseals = 2
// Options for setting up a mock storage miner
type StorageMiner struct { type StorageMiner struct {
Full int Full int
Preseal int Preseal int
} }
type OptionGenerator func([]TestNode) node.Option
// Options for setting up a mock full node
type FullNodeOpts struct {
Lite bool // run node in "lite" mode
Opts OptionGenerator // generate dependency injection options
}
// APIBuilder is a function which is invoked in test suite to provide // APIBuilder is a function which is invoked in test suite to provide
// test nodes and networks // test nodes and networks
// //
// fullOpts array defines options for each full node
// storage array defines storage nodes, numbers in the array specify full node // storage array defines storage nodes, numbers in the array specify full node
// index the storage node 'belongs' to // index the storage node 'belongs' to
type APIBuilder func(t *testing.T, nFull int, storage []StorageMiner, opts ...node.Option) ([]TestNode, []TestStorageNode) type APIBuilder func(t *testing.T, full []FullNodeOpts, storage []StorageMiner) ([]TestNode, []TestStorageNode)
type testSuite struct { type testSuite struct {
makeNodes APIBuilder makeNodes APIBuilder
} }
@ -63,13 +76,39 @@ func TestApis(t *testing.T, b APIBuilder) {
t.Run("testMiningReal", ts.testMiningReal) t.Run("testMiningReal", ts.testMiningReal)
} }
func DefaultFullOpts(nFull int) []FullNodeOpts {
full := make([]FullNodeOpts, nFull)
for i := range full {
full[i] = FullNodeOpts{
Opts: func(nodes []TestNode) node.Option {
return node.Options()
},
}
}
return full
}
var OneMiner = []StorageMiner{{Full: 0, Preseal: PresealGenesis}} var OneMiner = []StorageMiner{{Full: 0, Preseal: PresealGenesis}}
var OneFull = DefaultFullOpts(1)
var TwoFull = DefaultFullOpts(2)
var FullNodeWithUpgradeAt = func(upgradeHeight abi.ChainEpoch) FullNodeOpts {
return FullNodeOpts{
Opts: func(nodes []TestNode) node.Option {
return node.Override(new(stmgr.UpgradeSchedule), stmgr.UpgradeSchedule{{
Network: build.ActorUpgradeNetworkVersion,
Height: upgradeHeight,
Migration: stmgr.UpgradeActorsV2,
}})
},
}
}
func (ts *testSuite) testVersion(t *testing.T) { func (ts *testSuite) testVersion(t *testing.T) {
build.RunningNodeType = build.NodeFull build.RunningNodeType = build.NodeFull
ctx := context.Background() ctx := context.Background()
apis, _ := ts.makeNodes(t, 1, OneMiner) apis, _ := ts.makeNodes(t, OneFull, OneMiner)
api := apis[0] api := apis[0]
v, err := api.Version(ctx) v, err := api.Version(ctx)
@ -81,7 +120,7 @@ func (ts *testSuite) testVersion(t *testing.T) {
func (ts *testSuite) testID(t *testing.T) { func (ts *testSuite) testID(t *testing.T) {
ctx := context.Background() ctx := context.Background()
apis, _ := ts.makeNodes(t, 1, OneMiner) apis, _ := ts.makeNodes(t, OneFull, OneMiner)
api := apis[0] api := apis[0]
id, err := api.ID(ctx) id, err := api.ID(ctx)
@ -93,7 +132,7 @@ func (ts *testSuite) testID(t *testing.T) {
func (ts *testSuite) testConnectTwo(t *testing.T) { func (ts *testSuite) testConnectTwo(t *testing.T) {
ctx := context.Background() ctx := context.Background()
apis, _ := ts.makeNodes(t, 2, OneMiner) apis, _ := ts.makeNodes(t, TwoFull, OneMiner)
p, err := apis[0].NetPeers(ctx) p, err := apis[0].NetPeers(ctx)
if err != nil { if err != nil {

View File

@ -15,11 +15,9 @@ import (
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/mock" "github.com/filecoin-project/lotus/extern/sector-storage/mock"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing" sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
bminer "github.com/filecoin-project/lotus/miner" bminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/impl"
@ -34,7 +32,7 @@ func init() {
func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSectors int) { func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSectors int) {
ctx := context.Background() ctx := context.Background()
n, sn := b(t, 1, OneMiner) n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI) client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0] miner := sn[0]
@ -133,11 +131,7 @@ func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration,
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
n, sn := b(t, 1, OneMiner, node.Override(new(stmgr.UpgradeSchedule), stmgr.UpgradeSchedule{{ n, sn := b(t, []FullNodeOpts{FullNodeWithUpgradeAt(upgradeHeight)}, OneMiner)
Network: build.ActorUpgradeNetworkVersion,
Height: upgradeHeight,
Migration: stmgr.UpgradeActorsV2,
}}))
client := n[0].FullNode.(*impl.FullNodeAPI) client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0] miner := sn[0]

View File

@ -6,7 +6,6 @@ import (
"sync" "sync"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
@ -21,7 +20,7 @@ const dsKeyActorNonce = "ActorNextNonce"
var log = logging.Logger("messagesigner") var log = logging.Logger("messagesigner")
type mpoolAPI interface { type MpoolNonceAPI interface {
GetNonce(address.Address) (uint64, error) GetNonce(address.Address) (uint64, error)
} }
@ -30,15 +29,11 @@ type mpoolAPI interface {
type MessageSigner struct { type MessageSigner struct {
wallet *wallet.Wallet wallet *wallet.Wallet
lk sync.Mutex lk sync.Mutex
mpool mpoolAPI mpool MpoolNonceAPI
ds datastore.Batching ds datastore.Batching
} }
func NewMessageSigner(wallet *wallet.Wallet, mpool *messagepool.MessagePool, ds dtypes.MetadataDS) *MessageSigner { func NewMessageSigner(wallet *wallet.Wallet, mpool MpoolNonceAPI, ds dtypes.MetadataDS) *MessageSigner {
return newMessageSigner(wallet, mpool, ds)
}
func newMessageSigner(wallet *wallet.Wallet, mpool mpoolAPI, ds dtypes.MetadataDS) *MessageSigner {
ds = namespace.Wrap(ds, datastore.NewKey("/message-signer/")) ds = namespace.Wrap(ds, datastore.NewKey("/message-signer/"))
return &MessageSigner{ return &MessageSigner{
wallet: wallet, wallet: wallet,

View File

@ -177,7 +177,7 @@ func TestMessageSignerSignMessage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
mpool := newMockMpool() mpool := newMockMpool()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
ms := newMessageSigner(w, mpool, ds) ms := NewMessageSigner(w, mpool, ds)
for _, m := range tt.msgs { for _, m := range tt.msgs {
if len(m.mpoolNonce) == 1 { if len(m.mpoolNonce) == 1 {

View File

@ -38,8 +38,16 @@ import (
"github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/chain/vm"
) )
const LookbackNoLimit = abi.ChainEpoch(-1)
var log = logging.Logger("statemgr") var log = logging.Logger("statemgr")
type StateManagerAPI interface {
LoadActorTsk(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error)
LookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error)
ResolveToKeyAddress(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error)
}
type versionSpec struct { type versionSpec struct {
networkVersion network.Version networkVersion network.Version
atOrBelow abi.ChainEpoch atOrBelow abi.ChainEpoch
@ -508,7 +516,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
return nil, fmt.Errorf("failed to load message: %w", err) return nil, fmt.Errorf("failed to load message: %w", err)
} }
_, r, _, err := sm.searchBackForMsg(ctx, ts, m) _, r, _, err := sm.searchBackForMsg(ctx, ts, m, LookbackNoLimit)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to look back through chain for message: %w", err) return nil, fmt.Errorf("failed to look back through chain for message: %w", err)
} }
@ -517,9 +525,9 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
} }
// WaitForMessage blocks until a message appears on chain. It looks backwards in the chain to see if this has already // WaitForMessage blocks until a message appears on chain. It looks backwards in the chain to see if this has already
// happened. It guarantees that the message has been on chain for at least confidence epochs without being reverted // happened, with an optional limit to how many epochs it will search. It guarantees that the message has been on
// before returning. // chain for at least confidence epochs without being reverted before returning.
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -557,7 +565,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
var backFm cid.Cid var backFm cid.Cid
backSearchWait := make(chan struct{}) backSearchWait := make(chan struct{})
go func() { go func() {
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head[0].Val, msg) fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head[0].Val, msg, lookbackLimit)
if err != nil { if err != nil {
log.Warnf("failed to look back through chain for message: %w", err) log.Warnf("failed to look back through chain for message: %w", err)
return return
@ -649,7 +657,7 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*ty
return head, r, foundMsg, nil return head, r, foundMsg, nil
} }
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg) fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg, LookbackNoLimit)
if err != nil { if err != nil {
log.Warnf("failed to look back through chain for message %s", mcid) log.Warnf("failed to look back through chain for message %s", mcid)
@ -663,7 +671,15 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*ty
return fts, r, foundMsg, nil return fts, r, foundMsg, nil
} }
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { // searchBackForMsg searches up to limit tipsets backwards from the given
// tipset for a message receipt.
// If limit is
// - 0 then no tipsets are searched
// - 5 then five tipset are searched
// - LookbackNoLimit then there is no limit
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg, limit abi.ChainEpoch) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
limitHeight := from.Height() - limit
noLimit := limit == LookbackNoLimit
cur := from cur := from
curActor, err := sm.LoadActor(ctx, m.VMMessage().From, cur) curActor, err := sm.LoadActor(ctx, m.VMMessage().From, cur)
@ -679,7 +695,9 @@ func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet
mNonce := m.VMMessage().Nonce mNonce := m.VMMessage().Nonce
for { for {
if cur.Height() == 0 { // If we've reached the genesis block, or we've reached the limit of
// how far back to look
if cur.Height() == 0 || !noLimit && cur.Height() <= limitHeight {
// it ain't here! // it ain't here!
return nil, nil, cid.Undef, nil return nil, nil, cid.Undef, nil
} }
@ -1393,3 +1411,5 @@ func (sm *StateManager) GetMarketState(ctx context.Context, ts *types.TipSet) (m
} }
return actState, nil return actState, nil
} }
var _ StateManagerAPI = (*StateManager)(nil)

View File

@ -59,9 +59,11 @@ func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types
var pcids []cid.Cid var pcids []cid.Cid
var height abi.ChainEpoch var height abi.ChainEpoch
weight := types.NewInt(weightInc) weight := types.NewInt(weightInc)
var timestamp uint64
if parents != nil { if parents != nil {
pcids = parents.Cids() pcids = parents.Cids()
height = parents.Height() + 1 height = parents.Height() + 1
timestamp = parents.MinTimestamp() + build.BlockDelaySecs
weight = types.BigAdd(parents.Blocks()[0].ParentWeight, weight) weight = types.BigAdd(parents.Blocks()[0].ParentWeight, weight)
} }
@ -79,6 +81,7 @@ func MkBlock(parents *types.TipSet, weightInc uint64, ticketNonce uint64) *types
ParentWeight: weight, ParentWeight: weight,
Messages: c, Messages: c,
Height: height, Height: height,
Timestamp: timestamp,
ParentStateRoot: pstateRoot, ParentStateRoot: pstateRoot,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("boo! im a signature")}, BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("boo! im a signature")},
ParentBaseFee: types.NewInt(uint64(build.MinimumBaseFee)), ParentBaseFee: types.NewInt(uint64(build.MinimumBaseFee)),

View File

@ -289,6 +289,15 @@ func GetWorkerAPI(ctx *cli.Context) (api.WorkerAPI, jsonrpc.ClientCloser, error)
return client.NewWorkerRPC(ctx.Context, addr, headers) return client.NewWorkerRPC(ctx.Context, addr, headers)
} }
func GetGatewayAPI(ctx *cli.Context) (api.GatewayAPI, jsonrpc.ClientCloser, error) {
addr, headers, err := GetRawAPI(ctx, repo.FullNode)
if err != nil {
return nil, nil, err
}
return client.NewGatewayRPC(ctx.Context, addr, headers)
}
func DaemonContext(cctx *cli.Context) context.Context { func DaemonContext(cctx *cli.Context) context.Context {
if mtCtx, ok := cctx.App.Metadata[metadataTraceContext]; ok { if mtCtx, ok := cctx.App.Metadata[metadataTraceContext]; ok {
return mtCtx.(context.Context) return mtCtx.(context.Context)

View File

@ -390,7 +390,7 @@ func checkVoucherOutput(t *testing.T, list string, vouchers []voucherSpec) {
} }
func startTwoNodesOneMiner(ctx context.Context, t *testing.T, blocktime time.Duration) ([]test.TestNode, []address.Address) { func startTwoNodesOneMiner(ctx context.Context, t *testing.T, blocktime time.Duration) ([]test.TestNode, []address.Address) {
n, sn := builder.RPCMockSbBuilder(t, 2, test.OneMiner) n, sn := builder.RPCMockSbBuilder(t, test.TwoFull, test.OneMiner)
paymentCreator := n[0] paymentCreator := n[0]
paymentReceiver := n[1] paymentReceiver := n[1]

View File

@ -6,85 +6,174 @@ import (
"time" "time"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"go.opencensus.io/trace"
) )
const LookbackCap = time.Hour const (
LookbackCap = time.Hour
stateWaitLookbackLimit = abi.ChainEpoch(20)
)
var ( var (
ErrLookbackTooLong = fmt.Errorf("lookbacks of more than %s are disallowed", LookbackCap) ErrLookbackTooLong = fmt.Errorf("lookbacks of more than %s are disallowed", LookbackCap)
) )
type GatewayAPI struct { // gatewayDepsAPI defines the API methods that the GatewayAPI depends on
api api.FullNode // (to make it easy to mock for tests)
type gatewayDepsAPI interface {
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
MpoolPushUntrusted(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateWaitMsgLimited(ctx context.Context, msg cid.Cid, confidence uint64, h abi.ChainEpoch) (*api.MsgLookup, error)
} }
func (a *GatewayAPI) getTipsetTimestamp(ctx context.Context, tsk types.TipSetKey) (time.Time, error) { type GatewayAPI struct {
api gatewayDepsAPI
}
func (a *GatewayAPI) checkTipsetKey(ctx context.Context, tsk types.TipSetKey) error {
if tsk.IsEmpty() { if tsk.IsEmpty() {
return time.Now(), nil return nil
} }
ts, err := a.api.ChainGetTipSet(ctx, tsk) ts, err := a.api.ChainGetTipSet(ctx, tsk)
if err != nil {
return time.Time{}, err
}
return time.Unix(int64(ts.Blocks()[0].Timestamp), 0), nil
}
func (a *GatewayAPI) checkTipset(ctx context.Context, ts types.TipSetKey) error {
when, err := a.getTipsetTimestamp(ctx, ts)
if err != nil { if err != nil {
return err return err
} }
if time.Since(when) > time.Hour { return a.checkTipset(ts)
}
func (a *GatewayAPI) checkTipset(ts *types.TipSet) error {
at := time.Unix(int64(ts.Blocks()[0].Timestamp), 0)
if err := a.checkTimestamp(at); err != nil {
return fmt.Errorf("bad tipset: %w", err)
}
return nil
}
func (a *GatewayAPI) checkTipsetHeight(ts *types.TipSet, h abi.ChainEpoch) error {
tsBlock := ts.Blocks()[0]
heightDelta := time.Duration(uint64(tsBlock.Height-h)*build.BlockDelaySecs) * time.Second
timeAtHeight := time.Unix(int64(tsBlock.Timestamp), 0).Add(-heightDelta)
if err := a.checkTimestamp(timeAtHeight); err != nil {
return fmt.Errorf("bad tipset height: %w", err)
}
return nil
}
func (a *GatewayAPI) checkTimestamp(at time.Time) error {
if time.Since(at) > LookbackCap {
return ErrLookbackTooLong return ErrLookbackTooLong
} }
return nil return nil
} }
func (a *GatewayAPI) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) {
ctx, span := trace.StartSpan(ctx, "StateGetActor")
defer span.End()
if err := a.checkTipset(ctx, ts); err != nil {
return nil, fmt.Errorf("bad tipset: %w", err)
}
return a.api.StateGetActor(ctx, actor, ts)
}
func (a *GatewayAPI) ChainHead(ctx context.Context) (*types.TipSet, error) { func (a *GatewayAPI) ChainHead(ctx context.Context) (*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "ChainHead")
defer span.End()
// TODO: cache and invalidate cache when timestamp is up (or have internal ChainNotify) // TODO: cache and invalidate cache when timestamp is up (or have internal ChainNotify)
return a.api.ChainHead(ctx) return a.api.ChainHead(ctx)
} }
func (a *GatewayAPI) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { func (a *GatewayAPI) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "ChainGetTipSet")
defer span.End()
if err := a.checkTipset(ctx, tsk); err != nil {
return nil, fmt.Errorf("bad tipset: %w", err)
}
// TODO: since we're limiting lookbacks, should just cache this (could really even cache the json response bytes)
return a.api.ChainGetTipSet(ctx, tsk) return a.api.ChainGetTipSet(ctx, tsk)
} }
func (a *GatewayAPI) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
ts, err := a.api.ChainGetTipSet(ctx, tsk)
if err != nil {
return nil, err
}
// Check if the tipset key refers to a tipset that's too far in the past
if err := a.checkTipset(ts); err != nil {
return nil, err
}
// Check if the height is too far in the past
if err := a.checkTipsetHeight(ts, h); err != nil {
return nil, err
}
return a.api.ChainGetTipSetByHeight(ctx, h, tsk)
}
func (a *GatewayAPI) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) {
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return nil, err
}
return a.api.GasEstimateMessageGas(ctx, msg, spec, tsk)
}
func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) { func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) {
ctx, span := trace.StartSpan(ctx, "MpoolPush")
defer span.End()
// TODO: additional anti-spam checks // TODO: additional anti-spam checks
return a.api.MpoolPushUntrusted(ctx, sm) return a.api.MpoolPushUntrusted(ctx, sm)
} }
func (a *GatewayAPI) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) {
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return types.NewInt(0), err
}
return a.api.MsigGetAvailableBalance(ctx, addr, tsk)
}
func (a *GatewayAPI) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) {
if err := a.checkTipsetKey(ctx, start); err != nil {
return types.NewInt(0), err
}
if err := a.checkTipsetKey(ctx, end); err != nil {
return types.NewInt(0), err
}
return a.api.MsigGetVested(ctx, addr, start, end)
}
func (a *GatewayAPI) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return address.Undef, err
}
return a.api.StateAccountKey(ctx, addr, tsk)
}
func (a *GatewayAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return nil, err
}
return a.api.StateGetActor(ctx, actor, tsk)
}
func (a *GatewayAPI) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return address.Undef, err
}
return a.api.StateLookupID(ctx, addr, tsk)
}
func (a *GatewayAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
return a.api.StateWaitMsgLimited(ctx, msg, confidence, stateWaitLookbackLimit)
}
var _ api.GatewayAPI = (*GatewayAPI)(nil)
var _ full.ChainModuleAPI = (*GatewayAPI)(nil)
var _ full.GasModuleAPI = (*GatewayAPI)(nil)
var _ full.MpoolModuleAPI = (*GatewayAPI)(nil)
var _ full.StateModuleAPI = (*GatewayAPI)(nil)

View File

@ -0,0 +1,191 @@
package main
import (
"context"
"sync"
"testing"
"time"
"github.com/filecoin-project/lotus/build"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/chain/types/mock"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
func TestGatewayAPIChainGetTipSetByHeight(t *testing.T) {
ctx := context.Background()
lookbackTimestamp := uint64(time.Now().Unix()) - uint64(LookbackCap.Seconds())
type args struct {
h abi.ChainEpoch
tskh abi.ChainEpoch
genesisTS uint64
}
tests := []struct {
name string
args args
expErr bool
}{{
name: "basic",
args: args{
h: abi.ChainEpoch(1),
tskh: abi.ChainEpoch(5),
},
}, {
name: "genesis",
args: args{
h: abi.ChainEpoch(0),
tskh: abi.ChainEpoch(5),
},
}, {
name: "same epoch as tipset",
args: args{
h: abi.ChainEpoch(5),
tskh: abi.ChainEpoch(5),
},
}, {
name: "tipset too old",
args: args{
// Tipset height is 5, genesis is at LookbackCap - 10 epochs.
// So resulting tipset height will be 5 epochs earlier than LookbackCap.
h: abi.ChainEpoch(1),
tskh: abi.ChainEpoch(5),
genesisTS: lookbackTimestamp - build.BlockDelaySecs*10,
},
expErr: true,
}, {
name: "lookup height too old",
args: args{
// Tipset height is 5, lookup height is 1, genesis is at LookbackCap - 3 epochs.
// So
// - lookup height will be 2 epochs earlier than LookbackCap.
// - tipset height will be 2 epochs later than LookbackCap.
h: abi.ChainEpoch(1),
tskh: abi.ChainEpoch(5),
genesisTS: lookbackTimestamp - build.BlockDelaySecs*3,
},
expErr: true,
}, {
name: "tipset and lookup height within acceptable range",
args: args{
// Tipset height is 5, lookup height is 1, genesis is at LookbackCap.
// So
// - lookup height will be 1 epoch later than LookbackCap.
// - tipset height will be 5 epochs later than LookbackCap.
h: abi.ChainEpoch(1),
tskh: abi.ChainEpoch(5),
genesisTS: lookbackTimestamp,
},
}}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mock := &mockGatewayDepsAPI{}
a := &GatewayAPI{api: mock}
// Create tipsets from genesis up to tskh and return the highest
ts := mock.createTipSets(tt.args.tskh, tt.args.genesisTS)
got, err := a.ChainGetTipSetByHeight(ctx, tt.args.h, ts.Key())
if tt.expErr {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tt.args.h, got.Height())
}
})
}
}
type mockGatewayDepsAPI struct {
lk sync.RWMutex
tipsets []*types.TipSet
}
func (m *mockGatewayDepsAPI) ChainHead(ctx context.Context) (*types.TipSet, error) {
m.lk.RLock()
defer m.lk.RUnlock()
return m.tipsets[len(m.tipsets)-1], nil
}
func (m *mockGatewayDepsAPI) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
m.lk.RLock()
defer m.lk.RUnlock()
for _, ts := range m.tipsets {
if ts.Key() == tsk {
return ts, nil
}
}
return nil, nil
}
// createTipSets creates tipsets from genesis up to tskh and returns the highest
func (m *mockGatewayDepsAPI) createTipSets(h abi.ChainEpoch, genesisTimestamp uint64) *types.TipSet {
m.lk.Lock()
defer m.lk.Unlock()
targeth := h + 1 // add one for genesis block
if genesisTimestamp == 0 {
genesisTimestamp = uint64(time.Now().Unix()) - build.BlockDelaySecs*uint64(targeth)
}
var currts *types.TipSet
for currh := abi.ChainEpoch(0); currh < targeth; currh++ {
blks := mock.MkBlock(currts, 1, 1)
if currh == 0 {
blks.Timestamp = genesisTimestamp
}
currts = mock.TipSet(blks)
m.tipsets = append(m.tipsets, currts)
}
return m.tipsets[len(m.tipsets)-1]
}
func (m *mockGatewayDepsAPI) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
m.lk.Lock()
defer m.lk.Unlock()
return m.tipsets[h], nil
}
func (m *mockGatewayDepsAPI) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) {
panic("implement me")
}
func (m *mockGatewayDepsAPI) MpoolPushUntrusted(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) {
panic("implement me")
}
func (m *mockGatewayDepsAPI) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) {
panic("implement me")
}
func (m *mockGatewayDepsAPI) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) {
panic("implement me")
}
func (m *mockGatewayDepsAPI) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
panic("implement me")
}
func (m *mockGatewayDepsAPI) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) {
panic("implement me")
}
func (m *mockGatewayDepsAPI) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
panic("implement me")
}
func (m *mockGatewayDepsAPI) StateWaitMsgLimited(ctx context.Context, msg cid.Cid, confidence uint64, h abi.ChainEpoch) (*api.MsgLookup, error) {
panic("implement me")
}

View File

@ -0,0 +1,211 @@
package main
import (
"bytes"
"context"
"fmt"
"os"
"testing"
"time"
init0 "github.com/filecoin-project/specs-actors/actors/builtin/init"
"github.com/filecoin-project/specs-actors/actors/builtin/multisig"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/api/test"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/node"
builder "github.com/filecoin-project/lotus/node/test"
)
func init() {
policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048))
policy.SetMinVerifiedDealSize(abi.NewStoragePower(256))
}
// TestEndToEnd tests that API calls can be made on a lite node that is
// connected through a gateway to a full API node
func TestEndToEnd(t *testing.T) {
_ = os.Setenv("BELLMAN_NO_GPU", "1")
blocktime := 5 * time.Millisecond
ctx := context.Background()
full, lite, closer := startNodes(ctx, t, blocktime)
defer closer()
// The full node starts with a wallet
fullWalletAddr, err := full.WalletDefaultAddress(ctx)
require.NoError(t, err)
// Check the full node's wallet balance from the lite node
balance, err := lite.WalletBalance(ctx, fullWalletAddr)
require.NoError(t, err)
fmt.Println(balance)
// Create a wallet on the lite node
liteWalletAddr, err := lite.WalletNew(ctx, wallet.ActSigType("secp256k1"))
require.NoError(t, err)
// Send some funds from the full node to the lite node
err = sendFunds(ctx, t, full, fullWalletAddr, liteWalletAddr, types.NewInt(1e18))
require.NoError(t, err)
// Send some funds from the lite node back to the full node
err = sendFunds(ctx, t, lite, liteWalletAddr, fullWalletAddr, types.NewInt(100))
require.NoError(t, err)
// Sign some data with the lite node wallet address
data := []byte("hello")
sig, err := lite.WalletSign(ctx, liteWalletAddr, data)
require.NoError(t, err)
// Verify the signature
ok, err := lite.WalletVerify(ctx, liteWalletAddr, data, sig)
require.NoError(t, err)
require.True(t, ok)
// Create some wallets on the lite node to use for testing multisig
var walletAddrs []address.Address
for i := 0; i < 4; i++ {
addr, err := lite.WalletNew(ctx, wallet.ActSigType("secp256k1"))
require.NoError(t, err)
walletAddrs = append(walletAddrs, addr)
err = sendFunds(ctx, t, lite, liteWalletAddr, addr, types.NewInt(1e15))
require.NoError(t, err)
}
// Create an msig with three of the addresses and threshold of two sigs
msigAddrs := walletAddrs[:3]
amt := types.NewInt(1000)
addProposal, err := lite.MsigCreate(ctx, 2, msigAddrs, abi.ChainEpoch(50), amt, liteWalletAddr, types.NewInt(0))
require.NoError(t, err)
res, err := lite.StateWaitMsg(ctx, addProposal, 1)
require.NoError(t, err)
require.EqualValues(t, 0, res.Receipt.ExitCode)
var execReturn init0.ExecReturn
err = execReturn.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return))
require.NoError(t, err)
// Get available balance of msig: should be greater than zero and less
// than initial amount
msig := execReturn.IDAddress
msigBalance, err := lite.MsigGetAvailableBalance(ctx, msig, types.EmptyTSK)
require.NoError(t, err)
require.Greater(t, msigBalance.Int64(), int64(0))
require.Less(t, msigBalance.Int64(), amt.Int64())
// Propose to add a new address to the msig
addProposal, err = lite.MsigAddPropose(ctx, msig, walletAddrs[0], walletAddrs[3], false)
require.NoError(t, err)
res, err = lite.StateWaitMsg(ctx, addProposal, 1)
require.NoError(t, err)
require.EqualValues(t, 0, res.Receipt.ExitCode)
var proposeReturn multisig.ProposeReturn
err = proposeReturn.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return))
require.NoError(t, err)
// Approve proposal (proposer is first (implicit) signer, approver is
// second signer
txnID := uint64(proposeReturn.TxnID)
approval1, err := lite.MsigAddApprove(ctx, msig, walletAddrs[1], txnID, walletAddrs[0], walletAddrs[3], false)
require.NoError(t, err)
res, err = lite.StateWaitMsg(ctx, approval1, 1)
require.NoError(t, err)
require.EqualValues(t, 0, res.Receipt.ExitCode)
var approveReturn multisig.ApproveReturn
err = approveReturn.UnmarshalCBOR(bytes.NewReader(res.Receipt.Return))
require.NoError(t, err)
require.True(t, approveReturn.Applied)
}
func sendFunds(ctx context.Context, t *testing.T, fromNode test.TestNode, fromAddr address.Address, toAddr address.Address, amt types.BigInt) error {
msg := &types.Message{
From: fromAddr,
To: toAddr,
Value: amt,
}
sm, err := fromNode.MpoolPushMessage(ctx, msg, nil)
if err != nil {
return err
}
res, err := fromNode.StateWaitMsg(ctx, sm.Cid(), 1)
if err != nil {
return err
}
if res.Receipt.ExitCode != 0 {
return xerrors.Errorf("send funds failed with exit code %d", res.Receipt.ExitCode)
}
return nil
}
func startNodes(ctx context.Context, t *testing.T, blocktime time.Duration) (test.TestNode, test.TestNode, jsonrpc.ClientCloser) {
var closer jsonrpc.ClientCloser
// Create one miner and two full nodes.
// - Put a gateway server in front of full node 1
// - Start full node 2 in lite mode
// - Connect lite node -> gateway server -> full node
opts := append(
// Full node
test.OneFull,
// Lite node
test.FullNodeOpts{
Lite: true,
Opts: func(nodes []test.TestNode) node.Option {
fullNode := nodes[0]
// Create a gateway server in front of the full node
_, addr, err := builder.CreateRPCServer(&GatewayAPI{api: fullNode})
require.NoError(t, err)
// Create a gateway client API that connects to the gateway server
var gapi api.GatewayAPI
gapi, closer, err = client.NewGatewayRPC(ctx, addr, nil)
require.NoError(t, err)
// Provide the gateway API to dependency injection
return node.Override(new(api.GatewayAPI), gapi)
},
},
)
n, sn := builder.RPCMockSbBuilder(t, opts, test.OneMiner)
full := n[0]
lite := n[1]
miner := sn[0]
// Get the listener address for the full node
fullAddr, err := full.NetAddrsListen(ctx)
require.NoError(t, err)
// Connect the miner and the full node
err = miner.NetConnect(ctx, fullAddr)
require.NoError(t, err)
// Start mining blocks
bm := test.NewBlockMiner(ctx, t, miner, blocktime)
bm.MineBlocks()
return full, lite, closer
}

View File

@ -5,8 +5,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/filecoin-project/lotus/node"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@ -64,8 +62,8 @@ func TestMinerAllInfo(t *testing.T) {
require.NoError(t, infoAllCmd.Action(cctx)) require.NoError(t, infoAllCmd.Action(cctx))
} }
bp := func(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { bp := func(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) {
n, sn = builder.Builder(t, nFull, storage, opts...) n, sn = builder.Builder(t, fullOpts, storage)
t.Run("pre-info-all", run) t.Run("pre-info-all", run)

View File

@ -15,8 +15,6 @@ import (
"runtime/pprof" "runtime/pprof"
"strings" "strings"
"github.com/filecoin-project/lotus/chain/types"
paramfetch "github.com/filecoin-project/go-paramfetch" paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
@ -32,6 +30,7 @@ import (
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/chain/vm"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
@ -114,6 +113,11 @@ var DaemonCmd = &cli.Command{
Name: "halt-after-import", Name: "halt-after-import",
Usage: "halt the process after importing chain from file", Usage: "halt the process after importing chain from file",
}, },
&cli.BoolFlag{
Name: "lite",
Usage: "start lotus in lite mode",
Hidden: true,
},
&cli.StringFlag{ &cli.StringFlag{
Name: "pprof", Name: "pprof",
Usage: "specify name of file for writing cpu profile to", Usage: "specify name of file for writing cpu profile to",
@ -133,6 +137,8 @@ var DaemonCmd = &cli.Command{
}, },
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
isLite := cctx.Bool("lite")
err := runmetrics.Enable(runmetrics.RunMetricOptions{ err := runmetrics.Enable(runmetrics.RunMetricOptions{
EnableCPU: true, EnableCPU: true,
EnableMemory: true, EnableMemory: true,
@ -192,9 +198,11 @@ var DaemonCmd = &cli.Command{
return xerrors.Errorf("repo init error: %w", err) return xerrors.Errorf("repo init error: %w", err)
} }
if !isLite {
if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil { if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err) return xerrors.Errorf("fetching proof parameters: %w", err)
} }
}
var genBytes []byte var genBytes []byte
if cctx.String("genesis") != "" { if cctx.String("genesis") != "" {
@ -240,10 +248,23 @@ var DaemonCmd = &cli.Command{
shutdownChan := make(chan struct{}) shutdownChan := make(chan struct{})
// If the daemon is started in "lite mode", provide a GatewayAPI
// for RPC calls
liteModeDeps := node.Options()
if isLite {
gapi, closer, err := lcli.GetGatewayAPI(cctx)
if err != nil {
return err
}
defer closer()
liteModeDeps = node.Override(new(api.GatewayAPI), gapi)
}
var api api.FullNode var api api.FullNode
stop, err := node.New(ctx, stop, err := node.New(ctx,
node.FullAPI(&api), node.FullAPI(&api, node.Lite(isLite)),
node.Override(new(dtypes.Bootstrapper), isBootstrapper), node.Override(new(dtypes.Bootstrapper), isBootstrapper),
node.Override(new(dtypes.ShutdownChan), shutdownChan), node.Override(new(dtypes.ShutdownChan), shutdownChan),
@ -251,6 +272,7 @@ var DaemonCmd = &cli.Command{
node.Repo(r), node.Repo(r),
genesis, genesis,
liteModeDeps,
node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") }, node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") },
node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error {

View File

@ -167,6 +167,7 @@
* [StateVerifiedRegistryRootKey](#StateVerifiedRegistryRootKey) * [StateVerifiedRegistryRootKey](#StateVerifiedRegistryRootKey)
* [StateVerifierStatus](#StateVerifierStatus) * [StateVerifierStatus](#StateVerifierStatus)
* [StateWaitMsg](#StateWaitMsg) * [StateWaitMsg](#StateWaitMsg)
* [StateWaitMsgLimited](#StateWaitMsgLimited)
* [Sync](#Sync) * [Sync](#Sync)
* [SyncCheckBad](#SyncCheckBad) * [SyncCheckBad](#SyncCheckBad)
* [SyncCheckpoint](#SyncCheckpoint) * [SyncCheckpoint](#SyncCheckpoint)
@ -4322,6 +4323,49 @@ Response:
} }
``` ```
### StateWaitMsgLimited
StateWaitMsgLimited looks back up to limit epochs in the chain for a message.
If not found, it blocks until the message arrives on chain, and gets to the
indicated confidence depth.
Perms: read
Inputs:
```json
[
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
42,
10101
]
```
Response:
```json
{
"Message": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Receipt": {
"ExitCode": 0,
"Return": "Ynl0ZSBhcnJheQ==",
"GasUsed": 9
},
"ReturnDec": {},
"TipSet": [
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
],
"Height": 10101
}
```
## Sync ## Sync
The Sync method group contains methods for interacting with and The Sync method group contains methods for interacting with and
observing the lotus sync service. observing the lotus sync service.

View File

@ -187,7 +187,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
} }
// TODO: timeout // TODO: timeout
_, ret, _, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence) _, ret, _, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence, stmgr.LookbackNoLimit)
if err != nil { if err != nil {
return 0, xerrors.Errorf("waiting for deal publish message: %w", err) return 0, xerrors.Errorf("waiting for deal publish message: %w", err)
} }

View File

@ -6,6 +6,13 @@ import (
"os" "os"
"time" "time"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/node/hello"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
ci "github.com/libp2p/go-libp2p-core/crypto" ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -29,9 +36,7 @@ import (
storage2 "github.com/filecoin-project/specs-storage/storage" storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/market" "github.com/filecoin-project/lotus/chain/market"
@ -39,10 +44,7 @@ import (
"github.com/filecoin-project/lotus/chain/messagesigner" "github.com/filecoin-project/lotus/chain/messagesigner"
"github.com/filecoin-project/lotus/chain/metrics" "github.com/filecoin-project/lotus/chain/metrics"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/chain/wallet"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -56,9 +58,9 @@ import (
"github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/impl/common" "github.com/filecoin-project/lotus/node/impl/common"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
@ -158,7 +160,7 @@ type Settings struct {
Online bool // Online option applied Online bool // Online option applied
Config bool // Config option applied Config bool // Config option applied
Lite bool // Start node in "lite" mode
} }
func defaults() []Option { func defaults() []Option {
@ -232,6 +234,10 @@ func isType(t repo.RepoType) func(s *Settings) bool {
// Online sets up basic libp2p node // Online sets up basic libp2p node
func Online() Option { func Online() Option {
isFullOrLiteNode := func(s *Settings) bool { return s.nodeType == repo.FullNode }
isFullNode := func(s *Settings) bool { return s.nodeType == repo.FullNode && !s.Lite }
isLiteNode := func(s *Settings) bool { return s.nodeType == repo.FullNode && s.Lite }
return Options( return Options(
// make sure that online is applied before Config. // make sure that online is applied before Config.
// This is important because Config overrides some of Online units // This is important because Config overrides some of Online units
@ -245,22 +251,20 @@ func Online() Option {
// common // common
Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter), Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter),
// Full node // Full node or lite node
ApplyIf(isFullOrLiteNode,
ApplyIf(isType(repo.FullNode),
// TODO: Fix offline mode // TODO: Fix offline mode
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap), Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
Override(new(dtypes.DrandBootstrap), modules.DrandBootstrap), Override(new(dtypes.DrandBootstrap), modules.DrandBootstrap),
Override(new(dtypes.DrandSchedule), modules.BuiltinDrandConfig), Override(new(dtypes.DrandSchedule), modules.BuiltinDrandConfig),
Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),
Override(new(ffiwrapper.Verifier), ffiwrapper.ProofVerifier), Override(new(ffiwrapper.Verifier), ffiwrapper.ProofVerifier),
Override(new(vm.SyscallBuilder), vm.Syscalls), Override(new(vm.SyscallBuilder), vm.Syscalls),
Override(new(*store.ChainStore), modules.ChainStore), Override(new(*store.ChainStore), modules.ChainStore),
Override(new(stmgr.UpgradeSchedule), stmgr.DefaultUpgradeSchedule()), Override(new(stmgr.UpgradeSchedule), stmgr.DefaultUpgradeSchedule()),
Override(new(*stmgr.StateManager), stmgr.NewStateManagerWithUpgradeSchedule), Override(new(*stmgr.StateManager), stmgr.NewStateManagerWithUpgradeSchedule),
Override(new(stmgr.StateManagerAPI), From(new(*stmgr.StateManager))),
Override(new(*wallet.Wallet), wallet.NewWallet), Override(new(*wallet.Wallet), wallet.NewWallet),
Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner), Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner),
@ -288,12 +292,6 @@ func Online() Option {
Override(new(dtypes.Graphsync), modules.Graphsync), Override(new(dtypes.Graphsync), modules.Graphsync),
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
Override(RunHelloKey, modules.RunHello),
Override(RunChainExchangeKey, modules.RunChainExchange),
Override(RunPeerMgrKey, modules.RunPeerMgr),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
Override(new(*discoveryimpl.Local), modules.NewLocalDiscovery), Override(new(*discoveryimpl.Local), modules.NewLocalDiscovery),
Override(new(discovery.PeerResolver), modules.RetrievalResolver), Override(new(discovery.PeerResolver), modules.RetrievalResolver),
@ -312,8 +310,34 @@ func Online() Option {
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels), Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
), ),
// Lite node
ApplyIf(isLiteNode,
Override(new(messagesigner.MpoolNonceAPI), From(new(modules.MpoolNonceAPI))),
Override(new(full.ChainModuleAPI), From(new(api.GatewayAPI))),
Override(new(full.GasModuleAPI), From(new(api.GatewayAPI))),
Override(new(full.MpoolModuleAPI), From(new(api.GatewayAPI))),
Override(new(full.StateModuleAPI), From(new(api.GatewayAPI))),
Override(new(stmgr.StateManagerAPI), modules.NewRPCStateManager),
),
// Full node
ApplyIf(isFullNode,
Override(new(messagesigner.MpoolNonceAPI), From(new(*messagepool.MessagePool))),
Override(new(full.ChainModuleAPI), From(new(full.ChainModule))),
Override(new(full.GasModuleAPI), From(new(full.GasModule))),
Override(new(full.MpoolModuleAPI), From(new(full.MpoolModule))),
Override(new(full.StateModuleAPI), From(new(full.StateModule))),
Override(new(stmgr.StateManagerAPI), From(new(*stmgr.StateManager))),
Override(RunHelloKey, modules.RunHello),
Override(RunChainExchangeKey, modules.RunChainExchange),
Override(RunPeerMgrKey, modules.RunPeerMgr),
Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
),
// miner // miner
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner }, ApplyIf(isType(repo.StorageMiner),
Override(new(api.Common), From(new(common.CommonAPI))), Override(new(api.Common), From(new(common.CommonAPI))),
Override(new(sectorstorage.StorageAuth), modules.StorageAuth), Override(new(sectorstorage.StorageAuth), modules.StorageAuth),
@ -507,12 +531,22 @@ func Repo(r repo.Repo) Option {
} }
} }
func FullAPI(out *api.FullNode) Option { type FullOption = Option
func Lite(enable bool) FullOption {
return func(s *Settings) error {
s.Lite = enable
return nil
}
}
func FullAPI(out *api.FullNode, fopts ...FullOption) Option {
return Options( return Options(
func(s *Settings) error { func(s *Settings) error {
s.nodeType = repo.FullNode s.nodeType = repo.FullNode
return nil return nil
}, },
Options(fopts...),
func(s *Settings) error { func(s *Settings) error {
resAPI := &impl.FullNodeAPI{} resAPI := &impl.FullNodeAPI{}
s.invokes[ExtractApiKey] = fx.Populate(resAPI) s.invokes[ExtractApiKey] = fx.Populate(resAPI)

View File

@ -39,10 +39,28 @@ import (
var log = logging.Logger("fullnode") var log = logging.Logger("fullnode")
type ChainModuleAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
}
// ChainModule provides a default implementation of ChainModuleAPI.
// It can be swapped out with another implementation through Dependency
// Injection (for example with a thin RPC client).
type ChainModule struct {
fx.In
Chain *store.ChainStore
}
var _ ChainModuleAPI = (*ChainModule)(nil)
type ChainAPI struct { type ChainAPI struct {
fx.In fx.In
WalletAPI WalletAPI
ChainModuleAPI
Chain *store.ChainStore Chain *store.ChainStore
} }
@ -51,8 +69,8 @@ func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, e
return a.Chain.SubHeadChanges(ctx), nil return a.Chain.SubHeadChanges(ctx), nil
} }
func (a *ChainAPI) ChainHead(context.Context) (*types.TipSet, error) { func (m *ChainModule) ChainHead(context.Context) (*types.TipSet, error) {
return a.Chain.GetHeaviestTipSet(), nil return m.Chain.GetHeaviestTipSet(), nil
} }
func (a *ChainAPI) ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) { func (a *ChainAPI) ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) {
@ -77,8 +95,8 @@ func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.Block
return a.Chain.GetBlock(msg) return a.Chain.GetBlock(msg)
} }
func (a *ChainAPI) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) { func (m *ChainModule) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
return a.Chain.LoadTipSet(key) return m.Chain.LoadTipSet(key)
} }
func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) { func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) {
@ -180,12 +198,12 @@ func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]
return out, nil return out, nil
} }
func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) { func (m *ChainModule) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk) ts, err := m.Chain.GetTipSetFromKey(tsk)
if err != nil { if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
} }
return a.Chain.GetTipsetByHeight(ctx, h, ts, true) return m.Chain.GetTipsetByHeight(ctx, h, ts, true)
} }
func (a *ChainAPI) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) { func (a *ChainAPI) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) {

View File

@ -26,8 +26,27 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
type GasModuleAPI interface {
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
}
// GasModule provides a default implementation of GasModuleAPI.
// It can be swapped out with another implementation through Dependency
// Injection (for example with a thin RPC client).
type GasModule struct {
fx.In
Stmgr *stmgr.StateManager
Chain *store.ChainStore
Mpool *messagepool.MessagePool
}
var _ GasModuleAPI = (*GasModule)(nil)
type GasAPI struct { type GasAPI struct {
fx.In fx.In
GasModuleAPI
Stmgr *stmgr.StateManager Stmgr *stmgr.StateManager
Chain *store.ChainStore Chain *store.ChainStore
Mpool *messagepool.MessagePool Mpool *messagepool.MessagePool
@ -36,9 +55,24 @@ type GasAPI struct {
const MinGasPremium = 100e3 const MinGasPremium = 100e3
const MaxSpendOnFeeDenom = 100 const MaxSpendOnFeeDenom = 100
func (a *GasAPI) GasEstimateFeeCap(ctx context.Context, msg *types.Message, maxqueueblks int64, func (a *GasAPI) GasEstimateFeeCap(
tsk types.TipSetKey) (types.BigInt, error) { ctx context.Context,
ts := a.Chain.GetHeaviestTipSet() msg *types.Message,
maxqueueblks int64,
tsk types.TipSetKey,
) (types.BigInt, error) {
return gasEstimateFeeCap(a.Chain, msg, maxqueueblks)
}
func (m *GasModule) GasEstimateFeeCap(
ctx context.Context,
msg *types.Message,
maxqueueblks int64,
tsk types.TipSetKey,
) (types.BigInt, error) {
return gasEstimateFeeCap(m.Chain, msg, maxqueueblks)
}
func gasEstimateFeeCap(cstore *store.ChainStore, msg *types.Message, maxqueueblks int64) (types.BigInt, error) {
ts := cstore.GetHeaviestTipSet()
parentBaseFee := ts.Blocks()[0].ParentBaseFee parentBaseFee := ts.Blocks()[0].ParentBaseFee
increaseFactor := math.Pow(1.+1./float64(build.BaseFeeMaxChangeDenom), float64(maxqueueblks)) increaseFactor := math.Pow(1.+1./float64(build.BaseFeeMaxChangeDenom), float64(maxqueueblks))
@ -82,9 +116,25 @@ func medianGasPremium(prices []gasMeta, blocks int) abi.TokenAmount {
return premium return premium
} }
func (a *GasAPI) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, func (a *GasAPI) GasEstimateGasPremium(
sender address.Address, gaslimit int64, _ types.TipSetKey) (types.BigInt, error) { ctx context.Context,
nblocksincl uint64,
sender address.Address,
gaslimit int64,
_ types.TipSetKey,
) (types.BigInt, error) {
return gasEstimateGasPremium(a.Chain, nblocksincl)
}
func (m *GasModule) GasEstimateGasPremium(
ctx context.Context,
nblocksincl uint64,
sender address.Address,
gaslimit int64,
_ types.TipSetKey,
) (types.BigInt, error) {
return gasEstimateGasPremium(m.Chain, nblocksincl)
}
func gasEstimateGasPremium(cstore *store.ChainStore, nblocksincl uint64) (types.BigInt, error) {
if nblocksincl == 0 { if nblocksincl == 0 {
nblocksincl = 1 nblocksincl = 1
} }
@ -92,20 +142,20 @@ func (a *GasAPI) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64,
var prices []gasMeta var prices []gasMeta
var blocks int var blocks int
ts := a.Chain.GetHeaviestTipSet() ts := cstore.GetHeaviestTipSet()
for i := uint64(0); i < nblocksincl*2; i++ { for i := uint64(0); i < nblocksincl*2; i++ {
if ts.Height() == 0 { if ts.Height() == 0 {
break // genesis break // genesis
} }
pts, err := a.Chain.LoadTipSet(ts.Parents()) pts, err := cstore.LoadTipSet(ts.Parents())
if err != nil { if err != nil {
return types.BigInt{}, err return types.BigInt{}, err
} }
blocks += len(pts.Blocks()) blocks += len(pts.Blocks())
msgs, err := a.Chain.MessagesForTipset(pts) msgs, err := cstore.MessagesForTipset(pts)
if err != nil { if err != nil {
return types.BigInt{}, xerrors.Errorf("loading messages: %w", err) return types.BigInt{}, xerrors.Errorf("loading messages: %w", err)
} }
@ -142,19 +192,30 @@ func (a *GasAPI) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64,
} }
func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, _ types.TipSetKey) (int64, error) { func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, _ types.TipSetKey) (int64, error) {
return gasEstimateGasLimit(ctx, a.Chain, a.Stmgr, a.Mpool, msgIn)
}
func (m *GasModule) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, _ types.TipSetKey) (int64, error) {
return gasEstimateGasLimit(ctx, m.Chain, m.Stmgr, m.Mpool, msgIn)
}
func gasEstimateGasLimit(
ctx context.Context,
cstore *store.ChainStore,
smgr *stmgr.StateManager,
mpool *messagepool.MessagePool,
msgIn *types.Message,
) (int64, error) {
msg := *msgIn msg := *msgIn
msg.GasLimit = build.BlockGasLimit msg.GasLimit = build.BlockGasLimit
msg.GasFeeCap = types.NewInt(uint64(build.MinimumBaseFee) + 1) msg.GasFeeCap = types.NewInt(uint64(build.MinimumBaseFee) + 1)
msg.GasPremium = types.NewInt(1) msg.GasPremium = types.NewInt(1)
currTs := a.Chain.GetHeaviestTipSet() currTs := cstore.GetHeaviestTipSet()
fromA, err := a.Stmgr.ResolveToKeyAddress(ctx, msgIn.From, currTs) fromA, err := smgr.ResolveToKeyAddress(ctx, msgIn.From, currTs)
if err != nil { if err != nil {
return -1, xerrors.Errorf("getting key address: %w", err) return -1, xerrors.Errorf("getting key address: %w", err)
} }
pending, ts := a.Mpool.PendingFor(fromA) pending, ts := mpool.PendingFor(fromA)
priorMsgs := make([]types.ChainMsg, 0, len(pending)) priorMsgs := make([]types.ChainMsg, 0, len(pending))
for _, m := range pending { for _, m := range pending {
priorMsgs = append(priorMsgs, m) priorMsgs = append(priorMsgs, m)
@ -163,11 +224,11 @@ func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message,
// Try calling until we find a height with no migration. // Try calling until we find a height with no migration.
var res *api.InvocResult var res *api.InvocResult
for { for {
res, err = a.Stmgr.CallWithGas(ctx, &msg, priorMsgs, ts) res, err = smgr.CallWithGas(ctx, &msg, priorMsgs, ts)
if err != stmgr.ErrExpensiveFork { if err != stmgr.ErrExpensiveFork {
break break
} }
ts, err = a.Chain.GetTipSetFromKey(ts.Parents()) ts, err = cstore.GetTipSetFromKey(ts.Parents())
if err != nil { if err != nil {
return -1, xerrors.Errorf("getting parent tipset: %w", err) return -1, xerrors.Errorf("getting parent tipset: %w", err)
} }
@ -180,7 +241,7 @@ func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message,
} }
// Special case for PaymentChannel collect, which is deleting actor // Special case for PaymentChannel collect, which is deleting actor
st, err := a.Stmgr.ParentState(ts) st, err := smgr.ParentState(ts)
if err != nil { if err != nil {
_ = err _ = err
// somewhat ignore it as it can happen and we just want to detect // somewhat ignore it as it can happen and we just want to detect
@ -206,17 +267,17 @@ func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message,
return res.MsgRct.GasUsed + 76e3, nil return res.MsgRct.GasUsed + 76e3, nil
} }
func (a *GasAPI) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, _ types.TipSetKey) (*types.Message, error) { func (m *GasModule) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, _ types.TipSetKey) (*types.Message, error) {
if msg.GasLimit == 0 { if msg.GasLimit == 0 {
gasLimit, err := a.GasEstimateGasLimit(ctx, msg, types.TipSetKey{}) gasLimit, err := m.GasEstimateGasLimit(ctx, msg, types.TipSetKey{})
if err != nil { if err != nil {
return nil, xerrors.Errorf("estimating gas used: %w", err) return nil, xerrors.Errorf("estimating gas used: %w", err)
} }
msg.GasLimit = int64(float64(gasLimit) * a.Mpool.GetConfig().GasLimitOverestimation) msg.GasLimit = int64(float64(gasLimit) * m.Mpool.GetConfig().GasLimitOverestimation)
} }
if msg.GasPremium == types.EmptyInt || types.BigCmp(msg.GasPremium, types.NewInt(0)) == 0 { if msg.GasPremium == types.EmptyInt || types.BigCmp(msg.GasPremium, types.NewInt(0)) == 0 {
gasPremium, err := a.GasEstimateGasPremium(ctx, 2, msg.From, msg.GasLimit, types.TipSetKey{}) gasPremium, err := m.GasEstimateGasPremium(ctx, 2, msg.From, msg.GasLimit, types.TipSetKey{})
if err != nil { if err != nil {
return nil, xerrors.Errorf("estimating gas price: %w", err) return nil, xerrors.Errorf("estimating gas price: %w", err)
} }
@ -224,7 +285,7 @@ func (a *GasAPI) GasEstimateMessageGas(ctx context.Context, msg *types.Message,
} }
if msg.GasFeeCap == types.EmptyInt || types.BigCmp(msg.GasFeeCap, types.NewInt(0)) == 0 { if msg.GasFeeCap == types.EmptyInt || types.BigCmp(msg.GasFeeCap, types.NewInt(0)) == 0 {
feeCap, err := a.GasEstimateFeeCap(ctx, msg, 20, types.EmptyTSK) feeCap, err := m.GasEstimateFeeCap(ctx, msg, 20, types.EmptyTSK)
if err != nil { if err != nil {
return nil, xerrors.Errorf("estimating fee cap: %w", err) return nil, xerrors.Errorf("estimating fee cap: %w", err)
} }

View File

@ -10,14 +10,32 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/messagesigner" "github.com/filecoin-project/lotus/chain/messagesigner"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
) )
type MpoolModuleAPI interface {
MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error)
}
// MpoolModule provides a default implementation of MpoolModuleAPI.
// It can be swapped out with another implementation through Dependency
// Injection (for example with a thin RPC client).
type MpoolModule struct {
fx.In
Mpool *messagepool.MessagePool
}
var _ MpoolModuleAPI = (*MpoolModule)(nil)
type MpoolAPI struct { type MpoolAPI struct {
fx.In fx.In
MpoolModuleAPI
WalletAPI WalletAPI
GasAPI GasAPI
@ -106,8 +124,8 @@ func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error {
return nil return nil
} }
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { func (m *MpoolModule) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return a.Mpool.Push(smsg) return m.Mpool.Push(smsg)
} }
func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
@ -162,7 +180,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
// Sign and push the message // Sign and push the message
return a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error { return a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error {
if _, err := a.Mpool.Push(smsg); err != nil { if _, err := a.MpoolModuleAPI.MpoolPush(ctx, smsg); err != nil {
return xerrors.Errorf("mpool push: failed to push message: %w", err) return xerrors.Errorf("mpool push: failed to push message: %w", err)
} }
return nil return nil

View File

@ -23,7 +23,6 @@ import (
type MsigAPI struct { type MsigAPI struct {
fx.In fx.In
WalletAPI WalletAPI
StateAPI StateAPI StateAPI StateAPI
MpoolAPI MpoolAPI MpoolAPI MpoolAPI
} }

View File

@ -42,6 +42,27 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
) )
type StateModuleAPI interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
}
// StateModule provides a default implementation of StateModuleAPI.
// It can be swapped out with another implementation through Dependency
// Injection (for example with a thin RPC client).
type StateModule struct {
fx.In
StateManager *stmgr.StateManager
Chain *store.ChainStore
}
var _ StateModuleAPI = (*StateModule)(nil)
type StateAPI struct { type StateAPI struct {
fx.In fx.In
@ -49,6 +70,8 @@ type StateAPI struct {
// API attached to the state API. It probably should live somewhere better // API attached to the state API. It probably should live somewhere better
Wallet *wallet.Wallet Wallet *wallet.Wallet
StateModuleAPI
ProofVerifier ffiwrapper.Verifier ProofVerifier ffiwrapper.Verifier
StateManager *stmgr.StateManager StateManager *stmgr.StateManager
Chain *store.ChainStore Chain *store.ChainStore
@ -349,27 +372,33 @@ func (a *StateAPI) StateReplay(ctx context.Context, tsk types.TipSetKey, mc cid.
}, nil }, nil
} }
func (a *StateAPI) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) { func stateForTs(ctx context.Context, ts *types.TipSet, cstore *store.ChainStore, smgr *stmgr.StateManager) (*state.StateTree, error) {
if ts == nil { if ts == nil {
ts = a.Chain.GetHeaviestTipSet() ts = cstore.GetHeaviestTipSet()
} }
st, _, err := a.StateManager.TipSetState(ctx, ts) st, _, err := smgr.TipSetState(ctx, ts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
buf := bufbstore.NewBufferedBstore(a.Chain.Blockstore()) buf := bufbstore.NewBufferedBstore(cstore.Blockstore())
cst := cbor.NewCborStore(buf) cst := cbor.NewCborStore(buf)
return state.LoadStateTree(cst, st) return state.LoadStateTree(cst, st)
} }
func (a *StateAPI) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) {
return stateForTs(ctx, ts, a.Chain, a.StateManager)
}
func (m *StateModule) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) {
return stateForTs(ctx, ts, m.Chain, m.StateManager)
}
func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { func (m *StateModule) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk) ts, err := m.Chain.GetTipSetFromKey(tsk)
if err != nil { if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
} }
state, err := a.stateForTs(ctx, ts) state, err := m.stateForTs(ctx, ts)
if err != nil { if err != nil {
return nil, xerrors.Errorf("computing tipset state failed: %w", err) return nil, xerrors.Errorf("computing tipset state failed: %w", err)
} }
@ -377,26 +406,22 @@ func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, tsk
return state.GetActor(actor) return state.GetActor(actor)
} }
func (a *StateAPI) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) { func (m *StateModule) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk) ts, err := m.Chain.GetTipSetFromKey(tsk)
if err != nil {
return address.Undef, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
state, err := a.stateForTs(ctx, ts)
if err != nil {
return address.Undef, err
}
return state.LookupID(addr)
}
func (a *StateAPI) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil { if err != nil {
return address.Undef, xerrors.Errorf("loading tipset %s: %w", tsk, err) return address.Undef, xerrors.Errorf("loading tipset %s: %w", tsk, err)
} }
return a.StateManager.ResolveToKeyAddress(ctx, addr, ts) return m.StateManager.LookupID(ctx, addr, ts)
}
func (m *StateModule) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
ts, err := m.Chain.GetTipSetFromKey(tsk)
if err != nil {
return address.Undef, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return m.StateManager.ResolveToKeyAddress(ctx, addr, ts)
} }
func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*api.ActorState, error) { func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*api.ActorState, error) {
@ -453,22 +478,28 @@ func (a *StateAPI) MinerCreateBlock(ctx context.Context, bt *api.BlockTemplate)
return &out, nil return &out, nil
} }
func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) { func (m *StateModule) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
ts, recpt, found, err := a.StateManager.WaitForMessage(ctx, msg, confidence) return stateWaitMsgLimited(ctx, m.StateManager, m.Chain, msg, confidence, stmgr.LookbackNoLimit)
}
func (a *StateAPI) StateWaitMsgLimited(ctx context.Context, msg cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch) (*api.MsgLookup, error) {
return stateWaitMsgLimited(ctx, a.StateManager, a.Chain, msg, confidence, lookbackLimit)
}
func stateWaitMsgLimited(ctx context.Context, smgr *stmgr.StateManager, cstore *store.ChainStore, msg cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch) (*api.MsgLookup, error) {
ts, recpt, found, err := smgr.WaitForMessage(ctx, msg, confidence, lookbackLimit)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var returndec interface{} var returndec interface{}
if recpt.ExitCode == 0 && len(recpt.Return) > 0 { if recpt.ExitCode == 0 && len(recpt.Return) > 0 {
cmsg, err := a.Chain.GetCMessage(msg) cmsg, err := cstore.GetCMessage(msg)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to load message after successful receipt search: %w", err) return nil, xerrors.Errorf("failed to load message after successful receipt search: %w", err)
} }
vmsg := cmsg.VMMessage() vmsg := cmsg.VMMessage()
t, err := stmgr.GetReturnType(ctx, a.StateManager, vmsg.To, vmsg.Method, ts) t, err := stmgr.GetReturnType(ctx, smgr, vmsg.To, vmsg.Method, ts)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to get return type: %w", err) return nil, xerrors.Errorf("failed to get return type: %w", err)
} }
@ -799,17 +830,17 @@ func (a *StateAPI) StateCompute(ctx context.Context, height abi.ChainEpoch, msgs
}, nil }, nil
} }
func (a *StateAPI) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) { func (m *StateModule) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk) ts, err := m.Chain.GetTipSetFromKey(tsk)
if err != nil { if err != nil {
return types.EmptyInt, xerrors.Errorf("loading tipset %s: %w", tsk, err) return types.EmptyInt, xerrors.Errorf("loading tipset %s: %w", tsk, err)
} }
act, err := a.StateManager.LoadActor(ctx, addr, ts) act, err := m.StateManager.LoadActor(ctx, addr, ts)
if err != nil { if err != nil {
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor: %w", err) return types.EmptyInt, xerrors.Errorf("failed to load multisig actor: %w", err)
} }
msas, err := multisig.Load(a.Chain.Store(ctx), act) msas, err := multisig.Load(m.Chain.Store(ctx), act)
if err != nil { if err != nil {
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err) return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err)
} }
@ -858,13 +889,13 @@ func (a *StateAPI) MsigGetVestingSchedule(ctx context.Context, addr address.Addr
}, nil }, nil
} }
func (a *StateAPI) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) { func (m *StateModule) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) {
startTs, err := a.Chain.GetTipSetFromKey(start) startTs, err := m.Chain.GetTipSetFromKey(start)
if err != nil { if err != nil {
return types.EmptyInt, xerrors.Errorf("loading start tipset %s: %w", start, err) return types.EmptyInt, xerrors.Errorf("loading start tipset %s: %w", start, err)
} }
endTs, err := a.Chain.GetTipSetFromKey(end) endTs, err := m.Chain.GetTipSetFromKey(end)
if err != nil { if err != nil {
return types.EmptyInt, xerrors.Errorf("loading end tipset %s: %w", end, err) return types.EmptyInt, xerrors.Errorf("loading end tipset %s: %w", end, err)
} }
@ -875,12 +906,12 @@ func (a *StateAPI) MsigGetVested(ctx context.Context, addr address.Address, star
return big.Zero(), nil return big.Zero(), nil
} }
act, err := a.StateManager.LoadActor(ctx, addr, endTs) act, err := m.StateManager.LoadActor(ctx, addr, endTs)
if err != nil { if err != nil {
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor at end epoch: %w", err) return types.EmptyInt, xerrors.Errorf("failed to load multisig actor at end epoch: %w", err)
} }
msas, err := multisig.Load(a.Chain.Store(ctx), act) msas, err := multisig.Load(m.Chain.Store(ctx), act)
if err != nil { if err != nil {
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err) return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err)
} }

View File

@ -19,7 +19,7 @@ import (
type WalletAPI struct { type WalletAPI struct {
fx.In fx.In
StateManager *stmgr.StateManager StateManagerAPI stmgr.StateManagerAPI
Wallet *wallet.Wallet Wallet *wallet.Wallet
} }
@ -36,7 +36,7 @@ func (a *WalletAPI) WalletList(ctx context.Context) ([]address.Address, error) {
} }
func (a *WalletAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { func (a *WalletAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) {
act, err := a.StateManager.LoadActorTsk(ctx, addr, types.EmptyTSK) act, err := a.StateManagerAPI.LoadActorTsk(ctx, addr, types.EmptyTSK)
if xerrors.Is(err, types.ErrActorNotFound) { if xerrors.Is(err, types.ErrActorNotFound) {
return big.Zero(), nil return big.Zero(), nil
} else if err != nil { } else if err != nil {
@ -46,7 +46,7 @@ func (a *WalletAPI) WalletBalance(ctx context.Context, addr address.Address) (ty
} }
func (a *WalletAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*crypto.Signature, error) { func (a *WalletAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*crypto.Signature, error) {
keyAddr, err := a.StateManager.ResolveToKeyAddress(ctx, k, nil) keyAddr, err := a.StateManagerAPI.ResolveToKeyAddress(ctx, k, nil)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to resolve ID address: %w", keyAddr) return nil, xerrors.Errorf("failed to resolve ID address: %w", keyAddr)
} }

View File

@ -0,0 +1,33 @@
package modules
import (
"context"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/chain/messagesigner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/go-address"
)
// MpoolNonceAPI substitutes the mpool nonce with an implementation that
// doesn't rely on the mpool - it just gets the nonce from actor state
type MpoolNonceAPI struct {
fx.In
StateAPI full.StateAPI
}
// GetNonce gets the nonce from actor state
func (a *MpoolNonceAPI) GetNonce(addr address.Address) (uint64, error) {
act, err := a.StateAPI.StateGetActor(context.Background(), addr, types.EmptyTSK)
if err != nil {
return 0, err
}
return act.Nonce, nil
}
var _ messagesigner.MpoolNonceAPI = (*MpoolNonceAPI)(nil)

View File

@ -0,0 +1,33 @@
package modules
import (
"context"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
)
type RPCStateManager struct {
gapi api.GatewayAPI
}
func NewRPCStateManager(api api.GatewayAPI) *RPCStateManager {
return &RPCStateManager{gapi: api}
}
func (s *RPCStateManager) LoadActorTsk(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return s.gapi.StateGetActor(ctx, addr, tsk)
}
func (s *RPCStateManager) LookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
return s.gapi.StateLookupID(ctx, addr, ts.Key())
}
func (s *RPCStateManager) ResolveToKeyAddress(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
return s.gapi.StateAccountKey(ctx, addr, ts.Key())
}
var _ stmgr.StateManagerAPI = (*RPCStateManager)(nil)

View File

@ -7,10 +7,13 @@ import (
"io/ioutil" "io/ioutil"
"net" "net"
"net/http/httptest" "net/http/httptest"
"strings"
"sync" "sync"
"testing" "testing"
"time" "time"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
@ -117,6 +120,7 @@ func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Addr
if err != nil { if err != nil {
t.Fatalf("failed to construct node: %v", err) t.Fatalf("failed to construct node: %v", err)
} }
t.Cleanup(func() { _ = stop(context.Background()) }) t.Cleanup(func() { _ = stop(context.Background()) })
/*// Bootstrap with full node /*// Bootstrap with full node
@ -137,13 +141,29 @@ func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Addr
return test.TestStorageNode{StorageMiner: minerapi, MineOne: mineOne} return test.TestStorageNode{StorageMiner: minerapi, MineOne: mineOne}
} }
func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { func Builder(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) {
return mockBuilderOpts(t, fullOpts, storage, false)
}
func MockSbBuilder(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) {
return mockSbBuilderOpts(t, fullOpts, storage, false)
}
func RPCBuilder(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) {
return mockBuilderOpts(t, fullOpts, storage, true)
}
func RPCMockSbBuilder(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner) ([]test.TestNode, []test.TestStorageNode) {
return mockSbBuilderOpts(t, fullOpts, storage, true)
}
func mockBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner, rpc bool) ([]test.TestNode, []test.TestStorageNode) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel) t.Cleanup(cancel)
mn := mocknet.New(ctx) mn := mocknet.New(ctx)
fulls := make([]test.TestNode, nFull) fulls := make([]test.TestNode, len(fullOpts))
storers := make([]test.TestStorageNode, len(storage)) storers := make([]test.TestStorageNode, len(storage))
pk, _, err := crypto.GenerateEd25519Key(rand.Reader) pk, _, err := crypto.GenerateEd25519Key(rand.Reader)
@ -208,7 +228,7 @@ func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.
// END PRESEAL SECTION // END PRESEAL SECTION
for i := 0; i < nFull; i++ { for i := 0; i < len(fullOpts); i++ {
var genesis node.Option var genesis node.Option
if i == 0 { if i == 0 {
genesis = node.Override(new(modules.Genesis), testing2.MakeGenesisMem(&genbuf, *templ)) genesis = node.Override(new(modules.Genesis), testing2.MakeGenesisMem(&genbuf, *templ))
@ -217,19 +237,25 @@ func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.
} }
stop, err := node.New(ctx, stop, err := node.New(ctx,
node.FullAPI(&fulls[i].FullNode), node.FullAPI(&fulls[i].FullNode, node.Lite(fullOpts[i].Lite)),
node.Online(), node.Online(),
node.Repo(repo.NewMemory(nil)), node.Repo(repo.NewMemory(nil)),
node.MockHost(mn), node.MockHost(mn),
node.Test(), node.Test(),
genesis, genesis,
node.Options(opts...),
fullOpts[i].Opts(fulls),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Cleanup(func() { _ = stop(context.Background()) }) t.Cleanup(func() { _ = stop(context.Background()) })
if rpc {
fulls[i] = fullRpc(t, fulls[i])
}
} }
for i, def := range storage { for i, def := range storage {
@ -261,6 +287,9 @@ func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.
psd := presealDirs[i] psd := presealDirs[i]
*/ */
if rpc {
storers[i] = storerRpc(t, storers[i])
}
} }
if err := mn.LinkAll(); err != nil { if err := mn.LinkAll(); err != nil {
@ -286,11 +315,13 @@ func Builder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.
return fulls, storers return fulls, storers
} }
func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options ...node.Option) ([]test.TestNode, []test.TestStorageNode) { func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.StorageMiner, rpc bool) ([]test.TestNode, []test.TestStorageNode) {
ctx := context.Background() ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
mn := mocknet.New(ctx) mn := mocknet.New(ctx)
fulls := make([]test.TestNode, nFull) fulls := make([]test.TestNode, len(fullOpts))
storers := make([]test.TestStorageNode, len(storage)) storers := make([]test.TestStorageNode, len(storage))
var genbuf bytes.Buffer var genbuf bytes.Buffer
@ -354,7 +385,7 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options
// END PRESEAL SECTION // END PRESEAL SECTION
for i := 0; i < nFull; i++ { for i := 0; i < len(fullOpts); i++ {
var genesis node.Option var genesis node.Option
if i == 0 { if i == 0 {
genesis = node.Override(new(modules.Genesis), testing2.MakeGenesisMem(&genbuf, *templ)) genesis = node.Override(new(modules.Genesis), testing2.MakeGenesisMem(&genbuf, *templ))
@ -362,10 +393,8 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options
genesis = node.Override(new(modules.Genesis), modules.LoadGenesis(genbuf.Bytes())) genesis = node.Override(new(modules.Genesis), modules.LoadGenesis(genbuf.Bytes()))
} }
var err error
// TODO: Don't ignore stop
stop, err := node.New(ctx, stop, err := node.New(ctx,
node.FullAPI(&fulls[i].FullNode), node.FullAPI(&fulls[i].FullNode, node.Lite(fullOpts[i].Lite)),
node.Online(), node.Online(),
node.Repo(repo.NewMemory(nil)), node.Repo(repo.NewMemory(nil)),
node.MockHost(mn), node.MockHost(mn),
@ -374,12 +403,18 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
genesis, genesis,
node.Options(options...),
fullOpts[i].Opts(fulls),
) )
if err != nil { if err != nil {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }
t.Cleanup(func() { _ = stop(context.Background()) }) t.Cleanup(func() { _ = stop(context.Background()) })
if rpc {
fulls[i] = fullRpc(t, fulls[i])
}
} }
for i, def := range storage { for i, def := range storage {
@ -414,6 +449,10 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Unset(new(*sectorstorage.Manager)), node.Unset(new(*sectorstorage.Manager)),
)) ))
if rpc {
storers[i] = storerRpc(t, storers[i])
}
} }
if err := mn.LinkAll(); err != nil { if err := mn.LinkAll(); err != nil {
@ -438,69 +477,66 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, options
return fulls, storers return fulls, storers
} }
func RPCBuilder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { func fullRpc(t *testing.T, nd test.TestNode) test.TestNode {
return rpcWithBuilder(t, Builder, nFull, storage, opts...) ma, listenAddr, err := CreateRPCServer(nd)
require.NoError(t, err)
var full test.TestNode
full.FullNode, _, err = client.NewFullNodeRPC(context.Background(), listenAddr, nil)
require.NoError(t, err)
full.ListenAddr = ma
return full
} }
func RPCMockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { func storerRpc(t *testing.T, nd test.TestStorageNode) test.TestStorageNode {
return rpcWithBuilder(t, MockSbBuilder, nFull, storage, opts...) ma, listenAddr, err := CreateRPCServer(nd)
require.NoError(t, err)
var storer test.TestStorageNode
storer.StorageMiner, _, err = client.NewStorageMinerRPC(context.Background(), listenAddr, nil)
require.NoError(t, err)
storer.ListenAddr = ma
storer.MineOne = nd.MineOne
return storer
} }
func rpcWithBuilder(t *testing.T, b test.APIBuilder, nFull int, storage []test.StorageMiner, opts ...node.Option) ([]test.TestNode, []test.TestStorageNode) { func CreateRPCServer(handler interface{}) (multiaddr.Multiaddr, string, error) {
fullApis, storaApis := b(t, nFull, storage, opts...)
fulls := make([]test.TestNode, nFull)
storers := make([]test.TestStorageNode, len(storage))
for i, a := range fullApis {
rpcServer := jsonrpc.NewServer() rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", a) rpcServer.Register("Filecoin", handler)
testServ := httptest.NewServer(rpcServer) // todo: close testServ := httptest.NewServer(rpcServer) // todo: close
addr := testServ.Listener.Addr() addr := testServ.Listener.Addr()
listenAddr := "ws://" + addr.String() listenAddr := "ws://" + addr.String()
var err error ma, err := parseWSMultiAddr(addr)
fulls[i].FullNode, _, err = client.NewFullNodeRPC(context.Background(), listenAddr, nil)
if err != nil { if err != nil {
t.Fatal(err) return nil, "", err
} }
ma, err := parseWSSMultiAddr(addr) return ma, listenAddr, err
if err != nil {
t.Fatal(err)
}
fulls[i].ListenAddr = ma
} }
for i, a := range storaApis { func parseWSMultiAddr(addr net.Addr) (multiaddr.Multiaddr, error) {
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", a)
testServ := httptest.NewServer(rpcServer) // todo: close
addr := testServ.Listener.Addr()
listenAddr := "ws://" + addr.String()
var err error
storers[i].StorageMiner, _, err = client.NewStorageMinerRPC(context.Background(), listenAddr, nil)
if err != nil {
t.Fatal(err)
}
ma, err := parseWSSMultiAddr(addr)
if err != nil {
t.Fatal(err)
}
storers[i].ListenAddr = ma
storers[i].MineOne = a.MineOne
}
return fulls, storers
}
func parseWSSMultiAddr(addr net.Addr) (multiaddr.Multiaddr, error) {
host, port, err := net.SplitHostPort(addr.String()) host, port, err := net.SplitHostPort(addr.String())
if err != nil { if err != nil {
return nil, err return nil, err
} }
ma, err := multiaddr.NewMultiaddr("/ip4/" + host + "/" + addr.Network() + "/" + port + "/wss") ma, err := multiaddr.NewMultiaddr("/ip4/" + host + "/" + addr.Network() + "/" + port + "/ws")
if err != nil { if err != nil {
return nil, err return nil, err
} }
return ma, nil return ma, nil
} }
func WSMultiAddrToString(addr multiaddr.Multiaddr) (string, error) {
parts := strings.Split(addr.String(), "/")
if len(parts) != 6 || parts[0] != "" {
return "", xerrors.Errorf("Malformed ws multiaddr %s", addr)
}
host := parts[2]
port := parts[4]
proto := parts[5]
return proto + "://" + host + ":" + port + "/rpc/v0", nil
}