Skip to content

Commit e3549c3

Browse files
committed
historical_uptime: add missing observations related bigtable functions
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: test getMessage Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: message index to filter checked messages Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: use unique id to prevent test conflicts Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: fix observation saving function Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: cleanup Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: cleanup and comments Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
1 parent 2434f4c commit e3549c3

File tree

3 files changed

+499
-2
lines changed

3 files changed

+499
-2
lines changed

fly/pkg/bigtable/message.go

+258
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
package bigtable
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"time"
8+
9+
"cloud.google.com/go/bigtable"
10+
"google.golang.org/api/option"
11+
)
12+
13+
// GuardianObservation represents an observation made by a guardian.
14+
type Observation struct {
15+
MessageID MessageID `json:"messageId"`
16+
GuardianAddr string `json:"guardianAddr"`
17+
Signature string `json:"signature"`
18+
ObservedAt time.Time `json:"observedAt"`
19+
Status ObservationStatus `json:"status"`
20+
}
21+
22+
type ObservationStatus int
23+
24+
// Message represents the data structure for a message in the Observations table.
25+
type Message struct {
26+
MessageID MessageID `json:"messageId"`
27+
LastObservedAt time.Time `json:"lastObservedAt"`
28+
MetricsChecked bool `json:"metricsChecked"`
29+
}
30+
31+
type MessageID string
32+
33+
type BigtableDB struct {
34+
client *bigtable.Client
35+
}
36+
37+
const (
38+
MessageTableName = "historical_uptime_messages"
39+
ObservationTableName = "historical_uptime_observations"
40+
MessageIndexTableName = "historical_uptime_message_index"
41+
)
42+
43+
func NewBigtableDB(ctx context.Context, projectID, instanceID, credentialsFile, emulatorHost string) (*BigtableDB, error) {
44+
var client *bigtable.Client
45+
var err error
46+
47+
if credentialsFile != "" {
48+
client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithCredentialsFile(credentialsFile))
49+
} else if emulatorHost != "" {
50+
client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithoutAuthentication(), option.WithEndpoint(emulatorHost))
51+
} else {
52+
client, err = bigtable.NewClient(ctx, projectID, instanceID)
53+
}
54+
if err != nil {
55+
return nil, fmt.Errorf("failed to create Bigtable client: %v", err)
56+
}
57+
58+
db := &BigtableDB{
59+
client: client,
60+
}
61+
62+
return db, nil
63+
}
64+
65+
func (db *BigtableDB) Close() error {
66+
return db.client.Close()
67+
}
68+
69+
// SaveMessage saves the message to the `messages` table.
70+
// It also saves the message index to the `messageIndex` table. This is used to keep track of messages that are not processed.
71+
func (db *BigtableDB) SaveMessage(ctx context.Context, message *Message) error {
72+
tableName := MessageTableName
73+
columnFamily := "messageData"
74+
75+
rowKey := string(message.MessageID)
76+
lastObservedAtBytes, err := message.LastObservedAt.MarshalBinary()
77+
if err != nil {
78+
return fmt.Errorf("failed to marshal LastObservedAt: %v", err)
79+
}
80+
81+
mut := bigtable.NewMutation()
82+
mut.Set(columnFamily, "lastObservedAt", bigtable.Now(), lastObservedAtBytes)
83+
mut.Set(columnFamily, "metricsChecked", bigtable.Now(), []byte(strconv.FormatBool(message.MetricsChecked)))
84+
85+
err = db.client.Open(tableName).Apply(ctx, rowKey, mut)
86+
if err != nil {
87+
return fmt.Errorf("failed to save message: %v", err)
88+
}
89+
90+
err = db.SaveMessageIndex(ctx, message.MessageID)
91+
if err != nil {
92+
return fmt.Errorf("failed to save message index: %v", err)
93+
}
94+
95+
return nil
96+
}
97+
98+
// MessageIndex is used to keep track of messages that are not processed (i.e. missing observation is not accounted for)
99+
// This index is to reduce the data scanned when querying from the `messages` table to process.
100+
// We might want to consider adding `lastObservedAt` to the index to further reduce the data scanned. A tradeoff is that
101+
// we need to update the index whenever the `lastObservedAt` is updated, which is whenever a new observation is added.
102+
// This could be a performance hit if we have a lot of observations.
103+
func (db *BigtableDB) SaveMessageIndex(ctx context.Context, messageID MessageID) error {
104+
tableName := MessageIndexTableName
105+
columnFamily := "indexData"
106+
107+
rowKey := string(messageID)
108+
109+
mut := bigtable.NewMutation()
110+
// bigtable doesn't allow empty mutations, so we need to set a placeholder value
111+
mut.Set(columnFamily, "placeholder", bigtable.Now(), nil)
112+
err := db.client.Open(tableName).Apply(ctx, rowKey, mut)
113+
if err != nil {
114+
return fmt.Errorf("failed to save message index: %v", err)
115+
}
116+
117+
return nil
118+
}
119+
120+
// DeleteMessageIndex deletes the message index for the given messageID. This is used when the message is processed and
121+
// missing observation is accounted for.
122+
func (db *BigtableDB) DeleteMessageIndex(ctx context.Context, messageID MessageID) error {
123+
tableName := MessageIndexTableName
124+
125+
rowKey := string(messageID)
126+
127+
mut := bigtable.NewMutation()
128+
mut.DeleteRow()
129+
130+
err := db.client.Open(tableName).Apply(ctx, rowKey, mut)
131+
if err != nil {
132+
return fmt.Errorf("failed to delete message index: %v", err)
133+
}
134+
135+
return nil
136+
}
137+
138+
func (db *BigtableDB) GetMessage(ctx context.Context, messageID MessageID) (*Message, error) {
139+
tableName := MessageTableName
140+
rowKey := string(messageID)
141+
142+
table := db.client.Open(tableName)
143+
row, err := table.ReadRow(ctx, rowKey)
144+
if err != nil {
145+
return nil, fmt.Errorf("failed to read message: %v", err)
146+
}
147+
148+
if len(row) == 0 {
149+
return nil, fmt.Errorf("message not found: %s", messageID)
150+
}
151+
152+
var message Message
153+
message.MessageID = messageID
154+
for _, item := range row["messageData"] {
155+
switch item.Column {
156+
case "messageData:lastObservedAt":
157+
var t time.Time
158+
if err := t.UnmarshalBinary(item.Value); err != nil {
159+
return nil, fmt.Errorf("failed to unmarshal LastObservedAt: %v", err)
160+
}
161+
message.LastObservedAt = t
162+
case "messageData:metricsChecked":
163+
metricsChecked, err := strconv.ParseBool(string(item.Value))
164+
if err != nil {
165+
return nil, fmt.Errorf("failed to parse MetricsChecked: %v", err)
166+
}
167+
message.MetricsChecked = metricsChecked
168+
}
169+
}
170+
171+
return &message, nil
172+
}
173+
174+
// SaveObservationAndUpdateMessage saves the observation only if it doesn't already exist.
175+
// It also updates the lastObservedAt of the message.
176+
func (db *BigtableDB) SaveObservationAndUpdateMessage(ctx context.Context, observation *Observation) error {
177+
tableName := ObservationTableName
178+
columnFamily := "observationData"
179+
180+
rowKey := string(observation.MessageID) + "_" + observation.GuardianAddr
181+
182+
// First, check if the observation already exists
183+
table := db.client.Open(tableName)
184+
row, err := table.ReadRow(ctx, rowKey)
185+
if err != nil {
186+
return fmt.Errorf("failed to read observation: %v", err)
187+
}
188+
189+
// If the observation already exists, return without updating
190+
if len(row) > 0 {
191+
return nil
192+
}
193+
194+
timeBinary, err := observation.ObservedAt.MarshalBinary()
195+
if err != nil {
196+
return fmt.Errorf("failed to marshal ObservedAt: %v", err)
197+
}
198+
199+
mut := bigtable.NewMutation()
200+
mut.Set(columnFamily, "signature", bigtable.Now(), []byte(observation.Signature))
201+
mut.Set(columnFamily, "observedAt", bigtable.Now(), timeBinary)
202+
mut.Set(columnFamily, "status", bigtable.Now(), []byte(strconv.Itoa(int(observation.Status))))
203+
204+
err = db.client.Open(tableName).Apply(ctx, rowKey, mut)
205+
if err != nil {
206+
return fmt.Errorf("failed to save observation: %v", err)
207+
}
208+
209+
messageRowKey := string(observation.MessageID)
210+
messageMut := bigtable.NewMutation()
211+
messageMut.Set("messageData", "lastObservedAt", bigtable.Now(), timeBinary)
212+
213+
err = db.client.Open(MessageTableName).Apply(ctx, messageRowKey, messageMut)
214+
if err != nil {
215+
return fmt.Errorf("failed to update message: %v", err)
216+
}
217+
218+
return nil
219+
}
220+
221+
func (db *BigtableDB) GetObservation(ctx context.Context, messageID, guardianAddr string) (*Observation, error) {
222+
tableName := ObservationTableName
223+
rowKey := messageID + "_" + guardianAddr
224+
225+
table := db.client.Open(tableName)
226+
row, err := table.ReadRow(ctx, rowKey)
227+
if err != nil {
228+
return nil, fmt.Errorf("failed to read observation: %v", err)
229+
}
230+
231+
if len(row) == 0 {
232+
return nil, fmt.Errorf("observation not found: %s", rowKey)
233+
}
234+
235+
var observation Observation
236+
observation.MessageID = MessageID(messageID)
237+
observation.GuardianAddr = guardianAddr
238+
for _, item := range row["observationData"] {
239+
switch item.Column {
240+
case "observationData:signature":
241+
observation.Signature = string(item.Value)
242+
case "observationData:observedAt":
243+
var t time.Time
244+
if err := t.UnmarshalBinary(item.Value); err != nil {
245+
return nil, fmt.Errorf("failed to unmarshal LastObservedAt: %v", err)
246+
}
247+
observation.ObservedAt = t
248+
case "observationData:status":
249+
status, err := strconv.Atoi(string(item.Value))
250+
if err != nil {
251+
return nil, fmt.Errorf("failed to parse Status: %v", err)
252+
}
253+
observation.Status = ObservationStatus(status)
254+
}
255+
}
256+
257+
return &observation, nil
258+
}

0 commit comments

Comments
 (0)