Skip to content

Commit

Permalink
Graphd fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Oct 19, 2024
1 parent c1461ee commit d073c36
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 24 deletions.
2 changes: 1 addition & 1 deletion build/consumer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Stage 1: Build the Go binary
FROM golang:1.22 AS builder
FROM golang:1.23.2 AS builder

# Create a directory for the application
WORKDIR /app
Expand Down
2 changes: 1 addition & 1 deletion build/feedgen-go/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22 as bin-builder
FROM golang:1.23.2 as bin-builder

WORKDIR /app

Expand Down
2 changes: 1 addition & 1 deletion build/graphd/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Stage 1: Build the Go binary
FROM golang:1.22 AS builder
FROM golang:1.23.2 AS builder

# Create a directory for the application
WORKDIR /app
Expand Down
2 changes: 1 addition & 1 deletion build/indexer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22 as builder
FROM golang:1.23.2 as builder

WORKDIR /app

Expand Down
2 changes: 1 addition & 1 deletion build/jazbot/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Stage 1: Build the Go binary
FROM golang:1.22 AS builder
FROM golang:1.23.2 AS builder

# Create a directory for the application
WORKDIR /app
Expand Down
2 changes: 1 addition & 1 deletion build/search/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22 as builder
FROM golang:1.23.2 as builder

WORKDIR /app

Expand Down
11 changes: 5 additions & 6 deletions cmd/skypub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"fmt"
"log"
"log/slog"
"os"
Expand Down Expand Up @@ -80,11 +79,11 @@ func Skypub(cctx *cli.Context) error {
return err
}

// feedConsumer.AddFeed("https://jazco.dev/feed", "did:plc:q6gjnaw2blty4crticxkmujt")
_, err = feedConsumer.AddUser(ctx, cctx.String("handle"), cctx.String("app-password"))
if err != nil {
return fmt.Errorf("failed to add user: %w", err)
}
feedConsumer.AddFeed("https://www.theverge.com/rss/index.xml", "did:plc:cph4u2yadis3hopldcbz7b5h")
// _, err = feedConsumer.AddUser(ctx, cctx.String("handle"), cctx.String("app-password"))
// if err != nil {
// return fmt.Errorf("failed to add user: %w", err)
// }

go feedConsumer.Start()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/bits-and-blooms/bloom/v3 v3.5.0
github.com/bluesky-social/indigo v0.0.0-20240905024844-a4f38639767f
github.com/bluesky-social/jetstream v0.0.0-20240922064035-41ad263b38d5
github.com/bluesky-social/jetstream v0.0.0-20241016204641-011b54588f28
github.com/ericvolp12/go-gin-prometheus v0.0.0-20221219081010-fc0e0436c283
github.com/ericvolp12/jwt-go-secp256k1 v0.0.2
github.com/gin-contrib/cors v1.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ github.com/bits-and-blooms/bloom/v3 v3.5.0 h1:AKDvi1V3xJCmSR6QhcBfHbCN4Vf8FfxeWk
github.com/bits-and-blooms/bloom/v3 v3.5.0/go.mod h1:Y8vrn7nk1tPIlmLtW2ZPV+W7StdVMor6bC1xgpjMZFs=
github.com/bluesky-social/indigo v0.0.0-20240905024844-a4f38639767f h1:Q9cfCAlYWIWPsSDhg5w6qcutQ7YaJtfTjiRLP/mw+pc=
github.com/bluesky-social/indigo v0.0.0-20240905024844-a4f38639767f/go.mod h1:Zx9nSWgd/FxMenkJW07VKnzspxpHBdPrPmS+Fspl2I0=
github.com/bluesky-social/jetstream v0.0.0-20240922064035-41ad263b38d5 h1:imOjtqB5VD7P6vS8TOG+GEFGHyNWHWijlFqiz6PJVeI=
github.com/bluesky-social/jetstream v0.0.0-20240922064035-41ad263b38d5/go.mod h1:/GMZrwKMbAiWU2MVCswH8+Jd3ybrpkHX4qM6tTfyv1k=
github.com/bluesky-social/jetstream v0.0.0-20241016204641-011b54588f28 h1:5S+hUmVOHRUdEC7XSjS5SQrBtGT/LHfzEpkNH00VnVI=
github.com/bluesky-social/jetstream v0.0.0-20241016204641-011b54588f28/go.mod h1:/dE2dmFell/m4zxgIbH3fkiqZ1obzr/ETj4RpgomgMs=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
Expand Down
14 changes: 7 additions & 7 deletions pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (c *Consumer) OnCommit(ctx context.Context, evt *models.Event) error {
return nil
}

