Skip to content

Commit eb50ec2

Browse files
authored
fix: fix build issues with convoy-ee (#1540)
1 parent b180425 commit eb50ec2

File tree

3 files changed

+396
-372
lines changed

3 files changed

+396
-372
lines changed

cmd/hooks/hooks.go

Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
package hooks
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"os"
8+
"time"
9+
10+
"github.com/frain-dev/convoy"
11+
"github.com/frain-dev/convoy/cache"
12+
"github.com/frain-dev/convoy/config"
13+
"github.com/frain-dev/convoy/database"
14+
"github.com/frain-dev/convoy/database/postgres"
15+
"github.com/frain-dev/convoy/datastore"
16+
"github.com/frain-dev/convoy/internal/pkg/apm"
17+
"github.com/frain-dev/convoy/internal/pkg/cli"
18+
"github.com/frain-dev/convoy/internal/pkg/rdb"
19+
"github.com/frain-dev/convoy/internal/pkg/searcher"
20+
"github.com/frain-dev/convoy/limiter"
21+
"github.com/frain-dev/convoy/pkg/log"
22+
"github.com/frain-dev/convoy/queue"
23+
redisqueue "github.com/frain-dev/convoy/queue/redis"
24+
"github.com/frain-dev/convoy/tracer"
25+
"github.com/frain-dev/convoy/util"
26+
"github.com/newrelic/go-agent/v3/newrelic"
27+
"github.com/oklog/ulid/v2"
28+
"github.com/spf13/cobra"
29+
"gopkg.in/guregu/null.v4"
30+
)
31+
32+
func PreRun(app *cli.App, db *postgres.Postgres) func(cmd *cobra.Command, args []string) error {
33+
return func(cmd *cobra.Command, args []string) error {
34+
cfgPath, err := cmd.Flags().GetString("config")
35+
if err != nil {
36+
return err
37+
}
38+
39+
err = config.LoadConfig(cfgPath)
40+
if err != nil {
41+
return err
42+
}
43+
44+
cfg, err := config.Get()
45+
if err != nil {
46+
return err
47+
}
48+
49+
// Override with CLI Flags
50+
cliConfig, err := buildCliConfiguration(cmd)
51+
if err != nil {
52+
return err
53+
}
54+
55+
if err = config.Override(cliConfig); err != nil {
56+
return err
57+
}
58+
59+
nwCfg := cfg.Tracer.NewRelic
60+
nRApp, err := newrelic.NewApplication(
61+
newrelic.ConfigAppName(nwCfg.AppName),
62+
newrelic.ConfigLicense(nwCfg.LicenseKey),
63+
newrelic.ConfigDistributedTracerEnabled(nwCfg.DistributedTracerEnabled),
64+
newrelic.ConfigEnabled(nwCfg.ConfigEnabled),
65+
)
66+
if err != nil {
67+
return err
68+
}
69+
70+
apm.SetApplication(nRApp)
71+
72+
var tr tracer.Tracer
73+
var ca cache.Cache
74+
var li limiter.RateLimiter
75+
var q queue.Queuer
76+
77+
if cfg.Queue.Type == config.RedisQueueProvider {
78+
rdb, err := rdb.NewClient(cfg.Queue.Redis.Dsn)
79+
if err != nil {
80+
return err
81+
}
82+
queueNames := map[string]int{
83+
string(convoy.EventQueue): 3,
84+
string(convoy.CreateEventQueue): 3,
85+
string(convoy.SearchIndexQueue): 1,
86+
string(convoy.ScheduleQueue): 1,
87+
string(convoy.DefaultQueue): 1,
88+
string(convoy.StreamQueue): 1,
89+
}
90+
opts := queue.QueueOptions{
91+
Names: queueNames,
92+
RedisClient: rdb,
93+
RedisAddress: cfg.Queue.Redis.Dsn,
94+
Type: string(config.RedisQueueProvider),
95+
PrometheusAddress: cfg.Prometheus.Dsn,
96+
}
97+
q = redisqueue.NewQueue(opts)
98+
}
99+
100+
lo := log.NewLogger(os.Stdout)
101+
102+
if cfg.Tracer.Type == config.NewRelicTracerProvider {
103+
tr, err = tracer.NewTracer(cfg, lo.WithLogger())
104+
if err != nil {
105+
return err
106+
}
107+
}
108+
109+
ca, err = cache.NewCache(cfg.Cache)
110+
if err != nil {
111+
return err
112+
}
113+
114+
li, err = limiter.NewLimiter(cfg.Limiter)
115+
if err != nil {
116+
return err
117+
}
118+
119+
se, err := searcher.NewSearchClient(cfg)
120+
if err != nil {
121+
return err
122+
}
123+
124+
postgresDB, err := postgres.NewDB(cfg)
125+
if err != nil {
126+
return err
127+
}
128+
129+
*db = *postgresDB
130+
131+
if ok := shouldCheckMigration(cmd); ok {
132+
err = checkPendingMigrations(db)
133+
if err != nil {
134+
return err
135+
}
136+
}
137+
138+
app.DB = postgresDB
139+
app.Queue = q
140+
app.Logger = lo
141+
app.Tracer = tr
142+
app.Cache = ca
143+
app.Limiter = li
144+
app.Searcher = se
145+
146+
if ok := shouldBootstrap(cmd); ok {
147+
err = ensureDefaultUser(context.Background(), app)
148+
if err != nil {
149+
return err
150+
}
151+
152+
err = ensureInstanceConfig(context.Background(), app, cfg)
153+
if err != nil {
154+
return err
155+
}
156+
}
157+
158+
return nil
159+
}
160+
}
161+
162+
func PostRun(app *cli.App, db *postgres.Postgres) func(cmd *cobra.Command, args []string) error {
163+
return func(cmd *cobra.Command, args []string) error {
164+
err := db.Close()
165+
if err == nil {
166+
os.Exit(0)
167+
}
168+
return err
169+
}
170+
}
171+
172+
func ensureInstanceConfig(ctx context.Context, a *cli.App, cfg config.Configuration) error {
173+
configRepo := postgres.NewConfigRepo(a.DB)
174+
175+
s3 := datastore.S3Storage{
176+
Bucket: null.NewString(cfg.StoragePolicy.S3.Bucket, true),
177+
AccessKey: null.NewString(cfg.StoragePolicy.S3.AccessKey, true),
178+
SecretKey: null.NewString(cfg.StoragePolicy.S3.SecretKey, true),
179+
Region: null.NewString(cfg.StoragePolicy.S3.Region, true),
180+
SessionToken: null.NewString(cfg.StoragePolicy.S3.SessionToken, true),
181+
Endpoint: null.NewString(cfg.StoragePolicy.S3.Endpoint, true),
182+
}
183+
184+
onPrem := datastore.OnPremStorage{
185+
Path: null.NewString(cfg.StoragePolicy.OnPrem.Path, true),
186+
}
187+
188+
storagePolicy := &datastore.StoragePolicyConfiguration{
189+
Type: datastore.StorageType(cfg.StoragePolicy.Type),
190+
S3: &s3,
191+
OnPrem: &onPrem,
192+
}
193+
194+
config, err := configRepo.LoadConfiguration(ctx)
195+
if err != nil {
196+
if errors.Is(err, datastore.ErrConfigNotFound) {
197+
a.Logger.Info("Creating Instance Config")
198+
return configRepo.CreateConfiguration(ctx, &datastore.Configuration{
199+
UID: ulid.Make().String(),
200+
StoragePolicy: storagePolicy,
201+
IsAnalyticsEnabled: cfg.Analytics.IsEnabled,
202+
IsSignupEnabled: cfg.Auth.IsSignupEnabled,
203+
CreatedAt: time.Now(),
204+
UpdatedAt: time.Now(),
205+
})
206+
}
207+
208+
return err
209+
}
210+
211+
config.StoragePolicy = storagePolicy
212+
config.IsSignupEnabled = cfg.Auth.IsSignupEnabled
213+
config.IsAnalyticsEnabled = cfg.Analytics.IsEnabled
214+
config.UpdatedAt = time.Now()
215+
216+
return configRepo.UpdateConfiguration(ctx, config)
217+
}
218+
219+
func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {
220+
c := &config.Configuration{}
221+
222+
// CONVOY_DB_DSN, CONVOY_DB_TYPE
223+
dbDsn, err := cmd.Flags().GetString("db")
224+
if err != nil {
225+
return nil, err
226+
}
227+
228+
if !util.IsStringEmpty(dbDsn) {
229+
c.Database = config.DatabaseConfiguration{
230+
Type: config.PostgresDatabaseProvider,
231+
Dsn: dbDsn,
232+
}
233+
}
234+
235+
// CONVOY_REDIS_DSN
236+
redisDsn, err := cmd.Flags().GetString("redis")
237+
if err != nil {
238+
return nil, err
239+
}
240+
241+
// CONVOY_QUEUE_PROVIDER
242+
queueDsn, err := cmd.Flags().GetString("queue")
243+
if err != nil {
244+
return nil, err
245+
}
246+
247+
if !util.IsStringEmpty(queueDsn) {
248+
c.Queue.Type = config.QueueProvider(queueDsn)
249+
if queueDsn == "redis" && !util.IsStringEmpty(redisDsn) {
250+
c.Queue.Redis.Dsn = redisDsn
251+
}
252+
}
253+
254+
return c, nil
255+
}
256+
257+
func checkPendingMigrations(db database.Database) error {
258+
p, ok := db.(*postgres.Postgres)
259+
if !ok {
260+
return errors.New("failed to open database")
261+
}
262+
263+
type ID struct {
264+
Id string
265+
}
266+
counter := map[string]ID{}
267+
268+
files, err := convoy.MigrationFiles.ReadDir("sql")
269+
if err != nil {
270+
return err
271+
}
272+
273+
for _, file := range files {
274+
if !file.IsDir() {
275+
id := ID{Id: file.Name()}
276+
counter[id.Id] = id
277+
}
278+
}
279+
280+
rows, err := p.GetDB().Queryx("SELECT id FROM convoy.gorp_migrations")
281+
if err != nil {
282+
return err
283+
}
284+
285+
for rows.Next() {
286+
var id ID
287+
288+
err = rows.StructScan(&id)
289+
if err != nil {
290+
return err
291+
}
292+
293+
_, ok := counter[id.Id]
294+
if ok {
295+
delete(counter, id.Id)
296+
}
297+
}
298+
rows.Close()
299+
300+
if len(counter) > 0 {
301+
return postgres.ErrPendingMigrationsFound
302+
}
303+
304+
return nil
305+
}
306+
307+
func shouldCheckMigration(cmd *cobra.Command) bool {
308+
if cmd.Annotations == nil {
309+
return true
310+
}
311+
312+
val, ok := cmd.Annotations["CheckMigration"]
313+
if !ok {
314+
return true
315+
}
316+
317+
if val != "false" {
318+
return true
319+
}
320+
321+
return false
322+
}
323+
324+
func shouldBootstrap(cmd *cobra.Command) bool {
325+
if cmd.Annotations == nil {
326+
return true
327+
}
328+
329+
val, ok := cmd.Annotations["ShouldBootstrap"]
330+
if !ok {
331+
return true
332+
}
333+
334+
if val != "false" {
335+
return true
336+
}
337+
338+
return false
339+
}
340+
341+
func ensureDefaultUser(ctx context.Context, a *cli.App) error {
342+
pageable := datastore.Pageable{PerPage: 10, Direction: datastore.Next, NextCursor: datastore.DefaultCursor}
343+
userRepo := postgres.NewUserRepo(a.DB)
344+
users, _, err := userRepo.LoadUsersPaged(ctx, pageable)
345+
if err != nil {
346+
return fmt.Errorf("failed to load users - %w", err)
347+
}
348+
349+
if len(users) > 0 {
350+
return nil
351+
}
352+
353+
p := datastore.Password{Plaintext: "default"}
354+
err = p.GenerateHash()
355+
356+
if err != nil {
357+
return err
358+
}
359+
360+
defaultUser := &datastore.User{
361+
UID: ulid.Make().String(),
362+
FirstName: "default",
363+
LastName: "default",
364+
Email: "superuser@default.com",
365+
Password: string(p.Hash),
366+
EmailVerified: true,
367+
CreatedAt: time.Now(),
368+
UpdatedAt: time.Now(),
369+
}
370+
371+
err = userRepo.CreateUser(ctx, defaultUser)
372+
if err != nil {
373+
return fmt.Errorf("failed to create user - %w", err)
374+
}
375+
376+
a.Logger.Infof("Created Superuser with username: %s and password: %s", defaultUser.Email, p.Plaintext)
377+
378+
return nil
379+
}

0 commit comments

Comments
 (0)