463 lines
8.5 KiB
Go
463 lines
8.5 KiB
Go
package blockstore
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
blocks "github.com/ipfs/go-block-format"
|
|
"github.com/ipfs/go-cid"
|
|
ipld "github.com/ipfs/go-ipld-format"
|
|
"github.com/libp2p/go-msgio"
|
|
cbg "github.com/whyrusleeping/cbor-gen"
|
|
"golang.org/x/xerrors"
|
|
)
|
|
|
|
// NetRPCReqType is the one-byte tag that tells the remote side which
// blockstore operation a NetRpcReq is asking for.
type NetRPCReqType byte

// Request kinds. Values are assigned sequentially starting at zero, so the
// order of this list is part of the wire protocol.
const (
	// NRpcHas is sent by Has.
	NRpcHas NetRPCReqType = iota
	// NRpcGet is sent by Get and View.
	NRpcGet
	// NRpcGetSize is sent by GetSize.
	NRpcGetSize
	// NRpcPut is sent by PutMany (and therefore Put).
	NRpcPut
	// NRpcDelete is sent by DeleteMany (and therefore DeleteBlock).
	NRpcDelete
	// NRpcList is sent by AllKeysChan.
	NRpcList

	// todo cancel req
)
|
|
|
|
// NetRPCRespType is the one-byte tag carried by every NetRpcResp.
type NetRPCRespType byte

// Response kinds. Values are assigned sequentially starting at zero.
const (
	// NRpcOK marks a final, successful response.
	NRpcOK NetRPCRespType = iota
	// NRpcErr marks a response whose Data holds a CBOR-encoded NetRpcErr.
	NRpcErr
	// NRpcMore marks a response that will be followed by further responses
	// carrying the same request ID.
	NRpcMore
)
|
|
|
|
// NetRPCErrType classifies the failure described by a NetRpcErr.
type NetRPCErrType byte

