Merge pull request #9565 from filecoin-project/feat/netretr

feat: Retrieval into remote blockstores
This commit is contained in:
Łukasz Magiera 2022-11-08 13:28:18 +00:00 committed by GitHub
commit a678b7f0ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
66 changed files with 1975 additions and 404 deletions

View File

@ -896,6 +896,11 @@ workflows:
suite: itest-deals_publish
target: "./itests/deals_publish_test.go"
- test:
name: test-itest-deals_remote_retrieval
suite: itest-deals_remote_retrieval
target: "./itests/deals_remote_retrieval_test.go"
- test:
name: test-itest-deals_retry_deal_no_funds
suite: itest-deals_retry_deal_no_funds

View File

@ -6,6 +6,7 @@ import (
"fmt"
"time"
"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
@ -1012,8 +1013,12 @@ type RetrievalOrder struct {
Client address.Address
Miner address.Address
MinerPeer *retrievalmarket.RetrievalPeer
RemoteStore *RemoteStoreID `json:"RemoteStore,omitempty"`
}
type RemoteStoreID = uuid.UUID
type InvocResult struct {
MsgCid cid.Cid
Msg *types.Message

View File

@ -361,6 +361,7 @@ func init() {
Headers: nil,
},
})
addExample(&uuid.UUID{})
}
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

441
blockstore/cbor_gen.go Normal file
View File

@ -0,0 +1,441 @@
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
package blockstore
import (
"fmt"
"io"
"math"
"sort"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
var _ = xerrors.Errorf
var _ = cid.Undef
var _ = math.E
var _ = sort.Sort
var lengthBufNetRpcReq = []byte{132}
func (t *NetRpcReq) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
cw := cbg.NewCborWriter(w)
if _, err := cw.Write(lengthBufNetRpcReq); err != nil {
return err
}
// t.Type (blockstore.NetRPCReqType) (uint8)
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Type)); err != nil {
return err
}
// t.ID (uint64) (uint64)
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.ID)); err != nil {
return err
}
// t.Cid ([]cid.Cid) (slice)
if len(t.Cid) > cbg.MaxLength {
return xerrors.Errorf("Slice value in field t.Cid was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Cid))); err != nil {
return err
}
for _, v := range t.Cid {
if err := cbg.WriteCid(w, v); err != nil {
return xerrors.Errorf("failed writing cid field t.Cid: %w", err)
}
}
// t.Data ([][]uint8) (slice)
if len(t.Data) > cbg.MaxLength {
return xerrors.Errorf("Slice value in field t.Data was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Data))); err != nil {
return err
}
for _, v := range t.Data {
if len(v) > cbg.ByteArrayMaxLen {
return xerrors.Errorf("Byte array in field v was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(v))); err != nil {
return err
}
if _, err := cw.Write(v[:]); err != nil {
return err
}
}
return nil
}
func (t *NetRpcReq) UnmarshalCBOR(r io.Reader) (err error) {
*t = NetRpcReq{}
cr := cbg.NewCborReader(r)
maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 4 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.Type (blockstore.NetRPCReqType) (uint8)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint8 field")
}
if extra > math.MaxUint8 {
return fmt.Errorf("integer in input was too large for uint8 field")
}
t.Type = NetRPCReqType(extra)
// t.ID (uint64) (uint64)
{
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.ID = uint64(extra)
}
// t.Cid ([]cid.Cid) (slice)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Cid: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Cid = make([]cid.Cid, extra)
}
for i := 0; i < int(extra); i++ {
c, err := cbg.ReadCid(cr)
if err != nil {
return xerrors.Errorf("reading cid field t.Cid failed: %w", err)
}
t.Cid[i] = c
}
// t.Data ([][]uint8) (slice)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Data: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Data = make([][]uint8, extra)
}
for i := 0; i < int(extra); i++ {
{
var maj byte
var extra uint64
var err error
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.Data[i]: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
if extra > 0 {
t.Data[i] = make([]uint8, extra)
}
if _, err := io.ReadFull(cr, t.Data[i][:]); err != nil {
return err
}
}
}
return nil
}
var lengthBufNetRpcResp = []byte{131}
func (t *NetRpcResp) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
cw := cbg.NewCborWriter(w)
if _, err := cw.Write(lengthBufNetRpcResp); err != nil {
return err
}
// t.Type (blockstore.NetRPCRespType) (uint8)
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Type)); err != nil {
return err
}
// t.ID (uint64) (uint64)
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.ID)); err != nil {
return err
}
// t.Data ([]uint8) (slice)
if len(t.Data) > cbg.ByteArrayMaxLen {
return xerrors.Errorf("Byte array in field t.Data was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Data))); err != nil {
return err
}
if _, err := cw.Write(t.Data[:]); err != nil {
return err
}
return nil
}
func (t *NetRpcResp) UnmarshalCBOR(r io.Reader) (err error) {
*t = NetRpcResp{}
cr := cbg.NewCborReader(r)
maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.Type (blockstore.NetRPCRespType) (uint8)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint8 field")
}
if extra > math.MaxUint8 {
return fmt.Errorf("integer in input was too large for uint8 field")
}
t.Type = NetRPCRespType(extra)
// t.ID (uint64) (uint64)
{
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.ID = uint64(extra)
}
// t.Data ([]uint8) (slice)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.Data: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
if extra > 0 {
t.Data = make([]uint8, extra)
}
if _, err := io.ReadFull(cr, t.Data[:]); err != nil {
return err
}
return nil
}
var lengthBufNetRpcErr = []byte{131}
func (t *NetRpcErr) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
cw := cbg.NewCborWriter(w)
if _, err := cw.Write(lengthBufNetRpcErr); err != nil {
return err
}
// t.Type (blockstore.NetRPCErrType) (uint8)
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Type)); err != nil {
return err
}
// t.Msg (string) (string)
if len(t.Msg) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Msg was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Msg))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.Msg)); err != nil {
return err
}
// t.Cid (cid.Cid) (struct)
if t.Cid == nil {
if _, err := cw.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCid(cw, *t.Cid); err != nil {
return xerrors.Errorf("failed to write cid field t.Cid: %w", err)
}
}
return nil
}
func (t *NetRpcErr) UnmarshalCBOR(r io.Reader) (err error) {
*t = NetRpcErr{}
cr := cbg.NewCborReader(r)
maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.Type (blockstore.NetRPCErrType) (uint8)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint8 field")
}
if extra > math.MaxUint8 {
return fmt.Errorf("integer in input was too large for uint8 field")
}
t.Type = NetRPCErrType(extra)
// t.Msg (string) (string)
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
t.Msg = string(sval)
}
// t.Cid (cid.Cid) (struct)
{
b, err := cr.ReadByte()
if err != nil {
return err
}
if b != cbg.CborNull[0] {
if err := cr.UnreadByte(); err != nil {
return err
}
c, err := cbg.ReadCid(cr)
if err != nil {
return xerrors.Errorf("failed to read cid field t.Cid: %w", err)
}
t.Cid = &c
}
}
return nil
}

424
blockstore/net.go Normal file
View File

