Merge branch 'master' into asr/spec-v1

This commit is contained in:
Steven Allen 2020-10-05 10:29:09 -07:00
commit b6500beaab
11 changed files with 151 additions and 65 deletions

View File

@ -530,6 +530,7 @@ type DealInfo struct {
DealID abi.DealID
CreationTime time.Time
Verified bool
}
type MsgLookup struct {

View File

@ -3,6 +3,7 @@ package messagesigner
import (
"bytes"
"context"
"sync"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/messagepool"
@ -16,7 +17,7 @@ import (
"golang.org/x/xerrors"
)
const dsKeyActorNonce = "ActorNonce"
const dsKeyActorNonce = "ActorNextNonce"
var log = logging.Logger("messagesigner")
@ -28,6 +29,7 @@ type mpoolAPI interface {
// when signing a message
type MessageSigner struct {
wallet *wallet.Wallet
lk sync.Mutex
mpool mpoolAPI
ds datastore.Batching
}
@ -47,25 +49,42 @@ func newMessageSigner(wallet *wallet.Wallet, mpool mpoolAPI, ds dtypes.MetadataD
// SignMessage increments the nonce for the message From address, and signs
// the message
func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb func(*types.SignedMessage) error) (*types.SignedMessage, error) {
ms.lk.Lock()
defer ms.lk.Unlock()
// Get the next message nonce
nonce, err := ms.nextNonce(msg.From)
if err != nil {
return nil, xerrors.Errorf("failed to create nonce: %w", err)
}
// Sign the message with the nonce
msg.Nonce = nonce
sig, err := ms.wallet.Sign(ctx, msg.From, msg.Cid().Bytes())
if err != nil {
return nil, xerrors.Errorf("failed to sign message: %w", err)
}
return &types.SignedMessage{
// Callback with the signed message
smsg := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}, nil
}
err = cb(smsg)
if err != nil {
return nil, err
}
// If the callback executed successfully, write the nonce to the datastore
if err := ms.saveNonce(msg.From, nonce); err != nil {
return nil, xerrors.Errorf("failed to save nonce: %w", err)
}
return smsg, nil
}
// nextNonce increments the nonce.
// nextNonce gets the next nonce for the given address.
// If there is no nonce in the datastore, gets the nonce from the message pool.
func (ms *MessageSigner) nextNonce(addr address.Address) (uint64, error) {
// Nonces used to be created by the mempool and we need to support nodes
@ -77,21 +96,22 @@ func (ms *MessageSigner) nextNonce(addr address.Address) (uint64, error) {
return 0, xerrors.Errorf("failed to get nonce from mempool: %w", err)
}
// Get the nonce for this address from the datastore
addrNonceKey := datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()})
// Get the next nonce for this address from the datastore
addrNonceKey := ms.dstoreKey(addr)
dsNonceBytes, err := ms.ds.Get(addrNonceKey)
switch {
case xerrors.Is(err, datastore.ErrNotFound):
// If a nonce for this address hasn't yet been created in the
// datastore, just use the nonce from the mempool
return nonce, nil
case err != nil:
return 0, xerrors.Errorf("failed to get nonce from datastore: %w", err)
default:
// There is a nonce in the datastore, so unmarshall and increment it
maj, val, err := cbg.CborReadHeader(bytes.NewReader(dsNonceBytes))
// There is a nonce in the datastore, so unmarshall it
maj, dsNonce, err := cbg.CborReadHeader(bytes.NewReader(dsNonceBytes))
if err != nil {
return 0, xerrors.Errorf("failed to parse nonce from datastore: %w", err)
}
@ -99,26 +119,37 @@ func (ms *MessageSigner) nextNonce(addr address.Address) (uint64, error) {
return 0, xerrors.Errorf("bad cbor type parsing nonce from datastore")
}
dsNonce := val + 1
// The message pool nonce should be <= than the datastore nonce
if nonce <= dsNonce {
nonce = dsNonce
} else {
log.Warnf("mempool nonce was larger than datastore nonce (%d > %d)", nonce, dsNonce)
}
}
// Write the nonce for this address to the datastore
return nonce, nil
}
}
// saveNonce increments the nonce for this address and writes it to the
// datastore
func (ms *MessageSigner) saveNonce(addr address.Address, nonce uint64) error {
// Increment the nonce
nonce++
// Write the nonce to the datastore
addrNonceKey := ms.dstoreKey(addr)
buf := bytes.Buffer{}
_, err = buf.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, nonce))
_, err := buf.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, nonce))
if err != nil {
return 0, xerrors.Errorf("failed to marshall nonce: %w", err)
return xerrors.Errorf("failed to marshall nonce: %w", err)
}
err = ms.ds.Put(addrNonceKey, buf.Bytes())
if err != nil {
return 0, xerrors.Errorf("failed to write nonce to datastore: %w", err)
return xerrors.Errorf("failed to write nonce to datastore: %w", err)
}
return nonce, nil
return nil
}
func (ms *MessageSigner) dstoreKey(addr address.Address) datastore.Key {
return datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()})
}

