Skip to content

Commit

Permalink
Refactoring image processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Sep 16, 2024
1 parent 62f3276 commit a1cba9b
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 130 deletions.
2 changes: 1 addition & 1 deletion cmd/feed-generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func FeedGenerator(cctx *cli.Context) error {

// Create a postlabel feed
log.Print("initializing postlabel feed")
postLabelFeed, postLabelFeedAliases, err := postlabel.NewFeed(ctx, feedActorDID, postRegistry)
postLabelFeed, postLabelFeedAliases, err := postlabel.NewFeed(ctx, feedActorDID, store)
if err != nil {
log.Fatalf("Failed to create PostLabelFeed: %v", err)
}
Expand Down
72 changes: 31 additions & 41 deletions cmd/indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
Expand Down Expand Up @@ -648,14 +647,9 @@ func (index *Index) IndexImages(ctx context.Context, pageSize int32) {
logger.Info("Processing images...")
start := time.Now()

unprocessedImages, err := index.PostRegistry.GetUnprocessedImages(ctx, pageSize)
unprocessedImages, err := index.Store.Queries.ListImagesToProcess(ctx, pageSize)
if err != nil {
if errors.As(err, &search.NotFoundError{}) {
logger.Info("No unprocessed images found, skipping process cycle...")

} else {
logger.Error("Failed to get unprocessed images, skipping process cycle", "error", err)
}
logger.Error("Failed to get unprocessed images", "error", err)
return
}

Expand All @@ -664,17 +658,22 @@ func (index *Index) IndexImages(ctx context.Context, pageSize int32) {
return
}

imgMap := make(map[string]int64, len(unprocessedImages))
imageIDs := make([]int64, len(unprocessedImages))
for i, img := range unprocessedImages {
imgMap[fmt.Sprintf("%s_%s_%s", img.PostActorDid, img.PostRkey, img.Cid)] = img.SubjectID
imageIDs[i] = img.ID
}

imagesIndexedCounter.Add(float64(len(unprocessedImages)))

imageMetas := make([]*objectdetection.ImageMeta, len(unprocessedImages))
for i, image := range unprocessedImages {
imageMetas[i] = &objectdetection.ImageMeta{
PostID: image.PostID,
ActorDID: image.AuthorDID,
CID: image.CID,
URL: fmt.Sprintf("https://cdn.bsky.app/img/feed_thumbnail/plain/%s/%s@jpeg", image.AuthorDID, image.CID),
MimeType: image.MimeType,
CreatedAt: image.CreatedAt,
PostID: image.PostRkey,
ActorDID: image.PostActorDid,
CID: image.Cid,
URL: fmt.Sprintf("https://cdn.bsky.app/img/feed_thumbnail/plain/%s/%s@jpeg", image.PostActorDid, image.Cid),
}
}

Expand All @@ -684,8 +683,6 @@ func (index *Index) IndexImages(ctx context.Context, pageSize int32) {
return
}

executionTime := time.Now()

successCount := atomic.NewInt32(0)

sem := semaphore.NewWeighted(10)
Expand All @@ -701,24 +698,6 @@ func (index *Index) IndexImages(ctx context.Context, pageSize int32) {
successCount.Inc()
}

cvClasses, err := json.Marshal(result.Results)
if err != nil {
logger.Error("Failed to marshal classes", "error", err)
return
}

err = index.PostRegistry.AddCVDataToImage(
ctx,
result.Meta.CID,
result.Meta.PostID,
executionTime,
cvClasses,
)
if err != nil {
logger.Error("Failed to update image", "error", err)
return
}

imageLabels := []string{}
for _, class := range result.Results {
if class.Confidence >= 0.75 {
Expand All @@ -728,17 +707,22 @@ func (index *Index) IndexImages(ctx context.Context, pageSize int32) {

for _, label := range imageLabels {
postLabel := fmt.Sprintf("%s:%s", "cv", label)
err = index.PostRegistry.AddPostLabel(ctx, result.Meta.PostID, result.Meta.ActorDID, postLabel)
if err != nil {
logger.Error("Failed to add label to post", "error", err)

// Get the SubjectID for the post
subjectID, ok := imgMap[fmt.Sprintf("%s_%s_%s", result.Meta.ActorDID, result.Meta.PostID, result.Meta.CID)]
if !ok {
logger.Error("Failed to get SubjectID for image", "actor_did", result.Meta.ActorDID, "post_id", result.Meta.PostID, "cid", result.Meta.CID)
continue
}

err = index.Store.Queries.CreateRecentPostLabel(ctx, store_queries.CreateRecentPostLabelParams{
ActorDid: result.Meta.ActorDID,
Rkey: result.Meta.PostID,
Label: postLabel,
ActorDid: result.Meta.ActorDID,
Rkey: result.Meta.PostID,
Label: postLabel,
SubjectID: sql.NullInt64{Int64: subjectID, Valid: true},
})
if err != nil {
logger.Error("Failed to create recent post label", "error", err)
logger.Error("Failed to create recent post label", "error", err, "actor_did", result.Meta.ActorDID, "rkey", result.Meta.PostID, "label", postLabel)
}
}
}(result)
Expand All @@ -748,6 +732,12 @@ func (index *Index) IndexImages(ctx context.Context, pageSize int32) {
logger.Error("Failed to acquire semaphore", "error", err)
}

// Dequeue the images
err = index.Store.Queries.DequeueImages(ctx, imageIDs)
if err != nil {
logger.Error("Failed to dequeue images", "error", err)
}

successes := int(successCount.Load())

successfullyIndexedImagesCounter.Add(float64(successes))
Expand Down
10 changes: 10 additions & 0 deletions pkg/consumer/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ func (c *Consumer) HandleCreatePost(ctx context.Context, repo, rkey string, inde
if err != nil {
log.Errorf("failed to create image: %+v", err)
}
err = c.Store.Queries.EnqueueImage(ctx, store_queries.EnqueueImageParams{
Cid: img.Image.Ref.String(),
PostActorDid: repo,
PostRkey: rkey,
SubjectID: subj.ID,
AltText: sql.NullString{String: img.Alt, Valid: img.Alt != ""},
})
if err != nil {
log.Errorf("failed to enqueue image: %+v", err)
}
}
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/consumer/store/queries/images.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,20 @@ WHERE post_actor_did = $1
AND post_rkey = $2
ORDER BY created_at DESC
LIMIT $3;
-- name: EnqueueImage :exec
INSERT INTO images_to_process (
cid,
post_actor_did,
post_rkey,
subject_id,
alt_text
)
VALUES ($1, $2, $3, $4, $5);
-- name: ListImagesToProcess :many
SELECT *
FROM images_to_process
ORDER BY id ASC
LIMIT $1;
-- name: DequeueImages :exec
DELETE FROM images_to_process
WHERE id = ANY($1::BIGINT []);
10 changes: 5 additions & 5 deletions pkg/consumer/store/queries/recent_post_labels.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- name: CreateRecentPostLabel :exec
INSERT INTO recent_post_labels(actor_did, rkey, label)
VALUES ($1, $2, $3);
INSERT INTO recent_post_labels(actor_did, rkey, label, subject_id)
VALUES ($1, $2, $3, $4);
-- name: DeleteRecentPostLabel :exec
DELETE FROM recent_post_labels
WHERE actor_did = $1
Expand All @@ -14,10 +14,10 @@ WHERE actor_did = $1
ORDER BY label ASC;
-- name: ListRecentPostsByLabelHot :many
SELECT l.actor_did,
l.rkey
l.rkey,
rp.score
FROM recent_post_labels l
JOIN recent_posts_with_score rp ON l.actor_did = rp.actor_did
AND l.rkey = rp.rkey
JOIN recent_posts_with_score rp ON l.subject_id = rp.subject_id
WHERE label = $1
AND score < coalesce(sqlc.narg('score')::float, 100000)
ORDER BY score DESC
Expand Down
1 change: 1 addition & 0 deletions pkg/consumer/store/schema/feeds.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ CREATE TABLE recent_post_labels (
actor_did TEXT NOT NULL,
rkey TEXT NOT NULL,
label TEXT NOT NULL,
subject_id BIGINT,
PRIMARY KEY (actor_did, rkey, label)
);
CREATE INDEX recent_post_labels_label_idx ON recent_post_labels (label);
Expand Down
8 changes: 8 additions & 0 deletions pkg/consumer/store/schema/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ CREATE TABLE images (
PRIMARY KEY (post_actor_did, post_rkey, cid)
);
create index images_created_at_index on images (created_at desc);
CREATE TABLE images_to_process (
id BIGSERIAL PRIMARY KEY,
cid TEXT NOT NULL,
post_actor_did TEXT NOT NULL,
post_rkey TEXT NOT NULL,
subject_id BIGINT NOT NULL,
alt_text TEXT
);
-- Backfill Status
CREATE TABLE repo_backfill_status (
repo TEXT NOT NULL,
Expand Down
30 changes: 30 additions & 0 deletions pkg/consumer/store/store_queries/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a1cba9b

Please sign in to comment.