Skip to content

Commit d4e2999

Browse files
committed
feat: Storage reservations in batch sealing
1 parent 5303e0a commit d4e2999

File tree

7 files changed

+157
-82
lines changed

7 files changed

+157
-82
lines changed

lib/ffi/piece_funcs.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
func (sb *SealCalls) WritePiece(ctx context.Context, taskID *harmonytask.TaskID, pieceID storiface.PieceNumber, size int64, data io.Reader) error {
1616
// todo: config(?): allow setting PathStorage for this
1717
// todo storage reservations
18-
paths, _, done, err := sb.sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
18+
paths, _, done, err := sb.Sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
1919
if err != nil {
2020
return err
2121
}
@@ -68,9 +68,9 @@ func (sb *SealCalls) WritePiece(ctx context.Context, taskID *harmonytask.TaskID,
6868
}
6969

7070
func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) {
71-
return sb.sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
71+
return sb.Sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
7272
}
7373

7474
func (sb *SealCalls) RemovePiece(ctx context.Context, id storiface.PieceNumber) error {
75-
return sb.sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil)
75+
return sb.Sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil)
7676
}

lib/ffi/scrub_funcs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
)
1919

2020
func (sb *SealCalls) CheckUnsealedCID(ctx context.Context, s storiface.SectorRef) (cid.Cid, error) {
21-
reader, err := sb.sectors.storage.ReaderSeq(ctx, s, storiface.FTUnsealed)
21+
reader, err := sb.Sectors.storage.ReaderSeq(ctx, s, storiface.FTUnsealed)
2222
if err != nil {
2323
return cid.Undef, xerrors.Errorf("getting unsealed sector reader: %w", err)
2424
}

lib/ffi/sdr_funcs.go

+29-18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"crypto/rand"
66
"encoding/json"
77
"fmt"
8+
"github.com/samber/lo"
89
"io"
910
"os"
1011
"path/filepath"
@@ -43,19 +44,19 @@ type ExternPrecommit2 func(ctx context.Context, sector storiface.SectorRef, cach
4344
}
4445
*/
4546
type SealCalls struct {
46-
sectors *storageProvider
47+
Sectors *storageProvider
4748

4849
/*// externCalls cointain overrides for calling alternative sealing logic
4950
externCalls ExternalSealer*/
5051
}
5152

5253
func NewSealCalls(st *paths.Remote, ls *paths.Local, si paths.SectorIndex) *SealCalls {
5354
return &SealCalls{
54-
sectors: &storageProvider{
55+
Sectors: &storageProvider{
5556
storage: st,
5657
localStore: ls,
5758
sindex: si,
58-
storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, *StorageReservation](),
59+
storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, []*StorageReservation](),
5960
},
6061
}
6162
}
@@ -64,7 +65,7 @@ type storageProvider struct {
6465
storage *paths.Remote
6566
localStore *paths.Local
6667
sindex paths.SectorIndex
67-
storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
68+
storageReservations *xsync.MapOf[harmonytask.TaskID, []*StorageReservation]
6869
}
6970

7071
func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (fspaths, ids storiface.SectorPaths, release func(dontDeclare ...storiface.SectorFileType), err error) {
@@ -74,7 +75,12 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
7475
var ok bool
7576
var resv *StorageReservation
7677
if taskID != nil {
77-
resv, ok = l.storageReservations.Load(*taskID)
78+
resvs, ok := l.storageReservations.Load(*taskID)
79+
if ok {
80+
resv, ok = lo.Find(resvs, func(res *StorageReservation) bool {
81+
return res.SectorRef.ID() == sector.ID
82+
})
83+
}
7884
}
7985
if ok && resv != nil {
8086
if resv.Alloc != allocate || resv.Existing != existing {
@@ -144,7 +150,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
144150
}
145151

146152
func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, into storiface.SectorFileType, sector storiface.SectorRef, ticket abi.SealRandomness, commDcid cid.Cid) error {
147-
paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, into, storiface.PathSealing)
153+
paths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, into, storiface.PathSealing)
148154
if err != nil {
149155
return xerrors.Errorf("acquiring sector paths: %w", err)
150156
}
@@ -223,7 +229,7 @@ func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathID
223229

