Skip to content

Commit 1339403

Browse files
committed
Adding association between a Kafka cluster and a notifier
Signed-off-by: muicoder <muicoder@gmail.com> linkedin#611 Adding WeCom/DingTalk template
1 parent e593345 commit 1339403

28 files changed

+752
-25
lines changed

.github/workflows/action.yaml

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
env:
2+
BASE64manifest: IyEvYmluL3NoCgpDTUQ9JChpZiBidWlsZGFoID4vZGV2L251bGw7IHRoZW4gZWNobyBidWlsZGFoOyBlbGlmIHNlYWxvcyA+L2Rldi9udWxsOyB0aGVuIGVjaG8gc2VhbG9zOyBmaSkKTUY9Im1mOiQoZGF0ZSArJUYpIgoKUkVQTz0iJHsxOi1kb2NrZXIuaW8vYml0bmFtaS9tZXRyaWNzLXNlcnZlcjpkb2NrZXIuaW8vbXVpY29kZXIvbWV0cmljcy1zZXJ2ZXJ9IgpUQUdTPSIkezI6LTAuNi4zfSIKVEFHPSIkezM6LSRUQUdTfSIKCmlmIFsgIiR7UkVQTyU6Kn0iICE9ICIkUkVQTyIgXTsgdGhlbgogIGlmIFsgIiR7VEFHUyUsKn0iICE9ICIkVEFHUyIgXTsgdGhlbgogICAgZWNobyAiJFRBR1MiIHwgc2VkICJzfix+XG5+ZyIgfCB3aGlsZSByZWFkIC1yIHRhZzsgZG8KICAgICAgZWNobyAiJHtSRVBPJToqfTokdGFnIgogICAgZG9uZSB8ICRDTUQgcHVsbAogIGVsc2UKICAgICRDTUQgcHVsbCAtLXBvbGljeT1hbHdheXMgLS1wbGF0Zm9ybT1saW51eC9hbWQ2NCAiJHtSRVBPJToqfTokVEFHUyIKICAgICRDTUQgdGFnICIke1JFUE8lOip9OiRUQUdTIiAiJHtSRVBPIyo6fTokVEFHUy1hbWQ2NCIKICAgICRDTUQgcHVsbCAtLXBvbGljeT1hbHdheXMgLS1wbGF0Zm9ybT1saW51eC9hcm02NCAiJHtSRVBPJToqfTokVEFHUyIKICAgICRDTUQgdGFnICIke1JFUE8lOip9OiRUQUdTIiAiJHtSRVBPIyo6fTokVEFHUy1hcm02NCIKICAgIFRBR1M9IiRUQUdTLWFtZDY0LCRUQUdTLWFybTY0IgogIGZpCmZpCgplY2hvICIkVEFHUyIgfCBzZWQgInN+LH5cbn5nIiB8IHdoaWxlIHJlYWQgLXIgdGFnOyBkbwogIGVjaG8gIiR7UkVQTyMqOn06JHRhZyIKZG9uZSB8IHhhcmdzICRDTUQgbWFuaWZlc3QgY3JlYXRlIC0tYWxsICIkTUYiCiRDTUQgbWFuaWZlc3QgcHVzaCAtLWFsbCAiJE1GIiAiZG9ja2VyOi8vJHtSRVBPIyo6fTokVEFHIgokQ01EIG1hbmlmZXN0IHJtICIkTUYiIHx8IHRydWUK
3+
jobs:
4+
aio-manifest:
5+
needs:
6+
- build
7+
runs-on: ubuntu-latest
8+
steps:
9+
- name: Login to DockerHub
10+
uses: docker/login-action@v2
11+
with:
12+
password: ${{ secrets.DOCKERHUB_PASSWORD }}
13+
username: ${{ secrets.DOCKERHUB_USERNAME }}
14+
- name: manifest
15+
run: echo ${{ env.BASE64manifest }} | base64 -d | sh -s docker.io/${{ secrets.DOCKERHUB_USERNAME }}/burrow action-amd64,action-arm64 action
16+
build:
17+
runs-on: ubuntu-latest
18+
steps:
19+
- name: Checkout
20+
uses: actions/checkout@v3
21+
- name: Set up QEMU
22+
uses: docker/setup-qemu-action@v2
23+
- name: Set up Docker Buildx
24+
uses: docker/setup-buildx-action@v2
25+
- name: Login to DockerHub
26+
uses: docker/login-action@v2
27+
with:
28+
password: ${{ secrets.DOCKERHUB_PASSWORD }}
29+
username: ${{ secrets.DOCKERHUB_USERNAME }}
30+
- name: Build and push
31+
uses: docker/build-push-action@v3
32+
with:
33+
context: .
34+
file: Dockerfile.${{ matrix.arch}}
35+
platforms: ${{ matrix.os}}/${{ matrix.arch}}
36+
provenance: false
37+
pull: true
38+
push: true
39+
sbom: false
40+
tags: ${{ secrets.DOCKERHUB_USERNAME }}/burrow:action-${{ matrix.arch}}
41+
strategy:
42+
matrix:
43+
arch:
44+
- arm64
45+
- amd64
46+
os:
47+
- linux
48+
name: build
49+
on:
50+
workflow_dispatch:

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: CI
22

