Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ddtrace/ext/app_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ const (
// SpanTypeConsul marks a span as a Consul operation.
SpanTypeConsul = "consul"

// SpanTypeGraphql marks a span as a graphql operation.
// SpanTypeGraphQL marks a span as a graphql operation.
SpanTypeGraphQL = "graphql"

// SpanTypeLLM mas a span as an LLM operation.
SpanTypeLLM = "llm"
)
38 changes: 37 additions & 1 deletion ddtrace/tracer/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,48 @@ import (

"github.com/DataDog/dd-trace-go/v2/instrumentation/options"
"github.com/DataDog/dd-trace-go/v2/internal"
illmobs "github.com/DataDog/dd-trace-go/v2/internal/llmobs"
"github.com/DataDog/dd-trace-go/v2/internal/orchestrion"
)

// ContextWithSpan returns a copy of the given context which includes the span s.
func ContextWithSpan(ctx context.Context, s *Span) context.Context {
return orchestrion.CtxWithValue(ctx, internal.ActiveSpanKey, s)
newCtx := orchestrion.CtxWithValue(ctx, internal.ActiveSpanKey, s)
return contextWithPropagatedLLMSpan(newCtx, s)
}

func contextWithPropagatedLLMSpan(ctx context.Context, s *Span) context.Context {
if s == nil {
return ctx
}
// if there is a propagated llm span already just skip
if _, ok := illmobs.PropagatedLLMSpanFromContext(ctx); ok {
return ctx
}
newCtx := ctx

propagatedLLMObs := propagatedLLMSpanFromTags(s)
if propagatedLLMObs.SpanID != "" && propagatedLLMObs.TraceID != "" {
newCtx = illmobs.ContextWithPropagatedLLMSpan(newCtx, propagatedLLMObs)
}
return newCtx
}

func propagatedLLMSpanFromTags(s *Span) *illmobs.PropagatedLLMSpan {
propagatedLLMObs := &illmobs.PropagatedLLMSpan{}
if s.context == nil || s.context.trace == nil {
return propagatedLLMObs
}
if parentID := s.context.trace.propagatingTag(keyPropagatedLLMObsParentID); parentID != "" {
propagatedLLMObs.SpanID = parentID
}
if mlApp := s.context.trace.propagatingTag(keyPropagatedLLMObsMLAPP); mlApp != "" {
propagatedLLMObs.MLApp = mlApp
}
if trID := s.context.trace.propagatingTag(keyPropagatedLLMObsTraceID); trID != "" {
propagatedLLMObs.TraceID = trID
}
return propagatedLLMObs
}

// SpanFromContext returns the span contained in the given context. A second return
Expand Down
60 changes: 60 additions & 0 deletions ddtrace/tracer/llmobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package tracer

import (
"context"
"strconv"

"github.com/DataDog/dd-trace-go/v2/internal/llmobs"
)

type llmobsTracerAdapter struct{}

func (l *llmobsTracerAdapter) StartSpan(ctx context.Context, name string, cfg llmobs.StartAPMSpanConfig) (llmobs.APMSpan, context.Context) {
opts := make([]StartSpanOption, 0)
if !cfg.StartTime.IsZero() {
opts = append(opts, StartTime(cfg.StartTime))
}
if cfg.SpanType != "" {
opts = append(opts, SpanType(cfg.SpanType))
}
span, ctx := StartSpanFromContext(ctx, name, opts...)
return &llmobsSpanAdapter{span}, ctx
}

type llmobsSpanAdapter struct {
span *Span
}

func (l *llmobsSpanAdapter) Finish(cfg llmobs.FinishAPMSpanConfig) {
opts := make([]FinishOption, 0)
if !cfg.FinishTime.IsZero() {
opts = append(opts, FinishTime(cfg.FinishTime))
}
if cfg.Error != nil {
opts = append(opts, WithError(cfg.Error))
}
l.span.Finish(opts...)
}

func (l *llmobsSpanAdapter) AddLink(link llmobs.SpanLink) {
l.span.AddLink(SpanLink{
TraceID: link.TraceID,
TraceIDHigh: link.TraceIDHigh,
SpanID: link.SpanID,
Attributes: link.Attributes,
Tracestate: link.Tracestate,
Flags: link.Flags,
})
}

