lotus/chain/deals/handler.go

216 lines
4.5 KiB
Go
Raw Normal View History

2019-08-02 14:09:54 +00:00
package deals
import (
2019-08-06 22:04:21 +00:00
"context"
"math"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
cbor "github.com/ipfs/go-ipld-cbor"
2019-08-02 14:09:54 +00:00
inet "github.com/libp2p/go-libp2p-core/network"
2019-08-06 22:04:21 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-09-10 12:35:43 +00:00
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
2019-08-02 14:09:54 +00:00
)
2019-08-06 23:08:34 +00:00
func init() {
cbor.RegisterCborType(MinerDeal{})
}
2019-08-06 22:04:21 +00:00
type MinerDeal struct {
Client peer.ID
Proposal StorageDealProposal
ProposalCid cid.Cid
2019-09-10 14:13:24 +00:00
State api.DealState
2019-08-06 22:04:21 +00:00
Ref cid.Cid
2019-09-10 14:13:24 +00:00
SectorID uint64 // Set when State >= DealStaged
2019-08-12 21:48:18 +00:00
s inet.Stream
2019-08-06 22:04:21 +00:00
}
2019-08-02 14:09:54 +00:00
type Handler struct {
2019-09-06 22:39:47 +00:00
pricePerByteBlock types.BigInt // how much we want for storing one byte for one block
2019-08-26 10:04:57 +00:00
secst *sectorblocks.SectorBlocks
2019-08-14 20:27:10 +00:00
full api.FullNode
2019-08-06 22:04:21 +00:00
// TODO: Use a custom protocol or graphsync in the future
// TODO: GC
dag dtypes.StagingDAG
deals StateStore
conns map[cid.Cid]inet.Stream
2019-08-06 22:04:21 +00:00
2019-08-06 23:08:34 +00:00
actor address.Address
incoming chan MinerDeal
2019-09-10 12:35:43 +00:00
updated chan minerDealUpdate
stop chan struct{}
stopped chan struct{}
2019-08-02 14:09:54 +00:00
}
2019-09-10 12:35:43 +00:00
type minerDealUpdate struct {
2019-09-10 14:13:24 +00:00
newState api.DealState
id cid.Cid
err error
2019-08-12 21:48:18 +00:00
mut func(*MinerDeal)
2019-08-07 11:46:09 +00:00
}
2019-08-26 10:04:57 +00:00
func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) {
2019-08-06 23:08:34 +00:00
addr, err := ds.Get(datastore.NewKey("miner-address"))
if err != nil {
return nil, err
}
minerAddress, err := address.NewFromBytes(addr)
if err != nil {
return nil, err
}
return &Handler{
2019-08-14 20:27:10 +00:00
secst: secst,
dag: dag,
full: fullNode,
2019-08-06 22:04:21 +00:00
2019-09-06 22:39:47 +00:00
pricePerByteBlock: types.NewInt(3), // TODO: allow setting
conns: map[cid.Cid]inet.Stream{},
2019-08-06 23:08:34 +00:00
incoming: make(chan MinerDeal),
2019-09-10 12:35:43 +00:00
updated: make(chan minerDealUpdate),
stop: make(chan struct{}),
stopped: make(chan struct{}),
2019-08-06 23:08:34 +00:00
actor: minerAddress,
2019-08-06 22:04:21 +00:00
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))},
2019-08-06 23:08:34 +00:00
}, nil
2019-08-02 14:09:54 +00:00
}
2019-08-06 22:04:21 +00:00
func (h *Handler) Run(ctx context.Context) {
2019-08-07 20:16:26 +00:00
// TODO: restore state
2019-08-06 22:04:21 +00:00
go func() {
2019-08-06 23:08:34 +00:00
defer log.Error("quitting deal handler loop")
2019-08-06 22:04:21 +00:00
defer close(h.stopped)
for {
select {
2019-09-10 14:13:24 +00:00
case deal := <-h.incoming: // DealAccepted
h.onIncoming(deal)
2019-09-10 14:13:24 +00:00
case update := <-h.updated: // DealStaged
h.onUpdated(ctx, update)
2019-08-06 22:04:21 +00:00
case <-h.stop:
return
}
}
}()
}
func (h *Handler) onIncoming(deal MinerDeal) {
log.Info("incoming deal")
2019-08-02 14:09:54 +00:00
h.conns[deal.ProposalCid] = deal.s
2019-08-02 16:25:10 +00:00
if err := h.deals.Begin(deal.ProposalCid, deal); err != nil {
// This can happen when client re-sends proposal
h.failDeal(deal.ProposalCid, err)
log.Errorf("deal tracking failed: %s", err)
2019-08-02 14:09:54 +00:00
return
}
go func() {
2019-09-10 12:35:43 +00:00
h.updated <- minerDealUpdate{
2019-09-10 14:13:24 +00:00
newState: api.DealAccepted,
id: deal.ProposalCid,
err: nil,
}
}()
}
2019-08-06 23:08:34 +00:00
2019-09-10 12:35:43 +00:00
func (h *Handler) onUpdated(ctx context.Context, update minerDealUpdate) {
log.Infof("Deal %s updated state to %d", update.id, update.newState)
if update.err != nil {
log.Errorf("deal %s failed: %s", update.id, update.err)
h.failDeal(update.id, update.err)
2019-08-06 22:04:21 +00:00
return
}
var deal MinerDeal
err := h.deals.MutateMiner(update.id, func(d *MinerDeal) error {
d.State = update.newState
2019-08-12 21:48:18 +00:00
if update.mut != nil {
update.mut(d)
}
deal = *d
return nil
})
if err != nil {
h.failDeal(update.id, err)
return
}
switch update.newState {
2019-09-10 14:13:24 +00:00
case api.DealAccepted:
h.handle(ctx, deal, h.accept, api.DealStaged)
case api.DealStaged:
h.handle(ctx, deal, h.staged, api.DealSealing)
case api.DealSealing:
h.handle(ctx, deal, h.sealing, api.DealComplete)
2019-08-02 14:09:54 +00:00
}
}
func (h *Handler) newDeal(s inet.Stream, proposal StorageDealProposal) (MinerDeal, error) {
// TODO: Review: Not signed?
proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1)
if err != nil {
return MinerDeal{}, err
}
2019-08-06 23:08:34 +00:00
ref, err := cid.Parse(proposal.PieceRef)
2019-08-06 23:08:34 +00:00
if err != nil {
return MinerDeal{}, err
2019-08-06 23:08:34 +00:00
}
return MinerDeal{
Client: s.Conn().RemotePeer(),
Proposal: proposal,
ProposalCid: proposalNd.Cid(),
2019-09-10 14:13:24 +00:00
State: api.DealUnknown,
Ref: ref,
2019-08-06 23:08:34 +00:00
s: s,
}, nil
}
func (h *Handler) HandleStream(s inet.Stream) {
log.Info("Handling storage deal proposal!")
proposal, err := h.readProposal(s)
if err != nil {
log.Error(err)
s.Close()
2019-08-02 14:09:54 +00:00
return
}
deal, err := h.newDeal(s, proposal.Proposal)
2019-08-06 22:04:21 +00:00
if err != nil {
2019-08-06 23:08:34 +00:00
log.Error(err)
s.Close()
2019-08-06 22:04:21 +00:00
return
}
h.incoming <- deal
2019-08-02 14:09:54 +00:00
}
2019-08-06 23:08:34 +00:00
func (h *Handler) Stop() {
close(h.stop)
<-h.stopped
}