Skip to content

Commit

Permalink
feat: adds support for migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
gene-redpanda committed Aug 22, 2024
1 parent 8e6eb58 commit 09818d4
Show file tree
Hide file tree
Showing 2 changed files with 326 additions and 0 deletions.
105 changes: 105 additions & 0 deletions rpadmin/api_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package rpadmin

import (
"context"
"fmt"
"net/http"
"reflect"
)

const (
baseMigrationEndpoint = "/v1/migrations/"
)

// AddMigration adds a migration to the cluster. It accepts one of InboundMigration or OutboundMigration.
func (a *AdminAPI) AddMigration(ctx context.Context, migration any) (int, error) {
migrationType := reflect.TypeOf(migration)
if migrationType != reflect.TypeOf(InboundMigration{}) && migrationType != reflect.TypeOf(OutboundMigration{}) {
return 0, fmt.Errorf("invalid migration type: must be either InboundMigration or OutboundMigration")
}

var response AddMigrationResponse
if err := a.sendOne(ctx, http.MethodPut, baseMigrationEndpoint, migration, &response, false); err != nil {
return -1, err
}
return response.ID, nil
}

// GetMigration gets a migration by its ID.
func (a *AdminAPI) GetMigration(ctx context.Context, id int) (MigrationState, error) {
var response MigrationState
err := a.sendOne(ctx, http.MethodGet, fmt.Sprintf("baseMigrationEndpoint%d", id), nil, &response, false)
return response, err
}

// ListMigrations returns a list of all migrations in the cluster.
func (a *AdminAPI) ListMigrations(ctx context.Context) ([]MigrationState, error) {
var response []MigrationState
err := a.sendAny(ctx, http.MethodGet, baseMigrationEndpoint, nil, &response)
return response, err
}

// DeleteMigration deletes a migration by its ID.
func (a *AdminAPI) DeleteMigration(ctx context.Context, id int) error {
return a.sendAny(ctx, http.MethodDelete, fmt.Sprintf("baseMigrationEndpoint%d", id), nil, nil)
}

// ExecuteMigration executes a specific action on a migration identified by its ID. The action must be one of:
// prepare, execute, finish, cancel.
func (a *AdminAPI) ExecuteMigration(ctx context.Context, id int, action string) error {
validActions := map[string]bool{
"prepare": true,
"execute": true,
"finish": true,
"cancel": true,
}
if !validActions[action] {
return fmt.Errorf("invalid action: %s. Must be one of: prepare, execute, finish, cancel", action)
}
return a.sendAny(ctx, http.MethodPost, fmt.Sprintf("%s%d?action=%s", baseMigrationEndpoint, id, action), nil, nil)
}

// OutboundMigration represents an outbound migration request
type OutboundMigration struct {
MigrationType string `json:"migration_type"`
Topics []Topic `json:"topics"`
ConsumerGroups []string `json:"consumer_groups"`
}

// InboundMigration represents an inbound migration configuration
type InboundMigration struct {
MigrationType string `json:"migration_type"`
Topics []InboundTopic `json:"topics"`
ConsumerGroups []string `json:"consumer_groups"`
}

// InboundTopic represents an inbound migration topic
type InboundTopic struct {
SourceTopic Topic `json:"source_topic"`
Alias *Topic `json:"alias,omitempty"`
Location string `json:"location,omitempty"`
}

// MigrationState represents the state of a migration
type MigrationState struct {
ID int `json:"id"`
State string `json:"state"`
Migration Migration `json:"migration"`
}

// Migration represents a migration
type Migration struct {
MigrationType string `json:"migration_type"`
Topics []Topic `json:"topics"`
}

// Topic represents a namespaced topic
type Topic struct {
Topic string `json:"topic"`
Namespace string `json:"ns"`
}