224230
log.Debugw("ensureOneCopy", "sector", sid, "type", fileType, "keep", keepIn)
225231

226-
if err := sb.sectors.storage.Remove(ctx, sid, fileType, true, keepIn); err != nil {
232+
if err := sb.Sectors.storage.Remove(ctx, sid, fileType, true, keepIn); err != nil {
227233
return err
228234
}
229235
}
@@ -237,7 +243,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto
237243
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
238244
}
239245

240-
fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
246+
fspaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
241247
if err != nil {
242248
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
243249
}
@@ -352,7 +358,7 @@ func (sb *SealCalls) GenerateSynthPoRep() {
352358
}
353359

354360
func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) {
355-
vproof, err := sb.sectors.storage.GeneratePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed)
361+
vproof, err := sb.Sectors.storage.GeneratePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed)
356362
if err != nil {
357363
return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err)
358364
}
@@ -498,7 +504,7 @@ func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof)
498504
}
499505

500506
func (sb *SealCalls) LocalStorage(ctx context.Context) ([]storiface.StoragePath, error) {
501-
return sb.sectors.localStore.Local(ctx)
507+
return sb.Sectors.localStore.Local(ctx)
502508
}
503509

504510
func changePathType(path string, newType storiface.SectorFileType) (string, error) {
@@ -526,7 +532,7 @@ func changePathType(path string, newType storiface.SectorFileType) (string, erro
526532
return newPath, nil
527533
}
528534
func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed bool) error {
529-
sectorPaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
535+
sectorPaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
530536
if err != nil {
531537
return xerrors.Errorf("acquiring sector paths: %w", err)
532538
}
@@ -548,7 +554,7 @@ func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.Sector
548554

549555
defer func() {
550556
// We don't pass FTUnsealed to Acquire, so releaseSector won't declare it. Do it here.
551-
if err := sb.sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
557+
if err := sb.Sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
552558
log.Errorf("declare unsealed sector error: %+v", err)
553559
}
554560
}()
@@ -666,11 +672,16 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
666672

667673
var opts []storiface.AcquireOption
668674
if taskID != nil {
669-
resv, ok := sb.sectors.storageReservations.Load(*taskID)
675+
resvs, ok := sb.Sectors.storageReservations.Load(*taskID)
670676
// if the reservation is missing MoveStorage will simply create one internally. This is fine as the reservation
671677
// will only be missing when the node is restarting, which means that the missing reservations will get recreated
672678
// anyways, and before we start claiming other tasks.
673679
if ok {
680+
if len(resvs) != 1 {
681+
return xerrors.Errorf("task %d has %d reservations, expected 1", taskID, len(resvs))
682+
}
683+
resv := resvs[0]
684+
674685
defer resv.Release()
675686

676687
if resv.Alloc != storiface.FTNone {
@@ -684,13 +695,13 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
684695
}
685696
}
686697

687-
err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
698+
err := sb.Sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
688699
if err != nil {
689700
return xerrors.Errorf("moving storage: %w", err)
690701
}
691702

692703
for _, fileType := range toMove.AllSet() {
693-
if err := sb.sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
704+
if err := sb.Sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
694705
return xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
695706
}
696707
}
@@ -699,7 +710,7 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
699710
}
700711

701712
func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.SectorRef, ft storiface.SectorFileType) (sectorFound bool, ptype storiface.PathType, err error) {
702-
stores, err := sb.sectors.sindex.StorageFindSector(ctx, sector.ID, ft, 0, false)
713+
stores, err := sb.Sectors.sindex.StorageFindSector(ctx, sector.ID, ft, 0, false)
703714
if err != nil {
704715
return false, "", xerrors.Errorf("finding sector: %w", err)
705716
}
@@ -718,7 +729,7 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec
718729

719730
// PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks
720731
func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) (fsPath, pathID storiface.SectorPaths, releaseSector func(...storiface.SectorFileType), err error) {
721-
fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
732+
fsPath, pathID, releaseSector, err = sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
722733
if err != nil {
723734
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector paths: %w", err)
724735
}
@@ -754,7 +765,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unse
754765
}
755766

