@@ -169,7 +169,7 @@ func (client *SingleDaxClient) startHealthChecks(cc *cluster, host hostPort) {
169
169
ctx , cfn := context .WithTimeout (aws .BackgroundContext (), 1 * time .Second )
170
170
defer cfn ()
171
171
var err error
172
- _ , err = client .endpoints (RequestOptions {MaxRetries : 2 , Context : ctx })
172
+ _ , err = client .endpoints (RequestOptions {MaxRetries : 3 , Context : ctx })
173
173
if err != nil {
174
174
cc .debugLog (fmt .Sprintf ("Health checks failed with error " + err .Error () + " for host :: " + host .host ))
175
175
cc .onHealthCheckFailed (host )
@@ -740,31 +740,41 @@ func (client *SingleDaxClient) executeWithContext(ctx aws.Context, op string, en
740
740
return err
741
741
}
742
742
if err = client .pool .setDeadline (ctx , t ); err != nil {
743
- client .pool .discard (t )
743
+ // If the error is just due to context cancelled or timeout
744
+ // then the tube is still usable because we have not written anything to tube
745
+ if err == ctx .Err () {
746
+ client .pool .put (t )
747
+ return err
748
+ }
749
+ // If we get error while setting deadline of tube
750
+ // probably something is wrong with the tube
751
+ client .pool .closeTube (t )
744
752
return err
745
753
}
746
754
747
755
if err = client .auth (t ); err != nil {
748
- client .pool .discard (t )
756
+ // Auth method writes in the tube and
757
+ // it is not guaranteed that it will be drained completely on error
758
+ client .pool .closeTube (t )
749
759
return err
750
760
}
751
761
752
762
writer := t .CborWriter ()
753
763
if err = encoder (writer ); err != nil {
754
- // Validation errors will cause pool to be discarded as there is no guarantee
764
+ // Validation errors will cause connection to be closed as there is no guarantee
755
765
// that the validation was performed before any data was written into tube
756
- client .pool .discard (t )
766
+ client .pool .closeTube (t )
757
767
return err
758
768
}
759
769
if err := writer .Flush (); err != nil {
760
- client .pool .discard (t )
770
+ client .pool .closeTube (t )
761
771
return err
762
772
}
763
773
764
774
reader := t .CborReader ()
765
775
ex , err := decodeError (reader )
766
- if err != nil { // decode or network error
767
- client .pool .discard (t )
776
+ if err != nil { // decode or network error - doesn't guarantee completely drained tube
777
+ client .pool .closeTube (t )
768
778
return err
769
779
}
770
780
if ex != nil { // user or server error
@@ -774,7 +784,8 @@ func (client *SingleDaxClient) executeWithContext(ctx aws.Context, op string, en
774
784
775
785
err = decoder (reader )
776
786
if err != nil {
777
- client .pool .discard (t )
787
+ // we are not able to completely drain tube
788
+ client .pool .closeTube (t )
778
789
} else {
779
790
client .pool .put (t )
780
791
}
@@ -809,7 +820,7 @@ func (client *SingleDaxClient) recycleTube(t tube, err error) {
809
820
if recycle {
810
821
client .pool .put (t )
811
822
} else {
812
- client .pool .discard (t )
823
+ client .pool .closeTube (t )
813
824
}
814
825
}
815
826
func (client * SingleDaxClient ) auth (t tube ) error {
0 commit comments