Skip to content

Commit 7cc55bd

Browse files
committed
First go.
1 parent d61e09c commit 7cc55bd

File tree

6 files changed

+285
-0
lines changed

6 files changed

+285
-0
lines changed

Dockerfile

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
FROM golang:1.16.4
2+
ENV GOOS=linux
3+
ENV GOARCH=amd64
4+
COPY ./ /build
5+
WORKDIR /build
6+
RUN go mod vendor && go build -o aws-ecs-eds main.go
7+
8+
FROM amazonlinux:2
9+
EXPOSE 5678
10+
WORKDIR /root/
11+
COPY --from=0 /build/aws-ecs-eds /opt
12+
CMD ["/opt/aws-ecs-eds"]

eds-config.yaml

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
admin:
2+
access_log_path: /tmp/admin_access.log
3+
address:
4+
socket_address: { address: 0.0.0.0, port_value: 9901 }
5+
6+
node:
7+
id: test-id
8+
cluster: test-cluster
9+
10+
static_resources:
11+
listeners:
12+
- name: listener_0
13+
address:
14+
socket_address: { address: 0.0.0.0, port_value: 8080 }
15+
filter_chains:
16+
- filters:
17+
- name: envoy.filters.network.http_connection_manager
18+
typed_config:
19+
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
20+
stat_prefix: http_proxy
21+
route_config:
22+
name: local_route
23+
virtual_hosts:
24+
- name: local_service
25+
domains: ["*"]
26+
routes:
27+
- match: { prefix: "/" }
28+
route: { cluster: web }
29+
http_filters:
30+
- name: envoy.filters.http.router
31+
clusters:
32+
- name: web
33+
connect_timeout: 10s
34+
type: EDS
35+
eds_cluster_config:
36+
service_name: lootlink-web
37+
eds_config:
38+
resourceApiVersion: V3
39+
api_config_source:
40+
api_type: GRPC
41+
transport_api_version: V3
42+
grpc_services:
43+
- envoy_grpc:
44+
cluster_name: xds_cluster
45+
- type: STATIC
46+
connect_timeout: 10s
47+
load_assignment:
48+
cluster_name: xds_cluster
49+
endpoints:
50+
- lb_endpoints:
51+
- endpoint:
52+
address:
53+
socket_address:
54+
address: 10.0.0.5
55+
port_value: 5678
56+
http2_protocol_options: {}
57+
name: xds_cluster

envoy.Dockerfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
FROM envoyproxy/envoy:v1.18.3
2+
EXPOSE 8080
3+
EXPOSE 9901
4+
COPY eds-config.yaml /etc/envoy/envoy.yaml

go.mod

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module github.com/boostchicken/aws-ecs-eds
2+
3+
go 1.15
4+
5+
require (
6+
github.com/aws/aws-sdk-go-v2 v1.6.0
7+
github.com/aws/aws-sdk-go-v2/config v1.3.0
8+
github.com/aws/aws-sdk-go-v2/service/ecs v1.4.1
9+
github.com/envoyproxy/go-control-plane v0.9.9
10+
github.com/patrickmn/go-cache v2.1.0+incompatible
11+
github.com/sirupsen/logrus v1.7.0
12+
google.golang.org/grpc v1.36.0
13+
)

