Skip to content

Commit

Permalink
Wrap LogRecordExporter in MVar so it cannot be called concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
evanlauer1 committed Jul 31, 2024
1 parent cdabce7 commit fbf660d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 9 deletions.
16 changes: 14 additions & 2 deletions api/src/OpenTelemetry/Exporter/LogRecord.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
module OpenTelemetry.Exporter.LogRecord (
LogRecordExporter (..),
LogRecordExporter,
LogRecordExporterInternal (..),
mkLogRecordExporter,
logRecordExporterExport,
logRecordExporterForceFlush,
logRecordExporterShutdown,
ShutdownResult (..),
) where

import OpenTelemetry.Internal.Logs.Types (LogRecordExporter (..))
import OpenTelemetry.Internal.Logs.Types (
LogRecordExporter,
LogRecordExporterInternal (..),
logRecordExporterExport,
logRecordExporterForceFlush,
logRecordExporterShutdown,
mkLogRecordExporter,
)
import OpenTelemetry.Processor.LogRecord (ShutdownResult (..))

37 changes: 30 additions & 7 deletions api/src/OpenTelemetry/Internal/Logs/Types.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE NamedFieldPuns #-}

module OpenTelemetry.Internal.Logs.Types (
LogRecordExporter (..),
LogRecordExporter,
LogRecordExporterInternal (..),
mkLogRecordExporter,
logRecordExporterExport,
logRecordExporterForceFlush,
logRecordExporterShutdown,
LogRecordProcessor (..),
LoggerProvider (..),
Logger (..),
Expand All @@ -20,6 +23,7 @@ module OpenTelemetry.Internal.Logs.Types (
toShortName,
) where

import Control.Concurrent (MVar, newMVar, withMVar)
import Control.Concurrent.Async
import Data.Function (on)
import Data.HashMap.Strict (HashMap)
Expand All @@ -39,8 +43,8 @@ import OpenTelemetry.Resource (MaterializedResources)
The goal of the interface is to minimize burden of implementation for protocol-dependent telemetry exporters. The protocol exporter is expected to be primarily a simple telemetry data encoder and transmitter.
-}
data LogRecordExporter = LogRecordExporter
{ logRecordExporterExport :: Vector ReadableLogRecord -> IO ExportResult
data LogRecordExporterInternal = LogRecordExporterInternal
{ logRecordExporterExportInternal :: Vector ReadableLogRecord -> IO ExportResult
-- ^ Exports a batch of ReadableLogRecords. Protocol exporters that will implement this function are typically expected to serialize
-- and transmit the data to the destination.
--
Expand All @@ -59,7 +63,7 @@ data LogRecordExporter = LogRecordExporter
-- Result:
-- Success - The batch has been successfully exported. For protocol exporters this typically means that the data is sent over the wire and delivered to the destination server.
-- Failure - exporting failed. The batch must be dropped. For example, this can happen when the batch contains bad data and cannot be serialized.
, logRecordExporterForceFlush :: IO ()
, logRecordExporterForceFlushInternal :: IO ()
-- ^ This is a hint to ensure that the export of any ReadableLogRecords the exporter has received prior to the call to ForceFlush SHOULD
-- be completed as soon as possible, preferably before returning from this method.
--
Expand All @@ -70,7 +74,7 @@ data LogRecordExporter = LogRecordExporter
--
-- ForceFlush SHOULD complete or abort within some timeout. ForceFlush can be implemented as a blocking API or an asynchronous API which
-- notifies the caller via a callback or an event. OpenTelemetry SDK authors MAY decide if they want to make the flush timeout configurable.
, logRecordExporterShutdown :: IO ()
, logRecordExporterShutdownInternal :: IO ()
-- ^
-- Shuts down the exporter. Called when SDK is shut down. This is an opportunity for exporter to do any cleanup required.
--
Expand All @@ -82,6 +86,25 @@ data LogRecordExporter = LogRecordExporter
}


newtype LogRecordExporter = LogRecordExporter {unExporter :: MVar LogRecordExporterInternal}


mkLogRecordExporter :: LogRecordExporterInternal -> IO LogRecordExporter
mkLogRecordExporter = fmap LogRecordExporter . newMVar


logRecordExporterExport :: LogRecordExporter -> Vector ReadableLogRecord -> IO ExportResult
logRecordExporterExport exporter lrs = withMVar (unExporter exporter) $ \e -> logRecordExporterExportInternal e lrs


logRecordExporterForceFlush :: LogRecordExporter -> IO ()
logRecordExporterForceFlush = flip withMVar logRecordExporterForceFlushInternal . unExporter


logRecordExporterShutdown :: LogRecordExporter -> IO ()
logRecordExporterShutdown = flip withMVar logRecordExporterShutdownInternal . unExporter


{- | LogRecordProcessor is an interface which allows hooks for LogRecord emitting.
Built-in processors are responsible for batching and conversion of LogRecords to exportable representation and passing batches to exporters.
Expand Down

0 comments on commit fbf660d

Please sign in to comment.