Skip to content

Commit e07f5b2

Browse files
fix(accountingservice): fix graceful shutdown and log issues (#1401)
Co-authored-by: Pierre Tessier <pierre@pierretessier.com>
1 parent cf7bac7 commit e07f5b2

File tree

4 files changed

+41
-26
lines changed

4 files changed

+41
-26
lines changed

src/accountingservice/go.mod

+5-5
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ module github.com/open-telemetry/opentelemetry-demo/src/accountingservice
33
go 1.22
44

55
require (
6-
github.com/IBM/sarama v1.42.1
6+
github.com/IBM/sarama v1.42.2
77
github.com/sirupsen/logrus v1.9.3
88
go.opentelemetry.io/otel v1.23.1
99
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1
1010
go.opentelemetry.io/otel/sdk v1.23.1
1111
go.opentelemetry.io/otel/trace v1.23.1
12-
google.golang.org/grpc v1.61.0
12+
google.golang.org/grpc v1.61.1
1313
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0
1414
google.golang.org/protobuf v1.32.0
1515
)
@@ -39,9 +39,9 @@ require (
3939
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 // indirect
4040
go.opentelemetry.io/otel/metric v1.23.1 // indirect
4141
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
42-
golang.org/x/crypto v0.18.0 // indirect
43-
golang.org/x/net v0.20.0 // indirect
44-
golang.org/x/sys v0.16.0 // indirect
42+
golang.org/x/crypto v0.19.0 // indirect
43+
golang.org/x/net v0.21.0 // indirect
44+
golang.org/x/sys v0.17.0 // indirect
4545
golang.org/x/text v0.14.0 // indirect
4646
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect
4747
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect

src/accountingservice/go.sum

+12-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
2-
github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
1+
github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw=
2+
github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE=
33
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
44
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
55
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -90,30 +90,30 @@ go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
9090
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
9191
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
9292
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
93-
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
94-
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
93+
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
94+
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
9595
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
9696
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
9797
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
9898
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
9999
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
100100
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
101101
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
102-
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
103-
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
102+
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
103+
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
104104
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
105105
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
106-
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
107-
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
106+
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
107+
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
108108
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
109109
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
110110
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
111111
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
112112
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
113113
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
114114
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
115-
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
116-
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
115+
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
116+
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
117117
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
118118
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
119119
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -134,8 +134,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe h1:
134134
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA=
135135
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe h1:bQnxqljG/wqi4NTXu2+DJ3n7APcEA882QZ1JvhQAq9o=
136136
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
137-
google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0=
138-
google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
137+
google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY=
138+
google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
139139
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA=
140140
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y=
141141
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=

src/accountingservice/kafka/consumer.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ package kafka
44

55
import (
66
"context"
7-
"log"
8-
97
pb "github.com/open-telemetry/opentelemetry-demo/src/accountingservice/genproto/oteldemo"
108

119
"github.com/IBM/sarama"
@@ -19,7 +17,7 @@ var (
1917
GroupID = "accountingservice"
2018
)
2119

22-
func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logger) error {
20+
func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logger) (sarama.ConsumerGroup, error) {
2321
saramaConfig := sarama.NewConfig()
2422
saramaConfig.Version = ProtocolVersion
2523
// So we can know the partition and offset of messages.
@@ -28,7 +26,7 @@ func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logge
2826

2927
consumerGroup, err := sarama.NewConsumerGroup(brokers, GroupID, saramaConfig)
3028
if err != nil {
31-
return err
29+
return nil, err
3230
}
3331

3432
handler := groupHandler{
@@ -37,9 +35,10 @@ func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logge
3735

3836
err = consumerGroup.Consume(ctx, []string{Topic}, &handler)
3937
if err != nil {
40-
return err
38+
return nil, err
4139
}
42-
return nil
40+
41+
return consumerGroup, nil
4342
}
4443

4544
type groupHandler struct {
@@ -64,7 +63,11 @@ func (g *groupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s
6463
return err
6564
}
6665

67-
log.Printf("Message claimed: orderId = %s, timestamp = %v, topic = %s", orderResult.OrderId, message.Timestamp, message.Topic)
66+
g.log.WithFields(logrus.Fields{
67+
"orderId": orderResult.OrderId,
68+
"messageTimestamp": message.Timestamp,
69+
"messageTopic": message.Topic,
70+
}).Info("Message claimed")
6871
session.MarkMessage(message, "")
6972

7073
case <-session.Context().Done():

src/accountingservice/main.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import (
1313
"os/signal"
1414
"strings"
1515
"sync"
16+
"syscall"
1617
"time"
1718

19+
"github.com/IBM/sarama"
1820
"github.com/sirupsen/logrus"
1921
"go.opentelemetry.io/otel"
2022
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
@@ -85,6 +87,7 @@ func main() {
8587
if err := tp.Shutdown(context.Background()); err != nil {
8688
log.Printf("Error shutting down tracer provider: %v", err)
8789
}
90+
log.Println("Shutdown trace provider")
8891
}()
8992

9093
var brokers string
@@ -93,13 +96,22 @@ func main() {
9396
brokerList := strings.Split(brokers, ",")
9497
log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))
9598

96-
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
99+
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
97100
defer cancel()
98-
if err := kafka.StartConsumerGroup(ctx, brokerList, log); err != nil {
101+
var consumerGroup sarama.ConsumerGroup
102+
if consumerGroup, err = kafka.StartConsumerGroup(ctx, brokerList, log); err != nil {
99103
log.Fatal(err)
100104
}
105+
defer func() {
106+
if err := consumerGroup.Close(); err != nil {
107+
log.Printf("Error closing consumer group: %v", err)
108+
}
109+
log.Println("Closed consumer group")
110+
}()
101111

102112
<-ctx.Done()
113+
114+
log.Println("Accounting service exited")
103115
}
104116

105117
func mustMapEnv(target *string, envKey string) {

0 commit comments

Comments
 (0)