fix: adjust OnDealExpiredOrSlashed timeout to deal expiry

This commit is contained in:
Dirk McCormick 2020-07-08 15:00:54 -04:00
parent 2b9c05d395
commit c9b4dab1e1
2 changed files with 38 additions and 27 deletions

View File

@ -6,7 +6,6 @@ import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/events/state"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -341,16 +340,18 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
} }
func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error { func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
var sd *api.MarketDeal head, err := c.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("client: failed to get chain head: %w", err)
}
sd, err := c.StateMarketStorageDeal(ctx, dealID, head.Key())
if err != nil {
return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err)
}
// Called immediately to check if the deal has already expired or been slashed // Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err = stmgr.GetStorageDeal(ctx, c.StateManager, dealID, ts)
if err != nil {
return false, false, xerrors.Errorf("client: failed to look up deal on chain: %w", err)
}
// Check if the deal has already expired // Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts.Height() { if sd.Proposal.EndEpoch <= ts.Height() {
onDealExpired(nil) onDealExpired(nil)
@ -371,17 +372,18 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
// Called when there was a match against the state change we're looking for // Called when there was a match against the state change we're looking for
// and the chain has advanced to the confidence height // and the chain has advanced to the confidence height
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
// Check if the deal has already expired // Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts2.Height() { if sd.Proposal.EndEpoch <= ts2.Height() {
onDealExpired(nil) onDealExpired(nil)
return false, nil return false, nil
} }
// Timeout waiting for state change
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
changedDeals, ok := states.(state.ChangedDeals) changedDeals, ok := states.(state.ChangedDeals)
if !ok { if !ok {
panic("Expected state.ChangedDeals") panic("Expected state.ChangedDeals")
@ -417,7 +419,10 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key()) return dealDiff(ctx, oldTs.Key(), newTs.Key())
} }
if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, build.SealRandomnessLookbackLimit, match); err != nil {
// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
return xerrors.Errorf("failed to set up state changed handler: %w", err) return xerrors.Errorf("failed to set up state changed handler: %w", err)
} }

View File

@ -356,16 +356,18 @@ func (n *ProviderNodeAdapter) GetDataCap(ctx context.Context, addr address.Addre
} }
func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error { func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
var sd *api.MarketDeal head, err := n.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("client: failed to get chain head: %w", err)
}
sd, err := n.StateMarketStorageDeal(ctx, dealID, head.Key())
if err != nil {
return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err)
}
// Called immediately to check if the deal has already expired or been slashed // Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err = n.StateMarketStorageDeal(ctx, abi.DealID(dealID), ts.Key())
if err != nil {
return false, false, xerrors.Errorf("client: failed to look up deal on chain: %w", err)
}
// Check if the deal has already expired // Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts.Height() { if sd.Proposal.EndEpoch <= ts.Height() {
onDealExpired(nil) onDealExpired(nil)
@ -386,17 +388,18 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// Called when there was a match against the state change we're looking for // Called when there was a match against the state change we're looking for
// and the chain has advanced to the confidence height // and the chain has advanced to the confidence height
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
// Check if the deal has already expired // Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts2.Height() { if sd.Proposal.EndEpoch <= ts2.Height() {
onDealExpired(nil) onDealExpired(nil)
return false, nil return false, nil
} }
// Timeout waiting for state change
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
changedDeals, ok := states.(state.ChangedDeals) changedDeals, ok := states.(state.ChangedDeals)
if !ok { if !ok {
panic("Expected state.ChangedDeals") panic("Expected state.ChangedDeals")
@ -432,7 +435,10 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) { match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key()) return dealDiff(ctx, oldTs.Key(), newTs.Key())
} }
if err := n.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, build.SealRandomnessLookbackLimit, match); err != nil {
// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
if err := n.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
return xerrors.Errorf("failed to set up state changed handler: %w", err) return xerrors.Errorf("failed to set up state changed handler: %w", err)
} }