lotus/chain/deals/client.go

157 lines
3.0 KiB
Go
Raw Normal View History

2019-08-01 17:12:41 +00:00
package deals
import (
	"context"
	"errors"
	"io/ioutil"
	"os"
	"sync/atomic"

	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/filecoin-project/go-lotus/chain/address"
	"github.com/filecoin-project/go-lotus/chain/store"
	"github.com/filecoin-project/go-lotus/chain/types"
	"github.com/filecoin-project/go-lotus/lib/cborrpc"
	"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
)
// log is the package-level logger, created through go-log under the name "deals".
var log = logging.Logger("deals")
2019-08-02 14:09:54 +00:00
// ProtocolID is the libp2p protocol identifier the client passes to
// host.NewStream when it opens a stream to a miner to propose a storage deal.
const ProtocolID = "/fil/storage/mk/1.0.0"
2019-08-01 17:12:41 +00:00
// DealStatus enumerates the states a client-side deal can be in.
type DealStatus int

const (
	// DealResolvingMiner is the status Client.Start assigns to a newly
	// registered deal. It is the zero value of DealStatus.
	DealResolvingMiner DealStatus = iota
)
type Deal struct {
ID uint64
Status DealStatus
2019-08-02 14:09:54 +00:00
Miner peer.ID
2019-08-01 17:12:41 +00:00
}
type Client struct {
cs *store.ChainStore
2019-08-02 14:09:54 +00:00
sb *sectorbuilder.SectorBuilder
h host.Host
2019-08-01 17:12:41 +00:00
next uint64
deals map[uint64]Deal
incoming chan Deal
stop chan struct{}
stopped chan struct{}
}
// NewClient builds a deal Client around the given chain store.
//
// It allocates the deal map, a 16-slot incoming channel and the stop/stopped
// signalling channels. The sb and h fields are not set here and remain at
// their zero values. No goroutine is started; call Run for that.
func NewClient(cs *store.ChainStore) *Client {
	return &Client{
		cs:       cs,
		deals:    make(map[uint64]Deal),
		incoming: make(chan Deal, 16),
		stop:     make(chan struct{}),
		stopped:  make(chan struct{}),
	}
}
func (c *Client) Run() {
go func() {
defer close(c.stopped)
for {
select {
case deal := <-c.incoming:
log.Info("incoming deal")
2019-08-02 14:09:54 +00:00
// TODO: track in datastore
2019-08-01 17:12:41 +00:00
c.deals[deal.ID] = deal
case <-c.stop:
return
}
}
}()
}
2019-08-02 14:09:54 +00:00
func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (uint64, error) {
// TODO: Eww
f, err := ioutil.TempFile(os.TempDir(), "commP-temp-")
2019-08-01 17:12:41 +00:00
if err != nil {
return 0, err
}
2019-08-02 14:09:54 +00:00
_, err = f.Write([]byte("hello\n"))
2019-08-01 17:12:41 +00:00
if err != nil {
2019-08-02 14:09:54 +00:00
return 0, err
}
if err := f.Close(); err != nil {
return 0, err
}
commP, err := c.sb.GeneratePieceCommitment(f.Name(), 6)
if err != nil {
return 0, err
}
if err := os.Remove(f.Name()); err != nil {
return 0, err
2019-08-01 17:12:41 +00:00
}
2019-08-02 14:09:54 +00:00
// TODO: use data
proposal := StorageDealProposal{
PieceRef: "bafkqabtimvwgy3yk", // identity 'hello\n'
SerializationMode: SerializationRaw,
CommP: commP[:],
Size: 6,
TotalPrice: totalPrice,
Duration: blocksDuration,
Payment: nil, // TODO
MinerAddress: miner,
ClientAddress: from,
2019-08-01 17:12:41 +00:00
}
2019-08-02 14:09:54 +00:00
s, err := c.h.NewStream(ctx, minerID, ProtocolID)
2019-08-01 17:12:41 +00:00
if err != nil {
return 0, err
}
2019-08-02 14:09:54 +00:00
defer s.Close() // TODO: not too soon?
log.Info("Sending deal proposal")
signedProposal := &SignedStorageDealProposal{
Proposal: proposal,
Signature: nil, // TODO: SIGN!
2019-08-01 17:12:41 +00:00
}
2019-08-02 14:09:54 +00:00
if err := cborrpc.WriteCborRPC(s, signedProposal); err != nil {
2019-08-01 17:12:41 +00:00
return 0, err
}
2019-08-02 14:09:54 +00:00
var resp SignedStorageDealResponse
if err := cborrpc.ReadCborRPC(s, &resp); err != nil {
log.Errorw("failed to read StorageDealResponse message", "error", err)
return 0, err
}
2019-08-01 17:12:41 +00:00
id := atomic.AddUint64(&c.next, 1)
deal := Deal{
ID: id,
Status: DealResolvingMiner,
2019-08-02 14:09:54 +00:00
Miner: minerID,
2019-08-01 17:12:41 +00:00
}
c.incoming <- deal
return id, nil
}
// Stop asks the goroutine started by Run to exit and blocks until it has.
//
// Calling Stop again after a previous call has returned is a no-op instead of
// a "close of closed channel" panic. Stop must only be called after Run:
// without a running loop nothing closes c.stopped and the call blocks
// forever. Stop is not safe to call concurrently from multiple goroutines.
func (c *Client) Stop() {
	select {
	case <-c.stop:
		// Already closed by an earlier Stop call; closing again would panic.
	default:
		close(c.stop)
	}
	<-c.stopped
}