-
Notifications
You must be signed in to change notification settings - Fork 83
/
Copy pathstream_decoder.h
136 lines (121 loc) · 6.36 KB
/
stream_decoder.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#pragma once
#include <functional>
#include "envoy/common/time.h"
#include "envoy/event/deferred_deletable.h"
#include "envoy/event/dispatcher.h"
#include "envoy/http/conn_pool.h"
#include "envoy/server/tracer_config.h"
#include "nighthawk/common/operation_callback.h"
#include "nighthawk/common/request_source.h"
#include "nighthawk/common/statistic.h"
#include "external/envoy/source/common/common/random_generator.h"
#include "external/envoy/source/common/http/header_map_impl.h"
#include "external/envoy/source/common/stream_info/stream_info_impl.h"
#include "external/envoy/source/common/tracing/http_tracer_impl.h"
namespace Nighthawk {
namespace Client {
class StreamDecoderCompletionCallback {
public:
virtual ~StreamDecoderCompletionCallback() = default;
virtual void onComplete(bool success, const Envoy::Http::ResponseHeaderMap& headers) PURE;
virtual void onPoolFailure(Envoy::Http::ConnectionPool::PoolFailureReason reason) PURE;
virtual void exportLatency(const uint32_t response_code, const uint64_t latency_ns) PURE;
virtual void handleResponseData(const Envoy::Buffer::Instance& response_data) PURE;
};
// TODO(oschaaf): create a StreamDecoderPool?
/**
* A self destructing response decoder that discards the response body.
*/
class StreamDecoder : public Envoy::Http::ResponseDecoder,
public Envoy::Http::StreamCallbacks,
public Envoy::Http::ConnectionPool::Callbacks,
public Envoy::Event::DeferredDeletable,
public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
StreamDecoder(Envoy::Event::Dispatcher& dispatcher, Envoy::TimeSource& time_source,
StreamDecoderCompletionCallback& decoder_completion_callback,
OperationCallback caller_completion_callback, Statistic& connect_statistic,
Statistic& latency_statistic, Statistic& response_header_sizes_statistic,
Statistic& response_body_sizes_statistic, Statistic& origin_latency_statistic,
HeaderMapPtr request_headers, bool measure_latencies, uint32_t request_body_size,
Envoy::Random::RandomGenerator& random_generator,
Envoy::Tracing::TracerSharedPtr& tracer,
absl::string_view latency_response_header_name)
: dispatcher_(dispatcher), time_source_(time_source),
decoder_completion_callback_(decoder_completion_callback),
caller_completion_callback_(std::move(caller_completion_callback)),
connect_statistic_(connect_statistic), latency_statistic_(latency_statistic),
response_header_sizes_statistic_(response_header_sizes_statistic),
response_body_sizes_statistic_(response_body_sizes_statistic),
origin_latency_statistic_(origin_latency_statistic),
request_headers_(std::move(request_headers)), connect_start_(time_source_.monotonicTime()),
measure_latencies_(measure_latencies), request_body_size_(request_body_size),
downstream_address_setter_(std::make_shared<Envoy::Network::ConnectionInfoSetterImpl>(
// The two addresses aren't used in an execution of Nighthawk.
/* downstream_local_address = */ nullptr, /* downstream_remote_address = */ nullptr)),
stream_info_(time_source_, downstream_address_setter_,
Envoy::StreamInfo::FilterState::LifeSpan::FilterChain),
random_generator_(random_generator), tracer_(tracer),
latency_response_header_name_(latency_response_header_name) {
if (measure_latencies_ && tracer_ != nullptr) {
setupForTracing();
}
stream_info_.setUpstreamInfo(std::make_shared<Envoy::StreamInfo::UpstreamInfoImpl>());
}
// Http::StreamDecoder
void decode1xxHeaders(Envoy::Http::ResponseHeaderMapPtr&&) override {}
void decodeHeaders(Envoy::Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
void decodeData(Envoy::Buffer::Instance&, bool end_stream) override;
void decodeTrailers(Envoy::Http::ResponseTrailerMapPtr&& trailers) override;
void decodeMetadata(Envoy::Http::MetadataMapPtr&&) override { PANIC("not implemented"); }
void dumpState(std::ostream&, int) const override { PANIC("not implemented"); }
// Http::StreamCallbacks
void onResetStream(Envoy::Http::StreamResetReason reason,
absl::string_view transport_failure_reason) override;
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}
// ConnectionPool::Callbacks
void onPoolFailure(Envoy::Http::ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Envoy::Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Envoy::Http::RequestEncoder& encoder,
Envoy::Upstream::HostDescriptionConstSharedPtr host,
Envoy::StreamInfo::StreamInfo& stream_info,
absl::optional<Envoy::Http::Protocol> protocol) override;
static Envoy::StreamInfo::CoreResponseFlag
streamResetReasonToResponseFlag(Envoy::Http::StreamResetReason reset_reason);
void finalizeActiveSpan();
void setupForTracing();
private:
void onComplete(bool success);
static const std::string& staticUploadContent() {
static const auto s = new std::string(4194304, 'a');
return *s;
}
Envoy::Event::Dispatcher& dispatcher_;
Envoy::TimeSource& time_source_;
StreamDecoderCompletionCallback& decoder_completion_callback_;
OperationCallback caller_completion_callback_;
Statistic& connect_statistic_;
Statistic& latency_statistic_;
Statistic& response_header_sizes_statistic_;
Statistic& response_body_sizes_statistic_;
Statistic& origin_latency_statistic_;
HeaderMapPtr request_headers_;
Envoy::Http::ResponseHeaderMapPtr response_headers_;
Envoy::Http::ResponseTrailerMapPtr trailer_headers_;
const Envoy::MonotonicTime connect_start_;
Envoy::MonotonicTime request_start_;
bool complete_ = false;
bool measure_latencies_;
const uint32_t request_body_size_;
Envoy::Tracing::EgressConfigImpl config_;
std::shared_ptr<Envoy::Network::ConnectionInfoSetterImpl> downstream_address_setter_;
Envoy::StreamInfo::StreamInfoImpl stream_info_;
Envoy::Random::RandomGenerator& random_generator_;
Envoy::Tracing::TracerSharedPtr& tracer_;
Envoy::Tracing::SpanPtr active_span_;
const std::string latency_response_header_name_;
};
} // namespace Client
} // namespace Nighthawk