View File

@ -5,6 +5,8 @@ import (
"sync"
"testing"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/go-state-types/crypto"
@ -58,6 +60,7 @@ func TestMessageSignerSignMessage(t *testing.T) {
msg *types.Message
mpoolNonce [1]uint64
expNonce uint64
cbErr error
}
tests := []struct {
name string
@ -137,6 +140,37 @@ func TestMessageSignerSignMessage(t *testing.T) {
},
expNonce: 2,
}},
}, {
name: "recover from callback error",
msgs: []msgSpec{{
// No nonce yet in datastore
msg: &types.Message{
To: to1,
From: from1,
},
expNonce: 0,
}, {
// Increment nonce
msg: &types.Message{
To: to1,
From: from1,
},
expNonce: 1,
}, {
// Callback returns error
msg: &types.Message{
To: to1,
From: from1,
},
cbErr: xerrors.Errorf("err"),
}, {
// Callback successful, should increment nonce in datastore
msg: &types.Message{
To: to1,
From: from1,
},
expNonce: 2,
}},
}}
for _, tt := range tests {
tt := tt
@ -149,9 +183,18 @@ func TestMessageSignerSignMessage(t *testing.T) {
if len(m.mpoolNonce) == 1 {
mpool.setNonce(m.msg.From, m.mpoolNonce[0])
}
smsg, err := ms.SignMessage(ctx, m.msg)
require.NoError(t, err)
require.Equal(t, m.expNonce, smsg.Message.Nonce)
merr := m.cbErr
smsg, err := ms.SignMessage(ctx, m.msg, func(message *types.SignedMessage) error {
return merr
})
if m.cbErr != nil {
require.Error(t, err)
require.Nil(t, smsg)
} else {
require.NoError(t, err)
require.Equal(t, m.expNonce, smsg.Message.Nonce)
}
}
})
}

View File

@ -259,6 +259,18 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
}
for i := parentEpoch; i < epoch; i++ {
if i > parentEpoch {
// run cron for null rounds if any
if err := runCron(); err != nil {
return cid.Undef, cid.Undef, err
}
pstate, err = vmi.Flush(ctx)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("flushing vm: %w", err)
}
}
// handle state forks
// XXX: The state tree
newState, err := sm.handleStateForks(ctx, pstate, i, cb, ts)
@ -273,18 +285,6 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
}
}
if i > parentEpoch {
// run cron for null rounds if any
if err := runCron(); err != nil {
return cid.Cid{}, cid.Cid{}, err
}
newState, err = vmi.Flush(ctx)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("flushing vm: %w", err)
}
}
vmi.SetBlockHeight(i + 1)
pstate = newState
}

View File

