Skip to content

Commit 3adcf92

Browse files
committed
creating kafka operator, working on #32
1 parent 442d67a commit 3adcf92

10 files changed

+754
-349
lines changed

k8s/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ require (
66
github.com/go-git/go-git/v5 v5.1.0
77
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
88
golang.org/x/tools v0.0.0-20200918232735-d647fc253266 // indirect
9+
gopkg.in/yaml.v2 v2.2.4
910
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
1011
)
File renamed without changes.

k8s/k8ssetup/database.go

+4-16
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import (
66
"io/ioutil"
77
"log"
88
"os"
9-
"regexp"
9+
"strings"
1010

11-
"gopkg.in/yaml.v3"
11+
"gopkg.in/yaml.v2"
1212
)
1313

1414
// DatabaseYml is used to read a yml file in order to get the cluster name
@@ -20,12 +20,7 @@ type DatabaseYml struct {
2020
}
2121

2222
func (k k8sSetUpImpl) isDatabaseCreated(cluster string) (bool, error) {
23-
log.Printf("Checking if database cluster %q is already created ...", cluster)
24-
if _, err := k.kubectl("describe", "postgresql/"+cluster); err != nil {
25-
return false, err
26-
}
27-
28-
return true, nil
23+
return k.isResourceCreated("postgresql", cluster, "default")
2924
}
3025