// Error kinds. Values are assigned sequentially starting at zero.
const (
	// NRpcErrGeneric is any failure without a more specific kind.
	NRpcErrGeneric NetRPCErrType = iota
	// NRpcErrNotFound reports a missing block; the NetRpcErr may carry the
	// CID that was not found.
	NRpcErrNotFound
)
|
|
|
|
type NetRpcReq struct {
|
|
Type NetRPCReqType
|
|
ID uint64
|
|
|
|
Cid []cid.Cid // todo maxsize?
|
|
Data [][]byte // todo maxsize?
|
|
}
|
|
|
|
type NetRpcResp struct {
|
|
Type NetRPCRespType
|
|
ID uint64
|
|
|
|
// error or cids in allkeys
|
|
Data []byte // todo maxsize?
|
|
|
|
next <-chan NetRpcResp
|
|
}
|
|
|
|
type NetRpcErr struct {
|
|
Type NetRPCErrType
|
|
|
|
Msg string
|
|
|
|
// in case of NRpcErrNotFound
|
|
Cid *cid.Cid
|
|
}
|
|
|
|
type NetworkStore struct {
|
|
// note: writer is thread-safe
|
|
msgStream msgio.ReadWriteCloser
|
|
|
|
// atomic
|
|
reqCount uint64
|
|
|
|
respLk sync.Mutex
|
|
|
|
// respMap is nil after store closes
|
|
respMap map[uint64]chan<- NetRpcResp
|
|
|
|
closing chan struct{}
|
|
closed chan struct{}
|
|
|
|
closeLk sync.Mutex
|
|
onClose func()
|
|
}
|
|
|
|
func NewNetworkStore(mss msgio.ReadWriteCloser) *NetworkStore {
|
|
ns := &NetworkStore{
|
|
msgStream: mss,
|
|
|
|
respMap: map[uint64]chan<- NetRpcResp{},
|
|
|
|
closing: make(chan struct{}),
|
|
closed: make(chan struct{}),
|
|
}
|
|
|
|
go ns.receive()
|
|
|
|
return ns
|
|
}
|
|
|
|
func (n *NetworkStore) shutdown(msg string) {
|
|
if err := n.msgStream.Close(); err != nil {
|
|
log.Errorw("closing netstore msg stream", "error", err)
|
|
}
|
|
|
|
nerr := NetRpcErr{
|
|
Type: NRpcErrGeneric,
|
|
Msg: msg,
|
|
Cid: nil,
|
|
}
|
|
|
|
var errb bytes.Buffer
|
|
if err := nerr.MarshalCBOR(&errb); err != nil {
|
|
log.Errorw("netstore shutdown: error marshaling error", "err", err)
|
|
}
|
|
|
|
n.respLk.Lock()
|
|
for id, resps := range n.respMap {
|
|
resps <- NetRpcResp{
|
|
Type: NRpcErr,
|
|
ID: id,
|
|
Data: errb.Bytes(),
|
|
}
|
|
}
|
|
|
|
n.respMap = nil
|
|
|
|
n.respLk.Unlock()
|
|
}
|
|
|
|
func (n *NetworkStore) OnClose(cb func()) {
|
|
n.closeLk.Lock()
|
|
defer n.closeLk.Unlock()
|
|
|
|
select {
|
|
case <-n.closed:
|
|
cb()
|
|
default:
|
|
n.onClose = cb
|
|
}
|
|
}
|
|
|
|
func (n *NetworkStore) receive() {
|
|
defer func() {
|
|
n.closeLk.Lock()
|
|
defer n.closeLk.Unlock()
|
|
|
|
close(n.closed)
|
|
if n.onClose != nil {
|
|
n.onClose()
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-n.closing:
|
|
n.shutdown("netstore stopping")
|
|
return
|
|
default:
|
|
}
|
|
|
|
msg, err := n.msgStream.ReadMsg()
|
|
if err != nil {
|
|
n.shutdown(fmt.Sprintf("netstore ReadMsg: %s", err))
|
|
return
|
|
}
|
|
|
|
var resp NetRpcResp
|
|
if err := resp.UnmarshalCBOR(bytes.NewReader(msg)); err != nil {
|
|
n.shutdown(fmt.Sprintf("unmarshaling netstore response: %s", err))
|
|
return
|
|
}
|
|
|
|
n.msgStream.ReleaseMsg(msg)
|
|
|
|
n.respLk.Lock()
|
|
if ch, ok := n.respMap[resp.ID]; ok {
|
|
if resp.Type == NRpcMore {
|
|
nch := make(chan NetRpcResp, 1)
|
|
resp.next = nch
|
|
n.respMap[resp.ID] = nch
|
|
} else {
|
|
delete(n.respMap, resp.ID)
|
|
}
|
|
|
|
ch <- resp
|
|
}
|
|
n.respLk.Unlock()
|
|
}
|
|
}
|
|
|
|
func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte) (uint64, <-chan NetRpcResp, error) {
|
|
rid := atomic.AddUint64(&n.reqCount, 1)
|
|
|
|
respCh := make(chan NetRpcResp, 1) // todo pool?
|
|
|
|
n.respLk.Lock()
|
|
if n.respMap == nil {
|
|
return 0, nil, xerrors.Errorf("netstore closed")
|
|
}
|
|
n.respMap[rid] = respCh
|
|
n.respLk.Unlock()
|
|
|
|
req := NetRpcReq{
|
|
Type: rt,
|
|
ID: rid,
|
|
Cid: cids,
|
|
Data: data,
|
|
}
|
|
|
|
var rbuf bytes.Buffer // todo buffer pool
|
|
if err := req.MarshalCBOR(&rbuf); err != nil {
|
|
n.respLk.Lock()
|
|
if n.respMap == nil {
|
|
return 0, nil, xerrors.Errorf("netstore closed")
|
|
}
|
|
delete(n.respMap, rid)
|
|
n.respLk.Unlock()
|
|
|
|
return 0, nil, err
|
|
}
|
|
|
|
if err := n.msgStream.WriteMsg(rbuf.Bytes()); err != nil {
|
|
n.respLk.Lock()
|
|
if n.respMap == nil {
|
|
return 0, nil, xerrors.Errorf("netstore closed")
|
|
}
|
|
delete(n.respMap, rid)
|
|
n.respLk.Unlock()
|
|
|
|
return 0, nil, err
|
|
}
|
|
|
|
return rid, respCh, nil
|
|
}
|
|
|
|
func (n *NetworkStore) waitResp(ctx context.Context, rch <-chan NetRpcResp, rid uint64) (NetRpcResp, error) {
|
|
select {
|
|
case resp := <-rch:
|
|
if resp.Type == NRpcErr {
|
|
var e NetRpcErr
|
|
if err := e.UnmarshalCBOR(bytes.NewReader(resp.Data)); err != nil {
|
|
return NetRpcResp{}, xerrors.Errorf("unmarshaling error data: %w", err)
|
|
}
|
|
|
|
var err error
|
|
switch e.Type {
|
|
case NRpcErrNotFound:
|
|
if e.Cid != nil {
|
|
err = ipld.ErrNotFound{
|
|
Cid: *e.Cid,
|
|
}
|
|
} else {
|
|
err = xerrors.Errorf("block not found, but cid was null")
|
|
}
|
|
default:
|
|
err = xerrors.Errorf("unknown error type")
|
|
case NRpcErrGeneric:
|
|
err = xerrors.Errorf("generic error")
|
|
}
|
|
|
|
return NetRpcResp{}, xerrors.Errorf("netstore error response: %s (%w)", e.Msg, err)
|
|
}
|
|
|
|
return resp, nil
|
|
case <-ctx.Done():
|
|
// todo send cancel req
|
|
|
|
n.respLk.Lock()
|
|
if n.respMap != nil {
|
|
delete(n.respMap, rid)
|
|
}
|
|
n.respLk.Unlock()
|
|
|
|
return NetRpcResp{}, ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (n *NetworkStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
|
|
req, rch, err := n.sendRpc(NRpcHas, []cid.Cid{c}, nil)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
resp, err := n.waitResp(ctx, rch, req)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if len(resp.Data) != 1 {
|
|
return false, xerrors.Errorf("expected reposnse length to be 1 byte")
|
|
}
|
|
switch resp.Data[0] {
|
|
case cbg.CborBoolTrue[0]:
|
|
return true, nil
|
|
case cbg.CborBoolFalse[0]:
|
|
return false, nil
|
|
default:
|
|
return false, xerrors.Errorf("has: bad response: %x", resp.Data[0])
|
|
}
|
|
}
|
|
|
|
func (n *NetworkStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
|
|
req, rch, err := n.sendRpc(NRpcGet, []cid.Cid{c}, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := n.waitResp(ctx, rch, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return blocks.NewBlockWithCid(resp.Data, c)
|
|
}
|
|
|
|
func (n *NetworkStore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error {
|
|
req, rch, err := n.sendRpc(NRpcGet, []cid.Cid{c}, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp, err := n.waitResp(ctx, rch, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return callback(resp.Data) // todo return buf to pool
|
|
}
|
|
|
|
func (n *NetworkStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
|
|
req, rch, err := n.sendRpc(NRpcGetSize, []cid.Cid{c}, nil)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
resp, err := n.waitResp(ctx, rch, req)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if len(resp.Data) != 4 {
|
|
return 0, xerrors.Errorf("expected getsize response to be 4 bytes, was %d", resp.Data)
|
|
}
|
|
|
|
return int(binary.LittleEndian.Uint32(resp.Data)), nil
|
|
}
|
|
|
|
func (n *NetworkStore) Put(ctx context.Context, block blocks.Block) error {
|
|
return n.PutMany(ctx, []blocks.Block{block})
|
|
}
|
|
|
|
func (n *NetworkStore) PutMany(ctx context.Context, blocks []blocks.Block) error {
|
|
// todo pool
|
|
cids := make([]cid.Cid, len(blocks))
|
|
blkDatas := make([][]byte, len(blocks))
|
|
for i, block := range blocks {
|
|
cids[i] = block.Cid()
|
|
blkDatas[i] = block.RawData()
|
|
}
|
|
|
|
req, rch, err := n.sendRpc(NRpcPut, cids, blkDatas)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = n.waitResp(ctx, rch, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (n *NetworkStore) DeleteBlock(ctx context.Context, c cid.Cid) error {
|
|
return n.DeleteMany(ctx, []cid.Cid{c})
|
|
}
|
|
|
|
func (n *NetworkStore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
|
|
req, rch, err := n.sendRpc(NRpcDelete, cids, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = n.waitResp(ctx, rch, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (n *NetworkStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
|
req, rch, err := n.sendRpc(NRpcList, nil, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
outCh := make(chan cid.Cid, 16)
|
|
|
|
go func() {
|
|
defer close(outCh)
|
|
// todo defer cancel request
|
|
|
|
for {
|
|
if rch == nil {
|
|
return
|
|
}
|
|
|
|
resp, err := n.waitResp(ctx, rch, req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
switch resp.Type {
|
|
case NRpcOK, NRpcMore:
|
|
c, err := cid.Cast(resp.Data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// todo propagate backpressure
|
|
select {
|
|
case outCh <- c:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
rch = resp.next
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return outCh, err
|
|
}
|
|
|
|
func (n *NetworkStore) HashOnRead(enabled bool) {
|
|
// todo
|
|
return
|
|
}
|
|
|
|
func (n *NetworkStore) Stop(ctx context.Context) error {
|
|
close(n.closing)
|
|
|
|
select {
|
|
case <-n.closed:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Compile-time check that *NetworkStore implements Blockstore.
var _ Blockstore = (*NetworkStore)(nil)
|