From 46a0333c9c48e495a45a0a66302ce66fceea6f97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 21 Oct 2019 20:12:11 +0200 Subject: [PATCH] on chain deals: Wip porting deal systems to storagemarket --- api/api.go | 3 + api/types.go | 11 +- api/utils.go | 27 ++ chain/actors/actor_init.go | 4 +- chain/actors/actor_miner.go | 124 ++----- chain/actors/actor_multisig_test.go | 4 +- chain/actors/actor_paych_test.go | 4 +- chain/actors/actor_storagemarket.go | 77 ++++- chain/actors/actor_storagepower.go | 2 +- chain/actors/actors.go | 26 +- chain/actors/actors_test.go | 2 +- chain/actors/harness2_test.go | 2 +- chain/deals/client.go | 83 +++-- chain/deals/client_states.go | 37 ++- chain/deals/client_utils.go | 59 ---- chain/deals/handler_states.go | 313 ------------------ chain/deals/{handler.go => provider.go} | 70 ++-- chain/deals/{asks.go => provider_asks.go} | 64 ++-- chain/deals/provider_states.go | 245 ++++++++++++++ .../{handler_utils.go => provider_utils.go} | 38 ++- chain/deals/types.go | 56 +--- chain/gen/utils.go | 14 +- chain/state/statetree.go | 4 +- chain/state/statetree_test.go | 8 +- chain/vm/invoker.go | 10 +- chain/vm/mkactor.go | 4 +- chain/vm/vm.go | 4 +- node/builder.go | 2 +- node/impl/client/client.go | 6 +- node/modules/storageminer.go | 4 +- paych/simple.go | 4 +- 31 files changed, 583 insertions(+), 728 deletions(-) create mode 100644 api/utils.go delete mode 100644 chain/deals/handler_states.go rename chain/deals/{handler.go => provider.go} (71%) rename chain/deals/{asks.go => provider_asks.go} (66%) create mode 100644 chain/deals/provider_states.go rename chain/deals/{handler_utils.go => provider_utils.go} (66%) diff --git a/api/api.go b/api/api.go index 5e6373fac..12e96d69d 100644 --- a/api/api.go +++ b/api/api.go @@ -3,6 +3,7 @@ package api import ( "context" "fmt" + "github.com/filecoin-project/lotus/chain/actors" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-cid" @@ -11,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/store" @@ -132,6 +134,7 @@ type FullNode interface { StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error) StateListMiners(context.Context, *types.TipSet) ([]address.Address, error) StateListActors(context.Context, *types.TipSet) ([]address.Address, error) + StateMarketBalance(context.Context, address.Address) (actors.StorageParticipantBalance, error) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) PaychList(context.Context) ([]address.Address, error) diff --git a/api/types.go b/api/types.go index 588227de2..f584345f3 100644 --- a/api/types.go +++ b/api/types.go @@ -10,12 +10,13 @@ type DealState int const ( DealUnknown = DealState(iota) - DealRejected - DealAccepted - DealStarted + DealRejected // Provider didn't like the proposal + DealAccepted // Proposal accepted, data moved + DealStaged // Data put into the sector + DealSealing // Data in process of being sealed + DealFailed - DealStaged - DealSealing + DealComplete // Internal diff --git a/api/utils.go b/api/utils.go new file mode 100644 index 000000000..5bbac426e --- /dev/null +++ b/api/utils.go @@ -0,0 +1,27 @@ +package api + +import ( + "context" + + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/types" +) + +type SignFunc = func(context.Context, []byte) (*types.Signature, error) + +type Signer func(context.Context, address.Address, []byte) (*types.Signature, error) + +type Signable interface { + Sign(context.Context, SignFunc) error +} + +func SignWith(ctx context.Context, signer Signer, addr address.Address, signable ...Signable) error { + for _, s := range signable { + err := s.Sign(ctx, func(ctx context.Context, b []byte) (*types.Signature, error) { + return signer(ctx, addr, b) + }) + if err != nil { + return err + } + } +} diff --git a/chain/actors/actor_init.go b/chain/actors/actor_init.go index fdff41589..3ab1243a8 100644 --- a/chain/actors/actor_init.go +++ b/chain/actors/actor_init.go @@ -162,7 +162,7 @@ func (ia InitActor) Exec(act *types.Actor, vmctx types.VMContext, p *ExecParams) func IsBuiltinActor(code cid.Cid) bool { switch code { - case StorageMarketActorCodeCid, StorageMinerCodeCid, AccountActorCodeCid, InitActorCodeCid, MultisigActorCodeCid, PaymentChannelActorCodeCid: + case StorageMarketCodeCid, StoragePowerCodeCid, StorageMinerCodeCid, AccountCodeCid, InitCodeCid, MultisigCodeCid, PaymentChannelCodeCid: return true default: return false @@ -170,7 +170,7 @@ func IsBuiltinActor(code cid.Cid) bool { } func IsSingletonActor(code cid.Cid) bool { - return code == StoragePowerActorCodeCid || code == InitActorCodeCid + return code == StoragePowerCodeCid || code == InitCodeCid } func (ias *InitActorState) AddActor(cst *hamt.CborIpldStore, addr address.Address) (address.Address, error) { diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index 450a4c905..7f87a2aea 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -100,29 +100,27 @@ type StorageMinerConstructorParams struct { } type maMethods struct { - Constructor uint64 - CommitSector uint64 - SubmitPoSt uint64 - SlashStorageFault uint64 - GetCurrentProvingSet uint64 - ArbitrateDeal uint64 - DePledge uint64 - GetOwner uint64 - GetWorkerAddr uint64 - GetPower uint64 - GetPeerID uint64 - GetSectorSize uint64 - UpdatePeerID uint64 - ChangeWorker uint64 - IsSlashed uint64 - IsLate uint64 - PaymentVerifyInclusion uint64 - PaymentVerifySector uint64 - AddFaults uint64 - SlashConsensusFault uint64 + Constructor uint64 + CommitSector uint64 + SubmitPoSt uint64 + SlashStorageFault uint64 + GetCurrentProvingSet uint64 + ArbitrateDeal uint64 + DePledge uint64 + GetOwner uint64 + GetWorkerAddr uint64 + GetPower uint64 + GetPeerID uint64 + GetSectorSize uint64 + UpdatePeerID uint64 + ChangeWorker uint64 + IsSlashed uint64 + IsLate uint64 + AddFaults uint64 + SlashConsensusFault uint64 } -var MAMethods = maMethods{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20} +var MAMethods = maMethods{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18} func (sma StorageMinerActor) Exports() []interface{} { return []interface{}{ @@ -142,10 +140,8 @@ func (sma StorageMinerActor) Exports() []interface{} { //14: sma.ChangeWorker, //15: sma.IsSlashed, //16: sma.IsLate, - 17: sma.PaymentVerifyInclusion, - 18: sma.PaymentVerifySector, - 19: sma.AddFaults, - 20: sma.SlashConsensusFault, + 17: sma.AddFaults, + 18: sma.SlashConsensusFault, } } @@ -642,84 +638,6 @@ type PaymentVerifyParams struct { Proof []byte } -type PieceInclVoucherData struct { // TODO: Update spec at https://github.com/filecoin-project/specs/blob/master/actors.md#paymentverify - CommP []byte - PieceSize types.BigInt -} - -type InclusionProof struct { - Sector uint64 // for CommD, also verifies the sector is in sector set - Proof []byte -} - -func (sma StorageMinerActor) PaymentVerifyInclusion(act *types.Actor, vmctx types.VMContext, params *PaymentVerifyParams) ([]byte, ActorError) { - // params.Extra - PieceInclVoucherData - // params.Proof - InclusionProof - - _, self, aerr := loadState(vmctx) - if aerr != nil { - return nil, aerr - } - mi, aerr := loadMinerInfo(vmctx, self) - if aerr != nil { - return nil, aerr - } - - var voucherData PieceInclVoucherData - if err := cbor.DecodeInto(params.Extra, &voucherData); err != nil { - return nil, aerrors.Absorb(err, 2, "failed to decode storage voucher data for verification") - } - var proof InclusionProof - if err := cbor.DecodeInto(params.Proof, &proof); err != nil { - return nil, aerrors.Absorb(err, 3, "failed to decode storage payment proof") - } - - ok, _, commD, aerr := GetFromSectorSet(context.TODO(), vmctx.Storage(), self.Sectors, proof.Sector) - if aerr != nil { - return nil, aerr - } - if !ok { - return nil, aerrors.New(4, "miner does not have required sector") - } - - ok, err := sectorbuilder.VerifyPieceInclusionProof(mi.SectorSize.Uint64(), voucherData.PieceSize.Uint64(), voucherData.CommP, commD, proof.Proof) - if err != nil { - return nil, aerrors.Absorb(err, 5, "verify piece inclusion proof failed") - } - if !ok { - return nil, aerrors.New(6, "piece inclusion proof was invalid") - } - - return nil, nil -} - -func (sma StorageMinerActor) PaymentVerifySector(act *types.Actor, vmctx types.VMContext, params *PaymentVerifyParams) ([]byte, ActorError) { - // params.Extra - BigInt - sector id - // params.Proof - nil - - _, self, aerr := loadState(vmctx) - if aerr != nil { - return nil, aerr - } - - // TODO: ensure no sector ID reusability within related deal lifetime - sector := types.BigFromBytes(params.Extra) - - if len(params.Proof) > 0 { - return nil, aerrors.New(1, "unexpected proof bytes") - } - - ok, _, _, aerr := GetFromSectorSet(context.TODO(), vmctx.Storage(), self.Sectors, sector.Uint64()) - if aerr != nil { - return nil, aerr - } - if !ok { - return nil, aerrors.New(2, "miner does not have required sector") - } - - return nil, nil -} - type AddFaultsParams struct { Faults types.BitField } diff --git a/chain/actors/actor_multisig_test.go b/chain/actors/actor_multisig_test.go index b705ad4a8..b55d17b13 100644 --- a/chain/actors/actor_multisig_test.go +++ b/chain/actors/actor_multisig_test.go @@ -23,7 +23,7 @@ func TestMultiSigCreate(t *testing.T) { } h := NewHarness(t, opts...) - ret, _ := h.CreateActor(t, creatorAddr, actors.MultisigActorCodeCid, + ret, _ := h.CreateActor(t, creatorAddr, actors.MultisigCodeCid, &actors.MultiSigConstructorParams{ Signers: []address.Address{creatorAddr, sig1Addr, sig2Addr}, Required: 2, @@ -49,7 +49,7 @@ func TestMultiSigOps(t *testing.T) { HarnessAddr(&sig1Addr, 100000), HarnessAddr(&sig2Addr, 100000), HarnessAddr(&outsideAddr, 100000), - HarnessActor(&multSigAddr, &creatorAddr, actors.MultisigActorCodeCid, + HarnessActor(&multSigAddr, &creatorAddr, actors.MultisigCodeCid, func() cbg.CBORMarshaler { return &actors.MultiSigConstructorParams{ Signers: []address.Address{creatorAddr, sig1Addr, sig2Addr}, diff --git a/chain/actors/actor_paych_test.go b/chain/actors/actor_paych_test.go index 3b712598c..e23acdf4a 100644 --- a/chain/actors/actor_paych_test.go +++ b/chain/actors/actor_paych_test.go @@ -18,7 +18,7 @@ func TestPaychCreate(t *testing.T) { } h := NewHarness(t, opts...) - ret, _ := h.CreateActor(t, creatorAddr, actors.PaymentChannelActorCodeCid, + ret, _ := h.CreateActor(t, creatorAddr, actors.PaymentChannelCodeCid, &actors.PCAConstructorParams{ To: targetAddr, }) @@ -47,7 +47,7 @@ func TestPaychUpdate(t *testing.T) { } h := NewHarness(t, opts...) - ret, _ := h.CreateActor(t, creatorAddr, actors.PaymentChannelActorCodeCid, + ret, _ := h.CreateActor(t, creatorAddr, actors.PaymentChannelCodeCid, &actors.PCAConstructorParams{ To: targetAddr, }) diff --git a/chain/actors/actor_storagemarket.go b/chain/actors/actor_storagemarket.go index 6004c6113..3753621d4 100644 --- a/chain/actors/actor_storagemarket.go +++ b/chain/actors/actor_storagemarket.go @@ -2,16 +2,17 @@ package actors import ( "bytes" - + "context" "github.com/filecoin-project/go-amt-ipld" "github.com/ipfs/go-cid" "github.com/ipfs/go-hamt-ipld" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" - "github.com/filecoin-project/go-lotus/build" - "github.com/filecoin-project/go-lotus/chain/actors/aerrors" - "github.com/filecoin-project/go-lotus/chain/address" - "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/aerrors" + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/types" ) type StorageMarketActor struct{} @@ -59,16 +60,64 @@ type StorageMarketState struct { NextDealID uint64 // TODO: amt.LastIndex() } +// TODO: serialization mode spec +type SerializationMode uint64 + +const ( + SerializationUnixFSv0 = iota + // IPLD / car +) + type StorageDealProposal struct { PieceRef []byte // cid bytes // TODO: spec says to use cid.Cid, probably not a good idea PieceSize uint64 - Client address.Address - Provider address.Address + PieceSerialization SerializationMode // Needs to be here as it tells how data in the sector maps to PieceRef cid + + Client address.Address + Provider address.Address + ProposalExpiration uint64 - DealExpiration uint64 - StoragePrice types.BigInt - StorageCollateral types.BigInt - ProposerSignature types.Signature + Duration uint64 // TODO: spec proposes 'DealExpiration', but that's awkward as it + // doesn't tell when the deal actually starts, so the price per block is impossible to + // calculate. It also doesn't incentivize the miner to seal / activate sooner, as he + // still get's paid the full amount specified in the deal + // + // Changing to duration makes sure that the price-per-block is defined, and the miner + // doesn't get paid when not storing the sector + + StoragePrice types.BigInt + StorageCollateral types.BigInt + + ProposerSignature *types.Signature +} + +type SignFunc = func(context.Context, []byte) (*types.Signature, error) + +func (sdp *StorageDealProposal) Sign(ctx context.Context, sign SignFunc) error { + if sdp.ProposerSignature != nil { + return xerrors.New("signature already present in StorageDealProposal") + } + var buf bytes.Buffer + if err := sdp.MarshalCBOR(&buf); err != nil { + return err + } + sig, err := sign(ctx, buf.Bytes()) + if err != nil { + return err + } + sdp.ProposerSignature = sig + return nil +} + +func (sdp *StorageDealProposal) Verify() error { + unsigned := *sdp + unsigned.ProposerSignature = nil + var buf bytes.Buffer + if err := sdp.MarshalCBOR(&buf); err != nil { + return err + } + + return sdp.ProposerSignature.Verify(sdp.Client, buf.Bytes()) } type StorageDeal struct { @@ -86,6 +135,8 @@ type WithdrawBalanceParams struct { } func (sma StorageMarketActor) WithdrawBalance(act *types.Actor, vmctx types.VMContext, params *WithdrawBalanceParams) ([]byte, ActorError) { + // TODO: (spec) this should be 2-stage + var self StorageMarketState old := vmctx.Storage().GetHead() if err := vmctx.Storage().Get(old, &self); err != nil { @@ -527,10 +578,6 @@ func (sma StorageMarketActor) SettleExpiredDeals(act *types.Actor, vmctx types.V } -func (sma StorageMarketActor) ProcessStorageDealsPayment(act *types.Actor, vmctx types.VMContext, params *struct{}) ([]byte, ActorError) { - -} - func (sma StorageMarketActor) SlashStorageDealCollateral(act *types.Actor, vmctx types.VMContext, params *struct{}) ([]byte, ActorError) { } diff --git a/chain/actors/actor_storagepower.go b/chain/actors/actor_storagepower.go index 21afb6dd0..a8e47ea2a 100644 --- a/chain/actors/actor_storagepower.go +++ b/chain/actors/actor_storagepower.go @@ -87,7 +87,7 @@ func (spa StoragePowerActor) CreateStorageMiner(act *types.Actor, vmctx types.VM return nil, err } - ret, err := vmctx.Send(InitActorAddress, IAMethods.Exec, vmctx.Message().Value, encoded) + ret, err := vmctx.Send(InitAddress, IAMethods.Exec, vmctx.Message().Value, encoded) if err != nil { return nil, err } diff --git a/chain/actors/actors.go b/chain/actors/actors.go index 0f18964cb..032c6e573 100644 --- a/chain/actors/actors.go +++ b/chain/actors/actors.go @@ -7,15 +7,15 @@ import ( mh "github.com/multiformats/go-multihash" ) -var AccountActorCodeCid cid.Cid -var StoragePowerActorCodeCid cid.Cid -var StorageMarketActorCodeCid cid.Cid +var AccountCodeCid cid.Cid +var StoragePowerCodeCid cid.Cid +var StorageMarketCodeCid cid.Cid var StorageMinerCodeCid cid.Cid -var MultisigActorCodeCid cid.Cid -var InitActorCodeCid cid.Cid -var PaymentChannelActorCodeCid cid.Cid +var MultisigCodeCid cid.Cid +var InitCodeCid cid.Cid +var PaymentChannelCodeCid cid.Cid -var InitActorAddress = mustIDAddress(0) +var InitAddress = mustIDAddress(0) var NetworkAddress = mustIDAddress(1) var StoragePowerAddress = mustIDAddress(2) var StorageMarketAddress = mustIDAddress(3) // TODO: missing from spec @@ -39,11 +39,11 @@ func init() { return c } - AccountActorCodeCid = mustSum("filecoin/1.0/AccountActor") - StoragePowerActorCodeCid = mustSum("filecoin/1.0/StoragePowerActor") - StorageMarketActorCodeCid = mustSum("filecoin/1.0/StorageMarketActor") + AccountCodeCid = mustSum("filecoin/1.0/AccountActor") + StoragePowerCodeCid = mustSum("filecoin/1.0/StoragePowerActor") + StorageMarketCodeCid = mustSum("filecoin/1.0/StorageMarketActor") StorageMinerCodeCid = mustSum("filecoin/1.0/StorageMinerActor") - MultisigActorCodeCid = mustSum("filecoin/1.0/MultisigActor") - InitActorCodeCid = mustSum("filecoin/1.0/InitActor") - PaymentChannelActorCodeCid = mustSum("filecoin/1.0/PaymentChannelActor") + MultisigCodeCid = mustSum("filecoin/1.0/MultisigActor") + InitCodeCid = mustSum("filecoin/1.0/InitActor") + PaymentChannelCodeCid = mustSum("filecoin/1.0/PaymentChannelActor") } diff --git a/chain/actors/actors_test.go b/chain/actors/actors_test.go index 27946664c..57d559cf8 100644 --- a/chain/actors/actors_test.go +++ b/chain/actors/actors_test.go @@ -80,7 +80,7 @@ func TestVMInvokeMethod(t *testing.T) { } msg := &types.Message{ - To: InitActorAddress, + To: InitAddress, From: from, Method: IAMethods.Exec, Params: enc, diff --git a/chain/actors/harness2_test.go b/chain/actors/harness2_test.go index f0fc4b3e7..0ffc02b1b 100644 --- a/chain/actors/harness2_test.go +++ b/chain/actors/harness2_test.go @@ -210,7 +210,7 @@ func (h *Harness) CreateActor(t testing.TB, from address.Address, t.Helper() return h.Apply(t, types.Message{ - To: actors.InitActorAddress, + To: actors.InitAddress, From: from, Method: actors.IAMethods.Exec, Params: DumpObject(t, diff --git a/chain/deals/client.go b/chain/deals/client.go index bf059ab93..8e889836d 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -2,6 +2,7 @@ package deals import ( "context" + "github.com/filecoin-project/lotus/chain/store" "math" "github.com/ipfs/go-cid" @@ -27,20 +28,16 @@ import ( func init() { cbor.RegisterCborType(ClientDeal{}) - cbor.RegisterCborType(actors.PieceInclVoucherData{}) // TODO: USE CBORGEN! cbor.RegisterCborType(types.SignedVoucher{}) cbor.RegisterCborType(types.ModVerifyParams{}) cbor.RegisterCborType(types.Signature{}) - cbor.RegisterCborType(actors.PaymentInfo{}) - cbor.RegisterCborType(api.PaymentInfo{}) - cbor.RegisterCborType(actors.InclusionProof{}) } var log = logging.Logger("deals") type ClientDeal struct { ProposalCid cid.Cid - Proposal StorageDealProposal + Proposal actors.StorageDealProposal State api.DealState Miner peer.ID @@ -49,6 +46,7 @@ type ClientDeal struct { type Client struct { sm *stmgr.StateManager + chain *store.ChainStore h host.Host w *wallet.Wallet dag dtypes.ClientDAG @@ -70,9 +68,10 @@ type clientDealUpdate struct { err error } -func NewClient(sm *stmgr.StateManager, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local) *Client { +func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local) *Client { c := &Client{ sm: sm, + chain: chain, h: h, w: w, dag: dag, @@ -164,49 +163,32 @@ func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) { } type ClientDealProposal struct { - Data cid.Cid + Data cid.Cid + DataSize uint64 - TotalPrice types.BigInt - Duration uint64 + TotalPrice types.BigInt + ProposalExpiration uint64 + Duration uint64 - Payment actors.PaymentInfo - - MinerAddress address.Address - ClientAddress address.Address - MinerID peer.ID + ProviderAddress address.Address + Client address.Address + MinerID peer.ID } -func (c *Client) VerifyParams(ctx context.Context, data cid.Cid) (*actors.PieceInclVoucherData, error) { - commP, size, err := c.commP(ctx, data) - if err != nil { - return nil, err +func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, error) { + proposal := actors.StorageDealProposal{ + PieceRef: p.Data.Bytes(), + PieceSize: p.DataSize, + PieceSerialization: actors.SerializationUnixFSv0, + Client: p.Client, + Provider: p.ProviderAddress, + ProposalExpiration: p.ProposalExpiration, + Duration: p.Duration, + StoragePrice: p.TotalPrice, + StorageCollateral: types.NewInt(p.DataSize), // TODO: real calc } - return &actors.PieceInclVoucherData{ - CommP: commP, - PieceSize: types.NewInt(uint64(size)), - }, nil -} - -func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.PieceInclVoucherData) (cid.Cid, error) { - proposal := StorageDealProposal{ - PieceRef: p.Data, - SerializationMode: SerializationUnixFs, - CommP: vd.CommP[:], - Size: vd.PieceSize.Uint64(), - TotalPrice: p.TotalPrice, - Duration: p.Duration, - Payment: p.Payment, - MinerAddress: p.MinerAddress, - ClientAddress: p.ClientAddress, - } - - s, err := c.h.NewStream(ctx, p.MinerID, ProtocolID) - if err != nil { - return cid.Undef, err - } - - if err := c.sendProposal(s, proposal, p.ClientAddress); err != nil { + if err := api.SignWith(ctx, c.w.Sign, p.Client, &proposal); err != nil { return cid.Undef, err } @@ -215,6 +197,17 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie return cid.Undef, err } + s, err := c.h.NewStream(ctx, p.MinerID, DealProtocolID) + if err != nil { + s.Reset() + return cid.Undef, err + } + + if err := cborrpc.WriteCborRPC(s, proposal); err != nil { + s.Reset() + return cid.Undef, err + } + deal := ClientDeal{ ProposalCid: proposalNd.Cid(), Proposal: proposal, @@ -224,12 +217,10 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie s: s, } - // TODO: actually care about what happens with the deal after it was accepted c.incoming <- deal - // TODO: start tracking after the deal is sealed return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{ - Address: proposal.MinerAddress, + Address: proposal.Provider, ID: deal.Miner, }) } diff --git a/chain/deals/client_states.go b/chain/deals/client_states.go index d19270707..28164c61f 100644 --- a/chain/deals/client_states.go +++ b/chain/deals/client_states.go @@ -3,11 +3,9 @@ package deals import ( "context" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/lib/sectorbuilder" ) type clientHandlerFunc func(ctx context.Context, deal ClientDeal) error @@ -39,6 +37,34 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) error { return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State) } + // TODO: spec says it's optional + pubmsg, err := c.chain.GetMessage(resp.PublishMessage) + if err != nil { + return xerrors.Errorf("getting deal pubsish message: %w", err) + } + + if pubmsg.From != deal.Proposal.Provider { + return xerrors.Errorf("Deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider) + } + + if pubmsg.To != actors.StorageMarketAddress { + return xerrors.Errorf("Deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To) + } + + if pubmsg.Method != actors.SMAMethods.PublishStorageDeals { + return xerrors.Errorf("Deal publish message called incorrect method (method=%s)", pubmsg.Method) + } + + // TODO: timeout + _, ret, err := c.sm.WaitForMessage(ctx, resp.PublishMessage) + if err != nil { + return xerrors.Errorf("Waiting for deal publish message: %w", err) + } + if ret.ExitCode != 0 { + return xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode) + } + // TODO: persist dealId + log.Info("DEAL ACCEPTED!") return nil @@ -75,13 +101,14 @@ func (c *Client) staged(ctx context.Context, deal ClientDeal) error { log.Info("DEAL SEALED!") - ok, err := sectorbuilder.VerifyPieceInclusionProof(build.SectorSize, deal.Proposal.Size, deal.Proposal.CommP, resp.CommD, resp.PieceInclusionProof.ProofElements) + // TODO: want? + /*ok, err := sectorbuilder.VerifyPieceInclusionProof(build.SectorSize, deal.Proposal.PieceSize, deal.Proposal.CommP, resp.CommD, resp.PieceInclusionProof.ProofElements) if err != nil { return xerrors.Errorf("verifying piece inclusion proof in staged deal %s: %w", deal.ProposalCid, err) } if !ok { return xerrors.Errorf("verifying piece inclusion proof in staged deal %s failed", deal.ProposalCid) - } + }*/ return nil } diff --git a/chain/deals/client_utils.go b/chain/deals/client_utils.go index c192bbeb4..648e41733 100644 --- a/chain/deals/client_utils.go +++ b/chain/deals/client_utils.go @@ -1,18 +1,11 @@ package deals import ( - "context" - "github.com/filecoin-project/lotus/lib/sectorbuilder" "runtime" "github.com/ipfs/go-cid" - files "github.com/ipfs/go-ipfs-files" - cbor "github.com/ipfs/go-ipld-cbor" - unixfile "github.com/ipfs/go-unixfs/file" - inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/cborrpc" ) @@ -32,58 +25,6 @@ func (c *Client) failDeal(id cid.Cid, cerr error) { log.Errorf("deal %s failed: %s", id, cerr) } -func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) { - root, err := c.dag.Get(ctx, data) - if err != nil { - log.Errorf("failed to get file root for deal: %s", err) - return nil, 0, err - } - - n, err := unixfile.NewUnixfsFile(ctx, c.dag, root) - if err != nil { - log.Errorf("cannot open unixfs file: %s", err) - return nil, 0, err - } - - uf, ok := n.(files.File) - if !ok { - // TODO: we probably got directory, how should we handle this in unixfs mode? - return nil, 0, xerrors.New("unsupported unixfs type") - } - - size, err := uf.Size() - if err != nil { - return nil, 0, err - } - - commP, err := sectorbuilder.GeneratePieceCommitment(uf, uint64(size)) - if err != nil { - return nil, 0, err - } - - return commP[:], size, err -} - -func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error { - log.Info("Sending deal proposal") - - msg, err := cbor.DumpObject(proposal) - if err != nil { - return err - } - sig, err := c.w.Sign(context.TODO(), from, msg) - if err != nil { - return err - } - - signedProposal := &SignedStorageDealProposal{ - Proposal: proposal, - Signature: sig, - } - - return cborrpc.WriteCborRPC(s, signedProposal) -} - func (c *Client) readStorageDealResp(deal ClientDeal) (*StorageDealResponse, error) { s, ok := c.conns[deal.ProposalCid] if !ok { diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go deleted file mode 100644 index a981497e3..000000000 --- a/chain/deals/handler_states.go +++ /dev/null @@ -1,313 +0,0 @@ -package deals - -import ( - "bytes" - "context" - - "github.com/filecoin-project/go-sectorbuilder/sealing_state" - cbor "github.com/ipfs/go-ipld-cbor" - "github.com/ipfs/go-merkledag" - unixfile "github.com/ipfs/go-unixfs/file" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/sectorbuilder" - "github.com/filecoin-project/lotus/storage/sectorblocks" -) - -type minerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) - -func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next api.DealState) { - go func() { - mut, err := cb(ctx, deal) - - if err == nil && next == api.DealNoUpdate { - return - } - - select { - case h.updated <- minerDealUpdate{ - newState: next, - id: deal.ProposalCid, - err: err, - mut: mut, - }: - case <-h.stop: - } - }() -} - -// ACCEPTED - -func (h *Handler) checkVoucher(ctx context.Context, deal MinerDeal, voucher *types.SignedVoucher, lane uint64, maxClose uint64, amount types.BigInt) error { - err := h.full.PaychVoucherCheckValid(ctx, deal.Proposal.Payment.PayChActor, voucher) - if err != nil { - return err - } - - if voucher.Extra == nil { - return xerrors.New("voucher.Extra not set") - } - - if voucher.Extra.Actor != deal.Proposal.MinerAddress { - return xerrors.Errorf("extra params actor didn't match miner address in proposal: '%s' != '%s'", voucher.Extra.Actor, deal.Proposal.MinerAddress) - } - - if voucher.Extra.Method != actors.MAMethods.PaymentVerifyInclusion { - return xerrors.Errorf("expected extra method %d, got %d", actors.MAMethods.PaymentVerifyInclusion, voucher.Extra.Method) - } - - var inclChallenge actors.PieceInclVoucherData - if err := cbor.DecodeInto(voucher.Extra.Data, &inclChallenge); err != nil { - return xerrors.Errorf("failed to decode storage voucher data for verification: %w", err) - } - - if inclChallenge.PieceSize.Uint64() != deal.Proposal.Size { - return xerrors.Errorf("paych challenge piece size didn't match deal proposal size: %d != %d", inclChallenge.PieceSize.Uint64(), deal.Proposal.Size) - } - - if !bytes.Equal(inclChallenge.CommP, deal.Proposal.CommP) { - return xerrors.New("paych challenge commP didn't match deal proposal") - } - - if voucher.MinCloseHeight > maxClose { - return xerrors.Errorf("MinCloseHeight too high (%d), max expected: %d", voucher.MinCloseHeight, maxClose) - } - - if voucher.TimeLock > maxClose { - return xerrors.Errorf("TimeLock too high (%d), max expected: %d", voucher.TimeLock, maxClose) - } - - if len(voucher.Merges) > 0 { - return xerrors.New("didn't expect any merges") - } - - if voucher.Amount.LessThan(amount) { - return xerrors.Errorf("not enough funds in the voucher: %s < %s; vl=%d", voucher.Amount, amount, len(deal.Proposal.Payment.Vouchers)) - } - - if voucher.Lane != lane { - return xerrors.Errorf("expected all vouchers on lane %d, found voucher on lane %d", lane, voucher.Lane) - } - - return nil -} - -func (h *Handler) consumeVouchers(ctx context.Context, deal MinerDeal) error { - curHead, err := h.full.ChainHead(ctx) - if err != nil { - return err - } - if len(deal.Proposal.Payment.Vouchers) == 0 { - return xerrors.Errorf("no payment vouchers for deal") - } - - increment := deal.Proposal.Duration / uint64(len(deal.Proposal.Payment.Vouchers)) - - startH := deal.Proposal.Payment.Vouchers[0].TimeLock - increment - if startH > curHead.Height()+build.DealVoucherSkewLimit { - return xerrors.Errorf("deal starts too far into the future: start=%d; h=%d; max=%d; inc=%d", startH, curHead.Height(), curHead.Height()+build.DealVoucherSkewLimit, increment) - } - - vspec := VoucherSpec(deal.Proposal.Duration, deal.Proposal.TotalPrice, startH, nil) - - lane := deal.Proposal.Payment.Vouchers[0].Lane - - for i, voucher := range deal.Proposal.Payment.Vouchers { - maxClose := curHead.Height() + (increment * uint64(i+1)) + build.DealVoucherSkewLimit - - if err := h.checkVoucher(ctx, deal, voucher, lane, maxClose, vspec[i].Amount); err != nil { - return xerrors.Errorf("validating payment voucher %d: %w", i, err) - } - } - - minPrice := types.BigMul(types.BigMul(h.pricePerByteBlock, types.NewInt(deal.Proposal.Size)), types.NewInt(deal.Proposal.Duration)) - if types.BigCmp(minPrice, deal.Proposal.TotalPrice) > 0 { - return xerrors.Errorf("minimum price: %s", minPrice) - } - - prevAmt := types.NewInt(0) - - for i, voucher := range deal.Proposal.Payment.Vouchers { - delta, err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, voucher, nil, types.BigSub(vspec[i].Amount, prevAmt)) - if err != nil { - return xerrors.Errorf("consuming payment voucher %d: %w", i, err) - } - prevAmt = types.BigAdd(prevAmt, delta) - } - - return nil -} - -func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - switch deal.Proposal.SerializationMode { - //case SerializationRaw: - //case SerializationIPLD: - case SerializationUnixFs: - default: - return nil, xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.SerializationMode) - } - - if deal.Proposal.Payment.ChannelMessage != nil { - log.Info("waiting for channel message to appear on chain") - if _, err := h.full.StateWaitMsg(ctx, *deal.Proposal.Payment.ChannelMessage); err != nil { - return nil, xerrors.Errorf("waiting for paych message: %w", err) - } - } - - if err := h.consumeVouchers(ctx, deal); err != nil { - return nil, err - } - - log.Info("fetching data for a deal") - err := h.sendSignedResponse(StorageDealResponse{ - State: api.DealAccepted, - Message: "", - Proposal: deal.ProposalCid, - }) - if err != nil { - return nil, err - } - - return nil, merkledag.FetchGraph(ctx, deal.Ref, h.dag) -} - -// STAGED - -func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - err := h.sendSignedResponse(StorageDealResponse{ - State: api.DealStaged, - Proposal: deal.ProposalCid, - }) - if err != nil { - log.Warnf("Sending deal response failed: %s", err) - } - - root, err := h.dag.Get(ctx, deal.Ref) - if err != nil { - return nil, xerrors.Errorf("failed to get file root for deal: %s", err) - } - - // TODO: abstract this away into ReadSizeCloser + implement different modes - n, err := unixfile.NewUnixfsFile(ctx, h.dag, root) - if err != nil { - return nil, xerrors.Errorf("cannot open unixfs file: %s", err) - } - - uf, ok := n.(sectorblocks.UnixfsReader) - if !ok { - // we probably got directory, unsupported for now - return nil, xerrors.Errorf("unsupported unixfs file type") - } - - sectorID, err := h.secst.AddUnixfsPiece(deal.Proposal.PieceRef, uf, deal.Proposal.Duration) - if err != nil { - return nil, xerrors.Errorf("AddPiece failed: %s", err) - } - - log.Warnf("New Sector: %d", sectorID) - return func(deal *MinerDeal) { - deal.SectorID = sectorID - }, nil -} - -// SEALING - -func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (PieceInclusionProof, error) { - for i, p := range status.Pieces { - if p.Key == ref { - return PieceInclusionProof{ - Position: uint64(i), - ProofElements: p.InclusionProof, - }, nil - } - } - return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID) -} - -func (h *Handler) waitSealed(ctx context.Context, deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) { - status, err := h.secst.WaitSeal(ctx, deal.SectorID) - if err != nil { - return sectorbuilder.SectorSealingStatus{}, err - } - - switch status.State { - case sealing_state.Sealed: - case sealing_state.Failed: - return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg) - case sealing_state.Pending: - return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID) - case sealing_state.Sealing: - return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID) - default: - return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID) - } - - return status, nil -} - -func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - status, err := h.waitSealed(ctx, deal) - if err != nil { - return nil, err - } - - // TODO: don't hardcode unixfs - ip, err := getInclusionProof(string(sectorblocks.SerializationUnixfs0)+deal.Ref.String(), status) - if err != nil { - return nil, err - } - - proof := &actors.InclusionProof{ - Sector: deal.SectorID, - Proof: ip.ProofElements, - } - proofB, err := cbor.DumpObject(proof) - if err != nil { - return nil, err - } - - // store proofs for channels - for i, v := range deal.Proposal.Payment.Vouchers { - if v.Extra.Method == actors.MAMethods.PaymentVerifyInclusion { - // TODO: Set correct minAmount - if _, err := h.full.PaychVoucherAdd(ctx, deal.Proposal.Payment.PayChActor, v, proofB, types.NewInt(0)); err != nil { - return nil, xerrors.Errorf("storing payment voucher %d proof: %w", i, err) - } - } - } - - err = h.sendSignedResponse(StorageDealResponse{ - State: api.DealSealing, - Proposal: deal.ProposalCid, - PieceInclusionProof: ip, - CommD: status.CommD[:], - }) - if err != nil { - log.Warnf("Sending deal response failed: %s", err) - } - - return nil, nil -} - -func (h *Handler) complete(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - mcid, err := h.commt.WaitCommit(ctx, deal.Proposal.MinerAddress, deal.SectorID) - if err != nil { - log.Warnf("Waiting for sector commitment message: %s", err) - } - - err = h.sendSignedResponse(StorageDealResponse{ - State: api.DealComplete, - Proposal: deal.ProposalCid, - - SectorCommitMessage: &mcid, - }) - if err != nil { - log.Warnf("Sending deal response failed: %s", err) - } - - return nil, nil -} diff --git a/chain/deals/handler.go b/chain/deals/provider.go similarity index 71% rename from chain/deals/handler.go rename to chain/deals/provider.go index 02db7153c..67fbf33f1 100644 --- a/chain/deals/handler.go +++ b/chain/deals/provider.go @@ -14,6 +14,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -27,7 +28,7 @@ func init() { type MinerDeal struct { Client peer.ID - Proposal StorageDealProposal + Proposal actors.StorageDealProposal ProposalCid cid.Cid State api.DealState @@ -38,7 +39,7 @@ type MinerDeal struct { s inet.Stream } -type Handler struct { +type Provider struct { pricePerByteBlock types.BigInt // how much we want for storing one byte for one block minPieceSize uint64 @@ -73,7 +74,7 @@ type minerDealUpdate struct { mut func(*MinerDeal) } -func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, commt *commitment.Tracker, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { +func NewProvider(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, commt *commitment.Tracker, dag dtypes.StagingDAG, fullNode api.FullNode) (*Provider, error) { addr, err := ds.Get(datastore.NewKey("miner-address")) if err != nil { return nil, err @@ -83,7 +84,7 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, commt *c return nil, err } - h := &Handler{ + h := &Provider{ secst: secst, commt: commt, dag: dag, @@ -120,40 +121,40 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, commt *c return h, nil } -func (h *Handler) Run(ctx context.Context) { +func (p *Provider) Run(ctx context.Context) { // TODO: restore state go func() { - defer log.Warn("quitting deal handler loop") - defer close(h.stopped) + defer log.Warn("quitting deal provider loop") + defer close(p.stopped) for { select { - case deal := <-h.incoming: // DealAccepted - h.onIncoming(deal) - case update := <-h.updated: // DealStaged - h.onUpdated(ctx, update) - case <-h.stop: + case deal := <-p.incoming: // DealAccepted + p.onIncoming(deal) + case update := <-p.updated: // DealStaged + p.onUpdated(ctx, update) + case <-p.stop: return } } }() } -func (h *Handler) onIncoming(deal MinerDeal) { +func (p *Provider) onIncoming(deal MinerDeal) { log.Info("incoming deal") - h.conns[deal.ProposalCid] = deal.s + p.conns[deal.ProposalCid] = deal.s - if err := h.deals.Begin(deal.ProposalCid, deal); err != nil { + if err := p.deals.Begin(deal.ProposalCid, deal); err != nil { // This can happen when client re-sends proposal - h.failDeal(deal.ProposalCid, err) + p.failDeal(deal.ProposalCid, err) log.Errorf("deal tracking failed: %s", err) return } go func() { - h.updated <- minerDealUpdate{ + p.updated <- minerDealUpdate{ newState: api.DealAccepted, id: deal.ProposalCid, err: nil, @@ -161,15 +162,15 @@ func (h *Handler) onIncoming(deal MinerDeal) { }() } -func (h *Handler) onUpdated(ctx context.Context, update minerDealUpdate) { +func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) { log.Infof("Deal %s updated state to %d", update.id, update.newState) if update.err != nil { log.Errorf("deal %s failed: %s", update.id, update.err) - h.failDeal(update.id, update.err) + p.failDeal(update.id, update.err) return } var deal MinerDeal - err := h.deals.MutateMiner(update.id, func(d *MinerDeal) error { + err := p.deals.MutateMiner(update.id, func(d *MinerDeal) error { d.State = update.newState if update.mut != nil { update.mut(d) @@ -178,30 +179,29 @@ func (h *Handler) onUpdated(ctx context.Context, update minerDealUpdate) { return nil }) if err != nil { - h.failDeal(update.id, err) + p.failDeal(update.id, err) return } switch update.newState { case api.DealAccepted: - h.handle(ctx, deal, h.accept, api.DealStaged) + p.handle(ctx, deal, p.accept, api.DealStaged) case api.DealStaged: - h.handle(ctx, deal, h.staged, api.DealSealing) + p.handle(ctx, deal, p.staged, api.DealSealing) case api.DealSealing: - h.handle(ctx, deal, h.sealing, api.DealComplete) + p.handle(ctx, deal, p.sealing, api.DealComplete) case api.DealComplete: - h.handle(ctx, deal, h.complete, api.DealNoUpdate) + p.handle(ctx, deal, p.complete, api.DealNoUpdate) } } -func (h *Handler) newDeal(s inet.Stream, proposal StorageDealProposal) (MinerDeal, error) { - // TODO: Review: Not signed? +func (p *Provider) newDeal(s inet.Stream, proposal actors.StorageDealProposal) (MinerDeal, error) { proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1) if err != nil { return MinerDeal{}, err } - ref, err := cid.Parse(proposal.PieceRef) + ref, err := cid.Cast(proposal.PieceRef) if err != nil { return MinerDeal{}, err } @@ -218,27 +218,27 @@ func (h *Handler) newDeal(s inet.Stream, proposal StorageDealProposal) (MinerDea }, nil } -func (h *Handler) HandleStream(s inet.Stream) { +func (p *Provider) HandleStream(s inet.Stream) { log.Info("Handling storage deal proposal!") - proposal, err := h.readProposal(s) + proposal, err := p.readProposal(s) if err != nil { log.Error(err) s.Close() return } - deal, err := h.newDeal(s, proposal.Proposal) + deal, err := p.newDeal(s, proposal) if err != nil { log.Error(err) s.Close() return } - h.incoming <- deal + p.incoming <- deal } -func (h *Handler) Stop() { - close(h.stop) - <-h.stopped +func (p *Provider) Stop() { + close(p.stop) + <-p.stopped } diff --git a/chain/deals/asks.go b/chain/deals/provider_asks.go similarity index 66% rename from chain/deals/asks.go rename to chain/deals/provider_asks.go index df82550e8..af7c8cce6 100644 --- a/chain/deals/asks.go +++ b/chain/deals/provider_asks.go @@ -14,44 +14,44 @@ import ( "golang.org/x/xerrors" ) -func (h *Handler) SetPrice(p types.BigInt, ttlsecs int64) error { - h.askLk.Lock() - defer h.askLk.Unlock() +func (p *Provider) SetPrice(price types.BigInt, ttlsecs int64) error { + p.askLk.Lock() + defer p.askLk.Unlock() var seqno uint64 - if h.ask != nil { - seqno = h.ask.Ask.SeqNo + 1 + if p.ask != nil { + seqno = p.ask.Ask.SeqNo + 1 } now := time.Now().Unix() ask := &types.StorageAsk{ - Price: p, + Price: price, Timestamp: now, Expiry: now + ttlsecs, - Miner: h.actor, + Miner: p.actor, SeqNo: seqno, - MinPieceSize: h.minPieceSize, + MinPieceSize: p.minPieceSize, } - ssa, err := h.signAsk(ask) + ssa, err := p.signAsk(ask) if err != nil { return err } - return h.saveAsk(ssa) + return p.saveAsk(ssa) } -func (h *Handler) getAsk(m address.Address) *types.SignedStorageAsk { - h.askLk.Lock() - defer h.askLk.Unlock() - if m != h.actor { +func (p *Provider) getAsk(m address.Address) *types.SignedStorageAsk { + p.askLk.Lock() + defer p.askLk.Unlock() + if m != p.actor { return nil } - return h.ask + return p.ask } -func (h *Handler) HandleAskStream(s inet.Stream) { +func (p *Provider) HandleAskStream(s inet.Stream) { defer s.Close() var ar AskRequest if err := cborrpc.ReadCborRPC(s, &ar); err != nil { @@ -59,7 +59,7 @@ func (h *Handler) HandleAskStream(s inet.Stream) { return } - resp := h.processAskRequest(&ar) + resp := p.processAskRequest(&ar) if err := cborrpc.WriteCborRPC(s, resp); err != nil { log.Errorf("failed to write ask response: %s", err) @@ -67,19 +67,19 @@ func (h *Handler) HandleAskStream(s inet.Stream) { } } -func (h *Handler) processAskRequest(ar *AskRequest) *AskResponse { +func (p *Provider) processAskRequest(ar *AskRequest) *AskResponse { return &AskResponse{ - Ask: h.getAsk(ar.Miner), + Ask: p.getAsk(ar.Miner), } } var bestAskKey = datastore.NewKey("latest-ask") -func (h *Handler) tryLoadAsk() error { - h.askLk.Lock() - defer h.askLk.Unlock() +func (p *Provider) tryLoadAsk() error { + p.askLk.Lock() + defer p.askLk.Unlock() - err := h.loadAsk() + err := p.loadAsk() if err != nil { if xerrors.Is(err, datastore.ErrNotFound) { log.Warn("no previous ask found, miner will not accept deals until a price is set") @@ -91,8 +91,8 @@ func (h *Handler) tryLoadAsk() error { return nil } -func (h *Handler) loadAsk() error { - askb, err := h.ds.Get(datastore.NewKey("latest-ask")) +func (p *Provider) loadAsk() error { + askb, err := p.ds.Get(datastore.NewKey("latest-ask")) if err != nil { return xerrors.Errorf("failed to load most recent ask from disk: %w", err) } @@ -102,22 +102,22 @@ func (h *Handler) loadAsk() error { return err } - h.ask = &ssa + p.ask = &ssa return nil } -func (h *Handler) signAsk(a *types.StorageAsk) (*types.SignedStorageAsk, error) { +func (p *Provider) signAsk(a *types.StorageAsk) (*types.SignedStorageAsk, error) { b, err := cbor.DumpObject(a) if err != nil { return nil, err } - worker, err := h.getWorker(h.actor) + worker, err := p.getWorker(p.actor) if err != nil { return nil, xerrors.Errorf("failed to get worker to sign ask: %w", err) } - sig, err := h.full.WalletSign(context.TODO(), worker, b) + sig, err := p.full.WalletSign(context.TODO(), worker, b) if err != nil { return nil, err } @@ -128,17 +128,17 @@ func (h *Handler) signAsk(a *types.StorageAsk) (*types.SignedStorageAsk, error) }, nil } -func (h *Handler) saveAsk(a *types.SignedStorageAsk) error { +func (p *Provider) saveAsk(a *types.SignedStorageAsk) error { b, err := cbor.DumpObject(a) if err != nil { return err } - if err := h.ds.Put(bestAskKey, b); err != nil { + if err := p.ds.Put(bestAskKey, b); err != nil { return err } - h.ask = a + p.ask = a return nil } diff --git a/chain/deals/provider_states.go b/chain/deals/provider_states.go new file mode 100644 index 000000000..a7c321faa --- /dev/null +++ b/chain/deals/provider_states.go @@ -0,0 +1,245 @@ +package deals + +import ( + "context" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-sectorbuilder/sealing_state" + "github.com/ipfs/go-merkledag" + unixfile "github.com/ipfs/go-unixfs/file" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/sectorbuilder" + "github.com/filecoin-project/lotus/storage/sectorblocks" +) + +type providerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) + +func (p *Provider) handle(ctx context.Context, deal MinerDeal, cb providerHandlerFunc, next api.DealState) { + go func() { + mut, err := cb(ctx, deal) + + if err == nil && next == api.DealNoUpdate { + return + } + + select { + case p.updated <- minerDealUpdate{ + newState: next, + id: deal.ProposalCid, + err: err, + mut: mut, + }: + case <-p.stop: + } + }() +} + +// ACCEPTED + +func (p *Provider) addMarketFunds(ctx context.Context, deal MinerDeal) error { + log.Info("Adding market funds for storage collateral") + smsg, err := p.full.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: deal.Proposal.Provider, + Value: deal.Proposal.StorageCollateral, + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.AddBalance, + }) + if err != nil { + return err + } + + r, err := p.full.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return err + } + + if r.Receipt.ExitCode != 0 { + return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode) + } + + return nil +} + +func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { + switch deal.Proposal.PieceSerialization { + //case SerializationRaw: + //case SerializationIPLD: + case actors.SerializationUnixFSv0: + default: + return nil, xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.PieceSerialization) + } + + // TODO: check StorageCollateral / StoragePrice + + // check market funds + clientMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client) + if err != nil { + return nil, err + } + + // This doesn't guarantee that the client won't withdraw / lock those funds + // but it's a decent first filter + if clientMarketBalance.Available.LessThan(deal.Proposal.StoragePrice) { + return nil, xerrors.New("clientMarketBalance.Available too small") + } + + providerMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client) + if err != nil { + return nil, err + } + + // TODO: this needs to be atomic + if providerMarketBalance.Available.LessThan(deal.Proposal.StorageCollateral) { + if err := p.addMarketFunds(ctx, deal); err != nil { + return nil, err + } + } + + log.Info("publishing deal") + + // TODO: We may want this to happen after fetching data + smsg, err := p.full.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: deal.Proposal.Provider, + Value: types.NewInt(0), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.PublishStorageDeals, + }) + if err != nil { + return nil, err + } + r, err := p.full.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return nil, err + } + if r.Receipt.ExitCode != 0 { + return nil, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode) + } + + log.Info("fetching data for a deal") + err = p.sendSignedResponse(StorageDealResponse{ + State: api.DealAccepted, + Message: "", + Proposal: deal.ProposalCid, + PublishMessage: smsg.Cid(), + }) + if err != nil { + return nil, err + } + + return nil, merkledag.FetchGraph(ctx, deal.Ref, p.dag) +} + +// STAGED + +func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { + err := p.sendSignedResponse(StorageDealResponse{ + State: api.DealStaged, + Proposal: deal.ProposalCid, + }) + if err != nil { + log.Warnf("Sending deal response failed: %s", err) + } + + root, err := p.dag.Get(ctx, deal.Ref) + if err != nil { + return nil, xerrors.Errorf("failed to get file root for deal: %s", err) + } + + // TODO: abstract this away into ReadSizeCloser + implement different modes + n, err := unixfile.NewUnixfsFile(ctx, p.dag, root) + if err != nil { + return nil, xerrors.Errorf("cannot open unixfs file: %s", err) + } + + uf, ok := n.(sectorblocks.UnixfsReader) + if !ok { + // we probably got directory, unsupported for now + return nil, xerrors.Errorf("unsupported unixfs file type") + } + + pcid, err := cid.Cast(deal.Proposal.PieceRef) + if err != nil { + return nil, err + } + + sectorID, err := p.secst.AddUnixfsPiece(pcid, uf, deal.Proposal.Duration) + if err != nil { + return nil, xerrors.Errorf("AddPiece failed: %s", err) + } + + log.Warnf("New Sector: %d", sectorID) + return func(deal *MinerDeal) { + deal.SectorID = sectorID + }, nil +} + +// SEALING + +func (p *Provider) waitSealed(ctx context.Context, deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) { + status, err := p.secst.WaitSeal(ctx, deal.SectorID) + if err != nil { + return sectorbuilder.SectorSealingStatus{}, err + } + + switch status.State { + case sealing_state.Sealed: + case sealing_state.Failed: + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg) + case sealing_state.Pending: + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID) + case sealing_state.Sealing: + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID) + default: + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID) + } + + return status, nil +} + +func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { + err := p.sendSignedResponse(StorageDealResponse{ + State: api.DealSealing, + Proposal: deal.ProposalCid, + }) + if err != nil { + log.Warnf("Sending deal response failed: %s", err) + } + + _, err = p.waitSealed(ctx, deal) + if err != nil { + return nil, err + } + // TODO: Spec doesn't say anything about inclusion proofs anywhere + // Not sure what mechanisms prevents miner from storing data that isn't + // clients' data + + return nil, nil +} + +func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { + // TODO: Add dealID to commtracker (probably before sealing) + mcid, err := p.commt.WaitCommit(ctx, deal.Proposal.Provider, deal.SectorID) + if err != nil { + log.Warnf("Waiting for sector commitment message: %s", err) + } + + err = p.sendSignedResponse(StorageDealResponse{ + State: api.DealComplete, + Proposal: deal.ProposalCid, + + CommitMessage: mcid, + }) + if err != nil { + log.Warnf("Sending deal response failed: %s", err) + } + + return nil, nil +} diff --git a/chain/deals/handler_utils.go b/chain/deals/provider_utils.go similarity index 66% rename from chain/deals/handler_utils.go rename to chain/deals/provider_utils.go index 099a07e7d..49522d92b 100644 --- a/chain/deals/handler_utils.go +++ b/chain/deals/provider_utils.go @@ -17,8 +17,8 @@ import ( "golang.org/x/xerrors" ) -func (h *Handler) failDeal(id cid.Cid, cerr error) { - if err := h.deals.End(id); err != nil { +func (p *Provider) failDeal(id cid.Cid, cerr error) { + if err := p.deals.End(id); err != nil { log.Warnf("deals.End: %s", err) } @@ -29,16 +29,16 @@ func (h *Handler) failDeal(id cid.Cid, cerr error) { log.Errorf("deal %s failed: %s", id, cerr) - err := h.sendSignedResponse(StorageDealResponse{ + err := p.sendSignedResponse(StorageDealResponse{ State: api.DealFailed, Message: cerr.Error(), Proposal: id, }) - s, ok := h.conns[id] + s, ok := p.conns[id] if ok { _ = s.Reset() - delete(h.conns, id) + delete(p.conns, id) } if err != nil { @@ -46,25 +46,29 @@ func (h *Handler) failDeal(id cid.Cid, cerr error) { } } -func (h *Handler) readProposal(s inet.Stream) (proposal SignedStorageDealProposal, err error) { +func (p *Provider) readProposal(s inet.Stream) (proposal actors.StorageDealProposal, err error) { if err := cborrpc.ReadCborRPC(s, &proposal); err != nil { log.Errorw("failed to read proposal message", "error", err) - return SignedStorageDealProposal{}, err + return proposal, err + } + + if err := proposal.Verify(); err != nil { + return proposal, xerrors.Errorf("verifying StorageDealProposal: %w", err) } // TODO: Validate proposal maybe // (and signature, obviously) - if proposal.Proposal.MinerAddress != h.actor { - log.Errorf("proposal with wrong MinerAddress: %s", proposal.Proposal.MinerAddress) - return SignedStorageDealProposal{}, err + if proposal.Provider != p.actor { + log.Errorf("proposal with wrong ProviderAddress: %s", proposal.Provider) + return proposal, err } return } -func (h *Handler) sendSignedResponse(resp StorageDealResponse) error { - s, ok := h.conns[resp.Proposal] +func (p *Provider) sendSignedResponse(resp StorageDealResponse) error { + s, ok := p.conns[resp.Proposal] if !ok { return xerrors.New("couldn't send response: not connected") } @@ -74,12 +78,12 @@ func (h *Handler) sendSignedResponse(resp StorageDealResponse) error { return xerrors.Errorf("serializing response: %w", err) } - worker, err := h.getWorker(h.actor) + worker, err := p.getWorker(p.actor) if err != nil { return err } - sig, err := h.full.WalletSign(context.TODO(), worker, msg) + sig, err := p.full.WalletSign(context.TODO(), worker, msg) if err != nil { return xerrors.Errorf("failed to sign response message: %w", err) } @@ -93,18 +97,18 @@ func (h *Handler) sendSignedResponse(resp StorageDealResponse) error { if err != nil { // Assume client disconnected s.Close() - delete(h.conns, resp.Proposal) + delete(p.conns, resp.Proposal) } return err } -func (h *Handler) getWorker(miner address.Address) (address.Address, error) { +func (p *Provider) getWorker(miner address.Address) (address.Address, error) { getworker := &types.Message{ To: miner, From: miner, Method: actors.MAMethods.GetWorkerAddr, } - r, err := h.full.StateCall(context.TODO(), getworker, nil) + r, err := p.full.StateCall(context.TODO(), getworker, nil) if err != nil { return address.Undef, xerrors.Errorf("getting worker address: %w", err) } diff --git a/chain/deals/types.go b/chain/deals/types.go index 58970700e..b8b38fbf0 100644 --- a/chain/deals/types.go +++ b/chain/deals/types.go @@ -1,21 +1,16 @@ package deals import ( - "github.com/filecoin-project/lotus/api" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/types" ) func init() { - cbor.RegisterCborType(StorageDealProposal{}) - cbor.RegisterCborType(SignedStorageDealProposal{}) - - cbor.RegisterCborType(PieceInclusionProof{}) - cbor.RegisterCborType(StorageDealResponse{}) cbor.RegisterCborType(SignedStorageDealResponse{}) @@ -23,60 +18,29 @@ func init() { cbor.RegisterCborType(AskResponse{}) } -const ProtocolID = "/fil/storage/mk/1.0.0" +const DealProtocolID = "/fil/storage/mk/1.0.0" const AskProtocolID = "/fil/storage/ask/1.0.0" -type SerializationMode string - -const ( - SerializationUnixFs = "UnixFs" - SerializationRaw = "Raw" - SerializationIPLD = "IPLD" -) - -type StorageDealProposal struct { - PieceRef cid.Cid // TODO: port to spec - SerializationMode SerializationMode - CommP []byte - - Size uint64 - TotalPrice types.BigInt - Duration uint64 - - Payment actors.PaymentInfo - - MinerAddress address.Address - ClientAddress address.Address -} - -type SignedStorageDealProposal struct { - Proposal StorageDealProposal - - Signature *types.Signature -} - -// response - -type PieceInclusionProof struct { - Position uint64 - ProofElements []byte +type Proposal struct { + DealProposal actors.StorageDealProposal } type StorageDealResponse struct { State api.DealState - // DealRejected / DealAccepted / DealFailed / DealStaged + // DealProposalRejected Message string Proposal cid.Cid - // DealSealing - PieceInclusionProof PieceInclusionProof - CommD []byte // TODO: not in spec + // DealAccepted + StorageDeal actors.StorageDeal + PublishMessage cid.Cid // DealComplete - SectorCommitMessage *cid.Cid + CommitMessage cid.Cid } +// TODO: Do we actually need this to be signed? type SignedStorageDealResponse struct { Response StorageDealResponse diff --git a/chain/gen/utils.go b/chain/gen/utils.go index 06ade0030..12b842473 100644 --- a/chain/gen/utils.go +++ b/chain/gen/utils.go @@ -56,7 +56,7 @@ func SetupInitActor(bs bstore.Blockstore, addrs []address.Address) (*types.Actor } act := &types.Actor{ - Code: actors.InitActorCodeCid, + Code: actors.InitCodeCid, Head: statecid, } @@ -85,7 +85,7 @@ func MakeInitialStateTree(bs bstore.Blockstore, actmap map[address.Address]types return nil, xerrors.Errorf("setup init actor: %w", err) } - if err := state.SetActor(actors.InitActorAddress, initact); err != nil { + if err := state.SetActor(actors.InitAddress, initact); err != nil { return nil, xerrors.Errorf("set init actor: %w", err) } @@ -104,7 +104,7 @@ func MakeInitialStateTree(bs bstore.Blockstore, actmap map[address.Address]types } err = state.SetActor(actors.NetworkAddress, &types.Actor{ - Code: actors.AccountActorCodeCid, + Code: actors.AccountCodeCid, Balance: netAmt, Head: emptyobject, }) @@ -113,7 +113,7 @@ func MakeInitialStateTree(bs bstore.Blockstore, actmap map[address.Address]types } err = state.SetActor(actors.BurntFundsAddress, &types.Actor{ - Code: actors.AccountActorCodeCid, + Code: actors.AccountCodeCid, Balance: types.NewInt(0), Head: emptyobject, }) @@ -123,7 +123,7 @@ func MakeInitialStateTree(bs bstore.Blockstore, actmap map[address.Address]types for a, v := range actmap { err = state.SetActor(a, &types.Actor{ - Code: actors.AccountActorCodeCid, + Code: actors.AccountCodeCid, Balance: v, Head: emptyobject, }) @@ -154,7 +154,7 @@ func SetupStorageMarketActor(bs bstore.Blockstore) (*types.Actor, error) { } return &types.Actor{ - Code: actors.StoragePowerActorCodeCid, + Code: actors.StoragePowerCodeCid, Head: stcid, Nonce: 0, Balance: types.NewInt(0), @@ -333,7 +333,7 @@ func MakeGenesisBlock(bs bstore.Blockstore, balances map[address.Address]types.B } b := &types.BlockHeader{ - Miner: actors.InitActorAddress, + Miner: actors.InitAddress, Tickets: []*types.Ticket{genesisticket}, ElectionProof: []byte("the Genesis block"), Parents: []cid.Cid{}, diff --git a/chain/state/statetree.go b/chain/state/statetree.go index 4f1cdb36d..55a812275 100644 --- a/chain/state/statetree.go +++ b/chain/state/statetree.go @@ -68,7 +68,7 @@ func (st *StateTree) SetActor(addr address.Address, act *types.Actor) error { } func (st *StateTree) lookupID(addr address.Address) (address.Address, error) { - act, err := st.GetActor(actors.InitActorAddress) + act, err := st.GetActor(actors.InitAddress) if err != nil { return address.Undef, xerrors.Errorf("getting init actor: %w", err) } @@ -143,7 +143,7 @@ func (st *StateTree) Snapshot() error { func (st *StateTree) RegisterNewAddress(addr address.Address, act *types.Actor) (address.Address, error) { var out address.Address - err := st.MutateActor(actors.InitActorAddress, func(initact *types.Actor) error { + err := st.MutateActor(actors.InitAddress, func(initact *types.Actor) error { var ias actors.InitActorState if err := st.Store.Get(context.TODO(), initact.Head, &ias); err != nil { return err diff --git a/chain/state/statetree_test.go b/chain/state/statetree_test.go index 79629daeb..360f4778e 100644 --- a/chain/state/statetree_test.go +++ b/chain/state/statetree_test.go @@ -27,7 +27,7 @@ func BenchmarkStateTreeSet(b *testing.B) { err = st.SetActor(a, &types.Actor{ Balance: types.NewInt(1258812523), Code: actors.StorageMinerCodeCid, - Head: actors.AccountActorCodeCid, + Head: actors.AccountCodeCid, Nonce: uint64(i), }) if err != nil { @@ -54,7 +54,7 @@ func BenchmarkStateTreeSetFlush(b *testing.B) { err = st.SetActor(a, &types.Actor{ Balance: types.NewInt(1258812523), Code: actors.StorageMinerCodeCid, - Head: actors.AccountActorCodeCid, + Head: actors.AccountCodeCid, Nonce: uint64(i), }) if err != nil { @@ -80,7 +80,7 @@ func BenchmarkStateTree10kGetActor(b *testing.B) { err = st.SetActor(a, &types.Actor{ Balance: types.NewInt(1258812523 + uint64(i)), Code: actors.StorageMinerCodeCid, - Head: actors.AccountActorCodeCid, + Head: actors.AccountCodeCid, Nonce: uint64(i), }) if err != nil { @@ -123,7 +123,7 @@ func TestSetCache(t *testing.T) { act := &types.Actor{ Balance: types.NewInt(0), Code: actors.StorageMinerCodeCid, - Head: actors.AccountActorCodeCid, + Head: actors.AccountCodeCid, Nonce: 0, } diff --git a/chain/vm/invoker.go b/chain/vm/invoker.go index c91109734..decc590b7 100644 --- a/chain/vm/invoker.go +++ b/chain/vm/invoker.go @@ -29,12 +29,12 @@ func newInvoker() *invoker { } // add builtInCode using: register(cid, singleton) - inv.register(actors.InitActorCodeCid, actors.InitActor{}, actors.InitActorState{}) - inv.register(actors.StoragePowerActorCodeCid, actors.StoragePowerActor{}, actors.StoragePowerState{}) - inv.register(actors.StorageMarketActorCodeCid, actors.StorageMarketActor{}, actors.StorageMarketState{}) + inv.register(actors.InitCodeCid, actors.InitActor{}, actors.InitActorState{}) + inv.register(actors.StoragePowerCodeCid, actors.StoragePowerActor{}, actors.StoragePowerState{}) + inv.register(actors.StorageMarketCodeCid, actors.StorageMarketActor{}, actors.StorageMarketState{}) inv.register(actors.StorageMinerCodeCid, actors.StorageMinerActor{}, actors.StorageMinerActorState{}) - inv.register(actors.MultisigActorCodeCid, actors.MultiSigActor{}, actors.MultiSigActorState{}) - inv.register(actors.PaymentChannelActorCodeCid, actors.PaymentChannelActor{}, actors.PaymentChannelActorState{}) + inv.register(actors.MultisigCodeCid, actors.MultiSigActor{}, actors.MultiSigActorState{}) + inv.register(actors.PaymentChannelCodeCid, actors.PaymentChannelActor{}, actors.PaymentChannelActorState{}) return inv } diff --git a/chain/vm/mkactor.go b/chain/vm/mkactor.go index 6b0fc4ca4..1e1885825 100644 --- a/chain/vm/mkactor.go +++ b/chain/vm/mkactor.go @@ -66,7 +66,7 @@ func NewBLSAccountActor(st *state.StateTree, addr address.Address) (*types.Actor } nact := &types.Actor{ - Code: actors.AccountActorCodeCid, + Code: actors.AccountCodeCid, Balance: types.NewInt(0), Head: c, } @@ -76,7 +76,7 @@ func NewBLSAccountActor(st *state.StateTree, addr address.Address) (*types.Actor func NewSecp256k1AccountActor(st *state.StateTree, addr address.Address) (*types.Actor, aerrors.ActorError) { nact := &types.Actor{ - Code: actors.AccountActorCodeCid, + Code: actors.AccountCodeCid, Balance: types.NewInt(0), Head: EmptyObjectCid, } diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 03faebc29..a37edeb41 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -177,7 +177,7 @@ func (vmc *VMContext) ChargeGas(amount uint64) aerrors.ActorError { } func (vmc *VMContext) StateTree() (types.StateTree, aerrors.ActorError) { - if vmc.msg.To != actors.InitActorAddress { + if vmc.msg.To != actors.InitAddress { return nil, aerrors.Escalate(fmt.Errorf("only init actor can access state tree directly"), "invalid use of StateTree") } @@ -216,7 +216,7 @@ func ResolveToKeyAddr(state types.StateTree, cst *hamt.CborIpldStore, addr addre return address.Undef, aerrors.Newf(1, "failed to find actor: %s", addr) } - if act.Code != actors.AccountActorCodeCid { + if act.Code != actors.AccountCodeCid { return address.Undef, aerrors.New(1, "address was not for an account actor") } diff --git a/node/builder.go b/node/builder.go index 00ab37328..8a59339cc 100644 --- a/node/builder.go +++ b/node/builder.go @@ -256,7 +256,7 @@ func Online() Option { Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(*retrieval.Miner), retrieval.NewMiner), - Override(new(*deals.Handler), deals.NewHandler), + Override(new(*deals.Provider), deals.NewProvider), Override(HandleRetrievalKey, modules.HandleRetrieval), Override(HandleDealsKey, modules.HandleDeals), Override(RunSectorServiceKey, modules.RunSectorService), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 6269fb932..add507cce 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -114,9 +114,9 @@ func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.A ChannelMessage: payment.ChannelMessage, Vouchers: payment.Vouchers, }, - MinerAddress: miner, - ClientAddress: self, - MinerID: pid, + ProviderAddress: miner, + Client: self, + MinerID: pid, } c, err := a.DealClient.Start(ctx, proposal, vd) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 009ba57b2..54f800a5f 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -98,13 +98,13 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m *retrieval.Miner) { }) } -func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *deals.Handler) { +func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *deals.Provider) { ctx := helpers.LifecycleCtx(mctx, lc) lc.Append(fx.Hook{ OnStart: func(context.Context) error { h.Run(ctx) - host.SetStreamHandler(deals.ProtocolID, h.HandleStream) + host.SetStreamHandler(deals.DealProtocolID, h.HandleStream) host.SetStreamHandler(deals.AskProtocolID, h.HandleAskStream) return nil }, diff --git a/paych/simple.go b/paych/simple.go index d8a3f4163..99c578d05 100644 --- a/paych/simple.go +++ b/paych/simple.go @@ -19,14 +19,14 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am enc, aerr := actors.SerializeParams(&actors.ExecParams{ Params: params, - Code: actors.PaymentChannelActorCodeCid, + Code: actors.PaymentChannelCodeCid, }) if aerr != nil { return address.Undef, cid.Undef, aerr } msg := &types.Message{ - To: actors.InitActorAddress, + To: actors.InitAddress, From: from, Value: amt, Method: actors.IAMethods.Exec,