182 lines
3.5 KiB
Go
182 lines
3.5 KiB
Go
package deals
|
|
|
|
import (
|
|
"context"
|
|
"io/ioutil"
|
|
"os"
|
|
"sync/atomic"
|
|
|
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
|
"github.com/ipfs/go-cid"
|
|
cbor "github.com/ipfs/go-ipld-cbor"
|
|
logging "github.com/ipfs/go-log"
|
|
"github.com/libp2p/go-libp2p-core/host"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
"github.com/filecoin-project/go-lotus/chain/address"
|
|
"github.com/filecoin-project/go-lotus/chain/store"
|
|
"github.com/filecoin-project/go-lotus/chain/types"
|
|
"github.com/filecoin-project/go-lotus/chain/wallet"
|
|
"github.com/filecoin-project/go-lotus/lib/cborrpc"
|
|
)
|
|
|
|
// log is the package-level logger, registered under the "deals" subsystem name.
var log = logging.Logger("deals")
|
|
|
|
// ProtocolID is the protocol identifier the client passes to the host when
// opening a stream to a miner to send a storage deal proposal.
const ProtocolID = "/fil/storage/mk/1.0.0"
|
|
|
|
// DealStatus enumerates the states a client-side deal can be in.
type DealStatus int

const (
	// DealResolvingMiner is the status given to a deal when it is first
	// registered by Client.Start. It is the zero value of DealStatus.
	// NOTE(review): presumably means the miner is still being resolved —
	// confirm intended semantics.
	DealResolvingMiner DealStatus = iota
)
|
|
|
|
type Deal struct {
|
|
ID uint64
|
|
Status DealStatus
|
|
Miner peer.ID
|
|
}
|
|
|
|
type Client struct {
|
|
cs *store.ChainStore
|
|
h host.Host
|
|
w *wallet.Wallet
|
|
|
|
next uint64
|
|
deals map[uint64]Deal
|
|
|
|
incoming chan Deal
|
|
|
|
stop chan struct{}
|
|
stopped chan struct{}
|
|
}
|
|
|
|
func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet) *Client {
|
|
c := &Client{
|
|
cs: cs,
|
|
h: h,
|
|
w: w,
|
|
|
|
deals: map[uint64]Deal{},
|
|
|
|
incoming: make(chan Deal, 16),
|
|
|
|
stop: make(chan struct{}),
|
|
stopped: make(chan struct{}),
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
func (c *Client) Run() {
|
|
go func() {
|
|
defer close(c.stopped)
|
|
|
|
for {
|
|
select {
|
|
case deal := <-c.incoming:
|
|
log.Info("incoming deal")
|
|
|
|
// TODO: track in datastore
|
|
c.deals[deal.ID] = deal
|
|
|
|
case <-c.stop:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (uint64, error) {
|
|
// TODO: Eww
|
|
f, err := ioutil.TempFile(os.TempDir(), "commP-temp-")
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
_, err = f.Write([]byte("hello\n"))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if err := f.Close(); err != nil {
|
|
return 0, err
|
|
}
|
|
commP, err := sectorbuilder.GeneratePieceCommitment(f.Name(), 6)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if err := os.Remove(f.Name()); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
dummyCid, _ := cid.Parse("bafkqaaa")
|
|
|
|
// TODO: use data
|
|
proposal := StorageDealProposal{
|
|
PieceRef: "bafkqabtimvwgy3yk", // identity 'hello\n'
|
|
SerializationMode: SerializationRaw,
|
|
CommP: commP[:],
|
|
Size: 6,
|
|
TotalPrice: totalPrice,
|
|
Duration: blocksDuration,
|
|
Payment: PaymentInfo{
|
|
PayChActor: address.Address{},
|
|
Payer: address.Address{},
|
|
Channel: 0,
|
|
ChannelMessage: dummyCid,
|
|
Vouchers: nil,
|
|
},
|
|
MinerAddress: miner,
|
|
ClientAddress: from,
|
|
}
|
|
|
|
s, err := c.h.NewStream(ctx, minerID, ProtocolID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer s.Close() // TODO: not too soon?
|
|
|
|
log.Info("Sending deal proposal")
|
|
|
|
msg, err := cbor.DumpObject(proposal)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
sig, err := c.w.Sign(from, msg)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
signedProposal := &SignedStorageDealProposal{
|
|
Proposal: proposal,
|
|
Signature: sig,
|
|
}
|
|
|
|
if err := cborrpc.WriteCborRPC(s, signedProposal); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
log.Info("Reading response")
|
|
|
|
var resp SignedStorageDealResponse
|
|
if err := cborrpc.ReadCborRPC(s, &resp); err != nil {
|
|
log.Errorw("failed to read StorageDealResponse message", "error", err)
|
|
return 0, err
|
|
}
|
|
|
|
log.Info("Registering deal")
|
|
|
|
id := atomic.AddUint64(&c.next, 1)
|
|
deal := Deal{
|
|
ID: id,
|
|
Status: DealResolvingMiner,
|
|
Miner: minerID,
|
|
}
|
|
|
|
c.incoming <- deal
|
|
return id, nil
|
|
}
|
|
|
|
func (c *Client) Stop() {
|
|
close(c.stop)
|
|
<-c.stopped
|
|
}
|