diff --git a/chain/deals/client.go b/chain/deals/client.go index f9b047d23..1752f0aaf 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -3,21 +3,25 @@ package deals import ( "context" "io/ioutil" + "math" "os" - "sync/atomic" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/lib/cborrpc" + "github.com/filecoin-project/go-lotus/node/modules/dtypes" ) var log = logging.Logger("deals") @@ -30,10 +34,10 @@ const ( DealResolvingMiner = DealStatus(iota) ) -type Deal struct { - ID uint64 - Status DealStatus - Miner peer.ID +type ClientDeal struct { + ProposalCid cid.Cid + Status DealStatus + Miner peer.ID } type Client struct { @@ -41,24 +45,23 @@ type Client struct { h host.Host w *wallet.Wallet - next uint64 - deals map[uint64]Deal + deals StateStore - incoming chan Deal + incoming chan ClientDeal stop chan struct{} stopped chan struct{} } -func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet) *Client { +func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS) *Client { c := &Client{ cs: cs, h: h, w: w, - deals: map[uint64]Deal{}, + deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, - incoming: make(chan Deal, 16), + incoming: make(chan ClientDeal, 16), stop: make(chan struct{}), stopped: make(chan struct{}), @@ -77,7 +80,10 @@ func (c *Client) Run() { log.Info("incoming deal") // TODO: track in datastore - c.deals[deal.ID] = deal + if err := c.deals.Begin(deal.ProposalCid, deal); err != nil { + log.Errorf("deal state begin failed: %s", err) + continue + } case <-c.stop: return @@ -86,25 +92,25 @@ func (c *Client) Run() { }() } -func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (uint64, error) { +func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (cid.Cid, error) { // TODO: Eww f, err := ioutil.TempFile(os.TempDir(), "commP-temp-") if err != nil { - return 0, err + return cid.Undef, err } _, err = f.Write([]byte("hello\n")) if err != nil { - return 0, err + return cid.Undef, err } if err := f.Close(); err != nil { - return 0, err + return cid.Undef, err } commP, err := sectorbuilder.GeneratePieceCommitment(f.Name(), 6) if err != nil { - return 0, err + return cid.Undef, err } if err := os.Remove(f.Name()); err != nil { - return 0, err + return cid.Undef, err } dummyCid, _ := cid.Parse("bafkqaaa") @@ -129,7 +135,7 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn s, err := c.h.NewStream(ctx, minerID, ProtocolID) if err != nil { - return 0, err + return cid.Undef, err } defer s.Close() // TODO: not too soon? @@ -137,11 +143,11 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn msg, err := cbor.DumpObject(proposal) if err != nil { - return 0, err + return cid.Undef, err } sig, err := c.w.Sign(from, msg) if err != nil { - return 0, err + return cid.Undef, err } signedProposal := &SignedStorageDealProposal{ @@ -150,7 +156,7 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn } if err := cborrpc.WriteCborRPC(s, signedProposal); err != nil { - return 0, err + return cid.Undef, err } log.Info("Reading response") @@ -158,20 +164,30 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn var resp SignedStorageDealResponse if err := cborrpc.ReadCborRPC(s, &resp); err != nil { log.Errorw("failed to read StorageDealResponse message", "error", err) - return 0, err + return cid.Undef, err + } + + // TODO: verify signature + + if resp.Response.State != Accepted { + return cid.Undef, xerrors.Errorf("Deal wasn't accepted (State=%d)", resp.Response.State) } log.Info("Registering deal") - id := atomic.AddUint64(&c.next, 1) - deal := Deal{ - ID: id, - Status: DealResolvingMiner, - Miner: minerID, + proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1) + if err != nil { + return cid.Undef, err + } + + deal := ClientDeal{ + ProposalCid: proposalNd.Cid(), + Status: DealResolvingMiner, + Miner: minerID, } c.incoming <- deal - return id, nil + return proposalNd.Cid(), nil } func (c *Client) Stop() { diff --git a/chain/deals/handler.go b/chain/deals/handler.go index 23e9e28bc..82e51466c 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -1,24 +1,153 @@ package deals import ( + "context" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + files "github.com/ipfs/go-ipfs-files" + "github.com/ipfs/go-merkledag" + unixfile "github.com/ipfs/go-unixfs/file" + "math" + "github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/lib/cborrpc" - "math" + "github.com/filecoin-project/go-lotus/node/modules/dtypes" cbor "github.com/ipfs/go-ipld-cbor" inet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" ) -type Handler struct { - w *wallet.Wallet +type MinerDeal struct { + Client peer.ID + Proposal StorageDealProposal + ProposalCid cid.Cid + State DealState + + Ref cid.Cid } -func NewHandler(w *wallet.Wallet) *Handler { +type Handler struct { + w *wallet.Wallet + sb *sectorbuilder.SectorBuilder + + // TODO: Use a custom protocol or graphsync in the future + // TODO: GC + dag dtypes.StagingDAG + + deals StateStore + + incoming chan MinerDeal + + stop chan struct{} + stopped chan struct{} +} + +func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) *Handler { return &Handler{ - w: w, + w: w, + dag: dag, + + deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, } } +func (h *Handler) Run(ctx context.Context) { + go func() { + defer close(h.stopped) + fetched := make(chan cid.Cid) + + for { + select { + case deal := <-h.incoming: + log.Info("incoming deal") + + if err := h.deals.Begin(deal.ProposalCid, deal); err != nil { + // TODO: This can happen when client re-sends proposal + log.Errorf("deal tracking failed: %s", err) + return + } + + go func(id cid.Cid) { + err := merkledag.FetchGraph(ctx, deal.Ref, h.dag) + if err != nil { + return + } + + select { + case fetched <- id: + case <-h.stop: + } + }(deal.ProposalCid) + case id := <-fetched: + // TODO: send response if client still there + // TODO: staging + + // TODO: async + log.Info("sealing deal") + + var deal MinerDeal + err := h.deals.MutateMiner(id, func(in MinerDeal) (MinerDeal, error) { + in.State = Sealing + deal = in + return in, nil + }) + if err != nil { + // TODO: fail deal + log.Errorf("deal tracking failed (set sealing): %s", err) + continue + } + + root, err := h.dag.Get(ctx, deal.Ref) + if err != nil { + // TODO: fail deal + log.Errorf("failed to get file root for deal: %s", err) + return + } + + // TODO: abstract this away into ReadSizeCloser + implement different modes + n, err := unixfile.NewUnixfsFile(ctx, h.dag, root) + if err != nil { + // TODO: fail deal + log.Errorf("cannot open unixfs file: %s", err) + return + } + + f, ok := n.(files.File) + if !ok { + // TODO: we probably got directory, how should we handle this in unixfs mode? + log.Errorf("unsupported unixfs type") + // TODO: fail deal + continue + } + + size, err := f.Size() + if err != nil { + log.Errorf("failed to get file size: %s", err) + // TODO: fail deal + return + } + + // TODO: can we use pipes? + sectorID, err := h.sb.AddPiece(ctx, deal.Proposal.PieceRef, uint64(size), f) + if err != nil { + // TODO: fail deal + log.Errorf("AddPiece failed: %s", err) + return + } + + log.Warnf("New Sector: %d", sectorID) + + // TODO: update state, tell client + case <-h.stop: + return + } + } + }() +} + func (h *Handler) HandleStream(s inet.Stream) { defer s.Close() @@ -33,6 +162,16 @@ func (h *Handler) HandleStream(s inet.Stream) { // TODO: Validate proposal maybe // (and signature, obviously) + switch proposal.Proposal.SerializationMode { + //case SerializationRaw: + //case SerializationIPLD: + case SerializationUnixFs: + default: + log.Errorf("deal proposal with unsupported serialization: %s", proposal.Proposal.SerializationMode) + // TODO: send error + return + } + // TODO: Review: Not signed? proposalNd, err := cbor.WrapObject(proposal.Proposal, math.MaxUint64, -1) if err != nil { @@ -40,8 +179,8 @@ func (h *Handler) HandleStream(s inet.Stream) { } response := StorageDealResponse{ - State: Accepted, - Message: "", + State: Accepted, + Message: "", Proposal: proposalNd.Cid(), } @@ -57,7 +196,7 @@ func (h *Handler) HandleStream(s inet.Stream) { } signedResponse := &SignedStorageDealResponse{ - Response: response, + Response: response, Signature: sig, } if err := cborrpc.WriteCborRPC(s, signedResponse); err != nil { @@ -65,4 +204,17 @@ func (h *Handler) HandleStream(s inet.Stream) { return } + ref, err := cid.Parse(proposal.Proposal.PieceRef) + if err != nil { + return + } + + h.incoming <- MinerDeal{ + Client: s.Conn().RemotePeer(), + Proposal: proposal.Proposal, + ProposalCid: proposalNd.Cid(), + State: Accepted, + + Ref: ref, + } } diff --git a/chain/deals/state_store.go b/chain/deals/state_store.go new file mode 100644 index 000000000..89625ee00 --- /dev/null +++ b/chain/deals/state_store.go @@ -0,0 +1,108 @@ +package deals + +import ( + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + cbor "github.com/ipfs/go-ipld-cbor" + "golang.org/x/xerrors" +) + +type StateStore struct { + ds datastore.Datastore +} + +func (st *StateStore) Begin(i cid.Cid, s interface{}) error { + k := datastore.NewKey(i.String()) + has, err := st.ds.Has(k) + if err != nil { + return err + } + if has { + return xerrors.Errorf("Already tracking state for %s", i) + } + b, err := cbor.DumpObject(s) + if err != nil { + return err + } + + return st.ds.Put(k, b) +} + +func (st *StateStore) End(i cid.Cid) error { + k := datastore.NewKey(i.String()) + has, err := st.ds.Has(k) + if err != nil { + return err + } + if !has { + return xerrors.Errorf("No state for %s", i) + } + return st.ds.Delete(k) +} + +// When this gets used anywhere else, migrate to reflect + +func (st *StateStore) MutateMiner(i cid.Cid, mutator func(MinerDeal) (MinerDeal, error)) error { + return st.mutate(i, minerMutator(mutator)) +} + +func minerMutator(m func(MinerDeal) (MinerDeal, error)) func([]byte) ([]byte, error) { + return func(in []byte) ([]byte, error) { + var cur MinerDeal + err := cbor.DecodeInto(in, cur) + if err != nil { + return nil, err + } + + mutated, err := m(cur) + if err != nil { + return nil, err + } + + return cbor.DumpObject(mutated) + } +} + +func (st *StateStore) MutateClient(i cid.Cid, mutator func(ClientDeal) (ClientDeal, error)) error { + return st.mutate(i, clientMutator(mutator)) +} + +func clientMutator(m func(ClientDeal) (ClientDeal, error)) func([]byte) ([]byte, error) { + return func(in []byte) ([]byte, error) { + var cur ClientDeal + err := cbor.DecodeInto(in, cur) + if err != nil { + return nil, err + } + + mutated, err := m(cur) + if err != nil { + return nil, err + } + + return cbor.DumpObject(mutated) + } +} + +func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error { + k := datastore.NewKey(i.String()) + has, err := st.ds.Has(k) + if err != nil { + return err + } + if !has { + return xerrors.Errorf("No state for %s", i) + } + + cur, err := st.ds.Get(k) + if err != nil { + return err + } + + mutated, err := mutator(cur) + if err != nil { + return err + } + + return st.ds.Put(k, mutated) +} diff --git a/go.mod b/go.mod index 29d51b76a..ca70b2ba3 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/libp2p/go-libp2p-tls v0.1.0 github.com/libp2p/go-libp2p-yamux v0.2.1 github.com/libp2p/go-maddr-filter v0.0.5 + github.com/libp2p/go-msgio v0.0.4 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-base32 v0.0.3 diff --git a/lib/bytesink/fifo.go b/lib/bytesink/fifo.go new file mode 100644 index 000000000..d035fe83e --- /dev/null +++ b/lib/bytesink/fifo.go @@ -0,0 +1,106 @@ +// +build !windows +// TODO: windows now has pipes, verify if it maybe works + +// TODO: extract from filecoin + +package bytesink + +import ( + "fmt" + "io/ioutil" + "os" + "syscall" + + "github.com/pkg/errors" +) + +// FifoByteSink is not safe for concurrent access, as writes to underlying pipe are atomic only +// if len(buf) is less than the OS-specific PIPE_BUF value. +type FifoByteSink struct { + file *os.File + path string +} + +// Open prepares the sink for writing by opening the backing FIFO file. Open +// will block until someone opens the FIFO file for reading. +func (s *FifoByteSink) Open() error { + file, err := os.OpenFile(s.path, os.O_WRONLY, os.ModeNamedPipe) + if err != nil { + return errors.Wrap(err, "failed to open pipe") + } + + s.file = file + + return nil +} + +// Write writes the provided buffer to the underlying pipe. Write will block +// until the provided buffer's bytes have been read from the read end of the +// pipe. +// +// Warning: Writes are atomic only if len(buf) is less than the OS-specific +// PIPE_BUF value. For more information, see: +// +// http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html +func (s *FifoByteSink) Write(buf []byte) (int, error) { + return s.file.Write(buf) +} + +// Close ensures that the underlying file is closed and removed. +func (s *FifoByteSink) Close() (retErr error) { + cerr := s.file.Close() + if cerr != nil { + return cerr + } + + defer func() { + rerr := os.Remove(s.path) + if retErr == nil { + retErr = rerr + } + }() + + return +} + +// ID produces a string-identifier for this byte sink. For now, this is just the +// path of the FIFO file. This string may get more structured in the future. +func (s *FifoByteSink) ID() string { + return s.path +} + +// NewFifo creates a FIFO pipe and returns a pointer to a FifoByteSink, which +// satisfies the ByteSink interface. The FIFO pipe is used to stream bytes to +// rust-fil-proofs from Go during the piece-adding flow. Writes to the pipe are +// buffered automatically by the OS; the size of the buffer varies. +func NewFifo() (*FifoByteSink, error) { + path, err := createTmpFifoPath() + if err != nil { + return nil, errors.Wrap(err, "creating FIFO path failed") + } + + err = syscall.Mkfifo(path, 0600) + if err != nil { + return nil, errors.Wrap(err, "mkfifo failed") + } + + return &FifoByteSink{ + path: path, + }, nil +} + +// createTmpFifoPath creates a path with which a temporary FIFO file may be +// created. +func createTmpFifoPath() (string, error) { + file, err := ioutil.TempFile("", "") + if err != nil { + return "", err + } + + err = file.Close() + if err != nil { + return "", err + } + + return fmt.Sprintf("%s.fifo", file.Name()), nil +} diff --git a/lib/nsbsnet/bsnet.go b/lib/nsbsnet/bsnet.go new file mode 100644 index 000000000..543b74334 --- /dev/null +++ b/lib/nsbsnet/bsnet.go @@ -0,0 +1,245 @@ +package nsbsnet + +import ( + "context" + "fmt" + "github.com/libp2p/go-libp2p-core/protocol" + "io" + "sync/atomic" + "time" + + bsmsg "github.com/ipfs/go-bitswap/message" + bsnet "github.com/ipfs/go-bitswap/network" + "github.com/libp2p/go-libp2p-core/helpers" + + cid "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/connmgr" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + peerstore "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p-core/routing" + msgio "github.com/libp2p/go-msgio" + ma "github.com/multiformats/go-multiaddr" +) + +// TODO: Upstream to bitswap + +var log = logging.Logger("nsbsnet") + +var sendMessageTimeout = time.Minute * 10 + +// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host. +func NewFromIpfsHost(host host.Host, r routing.ContentRouting, prefix protocol.ID) bsnet.BitSwapNetwork { + bitswapNetwork := impl{ + host: host, + routing: r, + prefix: prefix, + } + return &bitswapNetwork +} + +// impl transforms the ipfs network interface, which sends and receives +// NetMessage objects, into the bitswap network interface. +type impl struct { + host host.Host + routing routing.ContentRouting + prefix protocol.ID + + // inbound messages from the network are forwarded to the receiver + receiver bsnet.Receiver + + stats bsnet.Stats +} + +type streamMessageSender struct { + i *impl + s network.Stream +} + +func (i *impl) ProtocolBitswap() protocol.ID { + return i.prefix + bsnet.ProtocolBitswap +} +func (i *impl) ProtocolBitswapOne() protocol.ID { + return i.prefix + bsnet.ProtocolBitswapOne +} +func (i *impl) ProtocolBitswapNoVers() protocol.ID { + return i.prefix + bsnet.ProtocolBitswapNoVers +} +func (s *streamMessageSender) Close() error { + return helpers.FullClose(s.s) +} + +func (s *streamMessageSender) Reset() error { + return s.s.Reset() +} + +func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { + return s.i.msgToStream(ctx, s.s, msg) +} + +func (i *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error { + deadline := time.Now().Add(sendMessageTimeout) + if dl, ok := ctx.Deadline(); ok { + deadline = dl + } + + if err := s.SetWriteDeadline(deadline); err != nil { + log.Warningf("error setting deadline: %s", err) + } + + switch s.Protocol() { + case i.ProtocolBitswap(): + if err := msg.ToNetV1(s); err != nil { + log.Debugf("error: %s", err) + return err + } + case i.ProtocolBitswapOne(), i.ProtocolBitswapNoVers(): + if err := msg.ToNetV0(s); err != nil { + log.Debugf("error: %s", err) + return err + } + default: + return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol()) + } + + if err := s.SetWriteDeadline(time.Time{}); err != nil { + log.Warningf("error resetting deadline: %s", err) + } + return nil +} + +func (i *impl) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) { + s, err := i.newStreamToPeer(ctx, p) + if err != nil { + return nil, err + } + + return &streamMessageSender{i: i, s: s}, nil +} + +func (i *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) { + return i.host.NewStream(ctx, p, i.ProtocolBitswap(), i.ProtocolBitswapOne(), i.ProtocolBitswapNoVers()) +} + +func (i *impl) SendMessage( + ctx context.Context, + p peer.ID, + outgoing bsmsg.BitSwapMessage) error { + + s, err := i.newStreamToPeer(ctx, p) + if err != nil { + return err + } + + if err = i.msgToStream(ctx, s, outgoing); err != nil { + s.Reset() + return err + } + atomic.AddUint64(&i.stats.MessagesSent, 1) + + // TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine. + go helpers.AwaitEOF(s) + return s.Close() + +} + +func (i *impl) SetDelegate(r bsnet.Receiver) { + i.receiver = r + i.host.SetStreamHandler(i.ProtocolBitswap(), i.handleNewStream) + i.host.SetStreamHandler(i.ProtocolBitswapOne(), i.handleNewStream) + i.host.SetStreamHandler(i.ProtocolBitswapNoVers(), i.handleNewStream) + i.host.Network().Notify((*netNotifiee)(i)) + // TODO: StopNotify. + +} + +func (i *impl) ConnectTo(ctx context.Context, p peer.ID) error { + return i.host.Connect(ctx, peer.AddrInfo{ID: p}) +} + +// FindProvidersAsync returns a channel of providers for the given key. +func (i *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { + out := make(chan peer.ID, max) + go func() { + defer close(out) + providers := i.routing.FindProvidersAsync(ctx, k, max) + for info := range providers { + if info.ID == i.host.ID() { + continue // ignore self as provider + } + i.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL) + select { + case <-ctx.Done(): + return + case out <- info.ID: + } + } + }() + return out +} + +// Provide provides the key to the network +func (i *impl) Provide(ctx context.Context, k cid.Cid) error { + return i.routing.Provide(ctx, k, true) +} + +// handleNewStream receives a new stream from the network. +func (i *impl) handleNewStream(s network.Stream) { + defer s.Close() + + if i.receiver == nil { + s.Reset() + return + } + + reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax) + for { + received, err := bsmsg.FromMsgReader(reader) + if err != nil { + if err != io.EOF { + s.Reset() + go i.receiver.ReceiveError(err) + log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err) + } + return + } + + p := s.Conn().RemotePeer() + ctx := context.Background() + log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer()) + i.receiver.ReceiveMessage(ctx, p, received) + atomic.AddUint64(&i.stats.MessagesRecvd, 1) + } +} + +func (i *impl) ConnectionManager() connmgr.ConnManager { + return i.host.ConnManager() +} + +func (i *impl) Stats() bsnet.Stats { + return bsnet.Stats{ + MessagesRecvd: atomic.LoadUint64(&i.stats.MessagesRecvd), + MessagesSent: atomic.LoadUint64(&i.stats.MessagesSent), + } +} + +type netNotifiee impl + +func (nn *netNotifiee) impl() *impl { + return (*impl)(nn) +} + +func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { + nn.impl().receiver.PeerConnected(v.RemotePeer()) +} + +func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) { + nn.impl().receiver.PeerDisconnected(v.RemotePeer()) +} + +func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {} +func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {} +func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 63cc10988..468238457 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -3,12 +3,17 @@ package sectorbuilder import ( "context" "encoding/binary" + "io" "unsafe" - "github.com/filecoin-project/go-lotus/chain/address" + "golang.org/x/xerrors" + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" logging "github.com/ipfs/go-log" + + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/lib/bytesink" ) var log = logging.Logger("sectorbuilder") @@ -67,8 +72,84 @@ func (sb *SectorBuilder) Destroy() { sectorbuilder.DestroySectorBuilder(sb.handle) } -func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, piecePath string) (uint64, error) { - return sectorbuilder.AddPiece(sb.handle, pieceKey, pieceSize, piecePath) +func (sb *SectorBuilder) AddPiece(ctx context.Context, pieceRef string, pieceSize uint64, pieceReader io.ReadCloser) (uint64, error) { + fifoFile, err := bytesink.NewFifo() + if err != nil { + return 0, err + } + + // errCh holds any error encountered when streaming bytes or making the CGO + // call. The channel is buffered so that the goroutines can exit, which will + // close the pipe, which unblocks the CGO call. + errCh := make(chan error, 2) + + // sectorIDCh receives a value if the CGO call indicates that the client + // piece has successfully been added to a sector. The channel is buffered + // so that the goroutine can exit if a value is sent to errCh before the + // CGO call completes. + sectorIDCh := make(chan uint64, 1) + + // goroutine attempts to copy bytes from piece's reader to the fifoFile + go func() { + // opening the fifoFile blocks the goroutine until a reader is opened on the + // other end of the FIFO pipe + err := fifoFile.Open() + if err != nil { + errCh <- xerrors.Errorf("failed to open fifoFile: %w", err) + return + } + + // closing theg s fifoFile signals to the reader that we're done writing, which + // unblocks the reader + defer func() { + err := fifoFile.Close() + if err != nil { + log.Warnf("failed to close fifoFile: %s", err) + } + }() + + n, err := io.Copy(fifoFile, pieceReader) + if err != nil { + errCh <- xerrors.Errorf("failed to copy to pipe: %w", err) + return + } + + if uint64(n) != pieceSize { + errCh <- xerrors.Errorf("expected to write %d bytes but wrote %d", pieceSize, n) + return + } + }() + + // goroutine makes CGO call, which blocks until FIFO pipe opened for writing + // from within other goroutine + go func() { + id, err := sectorbuilder.AddPiece(sb.handle, pieceRef, pieceSize, fifoFile.ID()) + if err != nil { + msg := "CGO add_piece returned an error (err=%s, fifo path=%s)" + log.Errorf(msg, err, fifoFile.ID()) + errCh <- err + return + } + + sectorIDCh <- id + }() + + select { + case <-ctx.Done(): + errStr := "context completed before CGO call could return" + strFmt := "%s (sinkPath=%s)" + log.Errorf(strFmt, errStr, fifoFile.ID()) + + return 0, xerrors.New(errStr) + case err := <-errCh: + errStr := "error streaming piece-bytes" + strFmt := "%s (sinkPath=%s)" + log.Errorf(strFmt, errStr, fifoFile.ID()) + + return 0, xerrors.Errorf("%w: %s", errStr, err) + case sectorID := <-sectorIDCh: + return sectorID, nil + } } // TODO: should *really really* return an io.ReadCloser diff --git a/node/builder.go b/node/builder.go index 68ec9dcc4..dae50da06 100644 --- a/node/builder.go +++ b/node/builder.go @@ -221,6 +221,8 @@ func Online() Option { Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder), Override(new(*storage.Miner), modules.StorageMiner), + Override(new(dtypes.StagingDAG), modules.StagingDAG), + Override(new(*deals.Handler), deals.NewHandler), Override(HandleDealsKey, modules.HandleDeals), ), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 3219f9943..bac1da2c8 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,6 +3,7 @@ package impl import ( "context" "fmt" + "io" "io/ioutil" "math/rand" @@ -20,20 +21,12 @@ type StorageMinerAPI struct { } func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) { - maxSize := uint64(1016) // this is the most data we can fit in a 1024 byte sector - data := make([]byte, maxSize) - fi, err := ioutil.TempFile("", "lotus-garbage") - if err != nil { - return 0, err - } - - if _, err := fi.Write(data); err != nil { - return 0, err - } - fi.Close() + maxSize := 1016 // this is the most data we can fit in a 1024 byte sector name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) - sectorId, err := sm.SectorBuilder.AddPiece(name, maxSize, fi.Name()) + + sectorId, err := sm.SectorBuilder.AddPiece(ctx, name, uint64(maxSize), + ioutil.NopCloser(io.LimitReader(rand.New(rand.NewSource(rand.Int63())), int64(maxSize)))) if err != nil { return 0, err } diff --git a/node/modules/chain.go b/node/modules/chain.go index 1de38d4fa..31ac0c727 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -5,7 +5,6 @@ import ( "context" "github.com/ipfs/go-bitswap" - "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-car" "github.com/ipfs/go-datastore" @@ -17,13 +16,16 @@ import ( "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/lib/nsbsnet" "github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/filecoin-project/go-lotus/node/repo" ) func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainGCBlockstore) dtypes.ChainExchange { - bitswapNetwork := network.NewFromIpfsHost(host, rt) + // prefix protocol for chain bitswap + // (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff) + bitswapNetwork := nsbsnet.NewFromIpfsHost(host, rt, "/chain") exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs) lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { diff --git a/node/modules/client.go b/node/modules/client.go index 401e19231..0ddd5b7d2 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -2,6 +2,11 @@ package modules import ( "context" + "github.com/filecoin-project/go-lotus/node/modules/helpers" + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/routing" "path/filepath" "github.com/ipfs/go-blockservice" @@ -9,7 +14,6 @@ import ( "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-filestore" blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" "github.com/ipfs/go-merkledag" "go.uber.org/fx" @@ -32,9 +36,13 @@ func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) { return filestore.NewFilestore(bs, fm), nil } -func ClientDAG(lc fx.Lifecycle, fstore dtypes.ClientFilestore) dtypes.ClientDAG { +func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, fstore dtypes.ClientFilestore, rt routing.Routing, h host.Host) dtypes.ClientDAG { ibs := blockstore.NewIdStore((*filestore.Filestore)(fstore)) - bsvc := blockservice.New(ibs, offline.Exchange(ibs)) + + bitswapNetwork := network.NewFromIpfsHost(h, rt) + exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs) + + bsvc := blockservice.New(ibs, exch) dag := merkledag.NewDAGService(bsvc) lc.Append(fx.Hook{ diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 46418b689..a33f68c89 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -22,3 +22,5 @@ type ChainBlockService bserv.BlockService type ClientFilestore *filestore.Filestore type ClientDAG ipld.DAGService + +type StagingDAG ipld.DAGService diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 36284c23e..e077cc22d 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -2,9 +2,15 @@ package modules import ( "context" + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + "github.com/libp2p/go-libp2p-core/routing" "path/filepath" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + "github.com/ipfs/go-merkledag" "github.com/libp2p/go-libp2p-core/host" "github.com/mitchellh/go-homedir" "go.uber.org/fx" @@ -16,6 +22,7 @@ import ( "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/filecoin-project/go-lotus/node/modules/helpers" + "github.com/filecoin-project/go-lotus/node/repo" "github.com/filecoin-project/go-lotus/storage" ) @@ -99,3 +106,27 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h func HandleDeals(h host.Host, handler *deals.Handler) { h.SetStreamHandler(deals.ProtocolID, handler.HandleStream) } + +func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, rt routing.Routing, h host.Host) (dtypes.StagingDAG, error) { + stagingds, err := r.Datastore("/staging") + if err != nil { + return nil, err + } + + bs := blockstore.NewBlockstore(stagingds) + ibs := blockstore.NewIdStore(bs) + + bitswapNetwork := network.NewFromIpfsHost(h, rt) + exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs) + + bsvc := blockservice.New(ibs, exch) + dag := merkledag.NewDAGService(bsvc) + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return bsvc.Close() + }, + }) + + return dag, nil +}