log := c.Logger.With("repo", evt.Did, "seq", evt.TimeUS, "commit", evt.Commit, "action", evt.Commit.OpType, "collection", evt.Commit.Collection)
log := c.Logger.With("repo", evt.Did, "seq", evt.TimeUS, "commit", evt.Commit, "action", evt.Commit.Operation, "collection", evt.Commit.Collection)

// Parse time from the event time string
evtCreatedAt := time.UnixMicro(evt.TimeUS)
Expand All @@ -310,16 +310,16 @@ func (c *Consumer) OnCommit(ctx context.Context, evt *models.Event) error {
if _, ok := knownCollections[evt.Commit.Collection]; !ok {
metricCollection = "unknown"
}
opsProcessedCounter.WithLabelValues(evt.Commit.OpType, metricCollection, c.SocketURL).Inc()
opsProcessedCounter.WithLabelValues(evt.Commit.Operation, metricCollection, c.SocketURL).Inc()

// recordURI := "at://" + evt.Repo + "/" + op.Path
span.SetAttributes(attribute.String("repo", evt.Did))
span.SetAttributes(attribute.String("collection", evt.Commit.Collection))
span.SetAttributes(attribute.String("rkey", evt.Commit.RKey))
span.SetAttributes(attribute.Int64("seq", evt.TimeUS))
span.SetAttributes(attribute.String("event_kind", evt.Commit.OpType))
switch evt.Commit.OpType {
case models.CommitCreateRecord:
span.SetAttributes(attribute.String("event_kind", evt.Commit.Operation))
switch evt.Commit.Operation {
case models.CommitOperationCreate:
recCreatedAt, err := c.HandleCreateRecord(ctx, evt.Did, evt.Commit.Collection, evt.Commit.RKey, evt.Commit.Record)
if err != nil {
log.Errorf("failed to handle create record: %+v", err)
Expand All @@ -330,7 +330,7 @@ func (c *Consumer) OnCommit(ctx context.Context, evt *models.Event) error {
lastEvtCreatedRecordCreatedGapGauge.WithLabelValues(c.SocketURL).Set(float64(evtCreatedAt.Sub(*recCreatedAt).Seconds()))
lastRecordCreatedEvtProcessedGapGauge.WithLabelValues(c.SocketURL).Set(float64(processedAt.Sub(*recCreatedAt).Seconds()))
}
case models.CommitUpdateRecord:
case models.CommitOperationUpdate:
// Unpack the record and process it
switch evt.Commit.Collection {
case "app.bsky.actor.profile":
Expand Down Expand Up @@ -372,7 +372,7 @@ func (c *Consumer) OnCommit(ctx context.Context, evt *models.Event) error {
log.Errorf("failed to upsert actor from firehose: %+v", err)
}
}
case models.CommitDeleteRecord:
case models.CommitOperationDelete:
err := c.HandleDeleteRecord(ctx, evt.Did, evt.Commit.Collection, evt.Commit.RKey)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/graphd/bitmapper/bitmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/RoaringBitmap/roaring"
"github.com/hashicorp/golang-lru/arc/v2"
lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
Expand Down Expand Up @@ -57,7 +58,7 @@ type Group struct {
// LRU Cache for Entity Bitmaps
entities *arc.ARCCache[uint32, *Entity]
cacheLock sync.Mutex
dbCache *arc.ARCCache[int, *sql.DB]
dbCache *lru.Cache[int, *sql.DB]

// Bookkeeping for Persisting the Group
dbDir string
Expand Down Expand Up @@ -100,7 +101,9 @@ func NewGroup(ctx context.Context, cfg GroupConfig) (*Group, error) {
return nil, fmt.Errorf("failed to create entity cache: %w", err)
}

dbCache, err := arc.NewARC[int, *sql.DB](100)
dbCache, err := lru.NewWithEvict(1000, func(key int, db *sql.DB) {
db.Close()
})
if err != nil {
return nil, fmt.Errorf("failed to create db cache: %w", err)
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/usercount/usercount.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ var PDSHostList = []string{
"https://lobster.us-east.host.bsky.network",
"https://magic.us-west.host.bsky.network",
"https://woodear.us-west.host.bsky.network",
"https://cordyceps.us-west.host.bsky.network",
"https://ganoderma.us-west.host.bsky.network",
"https://bracket.us-west.host.bsky.network",
"https://button.us-west.host.bsky.network",
"https://chanterelle.us-west.host.bsky.network",
"https://cremini.us-west.host.bsky.network",
"https://gomphus.us-west.host.bsky.network",
"https://matsutake.us-west.host.bsky.network",
"https://milkcap.us-west.host.bsky.network",
"https://shaggymane.us-west.host.bsky.network",
"https://stinkhorn.us-west.host.bsky.network",
"https://witchesbutter.us-west.host.bsky.network",
}

type PDS struct {
Expand Down

0 comments on commit d073c36

Please sign in to comment.