199 lines
4.3 KiB
Go
199 lines
4.3 KiB
Go
package commitment
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
"github.com/ipfs/go-datastore"
|
|
"github.com/ipfs/go-datastore/namespace"
|
|
cbor "github.com/ipfs/go-ipld-cbor"
|
|
logging "github.com/ipfs/go-log"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/filecoin-project/lotus/chain/address"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
dsq "github.com/ipfs/go-datastore/query"
|
|
)
|
|
|
|
// log is the package-level logger, created under the name "commitment".
var log = logging.Logger("commitment")
|
|
|
|
func init() {
|
|
cbor.RegisterCborType(commitment{})
|
|
}
|
|
|
|
// commitmentDsPrefix is the datastore key ("/commitments") used both to
// namespace the tracker's store in NewTracker and as the root of every key
// built by commitmentKey.
var commitmentDsPrefix = datastore.NewKey("/commitments")
|
|
|
|
type Tracker struct {
|
|
commitments datastore.Datastore
|
|
|
|
lk sync.Mutex
|
|
|
|
waits map[datastore.Key]chan struct{}
|
|
}
|
|
|
|
func NewTracker(ds dtypes.MetadataDS) *Tracker {
|
|
return &Tracker{
|
|
commitments: namespace.Wrap(ds, commitmentDsPrefix),
|
|
waits: map[datastore.Key]chan struct{}{},
|
|
}
|
|
}
|
|
|
|
type commitment struct {
|
|
DealIDs []uint64
|
|
Msg cid.Cid
|
|
}
|
|
|
|
func commitmentKey(miner address.Address, sectorId uint64) datastore.Key {
|
|
return commitmentDsPrefix.ChildString(miner.String()).ChildString(fmt.Sprintf("%d", sectorId))
|
|
}
|
|
|
|
func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64, commitMsg cid.Cid) error {
|
|
key := commitmentKey(miner, sectorId)
|
|
|
|
ct.lk.Lock()
|
|
defer ct.lk.Unlock()
|
|
|
|
tracking, err := ct.commitments.Get(key)
|
|
switch err {
|
|
case nil:
|
|
var comm commitment
|
|
if err := cbor.DecodeInto(tracking, &comm); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !comm.Msg.Equals(commitMsg) {
|
|
log.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", miner, sectorId, comm.Msg, commitMsg)
|
|
}
|
|
|
|
log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", miner, sectorId, commitMsg)
|
|
|
|
// we still want to store it
|
|
fallthrough // TODO: ideally we'd keep around both (even though we'll
|
|
// usually only need the new one)
|
|
case datastore.ErrNotFound:
|
|
comm := &commitment{Msg: commitMsg}
|
|
commB, err := cbor.DumpObject(comm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := ct.commitments.Put(key, commB); err != nil {
|
|
return err
|
|
}
|
|
|
|
waits, ok := ct.waits[key]
|
|
if ok {
|
|
close(waits)
|
|
delete(ct.waits, key)
|
|
}
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sectorId uint64) (cid.Cid, error) {
|
|
key := commitmentKey(miner, sectorId)
|
|
|
|
ct.lk.Lock()
|
|
|
|
tracking, err := ct.commitments.Get(key)
|
|
if err != datastore.ErrNotFound {
|
|
ct.lk.Unlock()
|
|
|
|
if err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
|
|
var comm commitment
|
|
if err := cbor.DecodeInto(tracking, &comm); err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
|
|
return comm.Msg, nil
|
|
}
|
|
|
|
wait, ok := ct.waits[key]
|
|
if !ok {
|
|
wait = make(chan struct{})
|
|
ct.waits[key] = wait
|
|
}
|
|
|
|
ct.lk.Unlock()
|
|
|
|
select {
|
|
case <-wait:
|
|
tracking, err := ct.commitments.Get(key)
|
|
if err != nil {
|
|
return cid.Undef, xerrors.Errorf("failed to get commitment after waiting: %w", err)
|
|
}
|
|
|
|
var comm commitment
|
|
if err := cbor.DecodeInto(tracking, &comm); err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
|
|
return comm.Msg, nil
|
|
case <-ctx.Done():
|
|
return cid.Undef, ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (ct *Tracker) List() ([]api.SectorCommitment, error) {
|
|
out := make([]api.SectorCommitment, 0)
|
|
|
|
ct.lk.Lock()
|
|
defer ct.lk.Unlock()
|
|
|
|
res, err := ct.commitments.Query(dsq.Query{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer res.Close()
|
|
|
|
for {
|
|
res, ok := res.NextSync()
|
|
if !ok {
|
|
break
|
|
}
|
|
|
|
if res.Error != nil {
|
|
return nil, xerrors.Errorf("iterating commitments: %w", err)
|
|
}
|
|
|
|
parts := strings.Split(res.Key, "/")
|
|
if len(parts) != 4 {
|
|
return nil, xerrors.Errorf("expected commitment key to be 4 parts, Key %s", res.Key)
|
|
}
|
|
|
|
miner, err := address.NewFromString(parts[2])
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("parsing miner address: %w", err)
|
|
}
|
|
|
|
sectorID, err := strconv.ParseInt(parts[3], 10, 64)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("parsing sector id: %w", err)
|
|
}
|
|
|
|
var comm commitment
|
|
if err := cbor.DecodeInto(res.Value, &comm); err != nil {
|
|
return nil, xerrors.Errorf("decoding commitment %s (`% X`): %w", res.Key, res.Value, err)
|
|
}
|
|
|
|
out = append(out, api.SectorCommitment{
|
|
SectorID: uint64(sectorID),
|
|
Miner: miner,
|
|
CommitMsg: comm.Msg,
|
|
DealIDs: comm.DealIDs,
|
|
})
|
|
}
|
|
|
|
return out, nil
|
|
}
|