main.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"github.com/aws/aws-sdk-go-v2/aws"
7+
"github.com/aws/aws-sdk-go-v2/config"
8+
"github.com/aws/aws-sdk-go-v2/service/ecs"
9+
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
10+
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
11+
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
12+
endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
13+
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
14+
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
15+
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
16+
gocache "github.com/patrickmn/go-cache"
17+
log "github.com/sirupsen/logrus"
18+
"google.golang.org/grpc"
19+
"google.golang.org/grpc/reflection"
20+
"net"
21+
"os"
22+
"strconv"
23+
"time"
24+
)
25+
26+
type server struct {
27+
ecs *ecs.Client
28+
cache *gocache.Cache
29+
}
30+
31+
func init() {
32+
33+
// Log as JSON instead of the default ASCII formatter.
34+
log.SetFormatter(&log.TextFormatter{})
35+
36+
// Output to stdout instead of the default stderr
37+
// Can be any io.Writer, see below for File example
38+
log.SetOutput(os.Stdout)
39+
40+
// Only log the warning severity or above.
41+
log.SetLevel(log.InfoLevel)
42+
}
43+
44+
func (*server) receive(stream endpointservice.EndpointDiscoveryService_StreamEndpointsServer, reqChannel chan *discovery.DiscoveryRequest) {
45+
for {
46+
req, err := stream.Recv()
47+
if err != nil {
48+
log.Error("Error while receiving message from stream", err)
49+
return
50+
}
51+
52+
select {
53+
case reqChannel <- req:
54+
case <-stream.Context().Done():
55+
log.Error("Stream closed")
56+
return
57+
}
58+
}
59+
}
60+
61+
func (s *server) StreamEndpoints(stream endpointservice.EndpointDiscoveryService_StreamEndpointsServer) error {
62+
stop := make(chan struct{})
63+
reqChannel := make(chan *discovery.DiscoveryRequest, 1)
64+
go s.receive(stream, reqChannel)
65+
66+
for {
67+
select {
68+
case req, ok := <-reqChannel:
69+
if !ok {
70+
log.Error("Error receiving request")
71+
return errors.New("Error receiving request")
72+
}
73+
eds, cacheOk := s.cache.Get(req.ResourceNames[0])
74+
if !cacheOk {
75+
eds = s.generateEDS(req.ResourceNames[0])
76+
s.cache.Set(req.ResourceNames[0], eds, time.Minute*1)
77+
}
78+
response := cache.RawResponse{Version: req.VersionInfo,
79+
Resources: []types.ResourceWithTtl{{Resource: eds.(*endpoint.ClusterLoadAssignment)}},
80+
Request: &discovery.DiscoveryRequest{TypeUrl: resource.EndpointType}}
81+
cacheResp, err := response.GetDiscoveryResponse()
82+
err = stream.Send(cacheResp)
83+
if err != nil {
84+
log.Error("Error StreamingEndpoint ", err)
85+
return err
86+
}
87+
case <-stop:
88+
return nil
89+
}
90+
}
91+
}
92+
93+
func (s *server) DeltaEndpoints(stream endpointservice.EndpointDiscoveryService_DeltaEndpointsServer) error {
94+
log.Info("DeltaEndpoints service not implemented")
95+
return nil
96+
}
97+
98+
func (*server) FetchEndpoints(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
99+
log.Info("FetchEndpoints service not implemented")
100+
return nil, nil
101+
}
102+
103+
func (s *server) generateEDS(cluster string) *endpoint.ClusterLoadAssignment {
104+
var lbEndpoints = make([]*endpoint.LbEndpoint, 0)
105+
106+
s.getTaskIps(&lbEndpoints, cluster, nil)
107+
108+
ret := &endpoint.ClusterLoadAssignment{
109+
ClusterName: cluster,
110+
Endpoints: []*endpoint.LocalityLbEndpoints{
111+
{
112+
LbEndpoints: lbEndpoints,
113+
},
114+
},
115+
}
116+
117+
return ret
118+
}
119+
120+
func (s *server) getTaskIps(lbEndpoints *[]*endpoint.LbEndpoint, cluster string, nextToken *string) {
121+
taskArns, err := s.ecs.ListTasks(context.Background(), &ecs.ListTasksInput{Cluster: aws.String(cluster), NextToken: nextToken})
122+
if err != nil {
123+
log.Error("Error listing AWS tasks ", err)
124+
return
125+
}
126+
tasks, err := s.ecs.DescribeTasks(context.Background(), &ecs.DescribeTasksInput{
127+
Tasks: taskArns.TaskArns, Cluster: aws.String(cluster),
128+
})
129+
if err != nil {
130+
log.Error("Error Describing AWS tasks ", err)
131+
return
132+
}
133+
port, err := strconv.Atoi(os.Getenv(cluster + "_port"))
134+
if err != nil {
135+
port = 80
136+
}
137+
for _, task := range tasks.Tasks {
138+
for _, attachment := range task.Attachments {
139+
for _, details := range attachment.Details {
140+
if aws.ToString(details.Name) == "privateIPv4Address" {
141+
*lbEndpoints = append(*lbEndpoints, &endpoint.LbEndpoint{HostIdentifier: &endpoint.LbEndpoint_Endpoint{
142+
Endpoint: &endpoint.Endpoint{
143+
Address: &core.Address{
144+
Address: &core.Address_SocketAddress{
145+
SocketAddress: &core.SocketAddress{
146+
Address: aws.ToString(details.Value),
147+
PortSpecifier: &core.SocketAddress_PortValue{
148+
PortValue: uint32(port),
149+
},
150+
},
151+
},
152+
},
153+
},
154+
},
155+
})
156+
}
157+
}
158+
}
159+
}
160+
if taskArns.NextToken != nil {
161+
s.getTaskIps(lbEndpoints, cluster, taskArns.NextToken)
162+
}
163+
}
164+
func main() {
165+
grpcServer := grpc.NewServer()
166+
edsListen := os.Getenv("EDS_LISTEN")
167+
if edsListen == "" {
168+
edsListen = "0.0.0.0:5678"
169+
}
170+
lis, err := net.Listen("tcp", edsListen)
171+
if err != nil {
172+
log.Error(err)
173+
}
174+
175+
cfg, _ := config.LoadDefaultConfig(context.Background())
176+
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, &server{ecs: ecs.NewFromConfig(cfg), cache: gocache.New(time.Minute*1, time.Minute*1)})
177+
178+
reflection.Register(grpcServer)
179+
180+
log.Infof("management server listening on %d", 5678)
181+
if err = grpcServer.Serve(lis); err != nil {
182+
log.Error(err)
183+
}
184+
}

main_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"github.com/aws/aws-sdk-go-v2/config"
6+
"github.com/aws/aws-sdk-go-v2/service/ecs"
7+
"testing"
8+
)
9+
10+
func TestGenerateEds(t *testing.T) {
11+
cfg, _ := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1"))
12+
s := &server{ecs: ecs.NewFromConfig(cfg)}
13+
ret := s.generateEDS("lootlink-web")
14+
t.Log(ret)
15+
}

0 commit comments

Comments
 (0)