feat: add OnDealExpiredOrSlashed() to ClientNodeAdapter
This commit is contained in:
parent
a3abff768c
commit
f8725ac3bd
@ -5,6 +5,9 @@ package storageadapter
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/events/state"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -337,6 +340,118 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
|
||||
// Make sure that only one of the callbacks is called, and only once
|
||||
var lk sync.Mutex
|
||||
onDealExpiredOnce := func(err error) {
|
||||
lk.Lock()
|
||||
defer lk.Unlock()
|
||||
if onDealExpired != nil {
|
||||
onDealExpired(err)
|
||||
}
|
||||
onDealExpired = nil
|
||||
onDealSlashed = nil
|
||||
}
|
||||
onDealSlashedOnce := func(slashEpoch abi.ChainEpoch, err error) {
|
||||
lk.Lock()
|
||||
defer lk.Unlock()
|
||||
if onDealSlashed != nil {
|
||||
onDealSlashed(slashEpoch, err)
|
||||
}
|
||||
onDealExpired = nil
|
||||
onDealSlashed = nil
|
||||
}
|
||||
|
||||
var sd *api.MarketDeal
|
||||
|
||||
// Called immediately to check if the deal has already expired or been slashed
|
||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||
sd, err = stmgr.GetStorageDeal(ctx, c.StateManager, dealID, ts)
|
||||
|
||||
if err != nil {
|
||||
return false, false, xerrors.Errorf("client: failed to look up deal on chain: %w", err)
|
||||
}
|
||||
|
||||
// Check if the deal has already expired
|
||||
if sd.Proposal.EndEpoch <= ts.Height() {
|
||||
onDealExpiredOnce(nil)
|
||||
return true, false, nil
|
||||
}
|
||||
|
||||
// If there is no deal assume it's already been slashed
|
||||
if sd.State.SectorStartEpoch < 0 {
|
||||
onDealSlashedOnce(ts.Height(), nil)
|
||||
return true, false, nil
|
||||
}
|
||||
|
||||
// No events have occurred yet, so return
|
||||
// done: false, more: true (keep listening for events)
|
||||
return false, true, nil
|
||||
}
|
||||
|
||||
// Called when there was a match against the state change we're looking for
|
||||
// and the chain has advanced to the confidence height
|
||||
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
|
||||
if states == nil {
|
||||
log.Error("timed out waiting for deal expiry")
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check if the deal has already expired
|
||||
if sd.Proposal.EndEpoch <= ts2.Height() {
|
||||
onDealExpiredOnce(nil)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
changedDeals, ok := states.(state.ChangedDeals)
|
||||
if !ok {
|
||||
panic("Expected state.ChangedDeals")
|
||||
}
|
||||
|
||||
deal, ok := changedDeals[dealID]
|
||||
if !ok {
|
||||
// No change to deal
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Deal was slashed
|
||||
if deal.To == nil {
|
||||
onDealSlashedOnce(ts2.Height(), nil)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Called when there was a chain reorg and the state change was reverted
|
||||
revert := func(ctx context.Context, ts *types.TipSet) error {
|
||||
// TODO: Is it ok to just ignore this?
|
||||
log.Warn("deal state reverted; TODO: actually handle this!")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Watch for state changes to the deal
|
||||
preds := state.NewStatePredicates(c)
|
||||
dealDiff := preds.OnStorageMarketActorChanged(
|
||||
preds.OnDealStateChanged(
|
||||
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
|
||||
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
|
||||
return dealDiff(ctx, oldTs, newTs)
|
||||
}
|
||||
if err := c.ev.StateChanged(checkFunc, stateChanged, revert, build.MessageConfidence+1, build.SealRandomnessLookbackLimit, match); err != nil {
|
||||
return xerrors.Errorf("failed to set up state changed handler: %w", err)
|
||||
}
|
||||
|
||||
// Listen for when the chain reaches the expiration height
|
||||
heightHandler := func(ctx context.Context, ts *types.TipSet, h abi.ChainEpoch) error {
|
||||
onDealExpiredOnce(nil)
|
||||
return nil
|
||||
}
|
||||
c.ev.ChainAt(heightHandler, revert, build.MessageConfidence+1, sd.Proposal.EndEpoch)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *ClientNodeAdapter) SignProposal(ctx context.Context, signer address.Address, proposal samarket.DealProposal) (*samarket.ClientDealProposal, error) {
|
||||
// TODO: output spec signed proposal
|
||||
buf, err := cborutil.Dump(&proposal)
|
||||
|
Loading…
Reference in New Issue
Block a user