package deals

import (
	"context"
	"errors"
	"sync"

	cid "github.com/ipfs/go-cid"
	datastore "github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	inet "github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-cbor-util"
	"github.com/filecoin-project/go-statestore"
	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/actors"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/datatransfer"
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/storage"
	"github.com/filecoin-project/lotus/storage/sectorblocks"
)

// ProviderDsPrefix is the datastore key prefix under which the provider's
// deal state store is namespaced.
var ProviderDsPrefix = "/deals/provider"

// MinerDeal is the provider-side record of a single storage deal. It is what
// gets tracked in the provider's state store, keyed by ProposalCid.
type MinerDeal struct {
	Client      peer.ID
	Proposal    actors.StorageDealProposal
	ProposalCid cid.Cid
	State       api.DealState

	Ref cid.Cid

	DealID   uint64
	SectorID uint64 // Set when State >= DealStaged

	// s is the stream the proposal arrived on.
	s inet.Stream
}

// Provider accepts storage deal proposals from clients and drives each deal
// through its state transitions from a single event loop (see Run).
type Provider struct {
	pricePerByteBlock types.BigInt // how much we want for storing one byte for one block
	minPieceSize      uint64

	ask   *types.SignedStorageAsk
	askLk sync.Mutex

	secb   *sectorblocks.SectorBlocks
	sminer *storage.Miner
	full   api.FullNode

	// TODO: This will go away once storage market module + CAR
	// is implemented
	dag dtypes.StagingDAG

	// dataTransfer is the manager of data transfers used by this storage provider
	dataTransfer dtypes.ProviderDataTransfer

	deals *statestore.StateStore
	ds    dtypes.MetadataDS

	// conns maps a proposal CID to the stream its proposal arrived on.
	conns map[cid.Cid]inet.Stream

	actor address.Address

	// incoming carries freshly proposed deals into the event loop.
	incoming chan MinerDeal
	// updated carries state transitions into the event loop.
	updated chan minerDealUpdate
	// stop is closed by Stop to ask the event loop to quit.
	stop chan struct{}
	// stopped is closed by the event loop once it has quit.
	stopped chan struct{}
}

// minerDealUpdate describes one state transition request for the deal
// identified by id.
type minerDealUpdate struct {
	newState api.DealState
	id       cid.Cid
	// err, when non-nil, makes the event loop fail the deal instead of
	// advancing it.
	err error
	// mut, when non-nil, is applied to the stored deal while its state is
	// being updated.
	mut func(*MinerDeal)
}

var (
	// ErrDataTransferFailed means a data transfer for a deal failed
	ErrDataTransferFailed = errors.New("deal data transfer failed")
)

// NewProvider builds a Provider backed by the given datastore, miner, sector
// block store, staging DAG, data transfer manager and full node API.
//
// The miner address is read from the "miner-address" key of ds. If no ask has
// been stored yet, a default price is set. The provider subscribes to data
// transfer events before returning; Run must be called to start processing.
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, fullNode api.FullNode) (*Provider, error) {
	addr, err := ds.Get(datastore.NewKey("miner-address"))
	if err != nil {
		return nil, err
	}
	minerAddress, err := address.NewFromBytes(addr)
	if err != nil {
		return nil, err
	}

	h := &Provider{
		sminer:       sminer,
		dag:          dag,
		dataTransfer: dataTransfer,
		full:         fullNode,
		secb:         secb,

		pricePerByteBlock: types.NewInt(3), // TODO: allow setting
		minPieceSize:      256,             // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up))

		conns: map[cid.Cid]inet.Stream{},

		incoming: make(chan MinerDeal),
		updated:  make(chan minerDealUpdate),
		stop:     make(chan struct{}),
		stopped:  make(chan struct{}),

		actor: minerAddress,

		deals: statestore.New(namespace.Wrap(ds, datastore.NewKey(ProviderDsPrefix))),
		ds:    ds,
	}

	if err := h.tryLoadAsk(); err != nil {
		return nil, err
	}

	if h.ask == nil {
		// TODO: we should be fine with this state, and just say it means 'not actively accepting deals'
		// for now... lets just set a price
		if err := h.SetPrice(types.NewInt(500_000_000), 1000000); err != nil {
			return nil, xerrors.Errorf("failed setting a default price: %w", err)
		}
	}

	// register a data transfer event handler -- this will move deals from
	// accepted to staged
	h.dataTransfer.SubscribeToEvents(h.onDataTransferEvent)

	return h, nil
}

// Run starts the provider's event loop in a new goroutine and returns
// immediately. The loop serialises handling of incoming deals and deal state
// updates, and exits only when Stop is called; ctx is merely passed on to the
// update handlers and does not terminate the loop.
func (p *Provider) Run(ctx context.Context) {
	// TODO: restore state

	go func() {
		// Deferred calls run in reverse order: p.stopped is closed first,
		// then the warning is logged.
		defer log.Warn("quitting deal provider loop")
		defer close(p.stopped)

		for {
			select {
			case deal := <-p.incoming: // DealAccepted
				p.onIncoming(deal)
			case update := <-p.updated: // DealStaged
				p.onUpdated(ctx, update)
			case <-p.stop:
				return
			}
		}
	}()
}

