diff --git a/chain/events/state/ctxstore.go b/chain/events/state/ctxstore.go new file mode 100644 index 000000000..12b45e425 --- /dev/null +++ b/chain/events/state/ctxstore.go @@ -0,0 +1,25 @@ +package state + +import ( + "context" + + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" +) + +type contextStore struct { + ctx context.Context + cst *cbor.BasicIpldStore +} + +func (cs *contextStore) Context() context.Context { + return cs.ctx +} + +func (cs *contextStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { + return cs.cst.Get(ctx, c, out) +} + +func (cs *contextStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { + return cs.cst.Put(ctx, v) +} diff --git a/chain/events/state/diff_adt.go b/chain/events/state/diff_adt.go new file mode 100644 index 000000000..0d281b515 --- /dev/null +++ b/chain/events/state/diff_adt.go @@ -0,0 +1,55 @@ +package state + +import ( + "github.com/filecoin-project/specs-actors/actors/util/adt" + typegen "github.com/whyrusleeping/cbor-gen" +) + +// AdtArrayDiff generalizes adt.Array diffing by accepting a Deferred type that can unmarshalled to its corresponding struct +// in an interface implantation. +// Add should be called when a new k,v is added to the array +// Modify should be called when a value is modified in the array +// Remove should be called when a value is removed from the array +type AdtArrayDiff interface { + Add(val *typegen.Deferred) error + Modify(from, to *typegen.Deferred) error + Remove(val *typegen.Deferred) error +} + +// TODO Performance can be improved by diffing the underlying IPLD graph, e.g. https://github.com/ipfs/go-merkledag/blob/749fd8717d46b4f34c9ce08253070079c89bc56d/dagutils/diff.go#L104 +// CBOR Marshaling will likely be the largest performance bottleneck here. + +// DiffAdtArray accepts two *adt.Array's and an AdtArrayDiff implementation. It does the following: +// - All values that exist in preArr and not in curArr are passed to AdtArrayDiff.Remove() +// - All values that exist in curArr nnd not in prevArr are passed to adtArrayDiff.Add() +// - All values that exist in preArr and in curArr are passed to AdtArrayDiff.Modify() +// - It is the responsibility of AdtArrayDiff.Modify() to determine if the values it was passed have been modified. +func DiffAdtArray(preArr, curArr *adt.Array, out AdtArrayDiff) error { + prevVal := new(typegen.Deferred) + if err := preArr.ForEach(prevVal, func(i int64) error { + curVal := new(typegen.Deferred) + found, err := curArr.Get(uint64(i), curVal) + if err != nil { + return err + } + if !found { + if err := out.Remove(prevVal); err != nil { + return err + } + return nil + } + + if err := out.Modify(prevVal, curVal); err != nil { + return err + } + + return curArr.Delete(uint64(i)) + }); err != nil { + return err + } + + curVal := new(typegen.Deferred) + return curArr.ForEach(curVal, func(i int64) error { + return out.Add(curVal) + }) +} diff --git a/chain/events/state/diff_adt_test.go b/chain/events/state/diff_adt_test.go new file mode 100644 index 000000000..6f1c36ccb --- /dev/null +++ b/chain/events/state/diff_adt_test.go @@ -0,0 +1,131 @@ +package state + +import ( + "bytes" + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + bstore "github.com/ipfs/go-ipfs-blockstore" + cbornode "github.com/ipfs/go-ipld-cbor" + typegen "github.com/whyrusleeping/cbor-gen" + + "github.com/filecoin-project/specs-actors/actors/runtime" + "github.com/filecoin-project/specs-actors/actors/util/adt" +) + +func TestDiffAdtArray(t *testing.T) { + ctxstoreA := newContextStore() + ctxstoreB := newContextStore() + + arrA := adt.MakeEmptyArray(ctxstoreA) + arrB := adt.MakeEmptyArray(ctxstoreB) + + require.NoError(t, arrA.Set(0, runtime.CBORBytes([]byte{0}))) // delete + + require.NoError(t, arrA.Set(1, runtime.CBORBytes([]byte{0}))) // modify + require.NoError(t, arrB.Set(1, runtime.CBORBytes([]byte{1}))) + + require.NoError(t, arrA.Set(2, runtime.CBORBytes([]byte{1}))) // delete + + require.NoError(t, arrA.Set(3, runtime.CBORBytes([]byte{0}))) // noop + require.NoError(t, arrB.Set(3, runtime.CBORBytes([]byte{0}))) + + require.NoError(t, arrA.Set(4, runtime.CBORBytes([]byte{0}))) // modify + require.NoError(t, arrB.Set(4, runtime.CBORBytes([]byte{6}))) + + require.NoError(t, arrB.Set(5, runtime.CBORBytes{8})) // add + require.NoError(t, arrB.Set(6, runtime.CBORBytes{9})) // add + + changes := &TestAdtDiff{ + Added: []runtime.CBORBytes{}, + Modified: []TestAdtDiffModified{}, + Removed: []runtime.CBORBytes{}, + } + + assert.NoError(t, DiffAdtArray(arrA, arrB, changes)) + assert.NotNil(t, changes) + + assert.Equal(t, 2, len(changes.Added)) + assert.EqualValues(t, []byte{8}, changes.Added[0]) + assert.EqualValues(t, []byte{9}, changes.Added[1]) + + assert.Equal(t, 2, len(changes.Modified)) + assert.EqualValues(t, []byte{0}, changes.Modified[0].From) + assert.EqualValues(t, []byte{1}, changes.Modified[0].To) + assert.EqualValues(t, []byte{0}, changes.Modified[1].From) + assert.EqualValues(t, []byte{6}, changes.Modified[1].To) + + assert.Equal(t, 2, len(changes.Removed)) + assert.EqualValues(t, []byte{0}, changes.Removed[0]) + assert.EqualValues(t, []byte{1}, changes.Removed[1]) +} + +type TestAdtDiff struct { + Added []runtime.CBORBytes + Modified []TestAdtDiffModified + Removed []runtime.CBORBytes +} + +var _ AdtArrayDiff = &TestAdtDiff{} + +type TestAdtDiffModified struct { + From runtime.CBORBytes + To runtime.CBORBytes +} + +func (t *TestAdtDiff) Add(val *typegen.Deferred) error { + v := new(runtime.CBORBytes) + err := v.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + t.Added = append(t.Added, *v) + return nil +} + +func (t *TestAdtDiff) Modify(from, to *typegen.Deferred) error { + vFrom := new(runtime.CBORBytes) + err := vFrom.UnmarshalCBOR(bytes.NewReader(from.Raw)) + if err != nil { + return err + } + + vTo := new(runtime.CBORBytes) + err = vTo.UnmarshalCBOR(bytes.NewReader(to.Raw)) + if err != nil { + return err + } + + if !bytes.Equal(*vFrom, *vTo) { + t.Modified = append(t.Modified, TestAdtDiffModified{ + From: *vFrom, + To: *vTo, + }) + } + return nil +} + +func (t *TestAdtDiff) Remove(val *typegen.Deferred) error { + v := new(runtime.CBORBytes) + err := v.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + t.Removed = append(t.Removed, *v) + return nil +} + +func newContextStore() *contextStore { + ctx := context.Background() + bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + store := cbornode.NewCborStore(bs) + return &contextStore{ + ctx: ctx, + cst: store, + } +} diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 7fecaf15a..959227adc 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -1,10 +1,12 @@ package state import ( + "bytes" "context" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" + typegen "github.com/whyrusleeping/cbor-gen" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-amt-ipld/v2" @@ -163,15 +165,59 @@ type MinerSectorChanges struct { Removed []miner.SectorOnChainInfo } +var _ AdtArrayDiff = &MinerSectorChanges{} + type SectorExtensions struct { From miner.SectorOnChainInfo To miner.SectorOnChainInfo } +func (m *MinerSectorChanges) Add(val *typegen.Deferred) error { + si := new(miner.SectorOnChainInfo) + err := si.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Added = append(m.Added, *si) + return nil +} + +func (m *MinerSectorChanges) Modify(from, to *typegen.Deferred) error { + siFrom := new(miner.SectorOnChainInfo) + err := siFrom.UnmarshalCBOR(bytes.NewReader(from.Raw)) + if err != nil { + return err + } + + siTo := new(miner.SectorOnChainInfo) + err = siTo.UnmarshalCBOR(bytes.NewReader(to.Raw)) + if err != nil { + return err + } + + if siFrom.Info.Expiration != siTo.Info.Expiration { + m.Extended = append(m.Extended, SectorExtensions{ + From: *siFrom, + To: *siTo, + }) + } + return nil +} + +func (m *MinerSectorChanges) Remove(val *typegen.Deferred) error { + si := new(miner.SectorOnChainInfo) + err := si.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Removed = append(m.Removed, *si) + return nil +} + func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc { return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) { ctxStore := &contextStore{ - ctx: context.TODO(), + ctx: ctx, cst: sp.cst, } @@ -196,42 +242,7 @@ func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc { return false, nil, err } - var osi miner.SectorOnChainInfo - - // find all sectors that were extended or removed - if err := oldSectors.ForEach(&osi, func(i int64) error { - var nsi miner.SectorOnChainInfo - found, err := newSectors.Get(uint64(osi.Info.SectorNumber), &nsi) - if err != nil { - return err - } - if !found { - sectorChanges.Removed = append(sectorChanges.Removed, osi) - return nil - } - - if nsi.Info.Expiration != osi.Info.Expiration { - sectorChanges.Extended = append(sectorChanges.Extended, SectorExtensions{ - From: osi, - To: nsi, - }) - } - - // we don't update miners state filed with `newSectors.Root()` so this operation is safe. - if err := newSectors.Delete(uint64(osi.Info.SectorNumber)); err != nil { - return err - } - return nil - }); err != nil { - return false, nil, err - } - - // all sectors that remain in newSectors are new - var nsi miner.SectorOnChainInfo - if err := newSectors.ForEach(&nsi, func(i int64) error { - sectorChanges.Added = append(sectorChanges.Added, nsi) - return nil - }); err != nil { + if err := DiffAdtArray(oldSectors, newSectors, sectorChanges); err != nil { return false, nil, err } @@ -243,20 +254,3 @@ func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc { return true, sectorChanges, nil } } - -type contextStore struct { - ctx context.Context - cst *cbor.BasicIpldStore -} - -func (cs *contextStore) Context() context.Context { - return cs.ctx -} - -func (cs *contextStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { - return cs.cst.Get(ctx, c, out) -} - -func (cs *contextStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { - return cs.cst.Put(ctx, v) -}