3-
on: [push, pull_request]
3+
on: [pull_request]
44

55
jobs:
66
test:

Dockerfile.amd64

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
FROM quay.io/coreos/etcd:v3.3.27 as etcd
2+
FROM quay.io/coreos/zetcd:v0.0.5 as zetcd
3+
FROM edenhill/kcat:1.7.1 as kcat
4+
5+
FROM golang:1.20-alpine as builder
6+
ARG git_user=muicoder
7+
ARG git_repo=Burrow
8+
ARG git_branch=action
9+
ENV CGO_ENABLED=0
10+
RUN set -ex && \
11+
wget -qO- https://github.com/$git_user/$git_repo/archive/refs/heads/$git_branch.tar.gz | tar -xz && \
12+
cd $git_repo-$git_branch && \
13+
go get -u all && go mod verify && go mod tidy && \
14+
go build -trimpath -ldflags '-s -w -extldflags "-static"' -o $GOPATH/bin/burrow && \
15+
go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkboom@latest && \
16+
go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkctl@latest && \
17+
ls -lh $GOPATH/bin
18+
19+
FROM alpine:edge as cached
20+
COPY --from=etcd /usr/local/bin/etcd* /cached/
21+
COPY --from=zetcd /usr/local/bin/zetcd* /cached/
22+
COPY --from=builder /go/bin/* /cached/
23+
COPY --from=kcat /usr/bin/kcat /cached/
24+
# kcat on libcrypto1.1+libssl1.1
25+
FROM alpine:3.16
26+
RUN apk add --no-cache curl jq wget tzdata libcurl lz4-libs zstd-libs ca-certificates openssl
27+
COPY --from=cached /cached /usr/local/bin/

Dockerfile.arm64

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
FROM quay.io/coreos/etcd:v3.3.27-arm64 as etcd
2+
FROM kbzjung359/zetcd:v0.0.5-alpine-arm64 as zetcd
3+
4+
FROM golang:1.20-alpine as builder
5+
ARG git_user=muicoder
6+
ARG git_repo=Burrow
7+
ARG git_branch=action
8+
ENV CGO_ENABLED=0
9+
RUN set -ex && \
10+
wget -qO- https://github.com/$git_user/$git_repo/archive/refs/heads/$git_branch.tar.gz | tar -xz && \
11+
cd $git_repo-$git_branch && \
12+
go get -u all && go mod verify && go mod tidy && \
13+
go build -trimpath -ldflags '-s -w -extldflags "-static"' -o $GOPATH/bin/burrow && \
14+
go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkboom@latest && \
15+
go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkctl@latest && \
16+
ls -lh $GOPATH/bin
17+
18+
FROM alpine:edge as cached
19+
COPY --from=etcd /usr/local/bin/etcd* /cached/
20+
COPY --from=zetcd /usr/local/bin/zetcd* /cached/
21+
COPY --from=builder /go/bin/* /cached/
22+
# kcat on libcrypto1.1+libssl1.1
23+
FROM alpine:3.16
24+
RUN apk add --no-cache curl jq wget tzdata libcurl lz4-libs zstd-libs ca-certificates openssl
25+
COPY --from=cached /cached /usr/local/bin/

config/burrow.toml

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ min-distance=1
5858

5959
[notifier.default]
6060
class-name="http"
61+
cluster="local"
6162
url-open="http://someservice.example.com:1467/v1/event"
6263
interval=60
6364
timeout=5

config/default-dingtalk-post.tmpl

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{"msgtype": "markdown","markdown": {"title":"Kafka LagChecker", "text": "
2+
{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}}
3+
{{- $FormatString := "2006-01-02 15:04:05"}}
4+
# Kafka: {{.Cluster}}
5+
ConsumerGroup: {{.Group}}{{- with .Result.Status}}
6+
{{- if eq . 0}}NotFound{{end}}
7+
{{- if eq . 1}}normal{{end}}
8+
{{- if eq . 2}}lagging{{end}}
9+
{{- if eq . 3}}abnormal{{end}}
10+
{{- end}}
11+
**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}}
12+
{{- if eq . 0}}NotFound{{end}}
13+
{{- if eq . 1}}{{.}}{{end}}
14+
{{- if eq . 2}}{{.}}{{end}}
15+
{{- if eq . 3}}{{.}}{{end}}
16+
{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}}
17+
{{- if .Result.Maxlag|maxlag}}
18+
**MaxLagDetails:**
19+
{{- with .Result.Maxlag}}
20+
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
21+
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
22+
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
23+
\tCurrentLag={{.CurrentLag}}
24+
\tPartition={{.Partition}}
25+
{{- end}}
26+
{{- end}}
27+
{{- $TotalErrors := len .Result.Partitions}}
28+
{{- if $TotalErrors}}
29+
### {{$TotalErrors}} partitions have problems
30+
>**CountPartitions:**
31+
{{- range $k,$v := .Result.Partitions|partitioncounts}}
32+
{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}}
33+
{{- end}}
34+
**TopicsByStatus:**
35+
{{- range $k,$v := .Result.Partitions|topicsbystatus}}
36+
\t{{$k}}={{$v}}
37+
{{- end}}
38+
**PartitionDetails:**
39+
{{- range .Result.Partitions}}
40+
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
41+
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
42+
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
43+
\tCurrentLag={{.CurrentLag}}
44+
\tPartition={{.Partition}}
45+
\tStart={{formattimestamp .Start.Timestamp $FormatString}}
46+
\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}}
47+
\tEnd={{formattimestamp .End.Timestamp $FormatString}}
48+
\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}}
49+
{{- end}}
50+
{{- end}}
51+
"
52+
}}

config/default-wecom-post.tmpl

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{"msgtype": "markdown","markdown": {"content": "
2+
{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}}
3+
{{- $FormatString := "2006-01-02 15:04:05"}}
4+
# Kafka: {{.Cluster}}
5+
ConsumerGroup: {{.Group}}{{- with .Result.Status}}
6+
{{- if eq . 0}}NotFound{{end}}
7+
{{- if eq . 1}}<font color=\"info\">normal</font>{{end}}
8+
{{- if eq . 2}}<font color=\"warning\">lagging</font>{{end}}
9+
{{- if eq . 3}}<font color=\"comment\">abnormal</font>{{end}}
10+
{{- end}}
11+
**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}}
12+
{{- if eq . 0}}NotFound{{end}}
13+
{{- if eq . 1}}<font color=\"info\">{{.}}</font>{{end}}
14+
{{- if eq . 2}}<font color=\"warning\">{{.}}</font>{{end}}
15+
{{- if eq . 3}}<font color=\"comment\">{{.}}</font>{{end}}
16+
{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}}
17+
{{- if .Result.Maxlag|maxlag}}
18+
**MaxLagDetails:**
19+
{{- with .Result.Maxlag}}
20+
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
21+
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
22+
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
23+
\tCurrentLag={{.CurrentLag}}
24+
\tPartition={{.Partition}}
25+
{{- end}}
26+
{{- end}}
27+
{{- $TotalErrors := len .Result.Partitions}}
28+
{{- if $TotalErrors}}
29+
### <font color=\"comment\">{{$TotalErrors}} partitions have problems</font>
30+
>**CountPartitions:**
31+
{{- range $k,$v := .Result.Partitions|partitioncounts}}
32+
{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}}
33+
{{- end}}
34+
**TopicsByStatus:**
35+
{{- range $k,$v := .Result.Partitions|topicsbystatus}}
36+
\t{{$k}}={{$v}}
37+
{{- end}}
38+
**PartitionDetails:**
39+
{{- range .Result.Partitions}}
40+
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
41+
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
42+
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
43+
\tCurrentLag={{.CurrentLag}}
44+
\tPartition={{.Partition}}
45+
\tStart={{formattimestamp .Start.Timestamp $FormatString}}
46+
\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}}
47+
\tEnd={{formattimestamp .End.Timestamp $FormatString}}
48+
\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}}
49+
{{- end}}
50+
{{- end}}
51+
"
52+
}}

core/internal/helpers/coordinators.go

+6
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ func (m *MockModule) GetName() string {
7474
return args.String(0)
7575
}
7676

77+
// GetCluster mocks the notifier.Module GetCluster func
78+
func (m *MockModule) GetCluster() string {
79+
args := m.Called()
80+
return args.String(0)
81+
}
82+
7783
// GetGroupAllowlist mocks the notifier.Module GetGroupAllowlist func
7884
func (m *MockModule) GetGroupAllowlist() *regexp.Regexp {
7985
args := m.Called()

core/internal/httpserver/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request
213213
SendClose: viper.GetBool(configRoot + ".send-close"),
214214
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
215215
NoVerify: viper.GetString(configRoot + ".noverify"),
216+
Cluster: viper.GetString(configRoot + ".cluster"),
216217
},
217218
Request: requestInfo,
218219
})
@@ -265,6 +266,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques
265266
To: viper.GetString(configRoot + ".to"),
266267
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
267268
NoVerify: viper.GetString(configRoot + ".noverify"),
269+
Cluster: viper.GetString(configRoot + ".cluster"),
268270
},
269271
Request: requestInfo,
270272
})

core/internal/httpserver/structs.go

+2
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ type httpResponseConfigModuleNotifierHTTP struct {
202202
SendClose bool `json:"send-close"`
203203
ExtraCa string `json:"extra-ca"`
204204
NoVerify string `json:"noverify"`
205+
Cluster string `json:"cluster"`
205206
}
206207

207208
type httpResponseConfigModuleNotifierSlack struct {
@@ -238,6 +239,7 @@ type httpResponseConfigModuleNotifierEmail struct {
238239
To string `json:"to"`
239240
ExtraCa string `json:"extra-ca"`
240241
NoVerify string `json:"noverify"`
242+
Cluster string `json:"cluster"`
241243
}
242244

243245
type httpResponseConfigModuleNotifierNull struct {

core/internal/notifier/coordinator.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
type Module interface {
4949
protocol.Module
5050
GetName() string
51+
GetCluster() string
5152
GetGroupAllowlist() *regexp.Regexp
5253
GetGroupDenylist() *regexp.Regexp
5354
GetLogger() *zap.Logger
@@ -95,7 +96,7 @@ type Coordinator struct {
9596

9697
// getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there
9798
// is any error, it will panic with an appropriate message describing the problem.
98-
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module {
99+
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module {
99100
logger := app.Logger.With(
100101
zap.String("type", "module"),
101102
zap.String("coordinator", "notifier"),
@@ -113,6 +114,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
113114
extras: extras,
114115
templateOpen: templateOpen,
115116
templateClose: templateClose,
117+
cluster: cluster,
116118
}
117119
case "email":
118120
return &EmailNotifier{
@@ -123,6 +125,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
123125
extras: extras,
124126
templateOpen: templateOpen,
125127
templateClose: templateClose,
128+
cluster: cluster,
126129
}
127130
case "null":
128131
return &NullNotifier{
@@ -133,6 +136,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
133136
extras: extras,
134137
templateOpen: templateOpen,
135138
templateClose: templateClose,
139+
cluster: cluster,
136140
}
137141
default:
138142
panic("Unknown notifier className provided: " + className)
@@ -194,6 +198,8 @@ func (nc *Coordinator) Configure() {
194198
groupAllowlist = re
195199
}
196200

201+
cluster := viper.GetString(configRoot + ".cluster")
202+
197203
// Compile the denylist for the consumer groups to not notify for
198204
var groupDenylist *regexp.Regexp
199205
denylist := viper.GetString(configRoot + ".group-denylist")
@@ -227,7 +233,7 @@ func (nc *Coordinator) Configure() {
227233
templateClose = tmpl.Templates()[0]
228234
}
229235

230-
module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose)
236+
module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose, cluster)
231237
module.Configure(name, configRoot)
232238
nc.modules[name] = module
233239
interval := viper.GetInt64(configRoot + ".interval")
@@ -436,6 +442,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer
436442
for _, genericModule := range nc.modules {
437443
module := genericModule.(Module)
438444

445+
if module.GetCluster() != "" && response.Cluster != module.GetCluster() {
446+
continue
447+
}
439448
// No allowlist means everything passes
440449
groupAllowlist := module.GetGroupAllowlist()
441450
groupDenylist := module.GetGroupDenylist()

0 commit comments

Comments
 (0)