// AddMigrationResponse is the response from adding a migration
type AddMigrationResponse struct {
ID int `json:"id"`
}
221 changes: 221 additions & 0 deletions rpadmin/api_migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package rpadmin

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAddMigration(t *testing.T) {
type testCase struct {
name string
testFn func(t *testing.T) http.HandlerFunc
input any
expID int
expError bool
}

successfulAddResponse := AddMigrationResponse{
ID: 123,
}

runTest := func(t *testing.T, test testCase) {
server := httptest.NewServer(test.testFn(t))
defer server.Close()

client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil)
assert.NoError(t, err)

id, err := client.AddMigration(context.Background(), test.input)

if test.expError {
assert.Error(t, err)
assert.Equal(t, test.expID, id) // should be -1 for error
} else {
assert.NoError(t, err)
assert.Equal(t, test.expID, id)
}
}

tests := []testCase{
{
name: "should add outbound migration successfully",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/migrations/", r.URL.Path)
assert.Equal(t, http.MethodPut, r.Method)

var migration OutboundMigration
err := json.NewDecoder(r.Body).Decode(&migration)
assert.NoError(t, err)
assert.Equal(t, "outbound", migration.MigrationType)

w.WriteHeader(http.StatusOK)
resp, err := json.Marshal(successfulAddResponse)
assert.NoError(t, err)
w.Write(resp)
}
},
input: OutboundMigration{
MigrationType: "outbound",
Topics: []Topic{{Topic: "test-topic"}},
ConsumerGroups: []string{"test-group"},
},
expID: 123,
},
{
name: "should add inbound migration successfully",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/migrations/", r.URL.Path)
assert.Equal(t, http.MethodPut, r.Method)

var migration InboundMigration
err := json.NewDecoder(r.Body).Decode(&migration)
assert.NoError(t, err)
assert.Equal(t, "inbound", migration.MigrationType)

w.WriteHeader(http.StatusOK)
resp, err := json.Marshal(successfulAddResponse)
assert.NoError(t, err)
w.Write(resp)
}
},
input: InboundMigration{
MigrationType: "inbound",
Topics: []InboundTopic{{SourceTopic: Topic{Topic: "test-topic"}}},
ConsumerGroups: []string{"test-group"},
},
expID: 123,
},
{
name: "should return error for invalid migration type",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
t.Fatal("Server should not be called for invalid migration type")
}
},
input: struct{ MigrationType string }{MigrationType: "invalid"},
expError: true,
},
{
name: "should not panic on nil response with error",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Internal Server Error"))
}
},
input: OutboundMigration{
MigrationType: "outbound",
Topics: []Topic{{Topic: "test-topic"}},
ConsumerGroups: []string{"test-group"},
},
expError: true,
expID: -1,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
runTest(t, test)
})
}
}

func TestExecuteMigration(t *testing.T) {
type testCase struct {
name string
testFn func(t *testing.T) http.HandlerFunc
id int
action string
expError bool
}

runTest := func(t *testing.T, test testCase) {
server := httptest.NewServer(test.testFn(t))
defer server.Close()

client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil)
assert.NoError(t, err)

err = client.ExecuteMigration(context.Background(), test.id, test.action)

if test.expError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}

tests := []testCase{
{
name: "should execute migration action successfully",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/migrations/123", r.URL.Path)
assert.Equal(t, http.MethodPost, r.Method)
assert.Equal(t, "prepare", r.URL.Query().Get("action"))

w.WriteHeader(http.StatusOK)
}
},
id: 123,
action: "prepare",
},
{
name: "should return error for invalid action",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
t.Fatal("Server should not be called for invalid action")
}
},
id: 123,
action: "invalid",
expError: true,
},
{
name: "should handle server error",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Internal Server Error"))
}
},
id: 123,
action: "execute",
expError: true,
},
{
name: "should handle all valid actions",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
action := r.URL.Query().Get("action")
assert.Contains(t, []string{"prepare", "execute", "finish", "cancel"}, action)
w.WriteHeader(http.StatusOK)
}
},
id: 123,
action: "finish", // We'll test one of the valid actions here
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
runTest(t, test)
})
}
}

0 comments on commit 09818d4

Please sign in to comment.