
feat: implement analytics recorder #4388

Merged · 6 commits · Sep 17, 2024
Conversation

@frrist frrist commented Sep 9, 2024

No description provided.

@wdbaruni wdbaruni left a comment:

Looks good overall


func New(ctx context.Context, opts ...Option) (*LogRecorder, error) {
config := &Config{
otlpEndpoint: "localhost:4317", // Default endpoint
Member:

what does it mean to have a default local endpoint?

Member Author:

It's the endpoint we will be sending data to, e.g. telemetry.expanso.io; this is a temporary solution for local development.
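For context, the constructor in question follows Go's functional-options pattern, where a default endpoint is set and can be overridden at construction time. A minimal stdlib-only sketch (option and field names are illustrative, not the PR's exact API):

```go
package main

import "fmt"

// Config holds recorder settings; the field name mirrors the snippet above.
type Config struct {
	otlpEndpoint string
}

// Option mutates the config; this is the standard functional-options shape.
type Option func(*Config)

// WithEndpoint overrides the default endpoint, e.g. a production collector.
func WithEndpoint(ep string) Option {
	return func(c *Config) { c.otlpEndpoint = ep }
}

// New applies defaults first, then any caller-supplied options.
func New(opts ...Option) *Config {
	c := &Config{otlpEndpoint: "localhost:4317"} // default for local development
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	fmt.Println(New().otlpEndpoint)                                  // localhost:4317
	fmt.Println(New(WithEndpoint("collector.example:4317")).otlpEndpoint) // collector.example:4317
}
```

The default only takes effect when no `WithEndpoint`-style option is passed, which is why a localhost default is harmless for development but should be replaced by configuration in production.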

Comment on lines 85 to 87
if err := a.provider.ForceFlush(ctx); err != nil {
return fmt.Errorf("failed to flush analytics: %w", err)
}
Member:

will this block and delay shutdown until ctx times out?

Member Author:

Not sure what you mean. This function will return when ForceFlush and Shutdown return, or when the passed ctx times out.

Member:

Force flush means all buffered events will be flushed to the collector before it exits, or at least that's how it sounds. If you want to do that, then you need to define a timeout, and an aggressive one, as publishing telemetry should be best-effort.

otellog.String(EventKey, string(event)),
otellog.Map(PropertiesKey, properties...),
)
a.provider.Logger("bacalhau-analytics").Emit(ctx, record)
Member:

should we create the logger in the constructor?

Member Author:

The logger name is used as the Scope field from an event - and an event emitted looks roughly like this:

{
  "Timestamp": "2024-09-12T21:48:22.150123874Z",
  "ObservedTimestamp": "2024-09-12T14:48:22.150128342-07:00",
  "Severity": 0,
  "SeverityText": "",
  "Body": {
    "Type": "Empty",
    "Value": null
  },
  "Attributes": [
   ...<omitted>...
  ],
  "Scope": {
    "Name": "bacalhau-analytics",
    "Version": "",
    "SchemaURL": ""
  },
  "DroppedAttributes": 0
}

Eventually I imagine we could use the logger name to differentiate between events from clients and servers, or possibly something else. For now I'll define a constant for this field (suggestions?) and create it in the constructor.
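The proposed change might be sketched like this, with a constant for the scope name and the logger created once in the constructor (the logger type here is a stand-in, not the real OTel API):

```go
package main

import "fmt"

// scopeName becomes the event's Scope.Name; naming is the open suggestion above.
const scopeName = "bacalhau-analytics"

// scopedLogger is a stand-in for the OTel logger returned by provider.Logger.
type scopedLogger struct{ name string }

// LogRecorder caches the logger created once in the constructor instead of
// calling provider.Logger(scopeName) on every Emit.
type LogRecorder struct{ logger scopedLogger }

func NewLogRecorder() *LogRecorder {
	return &LogRecorder{logger: scopedLogger{name: scopeName}}
}

func main() {
	fmt.Println(NewLogRecorder().logger.name) // bacalhau-analytics
}
```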

record.SetTimestamp(time.Now().UTC())
record.AddAttributes(
otellog.String(EventKey, string(event)),
otellog.Map(PropertiesKey, properties...),
Member:

to make things flexible and support more event types with minimal effort, we can just publish properties as a JSON string, as done in the PoC: https://github.com/bacalhau-project/telemetry-poc/blob/main/app/models.go#L35

though this also works and I see its value

Member Author:

The pattern I've implemented here forces us to be specific about the fields we extract from jobs/executions/evaluations/models, removes the possibility of encountering an error while extracting fields (since there is no JSON marshaling), and also provides a type system for each event:

 {
      "Key": "properties",
      "Value": {
        "Type": "Map",
        "Value": [
          {
            "Key": "id",
            "Value": {
              "Type": "String",
              "Value": "j-67bff92a-5938-4324-9a1d-5226371e41b1"
            }
          },
          {
            "Key": "name",
            "Value": {
              "Type": "String",
              "Value": "j-67bff92a-5938-4324-9a1d-5226371e41b1"
            }
          },
          {
            "Key": "namespace",
            "Value": {
              "Type": "String",
              "Value": "default"
            }
          },
          {
            "Key": "type",
            "Value": {
              "Type": "String",
              "Value": "batch"
            }
          },
          {
            "Key": "count",
            "Value": {
              "Type": "Int64",
              "Value": 1
            }
          },
          {
            "Key": "state",
            "Value": {
              "Type": "String",
              "Value": "Completed"
            }
          },

The alternative is to implement specific types for each event as was done in the POC, e.g.

type Job struct {
	ID       string `json:"id"`
	Type     string `json:"type"`
	Image    string `json:"image"`
	CPUCores int    `json:"cpu_cores"`
	MemoryMB int    `json:"memory_mb"`
}

func stubJob() Job {
	return Job{
		ID:       uuid.NewString(),
		Type:     "docker",
		Image:    "ubuntu:latest",
		CPUCores: 2,
		MemoryMB: 4096,
	}
}

Which would you prefer?

Member:

Parsing and processing the first version is going to be a pain, and I don't think it will play nicely with PostHog. My preference is having specific types and publishing their JSON format. This simplifies post-processing as well, since we can just deserialize to the same type and process it. Let's be consistent with using PascalCase everywhere.

Member Author:

sounds good, I will create an event structure rather than use attributes.

Comment on lines 12 to 13
otellog.String("name", j.Name),
otellog.String("namespace", j.Namespace),
Member:

might contain PII. We are only interested in knowing whether these values are defined or not. Meaning: true if name doesn't equal id, and a hash of the namespace so we can tell whether they are using the default or different values.

Member Author:

Got it.
If name == id, omit the name.
If name != id, hash the name and include the digest.

Member:

name_set = name != id # helps understand if name is being set. don't care about its hash or how many names they have
namespace = hash(namespace) # helps with understanding how many namespaces a single network can have
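That scheme might look like the following, assuming SHA-256 for the namespace digest (the thread does not specify which hash function to use):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// anonymize applies the reviewer's scheme: publish only whether a custom
// name was set (name != id), plus a digest of the namespace so distinct
// namespaces can be counted without revealing their raw values.
func anonymize(id, name, namespace string) (nameSet bool, nsDigest string) {
	nameSet = name != id
	sum := sha256.Sum256([]byte(namespace))
	return nameSet, hex.EncodeToString(sum[:])
}

func main() {
	set, digest := anonymize("j-1", "my-job", "default")
	fmt.Println(set, len(digest)) // true 64
}
```

Because the digest is deterministic, "default" always hashes to the same value, so the backend can still distinguish default from custom namespaces and count distinct namespaces per network.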

Comment on lines 39 to 42
otellog.String("task_cpu", t.ResourcesConfig.CPU),
otellog.String("task_memory", t.ResourcesConfig.Memory),
otellog.String("task_disk", t.ResourcesConfig.Disk),
otellog.String("task_gpu", t.ResourcesConfig.GPU),
Member:

we should publish the converted resource values to have a normalized view of allocated resources. Meaning 1GB and 1024MB should be published as the same value, in bytes.

Member Author:

Meaning record an integer value representing the disk and memory requirements (in bytes) of the job, yeah?

Member:

Correct. Handle failure gracefully and set zero values instead of propagating the error.
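A simplified sketch of that normalization: convert human-readable sizes to bytes and fall back to zero on parse failure, as requested. The real code would presumably reuse the project's existing resource parser; this stand-in treats GB/MB/KB as binary multiples for illustration:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toBytes normalizes sizes like "1GB" and "1024MB" to the same byte count.
// On any parse failure it returns 0 rather than propagating an error,
// keeping telemetry best-effort.
func toBytes(s string) int64 {
	units := []struct {
		suffix string
		mult   int64
	}{
		{"GB", 1 << 30}, {"MB", 1 << 20}, {"KB", 1 << 10}, {"B", 1},
	}
	s = strings.TrimSpace(s)
	for _, u := range units {
		if strings.HasSuffix(s, u.suffix) {
			n, err := strconv.ParseInt(strings.TrimSuffix(s, u.suffix), 10, 64)
			if err != nil {
				return 0 // graceful zero value on failure
			}
			return n * u.mult
		}
	}
	return 0 // unrecognized unit: also a graceful zero
}

func main() {
	fmt.Println(toBytes("1GB") == toBytes("1024MB")) // true
	fmt.Println(toBytes("not-a-size"))               // 0
}
```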

Comment on lines 66 to 71
func WithRecorder(r analytics.Recorder) Option {
return func(store *BoltJobStore) {
store.recorder = r
}
}

Member:

It doesn't sound right to have the recorder inside boltdb. Maybe a better place is a watcher, but the new watch library is not integrated with the job store yet. Please add a TODO to revisit this in the future

@@ -1000,6 +1009,7 @@ func (b *BoltJobStore) updateJobState(tx *bolt.Tx, request jobstore.UpdateJobSta
}

if job.IsTerminal() {
b.recorder.EmitJobEvent(context.TODO(), analytics.JobComplete, job)
Member:

JobComplete is not an accurate event type, since failed and cancelled jobs are also published under this same event, while Completed means successful completion in our job states. Just Job will do for now.

Member Author:

Failed and Cancelled jobs are "complete", yeah? What if we called it Job_Terminal? The nature of a job's terminality can be gathered from inspecting its State, which is included in the data we send.
By no means am I arguing against your proposal; I'm just trying to think of a more specific name for this event, which is recorded when the job reaches a terminal state.

Member:

job completed means successful completion

// JobStateTypeCompleted is the state of a job that has successfully completed.
// Only valid for batch jobs.
JobStateTypeCompleted

Member Author:

I meant completed in the sense that the job is no longer running - it's stopped, terminal, etc.. I'll just call this Job

}
}

requesterConfig, err := GetRequesterConfig(cfg, isRequesterNode, recorder)
Member:

Unpopular opinion: it might make sense to instantiate a global analytics recorder, similar to how loggers and otel providers work. This avoids having to pass the recorder around, especially since we will be emitting events in different places.

Member Author:

I didn't use a global here since we'd like to only create a recorder if the configuration allows it.
Our logger and other otel providers can all be configured with environment variables, which makes it easier to configure them inside an init() method.
I'll leave the decision here up to you.

Member:

I know you didn't use globals in your implementation, but it is a common practice and we are already doing it today with setting otel trace and metric providers

otel.SetMeterProvider(meterProvider)

Globals don't mean you set them inside an init(); wherever you set them, you then assign them to a global reference.
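That pattern might be sketched as follows, with a no-op fallback so callers never need a nil check when analytics is disabled by configuration (names here are hypothetical, not Bacalhau's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Recorder is a stand-in for the analytics recorder interface.
type Recorder interface {
	EmitEvent(name string)
}

// noopRecorder is returned when nothing has been registered, e.g. when
// configuration disables analytics.
type noopRecorder struct{}

func (noopRecorder) EmitEvent(string) {}

// global holds the process-wide recorder, mirroring how
// otel.SetMeterProvider assigns a global reference after construction.
// Note: atomic.Value requires every Store to use the same concrete type.
var global atomic.Value

func SetRecorder(r Recorder) { global.Store(r) }

func GetRecorder() Recorder {
	if r, ok := global.Load().(Recorder); ok {
		return r
	}
	return noopRecorder{}
}

func main() {
	// Safe to call before SetRecorder: falls back to the no-op.
	GetRecorder().EmitEvent("startup")
	fmt.Printf("%T\n", GetRecorder())
}
```

The constructor still only builds a real recorder when configuration allows it; the global is simply where the result (or nothing) gets assigned afterwards.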

Member Author:

okay I will set it as the global provider.

Member Author:

One issue I've encountered with this approach (using a global) is that errors are handled via https://github.com/bacalhau-project/bacalhau/blob/main/pkg/telemetry/common.go#L17 and surfaced to the user. Currently exploring options to mitigate this.

a.provider.Logger("bacalhau-analytics").Emit(ctx, record)
}

func (a *LogRecorder) EmitJobEvent(ctx context.Context, event EventType, j models.Job) {
Member:

Remember we need to collect information about executions as well, to understand how often failures happen before a job is completed, how long jobs sit in the queue before an execution is created, how evenly distributed node selection is, and more.

@frrist frrist (Member Author) commented Sep 12, 2024:

I think we can achieve this with a new event type and method, yeah? Or we could simply call EmitEvent, which this method uses.

Member:

I am trying to simplify processing by having a single self-contained event. It's better to have this in a single event.

Member Author:

okay I will create an event structure rather than use attributes.

@frrist frrist linked an issue Sep 16, 2024 that may be closed by this pull request
@frrist frrist marked this pull request as ready for review September 16, 2024 17:38
@frrist frrist requested a review from wdbaruni September 16, 2024 18:12
Review threads (resolved or outdated) on: pkg/analytics/analytics.go, pkg/analytics/models.go, pkg/node/node.go
@wdbaruni wdbaruni left a comment:

There is a bug with logging errors. You can merge after addressing it.

Comment on lines 29 to 35
} else {
log.Err(err).Msg("Error occurred while handling spans")
}
default:
// fall through and log the error
}
log.Err(err).Msg("Error occurred while handling spans")
Member:

you are still logging the error at the end, so it will be logged twice for non-analytics errors

@frrist frrist self-assigned this Sep 17, 2024
@frrist frrist merged commit a912d5b into main Sep 17, 2024
3 of 4 checks passed
@frrist frrist deleted the frrist/analytics branch September 17, 2024 16:15
Successfully merging this pull request may close these issues:

Develop a framework for collecting various types of telemetry

2 participants