@ -0,0 +1,424 @@
package blockstore
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-msgio"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)
type NetRPCReqType byte
const (
NRpcHas NetRPCReqType = iota
NRpcGet
NRpcGetSize
NRpcPut
NRpcDelete
// todo cancel req
)
type NetRPCRespType byte
const (
NRpcOK NetRPCRespType = iota
NRpcErr
NRpcMore
)
type NetRPCErrType byte
const (
NRpcErrGeneric NetRPCErrType = iota
NRpcErrNotFound
)
type NetRpcReq struct {
Type NetRPCReqType
ID uint64
Cid []cid.Cid // todo maxsize?
Data [][]byte // todo maxsize?
}
type NetRpcResp struct {
Type NetRPCRespType
ID uint64
// error or cids in allkeys
Data []byte // todo maxsize?
next <-chan NetRpcResp
}
type NetRpcErr struct {
Type NetRPCErrType
Msg string
// in case of NRpcErrNotFound
Cid *cid.Cid
}
type NetworkStore struct {
// note: writer is thread-safe
msgStream msgio.ReadWriteCloser
// atomic
reqCount uint64
respLk sync.Mutex
// respMap is nil after store closes
respMap map[uint64]chan<- NetRpcResp
closing chan struct{}
closed chan struct{}
closeLk sync.Mutex
onClose []func()
}
func NewNetworkStore(mss msgio.ReadWriteCloser) *NetworkStore {
ns := &NetworkStore{
msgStream: mss,
respMap: map[uint64]chan<- NetRpcResp{},
closing: make(chan struct{}),
closed: make(chan struct{}),
}
go ns.receive()
return ns
}
func (n *NetworkStore) shutdown(msg string) {
if err := n.msgStream.Close(); err != nil {
log.Errorw("closing netstore msg stream", "error", err)
}
nerr := NetRpcErr{
Type: NRpcErrGeneric,
Msg: msg,
Cid: nil,
}
var errb bytes.Buffer
if err := nerr.MarshalCBOR(&errb); err != nil {
log.Errorw("netstore shutdown: error marshaling error", "err", err)
}
n.respLk.Lock()
for id, resps := range n.respMap {
resps <- NetRpcResp{
Type: NRpcErr,
ID: id,
Data: errb.Bytes(),
}
}
n.respMap = nil
n.respLk.Unlock()
}
func (n *NetworkStore) OnClose(cb func()) {
n.closeLk.Lock()
defer n.closeLk.Unlock()
select {
case <-n.closed:
cb()
default:
n.onClose = append(n.onClose, cb)
}
}
func (n *NetworkStore) receive() {
defer func() {
n.closeLk.Lock()
defer n.closeLk.Unlock()
close(n.closed)
if n.onClose != nil {
for _, f := range n.onClose {
f()
}
}
}()
for {
select {
case <-n.closing:
n.shutdown("netstore stopping")
return
default:
}
msg, err := n.msgStream.ReadMsg()
if err != nil {
n.shutdown(fmt.Sprintf("netstore ReadMsg: %s", err))
return
}
var resp NetRpcResp
if err := resp.UnmarshalCBOR(bytes.NewReader(msg)); err != nil {
n.shutdown(fmt.Sprintf("unmarshaling netstore response: %s", err))
return
}
n.msgStream.ReleaseMsg(msg)
n.respLk.Lock()
if ch, ok := n.respMap[resp.ID]; ok {
if resp.Type == NRpcMore {
nch := make(chan NetRpcResp, 1)
resp.next = nch
n.respMap[resp.ID] = nch
} else {
delete(n.respMap, resp.ID)
}
ch <- resp
}
n.respLk.Unlock()
}
}
func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte) (uint64, <-chan NetRpcResp, error) {
rid := atomic.AddUint64(&n.reqCount, 1)
respCh := make(chan NetRpcResp, 1) // todo pool?
n.respLk.Lock()
if n.respMap == nil {
n.respLk.Unlock()
return 0, nil, xerrors.Errorf("netstore closed")
}
n.respMap[rid] = respCh
n.respLk.Unlock()
req := NetRpcReq{
Type: rt,
ID: rid,
Cid: cids,
Data: data,
}
var rbuf bytes.Buffer // todo buffer pool
if err := req.MarshalCBOR(&rbuf); err != nil {
n.respLk.Lock()
defer n.respLk.Unlock()
if n.respMap == nil {
return 0, nil, xerrors.Errorf("netstore closed")
}
delete(n.respMap, rid)
return 0, nil, err
}
if err := n.msgStream.WriteMsg(rbuf.Bytes()); err != nil {
n.respLk.Lock()
defer n.respLk.Unlock()
if n.respMap == nil {
return 0, nil, xerrors.Errorf("netstore closed")
}
delete(n.respMap, rid)
return 0, nil, err
}
return rid, respCh, nil
}
func (n *NetworkStore) waitResp(ctx context.Context, rch <-chan NetRpcResp, rid uint64) (NetRpcResp, error) {
select {
case resp := <-rch:
if resp.Type == NRpcErr {
var e NetRpcErr
if err := e.UnmarshalCBOR(bytes.NewReader(resp.Data)); err != nil {
return NetRpcResp{}, xerrors.Errorf("unmarshaling error data: %w", err)
}
var err error
switch e.Type {
case NRpcErrNotFound:
if e.Cid != nil {
err = ipld.ErrNotFound{
Cid: *e.Cid,
}
} else {
err = xerrors.Errorf("block not found, but cid was null")
}
case NRpcErrGeneric:
err = xerrors.Errorf("generic error")
default:
err = xerrors.Errorf("unknown error type")
}
return NetRpcResp{}, xerrors.Errorf("netstore error response: %s (%w)", e.Msg, err)
}
return resp, nil
case <-ctx.Done():
// todo send cancel req
n.respLk.Lock()
if n.respMap != nil {
delete(n.respMap, rid)
}
n.respLk.Unlock()
return NetRpcResp{}, ctx.Err()
}
}
func (n *NetworkStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
req, rch, err := n.sendRpc(NRpcHas, []cid.Cid{c}, nil)
if err != nil {
return false, err
}
resp, err := n.waitResp(ctx, rch, req)
if err != nil {
return false, err
}
if len(resp.Data) != 1 {
return false, xerrors.Errorf("expected reposnse length to be 1 byte")
}
switch resp.Data[0] {
case cbg.CborBoolTrue[0]:
return true, nil
case cbg.CborBoolFalse[0]:
return false, nil
default:
return false, xerrors.Errorf("has: bad response: %x", resp.Data[0])
}
}
func (n *NetworkStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
req, rch, err := n.sendRpc(NRpcGet, []cid.Cid{c}, nil)
if err != nil {
return nil, err
}
resp, err := n.waitResp(ctx, rch, req)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(resp.Data, c)
}
func (n *NetworkStore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error {
req, rch, err := n.sendRpc(NRpcGet, []cid.Cid{c}, nil)
if err != nil {
return err
}
resp, err := n.waitResp(ctx, rch, req)
if err != nil {
return err
}
return callback(resp.Data) // todo return buf to pool
}
func (n *NetworkStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
req, rch, err := n.sendRpc(NRpcGetSize, []cid.Cid{c}, nil)
if err != nil {
return 0, err
}
resp, err := n.waitResp(ctx, rch, req)
if err != nil {
return 0, err
}
if len(resp.Data) != 4 {
return 0, xerrors.Errorf("expected getsize response to be 4 bytes, was %d", resp.Data)
}
return int(binary.LittleEndian.Uint32(resp.Data)), nil
}
func (n *NetworkStore) Put(ctx context.Context, block blocks.Block) error {
return n.PutMany(ctx, []blocks.Block{block})
}
func (n *NetworkStore) PutMany(ctx context.Context, blocks []blocks.Block) error {
// todo pool
cids := make([]cid.Cid, len(blocks))
blkDatas := make([][]byte, len(blocks))
for i, block := range blocks {
cids[i] = block.Cid()
blkDatas[i] = block.RawData()
}
req, rch, err := n.sendRpc(NRpcPut, cids, blkDatas)
if err != nil {
return err
}
_, err = n.waitResp(ctx, rch, req)
if err != nil {
return err
}
return nil
}
func (n *NetworkStore) DeleteBlock(ctx context.Context, c cid.Cid) error {
return n.DeleteMany(ctx, []cid.Cid{c})
}
func (n *NetworkStore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
req, rch, err := n.sendRpc(NRpcDelete, cids, nil)
if err != nil {
return err
}
_, err = n.waitResp(ctx, rch, req)
if err != nil {
return err
}
return nil
}
func (n *NetworkStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, xerrors.Errorf("not supported")
}
func (n *NetworkStore) HashOnRead(enabled bool) {
// todo
return
}
func (n *NetworkStore) Stop(ctx context.Context) error {
close(n.closing)
select {
case <-n.closed:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
var _ Blockstore = &NetworkStore{}

237
blockstore/net_serve.go Normal file
View File

@ -0,0 +1,237 @@
package blockstore
import (
"bytes"
"context"
"encoding/binary"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-msgio"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)
type NetworkStoreHandler struct {
msgStream msgio.ReadWriteCloser
bs Blockstore
}
// NOTE: This code isn't yet hardened to accept untrusted input. See TODOs here and in net.go
func HandleNetBstoreStream(ctx context.Context, bs Blockstore, mss msgio.ReadWriteCloser) *NetworkStoreHandler {
ns := &NetworkStoreHandler{
msgStream: mss,
bs: bs,
}
go ns.handle(ctx)
return ns
}
func (h *NetworkStoreHandler) handle(ctx context.Context) {
defer func() {
if err := h.msgStream.Close(); err != nil {
log.Errorw("error closing blockstore stream", "error", err)
}
}()
for {
var req NetRpcReq
ms, err := h.msgStream.ReadMsg()
if err != nil {
log.Warnw("bstore stream err", "error", err)
return
}
if err := req.UnmarshalCBOR(bytes.NewReader(ms)); err != nil {
return
}
h.msgStream.ReleaseMsg(ms)
switch req.Type {
case NRpcHas:
if len(req.Cid) != 1 {
if err := h.respondError(req.ID, xerrors.New("expected request for 1 cid"), cid.Undef); err != nil {
log.Warnw("writing error response", "error", err)
return
}
continue
}
res, err := h.bs.Has(ctx, req.Cid[0])
if err != nil {
if err := h.respondError(req.ID, err, req.Cid[0]); err != nil {
log.Warnw("writing error response", "error", err)
return
}
continue
}
var resData [1]byte
if res {
resData[0] = cbg.CborBoolTrue[0]
} else {
resData[0] = cbg.CborBoolFalse[0]
}
if err := h.respond(req.ID, NRpcOK, resData[:]); err != nil {
log.Warnw("writing response", "error", err)
return
}
case NRpcGet:
if len(req.Cid) != 1 {
if err := h.respondError(req.ID, xerrors.New("expected request for 1 cid"), cid.Undef); err != nil {
log.Warnw("writing error response", "error", err)
return
}
continue
}
err := h.bs.View(ctx, req.Cid[0], func(bdata []byte) error {
return h.respond(req.ID, NRpcOK, bdata)
})
if err != nil {
if err := h.respondError(req.ID, err, req.Cid[0]); err != nil {
log.Warnw("writing error response", "error", err)
return
}
continue
}
case NRpcGetSize:
if len(req.Cid) != 1 {
if err := h.respondError(req.ID, xerrors.New("expected request for 1 cid"), cid.Undef); err != nil {
log.Warnw("writing error response", "error", err)
return
}
continue
}
sz, err := h.bs.GetSize(ctx, req.Cid[0])
if err != nil {
if err := h.respondError(req.ID, err, req.Cid[0]); err != nil {
log.Warnw("writing error response", "error", err)
return
}
continue
}
var resData [4]byte
binary.LittleEndian.PutUint32(resData[:], uint32(sz))
if err := h.respond(req.ID, NRpcOK, resData[:]); err != nil {
log.Warnw("writing response", "error", err)
return
}
case NRpcPut:
blocks := make([]block.Block, len(req.Cid))
if len(req.Cid) != len(req.Data) {
if err := h.respondError(req.ID, xerrors.New("cid count didn't match data count"), cid.Undef); err != nil {
log.Warnw("writing error response", "error", err)
}
return
}
for i := range req.Cid {
blocks[i], err = block.NewBlockWithCid(req.Data[i], req.Cid[i])
if err != nil {
log.Warnw("make block", "error", err)
return
}
}
err := h.bs.PutMany(ctx, blocks)
if err != nil {
if err := h.respondError(req.ID, err, cid.Undef); err != nil {
log.Warnw("writing error response", "error", err)
return
}
continue
}
if err := h.respond(req.ID, NRpcOK, []byte{}); err != nil {
log.Warnw("writing response", "error", err)
return
}
case NRpcDelete:
err := h.bs.DeleteMany(ctx, req.Cid)
if err != nil {
if err := h.respondError(req.ID, err, cid.Undef); err != nil {
log.Warnw("writing error response", "error", err)
return
}
continue
}
if err := h.respond(req.ID, NRpcOK, []byte{}); err != nil {
log.Warnw("writing response", "error", err)
return
}
default:
if err := h.respondError(req.ID, xerrors.New("unsupported request type"), cid.Undef); err != nil {
log.Warnw("writing error response", "error", err)
return
}
continue
}
}
}
func (h *NetworkStoreHandler) respondError(req uint64, uerr error, c cid.Cid) error {
var resp NetRpcResp
resp.ID = req
resp.Type = NRpcErr
nerr := NetRpcErr{
Type: NRpcErrGeneric,
Msg: uerr.Error(),
}
if ipld.IsNotFound(uerr) {
nerr.Type = NRpcErrNotFound
nerr.Cid = &c
}
var edata bytes.Buffer
if err := nerr.MarshalCBOR(&edata); err != nil {
return xerrors.Errorf("marshaling error data: %w", err)
}
resp.Data = edata.Bytes()
var msg bytes.Buffer
if err := resp.MarshalCBOR(&msg); err != nil {
return xerrors.Errorf("marshaling error response: %w", err)
}
if err := h.msgStream.WriteMsg(msg.Bytes()); err != nil {
return xerrors.Errorf("write error response: %w", err)
}
return nil
}
func (h *NetworkStoreHandler) respond(req uint64, rt NetRPCRespType, data []byte) error {
var resp NetRpcResp
resp.ID = req
resp.Type = rt
resp.Data = data
var msg bytes.Buffer
if err := resp.MarshalCBOR(&msg); err != nil {
return xerrors.Errorf("marshaling response: %w", err)
}
if err := h.msgStream.WriteMsg(msg.Bytes()); err != nil {
return xerrors.Errorf("write response: %w", err)
}
return nil
}

63
blockstore/net_test.go Normal file
View File

@ -0,0 +1,63 @@
package blockstore
import (
"context"
"fmt"
"io"
"testing"
block "github.com/ipfs/go-block-format"
ipld "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-msgio"
"github.com/stretchr/testify/require"
)
func TestNetBstore(t *testing.T) {
ctx := context.Background()
cr, sw := io.Pipe()
sr, cw := io.Pipe()
cm := msgio.Combine(msgio.NewWriter(cw), msgio.NewReader(cr))
sm := msgio.Combine(msgio.NewWriter(sw), msgio.NewReader(sr))
bbs := NewMemorySync()
_ = HandleNetBstoreStream(ctx, bbs, sm)
nbs := NewNetworkStore(cm)
tb1 := block.NewBlock([]byte("aoeu"))
h, err := nbs.Has(ctx, tb1.Cid())
require.NoError(t, err)
require.False(t, h)
err = nbs.Put(ctx, tb1)
require.NoError(t, err)
h, err = nbs.Has(ctx, tb1.Cid())
require.NoError(t, err)
require.True(t, h)
sz, err := nbs.GetSize(ctx, tb1.Cid())
require.NoError(t, err)
require.Equal(t, 4, sz)
err = nbs.DeleteBlock(ctx, tb1.Cid())
require.NoError(t, err)
h, err = nbs.Has(ctx, tb1.Cid())
require.NoError(t, err)
require.False(t, h)
_, err = nbs.Get(ctx, tb1.Cid())
fmt.Println(err)
require.True(t, ipld.IsNotFound(err))
err = nbs.Put(ctx, tb1)
require.NoError(t, err)
b, err := nbs.Get(ctx, tb1.Cid())
require.NoError(t, err)
require.Equal(t, "aoeu", string(b.RawData()))
}

100
blockstore/net_ws.go Normal file
View File

@ -0,0 +1,100 @@
package blockstore
import (
"bytes"
"context"
"github.com/gorilla/websocket"
"github.com/libp2p/go-msgio"
"golang.org/x/xerrors"
)
type wsWrapper struct {
wc *websocket.Conn
nextMsg []byte
}
func (w *wsWrapper) Read(b []byte) (int, error) {
return 0, xerrors.New("read unsupported")
}
func (w *wsWrapper) ReadMsg() ([]byte, error) {
if w.nextMsg != nil {
nm := w.nextMsg
w.nextMsg = nil
return nm, nil
}
mt, r, err := w.wc.NextReader()
if err != nil {
return nil, err
}
switch mt {
case websocket.BinaryMessage, websocket.TextMessage:
default:
return nil, xerrors.Errorf("unexpected message type")
}
// todo pool
// todo limit sizes
var mbuf bytes.Buffer
if _, err := mbuf.ReadFrom(r); err != nil {
return nil, err
}
return mbuf.Bytes(), nil
}
func (w *wsWrapper) ReleaseMsg(bytes []byte) {
// todo use a pool
}
func (w *wsWrapper) NextMsgLen() (int, error) {
if w.nextMsg != nil {
return len(w.nextMsg), nil
}
mt, msg, err := w.wc.ReadMessage()
if err != nil {
return 0, err
}
switch mt {
case websocket.BinaryMessage, websocket.TextMessage:
default:
return 0, xerrors.Errorf("unexpected message type")
}
w.nextMsg = msg
return len(w.nextMsg), nil
}
func (w *wsWrapper) Write(bytes []byte) (int, error) {
return 0, xerrors.New("write unsupported")
}
func (w *wsWrapper) WriteMsg(bytes []byte) error {
return w.wc.WriteMessage(websocket.BinaryMessage, bytes)
}
func (w *wsWrapper) Close() error {
return w.wc.Close()
}
var _ msgio.ReadWriteCloser = &wsWrapper{}
func wsConnToMio(wc *websocket.Conn) msgio.ReadWriteCloser {
return &wsWrapper{
wc: wc,
}
}
func HandleNetBstoreWS(ctx context.Context, bs Blockstore, wc *websocket.Conn) *NetworkStoreHandler {
return HandleNetBstoreStream(ctx, bs, wsConnToMio(wc))
}
func NewNetworkStoreWS(wc *websocket.Conn) *NetworkStore {
return NewNetworkStore(wsConnToMio(wc))
}

Binary file not shown.

View File

@ -3,14 +3,9 @@ package cli
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"sort"
"strings"
"time"
@ -29,8 +24,6 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
@ -40,6 +33,7 @@ import (
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/repo"
)
@ -337,60 +331,6 @@ Examples:
},
}
func ClientExportStream(apiAddr string, apiAuth http.Header, eref lapi.ExportRef, car bool) (io.ReadCloser, error) {
rj, err := json.Marshal(eref)
if err != nil {
return nil, xerrors.Errorf("marshaling export ref: %w", err)
}
ma, err := multiaddr.NewMultiaddr(apiAddr)
if err == nil {
_, addr, err := manet.DialArgs(ma)
if err != nil {
return nil, err
}
// todo: make cliutil helpers for this
apiAddr = "http://" + addr
}
aa, err := url.Parse(apiAddr)
if err != nil {
return nil, xerrors.Errorf("parsing api address: %w", err)
}
switch aa.Scheme {
case "ws":
aa.Scheme = "http"
case "wss":
aa.Scheme = "https"
}
aa.Path = path.Join(aa.Path, "rest/v0/export")
req, err := http.NewRequest("GET", fmt.Sprintf("%s?car=%t&export=%s", aa, car, url.QueryEscape(string(rj))), nil)
if err != nil {
return nil, err
}
req.Header = apiAuth
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
em, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, xerrors.Errorf("reading error body: %w", err)
}
resp.Body.Close() // nolint
return nil, xerrors.Errorf("getting root car: http %d: %s", resp.StatusCode, string(em))
}
return resp.Body, nil
}
var clientRetrieveCatCmd = &cli.Command{
Name: "cat",
Usage: "Show data from network",
@ -440,7 +380,7 @@ var clientRetrieveCatCmd = &cli.Command{
eref.DAGs = append(eref.DAGs, lapi.DagSpec{DataSelector: &sel})
}
rc, err := ClientExportStream(ainfo.Addr, ainfo.AuthHeader(), *eref, false)
rc, err := cliutil.ClientExportStream(ainfo.Addr, ainfo.AuthHeader(), *eref, false)
if err != nil {
return err
}
@ -528,7 +468,7 @@ var clientRetrieveLsCmd = &cli.Command{
DataSelector: &dataSelector,
})
rc, err := ClientExportStream(ainfo.Addr, ainfo.AuthHeader(), *eref, true)
rc, err := cliutil.ClientExportStream(ainfo.Addr, ainfo.AuthHeader(), *eref, true)
if err != nil {
return xerrors.Errorf("export: %w", err)
}
@ -583,6 +523,7 @@ var clientRetrieveLsCmd = &cli.Command{
dserv,
roots[0],
sel,
nil,
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
fmt.Println(p.Path)

View File

@ -46,6 +46,7 @@ import (
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
)
var StateCmd = &cli.Command{
@ -230,7 +231,7 @@ var StateMinerInfo = &cli.Command{
return xerrors.Errorf("getting miner info: %w", err)
}
fmt.Printf("Proving Period Start:\t%s\n", EpochTime(cd.CurrentEpoch, cd.PeriodStart))
fmt.Printf("Proving Period Start:\t%s\n", cliutil.EpochTime(cd.CurrentEpoch, cd.PeriodStart))
return nil
},
@ -1816,8 +1817,8 @@ var StateSectorCmd = &cli.Command{
}
fmt.Println("DealIDs: ", si.DealIDs)
fmt.Println()
fmt.Println("Activation: ", EpochTimeTs(ts.Height(), si.Activation, ts))
fmt.Println("Expiration: ", EpochTimeTs(ts.Height(), si.Expiration, ts))
fmt.Println("Activation: ", cliutil.EpochTimeTs(ts.Height(), si.Activation, ts))
fmt.Println("Expiration: ", cliutil.EpochTimeTs(ts.Height(), si.Expiration, ts))
fmt.Println()
fmt.Println("DealWeight: ", si.DealWeight)
fmt.Println("VerifiedDealWeight: ", si.VerifiedDealWeight)

View File

@ -2,19 +2,13 @@ package cli
import (
"context"
"fmt"
"os"
"time"
"github.com/fatih/color"
"github.com/hako/durafmt"
"github.com/ipfs/go-cid"
"github.com/mattn/go-isatty"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
)
@ -43,36 +37,3 @@ func parseTipSet(ctx context.Context, api v0api.FullNode, vals []string) (*types
return types.NewTipSet(headers)
}
func EpochTime(curr, e abi.ChainEpoch) string {
switch {
case curr > e:
return fmt.Sprintf("%d (%s ago)", e, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(curr-e))).LimitFirstN(2))
case curr == e:
return fmt.Sprintf("%d (now)", e)
case curr < e:
return fmt.Sprintf("%d (in %s)", e, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(e-curr))).LimitFirstN(2))
}
panic("math broke")
}
// EpochTimeTs is like EpochTime, but also outputs absolute time. `ts` is only
// used to provide a timestamp at some epoch to calculate time from. It can be
// a genesis tipset.
//
// Example output: `1944975 (01 Jul 22 08:07 CEST, 10 hours 29 minutes ago)`
func EpochTimeTs(curr, e abi.ChainEpoch, ts *types.TipSet) string {
timeStr := time.Unix(int64(ts.MinTimestamp()+(uint64(e-ts.Height())*build.BlockDelaySecs)), 0).Format(time.RFC822)
switch {
case curr > e:
return fmt.Sprintf("%d (%s, %s ago)", e, timeStr, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(curr-e))).LimitFirstN(2))
case curr == e:
return fmt.Sprintf("%d (%s, now)", e, timeStr)
case curr < e:
return fmt.Sprintf("%d (%s, in %s)", e, timeStr, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(e-curr))).LimitFirstN(2))
}
panic("math broke")
}

