diff --git a/api/hs-opentelemetry-api.cabal b/api/hs-opentelemetry-api.cabal index 408a87ef..61e13cbd 100644 --- a/api/hs-opentelemetry-api.cabal +++ b/api/hs-opentelemetry-api.cabal @@ -41,6 +41,7 @@ library OpenTelemetry.Internal.Trace.Id OpenTelemetry.LogAttributes OpenTelemetry.Logging.Core + OpenTelemetry.LogRecordProcessor OpenTelemetry.Processor OpenTelemetry.Propagator OpenTelemetry.Resource diff --git a/api/src/OpenTelemetry/Internal/Common/Types.hs b/api/src/OpenTelemetry/Internal/Common/Types.hs index 0d80f8c7..f736460d 100644 --- a/api/src/OpenTelemetry/Internal/Common/Types.hs +++ b/api/src/OpenTelemetry/Internal/Common/Types.hs @@ -9,6 +9,8 @@ module OpenTelemetry.Internal.Common.Types ( InstrumentationLibrary (..), AnyValue (..), ToValue (..), + ShutdownResult (..), + FlushResult (..), ) where import Data.ByteString (ByteString) @@ -153,3 +155,19 @@ instance (ToValue a) => ToValue (H.HashMap Text a) where instance ToValue AnyValue where toValue :: AnyValue -> AnyValue toValue = id + + +data ShutdownResult = ShutdownSuccess | ShutdownFailure | ShutdownTimeout + + +-- | The outcome of a call to @OpenTelemetry.Trace.forceFlush@ or @OpenTelemetry.Logging.forceFlush@ +data FlushResult + = -- | One or more spans or @LogRecord@s did not export from all associated exporters + -- within the alotted timeframe. + FlushTimeout + | -- | Flushing spans or @LogRecord@s to all associated exporters succeeded. + FlushSuccess + | -- | One or more exporters failed to successfully export one or more + -- unexported spans or @LogRecord@s. + FlushError + deriving (Show) diff --git a/api/src/OpenTelemetry/Internal/Logging/Core.hs b/api/src/OpenTelemetry/Internal/Logging/Core.hs index 181a3818..06dff4f1 100644 --- a/api/src/OpenTelemetry/Internal/Logging/Core.hs +++ b/api/src/OpenTelemetry/Internal/Logging/Core.hs @@ -1,4 +1,5 @@ {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE TypeApplications #-} module OpenTelemetry.Internal.Logging.Core ( @@ -7,6 +8,8 @@ module OpenTelemetry.Internal.Logging.Core ( createLoggerProvider, setGlobalLoggerProvider, getGlobalLoggerProvider, + shutdownLoggerProvider, + forceFlushLoggerProvider, makeLogger, emitLogRecord, addAttribute, @@ -17,7 +20,8 @@ module OpenTelemetry.Internal.Logging.Core ( ) where import Control.Applicative -import Control.Monad (void, when) +import Control.Concurrent.Async +import Control.Monad import Control.Monad.Trans import Control.Monad.Trans.Maybe import Data.Coerce @@ -27,6 +31,7 @@ import Data.IORef import Data.Maybe import Data.Text (Text) import qualified Data.Text as T +import qualified Data.Vector as V import Data.Version (showVersion) import GHC.IO (unsafePerformIO) import qualified OpenTelemetry.Attributes as A @@ -41,6 +46,7 @@ import qualified OpenTelemetry.LogAttributes as LA import OpenTelemetry.Resource (MaterializedResources, emptyMaterializedResources) import Paths_hs_opentelemetry_api (version) import System.Clock +import System.Timeout (timeout) getCurrentTimestamp :: (MonadIO m) => m Timestamp @@ -55,7 +61,7 @@ data LoggerProviderOptions = LoggerProviderOptions {- | Options for creating a @LoggerProvider@ with no resources and default limits. - In effect, logging is a no-op when using this configuration. + In effect, logging is a no-op when using this configuration and no-op Processors. -} emptyLoggerProviderOptions :: LoggerProviderOptions emptyLoggerProviderOptions = @@ -69,16 +75,22 @@ emptyLoggerProviderOptions = You should generally use @getGlobalLoggerProvider@ for most applications. -} -createLoggerProvider :: LoggerProviderOptions -> LoggerProvider -createLoggerProvider LoggerProviderOptions {..} = +createLoggerProvider :: [LogRecordProcessor] -> LoggerProviderOptions -> LoggerProvider +createLoggerProvider ps LoggerProviderOptions {..} = LoggerProvider - { loggerProviderResource = loggerProviderOptionsResource + { loggerProviderProcessors = V.fromList ps + , loggerProviderResource = loggerProviderOptionsResource , loggerProviderAttributeLimits = loggerProviderOptionsAttributeLimits } +-- | Logging is no-op when using this @LoggerProvider@ because it has no processors and empty options. +noOpLoggerProvider :: LoggerProvider +noOpLoggerProvider = createLoggerProvider [] emptyLoggerProviderOptions + + globalLoggerProvider :: IORef LoggerProvider -globalLoggerProvider = unsafePerformIO $ newIORef $ createLoggerProvider emptyLoggerProviderOptions +globalLoggerProvider = unsafePerformIO $ newIORef noOpLoggerProvider {-# NOINLINE globalLoggerProvider #-} @@ -96,13 +108,53 @@ setGlobalLoggerProvider :: (MonadIO m) => LoggerProvider -> m () setGlobalLoggerProvider = liftIO . writeIORef globalLoggerProvider +{- | This method provides a way for provider to do any cleanup required. + + This will also trigger shutdowns on all internal processors. +-} +shutdownLoggerProvider :: (MonadIO m) => LoggerProvider -> m () +shutdownLoggerProvider LoggerProvider {loggerProviderProcessors} = liftIO $ do + asyncShutdownResults <- V.forM loggerProviderProcessors $ \processor -> do + logRecordProcessorShutdown processor + mapM_ wait asyncShutdownResults + + +{- | This method provides a way for provider to immediately export all @LogRecord@s that have not yet + been exported for all the internal processors. +-} +forceFlushLoggerProvider + :: (MonadIO m) + => LoggerProvider + -> Maybe Int + -- ^ Optional timeout in microseconds, defaults to 5,000,000 (5s) + -> m FlushResult + -- ^ Result that denotes whether the flush action succeeded, failed, or timed out. +forceFlushLoggerProvider LoggerProvider {loggerProviderProcessors} mtimeout = liftIO $ do + jobs <- V.forM loggerProviderProcessors $ \processor -> async $ do + logRecordProcessorForceFlush processor + mresult <- + timeout (fromMaybe 5_000_000 mtimeout) $ + V.foldM + ( \status action -> do + res <- waitCatch action + pure $! case res of + Left _err -> FlushError + Right _ok -> status + ) + FlushSuccess + jobs + case mresult of + Nothing -> pure FlushTimeout + Just res -> pure res + + makeLogger :: LoggerProvider -- ^ The @LoggerProvider@ holds the configuration for the @Logger@. -> InstrumentationLibrary -- ^ The library that the @Logger@ instruments. This uniquely identifies the @Logger@. -> Logger -makeLogger loggerProvider loggerInstrumentationScope = Logger {..} +makeLogger loggerLoggerProvider loggerInstrumentationScope = Logger {..} createImmutableLogRecord @@ -166,9 +218,12 @@ emitOTelLogRecord attrs severity bodyText = do } -{- | Emits a LogRecord with properties specified by the passed in Logger and LogRecordArguments. +{- | Emits a @LogRecord@ with properties specified by the passed in Logger and LogRecordArguments. If observedTimestamp is not set in LogRecordArguments, it will default to the current timestamp. If context is not specified in LogRecordArguments it will default to the current context. + +The emitted @LogRecord@ will be passed to any @LogRecordProcessor@s registered on the @LoggerProvider@ +that created the @Logger@. -} emitLogRecord :: (MonadIO m) @@ -176,8 +231,15 @@ emitLogRecord -> LogRecordArguments -> m ReadWriteLogRecord emitLogRecord l args = do - ilr <- createImmutableLogRecord (loggerProviderAttributeLimits $ loggerProvider l) args - liftIO $ mkReadWriteLogRecord l ilr + let LoggerProvider {loggerProviderProcessors, loggerProviderAttributeLimits} = loggerLoggerProvider l + + ilr <- createImmutableLogRecord loggerProviderAttributeLimits args + lr <- liftIO $ mkReadWriteLogRecord l ilr + + ctxt <- getContext + mapM_ (\processor -> liftIO $ logRecordProcessorOnEmit processor lr ctxt) loggerProviderProcessors + + pure lr {- | Add an attribute to a @LogRecord@. diff --git a/api/src/OpenTelemetry/Internal/Logging/Types.hs b/api/src/OpenTelemetry/Internal/Logging/Types.hs index 09661e3e..d7037b6f 100644 --- a/api/src/OpenTelemetry/Internal/Logging/Types.hs +++ b/api/src/OpenTelemetry/Internal/Logging/Types.hs @@ -3,6 +3,7 @@ {-# LANGUAGE NamedFieldPuns #-} module OpenTelemetry.Internal.Logging.Types ( + LogRecordProcessor (..), LoggerProvider (..), Logger (..), ReadWriteLogRecord, @@ -18,28 +19,63 @@ module OpenTelemetry.Internal.Logging.Types ( toShortName, ) where +import Control.Concurrent.Async import Data.Function (on) import qualified Data.HashMap.Strict as H import Data.IORef (IORef, atomicModifyIORef, modifyIORef, newIORef, readIORef) import Data.Text (Text) +import Data.Vector (Vector) import OpenTelemetry.Common (Timestamp, TraceFlags) import OpenTelemetry.Context.Types (Context) -import OpenTelemetry.Internal.Common.Types (InstrumentationLibrary) +import OpenTelemetry.Internal.Common.Types (InstrumentationLibrary, ShutdownResult) import OpenTelemetry.Internal.Trace.Id (SpanId, TraceId) import OpenTelemetry.LogAttributes import OpenTelemetry.Resource (MaterializedResources) +data LogRecordProcessor = LogRecordProcessor + { logRecordProcessorOnEmit :: ReadWriteLogRecord -> Context -> IO () + -- ^ Called when a LogRecord is emitted. This method is called synchronously on the thread that emitted the LogRecord, therefore it SHOULD NOT block or throw exceptions. + -- + -- A LogRecordProcessor may freely modify logRecord for the duration of the OnEmit call. If logRecord is needed after OnEmit returns (i.e. for asynchronous processing) only reads are permitted. + , logRecordProcessorShutdown :: IO (Async ShutdownResult) + -- ^ Shuts down the processor. Called when SDK is shut down. This is an opportunity for processor to do any cleanup required. + -- + -- Shutdown SHOULD be called only once for each LogRecordProcessor instance. After the call to Shutdown, subsequent calls to OnEmit are not allowed. SDKs SHOULD ignore these calls gracefully, if possible. + -- + -- Shutdown SHOULD provide a way to let the caller know whether it succeeded, failed or timed out. + -- + -- Shutdown MUST include the effects of ForceFlush. + -- + -- Shutdown SHOULD complete or abort within some timeout. Shutdown can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event. + -- OpenTelemetry SDK authors can decide if they want to make the shutdown timeout configurable. + , logRecordProcessorForceFlush :: IO () + -- ^ This is a hint to ensure that any tasks associated with LogRecords for which the LogRecordProcessor had already received events prior to the call to ForceFlush SHOULD be completed + -- as soon as possible, preferably before returning from this method. + -- + -- In particular, if any LogRecordProcessor has any associated exporter, it SHOULD try to call the exporter’s Export with all LogRecords for which this was not already done and then invoke ForceFlush on it. + -- The built-in LogRecordProcessors MUST do so. If a timeout is specified (see below), the LogRecordProcessor MUST prioritize honoring the timeout over finishing all calls. It MAY skip or abort some or all + -- Export or ForceFlush calls it has made to achieve this goal. + -- + -- ForceFlush SHOULD provide a way to let the caller know whether it succeeded, failed or timed out. + -- + -- ForceFlush SHOULD only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the LogRecordProcessor exports the emitted LogRecords. + -- + -- ForceFlush SHOULD complete or abort within some timeout. ForceFlush can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event. OpenTelemetry SDK authors + -- can decide if they want to make the flush timeout configurable. + } + + -- | @Logger@s can be created from @LoggerProvider@s data LoggerProvider = LoggerProvider - { loggerProviderResource :: MaterializedResources + { loggerProviderProcessors :: Vector LogRecordProcessor + , loggerProviderResource :: MaterializedResources -- ^ Describes the source of the log, aka resource. Multiple occurrences of events coming from the same event source can happen across time and they all have the same value of Resource. -- Can contain for example information about the application that emits the record or about the infrastructure where the application runs. Data formats that represent this data model -- may be designed in a manner that allows the Resource field to be recorded only once per batch of log records that come from the same source. SHOULD follow OpenTelemetry semantic conventions for Resources. -- This field is optional. , loggerProviderAttributeLimits :: AttributeLimits } - deriving (Show, Eq) {- | @LogRecords@ can be created from @Loggers@. @Logger@s are uniquely identified by the @libraryName@, @libraryVersion@, @schemaUrl@ fields of @InstrumentationLibrary@. @@ -48,7 +84,7 @@ Creating two @Logger@s with the same identity but different @libraryAttributes@ data Logger = Logger { loggerInstrumentationScope :: InstrumentationLibrary -- ^ Details about the library that the @Logger@ instruments. - , loggerProvider :: LoggerProvider + , loggerLoggerProvider :: LoggerProvider -- ^ The @LoggerProvider@ that created this @Logger@. All configuration for the @Logger@ is contained in the @LoggerProvider@. } @@ -128,11 +164,11 @@ instance IsReadableLogRecord ReadableLogRecord where instance IsReadableLogRecord ReadWriteLogRecord where readLogRecord (ReadWriteLogRecord _ ref) = readIORef ref readLogRecordInstrumentationScope (ReadWriteLogRecord (Logger {loggerInstrumentationScope}) _) = loggerInstrumentationScope - readLogRecordResource (ReadWriteLogRecord Logger {loggerProvider = LoggerProvider {loggerProviderResource}} _) = loggerProviderResource + readLogRecordResource (ReadWriteLogRecord Logger {loggerLoggerProvider = LoggerProvider {loggerProviderResource}} _) = loggerProviderResource instance IsReadWriteLogRecord ReadWriteLogRecord where - readLogRecordAttributeLimits (ReadWriteLogRecord Logger {loggerProvider = LoggerProvider {loggerProviderAttributeLimits}} _) = loggerProviderAttributeLimits + readLogRecordAttributeLimits (ReadWriteLogRecord Logger {loggerLoggerProvider = LoggerProvider {loggerProviderAttributeLimits}} _) = loggerProviderAttributeLimits modifyLogRecord (ReadWriteLogRecord _ ref) = modifyIORef ref atomicModifyLogRecord (ReadWriteLogRecord _ ref) = atomicModifyIORef ref diff --git a/api/src/OpenTelemetry/Internal/Trace/Types.hs b/api/src/OpenTelemetry/Internal/Trace/Types.hs index 7b2989ea..8edeacf6 100644 --- a/api/src/OpenTelemetry/Internal/Trace/Types.hs +++ b/api/src/OpenTelemetry/Internal/Trace/Types.hs @@ -44,9 +44,6 @@ data Exporter a = Exporter } -data ShutdownResult = ShutdownSuccess | ShutdownFailure | ShutdownTimeout - - data Processor = Processor { processorOnStart :: IORef ImmutableSpan -> Context -> IO () -- ^ Called when a span is started. This method is called synchronously on the thread that started the span, therefore it should not block or throw exceptions. @@ -186,19 +183,6 @@ data SpanArguments = SpanArguments } --- | The outcome of a call to 'OpenTelemetry.Trace.forceFlush' -data FlushResult - = -- | One or more spans did not export from all associated exporters - -- within the alotted timeframe. - FlushTimeout - | -- | Flushing spans to all associated exporters succeeded. - FlushSuccess - | -- | One or more exporters failed to successfully export one or more - -- unexported spans. - FlushError - deriving (Show) - - {- | @SpanKind@ describes the relationship between the @Span@, its parents, and its children in a Trace. @SpanKind@ describes two independent properties that benefit tracing systems during analysis. diff --git a/api/src/OpenTelemetry/LogRecordProcessor.hs b/api/src/OpenTelemetry/LogRecordProcessor.hs new file mode 100644 index 00000000..f05346a0 --- /dev/null +++ b/api/src/OpenTelemetry/LogRecordProcessor.hs @@ -0,0 +1,19 @@ +{- | + @LogRecordProcessor@ is an interface which allows hooks for @LogRecord@ emit method invocations. + + Built-in log processors are responsible for batching and conversion of spans to exportable representation and passing batches to exporters. + + Log processors can be registered directly on SDK LoggerProvider and they are invoked in the same order as they were registered. + + Each processor registered on LoggerProvider is a start of pipeline that consist of log processor and optional exporter. SDK MUST allow to end each pipeline with individual exporter. + + SDK MUST allow users to implement and configure custom processors and decorate built-in processors for advanced scenarios such as tagging or filtering. +-} +module OpenTelemetry.LogRecordProcessor ( + LogRecordProcessor (..), + ShutdownResult (..), +) where + +import OpenTelemetry.Internal.Common.Types +import OpenTelemetry.Internal.Logging.Types + diff --git a/api/src/OpenTelemetry/Logging/Core.hs b/api/src/OpenTelemetry/Logging/Core.hs index 6464d129..31dd2aff 100644 --- a/api/src/OpenTelemetry/Logging/Core.hs +++ b/api/src/OpenTelemetry/Logging/Core.hs @@ -6,6 +6,8 @@ module OpenTelemetry.Logging.Core ( createLoggerProvider, setGlobalLoggerProvider, getGlobalLoggerProvider, + shutdownLoggerProvider, + forceFlushLoggerProvider, -- * @Logger@ operations InstrumentationLibrary (..), diff --git a/api/src/OpenTelemetry/Processor.hs b/api/src/OpenTelemetry/Processor.hs index 7e32f743..c6c5bead 100644 --- a/api/src/OpenTelemetry/Processor.hs +++ b/api/src/OpenTelemetry/Processor.hs @@ -26,5 +26,6 @@ module OpenTelemetry.Processor ( ShutdownResult (..), ) where +import OpenTelemetry.Internal.Common.Types import OpenTelemetry.Internal.Trace.Types diff --git a/api/test/OpenTelemetry/Logging/CoreSpec.hs b/api/test/OpenTelemetry/Logging/CoreSpec.hs index 11423447..556be228 100644 --- a/api/test/OpenTelemetry/Logging/CoreSpec.hs +++ b/api/test/OpenTelemetry/Logging/CoreSpec.hs @@ -15,16 +15,24 @@ import OpenTelemetry.Resource.OperatingSystem import Test.Hspec +newtype TestLogRecordProcessor = TestLogRecordProcessor LogRecordProcessor + + +instance Show TestLogRecordProcessor where + show _ = "LogRecordProcessor {..}" + + spec :: Spec spec = describe "Core" $ do describe "The global logger provider" $ do it "Returns a no-op LoggerProvider when not initialized" $ do LoggerProvider {..} <- getGlobalLoggerProvider + fmap TestLogRecordProcessor loggerProviderProcessors `shouldSatisfy` null loggerProviderResource `shouldBe` emptyMaterializedResources loggerProviderAttributeLimits `shouldBe` LA.defaultAttributeLimits it "Allows a LoggerProvider to be set and returns that with subsequent calls to getGlobalLoggerProvider" $ do let lp = - createLoggerProvider $ + createLoggerProvider [] $ LoggerProviderOptions { loggerProviderOptionsResource = materializeResources $ @@ -45,7 +53,9 @@ spec = describe "Core" $ do setGlobalLoggerProvider lp glp <- getGlobalLoggerProvider - glp `shouldBe` lp + fmap TestLogRecordProcessor (loggerProviderProcessors glp) `shouldSatisfy` null + loggerProviderResource glp `shouldBe` loggerProviderResource lp + loggerProviderAttributeLimits glp `shouldBe` loggerProviderAttributeLimits lp describe "addAttribute" $ do it "works" $ do lp <- getGlobalLoggerProvider