Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added LogProcessors and implemented shutdown and forceFlush for LoggerProviders #132

Merged
merged 4 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/hs-opentelemetry-api.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ library
OpenTelemetry.Internal.Trace.Id
OpenTelemetry.LogAttributes
OpenTelemetry.Logging.Core
OpenTelemetry.LogRecordProcessor
OpenTelemetry.Processor
OpenTelemetry.Propagator
OpenTelemetry.Resource
Expand Down
18 changes: 18 additions & 0 deletions api/src/OpenTelemetry/Internal/Common/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ module OpenTelemetry.Internal.Common.Types (
InstrumentationLibrary (..),
AnyValue (..),
ToValue (..),
ShutdownResult (..),
FlushResult (..),
) where

import Data.ByteString (ByteString)
Expand Down Expand Up @@ -153,3 +155,19 @@ instance (ToValue a) => ToValue (H.HashMap Text a) where
instance ToValue AnyValue where
toValue :: AnyValue -> AnyValue
toValue = id


data ShutdownResult = ShutdownSuccess | ShutdownFailure | ShutdownTimeout


-- | The outcome of a call to @OpenTelemetry.Trace.forceFlush@ or @OpenTelemetry.Logging.forceFlush@
data FlushResult
= -- | One or more spans or @LogRecord@s did not export from all associated exporters
-- within the alotted timeframe.
FlushTimeout
| -- | Flushing spans or @LogRecord@s to all associated exporters succeeded.
FlushSuccess
| -- | One or more exporters failed to successfully export one or more
-- unexported spans or @LogRecord@s.
FlushError
deriving (Show)
82 changes: 72 additions & 10 deletions api/src/OpenTelemetry/Internal/Logging/Core.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE TypeApplications #-}

