deals: Split state store

This commit is contained in:
Łukasz Magiera 2019-09-13 21:19:13 +02:00
parent 08ea758216
commit 9c276e5331
4 changed files with 55 additions and 50 deletions

View File

@ -9,7 +9,7 @@ import (
type DealState int type DealState int
const ( const (
DealUnknown = iota DealUnknown = DealState(iota)
DealRejected DealRejected
DealAccepted DealAccepted
DealStarted DealStarted

View File

@ -51,7 +51,7 @@ type Client struct {
dag dtypes.ClientDAG dag dtypes.ClientDAG
discovery *discovery.Local discovery *discovery.Local
deals StateStore deals ClientStateStore
conns map[cid.Cid]inet.Stream conns map[cid.Cid]inet.Stream
incoming chan ClientDeal incoming chan ClientDeal
@ -75,7 +75,7 @@ func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.Me
dag: dag, dag: dag,
discovery: discovery, discovery: discovery,
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, deals: ClientStateStore{StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}},
conns: map[cid.Cid]inet.Stream{}, conns: map[cid.Cid]inet.Stream{},
incoming: make(chan ClientDeal, 16), incoming: make(chan ClientDeal, 16),

View File

@ -45,7 +45,7 @@ type Handler struct {
// TODO: GC // TODO: GC
dag dtypes.StagingDAG dag dtypes.StagingDAG
deals StateStore deals MinerStateStore
conns map[cid.Cid]inet.Stream conns map[cid.Cid]inet.Stream
actor address.Address actor address.Address
@ -89,7 +89,7 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtyp
actor: minerAddress, actor: minerAddress,
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, deals: MinerStateStore{StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}},
}, nil }, nil
} }

View File

@ -19,8 +19,7 @@ func (st *StateStore) Begin(i cid.Cid, state interface{}) error {
return err return err
} }
if has { if has {
// TODO: uncomment after deals work return xerrors.Errorf("Already tracking state for %s", i)
//return xerrors.Errorf("Already tracking state for %s", i)
} }
b, err := cbor.DumpObject(state) b, err := cbor.DumpObject(state)
@ -43,48 +42,6 @@ func (st *StateStore) End(i cid.Cid) error {
return st.ds.Delete(k) return st.ds.Delete(k)
} }
// When this gets used anywhere else, migrate to reflect
func (st *StateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error {
return st.mutate(i, minerMutator(mutator))
}
func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal MinerDeal
err := cbor.DecodeInto(in, &deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
return nil, err
}
return cbor.DumpObject(deal)
}
}
func (st *StateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error {
return st.mutate(i, clientMutator(mutator))
}
func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal ClientDeal
err := cbor.DecodeInto(in, &deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
return nil, err
}
return cbor.DumpObject(deal)
}
}
func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error { func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error {
k := datastore.NewKey(i.String()) k := datastore.NewKey(i.String())
has, err := st.ds.Has(k) has, err := st.ds.Has(k)
@ -108,7 +65,55 @@ func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) er
return st.ds.Put(k, mutated) return st.ds.Put(k, mutated)
} }
func (st *StateStore) ListClient() ([]ClientDeal, error) { type MinerStateStore struct {
StateStore
}
func (st *MinerStateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error {
return st.mutate(i, minerMutator(mutator))
}
func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal MinerDeal
err := cbor.DecodeInto(in, &deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
return nil, err
}
return cbor.DumpObject(deal)
}
}
type ClientStateStore struct {
StateStore
}
func (st *ClientStateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error {
return st.mutate(i, clientMutator(mutator))
}
func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal ClientDeal
err := cbor.DecodeInto(in, &deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
return nil, err
}
return cbor.DumpObject(deal)
}
}
func (st *ClientStateStore) ListClient() ([]ClientDeal, error) {
var out []ClientDeal var out []ClientDeal
res, err := st.ds.Query(query.Query{}) res, err := st.ds.Query(query.Query{})