netbs: Address review

This commit is contained in:
Łukasz Magiera 2022-11-06 20:00:52 +00:00
parent fcad93dc10
commit 401359646a
4 changed files with 27 additions and 20 deletions

View File

@ -19,12 +19,12 @@ import (
type NetRPCReqType byte type NetRPCReqType byte
const ( const (
NRpcHas NetRPCReqType = iota NRpcHas NetRPCReqType = iota
NRpcGet NetRPCReqType = iota NRpcGet
NRpcGetSize NetRPCReqType = iota NRpcGetSize
NRpcPut NetRPCReqType = iota NRpcPut
NRpcDelete NetRPCReqType = iota NRpcDelete
NRpcList NetRPCReqType = iota NRpcList
// todo cancel req // todo cancel req
) )
@ -32,16 +32,16 @@ const (
type NetRPCRespType byte type NetRPCRespType byte
const ( const (
NRpcOK NetRPCRespType = iota NRpcOK NetRPCRespType = iota
NRpcErr NetRPCRespType = iota NRpcErr
NRpcMore NetRPCRespType = iota NRpcMore
) )
type NetRPCErrType byte type NetRPCErrType byte
const ( const (
NRpcErrGeneric NetRPCErrType = iota NRpcErrGeneric NetRPCErrType = iota
NRpcErrNotFound NetRPCErrType = iota NRpcErrNotFound
) )
type NetRpcReq struct { type NetRpcReq struct {
@ -87,7 +87,7 @@ type NetworkStore struct {
closed chan struct{} closed chan struct{}
closeLk sync.Mutex closeLk sync.Mutex
onClose func() onClose []func()
} }
func NewNetworkStore(mss msgio.ReadWriteCloser) *NetworkStore { func NewNetworkStore(mss msgio.ReadWriteCloser) *NetworkStore {
@ -143,7 +143,7 @@ func (n *NetworkStore) OnClose(cb func()) {
case <-n.closed: case <-n.closed:
cb() cb()
default: default:
n.onClose = cb n.onClose = append(n.onClose, cb)
} }
} }
@ -154,7 +154,9 @@ func (n *NetworkStore) receive() {
close(n.closed) close(n.closed)
if n.onClose != nil { if n.onClose != nil {
n.onClose() for _, f := range n.onClose {
f()
}
} }
}() }()
@ -203,6 +205,7 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte)
n.respLk.Lock() n.respLk.Lock()
if n.respMap == nil { if n.respMap == nil {
n.respLk.Unlock()
return 0, nil, xerrors.Errorf("netstore closed") return 0, nil, xerrors.Errorf("netstore closed")
} }
n.respMap[rid] = respCh n.respMap[rid] = respCh
@ -218,22 +221,24 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte)
var rbuf bytes.Buffer // todo buffer pool var rbuf bytes.Buffer // todo buffer pool
if err := req.MarshalCBOR(&rbuf); err != nil { if err := req.MarshalCBOR(&rbuf); err != nil {
n.respLk.Lock() n.respLk.Lock()
defer n.respLk.Unlock()
if n.respMap == nil { if n.respMap == nil {
return 0, nil, xerrors.Errorf("netstore closed") return 0, nil, xerrors.Errorf("netstore closed")
} }
delete(n.respMap, rid) delete(n.respMap, rid)
n.respLk.Unlock()
return 0, nil, err return 0, nil, err
} }
if err := n.msgStream.WriteMsg(rbuf.Bytes()); err != nil { if err := n.msgStream.WriteMsg(rbuf.Bytes()); err != nil {
n.respLk.Lock() n.respLk.Lock()
defer n.respLk.Unlock()
if n.respMap == nil { if n.respMap == nil {
return 0, nil, xerrors.Errorf("netstore closed") return 0, nil, xerrors.Errorf("netstore closed")
} }
delete(n.respMap, rid) delete(n.respMap, rid)
n.respLk.Unlock()
return 0, nil, err return 0, nil, err
} }
@ -260,10 +265,10 @@ func (n *NetworkStore) waitResp(ctx context.Context, rch <-chan NetRpcResp, rid
} else { } else {
err = xerrors.Errorf("block not found, but cid was null") err = xerrors.Errorf("block not found, but cid was null")
} }
default:
err = xerrors.Errorf("unknown error type")
case NRpcErrGeneric: case NRpcErrGeneric:
err = xerrors.Errorf("generic error") err = xerrors.Errorf("generic error")
default:
err = xerrors.Errorf("unknown error type")
} }
return NetRpcResp{}, xerrors.Errorf("netstore error response: %s (%w)", e.Msg, err) return NetRpcResp{}, xerrors.Errorf("netstore error response: %s (%w)", e.Msg, err)

View File

@ -19,6 +19,7 @@ type NetworkStoreHandler struct {
bs Blockstore bs Blockstore
} }
// NOTE: This code isn't yet hardened to accept untrusted input. See TODOs here and in net.go
func HandleNetBstoreStream(ctx context.Context, bs Blockstore, mss msgio.ReadWriteCloser) *NetworkStoreHandler { func HandleNetBstoreStream(ctx context.Context, bs Blockstore, mss msgio.ReadWriteCloser) *NetworkStoreHandler {
ns := &NetworkStoreHandler{ ns := &NetworkStoreHandler{
msgStream: mss, msgStream: mss,

View File

@ -77,7 +77,7 @@ func (a *APIBlockstoreAccessor) Done(id retrievalmarket.DealID) error {
return a.sub.Done(id) return a.sub.Done(id)
} }
func (a *APIBlockstoreAccessor) UseRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error { func (a *APIBlockstoreAccessor) RegisterDealToRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error {
a.accessLk.Lock() a.accessLk.Lock()
defer a.accessLk.Unlock() defer a.accessLk.Unlock()

View File

@ -849,7 +849,7 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat
id := a.Retrieval.NextID() id := a.Retrieval.NextID()
if order.RemoteStore != nil { if order.RemoteStore != nil {
if err := a.ApiBlockstoreAccessor.UseRetrievalStore(id, *order.RemoteStore); err != nil { if err := a.ApiBlockstoreAccessor.RegisterDealToRetrievalStore(id, *order.RemoteStore); err != nil {
return 0, xerrors.Errorf("registering api store: %w", err) return 0, xerrors.Errorf("registering api store: %w", err)
} }
} }
@ -1030,6 +1030,7 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo
root, root,
dagSpec.selector, dagSpec.selector,
func(node format.Node) error { func(node format.Node) error {
// if we're exporting merkle proofs for this dag, export all nodes read by the traversal
if dagSpec.exportAll { if dagSpec.exportAll {
lk.Lock() lk.Lock()
defer lk.Unlock() defer lk.Unlock()