lotus/paychmgr/store.go

203 lines
3.7 KiB
Go
Raw Normal View History

package paychmgr
2019-08-09 21:42:56 +00:00
import (
2019-11-04 19:03:11 +00:00
"bytes"
2019-09-06 22:39:47 +00:00
"errors"
"fmt"
"strings"
"sync"
2020-02-13 00:10:07 +00:00
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
2019-08-09 21:42:56 +00:00
"github.com/ipfs/go-datastore"
2019-08-12 19:05:57 +00:00
"github.com/ipfs/go-datastore/namespace"
dsq "github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors"
2019-09-16 16:40:26 +00:00
"github.com/filecoin-project/go-address"
cborrpc "github.com/filecoin-project/go-cbor-util"
2020-02-13 00:10:07 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
2019-08-09 21:42:56 +00:00
)
2019-09-06 22:39:47 +00:00
var ErrChannelNotTracked = errors.New("channel not tracked")
type Store struct {
lk sync.Mutex // TODO: this can be split per paych
2019-08-09 21:42:56 +00:00
ds datastore.Batching
}
2019-09-16 16:40:26 +00:00
func NewStore(ds dtypes.MetadataDS) *Store {
2019-08-12 19:05:57 +00:00
ds = namespace.Wrap(ds, datastore.NewKey("/paych/"))
return &Store{
2019-08-09 21:42:56 +00:00
ds: ds,
}
}
const (
DirInbound = 1
DirOutbound = 2
)
2019-09-09 13:59:07 +00:00
type VoucherInfo struct {
2020-02-13 00:10:07 +00:00
Voucher *paych.SignedVoucher
2019-09-09 13:59:07 +00:00
Proof []byte
}
type ChannelInfo struct {
Channel address.Address
Control address.Address
Target address.Address
2019-11-04 19:03:11 +00:00
Direction uint64
Vouchers []*VoucherInfo
2020-02-20 08:37:10 +00:00
NextLane int64
}
func dskeyForChannel(addr address.Address) datastore.Key {
2019-08-12 19:05:57 +00:00
return datastore.NewKey(addr.String())
}
func (ps *Store) putChannelInfo(ci *ChannelInfo) error {
k := dskeyForChannel(ci.Channel)
2019-11-05 16:56:43 +00:00
b, err := cborrpc.Dump(ci)
if err != nil {
return err
}
2019-11-05 16:56:43 +00:00
return ps.ds.Put(k, b)
}
func (ps *Store) getChannelInfo(addr address.Address) (*ChannelInfo, error) {
k := dskeyForChannel(addr)
b, err := ps.ds.Get(k)
2019-09-06 22:39:47 +00:00
if err == datastore.ErrNotFound {
return nil, ErrChannelNotTracked
}
if err != nil {
return nil, err
}
var ci ChannelInfo
2019-11-04 19:03:11 +00:00
if err := ci.UnmarshalCBOR(bytes.NewReader(b)); err != nil {
return nil, err
}
return &ci, nil
}
func (ps *Store) TrackChannel(ch *ChannelInfo) error {
ps.lk.Lock()
defer ps.lk.Unlock()
2019-09-16 17:23:48 +00:00
return ps.trackChannel(ch)
}
func (ps *Store) trackChannel(ch *ChannelInfo) error {
_, err := ps.getChannelInfo(ch.Channel)
switch err {
default:
return err
case nil:
return fmt.Errorf("already tracking channel: %s", ch.Channel)
2019-09-06 22:39:47 +00:00
case ErrChannelNotTracked:
return ps.putChannelInfo(ch)
}
}
func (ps *Store) ListChannels() ([]address.Address, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
2019-08-12 19:05:57 +00:00
res, err := ps.ds.Query(dsq.Query{KeysOnly: true})
if err != nil {
return nil, err
}
defer res.Close()
var out []address.Address
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
return nil, err
}
addr, err := address.NewFromString(strings.TrimPrefix(res.Key, "/"))
if err != nil {
return nil, xerrors.Errorf("failed reading paych key (%q) from datastore: %w", res.Key, err)
}
out = append(out, addr)
}
return out, nil
2019-08-09 21:42:56 +00:00
}
func (ps *Store) findChan(filter func(*ChannelInfo) bool) (address.Address, error) {
res, err := ps.ds.Query(dsq.Query{})
if err != nil {
return address.Undef, err
}
defer res.Close()
var ci ChannelInfo
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
return address.Undef, err
}
2019-11-04 19:03:11 +00:00
if err := ci.UnmarshalCBOR(bytes.NewReader(res.Value)); err != nil {
return address.Undef, err
}
if !filter(&ci) {
continue
}
addr, err := address.NewFromString(strings.TrimPrefix(res.Key, "/"))
if err != nil {
return address.Undef, xerrors.Errorf("failed reading paych key (%q) from datastore: %w", res.Key, err)
}
return addr, nil
}
return address.Undef, nil
}
2020-02-20 08:37:10 +00:00
func (ps *Store) AllocateLane(ch address.Address) (int64, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
ci, err := ps.getChannelInfo(ch)
if err != nil {
return 0, err
}
out := ci.NextLane
ci.NextLane++
return out, ps.putChannelInfo(ci)
}
2019-09-09 13:59:07 +00:00
func (ps *Store) VouchersForPaych(ch address.Address) ([]*VoucherInfo, error) {
2019-08-12 19:51:01 +00:00
ci, err := ps.getChannelInfo(ch)
if err != nil {
return nil, err
}
return ci.Vouchers, nil
2019-08-09 21:42:56 +00:00
}