Skip to content

Commit 87acc83

Browse files
add MemOptimizedCheckpoint to read checkpoint actions from the log
1 parent cb4d850 commit 87acc83

File tree

2 files changed

+82
-20
lines changed

2 files changed

+82
-20
lines changed

log.go

+41-20
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,14 @@ func ForTableWithStore(dataPath string, config Config, clock Clock, ls *store.St
8484
}
8585

8686
logImpl := &logImpl{
87-
dataPath: dataPath,
88-
logPath: logPath,
89-
clock: clock,
90-
store: *logStore,
91-
deltaLogLock: deltaLogLock,
92-
history: historyManager,
93-
snapshotReader: snaptshotManager,
87+
dataPath: dataPath,
88+
logPath: logPath,
89+
clock: clock,
90+
store: *logStore,
91+
deltaLogLock: deltaLogLock,
92+
history: historyManager,
93+
snapshotReader: snaptshotManager,
94+
checkpointReader: &parquetReader,
9495
}
9596

9697
return logImpl, nil
@@ -173,13 +174,14 @@ func ForTable(dataPath string, config Config, clock Clock) (Log, error) {
173174
// }
174175

175176
type logImpl struct {
176-
dataPath string
177-
logPath string
178-
clock Clock
179-
store store.Store
180-
deltaLogLock *sync.Mutex
181-
history *historyManager
182-
snapshotReader *SnapshotReader
177+
dataPath string
178+
logPath string
179+
clock Clock
180+
store store.Store
181+
deltaLogLock *sync.Mutex
182+
history *historyManager
183+
snapshotReader *SnapshotReader
184+
checkpointReader *checkpointReader
183185
}
184186

185187
// Snapshot the current Snapshot of the Delta table.
@@ -222,23 +224,32 @@ func (l *logImpl) Path() string {
222224
return l.dataPath
223225
}
224226

225-
// Get all actions starting from startVersion (inclusive) in increasing order of committed version.
227+
// Changes returns all actions starting from startVersion (inclusive) in increasing order of committed version.
226228
// If startVersion doesn't exist, return an empty Iterator.
227229
func (l *logImpl) Changes(startVersion int64, failOnDataLoss bool) (iter.Iter[VersionLog], error) {
228230
if startVersion < 0 {
229231
return nil, eris.Wrap(errno.ErrIllegalArgument, "invalid startVersion")
230232
}
231-
232-
fs, err := l.store.ListFrom(filenames.DeltaFile("", startVersion))
233+
var fs iter.Iter[*store.FileMeta]
234+
checkpointIncluded := true
235+
fs, err := l.store.ListFrom(filenames.CheckpointFileSingular("", startVersion))
233236
if err != nil {
234-
return nil, err
237+
checkpointIncluded = false
238+
fs, err = l.store.ListFrom(filenames.DeltaFile("", startVersion))
239+
if err != nil {
240+
return nil, err
241+
}
235242
}
236243
defer fs.Close()
237244

238245
var deltaPaths []string
239246
for f, err := fs.Next(); err == nil; f, err = fs.Next() {
240-
if filenames.IsDeltaFile(f.Path()) {
241-
deltaPaths = append(deltaPaths, f.Path())
247+
p := f.Path()
248+
if checkpointIncluded && filenames.IsCheckpointFile(p) {
249+
deltaPaths = append(deltaPaths, p)
250+
checkpointIncluded = false
251+
} else if filenames.IsDeltaFile(p) {
252+
deltaPaths = append(deltaPaths, p)
242253
}
243254
}
244255
if err != nil && err != io.EOF {
@@ -248,6 +259,16 @@ func (l *logImpl) Changes(startVersion int64, failOnDataLoss bool) (iter.Iter[Ve
248259
lastSeenVersion := startVersion - 1
249260
versionLogs := make([]VersionLog, len(deltaPaths))
250261
for i, deltaPath := range deltaPaths {
262+
if filenames.IsCheckpointFile(deltaPath) {
263+
version := filenames.CheckpointVersion(deltaPath)
264+
versionLogs[i] = &MemOptimizedCheckpoint{
265+
version: version,
266+
path: deltaPath,
267+
store: l.store,
268+
cr: l.checkpointReader,
269+
}
270+
continue
271+
}
251272
version := filenames.DeltaVersion(deltaPath)
252273
if failOnDataLoss && version > lastSeenVersion+1 {
253274
return nil, errno.IllegalStateError("fail on data loss")

version_log.go

+41
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,44 @@ func (m *MemOptimizedVersionLog) ActionIter() (iter.Iter[action.Action], error)
6969
}
7070
return mapIter, nil
7171
}
72+
73+
type MemOptimizedCheckpoint struct {
74+
version int64
75+
path string
76+
store store.Store
77+
cr *checkpointReader
78+
}
79+
80+
func (m *MemOptimizedCheckpoint) Version() int64 {
81+
return m.version * -1
82+
}
83+
84+
func (m *MemOptimizedCheckpoint) Actions() ([]action.Action, error) {
85+
cr := *(m.cr)
86+
i, err := cr.Read(m.path)
87+
if err != nil {
88+
return nil, err
89+
}
90+
defer i.Close()
91+
92+
return iter.Map(i, func(a action.Action) (action.Action, error) {
93+
if a.Wrap().MetaData != nil {
94+
md := a.Wrap().MetaData
95+
if md.Configuration == nil {
96+
md.Configuration = map[string]string{}
97+
}
98+
if md.PartitionColumns == nil {
99+
md.PartitionColumns = []string{}
100+
}
101+
if md.Format.Options == nil {
102+
md.Format.Options = map[string]string{}
103+
}
104+
}
105+
return a, nil
106+
})
107+
}
108+
109+
func (m *MemOptimizedCheckpoint) ActionIter() (iter.Iter[action.Action], error) {
110+
cr := *(m.cr)
111+
return cr.Read(m.path)
112+
}

0 commit comments

Comments
 (0)