Skip to content

Commit 80cc2fa

Browse files
authored
1 parent 8467776 commit 80cc2fa

File tree

2 files changed

+151
-4
lines changed

2 files changed

+151
-4
lines changed

checkpoint.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,13 @@ func FindLastCompleteCheckpoint(s store.Store, cv CheckpointInstance) (mo.Option
164164
}
165165
defer iter.Close()
166166

167-
var checkpoints []*CheckpointInstance
168-
for f, err := iter.Next(); err == nil; f, err = iter.Next() {
167+
var (
168+
checkpoints []*CheckpointInstance
169+
iterErr error
170+
f *store.FileMeta
171+
)
172+
173+
for f, iterErr = iter.Next(); iterErr == nil; f, iterErr = iter.Next() {
169174

170175
if !filenames.IsCheckpointFile(f.Path()) {
171176
continue
@@ -177,8 +182,9 @@ func FindLastCompleteCheckpoint(s store.Store, cv CheckpointInstance) (mo.Option
177182
break
178183
}
179184
}
180-
if err != nil && err != io.EOF {
181-
return mo.None[*CheckpointInstance](), eris.Wrap(err, "")
185+
186+
if iterErr != nil && iterErr != io.EOF {
187+
return mo.None[*CheckpointInstance](), eris.Wrap(iterErr, "")
182188
}
183189

184190
lastCheckpoint := GetLatestCompleteCheckpointFromList(checkpoints, cv)

checkpoint_test.go

+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package deltago_test
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"testing"
7+
8+
delta "github.com/csimplestring/delta-go"
9+
"github.com/rotisserie/eris"
10+
"github.com/samber/mo"
11+
12+
"github.com/csimplestring/delta-go/iter"
13+
"github.com/csimplestring/delta-go/store"
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
func TestFindLastCompleteCheckpoint(t *testing.T) {
18+
19+
s, err := newMemLogStore()
20+
assert.NoError(t, err)
21+
22+
t.Run("returns err when first call to iterator fails with an error different from io.EOF", func(t *testing.T) {
23+
24+
t.Cleanup(s.reset)
25+
26+
callFailure := fmt.Errorf("RequestError: send request failed")
27+
28+
failIf = func() (bool, error) {
29+
return true, callFailure
30+
}
31+
32+
lastCheckpoint, err := delta.FindLastCompleteCheckpoint(s, delta.MaxInstance)
33+
34+
assert.Equal(t, lastCheckpoint, mo.None[*delta.CheckpointInstance]())
35+
assert.Equal(t, err.Error(), eris.Wrap(callFailure, "").Error())
36+
})
37+
38+
t.Run("returns err when any call to iterator fails with an error different from io.EOF", func(t *testing.T) {
39+
40+
t.Cleanup(s.reset)
41+
42+
callFailure := fmt.Errorf("RequestError: send request failed")
43+
44+
var n int
45+
failIf = func() (bool, error) {
46+
n++
47+
if n == 3 {
48+
return true, callFailure
49+
}
50+
return false, nil
51+
}
52+
53+
lastCheckpoint, err := delta.FindLastCompleteCheckpoint(s, delta.MaxInstance)
54+
55+
assert.Equal(t, lastCheckpoint, mo.None[*delta.CheckpointInstance]())
56+
assert.Equal(t, err.Error(), eris.Wrap(callFailure, "").Error())
57+
})
58+
59+
t.Run("returns no err when io.EOF is returned", func(t *testing.T) {
60+
61+
t.Cleanup(s.reset)
62+
63+
var n int
64+
failIf = func() (bool, error) {
65+
n++
66+
if n == 2 {
67+
return true, io.EOF
68+
}
69+
return false, nil
70+
}
71+
72+
lastCheckpoint, err := delta.FindLastCompleteCheckpoint(s, delta.MaxInstance)
73+
74+
assert.Equal(t, lastCheckpoint, mo.None[*delta.CheckpointInstance]())
75+
assert.Equal(t, err, nil)
76+
})
77+
78+
}
79+
80+
func newMemLogStore() (*memLogStore, error) {
81+
return &memLogStore{}, nil
82+
}
83+
84+
type memLogStore struct {
85+
mustFail func() (bool, error)
86+
}
87+
88+
func (s *memLogStore) reset() {
89+
s.mustFail = func() (bool, error) { return false, nil }
90+
}
91+
92+
func (s *memLogStore) Root() string {
93+
return ""
94+
}
95+
96+
func (s *memLogStore) Read(path string) (iter.Iter[string], error) {
97+
return nil, fmt.Errorf("not implemented")
98+
}
99+
100+
var failIf func() (bool, error)
101+
102+
func (s *memLogStore) ListFrom(path string) (iter.Iter[*store.FileMeta], error) {
103+
return &memIter{
104+
mustFail: failIf,
105+
}, nil
106+
}
107+
108+
type memIter struct {
109+
mustFail func() (bool, error)
110+
}
111+
112+
func (i *memIter) Next() (*store.FileMeta, error) {
113+
fail, err := i.mustFail()
114+
if fail {
115+
return nil, err
116+
}
117+
return &store.FileMeta{}, nil
118+
}
119+
120+
func (i *memIter) Close() error {
121+
return fmt.Errorf("not implemented")
122+
}
123+
124+
func (s *memLogStore) Write(path string, actions iter.Iter[string], overwrite bool) error {
125+
return fmt.Errorf("not implemented")
126+
}
127+
128+
func (s *memLogStore) ResolvePathOnPhysicalStore(path string) (string, error) {
129+
return "", fmt.Errorf("not implemented")
130+
}
131+
132+
func (s *memLogStore) IsPartialWriteVisible(path string) bool {
133+
return false
134+
}
135+
136+
func (s *memLogStore) Exists(path string) (bool, error) {
137+
return false, fmt.Errorf("not implemented")
138+
}
139+
func (s *memLogStore) Create(path string) error {
140+
return fmt.Errorf("not implemented")
141+
}

0 commit comments

Comments
 (0)