// onIncoming records the stream of a newly proposed deal, starts tracking the
// deal in the state store, and queues its transition to DealAccepted. It runs
// on the event loop goroutine.
func (p *Provider) onIncoming(deal MinerDeal) {
	log.Info("incoming deal")

	p.conns[deal.ProposalCid] = deal.s

	if err := p.deals.Begin(deal.ProposalCid, &deal); err != nil {
		// This can happen when client re-sends proposal
		p.failDeal(deal.ProposalCid, err)
		log.Errorf("deal tracking failed: %s", err)
		return
	}

	// The send happens on a separate goroutine because p.updated is
	// unbuffered and is only drained by the event loop, which is the
	// goroutine we are currently running on.
	go func() {
		select {
		case p.updated <- minerDealUpdate{
			newState: api.DealAccepted,
			id:       deal.ProposalCid,
			err:      nil,
		}:
		case <-p.stop:
			// The event loop is shutting down and will no longer read
			// p.updated; give up instead of leaking this goroutine.
		}
	}()
}

// onUpdated applies a state update to the tracked deal and dispatches the
// handler for the new state. An update carrying an error, or a failure to
// persist the new state, fails the deal instead.
func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
	log.Infof("Deal %s updated state to %s", update.id, api.DealStates[update.newState])
	if update.err != nil {
		log.Errorf("deal %s (newSt: %d) failed: %+v", update.id, update.newState, update.err)
		p.failDeal(update.id, update.err)
		return
	}

	// Persist the new state (plus any caller-supplied mutation) and keep a
	// copy of the resulting deal for the handler below.
	var deal MinerDeal
	err := p.deals.Mutate(update.id, func(d *MinerDeal) error {
		d.State = update.newState
		if update.mut != nil {
			update.mut(d)
		}
		deal = *d
		return nil
	})
	if err != nil {
		p.failDeal(update.id, err)
		return
	}

	// NOTE(review): the last argument to handle is presumably the state the
	// deal moves to once the handler succeeds -- confirm against handle.
	switch update.newState {
	case api.DealAccepted:
		p.handle(ctx, deal, p.accept, api.DealNoUpdate)
	case api.DealStaged:
		p.handle(ctx, deal, p.staged, api.DealSealing)
	case api.DealSealing:
		p.handle(ctx, deal, p.sealing, api.DealComplete)
	case api.DealComplete:
		p.handle(ctx, deal, p.complete, api.DealNoUpdate)
	}
}

// onDataTransferEvent is the function called when an event occurs in a data
// transfer -- it reads the voucher to verify this event occurred in a storage
// market deal, then, based on the data transfer event that occurred, it generates
// an update message for the deal -- either moving to staged for a completion
// event or moving to error if a data transfer error occurs
func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState datatransfer.ChannelState) {
	voucher, ok := channelState.Voucher().(*StorageDataTransferVoucher)
	// if this event is for a transfer not related to storage, ignore
	if !ok {
		return
	}

	// data transfer events for opening and progress do not affect deal state
	var next api.DealState
	var err error
	var mut func(*MinerDeal)
	switch event {
	case datatransfer.Complete:
		next = api.DealStaged
		mut = func(deal *MinerDeal) {
			deal.DealID = voucher.DealID
		}
	case datatransfer.Error:
		next = api.DealFailed
		err = ErrDataTransferFailed
	default:
		// the only events we care about are complete and error
		return
	}

	// Give up on the update if the provider is stopped, so the data transfer
	// subscriber is never blocked forever.
	select {
	case p.updated <- minerDealUpdate{
		newState: next,
		id:       voucher.Proposal,
		err:      err,
		mut:      mut,
	}:
	case <-p.stop:
	}
}

// newDeal builds the initial MinerDeal record for a proposal received on
// stream s. The deal's ProposalCid is the CID of the IPLD encoding of the
// deal proposal, and its state starts as DealUnknown.
func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) {
	proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
	if err != nil {
		return MinerDeal{}, err
	}

	return MinerDeal{
		Client:      s.Conn().RemotePeer(),
		Proposal:    *proposal.DealProposal,
		ProposalCid: proposalNd.Cid(),
		State:       api.DealUnknown,

		Ref: proposal.Piece,

		s: s,
	}, nil
}

// HandleStream reads a storage deal proposal from s and hands the resulting
// deal to the event loop. The stream is closed if the proposal cannot be read
// or turned into a deal, or if the provider is stopped before the event loop
// picks the deal up.
func (p *Provider) HandleStream(s inet.Stream) {
	log.Info("Handling storage deal proposal!")

	proposal, err := p.readProposal(s)
	if err != nil {
		log.Error(err)
		s.Close()
		return
	}

	deal, err := p.newDeal(s, proposal)
	if err != nil {
		log.Errorf("%+v", err)
		s.Close()
		return
	}

	// p.incoming is unbuffered and only read by the event loop; without the
	// stop case this send would block forever once the loop has quit.
	select {
	case p.incoming <- deal:
	case <-p.stop:
		log.Warn("provider stopped, dropping incoming deal proposal")
		s.Close()
	}
}

// Stop asks the event loop started by Run to quit and blocks until it has.
// It must be called at most once (a second call would close an already
// closed channel), and only after Run, since it waits for the loop to
// signal that it has exited.
func (p *Provider) Stop() {
	close(p.stop)
	<-p.stopped
}