Skip to content

Commit 75b749e

Browse files
committed
historical_uptime: add missing observations related bigtable functions
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: test getMessage Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: message index to filter checked messages Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: use unique id to prevent test conflicts Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: fix observation saving function Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: cleanup Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com> historical_uptime: cleanup and comments Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
1 parent 2434f4c commit 75b749e

File tree

3 files changed

+494
-2
lines changed

3 files changed

+494
-2
lines changed

fly/pkg/bigtable/message.go

+253
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
package bigtable
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"time"
8+
9+
"cloud.google.com/go/bigtable"
10+
"google.golang.org/api/option"
11+
)
12+
13+
// GuardianObservation represents an observation made by a guardian.
14+
type Observation struct {
15+
MessageID MessageID `json:"messageId"`
16+
GuardianAddr string `json:"guardianAddr"`
17+
Signature string `json:"signature"`
18+
ObservedAt time.Time `json:"observedAt"`
19+
Status ObservationStatus `json:"status"`
20+
}
21+
22+
type ObservationStatus int
23+
24+
// Message represents the data structure for a message in the Observations table.
25+
type Message struct {
26+
MessageID MessageID `json:"messageId"`
27+
LastObservedAt time.Time `json:"lastObservedAt"`
28+
MetricsChecked bool `json:"metricsChecked"`
29+
}
30+
31+
type MessageID string
32+
33+
type BigtableDB struct {
34+
client *bigtable.Client
35+
}
36+
37+
func NewBigtableDB(ctx context.Context, projectID, instanceID, credentialsFile, emulatorHost string) (*BigtableDB, error) {
38+
var client *bigtable.Client
39+
var err error
40+
41+
if credentialsFile != "" {
42+
client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithCredentialsFile(credentialsFile))
43+
} else if emulatorHost != "" {
44+
client, err = bigtable.NewClient(ctx, projectID, instanceID, option.WithoutAuthentication(), option.WithEndpoint(emulatorHost))
45+
} else {
46+
client, err = bigtable.NewClient(ctx, projectID, instanceID)
47+
}
48+
if err != nil {
49+
return nil, fmt.Errorf("failed to create Bigtable client: %v", err)
50+
}
51+
52+
db := &BigtableDB{
53+
client: client,
54+
}
55+
56+
return db, nil
57+
}
58+
59+
func (db *BigtableDB) Close() error {
60+
return db.client.Close()
61+
}
62+
63+
// SaveMessage saves the message to the `messages` table.
64+
// It also saves the message index to the `messageIndex` table. This is used to keep track of messages that are not processed.
65+
func (db *BigtableDB) SaveMessage(ctx context.Context, message *Message) error {
66+
tableName := "messages"
67+
columnFamily := "messageData"
68+
69+
rowKey := string(message.MessageID)
70+
lastObservedAtBytes, err := message.LastObservedAt.MarshalBinary()
71+
if err != nil {
72+
return fmt.Errorf("failed to marshal LastObservedAt: %v", err)
73+
}
74+
75+
mut := bigtable.NewMutation()
76+
mut.Set(columnFamily, "lastObservedAt", bigtable.Now(), lastObservedAtBytes)
77+
mut.Set(columnFamily, "metricsChecked", bigtable.Now(), []byte(strconv.FormatBool(message.MetricsChecked)))
78+
79+
err = db.client.Open(tableName).Apply(ctx, rowKey, mut)
80+
if err != nil {
81+
return fmt.Errorf("failed to save message: %v", err)
82+
}
83+
84+
err = db.SaveMessageIndex(ctx, message.MessageID)
85+
if err != nil {
86+
return fmt.Errorf("failed to save message index: %v", err)
87+
}
88+
89+
return nil
90+
}
91+
92+
// MessageIndex is used to keep track of messages that are not processed (i.e. missing observation is not accounted for)
93+
// This index is to reduce the data scanned when querying from the `messages`` table to process.
94+
// We might want to consider adding lastObservedAt to the index to further reduce the data scanned. A tradeoff is that
95+
// we need to update the index whenever the lastObservedAt is updated, which is whenever a new observation is added.
96+
// This could be a performance hit if we have a lot of observations.
97+
func (db *BigtableDB) SaveMessageIndex(ctx context.Context, messageID MessageID) error {
98+
tableName := "messageIndex"
99+
columnFamily := "indexData"
100+
101+
rowKey := string(messageID)
102+
103+
mut := bigtable.NewMutation()
104+
// bigtable doesn't allow empty mutations, so we need to set a placeholder value
105+
mut.Set(columnFamily, "placeholder", bigtable.Now(), nil)
106+
err := db.client.Open(tableName).Apply(ctx, rowKey, mut)
107+
if err != nil {
108+
return fmt.Errorf("failed to save message index: %v", err)
109+
}
110+
111+
return nil
112+
}
113+
114+
// DeleteMessageIndex deletes the message index for the given messageID. This is used when the message is processed and
115+
// missing observation is accounted for.
116+
func (db *BigtableDB) DeleteMessageIndex(ctx context.Context, messageID MessageID) error {
117+
tableName := "messageIndex"
118+
119+
rowKey := string(messageID)
120+
121+
mut := bigtable.NewMutation()
122+
mut.DeleteRow()
123+
124+
err := db.client.Open(tableName).Apply(ctx, rowKey, mut)
125+
if err != nil {
126+
return fmt.Errorf("failed to delete message index: %v", err)
127+
}
128+
129+
return nil
130+
}
131+
132+
func (db *BigtableDB) GetMessage(ctx context.Context, messageID MessageID) (*Message, error) {
133+
tableName := "messages"
134+
rowKey := string(messageID)
135+
136+
table := db.client.Open(tableName)
137+
row, err := table.ReadRow(ctx, rowKey)
138+
if err != nil {
139+
return nil, fmt.Errorf("failed to read message: %v", err)
140+
}
141+
142+
if len(row) == 0 {
143+
return nil, fmt.Errorf("message not found: %s", messageID)
144+
}
145+
146+
var message Message
147+
message.MessageID = messageID
148+
for _, item := range row["messageData"] {
149+
switch item.Column {
150+
case "messageData:lastObservedAt":
151+
var t time.Time
152+
if err := t.UnmarshalBinary(item.Value); err != nil {
153+
return nil, fmt.Errorf("failed to unmarshal LastObservedAt: %v", err)
154+
}
155+
message.LastObservedAt = t
156+
case "messageData:metricsChecked":
157+
metricsChecked, err := strconv.ParseBool(string(item.Value))
158+
if err != nil {
159+
return nil, fmt.Errorf("failed to parse MetricsChecked: %v", err)
160+
}
161+
message.MetricsChecked = metricsChecked
162+
}
163+
}
164+
165+
return &message, nil
166+
}
167+
168+
// SaveObservationAndUpdateMessage saves the observation only if it doesn't already exist.
169+
// It also updates the lastObservedAt of the message.
170+
func (db *BigtableDB) SaveObservationAndUpdateMessage(ctx context.Context, observation *Observation) error {
171+
tableName := "observations"
172+
columnFamily := "observationData"
173+
174+
rowKey := string(observation.MessageID) + "_" + observation.GuardianAddr
175+
176+
// First, check if the observation already exists
177+
table := db.client.Open(tableName)
178+
row, err := table.ReadRow(ctx, rowKey)
179+
if err != nil {
180+
return fmt.Errorf("failed to read observation: %v", err)
181+
}
182+
183+
// If the observation already exists, return without updating
184+
if len(row) > 0 {
185+
return nil
186+
}
187+
188+
timeBinary, err := observation.ObservedAt.MarshalBinary()
189+
if err != nil {
190+
return fmt.Errorf("failed to marshal ObservedAt: %v", err)
191+
}
192+
193+
mut := bigtable.NewMutation()
194+
mut.Set(columnFamily, "signature", bigtable.Now(), []byte(observation.Signature))
195+
mut.Set(columnFamily, "observedAt", bigtable.Now(), timeBinary)
196+
mut.Set(columnFamily, "status", bigtable.Now(), []byte(strconv.Itoa(int(observation.Status))))
197+
198+
err = db.client.Open(tableName).Apply(ctx, rowKey, mut)
199+
if err != nil {
200+
return fmt.Errorf("failed to save observation: %v", err)
201+
}
202+
203+
messageRowKey := string(observation.MessageID)
204+
messageMut := bigtable.NewMutation()
205+
messageMut.Set("messageData", "lastObservedAt", bigtable.Now(), timeBinary)
206+
207+
err = db.client.Open("messages").Apply(ctx, messageRowKey, messageMut)
208+
if err != nil {
209+
return fmt.Errorf("failed to update message: %v", err)
210+
}
211+
212+
return nil
213+
}
214+
215+
func (db *BigtableDB) GetObservation(ctx context.Context, messageID, guardianAddr string) (*Observation, error) {
216+
tableName := "observations"
217+
rowKey := messageID + "_" + guardianAddr
218+
219+
table := db.client.Open(tableName)
220+
row, err := table.ReadRow(ctx, rowKey)
221+
if err != nil {
222+
return nil, fmt.Errorf("failed to read observation: %v", err)
223+
}
224+
225+
if len(row) == 0 {
226+
return nil, fmt.Errorf("observation not found: %s", rowKey)
227+
}
228+
229+
var observation Observation
230+
observation.MessageID = MessageID(messageID)
231+
observation.GuardianAddr = guardianAddr
232+
for _, item := range row["observationData"] {
233+
switch item.Column {
234+
case "observationData:signature":
235+
observation.Signature = string(item.Value)
236+
case "observationData:observedAt":
237+
// Parse the MetricsChecked boolean
238+
var t time.Time
239+
if err := t.UnmarshalBinary(item.Value); err != nil {
240+
return nil, fmt.Errorf("failed to unmarshal LastObservedAt: %v", err)
241+
}
242+
observation.ObservedAt = t
243+
case "observationData:status":
244+
status, err := strconv.Atoi(string(item.Value))
245+
if err != nil {
246+
return nil, fmt.Errorf("failed to parse Status: %v", err)
247+
}
248+
observation.Status = ObservationStatus(status)
249+
}
250+
}
251+
252+
return &observation, nil
253+
}

0 commit comments

Comments
 (0)