func (l *llmobsSpanAdapter) SpanID() string {
return strconv.FormatUint(l.span.Context().SpanID(), 10)
}

func (l *llmobsSpanAdapter) TraceID() string {
return l.span.Context().TraceID()
}

func (l *llmobsSpanAdapter) SetBaggageItem(key string, value string) {
l.span.SetBaggageItem(key, value)
}
114 changes: 113 additions & 1 deletion ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
appsecconfig "github.com/DataDog/dd-trace-go/v2/internal/appsec/config"
"github.com/DataDog/dd-trace-go/v2/internal/civisibility/constants"
"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
llmobsconfig "github.com/DataDog/dd-trace-go/v2/internal/llmobs/config"
"github.com/DataDog/dd-trace-go/v2/internal/log"
"github.com/DataDog/dd-trace-go/v2/internal/namingschema"
"github.com/DataDog/dd-trace-go/v2/internal/normalizer"
Expand All @@ -45,6 +46,15 @@ import (
"github.com/DataDog/datadog-go/v5/statsd"
)

const (
envLLMObsEnabled = "DD_LLMOBS_ENABLED"
envLLMObsSampleRate = "DD_LLMOBS_SAMPLE_RATE"
envLLMObsMlApp = "DD_LLMOBS_ML_APP"
envLLMObsAgentlessEnabled = "DD_LLMOBS_AGENTLESS_ENABLED"
envLLMObsInstrumentedProxyUrls = "DD_LLMOBS_INSTRUMENTED_PROXY_URLS"
envLLMObsProjectName = "DD_LLMOBS_PROJECT_NAME"
)

