Merge pull request #2246 from TroyWind/fit-multiple-piece-into-a-sector

a sector contains multiple deals(pieces), the state of deals incorrect
This commit is contained in:
Aayush Rajasekaran 2020-07-08 11:21:41 -04:00 committed by GitHub
commit 56a892e5f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 29 deletions

View File

@ -485,13 +485,15 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat
for tid, matchFns := range me.matchers { for tid, matchFns := range me.matchers {
var matched bool var matched bool
var once bool
for _, matchFn := range matchFns { for _, matchFn := range matchFns {
ok, err := matchFn(msg) matchOne, ok, err := matchFn(msg)
if err != nil { if err != nil {
log.Errorf("event matcher failed: %s", err) log.Errorf("event matcher failed: %s", err)
continue continue
} }
matched = ok matched = ok
once = matchOne
if matched { if matched {
break break
@ -500,7 +502,9 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat
if matched { if matched {
res[tid] = msg res[tid] = msg
break if once {
break
}
} }
} }
}) })
@ -548,7 +552,7 @@ func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Mes
// `curH`-`ts.Height` = `confidence` // `curH`-`ts.Height` = `confidence`
type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
type MsgMatchFunc func(msg *types.Message) (bool, error) type MsgMatchFunc func(msg *types.Message) (matchOnce bool, matched bool, err error)
// Called registers a callback which is triggered when a specified method is // Called registers a callback which is triggered when a specified method is
// called on an actor, or a timeout is reached. // called on an actor, or a timeout is reached.

View File

@ -561,9 +561,9 @@ func TestAtChainedConfidenceNull(t *testing.T) {
require.Equal(t, false, reverted) require.Equal(t, false, reverted)
} }
func matchAddrMethod(to address.Address, m abi.MethodNum) func(msg *types.Message) (bool, error) { func matchAddrMethod(to address.Address, m abi.MethodNum) func(msg *types.Message) (matchOnce bool, matched bool, err error) {
return func(msg *types.Message) (bool, error) { return func(msg *types.Message) (matchOnce bool, matched bool, err error) {
return to == msg.To && m == msg.Method, nil return true, to == msg.To && m == msg.Method, nil
} }
} }

View File

@ -34,11 +34,11 @@ func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd
} }
func (me *messageEvents) MatchMsg(inmsg *types.Message) MsgMatchFunc { func (me *messageEvents) MatchMsg(inmsg *types.Message) MsgMatchFunc {
return func(msg *types.Message) (bool, error) { return func(msg *types.Message) (matchOnce bool, matched bool, err error) {
if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) { if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) {
return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce) return true, false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce)
} }
return inmsg.Equals(msg), nil return true, inmsg.Equals(msg), nil
} }
} }

View File

@ -292,44 +292,44 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
var sectorNumber abi.SectorNumber var sectorNumber abi.SectorNumber
var sectorFound bool var sectorFound bool
matchEvent := func(msg *types.Message) (bool, error) { matchEvent := func(msg *types.Message) (matchOnce bool, matched bool, err error) {
if msg.To != provider { if msg.To != provider {
return false, nil return true, false, nil
} }
switch msg.Method { switch msg.Method {
case builtin.MethodsMiner.PreCommitSector: case builtin.MethodsMiner.PreCommitSector:
var params miner.SectorPreCommitInfo var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err) return true, false, xerrors.Errorf("unmarshal pre commit: %w", err)
} }
for _, did := range params.DealIDs { for _, did := range params.DealIDs {
if did == abi.DealID(dealId) { if did == abi.DealID(dealId) {
sectorNumber = params.SectorNumber sectorNumber = params.SectorNumber
sectorFound = true sectorFound = true
return false, nil return true, false, nil
} }
} }
return false, nil return true, false, nil
case builtin.MethodsMiner.ProveCommitSector: case builtin.MethodsMiner.ProveCommitSector:
var params miner.ProveCommitSectorParams var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err) return true, false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
} }
if !sectorFound { if !sectorFound {
return false, nil return true, false, nil
} }
if params.SectorNumber != sectorNumber { if params.SectorNumber != sectorNumber {
return false, nil return true, false, nil
} }
return true, nil return false, true, nil
default: default:
return false, nil return true, false, nil
} }
} }

View File

@ -279,44 +279,44 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide
var sectorNumber abi.SectorNumber var sectorNumber abi.SectorNumber
var sectorFound bool var sectorFound bool
matchEvent := func(msg *types.Message) (bool, error) { matchEvent := func(msg *types.Message) (matchOnce bool, matched bool, err error) {
if msg.To != provider { if msg.To != provider {
return false, nil return true, false, nil
} }
switch msg.Method { switch msg.Method {
case builtin.MethodsMiner.PreCommitSector: case builtin.MethodsMiner.PreCommitSector:
var params miner.SectorPreCommitInfo var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err) return true, false, xerrors.Errorf("unmarshal pre commit: %w", err)
} }
for _, did := range params.DealIDs { for _, did := range params.DealIDs {
if did == abi.DealID(dealID) { if did == abi.DealID(dealID) {
sectorNumber = params.SectorNumber sectorNumber = params.SectorNumber
sectorFound = true sectorFound = true
return false, nil return true, false, nil
} }
} }
return false, nil return true, false, nil
case builtin.MethodsMiner.ProveCommitSector: case builtin.MethodsMiner.ProveCommitSector:
var params miner.ProveCommitSectorParams var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err) return true, false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
} }
if !sectorFound { if !sectorFound {
return false, nil return true, false, nil
} }
if params.SectorNumber != sectorNumber { if params.SectorNumber != sectorNumber {
return false, nil return true, false, nil
} }
return true, nil return false, true, nil
default: default:
return false, nil return true, false, nil
} }
} }