Skip to content

Commit

Permalink
Use a channel-based semaphore to archive orgs concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
rasoro committed Jan 7, 2025
1 parent 6f65fa3 commit a81f151
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
4 changes: 4 additions & 0 deletions archives/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type Config struct {

RollupOrgTimeout int `help:"rollup timeout for all org archives, limit in hours (default 3)"`
BuildRollupArchiveTimeout int `help:"rollup for single archive timeout, limit in hours (default 1)"`

MaxConcurrentArchivation int `help:"max concurrent org archivation (default 2)"`
}

// NewConfig returns a new default configuration object
Expand Down Expand Up @@ -58,6 +60,8 @@ func NewConfig() *Config {

RollupOrgTimeout: 3,
BuildRollupArchiveTimeout: 1,

MaxConcurrentArchivation: 2,
}

return &config
Expand Down
43 changes: 25 additions & 18 deletions cmd/rp-archiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,29 @@ func main() {
logrus.WithError(err).Fatal("cannot write to temp directory")
}

semaphore := make(chan struct{}, config.MaxConcurrentArchivation)

// archiveTask archives the messages and/or runs of a single org, depending on
// which archive types are enabled in config. Errors are logged, not returned.
//
// It is meant to be started as a goroutine after the caller has reserved a
// concurrency slot by sending on semaphore; the slot is released here when the
// task finishes.
archiveTask := func(org archives.Org) {
	// Release the slot reserved by the caller, even if archiving panics.
	// Without this receive the buffered channel fills up after
	// config.MaxConcurrentArchivation orgs and the dispatching loop blocks
	// forever on its next send.
	defer func() { <-semaphore }()

	// no single org should take more than 12 hours
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
	defer cancel()

	log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)

	if config.ArchiveMessages {
		// err is scoped to this statement: tasks run concurrently, so writing
		// to a variable shared with the enclosing function would be a data race
		if _, _, err := archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.MessageType); err != nil {
			log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages")
		}
	}
	if config.ArchiveRuns {
		if _, _, err := archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.RunType); err != nil {
			log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs")
		}
	}
}

for {
start := time.Now().In(time.UTC)

Expand Down Expand Up @@ -110,24 +133,8 @@ func main() {

// for each org, do our export
for _, org := range orgs {
// no single org should take more than 12 hours
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)

if config.ArchiveMessages {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages")
}
}
if config.ArchiveRuns {
_, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.RunType)
if err != nil {
log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs")
}
}

cancel()
semaphore <- struct{}{}
go archiveTask(org)
}

// ok, we did all our work for our orgs, quit if so configured or sleep until the next day
Expand Down

0 comments on commit a81f151

Please sign in to comment.