lotus/chain/deals/state_store.go

143 lines
2.6 KiB
Go
Raw Normal View History

2019-08-06 22:04:21 +00:00
package deals
import (
"bytes"
"github.com/filecoin-project/lotus/lib/cborrpc"
2019-08-06 22:04:21 +00:00
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
2019-09-10 14:13:24 +00:00
"github.com/ipfs/go-datastore/query"
2019-08-06 22:04:21 +00:00
"golang.org/x/xerrors"
)
// StateStore keeps serialized state entries in a datastore. Entries are
// addressed by a key built from the string form of a CID.
type StateStore struct {
	// ds is the backing datastore that every method reads from and writes to.
	ds datastore.Datastore
}
func (st *StateStore) Begin(i cid.Cid, state interface{}) error {
2019-08-06 22:04:21 +00:00
k := datastore.NewKey(i.String())
has, err := st.ds.Has(k)
if err != nil {
return err
}
if has {
2019-09-13 19:19:13 +00:00
return xerrors.Errorf("Already tracking state for %s", i)
2019-08-06 22:04:21 +00:00
}
b, err := cborrpc.Dump(state)
2019-08-06 22:04:21 +00:00
if err != nil {
return err
}
return st.ds.Put(k, b)
}
// End stops tracking the state stored for i by deleting its entry.
//
// It returns an error if no entry exists for i, or if the existence check or
// the delete itself fails.
func (st *StateStore) End(i cid.Cid) error {
	key := datastore.NewKey(i.String())

	if found, err := st.ds.Has(key); err != nil {
		return err
	} else if !found {
		return xerrors.Errorf("No state for %s", i)
	}

	return st.ds.Delete(key)
}
2019-09-13 19:19:13 +00:00
func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error {
k := datastore.NewKey(i.String())
has, err := st.ds.Has(k)
if err != nil {
return err
}
if !has {
return xerrors.Errorf("No state for %s", i)
}
2019-08-06 22:04:21 +00:00
2019-09-13 19:19:13 +00:00
cur, err := st.ds.Get(k)
if err != nil {
return err
}
mutated, err := mutator(cur)
if err != nil {
return err
}
return st.ds.Put(k, mutated)
}
// MinerStateStore is a StateStore with a typed mutation helper for MinerDeal
// values (see MutateMiner).
type MinerStateStore struct {
	StateStore
}
func (st *MinerStateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error {
2019-08-06 22:04:21 +00:00
return st.mutate(i, minerMutator(mutator))
}
func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
2019-08-06 22:04:21 +00:00
return func(in []byte) ([]byte, error) {
deal := new(MinerDeal)
err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal)
2019-08-06 22:04:21 +00:00
if err != nil {
return nil, err
}
if err := m(deal); err != nil {
2019-08-06 22:04:21 +00:00
return nil, err
}
return cborrpc.Dump(deal)
2019-08-06 22:04:21 +00:00
}
}
2019-09-13 19:19:13 +00:00
// ClientStateStore is a StateStore with typed helpers for ClientDeal values
// (see MutateClient and ListClient).
type ClientStateStore struct {
	StateStore
}
func (st *ClientStateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error {
2019-08-06 22:04:21 +00:00
return st.mutate(i, clientMutator(mutator))
}
func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
2019-08-06 22:04:21 +00:00
return func(in []byte) ([]byte, error) {
deal := new(ClientDeal)
err := cborrpc.ReadCborRPC(bytes.NewReader(in), deal)
2019-08-06 22:04:21 +00:00
if err != nil {
return nil, err
}
if err := m(deal); err != nil {
2019-08-06 22:04:21 +00:00
return nil, err
}
return cborrpc.Dump(deal)
2019-08-06 22:04:21 +00:00
}
}
2019-09-13 19:19:13 +00:00
func (st *ClientStateStore) ListClient() ([]ClientDeal, error) {
2019-09-10 14:13:24 +00:00
var out []ClientDeal
res, err := st.ds.Query(query.Query{})
if err != nil {
return nil, err
}
defer res.Close()
for {
res, ok := res.NextSync()
if !ok {
break
}
var deal ClientDeal
err := cborrpc.ReadCborRPC(bytes.NewReader(res.Value), &deal)
2019-09-10 14:13:24 +00:00
if err != nil {
return nil, err
}
out = append(out, deal)
}
return out, nil
}