From 2343ebc5b81fae8b3c66f8143d7e0fa2bbff3347 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 1 Nov 2019 12:07:05 +0100 Subject: [PATCH] statestore: Use reflect for mutators --- chain/deals/client.go | 12 ++++-- chain/deals/provider.go | 6 +-- chain/deals/state_store.go | 77 -------------------------------------- lib/statestore/store.go | 52 +++++++++++++++++++++---- 4 files changed, 55 insertions(+), 92 deletions(-) delete mode 100644 chain/deals/state_store.go diff --git a/chain/deals/client.go b/chain/deals/client.go index 3ea36c2e7..e89a9886f 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -46,7 +46,7 @@ type Client struct { discovery *discovery.Local mpool full.MpoolAPI - deals ClientStateStore + deals *statestore.StateStore conns map[cid.Cid]inet.Stream incoming chan *ClientDeal @@ -72,7 +72,7 @@ func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w * discovery: discovery, mpool: mpool, - deals: ClientStateStore{statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))}, + deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))), conns: map[cid.Cid]inet.Stream{}, incoming: make(chan *ClientDeal, 16), @@ -130,7 +130,7 @@ func (c *Client) onIncoming(deal *ClientDeal) { func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) { log.Infof("Deal %s updated state to %d", update.id, update.newState) var deal ClientDeal - err := c.deals.MutateClient(update.id, func(d *ClientDeal) error { + err := c.deals.Mutate(update.id, func(d *ClientDeal) error { d.State = update.newState deal = *d return nil @@ -286,7 +286,11 @@ func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*t } func (c *Client) List() ([]ClientDeal, error) { - return c.deals.ListClient() + var out []ClientDeal + if err := c.deals.List(&out); err != nil { + return nil, err + } + return out, nil } func (c *Client) Stop() { diff --git a/chain/deals/provider.go b/chain/deals/provider.go index aa47a45c5..02966c05d 100644 --- a/chain/deals/provider.go +++ b/chain/deals/provider.go @@ -49,7 +49,7 @@ type Provider struct { // TODO: GC dag dtypes.StagingDAG - deals MinerStateStore + deals *statestore.StateStore ds dtypes.MetadataDS conns map[cid.Cid]inet.Stream @@ -96,7 +96,7 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, dag dtypes.Staging actor: minerAddress, - deals: MinerStateStore{statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))}, + deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))), ds: ds, } @@ -164,7 +164,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) { return } var deal MinerDeal - err := p.deals.MutateMiner(update.id, func(d *MinerDeal) error { + err := p.deals.Mutate(update.id, func(d *MinerDeal) error { d.State = update.newState if update.mut != nil { update.mut(d) diff --git a/chain/deals/state_store.go b/chain/deals/state_store.go deleted file mode 100644 index b654816c6..000000000 --- a/chain/deals/state_store.go +++ /dev/null @@ -1,77 +0,0 @@ -package deals - -import ( - "bytes" - "github.com/filecoin-project/lotus/lib/statestore" - - "github.com/filecoin-project/lotus/lib/cborrpc" - "github.com/ipfs/go-cid" -) - -type MinerStateStore struct { - *statestore.StateStore -} - -func (st *MinerStateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error { - return st.Mutate(i, minerMutator(mutator)) -} - -func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) { - return func(in []byte) ([]byte, error) { - deal := new(MinerDeal) - err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal) - if err != nil { - return nil, err - } - - if err := m(deal); err != nil { - return nil, err - } - - return cborrpc.Dump(deal) - } -} - -type ClientStateStore struct { - *statestore.StateStore -} - -func (st *ClientStateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error { - return st.Mutate(i, clientMutator(mutator)) -} - -func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) { - return func(in []byte) ([]byte, error) { - deal := new(ClientDeal) - err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal) - if err != nil { - return nil, err - } - - if err := m(deal); err != nil { - return nil, err - } - - return cborrpc.Dump(deal) - } -} - -func (st *ClientStateStore) ListClient() ([]ClientDeal, error) { - var out []ClientDeal - - l, err := st.List() - if err != nil { - return nil, err - } - for _, res := range l { - var deal ClientDeal - err := cborrpc.ReadCborRPC(bytes.NewReader(res.Value), &deal) - if err != nil { - return nil, err - } - - out = append(out, deal) - } - - return out, nil -} diff --git a/lib/statestore/store.go b/lib/statestore/store.go index bf884788a..10a4de9bc 100644 --- a/lib/statestore/store.go +++ b/lib/statestore/store.go @@ -1,11 +1,13 @@ package statestore import ( + "bytes" "github.com/filecoin-project/lotus/lib/cborrpc" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" "golang.org/x/xerrors" + "reflect" ) type StateStore struct { @@ -46,7 +48,33 @@ func (st *StateStore) End(i cid.Cid) error { return st.ds.Delete(k) } -func (st *StateStore) Mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error { +func cborMutator(mutator interface{}) func([]byte) ([]byte, error) { + rmut := reflect.ValueOf(mutator) + + return func(in []byte) ([]byte, error) { + state := reflect.New(rmut.Type().In(0).Elem()) + + err := cborrpc.ReadCborRPC(bytes.NewReader(in), state.Interface()) + if err != nil { + return nil, err + } + + out := rmut.Call([]reflect.Value{state}) + + if err := out[0].Interface().(error); err != nil { + return nil, err + } + + return cborrpc.Dump(state.Interface()) + } +} + +// mutator func(*T) error +func (st *StateStore) Mutate(i cid.Cid, mutator interface{}) error { + return st.mutate(i, cborMutator(mutator)) +} + +func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error { k := datastore.NewKey(i.String()) has, err := st.ds.Has(k) if err != nil { @@ -69,26 +97,34 @@ func (st *StateStore) Mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) er return st.ds.Put(k, mutated) } -func (st *StateStore) List() ([]query.Entry, error) { - var out []query.Entry - +// out: *[]T +func (st *StateStore) List(out interface{}) error { res, err := st.ds.Query(query.Query{}) if err != nil { - return nil, err + return err } defer res.Close() + outT := reflect.TypeOf(out).Elem().Elem() + rout := reflect.ValueOf(out) + for { res, ok := res.NextSync() if !ok { break } if res.Error != nil { - return nil, res.Error + return res.Error } - out = append(out, res.Entry) + elem := reflect.New(outT) + err := cborrpc.ReadCborRPC(bytes.NewReader(res.Value), elem.Interface()) + if err != nil { + return err + } + + rout.Set(reflect.Append(rout.Elem(), elem.Elem())) } - return out, nil + return nil }