46
cli/util/epoch.go Normal file
View File

@ -0,0 +1,46 @@
package cliutil
import (
"fmt"
"time"
"github.com/hako/durafmt"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
)
func EpochTime(curr, e abi.ChainEpoch) string {
switch {
case curr > e:
return fmt.Sprintf("%d (%s ago)", e, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(curr-e))).LimitFirstN(2))
case curr == e:
return fmt.Sprintf("%d (now)", e)
case curr < e:
return fmt.Sprintf("%d (in %s)", e, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(e-curr))).LimitFirstN(2))
}
panic("math broke")
}
// EpochTimeTs is like EpochTime, but also outputs absolute time. `ts` is only
// used to provide a timestamp at some epoch to calculate time from. It can be
// a genesis tipset.
//
// Example output: `1944975 (01 Jul 22 08:07 CEST, 10 hours 29 minutes ago)`
func EpochTimeTs(curr, e abi.ChainEpoch, ts *types.TipSet) string {
timeStr := time.Unix(int64(ts.MinTimestamp()+(uint64(e-ts.Height())*build.BlockDelaySecs)), 0).Format(time.RFC822)
switch {
case curr > e:
return fmt.Sprintf("%d (%s, %s ago)", e, timeStr, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(curr-e))).LimitFirstN(2))
case curr == e:
return fmt.Sprintf("%d (%s, now)", e, timeStr)
case curr < e:
return fmt.Sprintf("%d (%s, in %s)", e, timeStr, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(e-curr))).LimitFirstN(2))
}
panic("math broke")
}

78
cli/util/retrieval.go Normal file
View File

@ -0,0 +1,78 @@
package cliutil
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
)
func ApiAddrToUrl(apiAddr string) (*url.URL, error) {
ma, err := multiaddr.NewMultiaddr(apiAddr)
if err == nil {
_, addr, err := manet.DialArgs(ma)
if err != nil {
return nil, err
}
// todo: make cliutil helpers for this
apiAddr = "http://" + addr
}
aa, err := url.Parse(apiAddr)
if err != nil {
return nil, xerrors.Errorf("parsing api address: %w", err)
}
switch aa.Scheme {
case "ws":
aa.Scheme = "http"
case "wss":
aa.Scheme = "https"
}
return aa, nil
}
func ClientExportStream(apiAddr string, apiAuth http.Header, eref api.ExportRef, car bool) (io.ReadCloser, error) {
rj, err := json.Marshal(eref)
if err != nil {
return nil, xerrors.Errorf("marshaling export ref: %w", err)
}
aa, err := ApiAddrToUrl(apiAddr)
if err != nil {
return nil, err
}
aa.Path = path.Join(aa.Path, "rest/v0/export")
req, err := http.NewRequest("GET", fmt.Sprintf("%s?car=%t&export=%s", aa, car, url.QueryEscape(string(rj))), nil)
if err != nil {
return nil, err
}
req.Header = apiAuth
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
em, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, xerrors.Errorf("reading error body: %w", err)
}
resp.Body.Close() // nolint
return nil, xerrors.Errorf("getting root car: http %d: %s", resp.StatusCode, string(em))
}
return resp.Body, nil
}

