bstore: Network blockstore
This commit is contained in:
parent
4ffded6fef
commit
1577740bc4
441
blockstore/cbor_gen.go
Normal file
441
blockstore/cbor_gen.go
Normal file
@ -0,0 +1,441 @@
|
||||
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
|
||||
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
xerrors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
var _ = xerrors.Errorf
|
||||
var _ = cid.Undef
|
||||
var _ = math.E
|
||||
var _ = sort.Sort
|
||||
|
||||
var lengthBufNetRpcReq = []byte{132}
|
||||
|
||||
func (t *NetRpcReq) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
|
||||
cw := cbg.NewCborWriter(w)
|
||||
|
||||
if _, err := cw.Write(lengthBufNetRpcReq); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Type (blockstore.NetRPCReqType) (uint8)
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Type)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.ID (uint64) (uint64)
|
||||
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.ID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Cid ([]cid.Cid) (slice)
|
||||
if len(t.Cid) > cbg.MaxLength {
|
||||
return xerrors.Errorf("Slice value in field t.Cid was too long")
|
||||
}
|
||||
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Cid))); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, v := range t.Cid {
|
||||
if err := cbg.WriteCid(w, v); err != nil {
|
||||
return xerrors.Errorf("failed writing cid field t.Cid: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// t.Data ([][]uint8) (slice)
|
||||
if len(t.Data) > cbg.MaxLength {
|
||||
return xerrors.Errorf("Slice value in field t.Data was too long")
|
||||
}
|
||||
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Data))); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, v := range t.Data {
|
||||
if len(v) > cbg.ByteArrayMaxLen {
|
||||
return xerrors.Errorf("Byte array in field v was too long")
|
||||
}
|
||||
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(v))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := cw.Write(v[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *NetRpcReq) UnmarshalCBOR(r io.Reader) (err error) {
|
||||
*t = NetRpcReq{}
|
||||
|
||||
cr := cbg.NewCborReader(r)
|
||||
|
||||
maj, extra, err := cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
}()
|
||||
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 4 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Type (blockstore.NetRPCReqType) (uint8)
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint8 field")
|
||||
}
|
||||
if extra > math.MaxUint8 {
|
||||
return fmt.Errorf("integer in input was too large for uint8 field")
|
||||
}
|
||||
t.Type = NetRPCReqType(extra)
|
||||
// t.ID (uint64) (uint64)
|
||||
|
||||
{
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint64 field")
|
||||
}
|
||||
t.ID = uint64(extra)
|
||||
|
||||
}
|
||||
// t.Cid ([]cid.Cid) (slice)
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if extra > cbg.MaxLength {
|
||||
return fmt.Errorf("t.Cid: array too large (%d)", extra)
|
||||
}
|
||||
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("expected cbor array")
|
||||
}
|
||||
|
||||
if extra > 0 {
|
||||
t.Cid = make([]cid.Cid, extra)
|
||||
}
|
||||
|
||||
for i := 0; i < int(extra); i++ {
|
||||
|
||||
c, err := cbg.ReadCid(cr)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("reading cid field t.Cid failed: %w", err)
|
||||
}
|
||||
t.Cid[i] = c
|
||||
}
|
||||
|
||||
// t.Data ([][]uint8) (slice)
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if extra > cbg.MaxLength {
|
||||
return fmt.Errorf("t.Data: array too large (%d)", extra)
|
||||
}
|
||||
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("expected cbor array")
|
||||
}
|
||||
|
||||
if extra > 0 {
|
||||
t.Data = make([][]uint8, extra)
|
||||
}
|
||||
|
||||
for i := 0; i < int(extra); i++ {
|
||||
{
|
||||
var maj byte
|
||||
var extra uint64
|
||||
var err error
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if extra > cbg.ByteArrayMaxLen {
|
||||
return fmt.Errorf("t.Data[i]: byte array too large (%d)", extra)
|
||||
}
|
||||
if maj != cbg.MajByteString {
|
||||
return fmt.Errorf("expected byte array")
|
||||
}
|
||||
|
||||
if extra > 0 {
|
||||
t.Data[i] = make([]uint8, extra)
|
||||
}
|
||||
|
||||
if _, err := io.ReadFull(cr, t.Data[i][:]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var lengthBufNetRpcResp = []byte{131}
|
||||
|
||||
func (t *NetRpcResp) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
|
||||
cw := cbg.NewCborWriter(w)
|
||||
|
||||
if _, err := cw.Write(lengthBufNetRpcResp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Type (blockstore.NetRPCRespType) (uint8)
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Type)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.ID (uint64) (uint64)
|
||||
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.ID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Data ([]uint8) (slice)
|
||||
if len(t.Data) > cbg.ByteArrayMaxLen {
|
||||
return xerrors.Errorf("Byte array in field t.Data was too long")
|
||||
}
|
||||
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Data))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := cw.Write(t.Data[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *NetRpcResp) UnmarshalCBOR(r io.Reader) (err error) {
|
||||
*t = NetRpcResp{}
|
||||
|
||||
cr := cbg.NewCborReader(r)
|
||||
|
||||
maj, extra, err := cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
}()
|
||||
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 3 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Type (blockstore.NetRPCRespType) (uint8)
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint8 field")
|
||||
}
|
||||
if extra > math.MaxUint8 {
|
||||
return fmt.Errorf("integer in input was too large for uint8 field")
|
||||
}
|
||||
t.Type = NetRPCRespType(extra)
|
||||
// t.ID (uint64) (uint64)
|
||||
|
||||
{
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint64 field")
|
||||
}
|
||||
t.ID = uint64(extra)
|
||||
|
||||
}
|
||||
// t.Data ([]uint8) (slice)
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if extra > cbg.ByteArrayMaxLen {
|
||||
return fmt.Errorf("t.Data: byte array too large (%d)", extra)
|
||||
}
|
||||
if maj != cbg.MajByteString {
|
||||
return fmt.Errorf("expected byte array")
|
||||
}
|
||||
|
||||
if extra > 0 {
|
||||
t.Data = make([]uint8, extra)
|
||||
}
|
||||
|
||||
if _, err := io.ReadFull(cr, t.Data[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var lengthBufNetRpcErr = []byte{131}
|
||||
|
||||
func (t *NetRpcErr) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
|
||||
cw := cbg.NewCborWriter(w)
|
||||
|
||||
if _, err := cw.Write(lengthBufNetRpcErr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Type (blockstore.NetRPCErrType) (uint8)
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Type)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Msg (string) (string)
|
||||
if len(t.Msg) > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field t.Msg was too long")
|
||||
}
|
||||
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Msg))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := io.WriteString(w, string(t.Msg)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Cid (cid.Cid) (struct)
|
||||
|
||||
if t.Cid == nil {
|
||||
if _, err := cw.Write(cbg.CborNull); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := cbg.WriteCid(cw, *t.Cid); err != nil {
|
||||
return xerrors.Errorf("failed to write cid field t.Cid: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *NetRpcErr) UnmarshalCBOR(r io.Reader) (err error) {
|
||||
*t = NetRpcErr{}
|
||||
|
||||
cr := cbg.NewCborReader(r)
|
||||
|
||||
maj, extra, err := cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
}()
|
||||
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 3 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Type (blockstore.NetRPCErrType) (uint8)
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint8 field")
|
||||
}
|
||||
if extra > math.MaxUint8 {
|
||||
return fmt.Errorf("integer in input was too large for uint8 field")
|
||||
}
|
||||
t.Type = NetRPCErrType(extra)
|
||||
// t.Msg (string) (string)
|
||||
|
||||
{
|
||||
sval, err := cbg.ReadString(cr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.Msg = string(sval)
|
||||
}
|
||||
// t.Cid (cid.Cid) (struct)
|
||||
|
||||
{
|
||||
|
||||
b, err := cr.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b != cbg.CborNull[0] {
|
||||
if err := cr.UnreadByte(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := cbg.ReadCid(cr)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to read cid field t.Cid: %w", err)
|
||||
}
|
||||
|
||||
t.Cid = &c
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
462
blockstore/net.go
Normal file
462
blockstore/net.go
Normal file
@ -0,0 +1,462 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
"github.com/libp2p/go-msgio"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type NetRPCReqType byte
|
||||
|
||||
const (
|
||||
NRpcHas NetRPCReqType = iota
|
||||
NRpcGet NetRPCReqType = iota
|
||||
NRpcGetSize NetRPCReqType = iota
|
||||
NRpcPut NetRPCReqType = iota
|
||||
NRpcDelete NetRPCReqType = iota
|
||||
NRpcList NetRPCReqType = iota
|
||||
|
||||
// todo cancel req
|
||||
)
|
||||
|
||||
type NetRPCRespType byte
|
||||
|
||||
const (
|
||||
NRpcOK NetRPCRespType = iota
|
||||
NRpcErr NetRPCRespType = iota
|
||||
NRpcMore NetRPCRespType = iota
|
||||
)
|
||||
|
||||
type NetRPCErrType byte
|
||||
|
||||
const (
|
||||
NRpcErrGeneric NetRPCErrType = iota
|
||||
NRpcErrNotFound NetRPCErrType = iota
|
||||
)
|
||||
|
||||
type NetRpcReq struct {
|
||||
Type NetRPCReqType
|
||||
ID uint64
|
||||
|
||||
Cid []cid.Cid // todo maxsize?
|
||||
Data [][]byte // todo maxsize?
|
||||
}
|
||||
|
||||
type NetRpcResp struct {
|
||||
Type NetRPCRespType
|
||||
ID uint64
|
||||
|
||||
// error or cids in allkeys
|
||||
Data []byte // todo maxsize?
|
||||
|
||||
next <-chan NetRpcResp
|
||||
}
|
||||
|
||||
type NetRpcErr struct {
|
||||
Type NetRPCErrType
|
||||
|
||||
Msg string
|
||||
|
||||
// in case of NRpcErrNotFound
|
||||
Cid *cid.Cid
|
||||
}
|
||||
|
||||
type NetworkStore struct {
|
||||
// note: writer is thread-safe
|
||||
msgStream msgio.ReadWriteCloser
|
||||
|
||||
// atomic
|
||||
reqCount uint64
|
||||
|
||||
respLk sync.Mutex
|
||||
|
||||
// respMap is nil after store closes
|
||||
respMap map[uint64]chan<- NetRpcResp
|
||||
|
||||
closing chan struct{}
|
||||
closed chan struct{}
|
||||
|
||||
closeLk sync.Mutex
|
||||
onClose func()
|
||||
}
|
||||
|
||||
func NewNetworkStore(mss msgio.ReadWriteCloser) *NetworkStore {
|
||||
ns := &NetworkStore{
|
||||
msgStream: mss,
|
||||
|
||||
respMap: map[uint64]chan<- NetRpcResp{},
|
||||
|
||||
closing: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
|
||||
go ns.receive()
|
||||
|
||||
return ns
|
||||
}
|
||||
|
||||
func (n *NetworkStore) shutdown(msg string) {
|
||||
if err := n.msgStream.Close(); err != nil {
|
||||
log.Errorw("closing netstore msg stream", "error", err)
|
||||
}
|
||||
|
||||
nerr := NetRpcErr{
|
||||
Type: NRpcErrGeneric,
|
||||
Msg: msg,
|
||||
Cid: nil,
|
||||
}
|
||||
|
||||
var errb bytes.Buffer
|
||||
if err := nerr.MarshalCBOR(&errb); err != nil {
|
||||
log.Errorw("netstore shutdown: error marshaling error", "err", err)
|
||||
}
|
||||
|
||||
n.respLk.Lock()
|
||||
for id, resps := range n.respMap {
|
||||
resps <- NetRpcResp{
|
||||
Type: NRpcErr,
|
||||
ID: id,
|
||||
Data: errb.Bytes(),
|
||||
}
|
||||
}
|
||||
|
||||
n.respMap = nil
|
||||
|
||||
n.respLk.Unlock()
|
||||
}
|
||||
|
||||
func (n *NetworkStore) OnClose(cb func()) {
|
||||
n.closeLk.Lock()
|
||||
defer n.closeLk.Unlock()
|
||||
|
||||
select {
|
||||
case <-n.closed:
|
||||
cb()
|
||||
default:
|
||||
n.onClose = cb
|
||||
}
|
||||
}
|
||||
|
||||
func (n *NetworkStore) receive() {
|
||||
defer func() {
|
||||
n.closeLk.Lock()
|
||||
defer n.closeLk.Unlock()
|
||||
|
||||
close(n.closed)
|
||||
if n.onClose != nil {
|
||||
n.onClose()
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-n.closing:
|
||||
n.shutdown("netstore stopping")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
msg, err := n.msgStream.ReadMsg()
|
||||
if err != nil {
|
||||
n.shutdown(fmt.Sprintf("netstore ReadMsg: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
var resp NetRpcResp
|
||||
if err := resp.UnmarshalCBOR(bytes.NewReader(msg)); err != nil {
|
||||
n.shutdown(fmt.Sprintf("unmarshaling netstore response: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
n.msgStream.ReleaseMsg(msg)
|
||||
|
||||
n.respLk.Lock()
|
||||
if ch, ok := n.respMap[resp.ID]; ok {
|
||||
if resp.Type == NRpcMore {
|
||||
nch := make(chan NetRpcResp, 1)
|
||||
resp.next = nch
|
||||
n.respMap[resp.ID] = nch
|
||||
} else {
|
||||
delete(n.respMap, resp.ID)
|
||||
}
|
||||
|
||||
ch <- resp
|
||||
}
|
||||
n.respLk.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte) (uint64, <-chan NetRpcResp, error) {
|
||||
rid := atomic.AddUint64(&n.reqCount, 1)
|
||||
|
||||
respCh := make(chan NetRpcResp, 1) // todo pool?
|
||||
|
||||
n.respLk.Lock()
|
||||
if n.respMap == nil {
|
||||
return 0, nil, xerrors.Errorf("netstore closed")
|
||||
}
|
||||
n.respMap[rid] = respCh
|
||||
n.respLk.Unlock()
|
||||
|
||||
req := NetRpcReq{
|
||||
Type: rt,
|
||||
ID: rid,
|
||||
Cid: cids,
|
||||
Data: data,
|
||||
}
|
||||
|
||||
var rbuf bytes.Buffer // todo buffer pool
|
||||
if err := req.MarshalCBOR(&rbuf); err != nil {
|
||||
n.respLk.Lock()
|
||||
if n.respMap == nil {
|
||||
return 0, nil, xerrors.Errorf("netstore closed")
|
||||
}
|
||||
delete(n.respMap, rid)
|
||||
n.respLk.Unlock()
|
||||
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
if err := n.msgStream.WriteMsg(rbuf.Bytes()); err != nil {
|
||||
n.respLk.Lock()
|
||||
if n.respMap == nil {
|
||||
return 0, nil, xerrors.Errorf("netstore closed")
|
||||
}
|
||||
delete(n.respMap, rid)
|
||||
n.respLk.Unlock()
|
||||
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
return rid, respCh, nil
|
||||
}
|
||||
|
||||
func (n *NetworkStore) waitResp(ctx context.Context, rch <-chan NetRpcResp, rid uint64) (NetRpcResp, error) {
|
||||
select {
|
||||
case resp := <-rch:
|
||||
if resp.Type == NRpcErr {
|
||||
var e NetRpcErr
|
||||
if err := e.UnmarshalCBOR(bytes.NewReader(resp.Data)); err != nil {
|
||||
return NetRpcResp{}, xerrors.Errorf("unmarshaling error data: %w", err)
|
||||
}
|
||||
|
||||
var err error
|
||||
switch e.Type {
|
||||
case NRpcErrNotFound:
|
||||
if e.Cid != nil {
|
||||
err = ipld.ErrNotFound{
|
||||
Cid: *e.Cid,
|
||||
}
|
||||
} else {
|
||||
err = xerrors.Errorf("block not found, but cid was null")
|
||||
}
|
||||
default:
|
||||
err = xerrors.Errorf("unknown error type")
|
||||
case NRpcErrGeneric:
|
||||
err = xerrors.Errorf("generic error")
|
||||
}
|
||||
|
||||
return NetRpcResp{}, xerrors.Errorf("netstore error response: %s (%w)", e.Msg, err)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
case <-ctx.Done():
|
||||
// todo send cancel req
|
||||
|
||||
n.respLk.Lock()
|
||||
if n.respMap != nil {
|
||||
delete(n.respMap, rid)
|
||||
}
|
||||
n.respLk.Unlock()
|
||||
|
||||
return NetRpcResp{}, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (n *NetworkStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
|
||||
req, rch, err := n.sendRpc(NRpcHas, []cid.Cid{c}, nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
resp, err := n.waitResp(ctx, rch, req)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(resp.Data) != 1 {
|
||||
return false, xerrors.Errorf("expected reposnse length to be 1 byte")
|
||||
}
|
||||
switch resp.Data[0] {
|
||||
case cbg.CborBoolTrue[0]:
|
||||
return true, nil
|
||||
case cbg.CborBoolFalse[0]:
|
||||
return false, nil
|
||||
default:
|
||||
return false, xerrors.Errorf("has: bad response: %x", resp.Data[0])
|
||||
}
|
||||
}
|
||||
|
||||
func (n *NetworkStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
|
||||
req, rch, err := n.sendRpc(NRpcGet, []cid.Cid{c}, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := n.waitResp(ctx, rch, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return blocks.NewBlockWithCid(resp.Data, c)
|
||||
}
|
||||
|
||||
func (n *NetworkStore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error {
|
||||
req, rch, err := n.sendRpc(NRpcGet, []cid.Cid{c}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := n.waitResp(ctx, rch, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return callback(resp.Data) // todo return buf to pool
|
||||
}
|
||||
|
||||
func (n *NetworkStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
|
||||
req, rch, err := n.sendRpc(NRpcGetSize, []cid.Cid{c}, nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
resp, err := n.waitResp(ctx, rch, req)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if len(resp.Data) != 4 {
|
||||
return 0, xerrors.Errorf("expected getsize response to be 4 bytes, was %d", resp.Data)
|
||||
}
|
||||
|
||||
return int(binary.LittleEndian.Uint32(resp.Data)), nil
|
||||
}
|
||||
|
||||
func (n *NetworkStore) Put(ctx context.Context, block blocks.Block) error {
|
||||
return n.PutMany(ctx, []blocks.Block{block})
|
||||
}
|
||||
|
||||
func (n *NetworkStore) PutMany(ctx context.Context, blocks []blocks.Block) error {
|
||||
// todo pool
|
||||
cids := make([]cid.Cid, len(blocks))
|
||||
blkDatas := make([][]byte, len(blocks))
|
||||
for i, block := range blocks {
|
||||
cids[i] = block.Cid()
|
||||
blkDatas[i] = block.RawData()
|
||||
}
|
||||
|
||||
req, rch, err := n.sendRpc(NRpcPut, cids, blkDatas)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = n.waitResp(ctx, rch, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NetworkStore) DeleteBlock(ctx context.Context, c cid.Cid) error {
|
||||
return n.DeleteMany(ctx, []cid.Cid{c})
|
||||
}
|
||||
|
||||
func (n *NetworkStore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
|
||||
req, rch, err := n.sendRpc(NRpcDelete, cids, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = n.waitResp(ctx, rch, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NetworkStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
req, rch, err := n.sendRpc(NRpcList, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
outCh := make(chan cid.Cid, 16)
|
||||
|
||||
go func() {
|
||||
defer close(outCh)
|
||||
// todo defer cancel request
|
||||
|
||||
for {
|
||||
if rch == nil {
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := n.waitResp(ctx, rch, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
switch resp.Type {
|
||||
case NRpcOK, NRpcMore:
|
||||
c, err := cid.Cast(resp.Data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// todo propagate backpressure
|
||||
select {
|
||||
case outCh <- c:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
rch = resp.next
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return outCh, err
|
||||
}
|
||||
|
||||
func (n *NetworkStore) HashOnRead(enabled bool) {
|
||||
// todo
|
||||
return
|
||||
}
|
||||
|
||||
func (n *NetworkStore) Stop(ctx context.Context) error {
|
||||
close(n.closing)
|
||||
|
||||
select {
|
||||
case <-n.closed:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
var _ Blockstore = &NetworkStore{}
|
236
blockstore/net_serve.go
Normal file
236
blockstore/net_serve.go
Normal file
@ -0,0 +1,236 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
|
||||
block "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
"github.com/libp2p/go-msgio"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type NetworkStoreHandler struct {
|
||||
msgStream msgio.ReadWriteCloser
|
||||
|
||||
bs Blockstore
|
||||
}
|
||||
|
||||
func HandleNetBstoreStream(ctx context.Context, bs Blockstore, mss msgio.ReadWriteCloser) *NetworkStoreHandler {
|
||||
ns := &NetworkStoreHandler{
|
||||
msgStream: mss,
|
||||
bs: bs,
|
||||
}
|
||||
|
||||
go ns.handle(ctx)
|
||||
|
||||
return ns
|
||||
}
|
||||
|
||||
func (h *NetworkStoreHandler) handle(ctx context.Context) {
|
||||
defer func() {
|
||||
if err := h.msgStream.Close(); err != nil {
|
||||
log.Errorw("error closing blockstore stream", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
var req NetRpcReq
|
||||
|
||||
ms, err := h.msgStream.ReadMsg()
|
||||
if err != nil {
|
||||
log.Warnw("bstore stream err", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := req.UnmarshalCBOR(bytes.NewReader(ms)); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h.msgStream.ReleaseMsg(ms)
|
||||
|
||||
switch req.Type {
|
||||
case NRpcHas:
|
||||
if len(req.Cid) != 1 {
|
||||
if err := h.respondError(req.ID, xerrors.New("expected request for 1 cid"), cid.Undef); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
res, err := h.bs.Has(ctx, req.Cid[0])
|
||||
if err != nil {
|
||||
if err := h.respondError(req.ID, err, req.Cid[0]); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
var resData [1]byte
|
||||
if res {
|
||||
resData[0] = cbg.CborBoolTrue[0]
|
||||
} else {
|
||||
resData[0] = cbg.CborBoolFalse[0]
|
||||
}
|
||||
|
||||
if err := h.respond(req.ID, NRpcOK, resData[:]); err != nil {
|
||||
log.Warnw("writing response", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
case NRpcGet:
|
||||
if len(req.Cid) != 1 {
|
||||
if err := h.respondError(req.ID, xerrors.New("expected request for 1 cid"), cid.Undef); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
err := h.bs.View(ctx, req.Cid[0], func(bdata []byte) error {
|
||||
return h.respond(req.ID, NRpcOK, bdata)
|
||||
})
|
||||
if err != nil {
|
||||
if err := h.respondError(req.ID, err, req.Cid[0]); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
case NRpcGetSize:
|
||||
if len(req.Cid) != 1 {
|
||||
if err := h.respondError(req.ID, xerrors.New("expected request for 1 cid"), cid.Undef); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
sz, err := h.bs.GetSize(ctx, req.Cid[0])
|
||||
if err != nil {
|
||||
if err := h.respondError(req.ID, err, req.Cid[0]); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
var resData [4]byte
|
||||
binary.LittleEndian.PutUint32(resData[:], uint32(sz))
|
||||
|
||||
if err := h.respond(req.ID, NRpcOK, resData[:]); err != nil {
|
||||
log.Warnw("writing response", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
case NRpcPut:
|
||||
blocks := make([]block.Block, len(req.Cid))
|
||||
|
||||
if len(req.Cid) != len(req.Data) {
|
||||
if err := h.respondError(req.ID, xerrors.New("cid count didn't match data count"), cid.Undef); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for i := range req.Cid {
|
||||
blocks[i], err = block.NewBlockWithCid(req.Data[i], req.Cid[i])
|
||||
if err != nil {
|
||||
log.Warnw("make block", "error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err := h.bs.PutMany(ctx, blocks)
|
||||
if err != nil {
|
||||
if err := h.respondError(req.ID, err, cid.Undef); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if err := h.respond(req.ID, NRpcOK, []byte{}); err != nil {
|
||||
log.Warnw("writing response", "error", err)
|
||||
return
|
||||
}
|
||||
case NRpcDelete:
|
||||
err := h.bs.DeleteMany(ctx, req.Cid)
|
||||
if err != nil {
|
||||
if err := h.respondError(req.ID, err, cid.Undef); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if err := h.respond(req.ID, NRpcOK, []byte{}); err != nil {
|
||||
log.Warnw("writing response", "error", err)
|
||||
return
|
||||
}
|
||||
case NRpcList:
|
||||
if err := h.respondError(req.ID, xerrors.New("list todo"), cid.Undef); err != nil {
|
||||
log.Warnw("writing error response", "error", err)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *NetworkStoreHandler) respondError(req uint64, uerr error, c cid.Cid) error {
|
||||
var resp NetRpcResp
|
||||
resp.ID = req
|
||||
resp.Type = NRpcErr
|
||||
|
||||
nerr := NetRpcErr{
|
||||
Type: NRpcErrGeneric,
|
||||
Msg: uerr.Error(),
|
||||
}
|
||||
if ipld.IsNotFound(uerr) {
|
||||
nerr.Type = NRpcErrNotFound
|
||||
nerr.Cid = &c
|
||||
}
|
||||
|
||||
var edata bytes.Buffer
|
||||
if err := nerr.MarshalCBOR(&edata); err != nil {
|
||||
return xerrors.Errorf("marshaling error data: %w", err)
|
||||
}
|
||||
|
||||
resp.Data = edata.Bytes()
|
||||
|
||||
var msg bytes.Buffer
|
||||
if err := resp.MarshalCBOR(&msg); err != nil {
|
||||
return xerrors.Errorf("marshaling error response: %w", err)
|
||||
}
|
||||
|
||||
if err := h.msgStream.WriteMsg(msg.Bytes()); err != nil {
|
||||
return xerrors.Errorf("write error response: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *NetworkStoreHandler) respond(req uint64, rt NetRPCRespType, data []byte) error {
|
||||
var resp NetRpcResp
|
||||
resp.ID = req
|
||||
resp.Type = rt
|
||||
resp.Data = data
|
||||
|
||||
var msg bytes.Buffer
|
||||
if err := resp.MarshalCBOR(&msg); err != nil {
|
||||
return xerrors.Errorf("marshaling response: %w", err)
|
||||
}
|
||||
|
||||
if err := h.msgStream.WriteMsg(msg.Bytes()); err != nil {
|
||||
return xerrors.Errorf("write response: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
63
blockstore/net_test.go
Normal file
63
blockstore/net_test.go
Normal file
@ -0,0 +1,63 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
block "github.com/ipfs/go-block-format"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
"github.com/libp2p/go-msgio"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNetBstore(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
cr, sw := io.Pipe()
|
||||
sr, cw := io.Pipe()
|
||||
|
||||
cm := msgio.Combine(msgio.NewWriter(cw), msgio.NewReader(cr))
|
||||
sm := msgio.Combine(msgio.NewWriter(sw), msgio.NewReader(sr))
|
||||
|
||||
bbs := NewMemorySync()
|
||||
_ = HandleNetBstoreStream(ctx, bbs, sm)
|
||||
|
||||
nbs := NewNetworkStore(cm)
|
||||
|
||||
tb1 := block.NewBlock([]byte("aoeu"))
|
||||
|
||||
h, err := nbs.Has(ctx, tb1.Cid())
|
||||
require.NoError(t, err)
|
||||
require.False(t, h)
|
||||
|
||||
err = nbs.Put(ctx, tb1)
|
||||
require.NoError(t, err)
|
||||
|
||||
h, err = nbs.Has(ctx, tb1.Cid())
|
||||
require.NoError(t, err)
|
||||
require.True(t, h)
|
||||
|
||||
sz, err := nbs.GetSize(ctx, tb1.Cid())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 4, sz)
|
||||
|
||||
err = nbs.DeleteBlock(ctx, tb1.Cid())
|
||||
require.NoError(t, err)
|
||||
|
||||
h, err = nbs.Has(ctx, tb1.Cid())
|
||||
require.NoError(t, err)
|
||||
require.False(t, h)
|
||||
|
||||
_, err = nbs.Get(ctx, tb1.Cid())
|
||||
fmt.Println(err)
|
||||
require.True(t, ipld.IsNotFound(err))
|
||||
|
||||
err = nbs.Put(ctx, tb1)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := nbs.Get(ctx, tb1.Cid())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "aoeu", string(b.RawData()))
|
||||
}
|
100
blockstore/net_ws.go
Normal file
100
blockstore/net_ws.go
Normal file
@ -0,0 +1,100 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/libp2p/go-msgio"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type wsWrapper struct {
|
||||
wc *websocket.Conn
|
||||
|
||||
nextMsg []byte
|
||||
}
|
||||
|
||||
func (w *wsWrapper) Read(b []byte) (int, error) {
|
||||
return 0, xerrors.New("read unsupported")
|
||||
}
|
||||
|
||||
func (w *wsWrapper) ReadMsg() ([]byte, error) {
|
||||
if w.nextMsg != nil {
|
||||
nm := w.nextMsg
|
||||
w.nextMsg = nil
|
||||
return nm, nil
|
||||
}
|
||||
|
||||
mt, r, err := w.wc.NextReader()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch mt {
|
||||
case websocket.BinaryMessage, websocket.TextMessage:
|
||||
default:
|
||||
return nil, xerrors.Errorf("unexpected message type")
|
||||
}
|
||||
|
||||
// todo pool
|
||||
// todo limit sizes
|
||||
var mbuf bytes.Buffer
|
||||
if _, err := mbuf.ReadFrom(r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return mbuf.Bytes(), nil
|
||||
}
|
||||
|
||||
func (w *wsWrapper) ReleaseMsg(bytes []byte) {
|
||||
// todo use a pool
|
||||
}
|
||||
|
||||
func (w *wsWrapper) NextMsgLen() (int, error) {
|
||||
if w.nextMsg != nil {
|
||||
return len(w.nextMsg), nil
|
||||
}
|
||||
|
||||
mt, msg, err := w.wc.ReadMessage()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
switch mt {
|
||||
case websocket.BinaryMessage, websocket.TextMessage:
|
||||
default:
|
||||
return 0, xerrors.Errorf("unexpected message type")
|
||||
}
|
||||
|
||||
w.nextMsg = msg
|
||||
return len(w.nextMsg), nil
|
||||
}
|
||||
|
||||
func (w *wsWrapper) Write(bytes []byte) (int, error) {
|
||||
return 0, xerrors.New("write unsupported")
|
||||
}
|
||||
|
||||
func (w *wsWrapper) WriteMsg(bytes []byte) error {
|
||||
return w.wc.WriteMessage(websocket.BinaryMessage, bytes)
|
||||
}
|
||||
|
||||
func (w *wsWrapper) Close() error {
|
||||
return w.wc.Close()
|
||||
}
|
||||
|
||||
var _ msgio.ReadWriteCloser = &wsWrapper{}
|
||||
|
||||
func wsConnToMio(wc *websocket.Conn) msgio.ReadWriteCloser {
|
||||
return &wsWrapper{
|
||||
wc: wc,
|
||||
}
|
||||
}
|
||||
|
||||
func HandleNetBstoreWS(ctx context.Context, bs Blockstore, wc *websocket.Conn) *NetworkStoreHandler {
|
||||
return HandleNetBstoreStream(ctx, bs, wsConnToMio(wc))
|
||||
}
|
||||
|
||||
func NewNetworkStoreWS(wc *websocket.Conn) *NetworkStore {
|
||||
return NewNetworkStore(wsConnToMio(wc))
|
||||
}
|
10
gen/main.go
10
gen/main.go
@ -7,6 +7,7 @@ import (
|
||||
gen "github.com/whyrusleeping/cbor-gen"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/chain/exchange"
|
||||
"github.com/filecoin-project/lotus/chain/market"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
@ -127,4 +128,13 @@ func main() {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
err = gen.WriteTupleEncodersToFile("./blockstore/cbor_gen.go", "blockstore",
|
||||
blockstore.NetRpcReq{},
|
||||
blockstore.NetRpcResp{},
|
||||
blockstore.NetRpcErr{},
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user