var contribIntegrations = map[string]struct {
name string // user readable name for startup logs
imported bool // true if the user has imported the integration
Expand Down Expand Up @@ -113,7 +123,7 @@ var (
defaultStatsdPort = "8125"

// defaultMaxTagsHeaderLen specifies the default maximum length of the X-Datadog-Tags header value.
defaultMaxTagsHeaderLen = 128
defaultMaxTagsHeaderLen = 512

// defaultRateLimit specifies the default trace rate limit used when DD_TRACE_RATE_LIMIT is not set.
defaultRateLimit = 100.0
Expand Down Expand Up @@ -319,6 +329,9 @@ type config struct {

// traceProtocol specifies the trace protocol to use.
traceProtocol float64

// llmobs contains the LLM Observability config
llmobs llmobsconfig.Config
}

// orchestrionConfig contains Orchestrion configuration.
Expand Down Expand Up @@ -506,6 +519,16 @@ func newConfig(opts ...StartOption) (*config, error) {
internal.ForEachStringTag(v, internal.DDTagsDelimiter, func(key, val string) { c.peerServiceMappings[key] = val })
}
c.retryInterval = time.Millisecond

// LLM Observability config
c.llmobs = llmobsconfig.Config{
Enabled: internal.BoolEnv(envLLMObsEnabled, false),
SampleRate: internal.FloatEnv(envLLMObsSampleRate, 1.0),
MLApp: os.Getenv(envLLMObsMlApp),
AgentlessEnabled: llmobsAgentlessEnabledFromEnv(),
InstrumentedProxyURLs: llmobsInstrumentedProxyURLsFromEnv(),
ProjectName: os.Getenv(envLLMObsProjectName),
}
for _, fn := range opts {
if fn == nil {
continue
Expand Down Expand Up @@ -620,10 +643,58 @@ func newConfig(opts ...StartOption) (*config, error) {
if tracingEnabled, _, _ := stableconfig.Bool("DD_APM_TRACING_ENABLED", true); !tracingEnabled {
apmTracingDisabled(c)
}
// Update the llmobs config with stuff needed from the tracer.
c.llmobs.TracerConfig = llmobsconfig.TracerConfig{
DDTags: c.globalTags.get(),
Env: c.env,
Service: c.serviceName,
Version: c.version,
AgentURL: c.agentURL,
APIKey: os.Getenv("DD_API_KEY"),
APPKey: os.Getenv("DD_APP_KEY"),
HTTPClient: c.httpClient,
Site: os.Getenv("DD_SITE"),
SkipSSLVerify: internal.BoolEnv("DD_SKIP_SSL_VALIDATION", false),
}
c.llmobs.AgentFeatures = llmobsconfig.AgentFeatures{
EVPProxyV2: c.agent.evpProxyV2,
}

return c, nil
}

func llmobsAgentlessEnabledFromEnv() *bool {
v, ok := internal.BoolEnvNoDefault(envLLMObsAgentlessEnabled)
if !ok {
return nil
}
return &v
}

func llmobsInstrumentedProxyURLsFromEnv() []string {
v := os.Getenv(envLLMObsInstrumentedProxyUrls)
if v == "" {
return nil
}
seen := make(map[string]struct{})
out := make([]string, 0)
for _, part := range strings.Split(v, ",") {
s := strings.TrimSpace(part)
if s == "" {
continue
}
if _, ok := seen[s]; ok {
continue
}
seen[s] = struct{}{}
out = append(out, s)
}
if len(out) == 0 {
return nil
}
return out
}

func apmTracingDisabled(c *config) {
// Enable tracing as transport layer mode
// This means to stop sending trace metrics, send one trace per minute and those force-kept by other products
Expand Down Expand Up @@ -766,6 +837,9 @@ type agentFeatures struct {

// spanEvents reports whether the trace-agent can receive spans with the `span_events` field.
spanEventsAvailable bool

// evpProxyV2 reports if the trace-agent can receive payloads on the /evp_proxy/v2 endpoint.
evpProxyV2 bool
}

// HasFlag reports whether the agent has set the feat feature flag.
Expand Down Expand Up @@ -822,6 +896,8 @@ func loadAgentFeatures(agentDisabled bool, agentURL *url.URL, httpClient *http.C
switch endpoint {
case "/v0.6/stats":
features.Stats = true
case "/evp_proxy/v2/":
features.evpProxyV2 = true
}
}
features.featureFlags = make(map[string]struct{}, len(info.FeatureFlags))
Expand Down Expand Up @@ -1466,6 +1542,42 @@ func WithTestDefaults(statsdClient any) StartOption {
}
}

func WithLLMObsEnabled(enabled bool) StartOption {
return func(c *config) {
c.llmobs.Enabled = enabled
}
}

func WithLLMObsMLApp(mlApp string) StartOption {
return func(c *config) {
c.llmobs.MLApp = mlApp
}
}

func WithLLMObsProjectName(projectName string) StartOption {
return func(c *config) {
c.llmobs.ProjectName = projectName
}
}

func WithLLMObsSampleRate(sampleRate float64) StartOption {
return func(c *config) {
c.llmobs.SampleRate = sampleRate
}
}

func WithLLMObsAgentlessEnabled(agentlessEnabled bool) StartOption {
return func(c *config) {
c.llmobs.AgentlessEnabled = &agentlessEnabled
}
}

func WithLLMObsInstrumentedProxyURLs(instrumentedProxyURLs []string) StartOption {
return func(c *config) {
c.llmobs.InstrumentedProxyURLs = instrumentedProxyURLs
}
}

// Mock Transport with a real Encoder
type dummyTransport struct {
sync.RWMutex
Expand Down
15 changes: 15 additions & 0 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/DataDog/dd-trace-go/v2/instrumentation/errortrace"
sharedinternal "github.com/DataDog/dd-trace-go/v2/internal"
"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
illmobs "github.com/DataDog/dd-trace-go/v2/internal/llmobs"
"github.com/DataDog/dd-trace-go/v2/internal/log"
"github.com/DataDog/dd-trace-go/v2/internal/orchestrion"
"github.com/DataDog/dd-trace-go/v2/internal/samplernames"
Expand Down Expand Up @@ -928,6 +929,14 @@ func (s *Span) AddEvent(name string, opts ...SpanEventOption) {
s.spanEvents = append(s.spanEvents, event)
}

func setLLMObsPropagatingTags(ctx context.Context, spanCtx *SpanContext) {
if llmSpan, ok := illmobs.ActiveLLMSpanFromContext(ctx); ok {
spanCtx.trace.setPropagatingTag(keyPropagatedLLMObsParentID, llmSpan.SpanID())
spanCtx.trace.setPropagatingTag(keyPropagatedLLMObsTraceID, llmSpan.LLMTraceID())
spanCtx.trace.setPropagatingTag(keyPropagatedLLMObsMLAPP, llmSpan.MLApp())
}
}

// used in internal/civisibility/integrations/manual_api_common.go using linkname
func getMeta(s *Span, key string) (string, bool) {
s.mu.RLock()
Expand Down Expand Up @@ -986,6 +995,12 @@ const (
keyBaseService = "_dd.base_service"
// keyProcessTags contains a list of process tags to identify the service.
keyProcessTags = "_dd.tags.process"
// keyPropagatedLLMObsParentID contains the propagated llmobs span ID.
keyPropagatedLLMObsParentID = "_dd.p.llmobs_parent_id"
// keyPropagatedLLMObsMLAPP contains the propagated ML App.
keyPropagatedLLMObsMLAPP = "_dd.p.llmobs_ml_app"
// keyPropagatedLLMObsTraceID contains the propagated llmobs trace ID.
keyPropagatedLLMObsTraceID = "_dd.p.llmobs_trace_id"
)

// The following set of tags is used for user monitoring and set through calls to span.SetUser().
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/textmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (p *propagator) marshalPropagatingTags(ctx *SpanContext) string {
}
if tagLen := sb.Len() + len(k) + len(v); tagLen > p.cfg.MaxTagsHeaderLen {
sb.Reset()
log.Warn("Won't propagate tag: length is (%d) which exceeds the maximum len of (%d).", tagLen, p.cfg.MaxTagsHeaderLen)
log.Warn("Won't propagate tag %q: %q length is (%d) which exceeds the maximum len of (%d).", k, v, tagLen, p.cfg.MaxTagsHeaderLen)
properr = "inject_max_size"
return false
}
Expand Down
12 changes: 12 additions & 0 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
appsecConfig "github.com/DataDog/dd-trace-go/v2/internal/appsec/config"
"github.com/DataDog/dd-trace-go/v2/internal/datastreams"
"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
"github.com/DataDog/dd-trace-go/v2/internal/llmobs"
"github.com/DataDog/dd-trace-go/v2/internal/log"
"github.com/DataDog/dd-trace-go/v2/internal/remoteconfig"
"github.com/DataDog/dd-trace-go/v2/internal/samplernames"
Expand Down Expand Up @@ -254,6 +255,12 @@ func Start(opts ...StartOption) error {

appsec.Start(appsecopts...)

if t.config.llmobs.Enabled {
if err := llmobs.Start(t.config.llmobs, &llmobsTracerAdapter{}); err != nil {
return fmt.Errorf("failed to start llmobs: %w", err)
}
}

// start instrumentation telemetry unless it is disabled through the
// DD_INSTRUMENTATION_TELEMETRY_ENABLED env var
t.telemetry = startTelemetry(t.config)
Expand Down Expand Up @@ -294,6 +301,7 @@ func Stop() {
startStopMu.Lock()
defer startStopMu.Unlock()

llmobs.Stop()
setGlobalTracer(&NoopTracer{})
globalinternal.SetTracerInitialized(false)
log.Flush()
Expand Down Expand Up @@ -489,6 +497,7 @@ func Flush() {
if t := getGlobalTracer(); t != nil {
t.Flush()
}
llmobs.Flush()
}

// Flush triggers a flush and waits for it to complete.
Expand Down Expand Up @@ -691,6 +700,9 @@ func spanStart(operationName string, options ...StartSpanOption) *Span {

}
span.context = newSpanContext(span, context)
if pprofContext != nil {
setLLMObsPropagatingTags(pprofContext, span.context)
}
span.setMeta("language", "go")
// add tags from options
for k, v := range opts.Tags {
Expand Down
Loading
Loading