3126
func (k k8sSetUpImpl) createDatabase(fileName string) error {
@@ -57,18 +52,11 @@ func (k k8sSetUpImpl) getClusterName(fileName string) (clusterName string, err e
5752
func (k k8sSetUpImpl) isDatabaseRunning(cluster string) (bool, error) {
5853
log.Printf("Checking if database cluster %q is already running ...", cluster)
5954
output, err := k.kubectl("get", "postgresql/"+cluster, "-o", "jsonpath={.status}")
60-
status := ""
6155
if err != nil {
6256
return false, err
6357
}
6458

65-
var re = regexp.MustCompile(`(?m).*:(.*)]`)
66-
match := re.FindStringSubmatch(output)
67-
if len(match) > 1 {
68-
status = match[1]
69-
}
70-
71-
return status == "Running", nil
59+
return strings.Contains(output, "Running"), nil
7260
}
7361

7462
func (k k8sSetUpImpl) waitDatabaseCreation(cluster string) {

k8s/k8ssetup/database_test.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import (
77
"testing"
88
)
99

10+
1011
func Test_getClusterName(t *testing.T) {
1112
k8sImpl := NewK8sSetUp().(*k8sSetUpImpl)
1213

1314
t.Run("must return the cluster name", func(t *testing.T) {
1415
expect := "cluster"
1516
var expectErr error = nil
16-
got, gotErr := k8sImpl.getClusterName(getFilePath("cluster.yml"))
17+
got, gotErr := k8sImpl.getClusterName(getFilePath("psql-cluster.yml"))
1718

1819
if gotErr != expectErr {
1920
t.Fatalf("Got error %v, expect error %v", gotErr, expectErr)
@@ -102,7 +103,7 @@ func Test_isDatabaseCreated(t *testing.T) {
102103
func Test_createDatabase(t *testing.T) {
103104
k8sImpl := NewK8sSetUp().(*k8sSetUpImpl)
104105

105-
t.Run("must return true if database is created", func(t *testing.T) {
106+
t.Run("must return no error if database is created", func(t *testing.T) {
106107
k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) {
107108
return "", nil
108109
}
@@ -115,7 +116,7 @@ func Test_createDatabase(t *testing.T) {
115116
}
116117
})
117118

118-
t.Run("must return false if database is not created", func(t *testing.T) {
119+
t.Run("must return error if database is not created", func(t *testing.T) {
119120
var errInvalid = errors.New("invalid")
120121
k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) {
121122
return "", errInvalid
@@ -130,15 +131,13 @@ func Test_createDatabase(t *testing.T) {
130131
})
131132
}
132133

133-
// map[PostgresClusterStatus:Running]
134134
func Test_isDatabaseRunning(t *testing.T) {
135135
k8sImpl := NewK8sSetUp().(*k8sSetUpImpl)
136136

137137
t.Run("must return true if database is running", func(t *testing.T) {
138138
k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) {
139139
return "map[PostgresClusterStatus:Running]", nil
140140
}
141-
142141
expect := true
143142
var expectErr error = nil
144143
got, gotErr := k8sImpl.isDatabaseRunning("cluster")
@@ -243,7 +242,7 @@ func Test_DatabaseCreation(t *testing.T) {
243242
}
244243

245244
var expect error = nil
246-
got := k8sImpl.DatabaseCreation("cluster.yml")
245+
got := k8sImpl.DatabaseCreation("psql-cluster.yml")
247246
if got != expect {
248247
t.Fatalf("Got error %v, expect error %v", got, expect)
249248
}
@@ -267,7 +266,7 @@ func Test_DatabaseCreation(t *testing.T) {
267266
}
268267

269268
expect := "already exists"
270-
got := k8sImpl.DatabaseCreation("cluster.yml")
269+
got := k8sImpl.DatabaseCreation("psql-cluster.yml")
271270
if !strings.Contains(got.Error(), expect) {
272271
t.Fatalf("Got error %v, expect error %v", got, expect)
273272
}
@@ -278,14 +277,14 @@ func Test_DatabaseCreation(t *testing.T) {
278277
if params[0] == "describe" && params[1] == "postgresql/cluster" {
279278
return "error", errors.New("error kubectl describe")
280279
}
281-
if params[0] == "create" && params[1] == "-f" && params[2] == "cluster.yml" {
280+
if params[0] == "create" && params[1] == "-f" && params[2] == "psql-cluster.yml" {
282281
return "error", errors.New("error kubectl create")
283282
}
284283
return "map[PostgresClusterStatus:Running]", nil
285284
}
286285

287286
expect := "error creating database cluster"
288-
got := k8sImpl.DatabaseCreation("cluster.yml")
287+
got := k8sImpl.DatabaseCreation("psql-cluster.yml")
289288
if !strings.Contains(got.Error(), expect) {
290289
t.Fatalf("Got error %v, expect error %v", got, expect)
291290
}
@@ -303,7 +302,7 @@ func Test_DatabaseCreation(t *testing.T) {
303302
}
304303

305304
expect := "error creating job for cluster"
306-
got := k8sImpl.DatabaseCreation("cluster.yml")
305+
got := k8sImpl.DatabaseCreation("psql-cluster.yml")
307306
if !strings.Contains(got.Error(), expect) {
308307
t.Fatalf("Got error %v, expect error %v", got, expect)
309308
}

k8s/k8ssetup/k8ssetup.go

+18-49
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ type K8sSetUp interface {
1818
Initialize() error
1919
InstallPostgresqlOperator() error
2020
DatabaseCreation(fileName string) error
21-
InstallKafkaOperator() error
21+
CheckKudoInstallation() error
22+
KafkaClusterCreation(fileName string) error
2223
}
2324

2425
type k8sSetUpImpl struct {
@@ -51,20 +52,13 @@ func (k k8sSetUpImpl) InstallPostgresqlOperator() error {
5152
return nil
5253
}
5354

54-
func (k k8sSetUpImpl) InstallKafkaOperator() error {
55-
log.Println("Installing Kafka operator ...")
55+
func (k k8sSetUpImpl) CheckKudoInstallation() error {
56+
log.Println("Checking kudo installation ...")
5657

57-
if installed := k.isKafkaOperatorInstalled(); !installed {
58-
log.Println("Kafka operator not installed ...")
59-
if err := k.doKafkaOperatorInstallation(); err != nil {
60-
return fmt.Errorf("error installing Kafka operator: %v", err)
61-
}
62-
k.waiKafkaOperatorRunning()
63-
64-
} else {
65-
log.Println("Kafka operator is installed ...")
58+
if _, err := k.kubectl("kudo", "version"); err != nil {
59+
return fmt.Errorf("kudo is not installed: %v", err)
6660
}
67-
61+
log.Println("Kudo is installed ...")
6862
return nil
6963
}
7064

@@ -77,16 +71,7 @@ func (k k8sSetUpImpl) waitPsqlOperatorRunning() {
7771
log.Print("Psql operator is running")
7872
}
7973

80-
func (k k8sSetUpImpl) waiKafkaOperatorRunning() {
81-
cnt := true
82-
for cnt {
83-
ready, err := k.isKafkaOperatorRunning()
84-
cnt = !(err == nil && ready)
85-
}
86-
log.Print("Kafka operator is running")
87-
}
88-
89-
func (k k8sSetUpImpl) isOperatorRunning(name, namespace string) (running bool, err error) {
74+
func (k k8sSetUpImpl) isPodRunning(name, namespace string) (running bool, err error) {
9075
log.Printf("Checking if %s operator is already running ...", name)
9176

9277
var podNames, output string
@@ -105,11 +90,7 @@ func (k k8sSetUpImpl) isOperatorRunning(name, namespace string) (running bool, e
10590
}
10691

10792
func (k k8sSetUpImpl) isPsqlOperatorRunning() (bool, error) {
108-
return k.isOperatorRunning("postgres-operator", "default")
109-
}
110-
111-
func (k k8sSetUpImpl) isKafkaOperatorRunning() (bool, error) {
112-
return k.isOperatorRunning("strimzi-cluster-operator", "kafka")
93+
return k.isPodRunning("postgres-operator", "default")
11394
}
11495

11596
func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool {
@@ -121,15 +102,6 @@ func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool {
121102
return true
122103
}
123104

124-
func (k k8sSetUpImpl) isKafkaOperatorInstalled() bool {
125-
log.Println("Checking if kafka operator is already installed ...")
126-
if _, err := k.kubectl("describe", "deployment.apps/strimzi-cluster-operator", "-n", "kafka"); err != nil {
127-
return false
128-
}
129-
130-
return true
131-
}
132-
133105
func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error {
134106
log.Println("Installing postgreSQL operator ...")
135107
dir, err := ioutil.TempDir("", "pets-go-infra")
@@ -164,18 +136,6 @@ func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error {
164136
return nil
165137
}
166138

167-
func (k *k8sSetUpImpl) doKafkaOperatorInstallation() error {
168-
log.Println("Installing kafka operator ...")
169-
if _, err := k.kubectl("create", "namespace", "kafka"); err != nil {
170-
return fmt.Errorf("error in kubectl create: %v", err)
171-
}
172-
if _, err := k.kubectl("apply", "-f", "https://strimzi.io/install/latest?namespace=kafka", "-n", "kafka"); err != nil {
173-
return fmt.Errorf("error in kubectl apply: %v", err)
174-
}
175-
176-
return nil
177-
}
178-
179139
func (k k8sSetUpImpl) kubectl(params ...string) (output string, err error) {
180140
return k.executeCommand(k.kubectlPath, params...)
181141
}
@@ -200,6 +160,15 @@ func (k k8sSetUpImpl) defaultExecuteCommand(cmdName string, params ...string) (o
200160
return
201161
}
202162

163+
func (k k8sSetUpImpl) isResourceCreated(rtype, name, namespace string) (bool, error) {
164+
log.Printf("Checking if resource %q name %q is already created ...", rtype, name)
165+
if _, err := k.kubectl("describe", rtype+"/"+name, "-n", namespace); err != nil {
166+
return false, err
167+
}
168+
169+
return true, nil
170+
}
171+
203172
// NewK8sSetUp returns a K8sSetUp interface
204173
func NewK8sSetUp() K8sSetUp {
205174
impl := &k8sSetUpImpl{

0 commit comments

Comments
 (0)