@ -44,7 +44,11 @@ var log = logging.Logger("sub")
var ErrSoftFailure = errors.New("soft validation failure")
var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power")
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bserv bserv.BlockService, cmgr connmgr.ConnManager) {
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
// Timeout after (block time + propagation delay). This is useless at
// this point.
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
for {
msg, err := bsub.Next(ctx)
if err != nil {
@ -65,15 +69,22 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
src := msg.GetFrom()
go func() {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// NOTE: we could also share a single session between
// all requests but that may have other consequences.
ses := bserv.NewSession(ctx, bs)
start := build.Clock.Now()
log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := FetchMessagesByCids(context.TODO(), bserv, blk.BlsMessages)
bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages)
if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src)
return
}
smsgs, err := FetchSignedMessagesByCids(context.TODO(), bserv, blk.SecpkMessages)
smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages)
if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src)
return
@ -98,7 +109,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
func FetchMessagesByCids(
ctx context.Context,
bserv bserv.BlockService,
bserv bserv.BlockGetter,
cids []cid.Cid,
) ([]*types.Message, error) {
out := make([]*types.Message, len(cids))
@ -127,7 +138,7 @@ func FetchMessagesByCids(
// FIXME: Duplicate of above.
func FetchSignedMessagesByCids(
ctx context.Context,
bserv bserv.BlockService,
bserv bserv.BlockGetter,
cids []cid.Cid,
) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids))
@ -157,12 +168,11 @@ func FetchSignedMessagesByCids(
// blocks we did not request.
func fetchCids(
ctx context.Context,
bserv bserv.BlockService,
bserv bserv.BlockGetter,
cids []cid.Cid,
cb func(int, blocks.Block) error,
) error {
// FIXME: Why don't we use the context here?
fetchedBlocks := bserv.GetBlocks(context.TODO(), cids)
fetchedBlocks := bserv.GetBlocks(ctx, cids)
cidIndex := make(map[cid.Cid]int)
for i, c := range cids {

View File

@ -164,12 +164,6 @@ var setAskCmd = &cli.Command{
Usage: "Set the price of the ask for verified deals (specified as attoFIL / GiB / Epoch) to `PRICE`",
Required: true,
},
&cli.StringFlag{
Name: "duration",
Usage: "Set duration of ask (a quantity of time after which the ask expires) `DURATION`",
DefaultText: "720h0m0s",
Value: "720h0m0s",
},
&cli.StringFlag{
Name: "min-piece-size",
Usage: "Set minimum piece size (w/bit-padding, in bytes) in ask to `SIZE`",
@ -194,7 +188,7 @@ var setAskCmd = &cli.Command{
pri := types.NewInt(cctx.Uint64("price"))
vpri := types.NewInt(cctx.Uint64("verified-price"))
dur, err := time.ParseDuration(cctx.String("duration"))
dur, err := time.ParseDuration("720h0m0s")
if err != nil {
return xerrors.Errorf("cannot parse duration: %w", err)
}

View File

@ -956,7 +956,8 @@ Response:
"PricePerEpoch": "0",
"Duration": 42,
"DealID": 5432,
"CreationTime": "0001-01-01T00:00:00Z"
"CreationTime": "0001-01-01T00:00:00Z",
"Verified": true
}
```
@ -992,7 +993,8 @@ Response:
"PricePerEpoch": "0",
"Duration": 42,
"DealID": 5432,
"CreationTime": "0001-01-01T00:00:00Z"
"CreationTime": "0001-01-01T00:00:00Z",
"Verified": true
}
```

2
go.mod
View File

@ -23,7 +23,7 @@ require (
github.com/fatih/color v1.8.0
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200716204036-cddc56607e1d
github.com/filecoin-project/go-address v0.0.4
github.com/filecoin-project/go-bitfield v0.2.1-0.20200920172649-837cbe6a1ed3
github.com/filecoin-project/go-bitfield v0.2.1
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.6.7

4
go.sum
View File

@ -230,8 +230,8 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.1.0 h1:t6qDiuGYYngDqaLc2ZUvdtAg4UN
github.com/filecoin-project/go-amt-ipld/v2 v2.1.0/go.mod h1:nfFPoGyX0CU9SkXX8EoCcSuHN1XcbN0c6KBh7yvP5fs=
github.com/filecoin-project/go-bitfield v0.2.0 h1:gCtLcjskIPtdg4NfN7gQZSQF9yrBQ7mkT0qCJxzGI2Q=
github.com/filecoin-project/go-bitfield v0.2.0/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
github.com/filecoin-project/go-bitfield v0.2.1-0.20200920172649-837cbe6a1ed3 h1:HQa4+yCYsLq1TLM0kopeAhSCLbtZ541cWEi5N5rO+9g=
github.com/filecoin-project/go-bitfield v0.2.1-0.20200920172649-837cbe6a1ed3/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
github.com/filecoin-project/go-bitfield v0.2.1 h1:S6Uuqcspqu81sWJ0He4OAfFLm1tSwPdVjtKTkl5m/xQ=
github.com/filecoin-project/go-bitfield v0.2.1/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=

View File

@ -116,7 +116,13 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams)
}
}
}
exist, err := a.WalletHas(ctx, params.Wallet)
walletKey, err := a.StateAPI.StateManager.ResolveToKeyAddress(ctx, params.Wallet, nil)
if err != nil {
return nil, xerrors.Errorf("failed resolving params.Wallet addr: %w", params.Wallet)
}
exist, err := a.WalletHas(ctx, walletKey)
if err != nil {
return nil, xerrors.Errorf("failed getting addr from wallet: %w", params.Wallet)
}
@ -199,6 +205,7 @@ func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
Duration: uint64(v.Proposal.Duration()),
DealID: v.DealID,
CreationTime: v.CreationTime.Time(),
Verified: v.Proposal.VerifiedDeal,
}
}
@ -222,6 +229,7 @@ func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo,
Duration: uint64(v.Proposal.Duration()),
DealID: v.DealID,
CreationTime: v.CreationTime.Time(),
Verified: v.Proposal.VerifiedDeal,
}, nil
}
@ -847,6 +855,7 @@ func newDealInfo(v storagemarket.ClientDeal) api.DealInfo {
Duration: uint64(v.Proposal.Duration()),
DealID: v.DealID,
CreationTime: v.CreationTime.Time(),
Verified: v.Proposal.VerifiedDeal,
}
}

View File

@ -160,17 +160,13 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
return nil, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, msg.Value)
}
smsg, err := a.MessageSigner.SignMessage(ctx, msg)
if err != nil {
return nil, xerrors.Errorf("mpool push: failed to sign message: %w", err)
}
_, err = a.Mpool.Push(smsg)
if err != nil {
return nil, xerrors.Errorf("mpool push: failed to push message: %w", err)
}
return smsg, err
// Sign and push the message
return a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error {
if _, err := a.Mpool.Push(smsg); err != nil {
return xerrors.Errorf("mpool push: failed to push message: %w", err)
}
return nil
})
}
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {