lotus/lib/statestore/store.go

166 lines
3.1 KiB
Go
Raw Normal View History

2019-10-31 21:01:44 +00:00
package statestore
import (
2019-11-01 11:07:05 +00:00
"bytes"
2019-11-01 11:14:32 +00:00
"fmt"
2019-11-01 13:58:48 +00:00
"reflect"
2019-10-31 21:01:44 +00:00
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
2019-11-06 19:44:28 +00:00
cbg "github.com/whyrusleeping/cbor-gen"
"go.uber.org/multierr"
2019-10-31 21:01:44 +00:00
"golang.org/x/xerrors"
2019-11-01 13:58:48 +00:00
2019-11-07 14:11:39 +00:00
"github.com/filecoin-project/lotus/lib/cborutil"
2019-10-31 21:01:44 +00:00
)
// StateStore wraps a datastore.Datastore and exposes keyed state tracking on
// top of it (see Begin, Mutate, Get, List and End). Begin and Mutate write
// values encoded with cborutil.Dump.
type StateStore struct {
// ds is the backing datastore every method reads from and writes to.
ds datastore.Datastore
}
func New(ds datastore.Datastore) *StateStore {
return &StateStore{ds: ds}
2019-10-31 21:01:44 +00:00
}
2019-11-01 11:14:32 +00:00
// toKey converts a caller-supplied identifier into a datastore key.
//
// A uint64 is formatted with fmt.Sprint; a fmt.Stringer contributes its
// String() value. Any other type is a programming error and panics.
func toKey(k interface{}) datastore.Key {
	if num, isUint := k.(uint64); isUint {
		return datastore.NewKey(fmt.Sprint(num))
	}
	if named, isStringer := k.(fmt.Stringer); isStringer {
		return datastore.NewKey(named.String())
	}
	panic("unexpected key type")
}
func (st *StateStore) Begin(i interface{}, state interface{}) error {
k := toKey(i)
2019-10-31 21:01:44 +00:00
has, err := st.ds.Has(k)
if err != nil {
return err
}
if has {
2019-11-07 14:45:53 +00:00
return xerrors.Errorf("already tracking state for %v", i)
2019-10-31 21:01:44 +00:00
}
2019-11-07 14:11:39 +00:00
b, err := cborutil.Dump(state)
2019-10-31 21:01:44 +00:00
if err != nil {
return err
}
return st.ds.Put(k, b)
}
2019-11-01 11:14:32 +00:00
func (st *StateStore) End(i interface{}) error {
k := toKey(i)
2019-10-31 21:01:44 +00:00
has, err := st.ds.Has(k)
if err != nil {
return err
}
if !has {
return xerrors.Errorf("No state for %s", i)
}
return st.ds.Delete(k)
}
2019-11-01 11:07:05 +00:00
func cborMutator(mutator interface{}) func([]byte) ([]byte, error) {
rmut := reflect.ValueOf(mutator)
return func(in []byte) ([]byte, error) {
state := reflect.New(rmut.Type().In(0).Elem())
2019-11-07 14:11:39 +00:00
err := cborutil.ReadCborRPC(bytes.NewReader(in), state.Interface())
2019-11-01 11:07:05 +00:00
if err != nil {
return nil, err
}
out := rmut.Call([]reflect.Value{state})
2019-11-01 22:44:55 +00:00
if err := out[0].Interface(); err != nil {
return nil, err.(error)
2019-11-01 11:07:05 +00:00
}
2019-11-07 14:11:39 +00:00
return cborutil.Dump(state.Interface())
2019-11-01 11:07:05 +00:00
}
}
// mutator func(*T) error
2019-11-01 13:58:48 +00:00
func (st *StateStore) Mutate(i interface{}, mutator interface{}) error {
2019-11-01 11:07:05 +00:00
return st.mutate(i, cborMutator(mutator))
}
2019-11-01 11:14:32 +00:00
func (st *StateStore) mutate(i interface{}, mutator func([]byte) ([]byte, error)) error {
k := toKey(i)
2019-10-31 21:01:44 +00:00
has, err := st.ds.Has(k)
if err != nil {
return err
}
if !has {
return xerrors.Errorf("No state for %s", i)
}
cur, err := st.ds.Get(k)
if err != nil {
return err
}
mutated, err := mutator(cur)
if err != nil {
return err
}
return st.ds.Put(k, mutated)
}
2019-11-07 14:45:53 +00:00
// Has reports whether the datastore holds an entry under the key derived from i.
func (st *StateStore) Has(i interface{}) (bool, error) {
	key := toKey(i)
	return st.ds.Has(key)
}
2019-11-06 19:44:28 +00:00
func (st *StateStore) Get(i interface{}, out cbg.CBORUnmarshaler) error {
k := toKey(i)
val, err := st.ds.Get(k)
if err != nil {
if xerrors.Is(err, datastore.ErrNotFound) {
2019-11-07 14:45:53 +00:00
return xerrors.Errorf("No state for %s: %w", i, err)
2019-11-06 19:44:28 +00:00
}
return err
}
return out.UnmarshalCBOR(bytes.NewReader(val))
}
2019-11-01 11:07:05 +00:00
// out: *[]T
func (st *StateStore) List(out interface{}) error {
2019-10-31 21:01:44 +00:00
res, err := st.ds.Query(query.Query{})
if err != nil {
2019-11-01 11:07:05 +00:00
return err
2019-10-31 21:01:44 +00:00
}
defer res.Close()
2019-11-01 11:07:05 +00:00
outT := reflect.TypeOf(out).Elem().Elem()
rout := reflect.ValueOf(out)
var errs error
2019-10-31 21:01:44 +00:00
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
2019-11-01 11:07:05 +00:00
return res.Error
}
elem := reflect.New(outT)
2019-11-07 14:11:39 +00:00
err := cborutil.ReadCborRPC(bytes.NewReader(res.Value), elem.Interface())
2019-11-01 11:07:05 +00:00
if err != nil {
errs = multierr.Append(errs, xerrors.Errorf("decoding state for key '%s': %w", res.Key, err))
continue
2019-10-31 21:01:44 +00:00
}
2019-11-05 18:19:57 +00:00
rout.Elem().Set(reflect.Append(rout.Elem(), elem.Elem()))
2019-10-31 21:01:44 +00:00
}
2019-11-01 11:07:05 +00:00
return nil
2019-10-31 21:01:44 +00:00
}