-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: adds support for migrations #14
Changes from all commits
2a90304
b6e5f2b
2f6cbe6
5d643ca
844e47a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Copyright 2024 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package rpadmin | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/http" | ||
"reflect" | ||
) | ||
|
||
const ( | ||
baseMigrationEndpoint = "/v1/migrations/" | ||
) | ||
|
||
// AddMigration adds a migration to the cluster. It accepts one of InboundMigration or OutboundMigration. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you document what is the integer in the response means to the end user? |
||
func (a *AdminAPI) AddMigration(ctx context.Context, migration any) (int, error) { | ||
migrationType := reflect.TypeOf(migration) | ||
Comment on lines
+23
to
+25
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To avoid having |
||
if migrationType != reflect.TypeOf(InboundMigration{}) && migrationType != reflect.TypeOf(OutboundMigration{}) { | ||
return 0, fmt.Errorf("invalid migration type: must be either InboundMigration or OutboundMigration") | ||
} | ||
|
||
var response AddMigrationResponse | ||
if err := a.sendOne(ctx, http.MethodPut, baseMigrationEndpoint, migration, &response, false); err != nil { | ||
return -1, err | ||
} | ||
return response.ID, nil | ||
Comment on lines
+30
to
+34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why return the |
||
} | ||
|
||
// GetMigration gets a migration by its ID. | ||
func (a *AdminAPI) GetMigration(ctx context.Context, id int) (MigrationState, error) { | ||
var response MigrationState | ||
err := a.sendOne(ctx, http.MethodGet, fmt.Sprintf("baseMigrationEndpoint%d", id), nil, &response, false) | ||
return response, err | ||
} | ||
|
||
// ListMigrations returns a list of all migrations in the cluster. | ||
func (a *AdminAPI) ListMigrations(ctx context.Context) ([]MigrationState, error) { | ||
var response []MigrationState | ||
err := a.sendAny(ctx, http.MethodGet, baseMigrationEndpoint, nil, &response) | ||
return response, err | ||
} | ||
|
||
// DeleteMigration deletes a migration by its ID. | ||
func (a *AdminAPI) DeleteMigration(ctx context.Context, id int) error { | ||
return a.sendAny(ctx, http.MethodDelete, fmt.Sprintf("baseMigrationEndpoint%d", id), nil, nil) | ||
} | ||
|
||
// ExecuteMigration executes a specific action on a migration identified by its ID. The action must be one of: | ||
// prepare, execute, finish, cancel. | ||
func (a *AdminAPI) ExecuteMigration(ctx context.Context, id int, action string) error { | ||
validActions := map[string]bool{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be just a single var declared outside of the method? Also minor nit / optimization might be to use the empty struct for the values as it caries var validActions = map[string]struct{}{
"prepare": struct{}{},
"execute": struct{}{},
"finish": struct{}{},
"cancel": struct{}{},
} and then the check can be something like: if _, valid := validActions[action]; !valid {
// ...
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's only used in one place so I felt like that was the right place to declare it. I can change it if needed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup tradeoffs. I don't feel strongly about it either way. |
||
"prepare": true, | ||
"execute": true, | ||
"finish": true, | ||
"cancel": true, | ||
} | ||
if !validActions[action] { | ||
return fmt.Errorf("invalid action: %s. Must be one of: prepare, execute, finish, cancel", action) | ||
} | ||
return a.sendAny(ctx, http.MethodPost, fmt.Sprintf("%s%d?action=%s", baseMigrationEndpoint, id, action), nil, nil) | ||
} | ||
|
||
// OutboundMigration represents an outbound migration request | ||
type OutboundMigration struct { | ||
MigrationType string `json:"migration_type"` | ||
Topics []Topic `json:"topics"` | ||
ConsumerGroups []string `json:"consumer_groups"` | ||
} | ||
|
||
// InboundMigration represents an inbound migration configuration | ||
type InboundMigration struct { | ||
MigrationType string `json:"migration_type"` | ||
Topics []InboundTopic `json:"topics"` | ||
ConsumerGroups []string `json:"consumer_groups"` | ||
} | ||
|
||
// InboundTopic represents an inbound migration topic | ||
type InboundTopic struct { | ||
SourceTopic Topic `json:"source_topic"` | ||
Alias *Topic `json:"alias,omitempty"` | ||
Location string `json:"location,omitempty"` | ||
} | ||
|
||
// MigrationState represents the state of a migration | ||
type MigrationState struct { | ||
ID int `json:"id"` | ||
State string `json:"state"` | ||
Migration Migration `json:"migration"` | ||
} | ||
|
||
// Migration represents a migration | ||
type Migration struct { | ||
MigrationType string `json:"migration_type"` | ||
Topics []Topic `json:"topics"` | ||
} | ||
|
||
// Topic represents a namespaced topic | ||
type Topic struct { | ||
Topic string `json:"topic"` | ||
Namespace string `json:"ns"` | ||
} | ||
|
||
// AddMigrationResponse is the response from adding a migration | ||
type AddMigrationResponse struct { | ||
ID int `json:"id"` | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
// Copyright 2024 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package rpadmin | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestAddMigration(t *testing.T) { | ||
type testCase struct { | ||
name string | ||
testFn func(t *testing.T) http.HandlerFunc | ||
input any | ||
expID int | ||
expError bool | ||
} | ||
|
||
successfulAddResponse := AddMigrationResponse{ | ||
ID: 123, | ||
} | ||
|
||
runTest := func(t *testing.T, test testCase) { | ||
server := httptest.NewServer(test.testFn(t)) | ||
defer server.Close() | ||
|
||
client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil) | ||
assert.NoError(t, err) | ||
|
||
id, err := client.AddMigration(context.Background(), test.input) | ||
|
||
if test.expError { | ||
assert.Error(t, err) | ||
assert.Equal(t, test.expID, id) // should be -1 for error | ||
} else { | ||
assert.NoError(t, err) | ||
assert.Equal(t, test.expID, id) | ||
} | ||
} | ||
|
||
tests := []testCase{ | ||
{ | ||
name: "should add outbound migration successfully", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
assert.Equal(t, "/v1/migrations/", r.URL.Path) | ||
assert.Equal(t, http.MethodPut, r.Method) | ||
|
||
var migration OutboundMigration | ||
err := json.NewDecoder(r.Body).Decode(&migration) | ||
assert.NoError(t, err) | ||
assert.Equal(t, "outbound", migration.MigrationType) | ||
|
||
w.WriteHeader(http.StatusOK) | ||
resp, err := json.Marshal(successfulAddResponse) | ||
assert.NoError(t, err) | ||
w.Write(resp) | ||
} | ||
}, | ||
input: OutboundMigration{ | ||
MigrationType: "outbound", | ||
Topics: []Topic{{Topic: "test-topic"}}, | ||
ConsumerGroups: []string{"test-group"}, | ||
}, | ||
expID: 123, | ||
}, | ||
{ | ||
name: "should add inbound migration successfully", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
assert.Equal(t, "/v1/migrations/", r.URL.Path) | ||
assert.Equal(t, http.MethodPut, r.Method) | ||
|
||
var migration InboundMigration | ||
err := json.NewDecoder(r.Body).Decode(&migration) | ||
assert.NoError(t, err) | ||
assert.Equal(t, "inbound", migration.MigrationType) | ||
|
||
w.WriteHeader(http.StatusOK) | ||
resp, err := json.Marshal(successfulAddResponse) | ||
assert.NoError(t, err) | ||
w.Write(resp) | ||
} | ||
}, | ||
input: InboundMigration{ | ||
MigrationType: "inbound", | ||
Topics: []InboundTopic{{SourceTopic: Topic{Topic: "test-topic"}}}, | ||
ConsumerGroups: []string{"test-group"}, | ||
}, | ||
expID: 123, | ||
}, | ||
{ | ||
name: "should return error for invalid migration type", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(_ http.ResponseWriter, _ *http.Request) { | ||
t.Fatal("Server should not be called for invalid migration type") | ||
} | ||
}, | ||
input: struct{ MigrationType string }{MigrationType: "invalid"}, | ||
expError: true, | ||
}, | ||
{ | ||
name: "should not panic on nil response with error", | ||
testFn: func(_ *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, _ *http.Request) { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
w.Write([]byte("Internal Server Error")) | ||
} | ||
}, | ||
input: OutboundMigration{ | ||
MigrationType: "outbound", | ||
Topics: []Topic{{Topic: "test-topic"}}, | ||
ConsumerGroups: []string{"test-group"}, | ||
}, | ||
expError: true, | ||
expID: -1, | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
runTest(t, test) | ||
}) | ||
} | ||
} | ||
|
||
func TestExecuteMigration(t *testing.T) { | ||
type testCase struct { | ||
name string | ||
testFn func(t *testing.T) http.HandlerFunc | ||
id int | ||
action string | ||
expError bool | ||
} | ||
|
||
runTest := func(t *testing.T, test testCase) { | ||
server := httptest.NewServer(test.testFn(t)) | ||
defer server.Close() | ||
|
||
client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil) | ||
assert.NoError(t, err) | ||
|
||
err = client.ExecuteMigration(context.Background(), test.id, test.action) | ||
|
||
if test.expError { | ||
assert.Error(t, err) | ||
} else { | ||
assert.NoError(t, err) | ||
} | ||
} | ||
|
||
tests := []testCase{ | ||
{ | ||
name: "should execute migration action successfully", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
assert.Equal(t, "/v1/migrations/123", r.URL.Path) | ||
assert.Equal(t, http.MethodPost, r.Method) | ||
assert.Equal(t, "prepare", r.URL.Query().Get("action")) | ||
|
||
w.WriteHeader(http.StatusOK) | ||
} | ||
}, | ||
id: 123, | ||
action: "prepare", | ||
}, | ||
{ | ||
name: "should return error for invalid action", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(_ http.ResponseWriter, _ *http.Request) { | ||
t.Fatal("Server should not be called for invalid action") | ||
} | ||
}, | ||
id: 123, | ||
action: "invalid", | ||
expError: true, | ||
}, | ||
{ | ||
name: "should handle server error", | ||
testFn: func(_ *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, _ *http.Request) { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
w.Write([]byte("Internal Server Error")) | ||
} | ||
}, | ||
id: 123, | ||
action: "execute", | ||
expError: true, | ||
}, | ||
{ | ||
name: "should handle all valid actions", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
action := r.URL.Query().Get("action") | ||
assert.Contains(t, []string{"prepare", "execute", "finish", "cancel"}, action) | ||
w.WriteHeader(http.StatusOK) | ||
} | ||
}, | ||
id: 123, | ||
action: "finish", // We'll test one of the valid actions here | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
runTest(t, test) | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you confirm that https://github.com/redpanda-data/common-go/actions/runs/10511516964/job/29122660139 is not related to this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Issue started prior to this PR:
https://github.com/redpanda-data/common-go/actions/runs/10394862238/job/28835193918