@@ -29,6 +29,7 @@ REFLECTION(json_summary_t, name, help, type, metrics);
29
29
#endif
30
30
31
31
struct block_t {
32
+ std::atomic<bool > is_coro_started_ = false ;
32
33
std::atomic<bool > stop_ = false ;
33
34
ylt::detail::moodycamel::ConcurrentQueue<double > sample_queue_;
34
35
std::shared_ptr<TimeWindowQuantiles> quantile_values_;
@@ -75,7 +76,7 @@ class summary_t : public static_metric {
75
76
block_->sample_queue_ .enqueue (value);
76
77
77
78
bool expected = false ;
78
- if (is_coro_started_.compare_exchange_strong (expected, true )) {
79
+ if (block_-> is_coro_started_ .compare_exchange_strong (expected, true )) {
79
80
start (block_).via (excutor_->get_executor ()).start ([](auto &&) {
80
81
});
81
82
}
@@ -220,13 +221,13 @@ class summary_t : public static_metric {
220
221
}
221
222
222
223
if (block->sample_queue_ .size_approx () == 0 ) {
223
- is_coro_started_ = false ;
224
+ block_-> is_coro_started_ = false ;
224
225
if (block->sample_queue_ .size_approx () == 0 ) {
225
226
break ;
226
227
}
227
228
228
229
bool expected = false ;
229
- if (!is_coro_started_.compare_exchange_strong (expected, true )) {
230
+ if (!block_-> is_coro_started_ .compare_exchange_strong (expected, true )) {
230
231
break ;
231
232
}
232
233
@@ -243,7 +244,6 @@ class summary_t : public static_metric {
243
244
std::shared_ptr<block_t > block_;
244
245
static inline std::shared_ptr<coro_io::io_context_pool> excutor_ =
245
246
coro_io::create_io_context_pool (1 );
246
- std::atomic<bool > is_coro_started_ = false ;
247
247
bool has_observe_ = false ;
248
248
};
249
249
@@ -260,6 +260,10 @@ struct sum_and_count_t {
260
260
261
261
template <uint8_t N>
262
262
struct labels_block_t {
263
+ summary_t ::Quantiles quantiles_; // readonly
264
+ std::chrono::milliseconds max_age_;
265
+ uint16_t age_buckets_;
266
+ std::atomic<bool > is_coro_started_ = false ;
263
267
std::atomic<bool > stop_ = false ;
264
268
ylt::detail::moodycamel::ConcurrentQueue<summary_label_sample<N>>
265
269
sample_queue_;
@@ -280,12 +284,16 @@ class basic_dynamic_summary : public dynamic_metric {
280
284
std::array<std::string, N> labels_name,
281
285
std::chrono::milliseconds max_age = std::chrono::seconds{60 },
282
286
uint16_t age_buckets = 5 )
283
- : quantiles_{std::move (quantiles)},
284
- dynamic_metric (MetricType::Summary, std::move(name), std::move(help),
285
- std::move (labels_name)),
286
- max_age_(max_age),
287
- age_buckets_(age_buckets) {
288
- init_block (labels_block_);
287
+ : dynamic_metric(MetricType::Summary, std::move(name), std::move(help),
288
+ std::move (labels_name)) {
289
+ labels_block_ = std::make_shared<labels_block_t <N>>();
290
+ labels_block_->quantiles_ = std::move (quantiles);
291
+ labels_block_->max_age_ = max_age;
292
+ labels_block_->age_buckets_ = age_buckets;
293
+
294
+ start (labels_block_).via (excutor_->get_executor ()).start ([](auto &&) {
295
+ });
296
+
289
297
g_user_metric_count++;
290
298
}
291
299
@@ -307,7 +315,8 @@ class basic_dynamic_summary : public dynamic_metric {
307
315
labels_block_->sample_queue_ .enqueue ({std::move (labels_value), value});
308
316
309
317
bool expected = false ;
310
- if (is_coro_started_.compare_exchange_strong (expected, true )) {
318
+ if (labels_block_->is_coro_started_ .compare_exchange_strong (expected,
319
+ true )) {
311
320
start (labels_block_).via (excutor_->get_executor ()).start ([](auto &&) {
312
321
});
313
322
}
@@ -327,7 +336,7 @@ class basic_dynamic_summary : public dynamic_metric {
327
336
const std::array<std::string, N> &labels_value, double &sum,
328
337
uint64_t &count) {
329
338
std::vector<double > vec;
330
- if (quantiles_.empty ()) {
339
+ if (labels_block_-> quantiles_ .empty ()) {
331
340
co_return std::vector<double >{};
332
341
}
333
342
@@ -339,7 +348,7 @@ class basic_dynamic_summary : public dynamic_metric {
339
348
}
340
349
sum = labels_block_->sum_and_count_ [labels_value].sum ;
341
350
count = labels_block_->sum_and_count_ [labels_value].count ;
342
- for (const auto &quantile : quantiles_) {
351
+ for (const auto &quantile : labels_block_-> quantiles_ ) {
343
352
vec.push_back (it->second ->get (quantile.quantile ));
344
353
}
345
354
},
@@ -359,13 +368,6 @@ class basic_dynamic_summary : public dynamic_metric {
359
368
}
360
369
#endif
361
370
private:
362
- template <typename T>
363
- void init_block (std::shared_ptr<T> &block) {
364
- block = std::make_shared<T>();
365
- start (block).via (excutor_->get_executor ()).start ([](auto &&) {
366
- });
367
- }
368
-
369
371
async_simple::coro::Lazy<void > start (
370
372
std::shared_ptr<labels_block_t <N>> label_block) {
371
373
summary_label_sample<N> sample;
@@ -376,8 +378,9 @@ class basic_dynamic_summary : public dynamic_metric {
376
378
auto &ptr = label_block->label_quantile_values_ [sample.labels_value ];
377
379
378
380
if (ptr == nullptr ) {
379
- ptr = std::make_shared<TimeWindowQuantiles>(quantiles_, max_age_,
380
- age_buckets_);
381
+ ptr = std::make_shared<TimeWindowQuantiles>(
382
+ label_block->quantiles_ , label_block->max_age_ ,
383
+ label_block->age_buckets_ );
381
384
}
382
385
383
386
ptr->insert (sample.value );
@@ -393,13 +396,14 @@ class basic_dynamic_summary : public dynamic_metric {
393
396
co_await async_simple::coro::Yield{};
394
397
395
398
if (label_block->sample_queue_ .size_approx () == 0 ) {
396
- is_coro_started_ = false ;
399
+ label_block-> is_coro_started_ = false ;
397
400
if (label_block->sample_queue_ .size_approx () == 0 ) {
398
401
break ;
399
402
}
400
403
401
404
bool expected = false ;
402
- if (!is_coro_started_.compare_exchange_strong (expected, true )) {
405
+ if (!label_block->is_coro_started_ .compare_exchange_strong (expected,
406
+ true )) {
403
407
break ;
404
408
}
405
409
@@ -412,7 +416,7 @@ class basic_dynamic_summary : public dynamic_metric {
412
416
}
413
417
414
418
async_simple::coro::Lazy<void > serialize_async_with_label (std::string &str) {
415
- if (quantiles_.empty ()) {
419
+ if (labels_block_-> quantiles_ .empty ()) {
416
420
co_return ;
417
421
}
418
422
@@ -432,13 +436,14 @@ class basic_dynamic_summary : public dynamic_metric {
432
436
double sum = 0 ;
433
437
uint64_t count = 0 ;
434
438
auto rates = co_await get_rates (labels_value, sum, count);
435
- for (size_t i = 0 ; i < quantiles_.size (); i++) {
439
+ for (size_t i = 0 ; i < labels_block_-> quantiles_ .size (); i++) {
436
440
str.append (name_);
437
441
str.append (" {" );
438
442
build_label_string (str, labels_name_, labels_value);
439
443
str.append (" ," );
440
444
str.append (" quantile=\" " );
441
- str.append (std::to_string (quantiles_[i].quantile )).append (" \" } " );
445
+ str.append (std::to_string (labels_block_->quantiles_ [i].quantile ))
446
+ .append (" \" } " );
442
447
str.append (std::to_string (rates[i])).append (" \n " );
443
448
}
444
449
@@ -459,7 +464,7 @@ class basic_dynamic_summary : public dynamic_metric {
459
464
#ifdef CINATRA_ENABLE_METRIC_JSON
460
465
async_simple::coro::Lazy<void > serialize_to_json_with_label_async (
461
466
std::string &str) {
462
- if (quantiles_.empty ()) {
467
+ if (labels_block_-> quantiles_ .empty ()) {
463
468
co_return ;
464
469
}
465
470
@@ -482,11 +487,12 @@ class basic_dynamic_summary : public dynamic_metric {
482
487
auto rates = co_await get_rates (labels_value, sum, count);
483
488
metric.count = count;
484
489
metric.sum = sum;
485
- for (size_t i = 0 ; i < quantiles_.size (); i++) {
490
+ for (size_t i = 0 ; i < labels_block_-> quantiles_ .size (); i++) {
486
491
for (size_t i = 0 ; i < labels_value.size (); i++) {
487
492
metric.labels [labels_name_[i]] = labels_value[i];
488
493
}
489
- metric.quantiles .emplace (quantiles_[i].quantile , rates[i]);
494
+ metric.quantiles .emplace (labels_block_->quantiles_ [i].quantile ,
495
+ rates[i]);
490
496
}
491
497
492
498
summary.metrics .push_back (std::move (metric));
@@ -495,13 +501,9 @@ class basic_dynamic_summary : public dynamic_metric {
495
501
}
496
502
#endif
497
503
498
- Quantiles quantiles_; // readonly
499
504
std::shared_ptr<labels_block_t <N>> labels_block_;
500
505
static inline std::shared_ptr<coro_io::io_context_pool> excutor_ =
501
506
coro_io::create_io_context_pool (1 );
502
- std::chrono::milliseconds max_age_;
503
- uint16_t age_buckets_;
504
- std::atomic<bool > is_coro_started_ = false ;
505
507
bool has_observe_ = false ;
506
508
};
507
509
0 commit comments