@@ -10,27 +10,53 @@ use ckb_testkit::{Node, User};
10
10
use crossbeam_channel:: { Receiver , Sender } ;
11
11
use lru:: LruCache ;
12
12
use std:: collections:: HashMap ;
13
- use std:: time:: Instant ;
13
+ use std:: sync:: { Arc } ;
14
+ use std:: time:: { Duration , Instant } ;
15
+ use tokio:: sync:: Semaphore ;
16
+ use futures:: stream:: FuturesUnordered ;
17
+ use futures:: { FutureExt , StreamExt } ;
18
+ use tokio:: time:: sleep as async_sleep;
19
+ use crate :: utils:: maybe_retry_send_transaction_async;
20
+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
14
21
15
22
pub struct LiveCellProducer {
16
23
users : Vec < User > ,
17
24
nodes : Vec < Node > ,
18
25
seen_out_points : LruCache < OutPoint , Instant > ,
19
26
}
20
27
21
- // TODO Add more logs
22
28
impl LiveCellProducer {
23
29
pub fn new ( users : Vec < User > , nodes : Vec < Node > ) -> Self {
24
30
let n_users = users. len ( ) ;
31
+
32
+ let mut user_unused_max_cell_count_cache = 1 ;
33
+ // step_by: 20 : using a sampling method to find the user who owns the highest number of cells.
34
+ // seen_out_points lruCache cache size = user_unused_max_cell_count_cache * n_users + 10
35
+ // seen_out_points lruCache: preventing unused cells on the chain from being reused.
36
+ for i in ( 0 ..=users. len ( ) -1 ) . step_by ( 20 ) {
37
+ let user_unused_cell_count_cache = users. get ( i) . expect ( "out of bound" ) . get_spendable_single_secp256k1_cells ( & nodes[ 0 ] ) . len ( ) ;
38
+ if user_unused_cell_count_cache > user_unused_max_cell_count_cache && user_unused_cell_count_cache <= 10000 {
39
+ user_unused_max_cell_count_cache = user_unused_cell_count_cache;
40
+ }
41
+ ckb_testkit:: debug!( "idx:{}:user_unused_cell_count_cache:{}" , i, user_unused_cell_count_cache)
42
+ }
43
+ ckb_testkit:: debug!( "user max cell count cache:{}" , user_unused_max_cell_count_cache) ;
44
+ let lrc_cache_size = n_users * user_unused_max_cell_count_cache + 10 ;
45
+ ckb_testkit:: info!( "init unused cache size:{}" , lrc_cache_size) ;
25
46
Self {
26
47
users,
27
48
nodes,
28
- seen_out_points : LruCache :: new ( n_users + 10 ) ,
49
+ seen_out_points : LruCache :: new ( lrc_cache_size ) ,
29
50
}
30
51
}
31
52
32
- pub fn run ( mut self , live_cell_sender : Sender < CellMeta > ) {
53
+ pub fn run ( mut self , live_cell_sender : Sender < CellMeta > , log_duration : u64 ) {
54
+ let mut count = 0 ;
55
+ let mut start_time = Instant :: now ( ) ;
56
+ let mut duration_count = 0 ;
57
+ let mut fist_send_finished = true ;
33
58
loop {
59
+ // let mut current_loop_start_time = Instant::now();
34
60
let min_tip_number = self
35
61
. nodes
36
62
. iter ( )
@@ -60,8 +86,22 @@ impl LiveCellProducer {
60
86
self . seen_out_points
61
87
. put ( cell. out_point . clone ( ) , Instant :: now ( ) ) ;
62
88
let _ignore = live_cell_sender. send ( cell) ;
89
+ count += 1 ;
90
+ duration_count += 1 ;
91
+ if Instant :: now ( ) . duration_since ( start_time) >= Duration :: from_secs ( log_duration) {
92
+ let elapsed = start_time. elapsed ( ) ;
93
+ ckb_testkit:: info!( "[LiveCellProducer] producer count: {} ,duration time:{:?} , duration tps:{}" , count, elapsed, duration_count* 1000 /elapsed. as_millis( ) ) ;
94
+ duration_count = 0 ;
95
+ start_time = Instant :: now ( ) ;
96
+ }
97
+ }
98
+
99
+ if fist_send_finished {
100
+ fist_send_finished = false ;
101
+ self . seen_out_points . resize ( count + 10 )
63
102
}
64
103
}
104
+ // ckb_testkit::debug!("[LiveCellProducer] delay:{:?}",current_loop_start_time.elapsed());
65
105
}
66
106
}
67
107
}
@@ -74,7 +114,7 @@ pub struct TransactionProducer {
74
114
// #{ lock_hash => live_cell }
75
115
live_cells : HashMap < Byte32 , CellMeta > ,
76
116
// #{ out_point => live_cell }
77
- backlogs : HashMap < OutPoint , CellMeta > ,
117
+ backlogs : HashMap < Byte32 , Vec < CellMeta > > ,
78
118
}
79
119
80
120
impl TransactionProducer {
@@ -113,6 +153,7 @@ impl TransactionProducer {
113
153
mut self ,
114
154
live_cell_receiver : Receiver < CellMeta > ,
115
155
transaction_sender : Sender < TransactionView > ,
156
+ log_duration : u64 ,
116
157
) {
117
158
// Environment variables `CKB_BENCH_ENABLE_DATA1_SCRIPT` and
118
159
// `CKB_BENCH_ENABLE_INVALID_SINCE_EPOCH` are temporary.
@@ -137,17 +178,29 @@ impl TransactionProducer {
137
178
"CKB_BENCH_ENABLE_INVALID_SINCE_EPOCH = {}" ,
138
179
enabled_invalid_since_epoch
139
180
) ;
181
+ let mut count = 0 ;
182
+ let mut start_time = Instant :: now ( ) ;
183
+ let mut duration_count = 0 ;
140
184
141
185
while let Ok ( live_cell) = live_cell_receiver. recv ( ) {
142
186
let lock_hash = live_cell. cell_output . calc_lock_hash ( ) ;
143
- match self . live_cells . entry ( lock_hash. clone ( ) ) {
144
- std:: collections:: hash_map:: Entry :: Occupied ( entry) => {
145
- if entry. get ( ) . out_point == live_cell. out_point {
146
- self . backlogs . insert ( live_cell. out_point . clone ( ) , live_cell) ;
187
+
188
+ if let Some ( _live_cell_in_map) = self . live_cells . get ( & lock_hash) {
189
+ self . backlogs
190
+ . entry ( lock_hash. clone ( ) )
191
+ . or_insert_with ( Vec :: new)
192
+ . push ( live_cell) ;
193
+ } else {
194
+ self . live_cells . insert ( lock_hash. clone ( ) , live_cell) ;
195
+ for ( hash, backlog_cells) in self . backlogs . iter_mut ( ) {
196
+ if self . live_cells . len ( ) >= self . n_inout {
197
+ break ;
198
+ }
199
+ if !self . live_cells . contains_key ( hash) && !backlog_cells. is_empty ( ) {
200
+ if let Some ( backlog_cell) = backlog_cells. pop ( ) {
201
+ self . live_cells . insert ( hash. clone ( ) , backlog_cell) ;
202
+ }
147
203
}
148
- }
149
- std:: collections:: hash_map:: Entry :: Vacant ( entry) => {
150
- entry. insert ( live_cell) ;
151
204
}
152
205
}
153
206
@@ -175,6 +228,7 @@ impl TransactionProducer {
175
228
. values ( )
176
229
. map ( |cell| {
177
230
// use tx_index as random number
231
+ let lock_hash = cell. cell_output . calc_lock_hash ( ) ;
178
232
let tx_index = cell. transaction_info . as_ref ( ) . unwrap ( ) . index ;
179
233
let user = self . users . get ( & lock_hash) . expect ( "should be ok" ) ;
180
234
match tx_index % 3 {
@@ -225,23 +279,140 @@ impl TransactionProducer {
225
279
// SendError occurs, the corresponding transaction receiver is dead
226
280
return ;
227
281
}
282
+ count += 1 ;
283
+ duration_count += 1 ;
284
+ if Instant :: now ( ) . duration_since ( start_time) >= Duration :: from_secs ( log_duration) {
285
+ let elapsed = start_time. elapsed ( ) ;
286
+ ckb_testkit:: info!( "[TransactionProducer] producer count: {} liveCell producer remaining :{} ,duration time:{:?}, duration tps:{} " , count, live_cell_receiver. len( ) , elapsed, duration_count* 1000 /elapsed. as_millis( ) ) ;
287
+ duration_count = 0 ;
288
+ start_time = Instant :: now ( ) ;
289
+ }
290
+ }
291
+ }
292
+ }
293
+ }
228
294
229
- let mut backlogs = HashMap :: new ( ) ;
230
- std:: mem:: swap ( & mut self . backlogs , & mut backlogs) ;
231
- for ( _, live_cell) in backlogs. into_iter ( ) {
232
- let lock_hash = live_cell. cell_output . calc_lock_hash ( ) ;
233
- match self . live_cells . entry ( lock_hash) {
234
- std:: collections:: hash_map:: Entry :: Occupied ( entry) => {
235
- if entry. get ( ) . out_point == live_cell. out_point {
236
- self . backlogs . insert ( live_cell. out_point . clone ( ) , live_cell) ;
237
- }
295
+ pub struct TransactionConsumer {
296
+ nodes : Vec < Node > ,
297
+ }
298
+
299
+
300
+ impl TransactionConsumer {
301
+ pub fn new ( nodes : Vec < Node > ) -> Self {
302
+ Self {
303
+ nodes
304
+ }
305
+ }
306
+
307
+ pub async fn run (
308
+ self ,
309
+ transaction_receiver : Receiver < TransactionView > ,
310
+ max_concurrent_requests : usize ,
311
+ t_tx_interval : Duration ,
312
+ t_bench : Duration ) {
313
+ let start_time = Instant :: now ( ) ;
314
+ let mut last_log_duration = Instant :: now ( ) ;
315
+ let mut benched_transactions = 0 ;
316
+ let mut duplicated_transactions = 0 ;
317
+ let mut loop_count = 0 ;
318
+ let mut i = 0 ;
319
+ let log_duration_time = 3 ;
320
+
321
+ let semaphore = Arc :: new ( Semaphore :: new ( max_concurrent_requests) ) ;
322
+ let transactions_processed = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
323
+ let transactions_total_time = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
324
+
325
+
326
+ // let logger_task = print_transactions_processed(transactions_processed.clone(), transactions_total_time.clone());
327
+ // tokio::spawn(logger_task);
328
+ let mut pending_tasks = FuturesUnordered :: new ( ) ;
329
+
330
+ loop {
331
+ loop_count += 1 ;
332
+ let tx = transaction_receiver
333
+ . recv_timeout ( Duration :: from_secs ( 60 * 3 ) )
334
+ . expect ( "timeout to wait transaction_receiver" ) ;
335
+ if t_tx_interval. as_millis ( ) != 0 {
336
+ async_sleep ( t_tx_interval) . await ;
337
+ }
338
+
339
+ i = ( i + 1 ) % self . nodes . len ( ) ;
340
+ let node = self . nodes [ i] . clone ( ) ;
341
+ let permit = semaphore. clone ( ) . acquire_owned ( ) . await ;
342
+ let tx_hash = tx. hash ( ) ;
343
+ let begin_time = Instant :: now ( ) ;
344
+ let task = async move {
345
+ let result = maybe_retry_send_transaction_async ( & node, & tx) . await ;
346
+ drop ( permit) ;
347
+ ( result, tx_hash, Instant :: now ( ) - begin_time)
348
+ } ;
349
+
350
+ pending_tasks. push ( tokio:: spawn ( task) ) ;
351
+ while let Some ( result) = pending_tasks. next ( ) . now_or_never ( ) {
352
+ transactions_processed. fetch_add ( 1 , Ordering :: Relaxed ) ;
353
+
354
+ let mut use_time = Duration :: from_millis ( 0 ) ;
355
+
356
+ match result {
357
+ Some ( Ok ( ( Ok ( is_accepted) , _tx_hash, cost_time) ) ) => {
358
+ use_time = cost_time;
359
+ if is_accepted {
360
+ benched_transactions += 1 ;
361
+ } else {
362
+ duplicated_transactions += 1 ;
238
363
}
239
- std:: collections:: hash_map:: Entry :: Vacant ( entry) => {
240
- entry. insert ( live_cell) ;
364
+ }
365
+ Some ( Ok ( ( Err ( err) , tx_hash, cost_time) ) ) => {
366
+ use_time = cost_time;
367
+ // double spending, discard this transaction
368
+ ckb_testkit:: info!(
369
+ "consumer count :{} failed to send tx {:#x}, error: {}" ,
370
+ loop_count,
371
+ tx_hash,
372
+ err
373
+ ) ;
374
+ if !err. contains ( "TransactionFailedToResolve" ) {
375
+ ckb_testkit:: error!(
376
+ "failed to send tx {:#x}, error: {}" ,
377
+ tx_hash,
378
+ err
379
+ ) ;
241
380
}
242
381
}
382
+ Some ( Err ( e) ) => {
383
+ eprintln ! ( "Error in task: {:?}" , e) ;
384
+ }
385
+ None => break ,
386
+ }
387
+ transactions_total_time. fetch_add ( use_time. as_millis ( ) as usize , Ordering :: Relaxed ) ;
388
+ }
389
+
390
+ if last_log_duration. elapsed ( ) > Duration :: from_secs ( log_duration_time) {
391
+ let elapsed = last_log_duration. elapsed ( ) ;
392
+ last_log_duration = Instant :: now ( ) ;
393
+ let duration_count = transactions_processed. swap ( 0 , Ordering :: Relaxed ) ;
394
+ let duration_total_time = transactions_total_time. swap ( 0 , Ordering :: Relaxed ) ;
395
+ let mut duration_tps = 0 ;
396
+ let mut duration_delay = 0 ;
397
+ if duration_count != 0 {
398
+ duration_delay = duration_total_time / ( duration_count as usize ) ;
399
+ duration_tps = duration_count * 1000 / ( elapsed. as_millis ( ) as usize ) ;
243
400
}
401
+ ckb_testkit:: info!(
402
+ "[TransactionConsumer] consumer :{} transactions, {} duplicated {} , transaction producer remaining :{}, log duration {:?} ,duration send tx tps {},duration avg delay {}ms" ,
403
+ loop_count,
404
+ benched_transactions,
405
+ duplicated_transactions,
406
+ transaction_receiver. len( ) ,
407
+ elapsed,
408
+ duration_tps,
409
+ duration_delay
410
+ ) ;
411
+ }
412
+ if start_time. elapsed ( ) > t_bench {
413
+ break ;
244
414
}
245
415
}
246
416
}
247
417
}
418
+
0 commit comments