diff --git a/api/api_full.go b/api/api_full.go index ceb0518b8..5cbdde8e3 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -530,6 +530,7 @@ type DealInfo struct { DealID abi.DealID CreationTime time.Time + Verified bool } type MsgLookup struct { diff --git a/chain/messagesigner/messagesigner.go b/chain/messagesigner/messagesigner.go index 1ad83543b..ac94d6a3e 100644 --- a/chain/messagesigner/messagesigner.go +++ b/chain/messagesigner/messagesigner.go @@ -3,6 +3,7 @@ package messagesigner import ( "bytes" "context" + "sync" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/messagepool" @@ -16,7 +17,7 @@ import ( "golang.org/x/xerrors" ) -const dsKeyActorNonce = "ActorNonce" +const dsKeyActorNonce = "ActorNextNonce" var log = logging.Logger("messagesigner") @@ -28,6 +29,7 @@ type mpoolAPI interface { // when signing a message type MessageSigner struct { wallet *wallet.Wallet + lk sync.Mutex mpool mpoolAPI ds datastore.Batching } @@ -47,25 +49,42 @@ func newMessageSigner(wallet *wallet.Wallet, mpool mpoolAPI, ds dtypes.MetadataD // SignMessage increments the nonce for the message From address, and signs // the message -func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) { +func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb func(*types.SignedMessage) error) (*types.SignedMessage, error) { + ms.lk.Lock() + defer ms.lk.Unlock() + + // Get the next message nonce nonce, err := ms.nextNonce(msg.From) if err != nil { return nil, xerrors.Errorf("failed to create nonce: %w", err) } + // Sign the message with the nonce msg.Nonce = nonce sig, err := ms.wallet.Sign(ctx, msg.From, msg.Cid().Bytes()) if err != nil { return nil, xerrors.Errorf("failed to sign message: %w", err) } - return &types.SignedMessage{ + // Callback with the signed message + smsg := &types.SignedMessage{ Message: *msg, Signature: *sig, - }, nil + } + err = cb(smsg) + if err != nil { + return nil, err + } + + // If the callback executed successfully, write the nonce to the datastore + if err := ms.saveNonce(msg.From, nonce); err != nil { + return nil, xerrors.Errorf("failed to save nonce: %w", err) + } + + return smsg, nil } -// nextNonce increments the nonce. +// nextNonce gets the next nonce for the given address. // If there is no nonce in the datastore, gets the nonce from the message pool. func (ms *MessageSigner) nextNonce(addr address.Address) (uint64, error) { // Nonces used to be created by the mempool and we need to support nodes @@ -77,21 +96,22 @@ func (ms *MessageSigner) nextNonce(addr address.Address) (uint64, error) { return 0, xerrors.Errorf("failed to get nonce from mempool: %w", err) } - // Get the nonce for this address from the datastore - addrNonceKey := datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()}) + // Get the next nonce for this address from the datastore + addrNonceKey := ms.dstoreKey(addr) dsNonceBytes, err := ms.ds.Get(addrNonceKey) switch { case xerrors.Is(err, datastore.ErrNotFound): // If a nonce for this address hasn't yet been created in the // datastore, just use the nonce from the mempool + return nonce, nil case err != nil: return 0, xerrors.Errorf("failed to get nonce from datastore: %w", err) default: - // There is a nonce in the datastore, so unmarshall and increment it - maj, val, err := cbg.CborReadHeader(bytes.NewReader(dsNonceBytes)) + // There is a nonce in the datastore, so unmarshall it + maj, dsNonce, err := cbg.CborReadHeader(bytes.NewReader(dsNonceBytes)) if err != nil { return 0, xerrors.Errorf("failed to parse nonce from datastore: %w", err) } @@ -99,26 +119,37 @@ func (ms *MessageSigner) nextNonce(addr address.Address) (uint64, error) { return 0, xerrors.Errorf("bad cbor type parsing nonce from datastore") } - dsNonce := val + 1 - // The message pool nonce should be <= than the datastore nonce if nonce <= dsNonce { nonce = dsNonce } else { log.Warnf("mempool nonce was larger than datastore nonce (%d > %d)", nonce, dsNonce) } - } - // Write the nonce for this address to the datastore + return nonce, nil + } +} + +// saveNonce increments the nonce for this address and writes it to the +// datastore +func (ms *MessageSigner) saveNonce(addr address.Address, nonce uint64) error { + // Increment the nonce + nonce++ + + // Write the nonce to the datastore + addrNonceKey := ms.dstoreKey(addr) buf := bytes.Buffer{} - _, err = buf.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, nonce)) + _, err := buf.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, nonce)) if err != nil { - return 0, xerrors.Errorf("failed to marshall nonce: %w", err) + return xerrors.Errorf("failed to marshall nonce: %w", err) } err = ms.ds.Put(addrNonceKey, buf.Bytes()) if err != nil { - return 0, xerrors.Errorf("failed to write nonce to datastore: %w", err) + return xerrors.Errorf("failed to write nonce to datastore: %w", err) } - - return nonce, nil + return nil +} + +func (ms *MessageSigner) dstoreKey(addr address.Address) datastore.Key { + return datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()}) } diff --git a/chain/messagesigner/messagesigner_test.go b/chain/messagesigner/messagesigner_test.go index 55676b258..04869ff6d 100644 --- a/chain/messagesigner/messagesigner_test.go +++ b/chain/messagesigner/messagesigner_test.go @@ -5,6 +5,8 @@ import ( "sync" "testing" + "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/go-state-types/crypto" @@ -58,6 +60,7 @@ func TestMessageSignerSignMessage(t *testing.T) { msg *types.Message mpoolNonce [1]uint64 expNonce uint64 + cbErr error } tests := []struct { name string @@ -137,6 +140,37 @@ func TestMessageSignerSignMessage(t *testing.T) { }, expNonce: 2, }}, + }, { + name: "recover from callback error", + msgs: []msgSpec{{ + // No nonce yet in datastore + msg: &types.Message{ + To: to1, + From: from1, + }, + expNonce: 0, + }, { + // Increment nonce + msg: &types.Message{ + To: to1, + From: from1, + }, + expNonce: 1, + }, { + // Callback returns error + msg: &types.Message{ + To: to1, + From: from1, + }, + cbErr: xerrors.Errorf("err"), + }, { + // Callback successful, should increment nonce in datastore + msg: &types.Message{ + To: to1, + From: from1, + }, + expNonce: 2, + }}, }} for _, tt := range tests { tt := tt @@ -149,9 +183,18 @@ func TestMessageSignerSignMessage(t *testing.T) { if len(m.mpoolNonce) == 1 { mpool.setNonce(m.msg.From, m.mpoolNonce[0]) } - smsg, err := ms.SignMessage(ctx, m.msg) - require.NoError(t, err) - require.Equal(t, m.expNonce, smsg.Message.Nonce) + merr := m.cbErr + smsg, err := ms.SignMessage(ctx, m.msg, func(message *types.SignedMessage) error { + return merr + }) + + if m.cbErr != nil { + require.Error(t, err) + require.Nil(t, smsg) + } else { + require.NoError(t, err) + require.Equal(t, m.expNonce, smsg.Message.Nonce) + } } }) } diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 91109d9bd..e371992e8 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -259,6 +259,18 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp } for i := parentEpoch; i < epoch; i++ { + if i > parentEpoch { + // run cron for null rounds if any + if err := runCron(); err != nil { + return cid.Undef, cid.Undef, err + } + + pstate, err = vmi.Flush(ctx) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("flushing vm: %w", err) + } + } + // handle state forks // XXX: The state tree newState, err := sm.handleStateForks(ctx, pstate, i, cb, ts) @@ -273,18 +285,6 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp } } - if i > parentEpoch { - // run cron for null rounds if any - if err := runCron(); err != nil { - return cid.Cid{}, cid.Cid{}, err - } - - newState, err = vmi.Flush(ctx) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("flushing vm: %w", err) - } - } - vmi.SetBlockHeight(i + 1) pstate = newState } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index eb17b078d..53bc3d68e 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -44,7 +44,11 @@ var log = logging.Logger("sub") var ErrSoftFailure = errors.New("soft validation failure") var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power") -func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bserv bserv.BlockService, cmgr connmgr.ConnManager) { +func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) { + // Timeout after (block time + propagation delay). This is useless at + // this point. + timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second + for { msg, err := bsub.Next(ctx) if err != nil { @@ -65,15 +69,22 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha src := msg.GetFrom() go func() { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // NOTE: we could also share a single session between + // all requests but that may have other consequences. + ses := bserv.NewSession(ctx, bs) + start := build.Clock.Now() log.Debug("about to fetch messages for block from pubsub") - bmsgs, err := FetchMessagesByCids(context.TODO(), bserv, blk.BlsMessages) + bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages) if err != nil { log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src) return } - smsgs, err := FetchSignedMessagesByCids(context.TODO(), bserv, blk.SecpkMessages) + smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages) if err != nil { log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src) return @@ -98,7 +109,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha func FetchMessagesByCids( ctx context.Context, - bserv bserv.BlockService, + bserv bserv.BlockGetter, cids []cid.Cid, ) ([]*types.Message, error) { out := make([]*types.Message, len(cids)) @@ -127,7 +138,7 @@ func FetchMessagesByCids( // FIXME: Duplicate of above. func FetchSignedMessagesByCids( ctx context.Context, - bserv bserv.BlockService, + bserv bserv.BlockGetter, cids []cid.Cid, ) ([]*types.SignedMessage, error) { out := make([]*types.SignedMessage, len(cids)) @@ -157,12 +168,11 @@ func FetchSignedMessagesByCids( // blocks we did not request. func fetchCids( ctx context.Context, - bserv bserv.BlockService, + bserv bserv.BlockGetter, cids []cid.Cid, cb func(int, blocks.Block) error, ) error { - // FIXME: Why don't we use the context here? - fetchedBlocks := bserv.GetBlocks(context.TODO(), cids) + fetchedBlocks := bserv.GetBlocks(ctx, cids) cidIndex := make(map[cid.Cid]int) for i, c := range cids { diff --git a/cmd/lotus-storage-miner/market.go b/cmd/lotus-storage-miner/market.go index 0e7be3dd3..6e3743143 100644 --- a/cmd/lotus-storage-miner/market.go +++ b/cmd/lotus-storage-miner/market.go @@ -164,12 +164,6 @@ var setAskCmd = &cli.Command{ Usage: "Set the price of the ask for verified deals (specified as attoFIL / GiB / Epoch) to `PRICE`", Required: true, }, - &cli.StringFlag{ - Name: "duration", - Usage: "Set duration of ask (a quantity of time after which the ask expires) `DURATION`", - DefaultText: "720h0m0s", - Value: "720h0m0s", - }, &cli.StringFlag{ Name: "min-piece-size", Usage: "Set minimum piece size (w/bit-padding, in bytes) in ask to `SIZE`", @@ -194,7 +188,7 @@ var setAskCmd = &cli.Command{ pri := types.NewInt(cctx.Uint64("price")) vpri := types.NewInt(cctx.Uint64("verified-price")) - dur, err := time.ParseDuration(cctx.String("duration")) + dur, err := time.ParseDuration("720h0m0s") if err != nil { return xerrors.Errorf("cannot parse duration: %w", err) } diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index fc37aff06..1bae2c1e6 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -956,7 +956,8 @@ Response: "PricePerEpoch": "0", "Duration": 42, "DealID": 5432, - "CreationTime": "0001-01-01T00:00:00Z" + "CreationTime": "0001-01-01T00:00:00Z", + "Verified": true } ``` @@ -992,7 +993,8 @@ Response: "PricePerEpoch": "0", "Duration": 42, "DealID": 5432, - "CreationTime": "0001-01-01T00:00:00Z" + "CreationTime": "0001-01-01T00:00:00Z", + "Verified": true } ``` diff --git a/go.mod b/go.mod index 112ca5749..a160389b0 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/fatih/color v1.8.0 github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200716204036-cddc56607e1d github.com/filecoin-project/go-address v0.0.4 - github.com/filecoin-project/go-bitfield v0.2.1-0.20200920172649-837cbe6a1ed3 + github.com/filecoin-project/go-bitfield v0.2.1 github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.6.7 diff --git a/go.sum b/go.sum index 2dd396739..5a00fff77 100644 --- a/go.sum +++ b/go.sum @@ -230,8 +230,8 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.1.0 h1:t6qDiuGYYngDqaLc2ZUvdtAg4UN github.com/filecoin-project/go-amt-ipld/v2 v2.1.0/go.mod h1:nfFPoGyX0CU9SkXX8EoCcSuHN1XcbN0c6KBh7yvP5fs= github.com/filecoin-project/go-bitfield v0.2.0 h1:gCtLcjskIPtdg4NfN7gQZSQF9yrBQ7mkT0qCJxzGI2Q= github.com/filecoin-project/go-bitfield v0.2.0/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM= -github.com/filecoin-project/go-bitfield v0.2.1-0.20200920172649-837cbe6a1ed3 h1:HQa4+yCYsLq1TLM0kopeAhSCLbtZ541cWEi5N5rO+9g= -github.com/filecoin-project/go-bitfield v0.2.1-0.20200920172649-837cbe6a1ed3/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM= +github.com/filecoin-project/go-bitfield v0.2.1 h1:S6Uuqcspqu81sWJ0He4OAfFLm1tSwPdVjtKTkl5m/xQ= +github.com/filecoin-project/go-bitfield v0.2.1/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= diff --git a/node/impl/client/client.go b/node/impl/client/client.go index d76ac2ee3..f146dcea3 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -116,7 +116,13 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams) } } } - exist, err := a.WalletHas(ctx, params.Wallet) + + walletKey, err := a.StateAPI.StateManager.ResolveToKeyAddress(ctx, params.Wallet, nil) + if err != nil { + return nil, xerrors.Errorf("failed resolving params.Wallet addr: %w", params.Wallet) + } + + exist, err := a.WalletHas(ctx, walletKey) if err != nil { return nil, xerrors.Errorf("failed getting addr from wallet: %w", params.Wallet) } @@ -199,6 +205,7 @@ func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { Duration: uint64(v.Proposal.Duration()), DealID: v.DealID, CreationTime: v.CreationTime.Time(), + Verified: v.Proposal.VerifiedDeal, } } @@ -222,6 +229,7 @@ func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, Duration: uint64(v.Proposal.Duration()), DealID: v.DealID, CreationTime: v.CreationTime.Time(), + Verified: v.Proposal.VerifiedDeal, }, nil } @@ -847,6 +855,7 @@ func newDealInfo(v storagemarket.ClientDeal) api.DealInfo { Duration: uint64(v.Proposal.Duration()), DealID: v.DealID, CreationTime: v.CreationTime.Time(), + Verified: v.Proposal.VerifiedDeal, } } diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index e0dd3ecef..1f093606c 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -160,17 +160,13 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe return nil, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, msg.Value) } - smsg, err := a.MessageSigner.SignMessage(ctx, msg) - if err != nil { - return nil, xerrors.Errorf("mpool push: failed to sign message: %w", err) - } - - _, err = a.Mpool.Push(smsg) - if err != nil { - return nil, xerrors.Errorf("mpool push: failed to push message: %w", err) - } - - return smsg, err + // Sign and push the message + return a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error { + if _, err := a.Mpool.Push(smsg); err != nil { + return xerrors.Errorf("mpool push: failed to push message: %w", err) + } + return nil + }) } func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {