polish: genericize adt array diff & extract store
- adds testing for adt diff
This commit is contained in:
parent
631b8b5802
commit
509e3b653c
25
chain/events/state/ctxstore.go
Normal file
25
chain/events/state/ctxstore.go
Normal file
@ -0,0 +1,25 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
)
|
||||
|
||||
type contextStore struct {
|
||||
ctx context.Context
|
||||
cst *cbor.BasicIpldStore
|
||||
}
|
||||
|
||||
func (cs *contextStore) Context() context.Context {
|
||||
return cs.ctx
|
||||
}
|
||||
|
||||
func (cs *contextStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
|
||||
return cs.cst.Get(ctx, c, out)
|
||||
}
|
||||
|
||||
func (cs *contextStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
|
||||
return cs.cst.Put(ctx, v)
|
||||
}
|
55
chain/events/state/diff_adt.go
Normal file
55
chain/events/state/diff_adt.go
Normal file
@ -0,0 +1,55 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
typegen "github.com/whyrusleeping/cbor-gen"
|
||||
)
|
||||
|
||||
// AdtArrayDiff generalizes adt.Array diffing by accepting a Deferred type that can unmarshalled to its corresponding struct
|
||||
// in an interface implantation.
|
||||
// Add should be called when a new k,v is added to the array
|
||||
// Modify should be called when a value is modified in the array
|
||||
// Remove should be called when a value is removed from the array
|
||||
type AdtArrayDiff interface {
|
||||
Add(val *typegen.Deferred) error
|
||||
Modify(from, to *typegen.Deferred) error
|
||||
Remove(val *typegen.Deferred) error
|
||||
}
|
||||
|
||||
// TODO Performance can be improved by diffing the underlying IPLD graph, e.g. https://github.com/ipfs/go-merkledag/blob/749fd8717d46b4f34c9ce08253070079c89bc56d/dagutils/diff.go#L104
|
||||
// CBOR Marshaling will likely be the largest performance bottleneck here.
|
||||
|
||||
// DiffAdtArray accepts two *adt.Array's and an AdtArrayDiff implementation. It does the following:
|
||||
// - All values that exist in preArr and not in curArr are passed to AdtArrayDiff.Remove()
|
||||
// - All values that exist in curArr nnd not in prevArr are passed to adtArrayDiff.Add()
|
||||
// - All values that exist in preArr and in curArr are passed to AdtArrayDiff.Modify()
|
||||
// - It is the responsibility of AdtArrayDiff.Modify() to determine if the values it was passed have been modified.
|
||||
func DiffAdtArray(preArr, curArr *adt.Array, out AdtArrayDiff) error {
|
||||
prevVal := new(typegen.Deferred)
|
||||
if err := preArr.ForEach(prevVal, func(i int64) error {
|
||||
curVal := new(typegen.Deferred)
|
||||
found, err := curArr.Get(uint64(i), curVal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !found {
|
||||
if err := out.Remove(prevVal); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := out.Modify(prevVal, curVal); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return curArr.Delete(uint64(i))
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
curVal := new(typegen.Deferred)
|
||||
return curArr.ForEach(curVal, func(i int64) error {
|
||||
return out.Add(curVal)
|
||||
})
|
||||
}
|
131
chain/events/state/diff_adt_test.go
Normal file
131
chain/events/state/diff_adt_test.go
Normal file
@ -0,0 +1,131 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
ds_sync "github.com/ipfs/go-datastore/sync"
|
||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
cbornode "github.com/ipfs/go-ipld-cbor"
|
||||
typegen "github.com/whyrusleeping/cbor-gen"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/runtime"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
)
|
||||
|
||||
func TestDiffAdtArray(t *testing.T) {
|
||||
ctxstoreA := newContextStore()
|
||||
ctxstoreB := newContextStore()
|
||||
|
||||
arrA := adt.MakeEmptyArray(ctxstoreA)
|
||||
arrB := adt.MakeEmptyArray(ctxstoreB)
|
||||
|
||||
require.NoError(t, arrA.Set(0, runtime.CBORBytes([]byte{0}))) // delete
|
||||
|
||||
require.NoError(t, arrA.Set(1, runtime.CBORBytes([]byte{0}))) // modify
|
||||
require.NoError(t, arrB.Set(1, runtime.CBORBytes([]byte{1})))
|
||||
|
||||
require.NoError(t, arrA.Set(2, runtime.CBORBytes([]byte{1}))) // delete
|
||||
|
||||
require.NoError(t, arrA.Set(3, runtime.CBORBytes([]byte{0}))) // noop
|
||||
require.NoError(t, arrB.Set(3, runtime.CBORBytes([]byte{0})))
|
||||
|
||||
require.NoError(t, arrA.Set(4, runtime.CBORBytes([]byte{0}))) // modify
|
||||
require.NoError(t, arrB.Set(4, runtime.CBORBytes([]byte{6})))
|
||||
|
||||
require.NoError(t, arrB.Set(5, runtime.CBORBytes{8})) // add
|
||||
require.NoError(t, arrB.Set(6, runtime.CBORBytes{9})) // add
|
||||
|
||||
changes := &TestAdtDiff{
|
||||
Added: []runtime.CBORBytes{},
|
||||
Modified: []TestAdtDiffModified{},
|
||||
Removed: []runtime.CBORBytes{},
|
||||
}
|
||||
|
||||
assert.NoError(t, DiffAdtArray(arrA, arrB, changes))
|
||||
assert.NotNil(t, changes)
|
||||
|
||||
assert.Equal(t, 2, len(changes.Added))
|
||||
assert.EqualValues(t, []byte{8}, changes.Added[0])
|
||||
assert.EqualValues(t, []byte{9}, changes.Added[1])
|
||||
|
||||
assert.Equal(t, 2, len(changes.Modified))
|
||||
assert.EqualValues(t, []byte{0}, changes.Modified[0].From)
|
||||
assert.EqualValues(t, []byte{1}, changes.Modified[0].To)
|
||||
assert.EqualValues(t, []byte{0}, changes.Modified[1].From)
|
||||
assert.EqualValues(t, []byte{6}, changes.Modified[1].To)
|
||||
|
||||
assert.Equal(t, 2, len(changes.Removed))
|
||||
assert.EqualValues(t, []byte{0}, changes.Removed[0])
|
||||
assert.EqualValues(t, []byte{1}, changes.Removed[1])
|
||||
}
|
||||
|
||||
type TestAdtDiff struct {
|
||||
Added []runtime.CBORBytes
|
||||
Modified []TestAdtDiffModified
|
||||
Removed []runtime.CBORBytes
|
||||
}
|
||||
|
||||
var _ AdtArrayDiff = &TestAdtDiff{}
|
||||
|
||||
type TestAdtDiffModified struct {
|
||||
From runtime.CBORBytes
|
||||
To runtime.CBORBytes
|
||||
}
|
||||
|
||||
func (t *TestAdtDiff) Add(val *typegen.Deferred) error {
|
||||
v := new(runtime.CBORBytes)
|
||||
err := v.UnmarshalCBOR(bytes.NewReader(val.Raw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.Added = append(t.Added, *v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestAdtDiff) Modify(from, to *typegen.Deferred) error {
|
||||
vFrom := new(runtime.CBORBytes)
|
||||
err := vFrom.UnmarshalCBOR(bytes.NewReader(from.Raw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
vTo := new(runtime.CBORBytes)
|
||||
err = vTo.UnmarshalCBOR(bytes.NewReader(to.Raw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !bytes.Equal(*vFrom, *vTo) {
|
||||
t.Modified = append(t.Modified, TestAdtDiffModified{
|
||||
From: *vFrom,
|
||||
To: *vTo,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestAdtDiff) Remove(val *typegen.Deferred) error {
|
||||
v := new(runtime.CBORBytes)
|
||||
err := v.UnmarshalCBOR(bytes.NewReader(val.Raw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.Removed = append(t.Removed, *v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func newContextStore() *contextStore {
|
||||
ctx := context.Background()
|
||||
bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
|
||||
store := cbornode.NewCborStore(bs)
|
||||
return &contextStore{
|
||||
ctx: ctx,
|
||||
cst: store,
|
||||
}
|
||||
}
|
@ -1,10 +1,12 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
typegen "github.com/whyrusleeping/cbor-gen"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-amt-ipld/v2"
|
||||
@ -163,15 +165,59 @@ type MinerSectorChanges struct {
|
||||
Removed []miner.SectorOnChainInfo
|
||||
}
|
||||
|
||||
var _ AdtArrayDiff = &MinerSectorChanges{}
|
||||
|
||||
type SectorExtensions struct {
|
||||
From miner.SectorOnChainInfo
|
||||
To miner.SectorOnChainInfo
|
||||
}
|
||||
|
||||
func (m *MinerSectorChanges) Add(val *typegen.Deferred) error {
|
||||
si := new(miner.SectorOnChainInfo)
|
||||
err := si.UnmarshalCBOR(bytes.NewReader(val.Raw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.Added = append(m.Added, *si)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MinerSectorChanges) Modify(from, to *typegen.Deferred) error {
|
||||
siFrom := new(miner.SectorOnChainInfo)
|
||||
err := siFrom.UnmarshalCBOR(bytes.NewReader(from.Raw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
siTo := new(miner.SectorOnChainInfo)
|
||||
err = siTo.UnmarshalCBOR(bytes.NewReader(to.Raw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if siFrom.Info.Expiration != siTo.Info.Expiration {
|
||||
m.Extended = append(m.Extended, SectorExtensions{
|
||||
From: *siFrom,
|
||||
To: *siTo,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MinerSectorChanges) Remove(val *typegen.Deferred) error {
|
||||
si := new(miner.SectorOnChainInfo)
|
||||
err := si.UnmarshalCBOR(bytes.NewReader(val.Raw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.Removed = append(m.Removed, *si)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
|
||||
return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) {
|
||||
ctxStore := &contextStore{
|
||||
ctx: context.TODO(),
|
||||
ctx: ctx,
|
||||
cst: sp.cst,
|
||||
}
|
||||
|
||||
@ -196,42 +242,7 @@ func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
var osi miner.SectorOnChainInfo
|
||||
|
||||
// find all sectors that were extended or removed
|
||||
if err := oldSectors.ForEach(&osi, func(i int64) error {
|
||||
var nsi miner.SectorOnChainInfo
|
||||
found, err := newSectors.Get(uint64(osi.Info.SectorNumber), &nsi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !found {
|
||||
sectorChanges.Removed = append(sectorChanges.Removed, osi)
|
||||
return nil
|
||||
}
|
||||
|
||||
if nsi.Info.Expiration != osi.Info.Expiration {
|
||||
sectorChanges.Extended = append(sectorChanges.Extended, SectorExtensions{
|
||||
From: osi,
|
||||
To: nsi,
|
||||
})
|
||||
}
|
||||
|
||||
// we don't update miners state filed with `newSectors.Root()` so this operation is safe.
|
||||
if err := newSectors.Delete(uint64(osi.Info.SectorNumber)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
// all sectors that remain in newSectors are new
|
||||
var nsi miner.SectorOnChainInfo
|
||||
if err := newSectors.ForEach(&nsi, func(i int64) error {
|
||||
sectorChanges.Added = append(sectorChanges.Added, nsi)
|
||||
return nil
|
||||
}); err != nil {
|
||||
if err := DiffAdtArray(oldSectors, newSectors, sectorChanges); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
@ -243,20 +254,3 @@ func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
|
||||
return true, sectorChanges, nil
|
||||
}
|
||||
}
|
||||
|
||||
type contextStore struct {
|
||||
ctx context.Context
|
||||
cst *cbor.BasicIpldStore
|
||||
}
|
||||
|
||||
func (cs *contextStore) Context() context.Context {
|
||||
return cs.ctx
|
||||
}
|
||||
|
||||
func (cs *contextStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
|
||||
return cs.cst.Get(ctx, c, out)
|
||||
}
|
||||
|
||||
func (cs *contextStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
|
||||
return cs.cst.Put(ctx, v)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user