module OpenTelemetry.Internal.Logging.Core (
Expand All @@ -7,6 +8,8 @@ module OpenTelemetry.Internal.Logging.Core (
createLoggerProvider,
setGlobalLoggerProvider,
getGlobalLoggerProvider,
shutdownLoggerProvider,
forceFlushLoggerProvider,
makeLogger,
emitLogRecord,
addAttribute,
Expand All @@ -17,7 +20,8 @@ module OpenTelemetry.Internal.Logging.Core (
) where

import Control.Applicative
import Control.Monad (void, when)
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Trans
import Control.Monad.Trans.Maybe
import Data.Coerce
Expand All @@ -27,6 +31,7 @@ import Data.IORef
import Data.Maybe
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Version (showVersion)
import GHC.IO (unsafePerformIO)
import qualified OpenTelemetry.Attributes as A
Expand All @@ -41,6 +46,7 @@ import qualified OpenTelemetry.LogAttributes as LA
import OpenTelemetry.Resource (MaterializedResources, emptyMaterializedResources)
import Paths_hs_opentelemetry_api (version)
import System.Clock
import System.Timeout (timeout)


getCurrentTimestamp :: (MonadIO m) => m Timestamp
Expand All @@ -55,7 +61,7 @@ data LoggerProviderOptions = LoggerProviderOptions

{- | Options for creating a @LoggerProvider@ with no resources and default limits.

In effect, logging is a no-op when using this configuration.
In effect, logging is a no-op when using this configuration and no-op Processors.
-}
emptyLoggerProviderOptions :: LoggerProviderOptions
emptyLoggerProviderOptions =
Expand All @@ -69,16 +75,22 @@ emptyLoggerProviderOptions =

You should generally use @getGlobalLoggerProvider@ for most applications.
-}
createLoggerProvider :: LoggerProviderOptions -> LoggerProvider
createLoggerProvider LoggerProviderOptions {..} =
createLoggerProvider :: [LogRecordProcessor] -> LoggerProviderOptions -> LoggerProvider
createLoggerProvider ps LoggerProviderOptions {..} =
LoggerProvider
{ loggerProviderResource = loggerProviderOptionsResource
{ loggerProviderProcessors = V.fromList ps
, loggerProviderResource = loggerProviderOptionsResource
, loggerProviderAttributeLimits = loggerProviderOptionsAttributeLimits
}


-- | Logging is no-op when using this @LoggerProvider@ because it has no processors and empty options.
noOpLoggerProvider :: LoggerProvider
noOpLoggerProvider = createLoggerProvider [] emptyLoggerProviderOptions


globalLoggerProvider :: IORef LoggerProvider
globalLoggerProvider = unsafePerformIO $ newIORef $ createLoggerProvider emptyLoggerProviderOptions
globalLoggerProvider = unsafePerformIO $ newIORef noOpLoggerProvider
{-# NOINLINE globalLoggerProvider #-}


Expand All @@ -96,13 +108,53 @@ setGlobalLoggerProvider :: (MonadIO m) => LoggerProvider -> m ()
setGlobalLoggerProvider = liftIO . writeIORef globalLoggerProvider


{- | This method provides a way for provider to do any cleanup required.

This will also trigger shutdowns on all internal processors.
-}
shutdownLoggerProvider :: (MonadIO m) => LoggerProvider -> m ()
shutdownLoggerProvider LoggerProvider {loggerProviderProcessors} = liftIO $ do
asyncShutdownResults <- V.forM loggerProviderProcessors $ \processor -> do
logRecordProcessorShutdown processor
mapM_ wait asyncShutdownResults


{- | This method provides a way for provider to immediately export all @LogRecord@s that have not yet
been exported for all the internal processors.
-}
forceFlushLoggerProvider
:: (MonadIO m)
=> LoggerProvider
-> Maybe Int
-- ^ Optional timeout in microseconds, defaults to 5,000,000 (5s)
-> m FlushResult
-- ^ Result that denotes whether the flush action succeeded, failed, or timed out.
forceFlushLoggerProvider LoggerProvider {loggerProviderProcessors} mtimeout = liftIO $ do
jobs <- V.forM loggerProviderProcessors $ \processor -> async $ do
logRecordProcessorForceFlush processor
mresult <-
timeout (fromMaybe 5_000_000 mtimeout) $
V.foldM
( \status action -> do
res <- waitCatch action
pure $! case res of
Left _err -> FlushError
Right _ok -> status
)
FlushSuccess
jobs
case mresult of
Nothing -> pure FlushTimeout
Just res -> pure res


makeLogger
:: LoggerProvider
-- ^ The @LoggerProvider@ holds the configuration for the @Logger@.
-> InstrumentationLibrary
-- ^ The library that the @Logger@ instruments. This uniquely identifies the @Logger@.
-> Logger
makeLogger loggerProvider loggerInstrumentationScope = Logger {..}
makeLogger loggerLoggerProvider loggerInstrumentationScope = Logger {..}


createImmutableLogRecord
Expand Down Expand Up @@ -166,18 +218,28 @@ emitOTelLogRecord attrs severity bodyText = do
}


{- | Emits a LogRecord with properties specified by the passed in Logger and LogRecordArguments.
{- | Emits a @LogRecord@ with properties specified by the passed in Logger and LogRecordArguments.
If observedTimestamp is not set in LogRecordArguments, it will default to the current timestamp.
If context is not specified in LogRecordArguments it will default to the current context.

The emitted @LogRecord@ will be passed to any @LogRecordProcessor@s registered on the @LoggerProvider@
that created the @Logger@.
-}
emitLogRecord
:: (MonadIO m)
=> Logger
-> LogRecordArguments
-> m ReadWriteLogRecord
emitLogRecord l args = do
ilr <- createImmutableLogRecord (loggerProviderAttributeLimits $ loggerProvider l) args
liftIO $ mkReadWriteLogRecord l ilr
let LoggerProvider {loggerProviderProcessors, loggerProviderAttributeLimits} = loggerLoggerProvider l

ilr <- createImmutableLogRecord loggerProviderAttributeLimits args
lr <- liftIO $ mkReadWriteLogRecord l ilr

ctxt <- getContext
mapM_ (\processor -> liftIO $ logRecordProcessorOnEmit processor lr ctxt) loggerProviderProcessors

pure lr


{- | Add an attribute to a @LogRecord@.
Expand Down
48 changes: 42 additions & 6 deletions api/src/OpenTelemetry/Internal/Logging/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE NamedFieldPuns #-}

module OpenTelemetry.Internal.Logging.Types (
LogRecordProcessor (..),
LoggerProvider (..),
Logger (..),
ReadWriteLogRecord,
Expand All @@ -18,28 +19,63 @@ module OpenTelemetry.Internal.Logging.Types (
toShortName,
) where

import Control.Concurrent.Async
import Data.Function (on)
import qualified Data.HashMap.Strict as H
import Data.IORef (IORef, atomicModifyIORef, modifyIORef, newIORef, readIORef)
import Data.Text (Text)
import Data.Vector (Vector)
import OpenTelemetry.Common (Timestamp, TraceFlags)
import OpenTelemetry.Context.Types (Context)
import OpenTelemetry.Internal.Common.Types (InstrumentationLibrary)
import OpenTelemetry.Internal.Common.Types (InstrumentationLibrary, ShutdownResult)
import OpenTelemetry.Internal.Trace.Id (SpanId, TraceId)
import OpenTelemetry.LogAttributes
import OpenTelemetry.Resource (MaterializedResources)


data LogRecordProcessor = LogRecordProcessor
{ logRecordProcessorOnEmit :: ReadWriteLogRecord -> Context -> IO ()
-- ^ Called when a LogRecord is emitted. This method is called synchronously on the thread that emitted the LogRecord, therefore it SHOULD NOT block or throw exceptions.
--
-- A LogRecordProcessor may freely modify logRecord for the duration of the OnEmit call. If logRecord is needed after OnEmit returns (i.e. for asynchronous processing) only reads are permitted.
, logRecordProcessorShutdown :: IO (Async ShutdownResult)
-- ^ Shuts down the processor. Called when SDK is shut down. This is an opportunity for processor to do any cleanup required.
--
-- Shutdown SHOULD be called only once for each LogRecordProcessor instance. After the call to Shutdown, subsequent calls to OnEmit are not allowed. SDKs SHOULD ignore these calls gracefully, if possible.
--
-- Shutdown SHOULD provide a way to let the caller know whether it succeeded, failed or timed out.
--
-- Shutdown MUST include the effects of ForceFlush.
--
-- Shutdown SHOULD complete or abort within some timeout. Shutdown can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event.
-- OpenTelemetry SDK authors can decide if they want to make the shutdown timeout configurable.
, logRecordProcessorForceFlush :: IO ()
-- ^ This is a hint to ensure that any tasks associated with LogRecords for which the LogRecordProcessor had already received events prior to the call to ForceFlush SHOULD be completed
-- as soon as possible, preferably before returning from this method.
--
-- In particular, if any LogRecordProcessor has any associated exporter, it SHOULD try to call the exporter’s Export with all LogRecords for which this was not already done and then invoke ForceFlush on it.
-- The built-in LogRecordProcessors MUST do so. If a timeout is specified (see below), the LogRecordProcessor MUST prioritize honoring the timeout over finishing all calls. It MAY skip or abort some or all
-- Export or ForceFlush calls it has made to achieve this goal.
--
-- ForceFlush SHOULD provide a way to let the caller know whether it succeeded, failed or timed out.
--
-- ForceFlush SHOULD only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the LogRecordProcessor exports the emitted LogRecords.
--
-- ForceFlush SHOULD complete or abort within some timeout. ForceFlush can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event. OpenTelemetry SDK authors
-- can decide if they want to make the flush timeout configurable.
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type is similar to Processor defined in OpenTelemetry.Internal.Trace.Types. Notable differences are the type variable body because LogRecord needs it, and changing onStart and onEnd to onEmit in accordance with the spec. Maybe Processor should be renamed to SpanProcessor because there is more than one type of Processor now?



-- | @Logger@s can be created from @LoggerProvider@s
data LoggerProvider = LoggerProvider
{ loggerProviderResource :: MaterializedResources
{ loggerProviderProcessors :: Vector LogRecordProcessor
, loggerProviderResource :: MaterializedResources
-- ^ Describes the source of the log, aka resource. Multiple occurrences of events coming from the same event source can happen across time and they all have the same value of Resource.
-- Can contain for example information about the application that emits the record or about the infrastructure where the application runs. Data formats that represent this data model
-- may be designed in a manner that allows the Resource field to be recorded only once per batch of log records that come from the same source. SHOULD follow OpenTelemetry semantic conventions for Resources.
-- This field is optional.
, loggerProviderAttributeLimits :: AttributeLimits
}
deriving (Show, Eq)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot derive Show and Eq because Show and Eq instances do not make sense for LogProcessors because they contain functions.



{- | @LogRecords@ can be created from @Loggers@. @Logger@s are uniquely identified by the @libraryName@, @libraryVersion@, @schemaUrl@ fields of @InstrumentationLibrary@.
Expand All @@ -48,7 +84,7 @@ Creating two @Logger@s with the same identity but different @libraryAttributes@
data Logger = Logger
{ loggerInstrumentationScope :: InstrumentationLibrary
-- ^ Details about the library that the @Logger@ instruments.
, loggerProvider :: LoggerProvider
, loggerLoggerProvider :: LoggerProvider
-- ^ The @LoggerProvider@ that created this @Logger@. All configuration for the @Logger@ is contained in the @LoggerProvider@.
}

Expand Down Expand Up @@ -128,11 +164,11 @@ instance IsReadableLogRecord ReadableLogRecord where
instance IsReadableLogRecord ReadWriteLogRecord where
readLogRecord (ReadWriteLogRecord _ ref) = readIORef ref
readLogRecordInstrumentationScope (ReadWriteLogRecord (Logger {loggerInstrumentationScope}) _) = loggerInstrumentationScope
readLogRecordResource (ReadWriteLogRecord Logger {loggerProvider = LoggerProvider {loggerProviderResource}} _) = loggerProviderResource
readLogRecordResource (ReadWriteLogRecord Logger {loggerLoggerProvider = LoggerProvider {loggerProviderResource}} _) = loggerProviderResource


instance IsReadWriteLogRecord ReadWriteLogRecord where
readLogRecordAttributeLimits (ReadWriteLogRecord Logger {loggerProvider = LoggerProvider {loggerProviderAttributeLimits}} _) = loggerProviderAttributeLimits
readLogRecordAttributeLimits (ReadWriteLogRecord Logger {loggerLoggerProvider = LoggerProvider {loggerProviderAttributeLimits}} _) = loggerProviderAttributeLimits
modifyLogRecord (ReadWriteLogRecord _ ref) = modifyIORef ref
atomicModifyLogRecord (ReadWriteLogRecord _ ref) = atomicModifyIORef ref

Expand Down
16 changes: 0 additions & 16 deletions api/src/OpenTelemetry/Internal/Trace/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ data Exporter a = Exporter
}


data ShutdownResult = ShutdownSuccess | ShutdownFailure | ShutdownTimeout


data Processor = Processor
{ processorOnStart :: IORef ImmutableSpan -> Context -> IO ()
-- ^ Called when a span is started. This method is called synchronously on the thread that started the span, therefore it should not block or throw exceptions.
Expand Down Expand Up @@ -186,19 +183,6 @@ data SpanArguments = SpanArguments
}


-- | The outcome of a call to 'OpenTelemetry.Trace.forceFlush'
data FlushResult
= -- | One or more spans did not export from all associated exporters
-- within the alotted timeframe.
FlushTimeout
| -- | Flushing spans to all associated exporters succeeded.
FlushSuccess
| -- | One or more exporters failed to successfully export one or more
-- unexported spans.
FlushError
deriving (Show)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to Common



{- |
@SpanKind@ describes the relationship between the @Span@, its parents, and its children in a Trace. @SpanKind@ describes two independent properties that benefit tracing systems during analysis.

Expand Down
19 changes: 19 additions & 0 deletions api/src/OpenTelemetry/LogRecordProcessor.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{- |
@LogRecordProcessor@ is an interface which allows hooks for @LogRecord@ emit method invocations.

Built-in log processors are responsible for batching and conversion of spans to exportable representation and passing batches to exporters.

Log processors can be registered directly on SDK LoggerProvider and they are invoked in the same order as they were registered.

Each processor registered on LoggerProvider is a start of pipeline that consist of log processor and optional exporter. SDK MUST allow to end each pipeline with individual exporter.

SDK MUST allow users to implement and configure custom processors and decorate built-in processors for advanced scenarios such as tagging or filtering.
-}
module OpenTelemetry.LogRecordProcessor (
LogRecordProcessor (..),
ShutdownResult (..),
) where

import OpenTelemetry.Internal.Common.Types
import OpenTelemetry.Internal.Logging.Types

2 changes: 2 additions & 0 deletions api/src/OpenTelemetry/Logging/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module OpenTelemetry.Logging.Core (
createLoggerProvider,
setGlobalLoggerProvider,
getGlobalLoggerProvider,
shutdownLoggerProvider,
forceFlushLoggerProvider,

-- * @Logger@ operations
InstrumentationLibrary (..),
Expand Down
1 change: 1 addition & 0 deletions api/src/OpenTelemetry/Processor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ module OpenTelemetry.Processor (
ShutdownResult (..),
) where

import OpenTelemetry.Internal.Common.Types
import OpenTelemetry.Internal.Trace.Types

Loading