lotus/chain/deals/provider.go

295 lines
7.1 KiB
Go
Raw Normal View History

2019-08-02 14:09:54 +00:00
package deals
import (
2019-08-06 22:04:21 +00:00
"context"
"errors"
2019-09-13 21:00:36 +00:00
"sync"
2019-08-06 22:04:21 +00:00
2019-09-13 21:00:36 +00:00
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
2019-08-02 14:09:54 +00:00
inet "github.com/libp2p/go-libp2p-core/network"
2019-08-06 22:04:21 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-09-13 21:00:36 +00:00
"golang.org/x/xerrors"
2019-09-10 12:35:43 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/datatransfer"
2019-11-07 14:11:39 +00:00
"github.com/filecoin-project/lotus/lib/cborutil"
2019-11-05 18:40:51 +00:00
"github.com/filecoin-project/lotus/lib/statestore"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage"
2019-11-05 18:40:51 +00:00
"github.com/filecoin-project/lotus/storage/sectorblocks"
2019-08-02 14:09:54 +00:00
)
var ProviderDsPrefix = "/deals/provider"
2019-08-06 22:04:21 +00:00
type MinerDeal struct {
Client peer.ID
Proposal actors.StorageDealProposal
2019-08-06 22:04:21 +00:00
ProposalCid cid.Cid
2019-09-10 14:13:24 +00:00
State api.DealState
2019-08-06 22:04:21 +00:00
Ref cid.Cid
DealID uint64
2019-09-10 14:13:24 +00:00
SectorID uint64 // Set when State >= DealStaged
2019-08-12 21:48:18 +00:00
s inet.Stream
2019-08-06 22:04:21 +00:00
}
type Provider struct {
2019-09-06 22:39:47 +00:00
pricePerByteBlock types.BigInt // how much we want for storing one byte for one block
2019-09-13 21:00:36 +00:00
minPieceSize uint64
ask *types.SignedStorageAsk
askLk sync.Mutex
2019-09-06 22:39:47 +00:00
2019-11-05 18:40:51 +00:00
secb *sectorblocks.SectorBlocks
sminer *storage.Miner
2019-10-31 19:03:26 +00:00
full api.FullNode
2019-08-06 22:04:21 +00:00
// TODO: This will go away once storage market module + CAR
// is implemented
2019-08-06 22:04:21 +00:00
dag dtypes.StagingDAG
// dataTransfer is the manager of data transfers used by this storage provider
dataTransfer dtypes.ProviderDataTransfer
2019-11-01 11:07:05 +00:00
deals *statestore.StateStore
2019-09-13 21:00:36 +00:00
ds dtypes.MetadataDS
conns map[cid.Cid]inet.Stream
2019-08-06 22:04:21 +00:00
2019-08-06 23:08:34 +00:00
actor address.Address
incoming chan MinerDeal
2019-09-10 12:35:43 +00:00
updated chan minerDealUpdate
stop chan struct{}
stopped chan struct{}
2019-08-02 14:09:54 +00:00
}
2019-09-10 12:35:43 +00:00
type minerDealUpdate struct {
2019-09-10 14:13:24 +00:00
newState api.DealState
id cid.Cid
err error
2019-08-12 21:48:18 +00:00
mut func(*MinerDeal)
2019-08-07 11:46:09 +00:00
}
var (
// ErrDataTransferFailed means a data transfer for a deal failed
ErrDataTransferFailed = errors.New("Deal data transfer failed")
)
2019-11-11 19:55:25 +00:00
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, fullNode api.FullNode) (*Provider, error) {
2019-08-06 23:08:34 +00:00
addr, err := ds.Get(datastore.NewKey("miner-address"))
if err != nil {
return nil, err
}
minerAddress, err := address.NewFromBytes(addr)
if err != nil {
return nil, err
}
h := &Provider{
2019-11-11 19:55:25 +00:00
sminer: sminer,
dag: dag,
dataTransfer: dataTransfer,
2019-11-11 19:55:25 +00:00
full: fullNode,
secb: secb,
2019-08-06 22:04:21 +00:00
2019-09-06 22:39:47 +00:00
pricePerByteBlock: types.NewInt(3), // TODO: allow setting
2019-11-06 23:09:48 +00:00
minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up))
2019-09-06 22:39:47 +00:00
conns: map[cid.Cid]inet.Stream{},
2019-08-06 23:08:34 +00:00
incoming: make(chan MinerDeal),
2019-09-10 12:35:43 +00:00
updated: make(chan minerDealUpdate),
stop: make(chan struct{}),
stopped: make(chan struct{}),
2019-08-06 23:08:34 +00:00
actor: minerAddress,
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey(ProviderDsPrefix))),
2019-09-13 21:00:36 +00:00
ds: ds,
}
if err := h.tryLoadAsk(); err != nil {
return nil, err
}
if h.ask == nil {
// TODO: we should be fine with this state, and just say it means 'not actively accepting deals'
// for now... lets just set a price
2019-10-29 10:19:39 +00:00
if err := h.SetPrice(types.NewInt(500_000_000), 1000000); err != nil {
2019-09-13 21:00:36 +00:00
return nil, xerrors.Errorf("failed setting a default price: %w", err)
}
}
// register a data transfer event handler -- this will move deals from
// accepted to staged
h.dataTransfer.SubscribeToEvents(h.onDataTransferEvent)
2019-09-13 21:00:36 +00:00
return h, nil
2019-08-02 14:09:54 +00:00
}
func (p *Provider) Run(ctx context.Context) {
2019-08-07 20:16:26 +00:00
// TODO: restore state
2019-08-06 22:04:21 +00:00
go func() {
defer log.Warn("quitting deal provider loop")
defer close(p.stopped)
2019-08-06 22:04:21 +00:00
for {
select {
case deal := <-p.incoming: // DealAccepted
p.onIncoming(deal)
case update := <-p.updated: // DealStaged
p.onUpdated(ctx, update)
case <-p.stop:
2019-08-06 22:04:21 +00:00
return
}
}
}()
}
func (p *Provider) onIncoming(deal MinerDeal) {
log.Info("incoming deal")
2019-08-02 14:09:54 +00:00
p.conns[deal.ProposalCid] = deal.s
2019-08-02 16:25:10 +00:00
if err := p.deals.Begin(deal.ProposalCid, &deal); err != nil {
// This can happen when client re-sends proposal
p.failDeal(deal.ProposalCid, err)
log.Errorf("deal tracking failed: %s", err)
2019-08-02 14:09:54 +00:00
return
}
go func() {
p.updated <- minerDealUpdate{
2019-09-10 14:13:24 +00:00
newState: api.DealAccepted,
id: deal.ProposalCid,
err: nil,
}
}()
}
2019-08-06 23:08:34 +00:00
func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
2019-11-08 17:15:38 +00:00
log.Infof("Deal %s updated state to %s", update.id, api.DealStates[update.newState])
if update.err != nil {
2019-11-06 12:04:33 +00:00
log.Errorf("deal %s (newSt: %d) failed: %+v", update.id, update.newState, update.err)
p.failDeal(update.id, update.err)
2019-08-06 22:04:21 +00:00
return
}
var deal MinerDeal
2019-11-01 11:07:05 +00:00
err := p.deals.Mutate(update.id, func(d *MinerDeal) error {
d.State = update.newState
2019-08-12 21:48:18 +00:00
if update.mut != nil {
update.mut(d)
}
deal = *d
return nil
})
if err != nil {
p.failDeal(update.id, err)
return
}
switch update.newState {
2019-09-10 14:13:24 +00:00
case api.DealAccepted:
p.handle(ctx, deal, p.accept, api.DealNoUpdate)
2019-09-10 14:13:24 +00:00
case api.DealStaged:
p.handle(ctx, deal, p.staged, api.DealSealing)
2019-09-10 14:13:24 +00:00
case api.DealSealing:
p.handle(ctx, deal, p.sealing, api.DealComplete)
2019-09-16 18:04:15 +00:00
case api.DealComplete:
p.handle(ctx, deal, p.complete, api.DealNoUpdate)
2019-08-02 14:09:54 +00:00
}
}
// onDataTransferEvent is the function called when an event occurs in a data
// transfer -- it reads the voucher to verify this even occurred in a storage
// market deal, then, based on the data transfer event that occurred, it generates
// and update message for the deal -- either moving to staged for a completion
// event or moving to error if a data transfer error occurs
func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState datatransfer.ChannelState) {
voucher, ok := channelState.Voucher().(*StorageDataTransferVoucher)
// if this event is for a transfer not related to storage, ignore
if !ok {
return
}
// data transfer events for opening and progress do not affect deal state
var next api.DealState
var err error
var mut func(*MinerDeal)
switch event {
case datatransfer.Complete:
next = api.DealStaged
mut = func(deal *MinerDeal) {
deal.DealID = voucher.DealID
}
case datatransfer.Error:
next = api.DealFailed
err = ErrDataTransferFailed
default:
// the only events we care about are complete and error
return
}
select {
case p.updated <- minerDealUpdate{
newState: next,
id: voucher.Proposal,
err: err,
mut: mut,
}:
case <-p.stop:
}
}
2019-11-06 17:38:42 +00:00
func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) {
2019-11-07 14:11:39 +00:00
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
if err != nil {
return MinerDeal{}, err
}
2019-08-06 23:08:34 +00:00
return MinerDeal{
Client: s.Conn().RemotePeer(),
2019-11-06 17:38:42 +00:00
Proposal: *proposal.DealProposal,
ProposalCid: proposalNd.Cid(),
2019-09-10 14:13:24 +00:00
State: api.DealUnknown,
2019-11-06 17:38:42 +00:00
Ref: proposal.Piece,
2019-08-06 23:08:34 +00:00
s: s,
}, nil
}
func (p *Provider) HandleStream(s inet.Stream) {
log.Info("Handling storage deal proposal!")
proposal, err := p.readProposal(s)
if err != nil {
log.Error(err)
s.Close()
2019-08-02 14:09:54 +00:00
return
}
deal, err := p.newDeal(s, proposal)
2019-08-06 22:04:21 +00:00
if err != nil {
2019-11-06 23:59:56 +00:00
log.Errorf("%+v", err)
s.Close()
2019-08-06 22:04:21 +00:00
return
}
p.incoming <- deal
2019-08-02 14:09:54 +00:00
}
2019-08-06 23:08:34 +00:00
func (p *Provider) Stop() {
close(p.stop)
<-p.stopped
2019-08-06 23:08:34 +00:00
}