lotus/paych/store.go

249 lines
4.5 KiB
Go
Raw Normal View History

2019-08-09 21:42:56 +00:00
package paych
import (
2019-09-09 13:59:07 +00:00
"bytes"
2019-09-06 22:39:47 +00:00
"errors"
"fmt"
"strings"
"sync"
2019-08-09 21:42:56 +00:00
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/ipfs/go-datastore"
2019-08-12 19:05:57 +00:00
"github.com/ipfs/go-datastore/namespace"
dsq "github.com/ipfs/go-datastore/query"
cbor "github.com/ipfs/go-ipld-cbor"
"golang.org/x/xerrors"
2019-08-09 21:42:56 +00:00
)
2019-09-06 22:39:47 +00:00
var ErrChannelNotTracked = errors.New("channel not tracked")
func init() {
2019-09-09 13:59:07 +00:00
cbor.RegisterCborType(VoucherInfo{})
cbor.RegisterCborType(ChannelInfo{})
}
type Store struct {
lk sync.Mutex // TODO: this can be split per paych
2019-08-09 21:42:56 +00:00
ds datastore.Batching
}
func NewStore(ds datastore.Batching) *Store {
2019-08-12 19:05:57 +00:00
ds = namespace.Wrap(ds, datastore.NewKey("/paych/"))
return &Store{
2019-08-09 21:42:56 +00:00
ds: ds,
}
}
const (
DirInbound = 1
DirOutbound = 2
)
2019-09-09 13:59:07 +00:00
type VoucherInfo struct {
Voucher *types.SignedVoucher
Proof []byte
}
type ChannelInfo struct {
Channel address.Address
Control address.Address
Target address.Address
Direction int
Vouchers []*VoucherInfo
NextLane uint64
}
func dskeyForChannel(addr address.Address) datastore.Key {
2019-08-12 19:05:57 +00:00
return datastore.NewKey(addr.String())
}
func (ps *Store) putChannelInfo(ci *ChannelInfo) error {
k := dskeyForChannel(ci.Channel)
b, err := cbor.DumpObject(ci)
if err != nil {
return err
}
return ps.ds.Put(k, b)
}
func (ps *Store) getChannelInfo(addr address.Address) (*ChannelInfo, error) {
k := dskeyForChannel(addr)
b, err := ps.ds.Get(k)
2019-09-06 22:39:47 +00:00
if err == datastore.ErrNotFound {
return nil, ErrChannelNotTracked
}
if err != nil {
return nil, err
}
var ci ChannelInfo
if err := cbor.DecodeInto(b, &ci); err != nil {
return nil, err
}
return &ci, nil
}
func (ps *Store) TrackChannel(ch *ChannelInfo) error {
ps.lk.Lock()
defer ps.lk.Unlock()
_, err := ps.getChannelInfo(ch.Channel)
switch err {
default:
return err
case nil:
return fmt.Errorf("already tracking channel: %s", ch.Channel)
2019-09-06 22:39:47 +00:00
case ErrChannelNotTracked:
return ps.putChannelInfo(ch)
}
}
func (ps *Store) ListChannels() ([]address.Address, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
2019-08-12 19:05:57 +00:00
res, err := ps.ds.Query(dsq.Query{KeysOnly: true})
if err != nil {
return nil, err
}
defer res.Close()
var out []address.Address
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
return nil, err
}
addr, err := address.NewFromString(strings.TrimPrefix(res.Key, "/"))
if err != nil {
return nil, xerrors.Errorf("failed reading paych key (%q) from datastore: %w", res.Key, err)
}
out = append(out, addr)
}
return out, nil
2019-08-09 21:42:56 +00:00
}
func (ps *Store) findChan(filter func(*ChannelInfo) bool) (address.Address, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
res, err := ps.ds.Query(dsq.Query{})
if err != nil {
return address.Undef, err
}
defer res.Close()
var ci ChannelInfo
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
return address.Undef, err
}
if err := cbor.DecodeInto(res.Value, &ci); err != nil {
return address.Undef, err
}
if !filter(&ci) {
continue
}
addr, err := address.NewFromString(strings.TrimPrefix(res.Key, "/"))
if err != nil {
return address.Undef, xerrors.Errorf("failed reading paych key (%q) from datastore: %w", res.Key, err)
}
return addr, nil
}
return address.Undef, nil
}
2019-09-09 13:59:07 +00:00
func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof []byte) error {
ps.lk.Lock()
defer ps.lk.Unlock()
2019-08-12 19:51:01 +00:00
ci, err := ps.getChannelInfo(ch)
if err != nil {
return err
}
2019-09-09 13:59:07 +00:00
// look for duplicates
for i, v := range ci.Vouchers {
2019-09-09 19:21:37 +00:00
if !sv.Equals(v.Voucher) {
2019-09-09 13:59:07 +00:00
continue
}
if v.Proof != nil {
if !bytes.Equal(v.Proof, proof) {
2019-09-09 19:21:37 +00:00
log.Warnf("AddVoucher: multiple proofs for single voucher, storing both")
2019-09-09 13:59:07 +00:00
break
}
2019-09-09 19:21:37 +00:00
log.Warnf("AddVoucher: voucher re-added with matching proof")
2019-09-09 13:59:07 +00:00
return nil
}
log.Warnf("AddVoucher: adding proof to stored voucher")
ci.Vouchers[i] = &VoucherInfo{
Voucher: v.Voucher,
Proof: proof,
}
return ps.putChannelInfo(ci)
}
ci.Vouchers = append(ci.Vouchers, &VoucherInfo{
Voucher: sv,
Proof: proof,
})
2019-08-12 19:51:01 +00:00
if ci.NextLane <= sv.Lane {
ci.NextLane = sv.Lane + 1
}
2019-08-12 19:51:01 +00:00
return ps.putChannelInfo(ci)
2019-08-09 21:42:56 +00:00
}
func (ps *Store) AllocateLane(ch address.Address) (uint64, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
ci, err := ps.getChannelInfo(ch)
if err != nil {
return 0, err
}
out := ci.NextLane
ci.NextLane++
return out, ps.putChannelInfo(ci)
}
2019-09-09 13:59:07 +00:00
func (ps *Store) VouchersForPaych(ch address.Address) ([]*VoucherInfo, error) {
2019-08-12 19:51:01 +00:00
ci, err := ps.getChannelInfo(ch)
if err != nil {
return nil, err
}
return ci.Vouchers, nil
2019-08-09 21:42:56 +00:00
}