statestore: Use reflect for mutators

This commit is contained in:
Łukasz Magiera 2019-11-01 12:07:05 +01:00
parent 1583cf2593
commit 2343ebc5b8
4 changed files with 55 additions and 92 deletions

View File

@ -46,7 +46,7 @@ type Client struct {
discovery *discovery.Local discovery *discovery.Local
mpool full.MpoolAPI mpool full.MpoolAPI
deals ClientStateStore deals *statestore.StateStore
conns map[cid.Cid]inet.Stream conns map[cid.Cid]inet.Stream
incoming chan *ClientDeal incoming chan *ClientDeal
@ -72,7 +72,7 @@ func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *
discovery: discovery, discovery: discovery,
mpool: mpool, mpool: mpool,
deals: ClientStateStore{statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))}, deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
conns: map[cid.Cid]inet.Stream{}, conns: map[cid.Cid]inet.Stream{},
incoming: make(chan *ClientDeal, 16), incoming: make(chan *ClientDeal, 16),
@ -130,7 +130,7 @@ func (c *Client) onIncoming(deal *ClientDeal) {
func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) { func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
log.Infof("Deal %s updated state to %d", update.id, update.newState) log.Infof("Deal %s updated state to %d", update.id, update.newState)
var deal ClientDeal var deal ClientDeal
err := c.deals.MutateClient(update.id, func(d *ClientDeal) error { err := c.deals.Mutate(update.id, func(d *ClientDeal) error {
d.State = update.newState d.State = update.newState
deal = *d deal = *d
return nil return nil
@ -286,7 +286,11 @@ func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*t
} }
func (c *Client) List() ([]ClientDeal, error) { func (c *Client) List() ([]ClientDeal, error) {
return c.deals.ListClient() var out []ClientDeal
if err := c.deals.List(&out); err != nil {
return nil, err
}
return out, nil
} }
func (c *Client) Stop() { func (c *Client) Stop() {

View File

@ -49,7 +49,7 @@ type Provider struct {
// TODO: GC // TODO: GC
dag dtypes.StagingDAG dag dtypes.StagingDAG
deals MinerStateStore deals *statestore.StateStore
ds dtypes.MetadataDS ds dtypes.MetadataDS
conns map[cid.Cid]inet.Stream conns map[cid.Cid]inet.Stream
@ -96,7 +96,7 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, dag dtypes.Staging
actor: minerAddress, actor: minerAddress,
deals: MinerStateStore{statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))}, deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
ds: ds, ds: ds,
} }
@ -164,7 +164,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
return return
} }
var deal MinerDeal var deal MinerDeal
err := p.deals.MutateMiner(update.id, func(d *MinerDeal) error { err := p.deals.Mutate(update.id, func(d *MinerDeal) error {
d.State = update.newState d.State = update.newState
if update.mut != nil { if update.mut != nil {
update.mut(d) update.mut(d)

View File

@ -1,77 +0,0 @@
package deals
import (
"bytes"
"github.com/filecoin-project/lotus/lib/statestore"
"github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/ipfs/go-cid"
)
type MinerStateStore struct {
*statestore.StateStore
}
func (st *MinerStateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error {
return st.Mutate(i, minerMutator(mutator))
}
func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
deal := new(MinerDeal)
err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal)
if err != nil {
return nil, err
}
if err := m(deal); err != nil {
return nil, err
}
return cborrpc.Dump(deal)
}
}
type ClientStateStore struct {
*statestore.StateStore
}
func (st *ClientStateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error {
return st.Mutate(i, clientMutator(mutator))
}
func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
deal := new(ClientDeal)
err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal)
if err != nil {
return nil, err
}
if err := m(deal); err != nil {
return nil, err
}
return cborrpc.Dump(deal)
}
}
func (st *ClientStateStore) ListClient() ([]ClientDeal, error) {
var out []ClientDeal
l, err := st.List()
if err != nil {
return nil, err
}
for _, res := range l {
var deal ClientDeal
err := cborrpc.ReadCborRPC(bytes.NewReader(res.Value), &deal)
if err != nil {
return nil, err
}
out = append(out, deal)
}
return out, nil
}

View File

@ -1,11 +1,13 @@
package statestore package statestore
import ( import (
"bytes"
"github.com/filecoin-project/lotus/lib/cborrpc" "github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query" "github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"reflect"
) )
type StateStore struct { type StateStore struct {
@ -46,7 +48,33 @@ func (st *StateStore) End(i cid.Cid) error {
return st.ds.Delete(k) return st.ds.Delete(k)
} }
func (st *StateStore) Mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error { func cborMutator(mutator interface{}) func([]byte) ([]byte, error) {
rmut := reflect.ValueOf(mutator)
return func(in []byte) ([]byte, error) {
state := reflect.New(rmut.Type().In(0).Elem())
err := cborrpc.ReadCborRPC(bytes.NewReader(in), state.Interface())
if err != nil {
return nil, err
}
out := rmut.Call([]reflect.Value{state})
if err := out[0].Interface().(error); err != nil {
return nil, err
}
return cborrpc.Dump(state.Interface())
}
}
// mutator func(*T) error
func (st *StateStore) Mutate(i cid.Cid, mutator interface{}) error {
return st.mutate(i, cborMutator(mutator))
}
func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error {
k := datastore.NewKey(i.String()) k := datastore.NewKey(i.String())
has, err := st.ds.Has(k) has, err := st.ds.Has(k)
if err != nil { if err != nil {
@ -69,26 +97,34 @@ func (st *StateStore) Mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) er
return st.ds.Put(k, mutated) return st.ds.Put(k, mutated)
} }
func (st *StateStore) List() ([]query.Entry, error) { // out: *[]T
var out []query.Entry func (st *StateStore) List(out interface{}) error {
res, err := st.ds.Query(query.Query{}) res, err := st.ds.Query(query.Query{})
if err != nil { if err != nil {
return nil, err return err
} }
defer res.Close() defer res.Close()
outT := reflect.TypeOf(out).Elem().Elem()
rout := reflect.ValueOf(out)
for { for {
res, ok := res.NextSync() res, ok := res.NextSync()
if !ok { if !ok {
break break
} }
if res.Error != nil { if res.Error != nil {
return nil, res.Error return res.Error
} }
out = append(out, res.Entry) elem := reflect.New(outT)
err := cborrpc.ReadCborRPC(bytes.NewReader(res.Value), elem.Interface())
if err != nil {
return err
}
rout.Set(reflect.Append(rout.Elem(), elem.Elem()))
} }
return out, nil return nil
} }