-
-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Simple LogRecordProcessor and refactored shutdown and forceFlush #136
base: add-log-record-exporters
Are you sure you want to change the base?
Added Simple LogRecordProcessor and refactored shutdown and forceFlush #136
Conversation
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. Join @evanlauer1 and the rest of your teammates on Graphite |
88286a6
to
548bed8
Compare
2ee2412
to
8d06716
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finished self-review
FlushResult (..), | ||
takeWorseFlushResult, | ||
takeWorstFlushResult, | ||
exportResultToFlushResult, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Helper functions to convert from ExportResult -> FlushResult
and FlushResult -> ShutdownResult
. These are used to propagate Failures and Timeouts up the chain of export
to forceFlush
to shutdown
.
|
||
exportResultToFlushResult :: ExportResult -> FlushResult | ||
exportResultToFlushResult Success = FlushSuccess | ||
exportResultToFlushResult (Failure _) = FlushError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know of a better way to do this conversion than drop the Exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exportResultToFlushResult (Failure _) = FlushError | |
exportResultToFlushResult (Failure mErr) = FlushError $ toList mErr |
@@ -58,7 +58,7 @@ data LogRecordExporter body = LogRecordExporter | |||
-- Result: | |||
-- Success - The batch has been successfully exported. For protocol exporters this typically means that the data is sent over the wire and delivered to the destination server. | |||
-- Failure - exporting failed. The batch must be dropped. For example, this can happen when the batch contains bad data and cannot be serialized. | |||
, logRecordExporterForceFlush :: IO () | |||
, logRecordExporterForceFlush :: IO FlushResult |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may not be necessary for forceFlush
to return a FlushResult
at every step in the chain of Exporter
to Processor
to LoggerProvider
but I think it makes sense for each step to be able to communicate a failure or timeout.
@@ -111,7 +111,7 @@ data LogRecordProcessor body = LogRecordProcessor | |||
-- ^ Called when a LogRecord is emitted. This method is called synchronously on the thread that emitted the LogRecord, therefore it SHOULD NOT block or throw exceptions. | |||
-- | |||
-- A LogRecordProcessor may freely modify logRecord for the duration of the OnEmit call. If logRecord is needed after OnEmit returns (i.e. for asynchronous processing) only reads are permitted. | |||
, logRecordProcessorShutdown :: IO (Async ShutdownResult) | |||
, logRecordProcessorShutdown :: IO ShutdownResult |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes more sense to me for the LoggerProvider
or a Processor
to internally implement or choose not to implement concurrency than for it to be enforced by the types. I imagine there are situations where one would not want something to be asynchronous. Removing Async
from the signature also allows combinators like withAsync
and mapConcurrently
to be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least for tracing, the docs say this:
Shutdown can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event. OpenTelemetry client authors can decide if they want to make the shutdown timeout configurable.
I can appreciate the argument in the other direction, but would prefer to keep Async
here to stay consistent with the SpanProcessor API https://hackage.haskell.org/package/hs-opentelemetry-api-0.1.0.0/docs/OpenTelemetry-Processor.html#t:Processor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another reason I made the change is that having Async
in the signature made it much more difficult to propagate and combine ExportResult
s, FlushResult
s, and ShutdownResult
s. I also figured that it would be good if either shutdown
, forceFlush
, and export
were either all asynchronous or all synchronous.
Also, the way I rewrote shutdownLoggerProvider
seems cleaner and more abstract (using forConcurrently
) than the way it was written for shutdownTracerProvider
before. Plus, people defining a LogRecordProcessor
no longer have to worry about the asynchronous part because it is handled by the LoggerProvider
.
Let me know your thoughts.
api/src/OpenTelemetry/Logs/Core.hs
Outdated
@@ -124,41 +126,56 @@ setGlobalLoggerProvider :: (MonadIO m) => LoggerProvider body -> m () | |||
setGlobalLoggerProvider = liftIO . writeIORef globalLoggerProvider | |||
|
|||
|
|||
defaultShutdownTimeout :: Int | |||
defaultShutdownTimeout = 5_000_000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shutdown
previously never timed out. I used the same shutdown timeout as forceFlush
, but there is no reason that 5,000,000 microseconds is a magic number. There might be a better default timeout.
|
||
{- | This is an implementation of LogRecordProcessor which passes finished logs and passes the export-friendly ReadableLogRecord | ||
representation to the configured LogRecordExporter, as soon as they are finished. | ||
-} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implementation mirrors SpanProcessor.Simple
but with a lot of refactoring. It also adds propagating ExportResult
s to FlushResult
s to ShutdownResult
s.
shutdownResult <- logRecordExporterShutdown | ||
|
||
pure $ takeWorseShutdownResult shutdownResult $ flushResultToShutdownResult flushResult | ||
, logRecordProcessorForceFlush |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SpanProcessor.Simple
does not implement forceFlush
. Instead, it has a defacto force flush in shutdown
.
In this implementation, forceFlush
is implemented and shutdown
calls forceFlush
as it's supposed to.
lr <- emitLogRecord l (emptyLogRecordArguments ("a bad one" :: String)) | ||
logRecordProcessorOnEmit processorNoShutdown lr Context.empty | ||
numExportsNoShutdown <- readIORef numExportsNoShutdownRef | ||
numExportsNoShutdown `shouldBe` 3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OnEmit
calls should do nothing after shutdown
.
exportRes <- logRecordExporterExport testExporter H.empty | ||
exportRes `shouldSatisfy` \case | ||
Success -> False | ||
Failure _ -> True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes sure that the Exporter's shutdown method was called.
shutdownLoggerProvider Nothing lp | ||
|
||
numExports <- readIORef numExportsRef | ||
numExports `shouldBe` 3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This ensures that each emitted logRecord
made it to the exporter.
bc412e8
to
dc76362
Compare
f415f88
to
6797029
Compare
1c75434
to
608e2b0
Compare
6797029
to
e5db1ec
Compare
608e2b0
to
3a0df02
Compare
e5db1ec
to
e6fa1b9
Compare
3a0df02
to
444d7f7
Compare
e6fa1b9
to
8ee8167
Compare
444d7f7
to
45e0207
Compare
8ee8167
to
7b85d46
Compare
51a0d36
to
33a94d5
Compare
7b85d46
to
2377db3
Compare
33a94d5
to
693ac4c
Compare
2377db3
to
a23f5bf
Compare
693ac4c
to
7cb5e40
Compare
a23f5bf
to
6fe460b
Compare
7cb5e40
to
939738e
Compare
6fe460b
to
b3274c6
Compare
f7d80a7
to
cf6b2ca
Compare
b3274c6
to
7bfaada
Compare
cf6b2ca
to
793d987
Compare
7bfaada
to
98e0d0c
Compare
793d987
to
4d35dc5
Compare
98e0d0c
to
2b1ff85
Compare
4d35dc5
to
b67e4e7
Compare
2b1ff85
to
cdabce7
Compare
b67e4e7
to
88ca14d
Compare
5ef10f4
to
486717e
Compare
8f00ae6
to
6d08447
Compare
486717e
to
d669016
Compare
6d08447
to
af20283
Compare
d669016
to
304f604
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly feedback on how you're propagating the errors, but overall LGTM
@@ -175,6 +200,25 @@ data FlushResult | |||
deriving (Show) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend tweaking FlushError
a bit here to carry exceptions with it:
| FlushError [SomeException]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do.
@@ -175,6 +200,25 @@ data FlushResult | |||
deriving (Show) | |||
|
|||
|
|||
-- | Returns @FlushError@ if either argument is @FlushError@, @FlushTimeout@ if either argument is @FlushTimeout@, and @FlushSuccess@ otherwise. | |||
takeWorseFlushResult :: FlushResult -> FlushResult -> FlushResult |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you take my advice on changing FlushError
, you can make this into (<>)
by implementing the Semigroup
instance:
-- | Returns @FlushError@ if either argument is @FlushError@, @FlushTimeout@
-- if either argument is @FlushTimeout@, and @FlushSuccess@ otherwise.
instance Semigroup FlushResult where
(<>) (FlushError l) (FlushError r) = FlushError (l <> r)
(<>) (FlushError l) _ = FlushError l
(<>) _ (FlushError r) = FlushError r
(<>) FlushTimeout _ = FlushTimeout
(<>) _ FlushTimeout = FlushTimeout
(<>) FlushSuccess FlushSuccess = FlushSuccess
instance Monoid FlushResult where
mempty = FlushSuccess
|
||
|
||
takeWorstFlushResult :: (Foldable t) => t FlushResult -> FlushResult | ||
takeWorstFlushResult = foldr takeWorseFlushResult FlushSuccess |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you go the semigroup / monoid route outlined above, this becomes
takeWorstFlushResult = foldr takeWorseFlushResult FlushSuccess | |
takeWorstFlushResult = fold |
|
||
exportResultToFlushResult :: ExportResult -> FlushResult | ||
exportResultToFlushResult Success = FlushSuccess | ||
exportResultToFlushResult (Failure _) = FlushError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exportResultToFlushResult (Failure _) = FlushError | |
exportResultToFlushResult (Failure mErr) = FlushError $ toList mErr |
@@ -111,7 +111,7 @@ data LogRecordProcessor body = LogRecordProcessor | |||
-- ^ Called when a LogRecord is emitted. This method is called synchronously on the thread that emitted the LogRecord, therefore it SHOULD NOT block or throw exceptions. | |||
-- | |||
-- A LogRecordProcessor may freely modify logRecord for the duration of the OnEmit call. If logRecord is needed after OnEmit returns (i.e. for asynchronous processing) only reads are permitted. | |||
, logRecordProcessorShutdown :: IO (Async ShutdownResult) | |||
, logRecordProcessorShutdown :: IO ShutdownResult |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least for tracing, the docs say this:
Shutdown can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event. OpenTelemetry client authors can decide if they want to make the shutdown timeout configurable.
I can appreciate the argument in the other direction, but would prefer to keep Async
here to stay consistent with the SpanProcessor API https://hackage.haskell.org/package/hs-opentelemetry-api-0.1.0.0/docs/OpenTelemetry-Processor.html#t:Processor
@@ -10,7 +10,13 @@ module OpenTelemetry.Internal.Common.Types ( | |||
AnyValue (..), | |||
ToValue (..), | |||
ShutdownResult (..), | |||
takeWorseShutdownResult, | |||
takeWorstShutdownResult, | |||
flushResultToShutdownResult, | |||
FlushResult (..), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit of a driveby request on my part, but it would be nice to reexport FlushResult
so that callers of forceFlushTracerProvider can actually use the type.
|
||
exporterFlushRes <- logRecordExporterForceFlush exporter | ||
|
||
pure $ takeWorseFlushResult exporterFlushRes chanFlushRes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pure $ takeWorseFlushResult exporterFlushRes chanFlushRes | |
pure (exporterFlushRes <> chanFlushRes) |
304f604
to
58f8451
Compare
…ces, and made error handlers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finished self-review for changes to FlushResult
and ShutdownResult
|
||
|
||
shutdownErrorHandler :: SomeException -> IO ShutdownResult | ||
shutdownErrorHandler = pure . ShutdownError . pure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
used with handle
and catch
from Control.Exception
|
||
|
||
takeWorstShutdownResult :: (Foldable t) => t ShutdownResult -> ShutdownResult | ||
takeWorstShutdownResult = fold |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Developers may be able to use fold
instead of this function, but I think takeWorsShutdownResult
better communicates the meaning of the action and fold
might be less well known
Hi @evanlauer1, whats' the status of this? Would you like me to give it a review? |
Big Context
Logging Support is being added to hs-opentelemetry. Logging spec
Small (This PR) Context
Two built-in processors are necessary for a functional SDK: the simple processor and the batching processor. This PR implements the first of the two. The simple processor follows the spec for
LogRecordProcessor
s.Testing
stack build
runsLogRecordProcessor
passes