deals: Rewrite handler with error handling
This commit is contained in:
parent
3e2d04a540
commit
25dbdd761a
@ -2,23 +2,16 @@ package deals
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/chain/address"
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/wallet"
|
||||||
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
||||||
|
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
files "github.com/ipfs/go-ipfs-files"
|
|
||||||
"github.com/ipfs/go-merkledag"
|
|
||||||
unixfile "github.com/ipfs/go-unixfs/file"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"math"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/chain/wallet"
|
|
||||||
"github.com/filecoin-project/go-lotus/lib/cborrpc"
|
|
||||||
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
|
|
||||||
|
|
||||||
cbor "github.com/ipfs/go-ipld-cbor"
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
inet "github.com/libp2p/go-libp2p-core/network"
|
inet "github.com/libp2p/go-libp2p-core/network"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
@ -35,6 +28,8 @@ type MinerDeal struct {
|
|||||||
State DealState
|
State DealState
|
||||||
|
|
||||||
Ref cid.Cid
|
Ref cid.Cid
|
||||||
|
|
||||||
|
s inet.Stream
|
||||||
}
|
}
|
||||||
|
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
@ -46,18 +41,20 @@ type Handler struct {
|
|||||||
dag dtypes.StagingDAG
|
dag dtypes.StagingDAG
|
||||||
|
|
||||||
deals StateStore
|
deals StateStore
|
||||||
|
conns map[cid.Cid]inet.Stream
|
||||||
incoming chan MinerDeal
|
|
||||||
|
|
||||||
actor address.Address
|
actor address.Address
|
||||||
|
|
||||||
stop chan struct{}
|
incoming chan MinerDeal
|
||||||
stopped chan struct{}
|
updated chan dealUpdate
|
||||||
|
stop chan struct{}
|
||||||
|
stopped chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type fetchResult struct {
|
type dealUpdate struct {
|
||||||
id cid.Cid
|
newState DealState
|
||||||
err error
|
id cid.Cid
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) (*Handler, error) {
|
func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) (*Handler, error) {
|
||||||
@ -75,7 +72,12 @@ func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.Sector
|
|||||||
sb: sb,
|
sb: sb,
|
||||||
dag: dag,
|
dag: dag,
|
||||||
|
|
||||||
|
conns: map[cid.Cid]inet.Stream{},
|
||||||
|
|
||||||
incoming: make(chan MinerDeal),
|
incoming: make(chan MinerDeal),
|
||||||
|
updated: make(chan dealUpdate),
|
||||||
|
stop: make(chan struct{}),
|
||||||
|
stopped: make(chan struct{}),
|
||||||
|
|
||||||
actor: minerAddress,
|
actor: minerAddress,
|
||||||
|
|
||||||
@ -87,117 +89,13 @@ func (h *Handler) Run(ctx context.Context) {
|
|||||||
go func() {
|
go func() {
|
||||||
defer log.Error("quitting deal handler loop")
|
defer log.Error("quitting deal handler loop")
|
||||||
defer close(h.stopped)
|
defer close(h.stopped)
|
||||||
fetched := make(chan fetchResult)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case deal := <-h.incoming:
|
case deal := <-h.incoming: // Accepted
|
||||||
log.Info("incoming deal")
|
h.onIncoming(deal)
|
||||||
|
case update := <-h.updated: // Staged
|
||||||
if err := h.deals.Begin(deal.ProposalCid, deal); err != nil {
|
h.onUpdated(ctx, update)
|
||||||
// TODO: This can happen when client re-sends proposal
|
|
||||||
log.Errorf("deal tracking failed: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
go func(id cid.Cid) {
|
|
||||||
log.Info("fetching data for a deal")
|
|
||||||
err := merkledag.FetchGraph(ctx, deal.Ref, h.dag)
|
|
||||||
select {
|
|
||||||
case fetched <- fetchResult{
|
|
||||||
id: id,
|
|
||||||
err: err,
|
|
||||||
}:
|
|
||||||
case <-h.stop:
|
|
||||||
}
|
|
||||||
}(deal.ProposalCid)
|
|
||||||
case result := <-fetched:
|
|
||||||
if result.err != nil {
|
|
||||||
log.Errorf("failed to fetch data for deal: %s", result.err)
|
|
||||||
// TODO: fail deal
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: send response if client still there
|
|
||||||
// TODO: staging
|
|
||||||
|
|
||||||
// TODO: async
|
|
||||||
log.Info("sealing deal")
|
|
||||||
|
|
||||||
var deal MinerDeal
|
|
||||||
err := h.deals.MutateMiner(result.id, func(in MinerDeal) (MinerDeal, error) {
|
|
||||||
in.State = Sealing
|
|
||||||
deal = in
|
|
||||||
return in, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
// TODO: fail deal
|
|
||||||
log.Errorf("deal tracking failed (set sealing): %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
root, err := h.dag.Get(ctx, deal.Ref)
|
|
||||||
if err != nil {
|
|
||||||
// TODO: fail deal
|
|
||||||
log.Errorf("failed to get file root for deal: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: abstract this away into ReadSizeCloser + implement different modes
|
|
||||||
n, err := unixfile.NewUnixfsFile(ctx, h.dag, root)
|
|
||||||
if err != nil {
|
|
||||||
// TODO: fail deal
|
|
||||||
log.Errorf("cannot open unixfs file: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
uf, ok := n.(files.File)
|
|
||||||
if !ok {
|
|
||||||
// TODO: we probably got directory, how should we handle this in unixfs mode?
|
|
||||||
log.Errorf("unsupported unixfs type")
|
|
||||||
// TODO: fail deal
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
size, err := uf.Size()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to get file size: %s", err)
|
|
||||||
// TODO: fail deal
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
//////////////
|
|
||||||
|
|
||||||
f, err := ioutil.TempFile(os.TempDir(), "piece-temp-")
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
// TODO: fail deal
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if _, err := io.Copy(f, uf); err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
// TODO: fail deal
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := f.Close(); err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
// TODO: fail deal
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sectorID, err := h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f.Name())
|
|
||||||
if err != nil {
|
|
||||||
// TODO: fail deal
|
|
||||||
log.Errorf("AddPiece failed: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := os.Remove(f.Name()); err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
// TODO: fail deal
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Warnf("New Sector: %d", sectorID)
|
|
||||||
|
|
||||||
// TODO: update state, tell client
|
|
||||||
case <-h.stop:
|
case <-h.stop:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -205,100 +103,97 @@ func (h *Handler) Run(ctx context.Context) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) HandleStream(s inet.Stream) {
|
func (h *Handler) onIncoming(deal MinerDeal) {
|
||||||
defer s.Close()
|
log.Info("incoming deal")
|
||||||
|
|
||||||
log.Info("Handling storage deal proposal!")
|
h.conns[deal.ProposalCid] = deal.s
|
||||||
|
|
||||||
var proposal SignedStorageDealProposal
|
if err := h.deals.Begin(deal.ProposalCid, deal); err != nil {
|
||||||
if err := cborrpc.ReadCborRPC(s, &proposal); err != nil {
|
// This can happen when client re-sends proposal
|
||||||
log.Errorw("failed to read proposal message", "error", err)
|
h.failDeal(deal.ProposalCid, err)
|
||||||
|
log.Errorf("deal tracking failed: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Validate proposal maybe
|
go func() {
|
||||||
// (and signature, obviously)
|
h.updated <- dealUpdate{
|
||||||
|
newState: Accepted,
|
||||||
|
id: deal.ProposalCid,
|
||||||
|
err: nil,
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
if proposal.Proposal.MinerAddress != h.actor {
|
func (h *Handler) onUpdated(ctx context.Context, update dealUpdate) {
|
||||||
log.Errorf("proposal with wrong MinerAddress: %s", proposal.Proposal.MinerAddress)
|
log.Infof("Deal %s updated state to %d", update.id, update.newState)
|
||||||
// TODO: send error
|
if update.err != nil {
|
||||||
|
log.Errorf("deal %s failed: %s", update.id, update.err)
|
||||||
|
h.failDeal(update.id, update.err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var deal MinerDeal
|
||||||
|
err := h.deals.MutateMiner(update.id, func(d *MinerDeal) error {
|
||||||
|
d.State = update.newState
|
||||||
|
deal = *d
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
h.failDeal(update.id, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
switch proposal.Proposal.SerializationMode {
|
switch update.newState {
|
||||||
//case SerializationRaw:
|
case Accepted:
|
||||||
//case SerializationIPLD:
|
h.handle(ctx, deal, h.accept, Staged)
|
||||||
case SerializationUnixFs:
|
case Staged:
|
||||||
default:
|
h.handle(ctx, deal, h.staged, Sealing)
|
||||||
log.Errorf("deal proposal with unsupported serialization: %s", proposal.Proposal.SerializationMode)
|
case Sealing:
|
||||||
// TODO: send error
|
log.Error("TODO")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) newDeal(s inet.Stream, proposal StorageDealProposal) (MinerDeal, error) {
|
||||||
// TODO: Review: Not signed?
|
// TODO: Review: Not signed?
|
||||||
proposalNd, err := cbor.WrapObject(proposal.Proposal, math.MaxUint64, -1)
|
proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
return MinerDeal{}, err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
response := StorageDealResponse{
|
ref, err := cid.Parse(proposal.PieceRef)
|
||||||
State: Accepted,
|
|
||||||
Message: "",
|
|
||||||
Proposal: proposalNd.Cid(),
|
|
||||||
}
|
|
||||||
|
|
||||||
msg, err := cbor.DumpObject(response)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorw("failed to serialize response message", "error", err)
|
return MinerDeal{}, err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def, err := h.w.ListAddrs()
|
return MinerDeal{
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(def) != 1 {
|
|
||||||
// NOTE: If this ever happens for a good reason, implement this with GetWorker on the miner actor
|
|
||||||
// TODO: implement with GetWorker on the miner actor
|
|
||||||
log.Errorf("Expected only 1 address in wallet, got %d", len(def))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
sig, err := h.w.Sign(def[0], msg)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorw("failed to sign response message", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("accepting deal")
|
|
||||||
|
|
||||||
signedResponse := &SignedStorageDealResponse{
|
|
||||||
Response: response,
|
|
||||||
Signature: sig,
|
|
||||||
}
|
|
||||||
if err := cborrpc.WriteCborRPC(s, signedResponse); err != nil {
|
|
||||||
log.Errorw("failed to write deal response", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ref, err := cid.Parse(proposal.Proposal.PieceRef)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("processing deal")
|
|
||||||
|
|
||||||
h.incoming <- MinerDeal{
|
|
||||||
Client: s.Conn().RemotePeer(),
|
Client: s.Conn().RemotePeer(),
|
||||||
Proposal: proposal.Proposal,
|
Proposal: proposal,
|
||||||
ProposalCid: proposalNd.Cid(),
|
ProposalCid: proposalNd.Cid(),
|
||||||
State: Accepted,
|
State: Unknown,
|
||||||
|
|
||||||
Ref: ref,
|
Ref: ref,
|
||||||
|
|
||||||
|
s: s,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) HandleStream(s inet.Stream) {
|
||||||
|
log.Info("Handling storage deal proposal!")
|
||||||
|
|
||||||
|
proposal, err := h.readProposal(s)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
s.Close()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deal, err := h.newDeal(s, proposal.Proposal)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
s.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.incoming <- deal
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) Stop() {
|
func (h *Handler) Stop() {
|
||||||
|
100
chain/deals/handler_states.go
Normal file
100
chain/deals/handler_states.go
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
package deals
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
files "github.com/ipfs/go-ipfs-files"
|
||||||
|
"github.com/ipfs/go-merkledag"
|
||||||
|
unixfile "github.com/ipfs/go-unixfs/file"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type handlerFunc func(ctx context.Context, deal MinerDeal) error
|
||||||
|
|
||||||
|
func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb handlerFunc, next DealState) {
|
||||||
|
go func() {
|
||||||
|
err := cb(ctx, deal)
|
||||||
|
select {
|
||||||
|
case h.updated <- dealUpdate{
|
||||||
|
newState: next,
|
||||||
|
id: deal.ProposalCid,
|
||||||
|
err: err,
|
||||||
|
}:
|
||||||
|
case <-h.stop:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACCEPTED
|
||||||
|
|
||||||
|
func (h *Handler) accept(ctx context.Context, deal MinerDeal) error {
|
||||||
|
log.Info("acc")
|
||||||
|
switch deal.Proposal.SerializationMode {
|
||||||
|
//case SerializationRaw:
|
||||||
|
//case SerializationIPLD:
|
||||||
|
case SerializationUnixFs:
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.SerializationMode)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("fetching data for a deal")
|
||||||
|
err := h.sendSignedResponse(StorageDealResponse{
|
||||||
|
State: Accepted,
|
||||||
|
Message: "",
|
||||||
|
Proposal: deal.ProposalCid,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return merkledag.FetchGraph(ctx, deal.Ref, h.dag)
|
||||||
|
}
|
||||||
|
|
||||||
|
// STAGED
|
||||||
|
|
||||||
|
func (h *Handler) staged(ctx context.Context, deal MinerDeal) error {
|
||||||
|
err := h.sendSignedResponse(StorageDealResponse{
|
||||||
|
State: Staged,
|
||||||
|
Message: "",
|
||||||
|
Proposal: deal.ProposalCid,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
root, err := h.dag.Get(ctx, deal.Ref)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to get file root for deal: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: abstract this away into ReadSizeCloser + implement different modes
|
||||||
|
n, err := unixfile.NewUnixfsFile(ctx, h.dag, root)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("cannot open unixfs file: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
uf, ok := n.(files.File)
|
||||||
|
if !ok {
|
||||||
|
// we probably got directory, unsupported for now
|
||||||
|
return xerrors.Errorf("unsupported unixfs type")
|
||||||
|
}
|
||||||
|
|
||||||
|
size, err := uf.Size()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to get file size: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var sectorID uint64
|
||||||
|
err = withTemp(uf, func(f string) (err error) {
|
||||||
|
sectorID, err = h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("AddPiece failed: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Warnf("New Sector: %d", sectorID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SEALING
|
99
chain/deals/handler_utils.go
Normal file
99
chain/deals/handler_utils.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
package deals
|
||||||
|
|
||||||
|
import (
|
||||||
|
"runtime"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-lotus/lib/cborrpc"
|
||||||
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
|
inet "github.com/libp2p/go-libp2p-core/network"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (h *Handler) failDeal(id cid.Cid, cerr error) {
|
||||||
|
if err := h.deals.End(id); err != nil {
|
||||||
|
log.Warnf("deals.End: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cerr == nil {
|
||||||
|
_, f, l, _ := runtime.Caller(1)
|
||||||
|
cerr = xerrors.Errorf("unknown error (fail called at %s:%d)", f, l)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Error("Deal %s failed: %s", id, cerr)
|
||||||
|
|
||||||
|
err := h.sendSignedResponse(StorageDealResponse{
|
||||||
|
State: Failed,
|
||||||
|
Message: cerr.Error(),
|
||||||
|
Proposal: id,
|
||||||
|
})
|
||||||
|
|
||||||
|
s, ok := h.conns[id]
|
||||||
|
if ok {
|
||||||
|
_ = s.Close()
|
||||||
|
delete(h.conns, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("notifying client about deal failure: %s")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) readProposal(s inet.Stream) (proposal SignedStorageDealProposal, err error) {
|
||||||
|
if err := cborrpc.ReadCborRPC(s, &proposal); err != nil {
|
||||||
|
log.Errorw("failed to read proposal message", "error", err)
|
||||||
|
return SignedStorageDealProposal{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Validate proposal maybe
|
||||||
|
// (and signature, obviously)
|
||||||
|
|
||||||
|
if proposal.Proposal.MinerAddress != h.actor {
|
||||||
|
log.Errorf("proposal with wrong MinerAddress: %s", proposal.Proposal.MinerAddress)
|
||||||
|
return SignedStorageDealProposal{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) sendSignedResponse(resp StorageDealResponse) error {
|
||||||
|
s, ok := h.conns[resp.Proposal]
|
||||||
|
if !ok {
|
||||||
|
return xerrors.New("couldn't send response: not connected")
|
||||||
|
}
|
||||||
|
|
||||||
|
msg, err := cbor.DumpObject(&resp)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("serializing response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
def, err := h.w.ListAddrs()
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return xerrors.Errorf("listing wallet addresses: %w", err)
|
||||||
|
}
|
||||||
|
if len(def) != 1 {
|
||||||
|
// NOTE: If this ever happens for a good reason, implement this with GetWorker on the miner actor
|
||||||
|
// TODO: implement with GetWorker on the miner actor
|
||||||
|
return xerrors.Errorf("Expected only 1 address in wallet, got %d", len(def))
|
||||||
|
}
|
||||||
|
|
||||||
|
sig, err := h.w.Sign(def[0], msg)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to sign response message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
signedResponse := SignedStorageDealResponse{
|
||||||
|
Response: resp,
|
||||||
|
Signature: sig,
|
||||||
|
}
|
||||||
|
|
||||||
|
err = cborrpc.WriteCborRPC(s, signedResponse)
|
||||||
|
if err != nil {
|
||||||
|
// Assume client disconnected
|
||||||
|
s.Close()
|
||||||
|
delete(h.conns, resp.Proposal)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
@ -11,7 +11,7 @@ type StateStore struct {
|
|||||||
ds datastore.Datastore
|
ds datastore.Datastore
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *StateStore) Begin(i cid.Cid, s interface{}) error {
|
func (st *StateStore) Begin(i cid.Cid, state interface{}) error {
|
||||||
k := datastore.NewKey(i.String())
|
k := datastore.NewKey(i.String())
|
||||||
has, err := st.ds.Has(k)
|
has, err := st.ds.Has(k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -21,7 +21,8 @@ func (st *StateStore) Begin(i cid.Cid, s interface{}) error {
|
|||||||
// TODO: uncomment after deals work
|
// TODO: uncomment after deals work
|
||||||
//return xerrors.Errorf("Already tracking state for %s", i)
|
//return xerrors.Errorf("Already tracking state for %s", i)
|
||||||
}
|
}
|
||||||
b, err := cbor.DumpObject(s)
|
|
||||||
|
b, err := cbor.DumpObject(state)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -43,45 +44,43 @@ func (st *StateStore) End(i cid.Cid) error {
|
|||||||
|
|
||||||
// When this gets used anywhere else, migrate to reflect
|
// When this gets used anywhere else, migrate to reflect
|
||||||
|
|
||||||
func (st *StateStore) MutateMiner(i cid.Cid, mutator func(MinerDeal) (MinerDeal, error)) error {
|
func (st *StateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error {
|
||||||
return st.mutate(i, minerMutator(mutator))
|
return st.mutate(i, minerMutator(mutator))
|
||||||
}
|
}
|
||||||
|
|
||||||
func minerMutator(m func(MinerDeal) (MinerDeal, error)) func([]byte) ([]byte, error) {
|
func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
|
||||||
return func(in []byte) ([]byte, error) {
|
return func(in []byte) ([]byte, error) {
|
||||||
var cur MinerDeal
|
var deal MinerDeal
|
||||||
err := cbor.DecodeInto(in, &cur)
|
err := cbor.DecodeInto(in, &deal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
mutated, err := m(cur)
|
if err := m(&deal); err != nil {
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return cbor.DumpObject(mutated)
|
return cbor.DumpObject(deal)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *StateStore) MutateClient(i cid.Cid, mutator func(ClientDeal) (ClientDeal, error)) error {
|
func (st *StateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error {
|
||||||
return st.mutate(i, clientMutator(mutator))
|
return st.mutate(i, clientMutator(mutator))
|
||||||
}
|
}
|
||||||
|
|
||||||
func clientMutator(m func(ClientDeal) (ClientDeal, error)) func([]byte) ([]byte, error) {
|
func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
|
||||||
return func(in []byte) ([]byte, error) {
|
return func(in []byte) ([]byte, error) {
|
||||||
var cur ClientDeal
|
var deal ClientDeal
|
||||||
err := cbor.DecodeInto(in, &cur)
|
err := cbor.DecodeInto(in, &deal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
mutated, err := m(cur)
|
if err := m(&deal); err != nil {
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return cbor.DumpObject(mutated)
|
return cbor.DumpObject(deal)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
28
chain/deals/utils.go
Normal file
28
chain/deals/utils.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package deals
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
func withTemp(r io.Reader, cb func(string) error) error {
|
||||||
|
f, err := ioutil.TempFile(os.TempDir(), "lotus-temp-")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.Copy(f, r); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = cb(f.Name())
|
||||||
|
if err != nil {
|
||||||
|
os.Remove(f.Name())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.Remove(f.Name())
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user