Merge remote-tracking branch 'origin/master' into steb/refactor-consistent-tipset-methods

This commit is contained in:
Łukasz Magiera 2021-02-05 13:08:49 +01:00
commit 8f603717a6
44 changed files with 1802 additions and 812 deletions

View File

@ -66,6 +66,9 @@ debug: lotus lotus-miner lotus-worker lotus-seed
calibnet: GOFLAGS+=-tags=calibnet
calibnet: lotus lotus-miner lotus-worker lotus-seed
nerpanet: GOFLAGS+=-tags=nerpanet
nerpanet: lotus lotus-miner lotus-worker lotus-seed
lotus: $(BUILD_DEPS)
rm -f lotus
go build $(GOFLAGS) -o lotus ./cmd/lotus

View File

@ -464,6 +464,12 @@ type FullNode interface {
// MsigGetVested returns the amount of FIL that vested in a multisig in a certain period.
// It takes the following params: <multisig address>, <start epoch>, <end epoch>
MsigGetVested(context.Context, address.Address, types.TipSetKey, types.TipSetKey) (types.BigInt, error)
//MsigGetPending returns pending transactions for the given multisig
//wallet. Once pending transactions are fully approved, they will no longer
//appear here.
MsigGetPending(context.Context, address.Address, types.TipSetKey) ([]*MsigTransaction, error)
// MsigCreate creates a multisig wallet
// It takes the following params: <required number of senders>, <approving addresses>, <unlock duration>
//<initial balance>, <sender address of the create msg>, <gas price>
@ -986,3 +992,13 @@ type MessageMatch struct {
To address.Address
From address.Address
}
type MsigTransaction struct {
ID int64
To address.Address
Value abi.TokenAmount
Method abi.MethodNum
Params []byte
Approved []address.Address
}

View File

@ -27,6 +27,7 @@ type GatewayAPI interface {
MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetPending(context.Context, address.Address, types.TipSetKey) ([]*MsigTransaction, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (DealCollateralBounds, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)

View File

@ -232,6 +232,7 @@ type FullNodeStruct struct {
MsigGetAvailableBalance func(context.Context, address.Address, types.TipSetKey) (types.BigInt, error) `perm:"read"`
MsigGetVestingSchedule func(context.Context, address.Address, types.TipSetKey) (api.MsigVesting, error) `perm:"read"`
MsigGetVested func(context.Context, address.Address, types.TipSetKey, types.TipSetKey) (types.BigInt, error) `perm:"read"`
MsigGetPending func(context.Context, address.Address, types.TipSetKey) ([]*api.MsigTransaction, error) `perm:"read"`
MsigCreate func(context.Context, uint64, []address.Address, abi.ChainEpoch, types.BigInt, address.Address, types.BigInt) (cid.Cid, error) `perm:"sign"`
MsigPropose func(context.Context, address.Address, address.Address, types.BigInt, address.Address, uint64, []byte) (cid.Cid, error) `perm:"sign"`
MsigApprove func(context.Context, address.Address, uint64, address.Address) (cid.Cid, error) `perm:"sign"`
@ -434,6 +435,7 @@ type GatewayStruct struct {
MpoolPush func(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested func(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetPending func(context.Context, address.Address, types.TipSetKey) ([]*api.MsigTransaction, error)
StateAccountKey func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateDealProviderCollateralBounds func(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (api.DealCollateralBounds, error)
StateGetActor func(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
@ -444,9 +446,10 @@ type GatewayStruct struct {
StateMinerProvingDeadline func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error)
StateMinerPower func(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error)
StateMarketBalance func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MarketBalance, error)
StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
StateMarketStorageDeal func(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error)
StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error)
StateNetworkVersion func(ctx context.Context, tsk types.TipSetKey) (stnetwork.Version, error)
StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
StateSectorGetInfo func(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateVerifiedClientStatus func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
StateWaitMsg func(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
@ -1111,6 +1114,10 @@ func (c *FullNodeStruct) MsigGetVested(ctx context.Context, a address.Address, s
return c.Internal.MsigGetVested(ctx, a, sTsk, eTsk)
}
func (c *FullNodeStruct) MsigGetPending(ctx context.Context, a address.Address, tsk types.TipSetKey) ([]*api.MsigTransaction, error) {
return c.Internal.MsigGetPending(ctx, a, tsk)
}
func (c *FullNodeStruct) MsigCreate(ctx context.Context, req uint64, addrs []address.Address, duration abi.ChainEpoch, val types.BigInt, src address.Address, gp types.BigInt) (cid.Cid, error) {
return c.Internal.MsigCreate(ctx, req, addrs, duration, val, src, gp)
}
@ -1737,6 +1744,10 @@ func (g GatewayStruct) MsigGetVested(ctx context.Context, addr address.Address,
return g.Internal.MsigGetVested(ctx, addr, start, end)
}
func (g GatewayStruct) MsigGetPending(ctx context.Context, addr address.Address, tsk types.TipSetKey) ([]*api.MsigTransaction, error) {
return g.Internal.MsigGetPending(ctx, addr, tsk)
}
func (g GatewayStruct) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
return g.Internal.StateAccountKey(ctx, addr, tsk)
}
@ -1801,6 +1812,10 @@ func (g GatewayStruct) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence
return g.Internal.StateWaitMsg(ctx, msg, confidence)
}
func (g GatewayStruct) StateReadState(ctx context.Context, addr address.Address, ts types.TipSetKey) (*api.ActorState, error) {
return g.Internal.StateReadState(ctx, addr, ts)
}
func (c *WalletStruct) WalletNew(ctx context.Context, typ types.KeyType) (address.Address, error) {
return c.Internal.WalletNew(ctx, typ)
}

View File

@ -20,9 +20,13 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
@ -88,6 +92,97 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api
return res, data, nil
}
func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
publishPeriod := 10 * time.Second
maxDealsPerMsg := uint64(2)
// Set max deals per publish deals message to 2
minerDef := []StorageMiner{{
Full: 0,
Opts: node.Override(
new(*storageadapter.DealPublisher),
storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
Period: publishPeriod,
MaxDealsPerMsg: maxDealsPerMsg,
})),
Preseal: PresealGenesis,
}}
// Create a connect client and miner node
n, sn := b(t, OneFull, minerDef)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
s := connectAndStartMining(t, b, blocktime, client, miner)
defer s.blockMiner.Stop()
// Starts a deal and waits until it's published
runDealTillPublish := func(rseed int) {
res, _, err := CreateClientFile(s.ctx, s.client, rseed)
require.NoError(t, err)
upds, err := client.ClientGetDealUpdates(s.ctx)
require.NoError(t, err)
startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
done := make(chan struct{})
go func() {
for upd := range upds {
if upd.DataRef.Root == res.Root && upd.State == storagemarket.StorageDealAwaitingPreCommit {
done <- struct{}{}
}
}
}()
<-done
}
// Run three deals in parallel
done := make(chan struct{}, maxDealsPerMsg+1)
for rseed := 1; rseed <= 3; rseed++ {
rseed := rseed
go func() {
runDealTillPublish(rseed)
done <- struct{}{}
}()
}
// Wait for two of the deals to be published
for i := 0; i < int(maxDealsPerMsg); i++ {
<-done
}
// Expect a single PublishStorageDeals message that includes the first two deals
msgCids, err := s.client.StateListMessages(s.ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1)
require.NoError(t, err)
count := 0
for _, msgCid := range msgCids {
msg, err := s.client.ChainGetMessage(s.ctx, msgCid)
require.NoError(t, err)
if msg.Method == market.Methods.PublishStorageDeals {
count++
var pubDealsParams market2.PublishStorageDealsParams
err = pubDealsParams.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Len(t, pubDealsParams.Deals, int(maxDealsPerMsg))
}
}
require.Equal(t, 1, count)
// The third deal should be published once the publish period expires.
// Allow a little padding as it takes a moment for the state change to
// be noticed by the client.
padding := 10 * time.Second
select {
case <-time.After(publishPeriod + padding):
require.Fail(t, "Expected 3rd deal to be published once publish period elapsed")
case <-done: // Success
}
}
func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
s := setupOneClientOneMiner(t, b, blocktime)
defer s.blockMiner.Stop()
@ -159,6 +254,21 @@ func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
}
}
func TestZeroPricePerByteRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
s := setupOneClientOneMiner(t, b, blocktime)
defer s.blockMiner.Stop()
// Set price-per-byte to zero
ask, err := s.miner.MarketGetRetrievalAsk(s.ctx)
require.NoError(t, err)
ask.PricePerByte = abi.NewTokenAmount(0)
err = s.miner.MarketSetRetrievalAsk(s.ctx, ask)
require.NoError(t, err)
MakeDeal(t, s.ctx, 6, s.client, s.miner, false, false, startEpoch)
}
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid {
maddr, err := miner.ActorAddress(ctx)
if err != nil {

View File

@ -59,6 +59,7 @@ const GenesisPreseals = 2
// Options for setting up a mock storage miner
type StorageMiner struct {
Full int
Opts node.Option
Preseal int
}

View File

@ -0,0 +1,4 @@
/dns4/bootstrap-0.nerpa.interplanetary.dev/tcp/1347/p2p/12D3KooWNfuGjtzWTVz8eJGZ2C3aJg2xLqorhsagu4LTWw6CwpK9
/dns4/bootstrap-1.nerpa.interplanetary.dev/tcp/1347/p2p/12D3KooWDfsxYk7dC6NNsHqZqqyMJCzkjZuXhjsmqBk3TUCBZLga
/dns4/bootstrap-2.nerpa.interplanetary.dev/tcp/1347/p2p/12D3KooWRZAGHmCCaa2gkYmnC4Q2TEwHGFSh6Fh1FFJ7RSXak5yN
/dns4/bootstrap-3.nerpa.interplanetary.dev/tcp/1347/p2p/12D3KooWBFxEigSKLvxJVdw3JziC9ePHHnyAn5LifWSqg2kttcth

BIN
build/genesis/nerpanet.car Normal file

Binary file not shown.

View File

@ -28,9 +28,6 @@ var UpgradeActorsV2Height = abi.ChainEpoch(30)
const UpgradeTapeHeight = 60
// This signals our tentative epoch for mainnet launch. Can make it later, but not earlier.
// Miners, clients, developers, custodians all need time to prepare.
// We still have upgrades and state changes to do, but can happen after signaling timing here.
const UpgradeLiftoffHeight = -5
const UpgradeKumquatHeight = 90

View File

@ -2,6 +2,7 @@
// +build !2k
// +build !testground
// +build !calibnet
// +build !nerpanet
package build

74
build/params_nerpanet.go Normal file
View File

@ -0,0 +1,74 @@
// +build nerpanet
package build
import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/actors/policy"
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
)
var DrandSchedule = map[abi.ChainEpoch]DrandEnum{
0: DrandMainnet,
}
const BootstrappersFile = "nerpanet.pi"
const GenesisFile = "nerpanet.car"
const UpgradeBreezeHeight = -1
const BreezeGasTampingDuration = 0
const UpgradeSmokeHeight = -1
const UpgradeIgnitionHeight = -2
const UpgradeRefuelHeight = -3
const UpgradeTapeHeight = -4
const UpgradeLiftoffHeight = -5
const UpgradeActorsV2Height = 120 // critical: the network can bootstrap from v1 only
const UpgradeKumquatHeight = -6
const UpgradeCalicoHeight = 999999999
const UpgradePersianHeight = UpgradeCalicoHeight + (builtin2.EpochsInHour * 60)
const UpgradeOrangeHeight = 9999999999
// 2020-12-22T02:00:00Z
const UpgradeClausHeight = 99999999999
// TODO
const UpgradeActorsV3Height = 999999999999
func init() {
// Minimum block production power is set to 4 TiB
// Rationale is to discourage small-scale miners from trying to take over the network
// One needs to invest in ~2.3x the compute to break consensus, making it not worth it
//
// DOWNSIDE: the fake-seals need to be kept alive/protected, otherwise network will seize
//
policy.SetConsensusMinerMinPower(abi.NewStoragePower(4 << 40))
policy.SetSupportedProofTypes(
abi.RegisteredSealProof_StackedDrg512MiBV1,
abi.RegisteredSealProof_StackedDrg32GiBV1,
abi.RegisteredSealProof_StackedDrg64GiBV1,
)
// Lower the most time-consuming parts of PoRep
policy.SetPreCommitChallengeDelay(10)
// TODO - make this a variable
//miner.WPoStChallengeLookback = abi.ChainEpoch(2)
Devnet = false
}
const BlockDelaySecs = uint64(builtin2.EpochDurationSeconds)
const PropagationDelaySecs = uint64(6)
// BootstrapPeerThreshold is the minimum number peers we need to track for a sync worker to start
const BootstrapPeerThreshold = 4

View File

@ -51,6 +51,7 @@ type MessageBuilder interface {
// this type is the same between v0 and v2
type ProposalHashData = multisig3.ProposalHashData
type ProposeReturn = multisig3.ProposeReturn
type ProposeParams = multisig3.ProposeParams
func txnParams(id uint64, data *ProposalHashData) ([]byte, error) {
params := multisig3.TxnIDParams{ID: multisig3.TxnID(id)}

View File

@ -99,11 +99,13 @@ func (e *Events) listenHeadChanges(ctx context.Context) {
} else {
log.Warn("listenHeadChanges quit")
}
if ctx.Err() != nil {
select {
case <-build.Clock.After(time.Second):
case <-ctx.Done():
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
build.Clock.Sleep(time.Second)
log.Info("restarting listenHeadChanges")
}
}

View File

@ -32,7 +32,6 @@ import (
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/genesis"
)
@ -177,8 +176,6 @@ var sealBenchCmd = &cli.Command{
},
},
Action: func(c *cli.Context) error {
policy.AddSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
if c.Bool("no-gpu") {
err := os.Setenv("BELLMAN_NO_GPU", "1")
if err != nil {

View File

@ -48,6 +48,7 @@ type gatewayDepsAPI interface {
MpoolPushUntrusted(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetPending(ctx context.Context, addr address.Address, ts types.TipSetKey) ([]*api.MsigTransaction, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (api.DealCollateralBounds, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
@ -228,6 +229,14 @@ func (a *GatewayAPI) MsigGetVested(ctx context.Context, addr address.Address, st
return a.api.MsigGetVested(ctx, addr, start, end)
}
func (a *GatewayAPI) MsigGetPending(ctx context.Context, addr address.Address, tsk types.TipSetKey) ([]*api.MsigTransaction, error) {
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return nil, err
}
return a.api.MsigGetPending(ctx, addr, tsk)
}
func (a *GatewayAPI) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
if err := a.checkTipsetKey(ctx, tsk); err != nil {
return address.Undef, err

View File

@ -6,7 +6,9 @@ import (
"net/http"
"os"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/filecoin-project/go-jsonrpc"
promclient "github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/tag"
"github.com/filecoin-project/lotus/build"
@ -99,6 +101,17 @@ var runCmd = &cli.Command{
rpcServer.Register("Filecoin", metrics.MetricedGatewayAPI(NewGatewayAPI(api)))
mux.Handle("/rpc/v0", rpcServer)
registry := promclient.DefaultRegisterer.(*promclient.Registry)
exporter, err := prometheus.NewExporter(prometheus.Options{
Registry: registry,
Namespace: "lotus_gw",
})
if err != nil {
return err
}
mux.Handle("/debug/metrics", exporter)
mux.PathPrefix("/").Handler(http.DefaultServeMux)
/*ah := &auth.Handler{

View File

@ -17,6 +17,7 @@ import (
var bitFieldCmd = &cli.Command{
Name: "bitfield",
Usage: "Bitfield analyze tool",
Description: "analyze bitfields",
Flags: []cli.Flag{
&cli.StringFlag{
@ -26,54 +27,25 @@ var bitFieldCmd = &cli.Command{
},
},
Subcommands: []*cli.Command{
bitFieldEncodeCmd,
bitFieldDecodeCmd,
bitFieldRunsCmd,
bitFieldStatCmd,
bitFieldDecodeCmd,
bitFieldMergeCmd,
bitFieldIntersectCmd,
bitFieldEncodeCmd,
bitFieldSubCmd,
},
}
var bitFieldRunsCmd = &cli.Command{
Name: "runs",
Usage: "Bitfield bit runs",
Description: "print bit runs in a bitfield",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "enc",
Value: "base64",
Usage: "specify input encoding to parse",
},
},
Action: func(cctx *cli.Context) error {
var val string
if cctx.Args().Present() {
val = cctx.Args().Get(0)
} else {
b, err := ioutil.ReadAll(os.Stdin)
dec, err := decodeToByte(cctx, 0)
if err != nil {
return err
}
val = string(b)
}
var dec []byte
switch cctx.String("enc") {
case "base64":
d, err := base64.StdEncoding.DecodeString(val)
if err != nil {
return fmt.Errorf("decoding base64 value: %w", err)
}
dec = d
case "hex":
d, err := hex.DecodeString(val)
if err != nil {
return fmt.Errorf("decoding hex value: %w", err)
}
dec = d
default:
return fmt.Errorf("unrecognized encoding: %s", cctx.String("enc"))
}
rle, err := rlepluslazy.FromBuf(dec)
if err != nil {
@ -98,7 +70,7 @@ var bitFieldRunsCmd = &cli.Command{
s = "FALSE"
}
fmt.Printf("@%d %s * %d\n", idx, s, r.Len)
fmt.Printf("@%08d %s * %d\n", idx, s, r.Len)
idx += r.Len
}
@ -109,43 +81,14 @@ var bitFieldRunsCmd = &cli.Command{
var bitFieldStatCmd = &cli.Command{
Name: "stat",
Usage: "Bitfield stats",
Description: "print bitfield stats",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "enc",
Value: "base64",
Usage: "specify input encoding to parse",
},
},
Action: func(cctx *cli.Context) error {
var val string
if cctx.Args().Present() {
val = cctx.Args().Get(0)
} else {
b, err := ioutil.ReadAll(os.Stdin)
dec, err := decodeToByte(cctx, 0)
if err != nil {
return err
}
val = string(b)
}
var dec []byte
switch cctx.String("enc") {
case "base64":
d, err := base64.StdEncoding.DecodeString(val)
if err != nil {
return fmt.Errorf("decoding base64 value: %w", err)
}
dec = d
case "hex":
d, err := hex.DecodeString(val)
if err != nil {
return fmt.Errorf("decoding hex value: %w", err)
}
dec = d
default:
return fmt.Errorf("unrecognized encoding: %s", cctx.String("enc"))
}
fmt.Printf("Raw length: %d bits (%d bytes)\n", len(dec)*8, len(dec))
rle, err := rlepluslazy.FromBuf(dec)
if err != nil {
@ -157,10 +100,7 @@ var bitFieldStatCmd = &cli.Command{
return xerrors.Errorf("getting run iterator: %w", err)
}
fmt.Printf("Raw length: %d bits (%d bytes)\n", len(dec)*8, len(dec))
var ones, zeros, oneRuns, zeroRuns, invalid uint64
for rit.HasNext() {
r, err := rit.NextRun()
if err != nil {
@ -195,14 +135,8 @@ var bitFieldStatCmd = &cli.Command{
var bitFieldDecodeCmd = &cli.Command{
Name: "decode",
Usage: "Bitfield to decimal number",
Description: "decode bitfield and print all numbers in it",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "enc",
Value: "base64",
Usage: "specify input encoding to parse",
},
},
Action: func(cctx *cli.Context) error {
rle, err := decode(cctx, 0)
if err != nil {
@ -219,43 +153,61 @@ var bitFieldDecodeCmd = &cli.Command{
},
}
var bitFieldIntersectCmd = &cli.Command{
Name: "intersect",
Description: "intersect 2 bitfields and print the resulting bitfield as base64",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "enc",
Value: "base64",
Usage: "specify input encoding to parse",
},
},
var bitFieldMergeCmd = &cli.Command{
Name: "merge",
Usage: "Merge 2 bitfields",
Description: "Merge 2 bitfields and print the resulting bitfield",
Action: func(cctx *cli.Context) error {
a, err := decode(cctx, 0)
if err != nil {
return err
}
b, err := decode(cctx, 1)
if err != nil {
return err
}
o, err := bitfield.MergeBitFields(a, b)
if err != nil {
return xerrors.Errorf("merge: %w", err)
}
str, err := encode(cctx, o)
if err != nil {
return err
}
fmt.Println(str)
return nil
},
}
var bitFieldIntersectCmd = &cli.Command{
Name: "intersect",
Usage: "Intersect 2 bitfields",
Description: "intersect 2 bitfields and print the resulting bitfield",
Action: func(cctx *cli.Context) error {
a, err := decode(cctx, 0)
if err != nil {
return err
}
b, err := decode(cctx, 1)
if err != nil {
return err
}
o, err := bitfield.IntersectBitField(a, b)
if err != nil {
return xerrors.Errorf("intersect: %w", err)
}
s, err := o.RunIterator()
str, err := encode(cctx, o)
if err != nil {
return err
}
bytes, err := rlepluslazy.EncodeRuns(s, []byte{})
if err != nil {
return err
}
fmt.Println(base64.StdEncoding.EncodeToString(bytes))
fmt.Println(str)
return nil
},
@ -263,41 +215,29 @@ var bitFieldIntersectCmd = &cli.Command{
var bitFieldSubCmd = &cli.Command{
Name: "sub",
Description: "subtract 2 bitfields and print the resulting bitfield as base64",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "enc",
Value: "base64",
Usage: "specify input encoding to parse",
},
},
Usage: "Subtract 2 bitfields",
Description: "subtract 2 bitfields and print the resulting bitfield",
Action: func(cctx *cli.Context) error {
b, err := decode(cctx, 1)
a, err := decode(cctx, 0)
if err != nil {
return err
}
a, err := decode(cctx, 0)
b, err := decode(cctx, 1)
if err != nil {
return err
}
o, err := bitfield.SubtractBitField(a, b)
if err != nil {
return xerrors.Errorf("intersect: %w", err)
return xerrors.Errorf("subtract: %w", err)
}
s, err := o.RunIterator()
str, err := encode(cctx, o)
if err != nil {
return err
}
bytes, err := rlepluslazy.EncodeRuns(s, []byte{})
if err != nil {
return err
}
fmt.Println(base64.StdEncoding.EncodeToString(bytes))
fmt.Println(str)
return nil
},
@ -305,15 +245,9 @@ var bitFieldSubCmd = &cli.Command{
var bitFieldEncodeCmd = &cli.Command{
Name: "encode",
Usage: "Decimal number to bitfield",
Description: "encode a series of decimal numbers into a bitfield",
ArgsUsage: "[infile]",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "enc",
Value: "base64",
Usage: "specify input encoding to parse",
},
},
Action: func(cctx *cli.Context) error {
f, err := os.Open(cctx.Args().First())
if err != nil {
@ -331,38 +265,64 @@ var bitFieldEncodeCmd = &cli.Command{
out.Set(i)
}
s, err := out.RunIterator()
str, err := encode(cctx, out)
if err != nil {
return err
}
bytes, err := rlepluslazy.EncodeRuns(s, []byte{})
if err != nil {
return err
}
fmt.Println(base64.StdEncoding.EncodeToString(bytes))
fmt.Println(str)
return nil
},
}
func decode(cctx *cli.Context, a int) (bitfield.BitField, error) {
var val string
if cctx.Args().Present() {
if a >= cctx.NArg() {
return bitfield.BitField{}, xerrors.Errorf("need more than %d args", a)
func encode(cctx *cli.Context, field bitfield.BitField) (string, error) {
s, err := field.RunIterator()
if err != nil {
return "", err
}
val = cctx.Args().Get(a)
} else {
if a > 0 {
return bitfield.BitField{}, xerrors.Errorf("need more than %d args", a)
bytes, err := rlepluslazy.EncodeRuns(s, []byte{})
if err != nil {
return "", err
}
b, err := ioutil.ReadAll(os.Stdin)
var str string
switch cctx.String("enc") {
case "base64":
str = base64.StdEncoding.EncodeToString(bytes)
case "hex":
str = hex.EncodeToString(bytes)
default:
return "", fmt.Errorf("unrecognized encoding: %s", cctx.String("enc"))
}
return str, nil
}
func decode(cctx *cli.Context, i int) (bitfield.BitField, error) {
b, err := decodeToByte(cctx, i)
if err != nil {
return bitfield.BitField{}, err
}
val = string(b)
return bitfield.NewFromBytes(b)
}
func decodeToByte(cctx *cli.Context, i int) ([]byte, error) {
var val string
if cctx.Args().Present() {
if i >= cctx.NArg() {
return nil, xerrors.Errorf("need more than %d args", i)
}
val = cctx.Args().Get(i)
} else {
if i > 0 {
return nil, xerrors.Errorf("need more than %d args", i)
}
r, err := ioutil.ReadAll(os.Stdin)
if err != nil {
return nil, err
}
val = string(r)
}
var dec []byte
@ -370,18 +330,18 @@ func decode(cctx *cli.Context, a int) (bitfield.BitField, error) {
case "base64":
d, err := base64.StdEncoding.DecodeString(val)
if err != nil {
return bitfield.BitField{}, fmt.Errorf("decoding base64 value: %w", err)
return nil, fmt.Errorf("decoding base64 value: %w", err)
}
dec = d
case "hex":
d, err := hex.DecodeString(val)
if err != nil {
return bitfield.BitField{}, fmt.Errorf("decoding hex value: %w", err)
return nil, fmt.Errorf("decoding hex value: %w", err)
}
dec = d
default:
return bitfield.BitField{}, fmt.Errorf("unrecognized encoding: %s", cctx.String("enc"))
return nil, fmt.Errorf("unrecognized encoding: %s", cctx.String("enc"))
}
return bitfield.NewFromBytes(dec)
return dec, nil
}

View File

@ -0,0 +1,244 @@
package main
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
gobig "math/big"
"strings"
"sync"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/actors/builtin/multisig"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
)
type InteractiveWallet struct {
lk sync.Mutex
apiGetter func() (api.FullNode, jsonrpc.ClientCloser, error)
under api.WalletAPI
}
func (c *InteractiveWallet) WalletNew(ctx context.Context, typ types.KeyType) (address.Address, error) {
err := c.accept(func() error {
fmt.Println("-----")
fmt.Println("ACTION: WalletNew - Creating new wallet")
fmt.Printf("TYPE: %s\n", typ)
return nil
})
if err != nil {
return address.Address{}, err
}
return c.under.WalletNew(ctx, typ)
}
func (c *InteractiveWallet) WalletHas(ctx context.Context, addr address.Address) (bool, error) {
return c.under.WalletHas(ctx, addr)
}
func (c *InteractiveWallet) WalletList(ctx context.Context) ([]address.Address, error) {
return c.under.WalletList(ctx)
}
func (c *InteractiveWallet) WalletSign(ctx context.Context, k address.Address, msg []byte, meta api.MsgMeta) (*crypto.Signature, error) {
err := c.accept(func() error {
fmt.Println("-----")
fmt.Println("ACTION: WalletSign - Sign a message/deal")
fmt.Printf("ADDRESS: %s\n", k)
fmt.Printf("TYPE: %s\n", meta.Type)
switch meta.Type {
case api.MTChainMsg:
var cmsg types.Message
if err := cmsg.UnmarshalCBOR(bytes.NewReader(meta.Extra)); err != nil {
return xerrors.Errorf("unmarshalling message: %w", err)
}
_, bc, err := cid.CidFromBytes(msg)
if err != nil {
return xerrors.Errorf("getting cid from signing bytes: %w", err)
}
if !cmsg.Cid().Equals(bc) {
return xerrors.Errorf("cid(meta.Extra).bytes() != msg")
}
jb, err := json.MarshalIndent(&cmsg, "", " ")
if err != nil {
return xerrors.Errorf("json-marshaling the message: %w", err)
}
fmt.Println("Message JSON:", string(jb))
fmt.Println("Value:", types.FIL(cmsg.Value))
fmt.Println("Max Fees:", types.FIL(cmsg.RequiredFunds()))
fmt.Println("Max Total Cost:", types.FIL(big.Add(cmsg.RequiredFunds(), cmsg.Value)))
if c.apiGetter != nil {
napi, closer, err := c.apiGetter()
if err != nil {
return xerrors.Errorf("getting node api: %w", err)
}
defer closer()
toact, err := napi.StateGetActor(ctx, cmsg.To, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("looking up dest actor: %w", err)
}
fmt.Println("Method:", stmgr.MethodsMap[toact.Code][cmsg.Method].Name)
p, err := lcli.JsonParams(toact.Code, cmsg.Method, cmsg.Params)
if err != nil {
return err
}
fmt.Println("Params:", p)
if builtin.IsMultisigActor(toact.Code) && cmsg.Method == multisig.Methods.Propose {
var mp multisig.ProposeParams
if err := mp.UnmarshalCBOR(bytes.NewReader(cmsg.Params)); err != nil {
return xerrors.Errorf("unmarshalling multisig propose params: %w", err)
}
fmt.Println("\tMultiSig Proposal Value:", types.FIL(mp.Value))
fmt.Println("\tMultiSig Proposal Hex Params:", hex.EncodeToString(mp.Params))
toact, err := napi.StateGetActor(ctx, mp.To, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("looking up msig dest actor: %w", err)
}
fmt.Println("\tMultiSig Proposal Method:", stmgr.MethodsMap[toact.Code][mp.Method].Name)
p, err := lcli.JsonParams(toact.Code, mp.Method, mp.Params)
if err != nil {
return err
}
fmt.Println("\tMultiSig Proposal Params:", strings.ReplaceAll(p, "\n", "\n\t"))
}
} else {
fmt.Println("Params: No chain node connection, can't decode params")
}
case api.MTDealProposal:
return xerrors.Errorf("TODO") // TODO
default:
log.Infow("WalletSign", "address", k, "type", meta.Type)
}
return nil
})
if err != nil {
return nil, err
}
return c.under.WalletSign(ctx, k, msg, meta)
}
func (c *InteractiveWallet) WalletExport(ctx context.Context, a address.Address) (*types.KeyInfo, error) {
err := c.accept(func() error {
fmt.Println("-----")
fmt.Println("ACTION: WalletExport - Export private key")
fmt.Printf("ADDRESS: %s\n", a)
return nil
})
if err != nil {
return nil, err
}
return c.under.WalletExport(ctx, a)
}
func (c *InteractiveWallet) WalletImport(ctx context.Context, ki *types.KeyInfo) (address.Address, error) {
err := c.accept(func() error {
fmt.Println("-----")
fmt.Println("ACTION: WalletImport - Import private key")
fmt.Printf("TYPE: %s\n", ki.Type)
return nil
})
if err != nil {
return address.Undef, err
}
return c.under.WalletImport(ctx, ki)
}
func (c *InteractiveWallet) WalletDelete(ctx context.Context, addr address.Address) error {
err := c.accept(func() error {
fmt.Println("-----")
fmt.Println("ACTION: WalletDelete - Delete a private key")
fmt.Printf("ADDRESS: %s\n", addr)
return nil
})
if err != nil {
return err
}
return c.under.WalletDelete(ctx, addr)
}
func (c *InteractiveWallet) accept(prompt func() error) error {
c.lk.Lock()
defer c.lk.Unlock()
if err := prompt(); err != nil {
return err
}
yes := randomYes()
for {
fmt.Printf("\nAccept the above? (%s/No): ", yes)
var a string
if _, err := fmt.Scanln(&a); err != nil {
return err
}
switch a {
case yes:
fmt.Println("approved")
return nil
case "No":
return xerrors.Errorf("action rejected")
}
fmt.Printf("Type EXACTLY '%s' or 'No'\n", yes)
}
}
var yeses = []string{
"yes",
"Yes",
"YES",
"approve",
"Approve",
"accept",
"Accept",
"authorize",
"Authorize",
"confirm",
"Confirm",
}
func randomYes() string {
i, err := rand.Int(rand.Reader, gobig.NewInt(int64(len(yeses))))
if err != nil {
panic(err)
}
return yeses[i.Int64()]
}

View File

@ -45,6 +45,12 @@ func main() {
EnvVars: []string{"WALLET_PATH"},
Value: "~/.lotuswallet", // TODO: Consider XDG_DATA_HOME
},
&cli.StringFlag{
Name: "repo",
EnvVars: []string{"LOTUS_PATH"},
Hidden: true,
Value: "~/.lotus",
},
},
Commands: local,
@ -70,6 +76,14 @@ var runCmd = &cli.Command{
Name: "ledger",
Usage: "use a ledger device instead of an on-disk wallet",
},
&cli.BoolFlag{
Name: "interactive",
Usage: "prompt before performing actions (DO NOT USE FOR MINER WORKER ADDRESS)",
},
&cli.BoolFlag{
Name: "offline",
Usage: "don't query chain state in interactive mode",
},
},
Action: func(cctx *cli.Context) error {
log.Info("Starting lotus wallet")
@ -134,8 +148,25 @@ var runCmd = &cli.Command{
log.Info("Setting up API endpoint at " + address)
if cctx.Bool("interactive") {
var ag func() (api.FullNode, jsonrpc.ClientCloser, error)
if !cctx.Bool("offline") {
ag = func() (api.FullNode, jsonrpc.ClientCloser, error) {
return lcli.GetFullNodeAPI(cctx)
}
}
w = &InteractiveWallet{
under: w,
apiGetter: ag,
}
} else {
w = &LoggedWallet{under: w}
}
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", &LoggedWallet{under: metrics.MetricedWalletAPI(w)})
rpcServer.Register("Filecoin", metrics.MetricedWalletAPI(w))
mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof

View File

@ -99,6 +99,7 @@
* [MsigCancel](#MsigCancel)
* [MsigCreate](#MsigCreate)
* [MsigGetAvailableBalance](#MsigGetAvailableBalance)
* [MsigGetPending](#MsigGetPending)
* [MsigGetVested](#MsigGetVested)
* [MsigGetVestingSchedule](#MsigGetVestingSchedule)
* [MsigPropose](#MsigPropose)
@ -2445,6 +2446,31 @@ Inputs:
Response: `"0"`
### MsigGetPending
MsigGetPending returns pending transactions for the given multisig
wallet. Once pending transactions are fully approved, they will no longer
appear here.
Perms: read
Inputs:
```json
[
"f01234",
[
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
]
]
```
Response: `null`
### MsigGetVested
MsigGetVested returns the amount of FIL that vested in a multisig in a certain period.
It takes the following params: <multisig address>, <start epoch>, <end epoch>

View File

@ -15,4 +15,6 @@ type Config struct {
MaxSealingSectorsForDeals uint64
WaitDealsDelay time.Duration
AlwaysKeepUnsealedCopy bool
}

View File

@ -512,7 +512,12 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Maybe wait for some finality
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(false)); err != nil {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting sealing config: %w", err)
}
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
@ -523,7 +528,12 @@ func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInf
// TODO: track sector health / expiration
log.Infof("Proving sector %d", sector.SectorNumber)
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(true)); err != nil {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting sealing config: %w", err)
}
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(true, cfg.AlwaysKeepUnsealedCopy)); err != nil {
log.Error(err)
}

View File

@ -165,7 +165,7 @@ func (t *SectorInfo) sealingCtx(ctx context.Context) context.Context {
// Returns list of offset/length tuples of sector data ranges which clients
// requested to keep unsealed
func (t *SectorInfo) keepUnsealedRanges(invert bool) []storage.Range {
func (t *SectorInfo) keepUnsealedRanges(invert, alwaysKeep bool) []storage.Range {
var out []storage.Range
var at abi.UnpaddedPieceSize
@ -176,7 +176,10 @@ func (t *SectorInfo) keepUnsealedRanges(invert bool) []storage.Range {
if piece.DealInfo == nil {
continue
}
if piece.DealInfo.KeepUnsealed == invert {
keep := piece.DealInfo.KeepUnsealed || alwaysKeep
if keep == invert {
continue
}

4
go.mod
View File

@ -33,7 +33,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.2.7
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.1.2
github.com/filecoin-project/go-fil-markets v1.1.7
github.com/filecoin-project/go-jsonrpc v0.1.2
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
@ -44,7 +44,7 @@ require (
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/specs-actors v0.9.13
github.com/filecoin-project/specs-actors/v2 v2.3.4
github.com/filecoin-project/specs-actors/v3 v3.0.1-0.20210128055125-ab0632b1c8fa
github.com/filecoin-project/specs-actors/v3 v3.0.1-0.20210128235937-57195d8909b1
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506
github.com/filecoin-project/test-vectors/schema v0.0.5
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1

8
go.sum
View File

@ -269,8 +269,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.1.2 h1:5FVdDmF9GvW6Xllql9OGiJXEZjh/tu590BXSQH2W/vU=
github.com/filecoin-project/go-fil-markets v1.1.2/go.mod h1:6oTRaAsHnCqhi3mpZqdvnWIzH6QzHQc4dbhJrI9/BfQ=
github.com/filecoin-project/go-fil-markets v1.1.7 h1:7yy7alIDWzUxljxZhGmG3+wvaU4Ty5QDMbPmdZeaIJ8=
github.com/filecoin-project/go-fil-markets v1.1.7/go.mod h1:6oTRaAsHnCqhi3mpZqdvnWIzH6QzHQc4dbhJrI9/BfQ=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
@ -307,8 +307,8 @@ github.com/filecoin-project/specs-actors/v2 v2.3.2 h1:2Vcf4CGa29kRh4JJ02m+FbvD/p
github.com/filecoin-project/specs-actors/v2 v2.3.2/go.mod h1:UuJQLoTx/HPvvWeqlIFmC/ywlOLHNe8SNQ3OunFbu2Y=
github.com/filecoin-project/specs-actors/v2 v2.3.4 h1:NZK2oMCcA71wNsUzDBmLQyRMzcCnX9tDGvwZ53G67j8=
github.com/filecoin-project/specs-actors/v2 v2.3.4/go.mod h1:UuJQLoTx/HPvvWeqlIFmC/ywlOLHNe8SNQ3OunFbu2Y=
github.com/filecoin-project/specs-actors/v3 v3.0.1-0.20210128055125-ab0632b1c8fa h1:J0yyTt9MLDaN0XvzjEAWTCvG6SRVfXc6dVLluvRiOsQ=
github.com/filecoin-project/specs-actors/v3 v3.0.1-0.20210128055125-ab0632b1c8fa/go.mod h1:NL24TPjJGyU7fh1ztpUyYcoZi3TmRKNEI0huPYmhObA=
github.com/filecoin-project/specs-actors/v3 v3.0.1-0.20210128235937-57195d8909b1 h1:I6mvbwANIoToUZ37cYmuLyDKbPlAUxWnp0fJOZnlTz4=
github.com/filecoin-project/specs-actors/v3 v3.0.1-0.20210128235937-57195d8909b1/go.mod h1:NL24TPjJGyU7fh1ztpUyYcoZi3TmRKNEI0huPYmhObA=
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506 h1:Ur/l2+6qN+lQiqjozWWc5p9UDaAMDZKTlDS98oRnlIw=
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=

View File

@ -22,7 +22,7 @@ import (
type IpfsBstore struct {
ctx context.Context
api iface.CoreAPI
api, offlineAPI iface.CoreAPI
}
func NewIpfsBstore(ctx context.Context, onlineMode bool) (*IpfsBstore, error) {
@ -34,10 +34,18 @@ func NewIpfsBstore(ctx context.Context, onlineMode bool) (*IpfsBstore, error) {
if err != nil {
return nil, xerrors.Errorf("setting offline mode: %s", err)
}
offlineAPI := api
if onlineMode {
offlineAPI, err = localApi.WithOptions(options.Api.Offline(true))
if err != nil {
return nil, xerrors.Errorf("applying offline mode: %s", err)
}
}
return &IpfsBstore{
ctx: ctx,
api: api,
offlineAPI: offlineAPI,
}, nil
}
@ -50,10 +58,18 @@ func NewRemoteIpfsBstore(ctx context.Context, maddr multiaddr.Multiaddr, onlineM
if err != nil {
return nil, xerrors.Errorf("applying offline mode: %s", err)
}
offlineAPI := api
if onlineMode {
offlineAPI, err = httpApi.WithOptions(options.Api.Offline(true))
if err != nil {
return nil, xerrors.Errorf("applying offline mode: %s", err)
}
}
return &IpfsBstore{
ctx: ctx,
api: api,
offlineAPI: offlineAPI,
}, nil
}
@ -62,7 +78,7 @@ func (i *IpfsBstore) DeleteBlock(cid cid.Cid) error {
}
func (i *IpfsBstore) Has(cid cid.Cid) (bool, error) {
_, err := i.api.Block().Stat(i.ctx, path.IpldPath(cid))
_, err := i.offlineAPI.Block().Stat(i.ctx, path.IpldPath(cid))
if err != nil {
// The underlying client is running in Offline mode.
// Stat() will fail with an err if the block isn't in the

View File

@ -6,11 +6,12 @@ import (
"bytes"
"context"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
@ -31,15 +32,16 @@ import (
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
type ClientNodeAdapter struct {
*clientApi
*apiWrapper
fundmgr *market.FundManager
ev *events.Events
dsMatcher *dealStateMatcher
scMgr *SectorCommittedManager
}
type clientApi struct {
@ -48,16 +50,20 @@ type clientApi struct {
full.MpoolAPI
}
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
capi := &clientApi{chain, stateapi, mpool}
return &ClientNodeAdapter{
ctx := helpers.LifecycleCtx(mctx, lc)
ev := events.NewEvents(ctx, capi)
a := &ClientNodeAdapter{
clientApi: capi,
apiWrapper: &apiWrapper{api: capi},
fundmgr: fundmgr,
ev: events.NewEvents(context.TODO(), capi),
ev: ev,
dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi))),
}
a.scMgr = NewSectorCommittedManager(ev, a, &apiWrapper{api: capi})
return a
}
func (c *ClientNodeAdapter) ListStorageProviders(ctx context.Context, encodedTs shared.TipSetToken) ([]*storagemarket.StorageProviderInfo, error) {
@ -135,6 +141,7 @@ func (c *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address
// ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal
// returns the Deal id if there is no error
// TODO: Don't return deal ID
func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (abi.DealID, error) {
log.Infow("DEAL ACCEPTED!")
@ -216,14 +223,17 @@ func (c *ClientNodeAdapter) DealProviderCollateralBounds(ctx context.Context, si
return big.Mul(bounds.Min, big.NewInt(clientOverestimation)), bounds.Max, nil
}
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
func (c *ClientNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
return OnDealSectorPreCommitted(ctx, c, c.ev, provider, dealID, marketactor.DealProposal(proposal), publishCid, cb)
return c.scMgr.OnDealSectorPreCommitted(ctx, provider, marketactor.DealProposal(proposal), *publishCid, cb)
}
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
return OnDealSectorCommitted(ctx, c, c.ev, provider, dealID, sectorNumber, marketactor.DealProposal(proposal), publishCid, cb)
return c.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, marketactor.DealProposal(proposal), *publishCid, cb)
}
// TODO: Replace dealID parameter with DealProposal
func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
head, err := c.ChainHead(ctx)
if err != nil {

View File

@ -0,0 +1,345 @@
package storageadapter
import (
"context"
"fmt"
"strings"
"sync"
"time"
"go.uber.org/fx"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)
type dealPublisherAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
}
// DealPublisher batches deal publishing so that many deals can be included in
// a single publish message. This saves gas for miners that publish deals
// frequently.
// When a deal is submitted, the DealPublisher waits a configurable amount of
// time for other deals to be submitted before sending the publish message.
// There is a configurable maximum number of deals that can be included in one
// message. When the limit is reached the DealPublisher immediately submits a
// publish message with all deals in the queue.
type DealPublisher struct {
api dealPublisherAPI
ctx context.Context
Shutdown context.CancelFunc
maxDealsPerPublishMsg uint64
publishPeriod time.Duration
publishSpec *api.MessageSendSpec
lk sync.Mutex
pending []*pendingDeal
cancelWaitForMoreDeals context.CancelFunc
publishPeriodStart time.Time
}
// A deal that is queued to be published
type pendingDeal struct {
ctx context.Context
deal market2.ClientDealProposal
Result chan publishResult
}
// The result of publishing a deal
type publishResult struct {
msgCid cid.Cid
err error
}
func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal {
return &pendingDeal{
ctx: ctx,
deal: deal,
Result: make(chan publishResult),
}
}
type PublishMsgConfig struct {
// The amount of time to wait for more deals to arrive before
// publishing
Period time.Duration
// The maximum number of deals to include in a single PublishStorageDeals
// message
MaxDealsPerMsg uint64
}
func NewDealPublisher(
feeConfig *config.MinerFeeConfig,
publishMsgCfg PublishMsgConfig,
) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
maxFee := abi.NewTokenAmount(0)
if feeConfig != nil {
maxFee = abi.TokenAmount(feeConfig.MaxPublishDealsFee)
}
publishSpec := &api.MessageSendSpec{MaxFee: maxFee}
dp := newDealPublisher(full, publishMsgCfg, publishSpec)
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
dp.Shutdown()
return nil
},
})
return dp
}
}
func newDealPublisher(
dpapi dealPublisherAPI,
publishMsgCfg PublishMsgConfig,
publishSpec *api.MessageSendSpec,
) *DealPublisher {
ctx, cancel := context.WithCancel(context.Background())
return &DealPublisher{
api: dpapi,
ctx: ctx,
Shutdown: cancel,
maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg,
publishPeriod: publishMsgCfg.Period,
publishSpec: publishSpec,
}
}
func (p *DealPublisher) Publish(ctx context.Context, deal market2.ClientDealProposal) (cid.Cid, error) {
pdeal := newPendingDeal(ctx, deal)
// Add the deal to the queue
p.processNewDeal(pdeal)
// Wait for the deal to be submitted
select {
case <-ctx.Done():
return cid.Undef, ctx.Err()
case res := <-pdeal.Result:
return res.msgCid, res.err
}
}
func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
p.lk.Lock()
defer p.lk.Unlock()
// Filter out any cancelled deals
p.filterCancelledDeals()
// If all deals have been cancelled, clear the wait-for-deals timer
if len(p.pending) == 0 && p.cancelWaitForMoreDeals != nil {
p.cancelWaitForMoreDeals()
p.cancelWaitForMoreDeals = nil
}
// Make sure the new deal hasn't been cancelled
if pdeal.ctx.Err() != nil {
return
}
// Add the new deal to the queue
p.pending = append(p.pending, pdeal)
log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",
pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg)
// If the maximum number of deals per message has been reached,
// send a publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg {
log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg)
p.publishAllDeals()
return
}
// Otherwise wait for more deals to arrive or the timeout to be reached
p.waitForMoreDeals()
}
func (p *DealPublisher) waitForMoreDeals() {
// Check if we're already waiting for deals
if !p.publishPeriodStart.IsZero() {
elapsed := time.Since(p.publishPeriodStart)
log.Infof("%s elapsed of / %s until publish deals queue is published",
elapsed, p.publishPeriod)
return
}
// Set a timeout to wait for more deals to arrive
log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod)
ctx, cancel := context.WithCancel(p.ctx)
p.publishPeriodStart = time.Now()
p.cancelWaitForMoreDeals = cancel
go func() {
timer := time.NewTimer(p.publishPeriod)
select {
case <-ctx.Done():
timer.Stop()
case <-timer.C:
p.lk.Lock()
defer p.lk.Unlock()
// The timeout has expired so publish all pending deals
log.Infof("publish deals queue period of %s has expired, publishing deals", p.publishPeriod)
p.publishAllDeals()
}
}()
}
func (p *DealPublisher) publishAllDeals() {
// If the timeout hasn't yet been cancelled, cancel it
if p.cancelWaitForMoreDeals != nil {
p.cancelWaitForMoreDeals()
p.cancelWaitForMoreDeals = nil
p.publishPeriodStart = time.Time{}
}
// Filter out any deals that have been cancelled
p.filterCancelledDeals()
deals := p.pending[:]
p.pending = nil
// Send the publish message
go p.publishReady(deals)
}
func (p *DealPublisher) publishReady(ready []*pendingDeal) {
if len(ready) == 0 {
return
}
// onComplete is called when the publish message has been sent or there
// was an error
onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) {
// Send the publish result on the pending deal's Result channel
res := publishResult{
msgCid: msgCid,
err: err,
}
select {
case <-p.ctx.Done():
case <-pd.ctx.Done():
case pd.Result <- res:
}
}
// Validate each deal to make sure it can be published
validated := make([]*pendingDeal, 0, len(ready))
deals := make([]market2.ClientDealProposal, 0, len(ready))
for _, pd := range ready {
// Validate the deal
if err := p.validateDeal(pd.deal); err != nil {
// Validation failed, complete immediately with an error
go onComplete(pd, cid.Undef, err)
continue
}
validated = append(validated, pd)
deals = append(deals, pd.deal)
}
// Send the publish message
msgCid, err := p.publishDealProposals(deals)
// Signal that each deal has been published
for _, pd := range validated {
go onComplete(pd, msgCid, err)
}
}
// validateDeal checks that the deal proposal start epoch hasn't already
// elapsed
func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error {
head, err := p.api.ChainHead(p.ctx)
if err != nil {
return err
}
if head.Height() > deal.Proposal.StartEpoch {
return xerrors.Errorf(
"cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch)
}
return nil
}
// Sends the publish message
func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) {
if len(deals) == 0 {
return cid.Undef, nil
}
log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals))
provider := deals[0].Proposal.Provider
for _, dl := range deals {
if dl.Proposal.Provider != provider {
msg := fmt.Sprintf("publishing %d deals failed: ", len(deals)) +
"not all deals are for same provider: " +
fmt.Sprintf("deal with piece CID %s is for provider %s ", deals[0].Proposal.PieceCID, deals[0].Proposal.Provider) +
fmt.Sprintf("but deal with piece CID %s is for provider %s", dl.Proposal.PieceCID, dl.Proposal.Provider)
return cid.Undef, xerrors.Errorf(msg)
}
}
mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK)
if err != nil {
return cid.Undef, err
}
params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{
Deals: deals,
})
if err != nil {
return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err)
}
smsg, err := p.api.MpoolPushMessage(p.ctx, &types.Message{
To: market.Address,
From: mi.Worker,
Value: types.NewInt(0),
Method: market.Methods.PublishStorageDeals,
Params: params,
}, p.publishSpec)
if err != nil {
return cid.Undef, err
}
return smsg.Cid(), nil
}
func pieceCids(deals []market2.ClientDealProposal) string {
cids := make([]string, 0, len(deals))
for _, dl := range deals {
cids = append(cids, dl.Proposal.PieceCID.String())
}
return strings.Join(cids, ", ")
}
// filter out deals that have been cancelled
func (p *DealPublisher) filterCancelledDeals() {
i := 0
for _, pd := range p.pending {
if pd.ctx.Err() == nil {
p.pending[i] = pd
i++
}
}
p.pending = p.pending[:i]
}

View File

@ -0,0 +1,266 @@
package storageadapter
import (
"bytes"
"context"
"testing"
"time"
"github.com/filecoin-project/go-state-types/crypto"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
market0 "github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
)
func TestDealPublisher(t *testing.T) {
testCases := []struct {
name string
publishPeriod time.Duration
maxDealsPerMsg uint64
dealCountWithinPublishPeriod int
ctxCancelledWithinPublishPeriod int
expiredDeals int
dealCountAfterPublishPeriod int
expectedDealsPerMsg []int
}{{
name: "publish one deal within publish period",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 1,
dealCountAfterPublishPeriod: 0,
expectedDealsPerMsg: []int{1},
}, {
name: "publish two deals within publish period",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 2,
dealCountAfterPublishPeriod: 0,
expectedDealsPerMsg: []int{2},
}, {
name: "publish one deal within publish period, and one after",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 1,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{1, 1},
}, {
name: "publish deals that exceed max deals per message within publish period, and one after",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 2,
dealCountWithinPublishPeriod: 3,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1, 1},
}, {
name: "ignore deals with cancelled context",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 2,
ctxCancelledWithinPublishPeriod: 2,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1},
}, {
name: "ignore expired deals",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 2,
expiredDeals: 2,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1},
}, {
name: "zero config",
publishPeriod: 0,
maxDealsPerMsg: 0,
dealCountWithinPublishPeriod: 2,
ctxCancelledWithinPublishPeriod: 0,
dealCountAfterPublishPeriod: 2,
expectedDealsPerMsg: []int{1, 1, 1, 1},
}}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
client := tutils.NewActorAddr(t, "client")
provider := tutils.NewActorAddr(t, "provider")
worker := tutils.NewActorAddr(t, "worker")
dpapi := newDPAPI(t, worker)
// Create a deal publisher
dp := newDealPublisher(dpapi, PublishMsgConfig{
Period: tc.publishPeriod,
MaxDealsPerMsg: tc.maxDealsPerMsg,
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})
// Keep a record of the deals that were submitted to be published
var dealsToPublish []market.ClientDealProposal
publishDeal := func(ctxCancelled bool, expired bool) {
pctx := ctx
var cancel context.CancelFunc
if ctxCancelled {
pctx, cancel = context.WithCancel(ctx)
cancel()
}
startEpoch := abi.ChainEpoch(20)
if expired {
startEpoch = abi.ChainEpoch(5)
}
deal := market.ClientDealProposal{
Proposal: market0.DealProposal{
PieceCID: generateCids(1)[0],
Client: client,
Provider: provider,
StartEpoch: startEpoch,
EndEpoch: abi.ChainEpoch(120),
},
ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: []byte("signature data"),
},
}
if !ctxCancelled && !expired {
dealsToPublish = append(dealsToPublish, deal)
}
go func() {
_, err := dp.Publish(pctx, deal)
if ctxCancelled || expired {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}()
}
// Publish deals within publish period
for i := 0; i < tc.dealCountWithinPublishPeriod; i++ {
publishDeal(false, false)
}
for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ {
publishDeal(true, false)
}
for i := 0; i < tc.expiredDeals; i++ {
publishDeal(false, true)
}
// Wait until publish period has elapsed
time.Sleep(2 * tc.publishPeriod)
// Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
publishDeal(false, false)
}
// For each message that was expected to be sent
var publishedDeals []market.ClientDealProposal
for _, expectedDealsInMsg := range tc.expectedDealsPerMsg {
// Should have called StateMinerInfo with the provider address
stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls
require.Equal(t, provider, stateMinerInfoAddr)
// Check the fields of the message that was sent
msg := <-dpapi.pushedMsgs
require.Equal(t, worker, msg.From)
require.Equal(t, market.Address, msg.To)
require.Equal(t, market.Methods.PublishStorageDeals, msg.Method)
// Check that the expected number of deals was included in the message
var params market2.PublishStorageDealsParams
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Len(t, params.Deals, expectedDealsInMsg)
// Keep track of the deals that were sent
for _, d := range params.Deals {
publishedDeals = append(publishedDeals, d)
}
}
// Verify that all deals that were submitted to be published were
// sent out (we do this by ensuring all the piece CIDs are present)
require.True(t, matchPieceCids(publishedDeals, dealsToPublish))
})
}
}
func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool {
cidsA := dealPieceCids(sent)
cidsB := dealPieceCids(exp)
if len(cidsA) != len(cidsB) {
return false
}
s1 := cid.NewSet()
for _, c := range cidsA {
s1.Add(c)
}
for _, c := range cidsB {
if !s1.Has(c) {
return false
}
}
return true
}
func dealPieceCids(deals []market2.ClientDealProposal) []cid.Cid {
cids := make([]cid.Cid, 0, len(deals))
for _, dl := range deals {
cids = append(cids, dl.Proposal.PieceCID)
}
return cids
}
type dpAPI struct {
t *testing.T
worker address.Address
stateMinerInfoCalls chan address.Address
pushedMsgs chan *types.Message
}
func newDPAPI(t *testing.T, worker address.Address) *dpAPI {
return &dpAPI{
t: t,
worker: worker,
stateMinerInfoCalls: make(chan address.Address, 128),
pushedMsgs: make(chan *types.Message, 128),
}
}
func (d *dpAPI) ChainHead(ctx context.Context) (*types.TipSet, error) {
dummyCid, err := cid.Parse("bafkqaaa")
require.NoError(d.t, err)
return types.NewTipSet([]*types.BlockHeader{{
Miner: tutils.NewActorAddr(d.t, "miner"),
Height: abi.ChainEpoch(10),
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
}})
}
func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) {
d.stateMinerInfoCalls <- address
return miner.MinerInfo{Worker: d.worker}, nil
}
func (d *dpAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
d.pushedMsgs <- msg
return &types.SignedMessage{Message: *msg}, nil
}

View File

@ -1,102 +0,0 @@
package storageadapter
import (
"bytes"
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)
type getCurrentDealInfoAPI interface {
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error)
}
// GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed
func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, types.TipSetKey, error) {
marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key())
if dealErr == nil {
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
if err != nil {
return dealID, nil, types.EmptyTSK, err
}
if equal {
return dealID, marketDeal, types.EmptyTSK, nil
}
dealErr = xerrors.Errorf("Deal proposals did not match")
}
if publishCid == nil {
return dealID, nil, types.EmptyTSK, dealErr
}
// attempt deal id correction
lookup, err := api.StateSearchMsg(ctx, *publishCid)
if err != nil {
return dealID, nil, types.EmptyTSK, err
}
if lookup.Receipt.ExitCode != exitcode.Ok {
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode)
}
var retval market.PublishStorageDealsReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
}
if len(retval.IDs) != 1 {
// market currently only ever sends messages with 1 deal
return dealID, nil, types.EmptyTSK, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
}
if retval.IDs[0] == dealID {
// DealID did not change, so we are stuck with the original lookup error
return dealID, nil, lookup.TipSet, dealErr
}
dealID = retval.IDs[0]
marketDeal, err = api.StateMarketStorageDeal(ctx, dealID, ts.Key())
if err == nil {
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
if err != nil {
return dealID, nil, types.EmptyTSK, err
}
if !equal {
return dealID, nil, types.EmptyTSK, xerrors.Errorf("Deal proposals did not match")
}
}
return dealID, marketDeal, lookup.TipSet, err
}
func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) {
p1ClientID, err := api.StateLookupID(ctx, p1.Client, ts.Key())
if err != nil {
return false, err
}
p2ClientID, err := api.StateLookupID(ctx, p2.Client, ts.Key())
if err != nil {
return false, err
}
return p1.PieceCID.Equals(p2.PieceCID) &&
p1.PieceSize == p2.PieceSize &&
p1.VerifiedDeal == p2.VerifiedDeal &&
p1.Label == p2.Label &&
p1.StartEpoch == p2.StartEpoch &&
p1.EndEpoch == p2.EndEpoch &&
p1.StoragePricePerEpoch.Equals(p2.StoragePricePerEpoch) &&
p1.ProviderCollateral.Equals(p2.ProviderCollateral) &&
p1.ClientCollateral.Equals(p2.ClientCollateral) &&
p1.Provider == p2.Provider &&
p1ClientID == p2ClientID, nil
}

View File

@ -1,268 +0,0 @@
package storageadapter
import (
"bytes"
"errors"
"math/rand"
"testing"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
test "github.com/filecoin-project/lotus/chain/events/state/mock"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"golang.org/x/xerrors"
)
var errNotFound = errors.New("Could not find")
func TestGetCurrentDealInfo(t *testing.T) {
ctx := context.Background()
dummyCid, _ := cid.Parse("bafkqaaa")
startDealID := abi.DealID(rand.Uint64())
newDealID := abi.DealID(rand.Uint64())
twoValuesReturn := makePublishDealsReturnBytes(t, []abi.DealID{abi.DealID(rand.Uint64()), abi.DealID(rand.Uint64())})
sameValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{startDealID})
newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID})
proposal := market.DealProposal{
PieceCID: dummyCid,
PieceSize: abi.PaddedPieceSize(rand.Uint64()),
Label: "success",
}
otherProposal := market.DealProposal{
PieceCID: dummyCid,
PieceSize: abi.PaddedPieceSize(rand.Uint64()),
Label: "other",
}
successDeal := &api.MarketDeal{
Proposal: proposal,
State: market.DealState{
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
},
}
otherDeal := &api.MarketDeal{
Proposal: otherProposal,
State: market.DealState{
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
},
}
testCases := map[string]struct {
searchMessageLookup *api.MsgLookup
searchMessageErr error
marketDeals map[abi.DealID]*api.MarketDeal
publishCid *cid.Cid
expectedDealID abi.DealID
expectedMarketDeal *api.MarketDeal
expectedError error
}{
"deal lookup succeeds": {
marketDeals: map[abi.DealID]*api.MarketDeal{
startDealID: successDeal,
},
expectedDealID: startDealID,
expectedMarketDeal: successDeal,
},
"publish CID = nil": {
expectedDealID: startDealID,
expectedError: errNotFound,
},
"publish CID = nil, other deal on lookup": {
marketDeals: map[abi.DealID]*api.MarketDeal{
startDealID: otherDeal,
},
expectedDealID: startDealID,
expectedError: xerrors.Errorf("Deal proposals did not match"),
},
"search message fails": {
publishCid: &dummyCid,
searchMessageErr: errors.New("something went wrong"),
expectedDealID: startDealID,
expectedError: errors.New("something went wrong"),
},
"return code not ok": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.ErrIllegalState,
},
},
expectedDealID: startDealID,
expectedError: xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", dummyCid, exitcode.ErrIllegalState),
},
"unable to unmarshal params": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: []byte("applesauce"),
},
},
expectedDealID: startDealID,
expectedError: xerrors.Errorf("looking for publish deal message: unmarshaling message return: cbor input should be of type array"),
},
"more than one returned id": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: twoValuesReturn,
},
},
expectedDealID: startDealID,
expectedError: xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal"),
},
"deal ids still match": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: sameValueReturn,
},
},
expectedDealID: startDealID,
expectedError: errNotFound,
},
"new deal id success": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: newValueReturn,
},
},
marketDeals: map[abi.DealID]*api.MarketDeal{
newDealID: successDeal,
},
expectedDealID: newDealID,
expectedMarketDeal: successDeal,
},
"new deal id after other deal found": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: newValueReturn,
},
},
marketDeals: map[abi.DealID]*api.MarketDeal{
startDealID: otherDeal,
newDealID: successDeal,
},
expectedDealID: newDealID,
expectedMarketDeal: successDeal,
},
"new deal id failure": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: newValueReturn,
},
},
expectedDealID: newDealID,
expectedError: errNotFound,
},
"new deal id, failure due to other deal present": {
publishCid: &dummyCid,
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: newValueReturn,
},
},
marketDeals: map[abi.DealID]*api.MarketDeal{
newDealID: otherDeal,
},
expectedDealID: newDealID,
expectedError: xerrors.Errorf("Deal proposals did not match"),
},
}
runTestCase := func(testCase string, data struct {
searchMessageLookup *api.MsgLookup
searchMessageErr error
marketDeals map[abi.DealID]*api.MarketDeal
publishCid *cid.Cid
expectedDealID abi.DealID
expectedMarketDeal *api.MarketDeal
expectedError error
}) {
t.Run(testCase, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
ts, err := test.MockTipset(address.TestAddress, rand.Uint64())
require.NoError(t, err)
marketDeals := make(map[marketDealKey]*api.MarketDeal)
for dealID, deal := range data.marketDeals {
marketDeals[marketDealKey{dealID, ts.Key()}] = deal
}
api := &mockGetCurrentDealInfoAPI{
SearchMessageLookup: data.searchMessageLookup,
SearchMessageErr: data.searchMessageErr,
MarketDeals: marketDeals,
}
dealID, marketDeal, _, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid)
require.Equal(t, data.expectedDealID, dealID)
require.Equal(t, data.expectedMarketDeal, marketDeal)
if data.expectedError == nil {
require.NoError(t, err)
} else {
require.EqualError(t, err, data.expectedError.Error())
}
})
}
for testCase, data := range testCases {
runTestCase(testCase, data)
}
}
type marketDealKey struct {
abi.DealID
types.TipSetKey
}
type mockGetCurrentDealInfoAPI struct {
SearchMessageLookup *api.MsgLookup
SearchMessageErr error
MarketDeals map[marketDealKey]*api.MarketDeal
}
func (mapi *mockGetCurrentDealInfoAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) {
return &miner.PreCommitChanges{}, nil
}
func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) {
deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}]
if !ok {
return nil, errNotFound
}
return deal, nil
}
func (mapi *mockGetCurrentDealInfoAPI) StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) {
return mapi.SearchMessageLookup, mapi.SearchMessageErr
}
func (mapi *mockGetCurrentDealInfoAPI) StateLookupID(ctx context.Context, addr address.Address, ts types.TipSetKey) (address.Address, error) {
return addr, nil
}
func makePublishDealsReturnBytes(t *testing.T, dealIDs []abi.DealID) []byte {
buf := new(bytes.Buffer)
dealsReturn := market.PublishStorageDealsReturn{
IDs: dealIDs,
}
err := dealsReturn.MarshalCBOR(buf)
require.NoError(t, err)
return buf.Bytes()
}

View File

@ -5,6 +5,7 @@ import (
"context"
"sync"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -19,11 +20,40 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
type sectorCommittedEventsAPI interface {
type eventsCalledAPI interface {
Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error
}
func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error {
type dealInfoAPI interface {
GetCurrentDealInfo(ctx context.Context, tok sealing.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, error)
}
type diffPreCommitsAPI interface {
diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error)
}
type SectorCommittedManager struct {
ev eventsCalledAPI
dealInfo dealInfoAPI
dpc diffPreCommitsAPI
}
func NewSectorCommittedManager(ev eventsCalledAPI, tskAPI sealing.CurrentDealInfoTskAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager {
dim := &sealing.CurrentDealInfoManager{
CDAPI: &sealing.CurrentDealInfoAPIAdapter{CurrentDealInfoTskAPI: tskAPI},
}
return newSectorCommittedManager(ev, dim, dpcAPI)
}
func newSectorCommittedManager(ev eventsCalledAPI, dealInfo dealInfoAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager {
return &SectorCommittedManager{
ev: ev,
dealInfo: dealInfo,
dpc: dpcAPI,
}
}
func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, proposal market.DealProposal, publishCid cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error {
// Ensure callback is only called once
var once sync.Once
cb := func(sectorNumber abi.SectorNumber, isActive bool, err error) {
@ -34,7 +64,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
di, isActive, publishTs, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
dealInfo, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
// from OnDealSectorPreCommitted so no need to call the callback
@ -54,24 +84,19 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
// when the client node was down after the deal was published, and when
// the precommit containing it landed on chain)
if publishTs == types.EmptyTSK {
lookup, err := api.StateSearchMsg(ctx, *publishCid)
publishTs, err := types.TipSetKeyFromBytes(dealInfo.PublishMsgTipSet)
if err != nil {
return false, false, err
}
if lookup != nil { // can be nil in tests
publishTs = lookup.TipSet
}
}
diff, err := api.diffPreCommits(ctx, provider, publishTs, ts.Key())
diff, err := mgr.dpc.diffPreCommits(ctx, provider, publishTs, ts.Key())
if err != nil {
return false, false, err
}
for _, info := range diff.Added {
for _, d := range info.Info.DealIDs {
if d == di {
if d == dealInfo.DealID {
cb(info.Info.SectorNumber, false, nil)
return true, false, nil
}
@ -103,7 +128,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
// If the deal hasn't been activated by the proposed start epoch, the
// deal will timeout (when msg == nil it means the timeout epoch was reached)
if msg == nil {
err = xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
err = xerrors.Errorf("deal with piece CID %s was not activated by proposed deal start epoch %d", proposal.PieceCID, proposal.StartEpoch)
return false, err
}
@ -118,16 +143,16 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
}
// When the deal is published, the deal ID may change, so get the
// When there is a reorg, the deal ID may change, so get the
// current deal ID from the publish message CID
dealID, _, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid)
if err != nil {
return false, err
}
// Check through the deal IDs associated with this message
for _, did := range params.DealIDs {
if did == dealID {
if did == res.DealID {
// Found the deal ID in this message. Callback with the sector ID.
cb(params.SectorNumber, false, nil)
return false, nil
@ -144,14 +169,14 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return nil
}
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}
return nil
}
func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error {
func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, provider address.Address, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid cid.Cid, callback storagemarket.DealSectorCommittedCallback) error {
// Ensure callback is only called once
var once sync.Once
cb := func(err error) {
@ -162,7 +187,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
_, isActive, _, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
_, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
// from OnDealSectorCommitted so no need to call the callback
@ -208,7 +233,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
// If the deal hasn't been activated by the proposed start epoch, the
// deal will timeout (when msg == nil it means the timeout epoch was reached)
if msg == nil {
err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch)
err := xerrors.Errorf("deal with piece CID %s was not activated by proposed deal start epoch %d", proposal.PieceCID, proposal.StartEpoch)
return false, err
}
@ -218,17 +243,17 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
}
// Get the deal info
_, sd, _, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
// Make sure the deal is active
if sd.State.SectorStartEpoch < 1 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealID, ts.ParentState(), ts.Height())
if res.MarketDeal.State.SectorStartEpoch < 1 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", res.DealID, ts.ParentState(), ts.Height())
}
log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch)
log.Infof("Storage deal %d activated at epoch %d", res.DealID, res.MarketDeal.State.SectorStartEpoch)
cb(nil)
@ -241,29 +266,29 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return nil
}
if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler: %w", err)
}
return nil
}
func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, bool, types.TipSetKey, error) {
di, sd, publishTs, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
func (mgr *SectorCommittedManager) checkIfDealAlreadyActive(ctx context.Context, ts *types.TipSet, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, bool, error) {
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), proposal, publishCid)
if err != nil {
// TODO: This may be fine for some errors
return 0, false, types.EmptyTSK, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
// Sector with deal is already active
if sd.State.SectorStartEpoch > 0 {
return 0, true, publishTs, nil
return res, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
// Sector was slashed
if sd.State.SlashEpoch > 0 {
return 0, false, types.EmptyTSK, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
if res.MarketDeal.State.SlashEpoch > 0 {
return res, false, xerrors.Errorf("deal %d was slashed at epoch %d", res.DealID, res.MarketDeal.State.SlashEpoch)
}
return di, false, publishTs, nil
// Sector with deal is already active
if res.MarketDeal.State.SectorStartEpoch > 0 {
return res, true, nil
}
return res, false, nil
}

View File

@ -7,6 +7,9 @@ import (
"fmt"
"math/rand"
"testing"
"time"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"golang.org/x/xerrors"
@ -15,13 +18,13 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/cbor"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events"
test "github.com/filecoin-project/lotus/chain/events/state/mock"
"github.com/filecoin-project/lotus/chain/types"
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
)
@ -32,13 +35,16 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
publishCid := generateCids(1)[0]
sealedCid := generateCids(1)[0]
pieceCid := generateCids(1)[0]
startDealID := abi.DealID(rand.Uint64())
newDealID := abi.DealID(rand.Uint64())
newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID})
dealID := abi.DealID(rand.Uint64())
sectorNumber := abi.SectorNumber(rand.Uint64())
proposal := market.DealProposal{
PieceCID: pieceCid,
PieceSize: abi.PaddedPieceSize(rand.Uint64()),
Client: tutils.NewActorAddr(t, "client"),
Provider: tutils.NewActorAddr(t, "provider"),
StoragePricePerEpoch: abi.NewTokenAmount(1),
ProviderCollateral: abi.NewTokenAmount(1),
ClientCollateral: abi.NewTokenAmount(1),
Label: "success",
}
unfinishedDeal := &api.MarketDeal{
@ -48,17 +54,26 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
LastUpdatedEpoch: 2,
},
}
successDeal := &api.MarketDeal{
activeDeal := &api.MarketDeal{
Proposal: proposal,
State: market.DealState{
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
},
}
slashedDeal := &api.MarketDeal{
Proposal: proposal,
State: market.DealState{
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
SlashEpoch: 2,
},
}
type testCase struct {
searchMessageLookup *api.MsgLookup
searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal
currentDealInfo sealing.CurrentDealInfo
currentDealInfoErr error
currentDealInfoErr2 error
preCommitDiff *miner.PreCommitChanges
matchStates []matchState
dealStartEpochTimeout bool
expectedCBCallCount uint64
@ -69,45 +84,17 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
}
testCases := map[string]testCase{
"normal sequence": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
matchStates: []matchState{
{
msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{
SectorNumber: sectorNumber,
SealedCID: sealedCid,
DealIDs: []abi.DealID{startDealID},
DealIDs: []abi.DealID{dealID},
}),
deals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
},
},
expectedCBCallCount: 1,
expectedCBIsActive: false,
expectedCBSectorNumber: sectorNumber,
},
"deal id changes in called": {
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: newValueReturn,
},
},
checkTsDeals: map[abi.DealID]*api.MarketDeal{
newDealID: unfinishedDeal,
},
matchStates: []matchState{
{
msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{
SectorNumber: sectorNumber,
SealedCID: sealedCid,
DealIDs: []abi.DealID{newDealID},
}),
deals: map[abi.DealID]*api.MarketDeal{
newDealID: unfinishedDeal,
},
},
},
expectedCBCallCount: 1,
@ -115,85 +102,98 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
expectedCBSectorNumber: sectorNumber,
},
"ignores unsuccessful pre-commit message": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
matchStates: []matchState{
{
msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{
SectorNumber: sectorNumber,
SealedCID: sealedCid,
DealIDs: []abi.DealID{startDealID},
DealIDs: []abi.DealID{dealID},
}),
deals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
// non-zero exit code indicates unsuccessful pre-commit message
receipt: &types.MessageReceipt{ExitCode: 1},
},
},
expectedCBCallCount: 0,
},
"error on deal in check": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{},
searchMessageErr: errors.New("something went wrong"),
expectedCBCallCount: 0,
expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"),
"deal already pre-committed": {
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
"sector start epoch > 0 in check": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: successDeal,
preCommitDiff: &miner.PreCommitChanges{
Added: []miner.SectorPreCommitOnChainInfo{{
Info: miner.SectorPreCommitInfo{
SectorNumber: sectorNumber,
DealIDs: []abi.DealID{dealID},
},
}},
},
expectedCBCallCount: 1,
expectedCBIsActive: false,
expectedCBSectorNumber: sectorNumber,
},
"error getting current deal info in check func": {
currentDealInfoErr: errors.New("something went wrong"),
expectedCBCallCount: 0,
expectedError: xerrors.Errorf("failed to set up called handler: failed to look up deal on chain: something went wrong"),
},
"sector already active": {
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: activeDeal,
},
expectedCBCallCount: 1,
expectedCBIsActive: true,
},
"error on deal in pre-commit": {
searchMessageErr: errors.New("something went wrong"),
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
"sector was slashed": {
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: slashedDeal,
PublishMsgTipSet: nil,
},
expectedCBCallCount: 0,
expectedError: xerrors.Errorf("failed to set up called handler: deal %d was slashed at epoch %d", dealID, slashedDeal.State.SlashEpoch),
},
"error getting current deal info in called func": {
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
currentDealInfoErr2: errors.New("something went wrong"),
matchStates: []matchState{
{
msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{
SectorNumber: sectorNumber,
SealedCID: sealedCid,
DealIDs: []abi.DealID{startDealID},
DealIDs: []abi.DealID{dealID},
}),
deals: map[abi.DealID]*api.MarketDeal{},
},
},
expectedCBCallCount: 0,
expectedError: errors.New("failed to set up called handler: something went wrong"),
expectedCBCallCount: 1,
expectedCBError: errors.New("handling applied event: something went wrong"),
},
"proposed deal epoch timeout": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: activeDeal,
},
dealStartEpochTimeout: true,
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID),
expectedCBError: xerrors.Errorf("handling applied event: deal with piece CID %s was not activated by proposed deal start epoch 0", unfinishedDeal.Proposal.PieceCID),
},
}
runTestCase := func(testCase string, data testCase) {
t.Run(testCase, func(t *testing.T) {
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// defer cancel()
api := &mockGetCurrentDealInfoAPI{
SearchMessageLookup: data.searchMessageLookup,
SearchMessageErr: data.searchMessageErr,
MarketDeals: make(map[marketDealKey]*api.MarketDeal),
}
checkTs, err := test.MockTipset(provider, rand.Uint64())
require.NoError(t, err)
for dealID, deal := range data.checkTsDeals {
api.MarketDeals[marketDealKey{dealID, checkTs.Key()}] = deal
}
matchMessages := make([]matchMessage, len(data.matchStates))
for i, ms := range data.matchStates {
matchTs, err := test.MockTipset(provider, rand.Uint64())
require.NoError(t, err)
for dealID, deal := range ms.deals {
api.MarketDeals[marketDealKey{dealID, matchTs.Key()}] = deal
}
matchMessages[i] = matchMessage{
curH: 5,
msg: ms.msg,
@ -217,7 +217,18 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
cbIsActive = isActive
cbError = err
}
err = OnDealSectorPreCommitted(ctx, api, eventsAPI, provider, startDealID, proposal, &publishCid, cb)
mockPCAPI := &mockPreCommitsAPI{
PCChanges: data.preCommitDiff,
}
mockDIAPI := &mockDealInfoAPI{
CurrentDealInfo: data.currentDealInfo,
CurrentDealInfo2: data.currentDealInfo,
Err: data.currentDealInfoErr,
Err2: data.currentDealInfoErr2,
}
scm := newSectorCommittedManager(eventsAPI, mockDIAPI, mockPCAPI)
err = scm.OnDealSectorPreCommitted(ctx, provider, proposal, publishCid, cb)
if data.expectedError == nil {
require.NoError(t, err)
} else {
@ -240,16 +251,18 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
func TestOnDealSectorCommitted(t *testing.T) {
provider := address.TestAddress
ctx := context.Background()
publishCid := generateCids(1)[0]
pieceCid := generateCids(1)[0]
startDealID := abi.DealID(rand.Uint64())
newDealID := abi.DealID(rand.Uint64())
newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID})
dealID := abi.DealID(rand.Uint64())
sectorNumber := abi.SectorNumber(rand.Uint64())
proposal := market.DealProposal{
PieceCID: pieceCid,
PieceSize: abi.PaddedPieceSize(rand.Uint64()),
Client: tutils.NewActorAddr(t, "client"),
Provider: tutils.NewActorAddr(t, "provider"),
StoragePricePerEpoch: abi.NewTokenAmount(1),
ProviderCollateral: abi.NewTokenAmount(1),
ClientCollateral: abi.NewTokenAmount(1),
Label: "success",
}
unfinishedDeal := &api.MarketDeal{
@ -259,17 +272,26 @@ func TestOnDealSectorCommitted(t *testing.T) {
LastUpdatedEpoch: 2,
},
}
successDeal := &api.MarketDeal{
activeDeal := &api.MarketDeal{
Proposal: proposal,
State: market.DealState{
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
},
}
slashedDeal := &api.MarketDeal{
Proposal: proposal,
State: market.DealState{
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
SlashEpoch: 2,
},
}
type testCase struct {
searchMessageLookup *api.MsgLookup
searchMessageErr error
checkTsDeals map[abi.DealID]*api.MarketDeal
currentDealInfo sealing.CurrentDealInfo
currentDealInfoErr error
currentDealInfo2 sealing.CurrentDealInfo
currentDealInfoErr2 error
matchStates []matchState
dealStartEpochTimeout bool
expectedCBCallCount uint64
@ -278,121 +300,118 @@ func TestOnDealSectorCommitted(t *testing.T) {
}
testCases := map[string]testCase{
"normal sequence": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
currentDealInfo2: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: activeDeal,
},
matchStates: []matchState{
{
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
SectorNumber: sectorNumber,
}),
deals: map[abi.DealID]*api.MarketDeal{
startDealID: successDeal,
},
},
},
expectedCBCallCount: 1,
},
"deal id changes in called": {
searchMessageLookup: &api.MsgLookup{
Receipt: types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: newValueReturn,
},
},
checkTsDeals: map[abi.DealID]*api.MarketDeal{
newDealID: unfinishedDeal,
},
matchStates: []matchState{
{
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
SectorNumber: sectorNumber,
}),
deals: map[abi.DealID]*api.MarketDeal{
newDealID: successDeal,
},
},
},
expectedCBCallCount: 1,
},
"ignores unsuccessful prove-commit message": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
currentDealInfo2: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: activeDeal,
},
matchStates: []matchState{
{
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
SectorNumber: sectorNumber,
}),
deals: map[abi.DealID]*api.MarketDeal{
startDealID: successDeal,
},
// Exit-code 1 means the prove-commit was unsuccessful
receipt: &types.MessageReceipt{ExitCode: 1},
},
},
expectedCBCallCount: 0,
},
"error on deal in check": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{},
searchMessageErr: errors.New("something went wrong"),
"error getting current deal info in check func": {
currentDealInfoErr: errors.New("something went wrong"),
expectedCBCallCount: 0,
expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"),
expectedError: xerrors.Errorf("failed to set up called handler: failed to look up deal on chain: something went wrong"),
},
"sector start epoch > 0 in check": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: successDeal,
"sector already active": {
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: activeDeal,
},
expectedCBCallCount: 1,
},
"error on deal in called": {
searchMessageErr: errors.New("something went wrong"),
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
"sector was slashed": {
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: slashedDeal,
},
expectedCBCallCount: 0,
expectedError: xerrors.Errorf("failed to set up called handler: deal %d was slashed at epoch %d", dealID, slashedDeal.State.SlashEpoch),
},
"error getting current deal info in called func": {
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
currentDealInfoErr2: errors.New("something went wrong"),
matchStates: []matchState{
{
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
SectorNumber: sectorNumber,
}),
},
},
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("handling applied event: failed to look up deal on chain: something went wrong"),
},
"proposed deal epoch timeout": {
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
dealStartEpochTimeout: true,
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("handling applied event: deal with piece CID %s was not activated by proposed deal start epoch 0", unfinishedDeal.Proposal.PieceCID),
},
"got prove-commit but deal not active": {
currentDealInfo: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
currentDealInfo2: sealing.CurrentDealInfo{
DealID: dealID,
MarketDeal: unfinishedDeal,
},
matchStates: []matchState{
{
msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{
SectorNumber: sectorNumber,
}),
deals: map[abi.DealID]*api.MarketDeal{
newDealID: successDeal,
},
},
},
expectedCBCallCount: 1,
expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"),
expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"),
},
"proposed deal epoch timeout": {
checkTsDeals: map[abi.DealID]*api.MarketDeal{
startDealID: unfinishedDeal,
},
dealStartEpochTimeout: true,
expectedCBCallCount: 1,
expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID),
expectedCBError: xerrors.Errorf("handling applied event: deal wasn't active: deal=%d, parentState=bafkqaaa, h=5", dealID),
},
}
runTestCase := func(testCase string, data testCase) {
t.Run(testCase, func(t *testing.T) {
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// defer cancel()
api := &mockGetCurrentDealInfoAPI{
SearchMessageLookup: data.searchMessageLookup,
SearchMessageErr: data.searchMessageErr,
MarketDeals: make(map[marketDealKey]*api.MarketDeal),
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
checkTs, err := test.MockTipset(provider, rand.Uint64())
require.NoError(t, err)
for dealID, deal := range data.checkTsDeals {
api.MarketDeals[marketDealKey{dealID, checkTs.Key()}] = deal
}
matchMessages := make([]matchMessage, len(data.matchStates))
for i, ms := range data.matchStates {
matchTs, err := test.MockTipset(provider, rand.Uint64())
require.NoError(t, err)
for dealID, deal := range ms.deals {
api.MarketDeals[marketDealKey{dealID, matchTs.Key()}] = deal
}
matchMessages[i] = matchMessage{
curH: 5,
msg: ms.msg,
@ -412,7 +431,15 @@ func TestOnDealSectorCommitted(t *testing.T) {
cbCallCount++
cbError = err
}
err = OnDealSectorCommitted(ctx, api, eventsAPI, provider, startDealID, sectorNumber, proposal, &publishCid, cb)
mockPCAPI := &mockPreCommitsAPI{}
mockDIAPI := &mockDealInfoAPI{
CurrentDealInfo: data.currentDealInfo,
CurrentDealInfo2: data.currentDealInfo2,
Err: data.currentDealInfoErr,
Err2: data.currentDealInfoErr2,
}
scm := newSectorCommittedManager(eventsAPI, mockDIAPI, mockPCAPI)
err = scm.OnDealSectorCommitted(ctx, provider, sectorNumber, proposal, publishCid, cb)
if data.expectedError == nil {
require.NoError(t, err)
} else {
@ -434,7 +461,6 @@ func TestOnDealSectorCommitted(t *testing.T) {
type matchState struct {
msg *types.Message
receipt *types.MessageReceipt
deals map[abi.DealID]*api.MarketDeal
}
type matchMessage struct {
@ -476,7 +502,8 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r
}
more, err := msgHnd(matchMessage.msg, receipt, matchMessage.ts, matchMessage.curH)
if err != nil {
return err
// error is handled through a callback rather than being returned
return nil
}
if matchMessage.doesRevert {
err := rev(fe.Ctx, matchMessage.ts)
@ -514,3 +541,32 @@ func generateCids(n int) []cid.Cid {
}
return cids
}
type mockPreCommitsAPI struct {
PCChanges *miner.PreCommitChanges
Err error
}
func (m *mockPreCommitsAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) {
pcc := &miner.PreCommitChanges{}
if m.PCChanges != nil {
pcc = m.PCChanges
}
return pcc, m.Err
}
type mockDealInfoAPI struct {
count int
CurrentDealInfo sealing.CurrentDealInfo
Err error
CurrentDealInfo2 sealing.CurrentDealInfo
Err2 error
}
func (m *mockDealInfoAPI) GetCurrentDealInfo(ctx context.Context, tok sealing.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, error) {
m.count++
if m.count == 2 {
return m.CurrentDealInfo2, m.Err2
}
return m.CurrentDealInfo, m.Err
}

View File

@ -9,20 +9,19 @@ import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"
"golang.org/x/xerrors"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events"
@ -33,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
@ -42,7 +42,6 @@ var log = logging.Logger("storageadapter")
type ProviderNodeAdapter struct {
api.FullNode
*apiWrapper
// this goes away with the data transfer module
dag dtypes.StagingDAG
@ -50,57 +49,38 @@ type ProviderNodeAdapter struct {
secb *sectorblocks.SectorBlocks
ev *events.Events
publishSpec, addBalanceSpec *api.MessageSendSpec
dealPublisher *DealPublisher
addBalanceSpec *api.MessageSendSpec
dsMatcher *dealStateMatcher
scMgr *SectorCommittedManager
}
func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
ctx := helpers.LifecycleCtx(mctx, lc)
ev := events.NewEvents(ctx, full)
na := &ProviderNodeAdapter{
FullNode: full,
apiWrapper: &apiWrapper{api: full},
dag: dag,
secb: secb,
ev: events.NewEvents(context.TODO(), full),
ev: ev,
dealPublisher: dealPublisher,
dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))),
}
if fc != nil {
na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)}
na.addBalanceSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxMarketBalanceAddFee)}
}
na.scMgr = NewSectorCommittedManager(ev, na, &apiWrapper{api: full})
return na
}
}
func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) {
log.Info("publishing deal")
mi, err := n.StateMinerInfo(ctx, deal.Proposal.Provider, types.EmptyTSK)
if err != nil {
return cid.Undef, err
}
params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{
Deals: []market2.ClientDealProposal{deal.ClientDealProposal},
})
if err != nil {
return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err)
}
// TODO: We may want this to happen after fetching data
smsg, err := n.MpoolPushMessage(ctx, &types.Message{
To: market.Address,
From: mi.Worker,
Value: types.NewInt(0),
Method: market.Methods.PublishStorageDeals,
Params: params,
}, n.publishSpec)
if err != nil {
return cid.Undef, err
}
return smsg.Cid(), nil
return n.dealPublisher.Publish(ctx, deal.ClientDealProposal)
}
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) {
@ -280,12 +260,14 @@ func (n *ProviderNodeAdapter) DealProviderCollateralBounds(ctx context.Context,
return bounds.Min, bounds.Max, nil
}
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
func (n *ProviderNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
return OnDealSectorPreCommitted(ctx, n, n.ev, provider, dealID, market.DealProposal(proposal), publishCid, cb)
return n.scMgr.OnDealSectorPreCommitted(ctx, provider, market.DealProposal(proposal), *publishCid, cb)
}
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
return OnDealSectorCommitted(ctx, n, n.ev, provider, dealID, sectorNumber, market.DealProposal(proposal), publishCid, cb)
return n.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, market.DealProposal(proposal), *publishCid, cb)
}
func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
@ -305,6 +287,31 @@ func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid,
return cb(receipt.Receipt.ExitCode, receipt.Receipt.Return, receipt.Message, nil)
}
func (n *ProviderNodeAdapter) WaitForPublishDeals(ctx context.Context, publishCid cid.Cid, proposal market2.DealProposal) (*storagemarket.PublishDealsWaitResult, error) {
// Wait for deal to be published (plus additional time for confidence)
receipt, err := n.StateWaitMsg(ctx, publishCid, 2*build.MessageConfidence)
if err != nil {
return nil, xerrors.Errorf("WaitForPublishDeals errored: %w", err)
}
if receipt.Receipt.ExitCode != exitcode.Ok {
return nil, xerrors.Errorf("WaitForPublishDeals exit code: %s", receipt.Receipt.ExitCode)
}
// The deal ID may have changed since publish if there was a reorg, so
// get the current deal ID
head, err := n.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("WaitForPublishDeals failed to get chain head: %w", err)
}
res, err := n.scMgr.dealInfo.GetCurrentDealInfo(ctx, head.Key().Bytes(), (*market.DealProposal)(&proposal), publishCid)
if err != nil {
return nil, xerrors.Errorf("WaitForPublishDeals getting deal info errored: %w", err)
}
return &storagemarket.PublishDealsWaitResult{DealID: res.DealID, FinalCid: receipt.Message}, nil
}
func (n *ProviderNodeAdapter) GetDataCap(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (*abi.StoragePower, error) {
tsk, err := types.TipSetKeyFromBytes(encodedTs)
if err != nil {

View File

@ -377,6 +377,7 @@ func Online() Option {
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds),
Override(HandleRetrievalKey, modules.HandleRetrieval),
@ -519,6 +520,10 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))),
),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{
Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod),
MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg,
})),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)),
Override(new(sectorstorage.SealerConfig), cfg.Storage),
@ -646,5 +651,6 @@ func Test() Option {
Unset(RunPeerMgrKey),
Unset(new(*peermgr.PeerMgr)),
Override(new(beacon.Schedule), testing.RandomBeacon),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
)
}

View File

@ -48,6 +48,12 @@ type DealmakingConfig struct {
ConsiderUnverifiedStorageDeals bool
PieceCidBlocklist []cid.Cid
ExpectedSealDuration Duration
// The amount of time to wait for more deals to arrive before
// publishing
PublishMsgPeriod Duration
// The maximum number of deals to include in a single PublishStorageDeals
// message
MaxDealsPerPublishMsg uint64
Filter string
RetrievalFilter string
@ -64,6 +70,8 @@ type SealingConfig struct {
MaxSealingSectorsForDeals uint64
WaitDealsDelay Duration
AlwaysKeepUnsealedCopy bool
}
type MinerFeeConfig struct {
@ -207,6 +215,8 @@ func DefaultStorageMiner() *StorageMiner {
PieceCidBlocklist: []cid.Cid{},
// TODO: It'd be nice to set this based on sector size
ExpectedSealDuration: Duration(time.Hour * 24),
PublishMsgPeriod: Duration(time.Hour),
MaxDealsPerPublishMsg: 8,
},
Fees: MinerFeeConfig{

View File

@ -40,6 +40,7 @@ import (
type StateModuleAPI interface {
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetPending(ctx context.Context, addr address.Address, tsk types.TipSetKey) ([]*api.MsigTransaction, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (api.DealCollateralBounds, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
@ -52,8 +53,8 @@ type StateModuleAPI interface {
StateMinerProvingDeadline(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error)
StateMinerPower(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error)
StateNetworkVersion(ctx context.Context, key types.TipSetKey) (network.Version, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateSearchMsg(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
}
@ -977,6 +978,40 @@ func (m *StateModule) MsigGetVested(ctx context.Context, addr address.Address, s
return types.BigSub(startLk, endLk), nil
}
func (m *StateModule) MsigGetPending(ctx context.Context, addr address.Address, tsk types.TipSetKey) ([]*api.MsigTransaction, error) {
ts, err := m.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
act, err := m.StateManager.LoadActor(ctx, addr, ts)
if err != nil {
return nil, xerrors.Errorf("failed to load multisig actor: %w", err)
}
msas, err := multisig.Load(m.Chain.Store(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load multisig actor state: %w", err)
}
var out = []*api.MsigTransaction{}
if err := msas.ForEachPendingTxn(func(id int64, txn multisig.Transaction) error {
out = append(out, &api.MsigTransaction{
ID: id,
To: txn.To,
Value: txn.Value,
Method: txn.Method,
Params: txn.Params,
Approved: txn.Approved,
})
return nil
}); err != nil {
return nil, err
}
return out, nil
}
var initialPledgeNum = types.NewInt(110)
var initialPledgeDen = types.NewInt(100)

View File

@ -218,7 +218,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
ctx := helpers.LifecycleCtx(mctx, lc)
fps, err := storage.NewWindowedPoStScheduler(api, fc, as, sealer, sealer, j, maddr)
fps, err := storage.NewWindowedPoStScheduler(api, fc, as, sealer, verif, sealer, j, maddr)
if err != nil {
return nil, err
}
@ -812,6 +812,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
MaxSealingSectors: cfg.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
}
})
return
@ -826,6 +827,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
}
})
return

View File

@ -5,14 +5,12 @@ import (
"testing"
"time"
builder "github.com/filecoin-project/lotus/node/test"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/lib/lotuslog"
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/lotus/api/test"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/lib/lotuslog"
builder "github.com/filecoin-project/lotus/node/test"
logging "github.com/ipfs/go-log/v2"
)
func init() {
@ -57,6 +55,9 @@ func TestAPIDealFlow(t *testing.T) {
t.Run("TestFastRetrievalDealFlow", func(t *testing.T) {
test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
})
t.Run("TestPublishDealsBatching", func(t *testing.T) {
test.TestPublishDealsBatching(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
})
}
func TestAPIDealFlowReal(t *testing.T) {

View File

@ -288,7 +288,11 @@ func mockBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []test.
genMiner := maddrs[i]
wa := genms[i].Worker
storers[i] = CreateTestStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options())
opts := def.Opts
if opts == nil {
opts = node.Options()
}
storers[i] = CreateTestStorageNode(ctx, t, wa, genMiner, pk, f, mn, opts)
if err := storers[i].StorageAddLocal(ctx, presealDirs[i]); err != nil {
t.Fatalf("%+v", err)
}
@ -455,12 +459,17 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes
}
}
opts := def.Opts
if opts == nil {
opts = node.Options()
}
storers[i] = CreateTestStorageNode(ctx, t, genms[i].Worker, maddrs[i], pidKeys[i], f, mn, node.Options(
node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) {
return mock.NewMockSectorMgr(sectors), nil
}),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Unset(new(*sectorstorage.Manager)),
opts,
))
if rpc {

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
payapi "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
var log = logging.Logger("payment-channel-settler")
@ -50,9 +51,10 @@ type paymentChannelSettler struct {
// SettlePaymentChannels checks the chain for events related to payment channels settling and
// submits any vouchers for inbound channels tracked for this node
func SettlePaymentChannels(lc fx.Lifecycle, api API) error {
func SettlePaymentChannels(mctx helpers.MetricsCtx, lc fx.Lifecycle, api API) error {
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
OnStart: func(context.Context) error {
pcs := newPaymentChannelSettler(ctx, &api)
ev := events.NewEvents(ctx, &api)
return ev.Called(pcs.check, pcs.messageHandler, pcs.revertHandler, int(build.MessageConfidence+1), events.NoTimeout, pcs.matcher)

View File

@ -20,6 +20,7 @@ import (
"golang.org/x/xerrors"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
"github.com/filecoin-project/specs-actors/v3/actors/runtime/proof"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
@ -594,10 +595,26 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
log.Infow("computing window post", "batch", batchIdx, "elapsed", elapsed)
if err == nil {
// If we proved nothing, something is very wrong.
if len(postOut) == 0 {
return nil, xerrors.Errorf("received no proofs back from generate window post")
}
// If we generated an incorrect proof, try again.
if correct, err := s.verifier.VerifyWindowPoSt(ctx, proof.WindowPoStVerifyInfo{
Randomness: abi.PoStRandomness(rand),
Proofs: postOut,
ChallengedSectors: sinfos,
Prover: abi.ActorID(mid),
}); err != nil {
log.Errorw("window post verification failed", "post", postOut, "error", err)
time.Sleep(5 * time.Second)
continue
} else if !correct {
log.Errorw("generated incorrect window post proof", "post", postOut, "error", err)
continue
}
// Proof generation successful, stop retrying
somethingToProve = true
params.Partitions = partitions

View File

@ -6,6 +6,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
@ -123,6 +124,34 @@ func (m *mockProver) GenerateWindowPoSt(ctx context.Context, aid abi.ActorID, si
}, nil, nil
}
type mockVerif struct {
}
func (m mockVerif) VerifyWinningPoSt(ctx context.Context, info proof2.WinningPoStVerifyInfo) (bool, error) {
panic("implement me")
}
func (m mockVerif) VerifyWindowPoSt(ctx context.Context, info proof2.WindowPoStVerifyInfo) (bool, error) {
if len(info.Proofs) != 1 {
return false, xerrors.Errorf("expected 1 proof entry")
}
proof := info.Proofs[0]
if !bytes.Equal(proof.ProofBytes, []byte("post-proof")) {
return false, xerrors.Errorf("bad proof")
}
return true, nil
}
func (m mockVerif) VerifySeal(proof2.SealVerifyInfo) (bool, error) {
panic("implement me")
}
func (m mockVerif) GenerateWinningPoStSectorChallenge(context.Context, abi.RegisteredPoStProof, abi.ActorID, abi.PoStRandomness, uint64) ([]uint64, error) {
panic("implement me")
}
type mockFaultTracker struct {
}
@ -176,6 +205,7 @@ func TestWDPostDoPost(t *testing.T) {
scheduler := &WindowPoStScheduler{
api: mockStgMinerAPI,
prover: &mockProver{},
verifier: &mockVerif{},
faultTracker: &mockFaultTracker{},
proofType: proofType,
actor: postAct,

View File

@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/config"
@ -27,6 +28,7 @@ type WindowPoStScheduler struct {
feeCfg config.MinerFeeConfig
addrSel *AddressSelector
prover storage.Prover
verifier ffiwrapper.Verifier
faultTracker sectorstorage.FaultTracker
proofType abi.RegisteredPoStProof
partitionSectors uint64
@ -41,7 +43,7 @@ type WindowPoStScheduler struct {
// failLk sync.Mutex
}
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, as *AddressSelector, sb storage.Prover, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address) (*WindowPoStScheduler, error) {
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, as *AddressSelector, sb storage.Prover, verif ffiwrapper.Verifier, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address) (*WindowPoStScheduler, error) {
mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err)
@ -52,6 +54,7 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, as
feeCfg: fc,
addrSel: as,
prover: sb,
verifier: verif,
faultTracker: ft,
proofType: mi.WindowPoStProofType,
partitionSectors: mi.WindowPoStPartitionSectors,