View File

@ -33,6 +33,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/reward"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/journal/alerting"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
@ -664,7 +665,7 @@ func producedBlocks(ctx context.Context, count int, maddr address.Address, napi
fmt.Printf("%8d | %s | %s\n", ts.Height(), bh.Cid(), types.FIL(minerReward))
count--
} else if tty && bh.Height%120 == 0 {
_, _ = fmt.Fprintf(os.Stderr, "\r\x1b[0KChecking epoch %s", lcli.EpochTime(head.Height(), bh.Height))
_, _ = fmt.Fprintf(os.Stderr, "\r\x1b[0KChecking epoch %s", cliutil.EpochTime(head.Height(), bh.Height))
}
}
tsk = ts.Parents()

View File

@ -49,6 +49,7 @@ import (
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/fsjournal"
storageminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
@ -218,7 +219,7 @@ var initCmd = &cli.Command{
return err
}
var localPaths []paths.LocalPath
var localPaths []storiface.LocalPath
if pssb := cctx.StringSlice("pre-sealed-sectors"); len(pssb) != 0 {
log.Infof("Setting up storage config with presealed sectors: %v", pssb)
@ -228,14 +229,14 @@ var initCmd = &cli.Command{
if err != nil {
return err
}
localPaths = append(localPaths, paths.LocalPath{
localPaths = append(localPaths, storiface.LocalPath{
Path: psp,
})
}
}
if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
b, err := json.MarshalIndent(&storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
@ -249,12 +250,12 @@ var initCmd = &cli.Command{
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
}
localPaths = append(localPaths, paths.LocalPath{
localPaths = append(localPaths, storiface.LocalPath{
Path: lr.Path(),
})
}
if err := lr.SetStorage(func(sc *paths.StorageConfig) {
if err := lr.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)
@ -471,7 +472,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
}
stor := paths.NewRemote(lstor, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
smgr, err := sealer.New(ctx, lstor, stor, lr, si, sealer.Config{
smgr, err := sealer.New(ctx, lstor, stor, lr, si, config.SealerConfig{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
@ -481,7 +482,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
AllowReplicaUpdate: true,
AllowProveReplicaUpdate2: true,
AllowRegenSectorKey: true,
}, wsts, smsts)
}, config.ProvingConfig{}, wsts, smsts)
if err != nil {
return err
}

View File

@ -27,7 +27,7 @@ import (
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var restoreCmd = &cli.Command{
@ -52,7 +52,7 @@ var restoreCmd = &cli.Command{
ctx := lcli.ReqContext(cctx)
log.Info("Initializing lotus miner using a backup")
var storageCfg *paths.StorageConfig
var storageCfg *storiface.StorageConfig
if cctx.IsSet("storage-config") {
cf, err := homedir.Expand(cctx.String("storage-config"))
if err != nil {
@ -64,7 +64,7 @@ var restoreCmd = &cli.Command{
return xerrors.Errorf("reading storage config: %w", err)
}
storageCfg = &paths.StorageConfig{}
storageCfg = &storiface.StorageConfig{}
err = json.Unmarshal(cfb, storageCfg)
if err != nil {
return xerrors.Errorf("cannot unmarshal json for storage config: %w", err)
@ -95,7 +95,7 @@ var restoreCmd = &cli.Command{
},
}
func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfig *paths.StorageConfig, manageConfig func(*config.StorageMiner) error, after func(api lapi.FullNode, addr address.Address, peerid peer.ID, mi api.MinerInfo) error) error {
func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfig *storiface.StorageConfig, manageConfig func(*config.StorageMiner) error, after func(api lapi.FullNode, addr address.Address, peerid peer.ID, mi api.MinerInfo) error) error {
if cctx.NArg() != 1 {
return lcli.IncorrectNumArgs(cctx)
}
@ -214,7 +214,7 @@ func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfi
if strConfig != nil {
log.Info("Restoring storage path config")
err = lr.SetStorage(func(scfg *paths.StorageConfig) {
err = lr.SetStorage(func(scfg *storiface.StorageConfig) {
*scfg = *strConfig
})
if err != nil {
@ -223,8 +223,8 @@ func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfi
} else {
log.Warn("--storage-config NOT SET. NO SECTOR PATHS WILL BE CONFIGURED")
// setting empty config to allow miner to be started
if err := lr.SetStorage(func(sc *paths.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, paths.LocalPath{})
if err := lr.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, storiface.LocalPath{})
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)
}

View File

@ -17,7 +17,7 @@ import (
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
const (
@ -78,7 +78,7 @@ var serviceCmd = &cli.Command{
return xerrors.Errorf("please provide Lotus markets repo path via flag %s", FlagMarketsRepo)
}
if err := restore(ctx, cctx, repoPath, &paths.StorageConfig{}, func(cfg *config.StorageMiner) error {
if err := restore(ctx, cctx, repoPath, &storiface.StorageConfig{}, func(cfg *config.StorageMiner) error {
cfg.Subsystems.EnableMarkets = es.Contains(MarketsService)
cfg.Subsystems.EnableMining = false
cfg.Subsystems.EnableSealing = false

View File

@ -26,6 +26,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -185,18 +186,18 @@ var provingInfoCmd = &cli.Command{
fmt.Printf("Current Epoch: %d\n", cd.CurrentEpoch)
fmt.Printf("Proving Period Boundary: %d\n", cd.PeriodStart%cd.WPoStProvingPeriod)
fmt.Printf("Proving Period Start: %s\n", lcli.EpochTimeTs(cd.CurrentEpoch, cd.PeriodStart, head))
fmt.Printf("Next Period Start: %s\n\n", lcli.EpochTimeTs(cd.CurrentEpoch, cd.PeriodStart+cd.WPoStProvingPeriod, head))
fmt.Printf("Proving Period Start: %s\n", cliutil.EpochTimeTs(cd.CurrentEpoch, cd.PeriodStart, head))
fmt.Printf("Next Period Start: %s\n\n", cliutil.EpochTimeTs(cd.CurrentEpoch, cd.PeriodStart+cd.WPoStProvingPeriod, head))
fmt.Printf("Faults: %d (%.2f%%)\n", faults, faultPerc)
fmt.Printf("Recovering: %d\n", recovering)
fmt.Printf("Deadline Index: %d\n", cd.Index)
fmt.Printf("Deadline Sectors: %d\n", curDeadlineSectors)
fmt.Printf("Deadline Open: %s\n", lcli.EpochTime(cd.CurrentEpoch, cd.Open))
fmt.Printf("Deadline Close: %s\n", lcli.EpochTime(cd.CurrentEpoch, cd.Close))
fmt.Printf("Deadline Challenge: %s\n", lcli.EpochTime(cd.CurrentEpoch, cd.Challenge))
fmt.Printf("Deadline FaultCutoff: %s\n", lcli.EpochTime(cd.CurrentEpoch, cd.FaultCutoff))
fmt.Printf("Deadline Open: %s\n", cliutil.EpochTime(cd.CurrentEpoch, cd.Open))
fmt.Printf("Deadline Close: %s\n", cliutil.EpochTime(cd.CurrentEpoch, cd.Close))
fmt.Printf("Deadline Challenge: %s\n", cliutil.EpochTime(cd.CurrentEpoch, cd.Challenge))
fmt.Printf("Deadline FaultCutoff: %s\n", cliutil.EpochTime(cd.CurrentEpoch, cd.FaultCutoff))
return nil
},
}

View File

@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/lib/strle"
"github.com/filecoin-project/lotus/lib/tablewriter"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
@ -485,9 +486,9 @@ var sectorsListCmd = &cli.Command{
if !inSSet {
m["Expiration"] = "n/a"
} else {
m["Expiration"] = lcli.EpochTime(head.Height(), exp)
m["Expiration"] = cliutil.EpochTime(head.Height(), exp)
if st.Early > 0 {
m["RecoveryTimeout"] = color.YellowString(lcli.EpochTime(head.Height(), st.Early))
m["RecoveryTimeout"] = color.YellowString(cliutil.EpochTime(head.Height(), st.Early))
}
}
if inSSet && cctx.Bool("initial-pledge") {
@ -666,10 +667,10 @@ var sectorsCheckExpireCmd = &cli.Command{
"ID": sector.SectorNumber,
"SealProof": sector.SealProof,
"InitialPledge": types.FIL(sector.InitialPledge).Short(),
"Activation": lcli.EpochTime(currEpoch, sector.Activation),
"Expiration": lcli.EpochTime(currEpoch, sector.Expiration),
"MaxExpiration": lcli.EpochTime(currEpoch, MaxExpiration),
"MaxExtendNow": lcli.EpochTime(currEpoch, MaxExtendNow),
"Activation": cliutil.EpochTime(currEpoch, sector.Activation),
"Expiration": cliutil.EpochTime(currEpoch, sector.Expiration),
"MaxExpiration": cliutil.EpochTime(currEpoch, MaxExpiration),
"MaxExtendNow": cliutil.EpochTime(currEpoch, MaxExtendNow),
})
}
@ -1909,7 +1910,7 @@ var sectorsExpiredCmd = &cli.Command{
toRemove = append(toRemove, s)
}
fmt.Printf("%d%s\t%s\t%s\n", s, rmMsg, st.State, lcli.EpochTime(head.Height(), st.Expiration))
fmt.Printf("%d%s\t%s\t%s\n", s, rmMsg, st.State, cliutil.EpochTime(head.Height(), st.Expiration))
return nil
})

View File

@ -29,7 +29,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/tablewriter"
"github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
@ -148,7 +147,7 @@ over time
}
}
cfg := &paths.LocalStorageMeta{
cfg := &storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),

View File

@ -27,7 +27,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet/key"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
@ -126,7 +125,7 @@ func PreSeal(maddr address.Address, spt abi.RegisteredSealProof, offset abi.Sect
}
{
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
b, err := json.MarshalIndent(&storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 0, // read-only
CanSeal: false,

View File

@ -447,10 +447,10 @@ var runCmd = &cli.Command{
return err
}
var localPaths []paths.LocalPath
var localPaths []storiface.LocalPath
if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
b, err := json.MarshalIndent(&storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
@ -464,12 +464,12 @@ var runCmd = &cli.Command{
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
}
localPaths = append(localPaths, paths.LocalPath{
localPaths = append(localPaths, storiface.LocalPath{
Path: lr.Path(),
})
}
if err := lr.SetStorage(func(sc *paths.StorageConfig) {
if err := lr.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)

View File

@ -92,8 +92,8 @@ func (w *Worker) StorageAddLocal(ctx context.Context, path string) error {
return xerrors.Errorf("opening local path: %w", err)
}
if err := w.Storage.SetStorage(func(sc *paths.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, paths.LocalPath{Path: path})
if err := w.Storage.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, storiface.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
@ -127,8 +127,8 @@ func (w *Worker) StorageDetachLocal(ctx context.Context, path string) error {
// drop from the persisted storage.json
var found bool
if err := w.Storage.SetStorage(func(sc *paths.StorageConfig) {
out := make([]paths.LocalPath, 0, len(sc.StoragePaths))
if err := w.Storage.SetStorage(func(sc *storiface.StorageConfig) {
out := make([]storiface.LocalPath, 0, len(sc.StoragePaths))
for _, storagePath := range sc.StoragePaths {
if storagePath.Path != path {
out = append(out, storagePath)

View File

@ -13,7 +13,6 @@ import (
"golang.org/x/xerrors"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -103,7 +102,7 @@ var storageAttachCmd = &cli.Command{
}
}
cfg := &paths.LocalStorageMeta{
cfg := &storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),

View File

@ -1992,7 +1992,8 @@ Inputs:
"Address": "f01234",
"ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"PieceCID": null
}
},
"RemoteStore": "00000000-0000-0000-0000-000000000000"
}
]
```

View File

@ -694,7 +694,7 @@
# to use when evaluating tasks against this worker. An empty value defaults
# to "hardware".
#
# type: sealer.ResourceFilteringStrategy
# type: ResourceFilteringStrategy
# env var: LOTUS_STORAGE_RESOURCEFILTERING
#ResourceFiltering = "hardware"

View File

@ -7,6 +7,7 @@ import (
gen "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/types"
@ -127,4 +128,13 @@ func main() {
fmt.Println(err)
os.Exit(1)
}
err = gen.WriteTupleEncodersToFile("./blockstore/cbor_gen.go", "blockstore",
blockstore.NetRpcReq{},
blockstore.NetRpcResp{},
blockstore.NetRpcErr{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}

4
go.mod
View File

@ -63,6 +63,7 @@ require (
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.5.0
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.1
@ -115,6 +116,7 @@ require (
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/libp2p/go-maddr-filter v0.1.0
github.com/libp2p/go-msgio v0.2.0
github.com/mattn/go-isatty v0.0.16
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/mitchellh/go-homedir v1.1.0
@ -213,7 +215,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect
@ -254,7 +255,6 @@ require (
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-noise v0.5.0 // indirect
github.com/libp2p/go-libp2p-tls v0.5.0 // indirect
github.com/libp2p/go-msgio v0.2.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.0 // indirect
github.com/libp2p/go-openssl v0.1.0 // indirect

View File

@ -0,0 +1,104 @@
package itests
import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"os"
"path"
"testing"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/ipld/go-car"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/itests/kit"
)
func TestNetStoreRetrieval(t *testing.T) {
kit.QuietMiningLogs()
blocktime := 5 * time.Millisecond
ctx := context.Background()
full, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blocktime)
time.Sleep(5 * time.Second)
// For these tests where the block time is artificially short, just use
// a deal start epoch that is guaranteed to be far enough in the future
// so that the deal starts sealing in time
dealStartEpoch := abi.ChainEpoch(2 << 12)
rseed := 7
dh := kit.NewDealHarness(t, full, miner, miner)
dealCid, res, _ := dh.MakeOnlineDeal(context.Background(), kit.MakeFullDealParams{
Rseed: rseed,
StartEpoch: dealStartEpoch,
UseCARFileForStorageDeal: true,
})
// create deal store
id := uuid.New()
rstore := bstore.NewMemorySync()
au, err := url.Parse(full.ListenURL)
require.NoError(t, err)
switch au.Scheme {
case "http":
au.Scheme = "ws"
case "https":
au.Scheme = "wss"
}
au.Path = path.Join(au.Path, "/rest/v0/store/"+id.String())
conn, _, err := websocket.DefaultDialer.Dial(au.String(), nil)
require.NoError(t, err)
_ = bstore.HandleNetBstoreWS(ctx, rstore, conn)
dh.PerformRetrievalWithOrder(ctx, dealCid, res.Root, false, func(offer api.QueryOffer, address address.Address) api.RetrievalOrder {
order := offer.Order(address)
order.RemoteStore = &id
return order
})
// check blockstore blocks
carv1FilePath, _ := kit.CreateRandomCARv1(t, rseed, 200)
cb, err := os.ReadFile(carv1FilePath)
require.NoError(t, err)
cr, err := car.NewCarReader(bytes.NewReader(cb))
require.NoError(t, err)
var blocks int
for {
cb, err := cr.Next()
if err == io.EOF {
fmt.Println("blocks: ", blocks)
return
}
require.NoError(t, err)
sb, err := rstore.Get(ctx, cb.Cid())
require.NoError(t, err)
require.EqualValues(t, cb.RawData(), sb.RawData())
blocks++
}
}

View File

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storagemarket"
@ -308,6 +309,12 @@ func (dh *DealHarness) StartSealingWaiting(ctx context.Context) {
}
func (dh *DealHarness) PerformRetrieval(ctx context.Context, deal *cid.Cid, root cid.Cid, carExport bool, offers ...api.QueryOffer) (path string) {
return dh.PerformRetrievalWithOrder(ctx, deal, root, carExport, func(offer api.QueryOffer, a address.Address) api.RetrievalOrder {
return offer.Order(a)
}, offers...)
}
func (dh *DealHarness) PerformRetrievalWithOrder(ctx context.Context, deal *cid.Cid, root cid.Cid, carExport bool, makeOrder func(api.QueryOffer, address.Address) api.RetrievalOrder, offers ...api.QueryOffer) (path string) {
var offer api.QueryOffer
if len(offers) == 0 {
// perform retrieval.
@ -331,7 +338,9 @@ func (dh *DealHarness) PerformRetrieval(ctx context.Context, deal *cid.Cid, root
updates, err := dh.client.ClientGetRetrievalUpdates(updatesCtx)
require.NoError(dh.t, err)
retrievalRes, err := dh.client.ClientRetrieve(ctx, offer.Order(caddr))
order := makeOrder(offer, caddr)
retrievalRes, err := dh.client.ClientRetrieve(ctx, order)
require.NoError(dh.t, err)
consumeEvents:
for {
@ -357,6 +366,11 @@ consumeEvents:
}
cancel()
if order.RemoteStore != nil {
// if we're retrieving into a remote store, skip export
return ""
}
require.NoError(dh.t, dh.client.ClientExport(ctx,
api.ExportRef{
Root: root,

View File

@ -586,11 +586,11 @@ func (n *Ensemble) Start() *Ensemble {
psd := m.PresealDir
noPaths := m.options.noStorage
err := lr.SetStorage(func(sc *paths.StorageConfig) {
err := lr.SetStorage(func(sc *storiface.StorageConfig) {
if noPaths {
sc.StoragePaths = []paths.LocalPath{}
sc.StoragePaths = []storiface.LocalPath{}
}
sc.StoragePaths = append(sc.StoragePaths, paths.LocalPath{Path: psd})
sc.StoragePaths = append(sc.StoragePaths, storiface.LocalPath{Path: psd})
})
require.NoError(n.t, err)
@ -632,7 +632,7 @@ func (n *Ensemble) Start() *Ensemble {
// disable resource filtering so that local worker gets assigned tasks
// regardless of system pressure.
node.Override(new(sectorstorage.Config), func() sectorstorage.Config {
node.Override(new(config.SealerConfig), func() config.SealerConfig {
scfg := config.DefaultStorageMiner()
if noLocal {
@ -645,8 +645,8 @@ func (n *Ensemble) Start() *Ensemble {
scfg.Storage.Assigner = assigner
scfg.Storage.DisallowRemoteFinalize = disallowRemoteFinalize
scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
return scfg.StorageManager()
scfg.Storage.ResourceFiltering = config.ResourceFilteringDisabled
return scfg.Storage
}),
// upgrades
@ -737,8 +737,8 @@ func (n *Ensemble) Start() *Ensemble {
require.NoError(n.t, err)
if m.options.noStorage {
err := lr.SetStorage(func(sc *paths.StorageConfig) {
sc.StoragePaths = []paths.LocalPath{}
err := lr.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = []storiface.LocalPath{}
})
require.NoError(n.t, err)
}

View File

@ -27,6 +27,7 @@ type TestFullNode struct {
// ListenAddr is the address on which an API server is listening, if an
// API server is created for this Node.
ListenAddr multiaddr.Multiaddr
ListenURL string
DefaultKey *key.Key
options nodeOpts

View File

@ -26,7 +26,6 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/wallet/key"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -175,7 +174,7 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
const metaFile = "sectorstore.json"
func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, conf func(*paths.LocalStorageMeta)) storiface.ID {
func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, conf func(*storiface.LocalStorageMeta)) storiface.ID {
p := t.TempDir()
if err := os.MkdirAll(p, 0755); err != nil {
@ -189,7 +188,7 @@ func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, conf func(*pa
require.NoError(t, err)
}
cfg := &paths.LocalStorageMeta{
cfg := &storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 10,
CanSeal: false,

View File

@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -38,7 +37,7 @@ type TestWorker struct {
options nodeOpts
}
func (tm *TestWorker) AddStorage(ctx context.Context, t *testing.T, conf func(*paths.LocalStorageMeta)) storiface.ID {
func (tm *TestWorker) AddStorage(ctx context.Context, t *testing.T, conf func(*storiface.LocalStorageMeta)) storiface.ID {
p := t.TempDir()
if err := os.MkdirAll(p, 0755); err != nil {
@ -52,7 +51,7 @@ func (tm *TestWorker) AddStorage(ctx context.Context, t *testing.T, conf func(*p
require.NoError(t, err)
}
cfg := &paths.LocalStorageMeta{
cfg := &storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 10,
CanSeal: false,

View File

@ -65,7 +65,7 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
require.NoError(t, err)
t.Cleanup(stop)
f.ListenAddr, f.FullNode = maddr, cl
f.ListenAddr, f.ListenURL, f.FullNode = maddr, srv.URL, cl
return f
}

View File

@ -15,7 +15,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -74,7 +73,7 @@ func TestPathDetachRedeclare(t *testing.T) {
checkSectors(ctx, t, client, miner, 2, 2)
// attach a new path
newId := miner.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) {
newId := miner.AddStorage(ctx, t, func(cfg *storiface.LocalStorageMeta) {
cfg.CanStore = true
})
@ -194,7 +193,7 @@ func TestPathDetachRedeclareWorker(t *testing.T) {
checkSectors(ctx, t, client, miner, 2, 2)
// attach a new path
newId := sealw.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) {
newId := sealw.AddStorage(ctx, t, func(cfg *storiface.LocalStorageMeta) {
cfg.CanStore = true
})
@ -239,7 +238,7 @@ func TestPathDetachRedeclareWorker(t *testing.T) {
require.Len(t, local, 0)
// add a new one again, and move the sectors there
newId = sealw.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) {
newId = sealw.AddStorage(ctx, t, func(cfg *storiface.LocalStorageMeta) {
cfg.CanStore = true
})

View File

@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -45,7 +44,7 @@ func TestPathTypeFilters(t *testing.T) {
}
runTest(t, "invalid-type-alert", func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func()) {
slU := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
slU := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanSeal = true
meta.AllowTypes = []string{"unsealed", "seeled"}
})
@ -79,18 +78,18 @@ func TestPathTypeFilters(t *testing.T) {
runTest(t, "seal-to-stor-unseal-allowdeny", func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func()) {
// allow all types in the sealing path
sealScratch := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
sealScratch := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanSeal = true
})
// unsealed storage
unsStor := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
unsStor := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanStore = true
meta.AllowTypes = []string{"unsealed"}
})
// other storage
sealStor := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
sealStor := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanStore = true
meta.DenyTypes = []string{"unsealed"}
})
@ -115,14 +114,14 @@ func TestPathTypeFilters(t *testing.T) {
runTest(t, "sealstor-unseal-allowdeny", func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func()) {
// unsealed storage
unsStor := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
unsStor := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanStore = true
meta.CanSeal = true
meta.AllowTypes = []string{"unsealed"}
})
// other storage
sealStor := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
sealStor := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanStore = true
meta.CanSeal = true
meta.DenyTypes = []string{"unsealed"}
@ -147,29 +146,29 @@ func TestPathTypeFilters(t *testing.T) {
runTest(t, "seal-store-allseparate", func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func()) {
// sealing stores
slU := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
slU := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanSeal = true
meta.AllowTypes = []string{"unsealed"}
})
slS := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
slS := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanSeal = true
meta.AllowTypes = []string{"sealed"}
})
slC := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
slC := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanSeal = true
meta.AllowTypes = []string{"cache"}
})
// storage stores
stU := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
stU := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanStore = true
meta.AllowTypes = []string{"unsealed"}
})
stS := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
stS := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanStore = true
meta.AllowTypes = []string{"sealed"}
})
stC := miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
stC := miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanStore = true
meta.AllowTypes = []string{"cache"}
})

View File

@ -11,7 +11,7 @@ import (
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func TestDealsWithFinalizeEarly(t *testing.T) {
@ -36,11 +36,11 @@ func TestDealsWithFinalizeEarly(t *testing.T) {
ctx := context.Background()
miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.Weight = 1000000000
meta.CanSeal = true
})
miner.AddStorage(ctx, t, func(meta *paths.LocalStorageMeta) {
miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.Weight = 1000000000
meta.CanStore = true
})

View File

@ -17,7 +17,6 @@ import (
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/wdpost"
@ -35,10 +34,10 @@ func TestWindowPostNoBuiltinWindow(t *testing.T) {
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.LatestActorsAt(-1),
kit.ConstructorOpts(
node.Override(new(sealer.Config), func() sealer.Config {
c := config.DefaultStorageMiner().StorageManager()
c.DisableBuiltinWindowPoSt = true
return c
node.Override(new(config.ProvingConfig), func() config.ProvingConfig {
c := config.DefaultStorageMiner()
c.Proving.DisableBuiltinWindowPoSt = true
return c.Proving
}),
node.Override(new(*wdpost.WindowPoStScheduler), modules.WindowPostScheduler(
config.DefaultStorageMiner().Fees,
@ -92,10 +91,10 @@ func TestWindowPostNoBuiltinWindowWithWorker(t *testing.T) {
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.LatestActorsAt(-1),
kit.ConstructorOpts(
node.Override(new(sealer.Config), func() sealer.Config {
c := config.DefaultStorageMiner().StorageManager()
c.DisableBuiltinWindowPoSt = true
return c
node.Override(new(config.ProvingConfig), func() config.ProvingConfig {
c := config.DefaultStorageMiner()
c.Proving.DisableBuiltinWindowPoSt = true
return c.Proving
}),
node.Override(new(*wdpost.WindowPoStScheduler), modules.WindowPostScheduler(
config.DefaultStorageMiner().Fees,

View File

@ -408,10 +408,10 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
func TestSchedulerRemoveRequest(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
//ens.InterconnectAll().BeginMining(50 * time.Millisecond)
e, err := worker.Enabled(ctx)
require.NoError(t, err)

View File

@ -8,8 +8,12 @@ import (
"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipld/go-car/v2/blockstore"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/lotus/api"
lbstore "github.com/filecoin-project/lotus/blockstore"
)
// ProxyBlockstoreAccessor is an accessor that returns a fixed blockstore.
@ -32,6 +36,85 @@ func (p *ProxyBlockstoreAccessor) Done(_ retrievalmarket.DealID) error {
return nil
}
func NewAPIBlockstoreAdapter(sub retrievalmarket.BlockstoreAccessor) *APIBlockstoreAccessor {
return &APIBlockstoreAccessor{
sub: sub,
retrStores: map[retrievalmarket.DealID]api.RemoteStoreID{},
remoteStores: map[api.RemoteStoreID]bstore.Blockstore{},
}
}
// APIBlockstoreAccessor adds support to API-specified remote blockstores
type APIBlockstoreAccessor struct {
sub retrievalmarket.BlockstoreAccessor
retrStores map[retrievalmarket.DealID]api.RemoteStoreID
remoteStores map[api.RemoteStoreID]bstore.Blockstore
accessLk sync.Mutex
}
func (a *APIBlockstoreAccessor) Get(id retrievalmarket.DealID, payloadCID retrievalmarket.PayloadCID) (bstore.Blockstore, error) {
a.accessLk.Lock()
defer a.accessLk.Unlock()
as, has := a.retrStores[id]
if !has {
return a.sub.Get(id, payloadCID)
}
return a.remoteStores[as], nil
}
func (a *APIBlockstoreAccessor) Done(id retrievalmarket.DealID) error {
a.accessLk.Lock()
defer a.accessLk.Unlock()
if _, has := a.retrStores[id]; has {
delete(a.retrStores, id)
return nil
}
return a.sub.Done(id)
}
func (a *APIBlockstoreAccessor) RegisterDealToRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error {
a.accessLk.Lock()
defer a.accessLk.Unlock()
if _, has := a.retrStores[id]; has {
return xerrors.Errorf("apistore for deal %d already registered", id)
}
if _, has := a.remoteStores[sid]; !has {
return xerrors.Errorf("remote store not found")
}
a.retrStores[id] = sid
return nil
}
func (a *APIBlockstoreAccessor) RegisterApiStore(sid api.RemoteStoreID, st *lbstore.NetworkStore) error {
a.accessLk.Lock()
defer a.accessLk.Unlock()
if _, has := a.remoteStores[sid]; has {
return xerrors.Errorf("remote store already registered with this uuid")
}
a.remoteStores[sid] = st
st.OnClose(func() {
a.accessLk.Lock()
defer a.accessLk.Unlock()
if _, has := a.remoteStores[sid]; has {
delete(a.remoteStores, sid)
}
})
return nil
}
var _ retrievalmarket.BlockstoreAccessor = &APIBlockstoreAccessor{}
type CARBlockstoreAccessor struct {
rootdir string
lk sync.Mutex

View File

@ -26,6 +26,7 @@ func TraverseDag(
ds mdagipld.DAGService,
startFrom cid.Cid,
optionalSelector ipld.Node,
onOpen func(node mdagipld.Node) error,
visitCallback traversal.AdvVisitFn,
) error {
@ -61,6 +62,12 @@ func TraverseDag(
return nil, err
}
if onOpen != nil {
if err := onOpen(node); err != nil {
return nil, err
}
}
return bytes.NewBuffer(node.RawData()), nil
}
unixfsnode.AddUnixFSReificationToLinkSystem(&linkSystem)

View File

@ -29,6 +29,7 @@ import (
ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger"
"github.com/filecoin-project/lotus/chain/wallet/remotewallet"
"github.com/filecoin-project/lotus/lib/peermgr"
"github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/hello"
@ -129,6 +130,7 @@ var ChainNode = Options(
Override(new(*market.FundManager), market.NewFundManager),
Override(new(dtypes.ClientDatastore), modules.NewClientDatastore),
Override(new(storagemarket.BlockstoreAccessor), modules.StorageBlockstoreAccessor),
Override(new(*retrievaladapter.APIBlockstoreAccessor), retrievaladapter.NewAPIBlockstoreAdapter),
Override(new(storagemarket.StorageClient), modules.StorageClient),
Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter),
Override(HandleMigrateClientFundsKey, modules.HandleMigrateClientFunds),

View File

@ -224,7 +224,8 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),
),
Override(new(sectorstorage.Config), cfg.StorageManager()),
Override(new(config.SealerConfig), cfg.Storage),
Override(new(config.ProvingConfig), cfg.Proving),
Override(new(*ctladdr.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
)
}

View File

@ -15,7 +15,6 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealer"
)
const (
@ -162,7 +161,7 @@ func DefaultStorageMiner() *StorageMiner {
Assigner: "utilization",
// By default use the hardware resource filtering strategy.
ResourceFiltering: sealer.ResourceFilteringHardware,
ResourceFiltering: ResourceFilteringHardware,
},
Dealmaking: DealmakingConfig{
@ -274,3 +273,17 @@ func (dur Duration) MarshalText() ([]byte, error) {
d := time.Duration(dur)
return []byte(d.String()), nil
}
// ResourceFilteringStrategy is an enum indicating the kinds of resource
// filtering strategies that can be configured for workers.
type ResourceFilteringStrategy string
const (
// ResourceFilteringHardware specifies that available hardware resources
// should be evaluated when scheduling a task against the worker.
ResourceFilteringHardware = ResourceFilteringStrategy("hardware")
// ResourceFilteringDisabled disables resource filtering against this
// worker. The scheduler may assign any task to this worker.
ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
)

34
node/config/dep_test.go Normal file
View File

@ -0,0 +1,34 @@
package config
import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"testing"
)
func goCmd() string {
var exeSuffix string
if runtime.GOOS == "windows" {
exeSuffix = ".exe"
}
path := filepath.Join(runtime.GOROOT(), "bin", "go"+exeSuffix)
if _, err := os.Stat(path); err == nil {
return path
}
return "go"
}
func TestDoesntDependOnFFI(t *testing.T) {
deps, err := exec.Command(goCmd(), "list", "-deps", "github.com/filecoin-project/lotus/node/config").Output()
if err != nil {
t.Fatal(err)
}
for _, pkg := range strings.Fields(string(deps)) {
if pkg == "github.com/filecoin-project/filecoin-ffi" {
t.Fatal("config depends on filecoin-ffi")
}
}
}

View File

@ -894,7 +894,7 @@ If you see stuck Finalize tasks after enabling this setting, check
},
{
Name: "ResourceFiltering",
Type: "sealer.ResourceFilteringStrategy",
Type: "ResourceFilteringStrategy",
Comment: `ResourceFiltering instructs the system which resource filtering strategy
to use when evaluating tasks against this worker. An empty value defaults

View File

@ -8,11 +8,10 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func StorageFromFile(path string, def *paths.StorageConfig) (*paths.StorageConfig, error) {
func StorageFromFile(path string, def *storiface.StorageConfig) (*storiface.StorageConfig, error) {
file, err := os.Open(path)
switch {
case os.IsNotExist(err):
@ -28,8 +27,8 @@ func StorageFromFile(path string, def *paths.StorageConfig) (*paths.StorageConfi
return StorageFromReader(file)
}
func StorageFromReader(reader io.Reader) (*paths.StorageConfig, error) {
var cfg paths.StorageConfig
func StorageFromReader(reader io.Reader) (*storiface.StorageConfig, error) {
var cfg storiface.StorageConfig
err := json.NewDecoder(reader).Decode(&cfg)
if err != nil {
return nil, err
@ -38,7 +37,7 @@ func StorageFromReader(reader io.Reader) (*paths.StorageConfig, error) {
return &cfg, nil
}
func WriteStorageFile(path string, config paths.StorageConfig) error {
func WriteStorageFile(path string, config storiface.StorageConfig) error {
b, err := json.MarshalIndent(config, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
@ -50,28 +49,3 @@ func WriteStorageFile(path string, config paths.StorageConfig) error {
return nil
}
func (c *StorageMiner) StorageManager() sealer.Config {
return sealer.Config{
ParallelFetchLimit: c.Storage.ParallelFetchLimit,
AllowSectorDownload: c.Storage.AllowSectorDownload,
AllowAddPiece: c.Storage.AllowAddPiece,
AllowPreCommit1: c.Storage.AllowPreCommit1,
AllowPreCommit2: c.Storage.AllowPreCommit2,
AllowCommit: c.Storage.AllowCommit,
AllowUnseal: c.Storage.AllowUnseal,
AllowReplicaUpdate: c.Storage.AllowReplicaUpdate,
AllowProveReplicaUpdate2: c.Storage.AllowProveReplicaUpdate2,
AllowRegenSectorKey: c.Storage.AllowRegenSectorKey,
ResourceFiltering: c.Storage.ResourceFiltering,
DisallowRemoteFinalize: c.Storage.DisallowRemoteFinalize,
LocalWorkerName: c.Storage.LocalWorkerName,
Assigner: c.Storage.Assigner,
ParallelCheckLimit: c.Proving.ParallelCheckLimit,
DisableBuiltinWindowPoSt: c.Proving.DisableBuiltinWindowPoSt,
DisableBuiltinWinningPoSt: c.Proving.DisableBuiltinWinningPoSt,
}
}

View File

@ -4,7 +4,6 @@ import (
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealer"
)
// // NOTE: ONLY PUT STRUCT DEFINITIONS IN THIS FILE
@ -452,7 +451,7 @@ type SealerConfig struct {
// ResourceFiltering instructs the system which resource filtering strategy
// to use when evaluating tasks against this worker. An empty value defaults
// to "hardware".
ResourceFiltering sealer.ResourceFilteringStrategy
ResourceFiltering ResourceFilteringStrategy
}
type BatchFeeConfig struct {

View File

@ -10,6 +10,7 @@ import (
"os"
"sort"
"strings"
"sync"
"time"
"github.com/ipfs/go-blockservice"
@ -97,6 +98,7 @@ type API struct {
Imports dtypes.ClientImportMgr
StorageBlockstoreAccessor storagemarket.BlockstoreAccessor
RtvlBlockstoreAccessor rm.BlockstoreAccessor
ApiBlockstoreAccessor *retrievaladapter.APIBlockstoreAccessor
DataTransfer dtypes.ClientDataTransfer
Host host.Host
@ -845,6 +847,13 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat
}
id := a.Retrieval.NextID()
if order.RemoteStore != nil {
if err := a.ApiBlockstoreAccessor.RegisterDealToRetrievalStore(id, *order.RemoteStore); err != nil {
return 0, xerrors.Errorf("registering api store: %w", err)
}
}
id, err = a.Retrieval.Retrieve(
ctx,
id,
@ -999,6 +1008,8 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo
roots[i] = dag.root
}
var lk sync.Mutex
return dest.doWrite(func(w io.Writer) error {
if err := car.WriteHeader(&car.CarHeader{
@ -1011,13 +1022,29 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo
cs := cid.NewSet()
for _, dagSpec := range dags {
dagSpec := dagSpec
if err := utils.TraverseDag(
ctx,
ds,
root,
dagSpec.selector,
func(node format.Node) error {
// if we're exporting merkle proofs for this dag, export all nodes read by the traversal
if dagSpec.exportAll {
lk.Lock()
defer lk.Unlock()
if cs.Visit(node.Cid()) {
err := util.LdWrite(w, node.Cid().Bytes(), node.RawData())
if err != nil {
return xerrors.Errorf("writing block data: %w", err)
}
}
}
return nil
},
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
if !dagSpec.exportAll && r == traversal.VisitReason_SelectionMatch {
var c cid.Cid
if p.LastBlock.Link == nil {
c = root
@ -1082,8 +1109,9 @@ func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGServi
}
type dagSpec struct {
root cid.Cid
selector ipld.Node
root cid.Cid
selector ipld.Node
exportAll bool
}
func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService, car bool) ([]dagSpec, error) {
@ -1098,6 +1126,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
out := make([]dagSpec, len(dsp))
for i, spec := range dsp {
out[i].exportAll = spec.ExportMerkleProof
if spec.DataSelector == nil {
return nil, xerrors.Errorf("invalid DagSpec at position %d: `DataSelector` can not be nil", i)
@ -1131,6 +1160,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
ds,
root,
rsn,
nil,
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
if !car && p.LastBlock.Path.String() != p.Path.String() {

View File

@ -202,9 +202,9 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT
// RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(forceOffChain bool) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor *retrievaladapter.APIBlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor *retrievaladapter.APIBlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
adapter := retrievaladapter.NewRetrievalClientNode(forceOffChain, payAPI, chainAPI, stateAPI)
network := rmnet.NewFromLibp2pHost(h)
ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))

View File

@ -794,17 +794,17 @@ func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls paths.LocalStorag
return paths.NewLocal(ctx, ls, si, urls)
}
func RemoteStorage(lstor *paths.Local, si paths.SectorIndex, sa sealer.StorageAuth, sc sealer.Config) *paths.Remote {
func RemoteStorage(lstor *paths.Local, si paths.SectorIndex, sa sealer.StorageAuth, sc config.SealerConfig) *paths.Remote {
return paths.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &paths.DefaultPartialFileHandler{})
}
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *paths.Local, stor paths.Store, ls paths.LocalStorage, si paths.SectorIndex, sc sealer.Config, ds dtypes.MetadataDS) (*sealer.Manager, error) {
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *paths.Local, stor paths.Store, ls paths.LocalStorage, si paths.SectorIndex, sc config.SealerConfig, pc config.ProvingConfig, ds dtypes.MetadataDS) (*sealer.Manager, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix))
sst, err := sealer.New(ctx, lstor, stor, ls, si, sc, wsts, smsts)
sst, err := sealer.New(ctx, lstor, stor, ls, si, sc, pc, wsts, smsts)
if err != nil {
return nil, err
}

View File

@ -25,8 +25,8 @@ import (
badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
const (
@ -572,26 +572,26 @@ func (fsr *fsLockedRepo) SetConfig(c func(interface{})) error {
return nil
}
func (fsr *fsLockedRepo) GetStorage() (paths.StorageConfig, error) {
func (fsr *fsLockedRepo) GetStorage() (storiface.StorageConfig, error) {
fsr.storageLk.Lock()
defer fsr.storageLk.Unlock()
return fsr.getStorage(nil)
}
func (fsr *fsLockedRepo) getStorage(def *paths.StorageConfig) (paths.StorageConfig, error) {
func (fsr *fsLockedRepo) getStorage(def *storiface.StorageConfig) (storiface.StorageConfig, error) {
c, err := config.StorageFromFile(fsr.join(fsStorageConfig), def)
if err != nil {
return paths.StorageConfig{}, err
return storiface.StorageConfig{}, err
}
return *c, nil
}
func (fsr *fsLockedRepo) SetStorage(c func(*paths.StorageConfig)) error {
func (fsr *fsLockedRepo) SetStorage(c func(*storiface.StorageConfig)) error {
fsr.storageLk.Lock()
defer fsr.storageLk.Unlock()
sc, err := fsr.getStorage(&paths.StorageConfig{})
sc, err := fsr.getStorage(&storiface.StorageConfig{})
if err != nil {
return xerrors.Errorf("get storage: %w", err)
}

View File

@ -9,8 +9,8 @@ import (
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
// BlockstoreDomain represents the domain of a blockstore.
@ -73,8 +73,8 @@ type LockedRepo interface {
Config() (interface{}, error)
SetConfig(func(interface{})) error
GetStorage() (paths.StorageConfig, error)
SetStorage(func(*paths.StorageConfig)) error
GetStorage() (storiface.StorageConfig, error)
SetStorage(func(*storiface.StorageConfig)) error
Stat(path string) (fsutil.FsStat, error)
DiskUsage(path string) (int64, error)

View File

@ -18,7 +18,6 @@ import (
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -37,7 +36,7 @@ type MemRepo struct {
keystore map[string]types.KeyInfo
blockstore blockstore.Blockstore
sc *paths.StorageConfig
sc *storiface.StorageConfig
tempDir string
// holds the current config value
@ -59,13 +58,13 @@ func (lmem *lockedMemRepo) RepoType() RepoType {
return lmem.t
}
func (lmem *lockedMemRepo) GetStorage() (paths.StorageConfig, error) {
func (lmem *lockedMemRepo) GetStorage() (storiface.StorageConfig, error) {
if err := lmem.checkToken(); err != nil {
return paths.StorageConfig{}, err
return storiface.StorageConfig{}, err
}
if lmem.mem.sc == nil {
lmem.mem.sc = &paths.StorageConfig{StoragePaths: []paths.LocalPath{
lmem.mem.sc = &storiface.StorageConfig{StoragePaths: []storiface.LocalPath{
{Path: lmem.Path()},
}}
}
@ -73,7 +72,7 @@ func (lmem *lockedMemRepo) GetStorage() (paths.StorageConfig, error) {
return *lmem.mem.sc, nil
}
func (lmem *lockedMemRepo) SetStorage(c func(*paths.StorageConfig)) error {
func (lmem *lockedMemRepo) SetStorage(c func(*storiface.StorageConfig)) error {
if err := lmem.checkToken(); err != nil {
return err
}
@ -126,14 +125,14 @@ func (lmem *lockedMemRepo) Path() string {
}
func (lmem *lockedMemRepo) initSectorStore(t string) {
if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), paths.StorageConfig{
StoragePaths: []paths.LocalPath{
if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), storiface.StorageConfig{
StoragePaths: []storiface.LocalPath{
{Path: t},
}}); err != nil {
panic(err)
}
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
b, err := json.MarshalIndent(&storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,

View File

@ -3,13 +3,16 @@ package node
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"runtime"
"strconv"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr"
@ -23,6 +26,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/api/v1api"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
@ -92,6 +96,7 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
// Import handler
handleImportFunc := handleImport(a.(*impl.FullNodeAPI))
handleExportFunc := handleExport(a.(*impl.FullNodeAPI))
handleRemoteStoreFunc := handleRemoteStore(a.(*impl.FullNodeAPI))
if permissioned {
importAH := &auth.Handler{
Verify: a.AuthVerify,
@ -104,9 +109,16 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
Next: handleExportFunc,
}
m.Handle("/rest/v0/export", exportAH)
storeAH := &auth.Handler{
Verify: a.AuthVerify,
Next: handleRemoteStoreFunc,
}
m.Handle("/rest/v0/store/{uuid}", storeAH)
} else {
m.HandleFunc("/rest/v0/import", handleImportFunc)
m.HandleFunc("/rest/v0/export", handleExportFunc)
m.HandleFunc("/rest/v0/store/{uuid}", handleRemoteStoreFunc)
}
// debugging
@ -256,3 +268,34 @@ func handleFractionOpt(name string, setter func(int)) http.HandlerFunc {
setter(fr)
}
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func handleRemoteStore(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := uuid.Parse(vars["uuid"])
if err != nil {
http.Error(w, fmt.Sprintf("parse uuid: %s", err), http.StatusBadRequest)
return
}
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
nstore := bstore.NewNetworkStoreWS(c)
if err := a.ApiBlockstoreAccessor.RegisterApiStore(id, nstore); err != nil {
log.Errorw("registering api bstore", "error", err)
_ = c.Close()
return
}
}
}

View File

@ -21,67 +21,9 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
// LocalStorageMeta [path]/sectorstore.json
type LocalStorageMeta struct {
ID storiface.ID
// A high weight means data is more likely to be stored in this path
Weight uint64 // 0 = readonly
// Intermediate data for the sealing process will be stored here
CanSeal bool
// Finalized sectors that will be proved over time will be stored here
CanStore bool
// MaxStorage specifies the maximum number of bytes to use for sector storage
// (0 = unlimited)
MaxStorage uint64
// List of storage groups this path belongs to
Groups []string
// List of storage groups to which data from this path can be moved. If none
// are specified, allow to all
AllowTo []string
// AllowTypes lists sector file types which are allowed to be put into this
// path. If empty, all file types are allowed.
//
// Valid values:
// - "unsealed"
// - "sealed"
// - "cache"
// - "update"
// - "update-cache"
// Any other value will generate a warning and be ignored.
AllowTypes []string
// DenyTypes lists sector file types which aren't allowed to be put into this
// path.
//
// Valid values:
// - "unsealed"
// - "sealed"
// - "cache"
// - "update"
// - "update-cache"
// Any other value will generate a warning and be ignored.
DenyTypes []string
}
// StorageConfig .lotusstorage/storage.json
type StorageConfig struct {
StoragePaths []LocalPath
}
type LocalPath struct {
Path string
}
type LocalStorage interface {
GetStorage() (StorageConfig, error)
SetStorage(func(*StorageConfig)) error
GetStorage() (storiface.StorageConfig, error)
SetStorage(func(*storiface.StorageConfig)) error
Stat(path string) (fsutil.FsStat, error)
@ -213,7 +155,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
return xerrors.Errorf("reading storage metadata for %s: %w", p, err)
}
var meta LocalStorageMeta
var meta storiface.LocalStorageMeta
if err := json.Unmarshal(mb, &meta); err != nil {
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err)
}
@ -309,7 +251,7 @@ func (st *Local) Redeclare(ctx context.Context, filterId *storiface.ID, dropMiss
return xerrors.Errorf("reading storage metadata for %s: %w", p.local, err)
}
var meta LocalStorageMeta
var meta storiface.LocalStorageMeta
if err := json.Unmarshal(mb, &meta); err != nil {
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p.local, err)
}

View File

@ -19,18 +19,18 @@ const pathSize = 16 << 20
type TestingLocalStorage struct {
root string
c StorageConfig
c storiface.StorageConfig
}
func (t *TestingLocalStorage) DiskUsage(path string) (int64, error) {
return 1, nil
}
func (t *TestingLocalStorage) GetStorage() (StorageConfig, error) {
func (t *TestingLocalStorage) GetStorage() (storiface.StorageConfig, error) {
return t.c, nil
}
func (t *TestingLocalStorage) SetStorage(f func(*StorageConfig)) error {
func (t *TestingLocalStorage) SetStorage(f func(*storiface.StorageConfig)) error {
f(&t.c)
return nil
}
@ -51,7 +51,7 @@ func (t *TestingLocalStorage) init(subpath string) error {
metaFile := filepath.Join(path, MetaFile)
meta := &LocalStorageMeta{
meta := &storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 1,
CanSeal: true,

View File

@ -7,6 +7,7 @@ import (
lru "github.com/hashicorp/golang-lru"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var StatTimeout = 5 * time.Second
@ -47,11 +48,11 @@ type diskUsageResult struct {
time time.Time
}
func (c *cachedLocalStorage) GetStorage() (StorageConfig, error) {
func (c *cachedLocalStorage) GetStorage() (storiface.StorageConfig, error) {
return c.base.GetStorage()
}
func (c *cachedLocalStorage) SetStorage(f func(*StorageConfig)) error {
func (c *cachedLocalStorage) SetStorage(f func(*storiface.StorageConfig)) error {
return c.base.SetStorage(f)
}

View File

@ -38,7 +38,7 @@ func createTestStorage(t *testing.T, p string, seal bool, att ...*paths.Local) s
}
}
cfg := &paths.LocalStorageMeta{
cfg := &storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 10,
CanSeal: seal,
@ -77,8 +77,8 @@ func TestMoveShared(t *testing.T) {
_ = lr.Close()
})
err = lr.SetStorage(func(config *paths.StorageConfig) {
*config = paths.StorageConfig{}
err = lr.SetStorage(func(config *storiface.StorageConfig) {
*config = storiface.StorageConfig{}
})
require.NoError(t, err)

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
@ -90,57 +91,12 @@ type result struct {
err error
}
// ResourceFilteringStrategy is an enum indicating the kinds of resource
// filtering strategies that can be configured for workers.
type ResourceFilteringStrategy string
const (
// ResourceFilteringHardware specifies that available hardware resources
// should be evaluated when scheduling a task against the worker.
ResourceFilteringHardware = ResourceFilteringStrategy("hardware")
// ResourceFilteringDisabled disables resource filtering against this
// worker. The scheduler may assign any task to this worker.
ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
)
type Config struct {
ParallelFetchLimit int
// Local worker config
AllowSectorDownload bool
AllowAddPiece bool
AllowPreCommit1 bool
AllowPreCommit2 bool
AllowCommit bool
AllowUnseal bool
AllowReplicaUpdate bool
AllowProveReplicaUpdate2 bool
AllowRegenSectorKey bool
LocalWorkerName string
// ResourceFiltering instructs the system which resource filtering strategy
// to use when evaluating tasks against this worker. An empty value defaults
// to "hardware".
ResourceFiltering ResourceFilteringStrategy
// PoSt config
ParallelCheckLimit int
DisableBuiltinWindowPoSt bool
DisableBuiltinWinningPoSt bool
DisallowRemoteFinalize bool
Assigner string
}
type StorageAuth http.Header
type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore
func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.LocalStorage, si paths.SectorIndex, sc Config, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.LocalStorage, si paths.SectorIndex, sc config.SealerConfig, pc config.ProvingConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
@ -164,9 +120,9 @@ func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.Loc
localProver: prover,
parallelCheckLimit: sc.ParallelCheckLimit,
disableBuiltinWindowPoSt: sc.DisableBuiltinWindowPoSt,
disableBuiltinWinningPoSt: sc.DisableBuiltinWinningPoSt,
parallelCheckLimit: pc.ParallelCheckLimit,
disableBuiltinWindowPoSt: pc.DisableBuiltinWindowPoSt,
disableBuiltinWinningPoSt: pc.DisableBuiltinWinningPoSt,
disallowRemoteFinalize: sc.DisallowRemoteFinalize,
work: mss,
@ -212,7 +168,7 @@ func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.Loc
}
wcfg := WorkerConfig{
IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled,
IgnoreResourceFiltering: sc.ResourceFiltering == config.ResourceFilteringDisabled,
TaskTypes: localTasks,
Name: sc.LocalWorkerName,
}
@ -235,8 +191,8 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error {
return xerrors.Errorf("opening local path: %w", err)
}
if err := m.ls.SetStorage(func(sc *paths.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, paths.LocalPath{Path: path})
if err := m.ls.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, storiface.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
@ -269,8 +225,8 @@ func (m *Manager) DetachLocalStorage(ctx context.Context, path string) error {
// drop from the persisted storage.json
var found bool
if err := m.ls.SetStorage(func(sc *paths.StorageConfig) {
out := make([]paths.LocalPath, 0, len(sc.StoragePaths))
if err := m.ls.SetStorage(func(sc *storiface.StorageConfig) {
out := make([]storiface.LocalPath, 0, len(sc.StoragePaths))
for _, storagePath := range sc.StoragePaths {
if storagePath.Path != path {
out = append(out, storagePath)

View File

@ -39,7 +39,7 @@ func init() {
logging.SetAllLoggers(logging.LevelDebug)
}
type testStorage paths.StorageConfig
type testStorage storiface.StorageConfig
func (t testStorage) DiskUsage(path string) (int64, error) {
return 1, nil // close enough
@ -50,7 +50,7 @@ func newTestStorage(t *testing.T) *testStorage {
require.NoError(t, err)
{
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
b, err := json.MarshalIndent(&storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 1,
CanSeal: true,
@ -63,7 +63,7 @@ func newTestStorage(t *testing.T) *testStorage {
}
return &testStorage{
StoragePaths: []paths.LocalPath{
StoragePaths: []storiface.LocalPath{
{Path: tp},
},
}
@ -82,12 +82,12 @@ func (t testStorage) cleanup() {
}
}
func (t testStorage) GetStorage() (paths.StorageConfig, error) {
return paths.StorageConfig(t), nil
func (t testStorage) GetStorage() (storiface.StorageConfig, error) {
return storiface.StorageConfig(t), nil
}
func (t *testStorage) SetStorage(f func(*paths.StorageConfig)) error {
f((*paths.StorageConfig)(t))
func (t *testStorage) SetStorage(f func(*storiface.StorageConfig)) error {
f((*storiface.StorageConfig)(t))
return nil
}

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
@ -30,7 +31,7 @@ import (
// only uses miner and does NOT use any remote worker.
func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// Set up sector storage manager
sealerCfg := Config{
sealerCfg := config.SealerConfig{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
@ -89,7 +90,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
// miner's worker can only add pieces to an unsealed sector.
sealerCfg := Config{
sealerCfg := config.SealerConfig{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: false,
@ -198,7 +199,7 @@ func generatePieceData(size uint64) []byte {
return bz
}
func newPieceProviderTestHarness(t *testing.T, mgrConfig Config, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
func newPieceProviderTestHarness(t *testing.T, mgrConfig config.SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
ctx := context.Background()
// listen on tcp socket to create an http server later
address := "0.0.0.0:0"
@ -217,7 +218,7 @@ func newPieceProviderTestHarness(t *testing.T, mgrConfig Config, sectorProofType
wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls")))
smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, wsts, smsts)
mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, config.ProvingConfig{}, wsts, smsts)
require.NoError(t, err)
// start a http server on the manager to serve sector file requests.

View File

@ -153,3 +153,61 @@ type SecDataHttpHeader struct {
Key string
Value string
}
// StorageConfig .lotusstorage/storage.json
type StorageConfig struct {
StoragePaths []LocalPath
}
type LocalPath struct {
Path string
}
// LocalStorageMeta [path]/sectorstore.json
type LocalStorageMeta struct {
ID ID
// A high weight means data is more likely to be stored in this path
Weight uint64 // 0 = readonly
// Intermediate data for the sealing process will be stored here
CanSeal bool
// Finalized sectors that will be proved over time will be stored here
CanStore bool
// MaxStorage specifies the maximum number of bytes to use for sector storage
// (0 = unlimited)
MaxStorage uint64
// List of storage groups this path belongs to
Groups []string
// List of storage groups to which data from this path can be moved. If none
// are specified, allow to all
AllowTo []string
// AllowTypes lists sector file types which are allowed to be put into this
// path. If empty, all file types are allowed.
//
// Valid values:
// - "unsealed"
// - "sealed"
// - "cache"
// - "update"
// - "update-cache"
// Any other value will generate a warning and be ignored.
AllowTypes []string
// DenyTypes lists sector file types which aren't allowed to be put into this
// path.
//
// Valid values:
// - "unsealed"
// - "sealed"
// - "cache"
// - "update"
// - "update-cache"
// Any other value will generate a warning and be ignored.
DenyTypes []string
}

View File

@ -40,7 +40,6 @@ import (
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -198,9 +197,9 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
}
}
var localPaths []paths.LocalPath
var localPaths []storiface.LocalPath
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
b, err := json.MarshalIndent(&storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
@ -214,11 +213,11 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
return nil, fmt.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
}
localPaths = append(localPaths, paths.LocalPath{
localPaths = append(localPaths, storiface.LocalPath{
Path: lr.Path(),
})
if err := lr.SetStorage(func(sc *paths.StorageConfig) {
if err := lr.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return nil, err