-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #14 from redpanda-data/add-migrations
feat: adds support for migrations
- Loading branch information
Showing
6 changed files
with
340 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Copyright 2024 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package rpadmin | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/http" | ||
"reflect" | ||
) | ||
|
||
const ( | ||
baseMigrationEndpoint = "/v1/migrations/" | ||
) | ||
|
||
// AddMigration adds a migration to the cluster. It accepts one of InboundMigration or OutboundMigration. | ||
func (a *AdminAPI) AddMigration(ctx context.Context, migration any) (int, error) { | ||
migrationType := reflect.TypeOf(migration) | ||
if migrationType != reflect.TypeOf(InboundMigration{}) && migrationType != reflect.TypeOf(OutboundMigration{}) { | ||
return 0, fmt.Errorf("invalid migration type: must be either InboundMigration or OutboundMigration") | ||
} | ||
|
||
var response AddMigrationResponse | ||
if err := a.sendOne(ctx, http.MethodPut, baseMigrationEndpoint, migration, &response, false); err != nil { | ||
return -1, err | ||
} | ||
return response.ID, nil | ||
} | ||
|
||
// GetMigration gets a migration by its ID. | ||
func (a *AdminAPI) GetMigration(ctx context.Context, id int) (MigrationState, error) { | ||
var response MigrationState | ||
err := a.sendOne(ctx, http.MethodGet, fmt.Sprintf("baseMigrationEndpoint%d", id), nil, &response, false) | ||
return response, err | ||
} | ||
|
||
// ListMigrations returns a list of all migrations in the cluster. | ||
func (a *AdminAPI) ListMigrations(ctx context.Context) ([]MigrationState, error) { | ||
var response []MigrationState | ||
err := a.sendAny(ctx, http.MethodGet, baseMigrationEndpoint, nil, &response) | ||
return response, err | ||
} | ||
|
||
// DeleteMigration deletes a migration by its ID. | ||
func (a *AdminAPI) DeleteMigration(ctx context.Context, id int) error { | ||
return a.sendAny(ctx, http.MethodDelete, fmt.Sprintf("baseMigrationEndpoint%d", id), nil, nil) | ||
} | ||
|
||
// ExecuteMigration executes a specific action on a migration identified by its ID. The action must be one of: | ||
// prepare, execute, finish, cancel. | ||
func (a *AdminAPI) ExecuteMigration(ctx context.Context, id int, action string) error { | ||
validActions := map[string]bool{ | ||
"prepare": true, | ||
"execute": true, | ||
"finish": true, | ||
"cancel": true, | ||
} | ||
if !validActions[action] { | ||
return fmt.Errorf("invalid action: %s. Must be one of: prepare, execute, finish, cancel", action) | ||
} | ||
return a.sendAny(ctx, http.MethodPost, fmt.Sprintf("%s%d?action=%s", baseMigrationEndpoint, id, action), nil, nil) | ||
} | ||
|
||
// OutboundMigration represents an outbound migration request | ||
type OutboundMigration struct { | ||
MigrationType string `json:"migration_type"` | ||
Topics []Topic `json:"topics"` | ||
ConsumerGroups []string `json:"consumer_groups"` | ||
} | ||
|
||
// InboundMigration represents an inbound migration configuration | ||
type InboundMigration struct { | ||
MigrationType string `json:"migration_type"` | ||
Topics []InboundTopic `json:"topics"` | ||
ConsumerGroups []string `json:"consumer_groups"` | ||
} | ||
|
||
// InboundTopic represents an inbound migration topic | ||
type InboundTopic struct { | ||
SourceTopic Topic `json:"source_topic"` | ||
Alias *Topic `json:"alias,omitempty"` | ||
Location string `json:"location,omitempty"` | ||
} | ||
|
||
// MigrationState represents the state of a migration | ||
type MigrationState struct { | ||
ID int `json:"id"` | ||
State string `json:"state"` | ||
Migration Migration `json:"migration"` | ||
} | ||
|
||
// Migration represents a migration | ||
type Migration struct { | ||
MigrationType string `json:"migration_type"` | ||
Topics []Topic `json:"topics"` | ||
} | ||
|
||
// Topic represents a namespaced topic | ||
type Topic struct { | ||
Topic string `json:"topic"` | ||
Namespace string `json:"ns"` | ||
} | ||
|
||
// AddMigrationResponse is the response from adding a migration | ||
type AddMigrationResponse struct { | ||
ID int `json:"id"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
// Copyright 2024 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package rpadmin | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestAddMigration(t *testing.T) { | ||
type testCase struct { | ||
name string | ||
testFn func(t *testing.T) http.HandlerFunc | ||
input any | ||
expID int | ||
expError bool | ||
} | ||
|
||
successfulAddResponse := AddMigrationResponse{ | ||
ID: 123, | ||
} | ||
|
||
runTest := func(t *testing.T, test testCase) { | ||
server := httptest.NewServer(test.testFn(t)) | ||
defer server.Close() | ||
|
||
client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil) | ||
assert.NoError(t, err) | ||
|
||
id, err := client.AddMigration(context.Background(), test.input) | ||
|
||
if test.expError { | ||
assert.Error(t, err) | ||
assert.Equal(t, test.expID, id) // should be -1 for error | ||
} else { | ||
assert.NoError(t, err) | ||
assert.Equal(t, test.expID, id) | ||
} | ||
} | ||
|
||
tests := []testCase{ | ||
{ | ||
name: "should add outbound migration successfully", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
assert.Equal(t, "/v1/migrations/", r.URL.Path) | ||
assert.Equal(t, http.MethodPut, r.Method) | ||
|
||
var migration OutboundMigration | ||
err := json.NewDecoder(r.Body).Decode(&migration) | ||
assert.NoError(t, err) | ||
assert.Equal(t, "outbound", migration.MigrationType) | ||
|
||
w.WriteHeader(http.StatusOK) | ||
resp, err := json.Marshal(successfulAddResponse) | ||
assert.NoError(t, err) | ||
w.Write(resp) | ||
} | ||
}, | ||
input: OutboundMigration{ | ||
MigrationType: "outbound", | ||
Topics: []Topic{{Topic: "test-topic"}}, | ||
ConsumerGroups: []string{"test-group"}, | ||
}, | ||
expID: 123, | ||
}, | ||
{ | ||
name: "should add inbound migration successfully", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
assert.Equal(t, "/v1/migrations/", r.URL.Path) | ||
assert.Equal(t, http.MethodPut, r.Method) | ||
|
||
var migration InboundMigration | ||
err := json.NewDecoder(r.Body).Decode(&migration) | ||
assert.NoError(t, err) | ||
assert.Equal(t, "inbound", migration.MigrationType) | ||
|
||
w.WriteHeader(http.StatusOK) | ||
resp, err := json.Marshal(successfulAddResponse) | ||
assert.NoError(t, err) | ||
w.Write(resp) | ||
} | ||
}, | ||
input: InboundMigration{ | ||
MigrationType: "inbound", | ||
Topics: []InboundTopic{{SourceTopic: Topic{Topic: "test-topic"}}}, | ||
ConsumerGroups: []string{"test-group"}, | ||
}, | ||
expID: 123, | ||
}, | ||
{ | ||
name: "should return error for invalid migration type", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(_ http.ResponseWriter, _ *http.Request) { | ||
t.Fatal("Server should not be called for invalid migration type") | ||
} | ||
}, | ||
input: struct{ MigrationType string }{MigrationType: "invalid"}, | ||
expError: true, | ||
}, | ||
{ | ||
name: "should not panic on nil response with error", | ||
testFn: func(_ *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, _ *http.Request) { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
w.Write([]byte("Internal Server Error")) | ||
} | ||
}, | ||
input: OutboundMigration{ | ||
MigrationType: "outbound", | ||
Topics: []Topic{{Topic: "test-topic"}}, | ||
ConsumerGroups: []string{"test-group"}, | ||
}, | ||
expError: true, | ||
expID: -1, | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
runTest(t, test) | ||
}) | ||
} | ||
} | ||
|
||
func TestExecuteMigration(t *testing.T) { | ||
type testCase struct { | ||
name string | ||
testFn func(t *testing.T) http.HandlerFunc | ||
id int | ||
action string | ||
expError bool | ||
} | ||
|
||
runTest := func(t *testing.T, test testCase) { | ||
server := httptest.NewServer(test.testFn(t)) | ||
defer server.Close() | ||
|
||
client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil) | ||
assert.NoError(t, err) | ||
|
||
err = client.ExecuteMigration(context.Background(), test.id, test.action) | ||
|
||
if test.expError { | ||
assert.Error(t, err) | ||
} else { | ||
assert.NoError(t, err) | ||
} | ||
} | ||
|
||
tests := []testCase{ | ||
{ | ||
name: "should execute migration action successfully", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
assert.Equal(t, "/v1/migrations/123", r.URL.Path) | ||
assert.Equal(t, http.MethodPost, r.Method) | ||
assert.Equal(t, "prepare", r.URL.Query().Get("action")) | ||
|
||
w.WriteHeader(http.StatusOK) | ||
} | ||
}, | ||
id: 123, | ||
action: "prepare", | ||
}, | ||
{ | ||
name: "should return error for invalid action", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(_ http.ResponseWriter, _ *http.Request) { | ||
t.Fatal("Server should not be called for invalid action") | ||
} | ||
}, | ||
id: 123, | ||
action: "invalid", | ||
expError: true, | ||
}, | ||
{ | ||
name: "should handle server error", | ||
testFn: func(_ *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, _ *http.Request) { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
w.Write([]byte("Internal Server Error")) | ||
} | ||
}, | ||
id: 123, | ||
action: "execute", | ||
expError: true, | ||
}, | ||
{ | ||
name: "should handle all valid actions", | ||
testFn: func(t *testing.T) http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
action := r.URL.Query().Get("action") | ||
assert.Contains(t, []string{"prepare", "execute", "finish", "cancel"}, action) | ||
w.WriteHeader(http.StatusOK) | ||
} | ||
}, | ||
id: 123, | ||
action: "finish", // We'll test one of the valid actions here | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
runTest(t, test) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters