feat: implement analytics recoder #4388
Conversation
Looks good overall
pkg/analytics/analytics.go
```go
func New(ctx context.Context, opts ...Option) (*LogRecorder, error) {
	config := &Config{
		otlpEndpoint: "localhost:4317", // Default endpoint
```
What does it mean to have a default local endpoint?
It's the endpoint we will be sending data to, e.g. telemetry.expanso.io. The local default is a temporary solution for local development.
pkg/analytics/analytics.go
```go
if err := a.provider.ForceFlush(ctx); err != nil {
	return fmt.Errorf("failed to flush analytics: %w", err)
}
```
Will this block and delay shutdown until ctx times out?
Not sure what you mean. This function will return when `ForceFlush` and `Shutdown` return, or when the passed `ctx` times out.
Force flush means all buffered events will be flushed to the collector before it exits, or at least that's how it sounds. If you want to do that, then you need to define a timeout, and an aggressive one, as publishing telemetry should be best-effort.
pkg/analytics/analytics.go
```go
		otellog.String(EventKey, string(event)),
		otellog.Map(PropertiesKey, properties...),
	)
	a.provider.Logger("bacalhau-analytics").Emit(ctx, record)
```
Should we create the logger in the constructor?
The logger name is used as the `Scope` field from an event, and an emitted event looks roughly like this:

```json
{
  "Timestamp": "2024-09-12T21:48:22.150123874Z",
  "ObservedTimestamp": "2024-09-12T14:48:22.150128342-07:00",
  "Severity": 0,
  "SeverityText": "",
  "Body": {
    "Type": "Empty",
    "Value": null
  },
  "Attributes": [
    ...<omitted>...
  ],
  "Scope": {
    "Name": "bacalhau-analytics",
    "Version": "",
    "SchemaURL": ""
  },
  "DroppedAttributes": 0
}
```
Eventually I imagine we could use the logger name to differentiate between events from clients and servers, or possibly something else. For now I'll define a constant for this field (suggestions?) and create it in the constructor.
pkg/analytics/analytics.go
```go
record.SetTimestamp(time.Now().UTC())
record.AddAttributes(
	otellog.String(EventKey, string(event)),
	otellog.Map(PropertiesKey, properties...),
```
To make things flexible and support more event types with minimal effort, we can just publish properties as a JSON string, as done in the PoC: https://github.com/bacalhau-project/telemetry-poc/blob/main/app/models.go#L35. Though this also works, and I see its value.
The pattern I've implemented here forces us to be specific about the fields we extract from jobs/executions/evaluations/models, removes the possibility of encountering an error while extracting fields (because there's no JSON marshal), and also provides a type system for each event:

```json
{
  "Key": "properties",
  "Value": {
    "Type": "Map",
    "Value": [
      { "Key": "id", "Value": { "Type": "String", "Value": "j-67bff92a-5938-4324-9a1d-5226371e41b1" } },
      { "Key": "name", "Value": { "Type": "String", "Value": "j-67bff92a-5938-4324-9a1d-5226371e41b1" } },
      { "Key": "namespace", "Value": { "Type": "String", "Value": "default" } },
      { "Key": "type", "Value": { "Type": "String", "Value": "batch" } },
      { "Key": "count", "Value": { "Type": "Int64", "Value": 1 } },
      { "Key": "state", "Value": { "Type": "String", "Value": "Completed" } },
```
The alternative is to implement specific types for each event as was done in the POC, e.g.
```go
type Job struct {
	ID       string `json:"id"`
	Type     string `json:"type"`
	Image    string `json:"image"`
	CPUCores int    `json:"cpu_cores"`
	MemoryMB int    `json:"memory_mb"`
}

func stubJob() Job {
	return Job{
		ID:       uuid.NewString(),
		Type:     "docker",
		Image:    "ubuntu:latest",
		CPUCores: 2,
		MemoryMB: 4096,
	}
}
```
Which would you prefer?
Parsing and processing the first version is going to be a pain, and I don't think it will play nicely with PostHog. My preference is having specific types and publishing their JSON format. This simplifies post-processing as well, since we can just deserialize to the same type and process. Let's just be consistent with using PascalCase everywhere.
Sounds good, I will create an event structure rather than use attributes.
pkg/analytics/util.go
```go
	otellog.String("name", j.Name),
	otellog.String("namespace", j.Namespace),
```
Might contain PII. We are only interested in knowing whether these values are defined. Meaning: true if name doesn't equal id, and a hash of namespace, so we can know whether they are using the default or different values.
Got it.
If name == id, omit the name.
If name != id, hash the name and include the digest.
```
name_set = name != id        # helps understand if name is being set; don't care about its hash or how many names they have
namespace = hash(namespace)  # helps with understanding how many namespaces a single network can have
```
pkg/analytics/util.go
```go
	otellog.String("task_cpu", t.ResourcesConfig.CPU),
	otellog.String("task_memory", t.ResourcesConfig.Memory),
	otellog.String("task_disk", t.ResourcesConfig.Disk),
	otellog.String("task_gpu", t.ResourcesConfig.GPU),
```
We should publish the converted resource values to have a normalized view of allocated resources, meaning 1GB and 1024MB should be published as a single value, which should be bytes.
Meaning record an integer value representing the disk and memory requirements (in bytes) of the job, yeah?
Correct. Handle failure gracefully and set zero values instead of propagating the error.
pkg/jobstore/boltdb/store.go
```go
func WithRecorder(r analytics.Recorder) Option {
	return func(store *BoltJobStore) {
		store.recorder = r
	}
}
```
It doesn't sound right to have the recorder inside boltdb. Maybe a better place is a watcher, but the new watch library is not integrated with the job store yet. Please add a TODO to revisit this in the future
pkg/jobstore/boltdb/store.go
```diff
@@ -1000,6 +1009,7 @@ func (b *BoltJobStore) updateJobState(tx *bolt.Tx, request jobstore.UpdateJobSta
 	}

 	if job.IsTerminal() {
 		b.recorder.EmitJobEvent(context.TODO(), analytics.JobComplete, job)
```
`JobComplete` is not an accurate event type, as failed and cancelled events are also published in the same event, but Completed means successful completion in our job states. Just `Job` for now will do.
Failed and Cancelled jobs are "complete", yea? What if we called it `Job_Terminal`? The nature of a job's terminality can be gathered from inspecting its `State`, which is included in the data we send.
By no means am I arguing against your proposal - just trying to think of a more specific name for this event, which is recorded when the job reaches a terminal state.
job completed means successful completion
Lines 33 to 35 in 16538b3
```go
// JobStateTypeCompleted is the state of a job that has successfully completed.
// Only valid for batch jobs.
JobStateTypeCompleted
```
I meant completed in the sense that the job is no longer running - it's stopped, terminal, etc. I'll just call this `Job`.
cmd/cli/serve/serve.go
```go
	}
}

requesterConfig, err := GetRequesterConfig(cfg, isRequesterNode, recorder)
```
Unpopular opinion: it might make sense to instantiate a global analytics recorder, similar to how loggers and otel providers work. This avoids having to pass the recorder around, especially since we will be emitting events in different places.
I didn't use a global here since we'd like to only create a recorder if the configuration allows it. Our logger and other otel providers can all be configured with environment variables, which makes it easier to configure them inside an `init()` method. I'll leave the decision here up to you.
I know you didn't use globals in your implementation, but it is a common practice and we are already doing it today with setting otel trace and metric providers:

bacalhau/pkg/telemetry/metrics.go, line 39 in 246a2bd:

```go
otel.SetMeterProvider(meterProvider)
```

Globals don't mean you set them inside an `init()`; wherever you set them, you then assign them to a global reference.
Okay, I will set it as the global provider.
One issue I've encountered with this approach (using a global) is that errors are handled via https://github.com/bacalhau-project/bacalhau/blob/main/pkg/telemetry/common.go#L17 and surfaced to the user. Currently exploring options to mitigate this.
pkg/analytics/analytics.go
```go
	a.provider.Logger("bacalhau-analytics").Emit(ctx, record)
}

func (a *LogRecorder) EmitJobEvent(ctx context.Context, event EventType, j models.Job) {
```
Remember we need to collect information about executions as well, to understand how often failures happen before a job is completed, how long jobs sit in the queue before an execution is created, how evenly distributed node selection is, and more.
I think we can achieve this with a new event type and method, yea? Or we could simply call `EmitEvent`, which this method uses.
I am trying to simplify processing by having a single self-contained event. Better to have this in a single event.
Okay, I will create an event structure rather than use attributes.
There is a bug with logging errors. You can merge after addressing it.
pkg/telemetry/common.go
```go
	} else {
		log.Err(err).Msg("Error occurred while handling spans")
	}
default:
	// fall through and log the error
}
log.Err(err).Msg("Error occurred while handling spans")
```
You are still logging the error at the end, so the error will be logged twice for non-analytics errors.