paramfetch: check checksums in parallel

This commit is contained in:
Łukasz Magiera 2019-10-27 10:18:27 +01:00
parent 874be79958
commit 117ec636c5
7 changed files with 66 additions and 28 deletions

View File

@ -129,8 +129,8 @@ type StorageMinerStruct struct {
StoreGarbageData func(context.Context) (uint64, error) `perm:"write"` StoreGarbageData func(context.Context) (uint64, error) `perm:"write"`
SectorsStatus func(context.Context, uint64) (sectorbuilder.SectorSealingStatus, error) `perm:"read"` SectorsStatus func(context.Context, uint64) (sectorbuilder.SectorSealingStatus, error) `perm:"read"`
SectorsList func(context.Context) ([]uint64, error) `perm:"read"` SectorsList func(context.Context) ([]uint64, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"` SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"`
} }

View File

@ -8,10 +8,13 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
rice "github.com/GeertJohan/go.rice" rice "github.com/GeertJohan/go.rice"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/minio/blake2b-simd" "github.com/minio/blake2b-simd"
"go.uber.org/multierr"
"golang.org/x/xerrors"
pb "gopkg.in/cheggaaa/pb.v1" pb "gopkg.in/cheggaaa/pb.v1"
) )
@ -27,6 +30,13 @@ type paramFile struct {
SectorSize uint64 `json:"sector_size"` SectorSize uint64 `json:"sector_size"`
} }
type fetch struct {
wg sync.WaitGroup
fetchLk sync.Mutex
errs []error
}
func GetParams(storage bool) error { func GetParams(storage bool) error {
if err := os.Mkdir(paramdir, 0755); err != nil && !os.IsExist(err) { if err := os.Mkdir(paramdir, 0755); err != nil && !os.IsExist(err) {
return err return err
@ -39,6 +49,8 @@ func GetParams(storage bool) error {
return err return err
} }
ft := &fetch{}
for name, info := range params { for name, info := range params {
if !SupportedSectorSize(info.SectorSize) { if !SupportedSectorSize(info.SectorSize) {
continue continue
@ -47,35 +59,61 @@ func GetParams(storage bool) error {
continue continue
} }
if err := maybeFetch(name, info); err != nil { ft.maybeFetchAsync(name, info)
return err
}
} }
return nil return ft.wait()
} }
func maybeFetch(name string, info paramFile) error { func (ft *fetch) maybeFetchAsync(name string, info paramFile) {
path := filepath.Join(paramdir, name) ft.wg.Add(1)
go func() {
defer ft.wg.Done()
path := filepath.Join(paramdir, name)
err := ft.checkFile(path, info)
if !os.IsNotExist(err) && err != nil {
log.Warn(err)
}
if err == nil {
return
}
ft.fetchLk.Lock()
defer ft.fetchLk.Unlock()
if err := doFetch(path, info); err != nil {
ft.errs = append(ft.errs, xerrors.Errorf("fetching file %s: %w", path, err))
}
}()
}
func (ft *fetch) checkFile(path string, info paramFile) error {
f, err := os.Open(path) f, err := os.Open(path)
if err == nil { if err != nil {
defer f.Close() return err
}
defer f.Close()
h := blake2b.New512() h := blake2b.New512()
if _, err := io.Copy(h, f); err != nil { if _, err := io.Copy(h, f); err != nil {
return err return err
}
sum := h.Sum(nil)
strSum := hex.EncodeToString(sum[:16])
if strSum == info.Digest {
return nil
}
log.Warnf("Checksum mismatch in param file %s, %s != %s", name, strSum, info.Digest)
} }
return doFetch(path, info) sum := h.Sum(nil)
strSum := hex.EncodeToString(sum[:16])
if strSum == info.Digest {
return nil
}
return xerrors.Errorf("checksum mismatch in param file %s, %s != %s", path, strSum, info.Digest)
}
func (ft *fetch) wait() error {
ft.wg.Wait()
return multierr.Combine(ft.errs...)
} }
func doFetch(out string, info paramFile) error { func doFetch(out string, info paramFile) error {

View File

@ -372,7 +372,7 @@ func MakeGenesisBlock(bs bstore.Blockstore, balances map[address.Address]types.B
log.Infof("Empty Genesis root: %s", emptyroot) log.Infof("Empty Genesis root: %s", emptyroot)
genesisticket := &types.Ticket{ genesisticket := &types.Ticket{
VRFProof: []byte("vrf proof"), VRFProof: []byte("vrf proof0000000vrf proof0000000"),
} }
b := &types.BlockHeader{ b := &types.BlockHeader{

View File

@ -27,7 +27,7 @@ func testBlockHeader(t testing.TB) *BlockHeader {
ElectionProof: []byte("cats won the election"), ElectionProof: []byte("cats won the election"),
Tickets: []*Ticket{ Tickets: []*Ticket{
&Ticket{ &Ticket{
VRFProof: []byte("vrf proof"), VRFProof: []byte("vrf proof0000000vrf proof0000000"),
}, },
}, },
Parents: []cid.Cid{c, c}, Parents: []cid.Cid{c, c},

1
go.mod
View File

@ -79,6 +79,7 @@ require (
go.uber.org/dig v1.7.0 // indirect go.uber.org/dig v1.7.0 // indirect
go.uber.org/fx v1.9.0 go.uber.org/fx v1.9.0
go.uber.org/goleak v0.10.0 // indirect go.uber.org/goleak v0.10.0 // indirect
go.uber.org/multierr v1.1.0
go.uber.org/zap v1.10.0 go.uber.org/zap v1.10.0
go4.org v0.0.0-20190313082347-94abd6928b1d // indirect go4.org v0.0.0-20190313082347-94abd6928b1d // indirect
golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 // indirect golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 // indirect

View File

@ -181,7 +181,6 @@ func SealTicketGen(api api.FullNode) sector.TicketFn {
return nil, xerrors.Errorf("unexpected randomness len: %d (expected %d)", n, sectorbuilder.CommLen) return nil, xerrors.Errorf("unexpected randomness len: %d (expected %d)", n, sectorbuilder.CommLen)
} }
return &sectorbuilder.SealTicket{ return &sectorbuilder.SealTicket{
BlockHeight: ts.Height(), BlockHeight: ts.Height(),
TicketBytes: tkt, TicketBytes: tkt,

View File

@ -38,7 +38,7 @@ type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
type Store struct { type Store struct {
waitingLk sync.Mutex waitingLk sync.Mutex
sb *sectorbuilder.SectorBuilder sb *sectorbuilder.SectorBuilder
tktFn TicketFn tktFn TicketFn
dealsLk sync.Mutex dealsLk sync.Mutex
@ -54,7 +54,7 @@ type Store struct {
func NewStore(sb *sectorbuilder.SectorBuilder, ds dtypes.MetadataDS, tktFn TicketFn) *Store { func NewStore(sb *sectorbuilder.SectorBuilder, ds dtypes.MetadataDS, tktFn TicketFn) *Store {
return &Store{ return &Store{
sb: sb, sb: sb,
tktFn:tktFn, tktFn: tktFn,
deals: namespace.Wrap(ds, sectorDealsPrefix), deals: namespace.Wrap(ds, sectorDealsPrefix),
waiting: map[uint64]chan struct{}{}, waiting: map[uint64]chan struct{}{},
closeCh: make(chan struct{}), closeCh: make(chan struct{}),