756767
func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, sealed cid.Cid, unsealed cid.Cid, randomness abi.SealRandomness, pieces []abi.PieceInfo) error {
757-
fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
768+
fspaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
758769
if err != nil {
759770
return xerrors.Errorf("acquiring sector paths: %w", err)
760771
}

lib/ffi/snap_funcs.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (sb *SealCalls) EncodeUpdate(
4141
noDecl = storiface.FTUnsealed
4242
}
4343

44-
paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTUnsealed, storiface.PathSealing)
44+
paths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTUnsealed, storiface.PathSealing)
4545
if err != nil {
4646
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
4747
}
@@ -133,7 +133,7 @@ func (sb *SealCalls) EncodeUpdate(
133133

134134
log.Debugw("get key data", "keyPath", keyPath, "keyCachePath", keyCachePath, "sectorID", sector.ID, "taskID", taskID)
135135

136-
r, err := sb.sectors.storage.ReaderSeq(ctx, sector, storiface.FTSealed)
136+
r, err := sb.Sectors.storage.ReaderSeq(ctx, sector, storiface.FTSealed)
137137
if err != nil {
138138
return cid.Undef, cid.Undef, xerrors.Errorf("getting sealed sector reader: %w", err)
139139
}
@@ -177,7 +177,7 @@ func (sb *SealCalls) EncodeUpdate(
177177

178178
// fetch cache
179179
var buf bytes.Buffer // usually 73.2 MiB
180-
err = sb.sectors.storage.ReadMinCacheInto(ctx, sector, storiface.FTCache, &buf)
180+
err = sb.Sectors.storage.ReadMinCacheInto(ctx, sector, storiface.FTCache, &buf)
181181
if err != nil {
182182
return cid.Undef, cid.Undef, xerrors.Errorf("reading cache: %w", err)
183183
}
@@ -269,7 +269,7 @@ func (sb *SealCalls) EncodeUpdate(
269269
}
270270

271271
func (sb *SealCalls) ProveUpdate(ctx context.Context, proofType abi.RegisteredUpdateProof, sector storiface.SectorRef, key, sealed, unsealed cid.Cid) ([]byte, error) {
272-
jsonb, err := sb.sectors.storage.ReadSnapVanillaProof(ctx, sector)
272+
jsonb, err := sb.Sectors.storage.ReadSnapVanillaProof(ctx, sector)
273273
if err != nil {
274274
return nil, xerrors.Errorf("read snap vanilla proof: %w", err)
275275
}
@@ -301,11 +301,16 @@ func (sb *SealCalls) MoveStorageSnap(ctx context.Context, sector storiface.Secto
301301

302302
var opts []storiface.AcquireOption
303303
if taskID != nil {
304-
resv, ok := sb.sectors.storageReservations.Load(*taskID)
304+
resvs, ok := sb.Sectors.storageReservations.Load(*taskID)
305305
// if the reservation is missing MoveStorage will simply create one internally. This is fine as the reservation
306306
// will only be missing when the node is restarting, which means that the missing reservations will get recreated
307307
// anyways, and before we start claiming other tasks.
308308
if ok {
309+
if len(resvs) != 1 {
310+
return xerrors.Errorf("task %d has %d reservations, expected 1", taskID, len(resvs))
311+
}
312+
resv := resvs[0]
313+
309314
defer resv.Release()
310315

311316
if resv.Alloc != storiface.FTNone {
@@ -319,13 +324,13 @@ func (sb *SealCalls) MoveStorageSnap(ctx context.Context, sector storiface.Secto
319324
}
320325
}
321326

322-
err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
327+
err := sb.Sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
323328
if err != nil {
324329
return xerrors.Errorf("moving storage: %w", err)
325330
}
326331

327332
for _, fileType := range toMove.AllSet() {
328-
if err := sb.sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
333+
if err := sb.Sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
329334
return xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
330335
}
331336
}

0 commit comments

Comments
 (0)