@@ -18,6 +18,7 @@ type K8sSetUp interface {
18
18
Initialize () error
19
19
InstallPostgresqlOperator () error
20
20
DatabaseCreation (fileName string ) error
21
+ InstallKafkaOperator () error
21
22
}
22
23
23
24
type k8sSetUpImpl struct {
@@ -50,6 +51,23 @@ func (k k8sSetUpImpl) InstallPostgresqlOperator() error {
50
51
return nil
51
52
}
52
53
54
+ func (k k8sSetUpImpl ) InstallKafkaOperator () error {
55
+ log .Println ("Installing Kafka operator ..." )
56
+
57
+ if installed := k .isKafkaOperatorInstalled (); ! installed {
58
+ log .Println ("Kafka operator not installed ..." )
59
+ if err := k .doKafkaOperatorInstallation (); err != nil {
60
+ return fmt .Errorf ("error installing Kafka operator: %v" , err )
61
+ }
62
+ k .waiKafkaOperatorRunning ()
63
+
64
+ } else {
65
+ log .Println ("Kafka operator is installed ..." )
66
+ }
67
+
68
+ return nil
69
+ }
70
+
53
71
func (k k8sSetUpImpl ) waitPsqlOperatorRunning () {
54
72
cnt := true
55
73
for cnt {
@@ -59,24 +77,41 @@ func (k k8sSetUpImpl) waitPsqlOperatorRunning() {
59
77
log .Print ("Psql operator is running" )
60
78
}
61
79
62
- func (k k8sSetUpImpl ) isPsqlOperatorRunning () (running bool , err error ) {
63
- log .Printf ("Checking if postgres operator is already running ..." )
80
+ func (k k8sSetUpImpl ) waiKafkaOperatorRunning () {
81
+ cnt := true
82
+ for cnt {
83
+ ready , err := k .isKafkaOperatorRunning ()
84
+ cnt = ! (err == nil && ready )
85
+ }
86
+ log .Print ("Kafka operator is running" )
87
+ }
88
+
89
+ func (k k8sSetUpImpl ) isOperatorRunning (name , namespace string ) (running bool , err error ) {
90
+ log .Printf ("Checking if %s operator is already running ..." , name )
64
91
65
92
var podNames , output string
66
- if podNames , err = k .kubectl ("get" , "pod" , "-o" , "name" ); err == nil {
67
- for _ , name := range strings .Split (podNames , "\n " ) {
68
- if strings .Contains (name , "postgres-operator" ) {
69
- if output , err = k .kubectl ("get" , name , "-o" , "jsonpath='{.status.phase }'" ); err != nil {
93
+ if podNames , err = k .kubectl ("get" , "pod" , "-o" , "name" , "-n" , namespace ); err == nil {
94
+ for _ , podName := range strings .Split (podNames , "\n " ) {
95
+ if strings .Contains (podName , name ) {
96
+ if output , err = k .kubectl ("get" , podName , "-o" , "jsonpath='{.status.containerStatuses[0].ready }'" , "-n" , namespace ); err != nil {
70
97
return false , err
71
98
}
72
- running = output == "'Running '"
99
+ running = output == "'true '"
73
100
break
74
101
}
75
102
}
76
103
}
77
104
return
78
105
}
79
106
107
+ func (k k8sSetUpImpl ) isPsqlOperatorRunning () (bool , error ) {
108
+ return k .isOperatorRunning ("postgres-operator" , "default" )
109
+ }
110
+
111
+ func (k k8sSetUpImpl ) isKafkaOperatorRunning () (bool , error ) {
112
+ return k .isOperatorRunning ("strimzi-cluster-operator" , "kafka" )
113
+ }
114
+
80
115
func (k k8sSetUpImpl ) isPostgreSQLOperatorInstalled () bool {
81
116
log .Println ("Checking if postgresql operator is already installed ..." )
82
117
if _ , err := k .kubectl ("describe" , "service/postgres-operator" ); err != nil {
@@ -86,6 +121,15 @@ func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool {
86
121
return true
87
122
}
88
123
124
+ func (k k8sSetUpImpl ) isKafkaOperatorInstalled () bool {
125
+ log .Println ("Checking if kafka operator is already installed ..." )
126
+ if _ , err := k .kubectl ("describe" , "deployment.apps/strimzi-cluster-operator" , "-n" , "kafka" ); err != nil {
127
+ return false
128
+ }
129
+
130
+ return true
131
+ }
132
+
89
133
func (k * k8sSetUpImpl ) doPsqlOperatorInstallation () error {
90
134
log .Println ("Installing postgreSQL operator ..." )
91
135
dir , err := ioutil .TempDir ("" , "pets-go-infra" )
@@ -120,6 +164,18 @@ func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error {
120
164
return nil
121
165
}
122
166
167
+ func (k * k8sSetUpImpl ) doKafkaOperatorInstallation () error {
168
+ log .Println ("Installing kafka operator ..." )
169
+ if _ , err := k .kubectl ("create" , "namespace" , "kafka" ); err != nil {
170
+ return fmt .Errorf ("error in kubectl create: %v" , err )
171
+ }
172
+ if _ , err := k .kubectl ("apply" , "-f" , "https://strimzi.io/install/latest?namespace=kafka" , "-n" , "kafka" ); err != nil {
173
+ return fmt .Errorf ("error in kubectl apply: %v" , err )
174
+ }
175
+
176
+ return nil
177
+ }
178
+
123
179
func (k k8sSetUpImpl ) kubectl (params ... string ) (output string , err error ) {
124
180
return k .executeCommand (k .kubectlPath , params ... )
125
181
}
0 commit comments