Move statestore to lib

This commit is contained in:
Łukasz Magiera 2019-10-31 22:01:44 +01:00
parent ba937cf859
commit 3cde267a2a
5 changed files with 107 additions and 76 deletions

View File

@ -2,6 +2,7 @@ package deals
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/lib/statestore"
"github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/full"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -71,7 +72,7 @@ func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *
discovery: discovery, discovery: discovery,
mpool: mpool, mpool: mpool,
deals: ClientStateStore{StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}}, deals: ClientStateStore{statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))},
conns: map[cid.Cid]inet.Stream{}, conns: map[cid.Cid]inet.Stream{},
incoming: make(chan *ClientDeal, 16), incoming: make(chan *ClientDeal, 16),

View File

@ -2,6 +2,7 @@ package deals
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/lib/statestore"
"sync" "sync"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
@ -95,7 +96,7 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, dag dtypes.Staging
actor: minerAddress, actor: minerAddress,
deals: MinerStateStore{StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}}, deals: MinerStateStore{statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))},
ds: ds, ds: ds,
} }

View File

@ -2,77 +2,18 @@ package deals
import ( import (
"bytes" "bytes"
"github.com/filecoin-project/lotus/lib/statestore"
"github.com/filecoin-project/lotus/lib/cborrpc" "github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors"
) )
type StateStore struct {
ds datastore.Datastore
}
func (st *StateStore) Begin(i cid.Cid, state interface{}) error {
k := datastore.NewKey(i.String())
has, err := st.ds.Has(k)
if err != nil {
return err
}
if has {
return xerrors.Errorf("Already tracking state for %s", i)
}
b, err := cborrpc.Dump(state)
if err != nil {
return err
}
return st.ds.Put(k, b)
}
func (st *StateStore) End(i cid.Cid) error {
k := datastore.NewKey(i.String())
has, err := st.ds.Has(k)
if err != nil {
return err
}
if !has {
return xerrors.Errorf("No state for %s", i)
}
return st.ds.Delete(k)
}
func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error {
k := datastore.NewKey(i.String())
has, err := st.ds.Has(k)
if err != nil {
return err
}
if !has {
return xerrors.Errorf("No state for %s", i)
}
cur, err := st.ds.Get(k)
if err != nil {
return err
}
mutated, err := mutator(cur)
if err != nil {
return err
}
return st.ds.Put(k, mutated)
}
type MinerStateStore struct { type MinerStateStore struct {
StateStore *statestore.StateStore
} }
func (st *MinerStateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error { func (st *MinerStateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error {
return st.mutate(i, minerMutator(mutator)) return st.Mutate(i, minerMutator(mutator))
} }
func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) { func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
@ -92,11 +33,11 @@ func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
} }
type ClientStateStore struct { type ClientStateStore struct {
StateStore *statestore.StateStore
} }
func (st *ClientStateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error { func (st *ClientStateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error {
return st.mutate(i, clientMutator(mutator)) return st.Mutate(i, clientMutator(mutator))
} }
func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) { func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
@ -118,18 +59,11 @@ func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
func (st *ClientStateStore) ListClient() ([]ClientDeal, error) { func (st *ClientStateStore) ListClient() ([]ClientDeal, error) {
var out []ClientDeal var out []ClientDeal
res, err := st.ds.Query(query.Query{}) l, err := st.List()
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer res.Close() for _, res := range l {
for {
res, ok := res.NextSync()
if !ok {
break
}
var deal ClientDeal var deal ClientDeal
err := cborrpc.ReadCborRPC(bytes.NewReader(res.Value), &deal) err := cborrpc.ReadCborRPC(bytes.NewReader(res.Value), &deal)
if err != nil { if err != nil {

95
lib/statestore/store.go Normal file
View File

@ -0,0 +1,95 @@
package statestore
import (
"github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors"
)
type StateStore struct {
ds datastore.Datastore
}
func New(ds datastore.Datastore) *StateStore {
return &StateStore{ds:ds}
}
func (st *StateStore) Begin(i cid.Cid, state interface{}) error {
k := datastore.NewKey(i.String())
has, err := st.ds.Has(k)
if err != nil {
return err
}
if has {
return xerrors.Errorf("Already tracking state for %s", i)
}
b, err := cborrpc.Dump(state)
if err != nil {
return err
}
return st.ds.Put(k, b)
}
func (st *StateStore) End(i cid.Cid) error {
k := datastore.NewKey(i.String())
has, err := st.ds.Has(k)
if err != nil {
return err
}
if !has {
return xerrors.Errorf("No state for %s", i)
}
return st.ds.Delete(k)
}
func (st *StateStore) Mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error {
k := datastore.NewKey(i.String())
has, err := st.ds.Has(k)
if err != nil {
return err
}
if !has {
return xerrors.Errorf("No state for %s", i)
}
cur, err := st.ds.Get(k)
if err != nil {
return err
}
mutated, err := mutator(cur)
if err != nil {
return err
}
return st.ds.Put(k, mutated)
}
func (st *StateStore) List() ([]query.Entry, error) {
var out []query.Entry
res, err := st.ds.Query(query.Query{})
if err != nil {
return nil, err
}
defer res.Close()
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
return nil, res.Error
}
out = append(out, res.Entry)
}
return out, nil
}

View File

@ -47,7 +47,7 @@ func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) error {
return return
} }
if err := sm.Sectors.SealSector(ctx, sectorId); err != nil { if err := sm.Miner.SealSector(ctx, sectorId); err != nil {
log.Error(err) log.Error(err)
return return
} }