@@ -1033,6 +1033,7 @@ func sendAssetKeySendPayment(t *testing.T, src, dst *HarnessNode, amt uint64,
1033
1033
DestCustomRecords : customRecords ,
1034
1034
PaymentHash : hash [:],
1035
1035
TimeoutSeconds : int32 (PaymentTimeout .Seconds ()),
1036
+ MaxParts : cfg .maxShards ,
1036
1037
}
1037
1038
1038
1039
request := & tchrpc.SendPaymentRequest {
@@ -1090,7 +1091,7 @@ func sendKeySendPayment(t *testing.T, src, dst *HarnessNode,
1090
1091
stream , err := src .RouterClient .SendPaymentV2 (ctxt , req )
1091
1092
require .NoError (t , err )
1092
1093
1093
- result , err := getPaymentResult (stream )
1094
+ result , err := getFinalPaymentResult (stream )
1094
1095
require .NoError (t , err )
1095
1096
require .Equal (t , lnrpc .Payment_SUCCEEDED , result .Status )
1096
1097
}
@@ -1135,6 +1136,12 @@ func createAndPayNormalInvoice(t *testing.T, src, rfqPeer, dst *HarnessNode,
1135
1136
func payInvoiceWithSatoshi (t * testing.T , payer * HarnessNode ,
1136
1137
invoice * lnrpc.AddInvoiceResponse , opts ... payOpt ) {
1137
1138
1139
+ payPayReqWithSatoshi (t , payer , invoice .PaymentRequest , opts ... )
1140
+ }
1141
+
1142
+ func payPayReqWithSatoshi (t * testing.T , payer * HarnessNode , payReq string ,
1143
+ opts ... payOpt ) {
1144
+
1138
1145
cfg := defaultPayConfig ()
1139
1146
for _ , opt := range opts {
1140
1147
opt (cfg )
@@ -1145,15 +1152,30 @@ func payInvoiceWithSatoshi(t *testing.T, payer *HarnessNode,
1145
1152
defer cancel ()
1146
1153
1147
1154
sendReq := & routerrpc.SendPaymentRequest {
1148
- PaymentRequest : invoice .PaymentRequest ,
1149
- TimeoutSeconds : int32 (PaymentTimeout .Seconds ()),
1150
- MaxShardSizeMsat : 80_000_000 ,
1151
- FeeLimitMsat : 1_000_000 ,
1155
+ PaymentRequest : payReq ,
1156
+ TimeoutSeconds : int32 (PaymentTimeout .Seconds ()),
1157
+ FeeLimitMsat : 1_000_000 ,
1158
+ MaxParts : cfg .maxShards ,
1159
+ }
1160
+
1161
+ if cfg .smallShards {
1162
+ sendReq .MaxShardSizeMsat = 80_000_000
1152
1163
}
1164
+
1153
1165
stream , err := payer .RouterClient .SendPaymentV2 (ctxt , sendReq )
1154
1166
require .NoError (t , err )
1155
1167
1156
- result , err := getPaymentResult (stream )
1168
+ if cfg .payStatus == lnrpc .Payment_IN_FLIGHT {
1169
+ t .Logf ("Waiting for initial stream response..." )
1170
+ result , err := stream .Recv ()
1171
+ require .NoError (t , err )
1172
+
1173
+ require .Equal (t , cfg .payStatus , result .Status )
1174
+
1175
+ return
1176
+ }
1177
+
1178
+ result , err := getFinalPaymentResult (stream )
1157
1179
if cfg .errSubStr != "" {
1158
1180
require .ErrorContains (t , err , cfg .errSubStr )
1159
1181
} else {
@@ -1212,6 +1234,7 @@ func payInvoiceWithSatoshiLastHop(t *testing.T, payer *HarnessNode,
1212
1234
1213
1235
type payConfig struct {
1214
1236
smallShards bool
1237
+ maxShards uint32
1215
1238
errSubStr string
1216
1239
allowOverpay bool
1217
1240
feeLimit lnwire.MilliSatoshi
@@ -1240,6 +1263,12 @@ func withSmallShards() payOpt {
1240
1263
}
1241
1264
}
1242
1265
1266
+ func withMaxShards (maxShards uint32 ) payOpt {
1267
+ return func (c * payConfig ) {
1268
+ c .maxShards = maxShards
1269
+ }
1270
+ }
1271
+
1243
1272
func withPayErrSubStr (errSubStr string ) payOpt {
1244
1273
return func (c * payConfig ) {
1245
1274
c .errSubStr = errSubStr
@@ -1310,6 +1339,7 @@ func payInvoiceWithAssets(t *testing.T, payer, rfqPeer *HarnessNode,
1310
1339
TimeoutSeconds : int32 (PaymentTimeout .Seconds ()),
1311
1340
FeeLimitMsat : int64 (cfg .feeLimit ),
1312
1341
DestCustomRecords : cfg .destCustomRecords ,
1342
+ MaxParts : cfg .maxShards ,
1313
1343
}
1314
1344
1315
1345
if cfg .smallShards {
@@ -1394,8 +1424,9 @@ func payInvoiceWithAssets(t *testing.T, payer, rfqPeer *HarnessNode,
1394
1424
}
1395
1425
1396
1426
type invoiceConfig struct {
1397
- errSubStr string
1398
- groupKey []byte
1427
+ errSubStr string
1428
+ groupKey []byte
1429
+ routeHints []* lnrpc.RouteHint
1399
1430
}
1400
1431
1401
1432
func defaultInvoiceConfig () * invoiceConfig {
@@ -2292,6 +2323,158 @@ func macFromBytes(macBytes []byte) (grpc.DialOption, error) {
2292
2323
return grpc .WithPerRPCCredentials (cred ), nil
2293
2324
}
2294
2325
2326
+ func assertMinNumHtlcs (t * testing.T , node * HarnessNode , expected int ) {
2327
+ t .Helper ()
2328
+
2329
+ ctxb := context .Background ()
2330
+
2331
+ err := wait .NoError (func () error {
2332
+ listChansRequest := & lnrpc.ListChannelsRequest {}
2333
+ listChansResp , err := node .ListChannels (ctxb , listChansRequest )
2334
+ if err != nil {
2335
+ return err
2336
+ }
2337
+
2338
+ var numHtlcs int
2339
+ for _ , channel := range listChansResp .Channels {
2340
+ numHtlcs += len (channel .PendingHtlcs )
2341
+ }
2342
+
2343
+ if numHtlcs < expected {
2344
+ return fmt .Errorf ("expected %v HTLCs, got %v, %v" ,
2345
+ expected , numHtlcs ,
2346
+ toProtoJSON (t , listChansResp ))
2347
+ }
2348
+
2349
+ return nil
2350
+ }, defaultTimeout )
2351
+ require .NoError (t , err )
2352
+ }
2353
+
2354
+ type subscribeEventsClient = routerrpc.Router_SubscribeHtlcEventsClient
2355
+
2356
+ type htlcEventConfig struct {
2357
+ timeout time.Duration
2358
+ numEvents int
2359
+ withLinkFailure bool
2360
+ withForwardFailure bool
2361
+ withFailureDetail routerrpc.FailureDetail
2362
+ }
2363
+
2364
+ func defaultHtlcEventConfig () * htlcEventConfig {
2365
+ return & htlcEventConfig {
2366
+ timeout : defaultTimeout ,
2367
+ }
2368
+ }
2369
+
2370
+ type htlcEventOpt func (* htlcEventConfig )
2371
+
2372
+ func withTimeout (timeout time.Duration ) htlcEventOpt {
2373
+ return func (config * htlcEventConfig ) {
2374
+ config .timeout = timeout
2375
+ }
2376
+ }
2377
+
2378
+ func withNumEvents (numEvents int ) htlcEventOpt {
2379
+ return func (config * htlcEventConfig ) {
2380
+ config .numEvents = numEvents
2381
+ }
2382
+ }
2383
+
2384
+ func withLinkFailure (detail routerrpc.FailureDetail ) htlcEventOpt {
2385
+ return func (config * htlcEventConfig ) {
2386
+ config .withLinkFailure = true
2387
+ config .withFailureDetail = detail
2388
+ }
2389
+ }
2390
+
2391
+ func withForwardFailure () htlcEventOpt {
2392
+ return func (config * htlcEventConfig ) {
2393
+ config .withForwardFailure = true
2394
+ }
2395
+ }
2396
+
2397
+ func assertHtlcEvents (t * testing.T , c subscribeEventsClient ,
2398
+ opts ... htlcEventOpt ) {
2399
+
2400
+ t .Helper ()
2401
+
2402
+ cfg := defaultHtlcEventConfig ()
2403
+ for _ , opt := range opts {
2404
+ opt (cfg )
2405
+ }
2406
+
2407
+ timeout := time .After (cfg .timeout )
2408
+ events := make (chan * routerrpc.HtlcEvent )
2409
+
2410
+ go func () {
2411
+ defer close (events )
2412
+
2413
+ for {
2414
+ evt , err := c .Recv ()
2415
+ if err != nil {
2416
+ t .Logf ("Received HTLC event error: %v" , err )
2417
+ return
2418
+ }
2419
+
2420
+ select {
2421
+ case events <- evt :
2422
+ case <- timeout :
2423
+ t .Logf ("Htlc event receive timeout" )
2424
+ return
2425
+ }
2426
+ }
2427
+ }()
2428
+
2429
+ var numEvents int
2430
+ for {
2431
+ type (
2432
+ linkFailEvent = * routerrpc.HtlcEvent_LinkFailEvent
2433
+ forwardFailEvent = * routerrpc.HtlcEvent_ForwardFailEvent
2434
+ )
2435
+
2436
+ select {
2437
+ case evt , ok := <- events :
2438
+ if ! ok {
2439
+ t .Fatalf ("Htlc event stream closed" )
2440
+ return
2441
+ }
2442
+
2443
+ if cfg .withLinkFailure {
2444
+ linkEvent , ok := evt .Event .(linkFailEvent )
2445
+ if ! ok {
2446
+ // We only count link failure events.
2447
+ continue
2448
+ }
2449
+
2450
+ if linkEvent .LinkFailEvent .FailureDetail !=
2451
+ cfg .withFailureDetail {
2452
+
2453
+ continue
2454
+ }
2455
+ }
2456
+
2457
+ if cfg .withForwardFailure {
2458
+ _ , ok := evt .Event .(forwardFailEvent )
2459
+ if ! ok {
2460
+ // We only count link failure events.
2461
+ continue
2462
+ }
2463
+ }
2464
+
2465
+ numEvents ++
2466
+
2467
+ if numEvents == cfg .numEvents {
2468
+ return
2469
+ }
2470
+
2471
+ case <- timeout :
2472
+ t .Fatalf ("Htlc event receive timeout" )
2473
+ return
2474
+ }
2475
+ }
2476
+ }
2477
+
2295
2478
func assertNumHtlcs (t * testing.T , node * HarnessNode , expected int ) {
2296
2479
t .Helper ()
2297
2480
0 commit comments