Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds support for migrations #14

Merged
merged 5 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/golangci-lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ jobs:
go-version: 'stable'

- name: golangci-lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v6
with:
version: v1.56
version: v1.60.2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

working-directory: api
args: --timeout=10m --config=../.golangci.yaml

Expand Down
4 changes: 1 addition & 3 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ linters:
- errname
- errorlint
- exhaustive
- exportloopref
- copyloopvar
- gci
- gocheckcompilerdirectives
- gocognit
Expand Down Expand Up @@ -81,8 +81,6 @@ linters-settings:
# latest Go. We always want the latest formatting.
#
# https://github.com/mvdan/gofumpt/issues/137
gofumpt:
lang-version: "1.22"
gosec:
excludes:
- G104 # unhandled errors, we exclude for the same reason we do not use errcheck
Expand Down
2 changes: 1 addition & 1 deletion Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3"

vars:
BUILD_ROOT: "{{ .ROOT_DIR }}/build"
GO_VERSION: 1.22.2
GO_VERSION: 1.22.4
GO_BUILD_ROOT: '{{.BUILD_ROOT}}/go/{{.GO_VERSION}}'
MODULES:
sh: find . -maxdepth 2 -name go.mod -execdir pwd \;
Expand Down
114 changes: 114 additions & 0 deletions rpadmin/api_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package rpadmin

import (
"context"
"fmt"
"net/http"
"reflect"
)

const (
baseMigrationEndpoint = "/v1/migrations/"
)

// AddMigration adds a migration to the cluster. It accepts one of InboundMigration or OutboundMigration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you document what is the integer in the response means to the end user?

func (a *AdminAPI) AddMigration(ctx context.Context, migration any) (int, error) {
migrationType := reflect.TypeOf(migration)
Comment on lines +23 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid having any in the input, what do you think of splitting the AddMigration to AddInboudMigration and AddOutbountMigration. It's easier for the caller to figure the usage.

if migrationType != reflect.TypeOf(InboundMigration{}) && migrationType != reflect.TypeOf(OutboundMigration{}) {
return 0, fmt.Errorf("invalid migration type: must be either InboundMigration or OutboundMigration")
}

var response AddMigrationResponse
if err := a.sendOne(ctx, http.MethodPut, baseMigrationEndpoint, migration, &response, false); err != nil {
return -1, err
}
return response.ID, nil
Comment on lines +30 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return the response.ID and not the AddMigrationResponse? it will be better if we return the struct if more information gets added to the response in the future.

}

// GetMigration gets a migration by its ID.
func (a *AdminAPI) GetMigration(ctx context.Context, id int) (MigrationState, error) {
var response MigrationState
err := a.sendOne(ctx, http.MethodGet, fmt.Sprintf("baseMigrationEndpoint%d", id), nil, &response, false)
return response, err
}

// ListMigrations returns a list of all migrations in the cluster.
func (a *AdminAPI) ListMigrations(ctx context.Context) ([]MigrationState, error) {
var response []MigrationState
err := a.sendAny(ctx, http.MethodGet, baseMigrationEndpoint, nil, &response)
return response, err
}

// DeleteMigration deletes a migration by its ID.
func (a *AdminAPI) DeleteMigration(ctx context.Context, id int) error {
return a.sendAny(ctx, http.MethodDelete, fmt.Sprintf("baseMigrationEndpoint%d", id), nil, nil)
}

// ExecuteMigration executes a specific action on a migration identified by its ID. The action must be one of:
// prepare, execute, finish, cancel.
func (a *AdminAPI) ExecuteMigration(ctx context.Context, id int, action string) error {
validActions := map[string]bool{
Copy link
Member

@bojand bojand Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be just a single var declared outside of the method?

Also minor nit / optimization might be to use the empty struct for the values as it caries 0 size.

var validActions = map[string]struct{}{
	"prepare": struct{}{},
	"execute": struct{}{},
	"finish":  struct{}{},
	"cancel":  struct{}{},
}

and then the check can be something like:

if _, valid := validActions[action]; !valid {
	// ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only used in one place so I felt like that was the right place to declare it. I can change it if needed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup tradeoffs. I don't feel strongly about it either way.

"prepare": true,
"execute": true,
"finish": true,
"cancel": true,
}
if !validActions[action] {
return fmt.Errorf("invalid action: %s. Must be one of: prepare, execute, finish, cancel", action)
}
return a.sendAny(ctx, http.MethodPost, fmt.Sprintf("%s%d?action=%s", baseMigrationEndpoint, id, action), nil, nil)
}

// OutboundMigration represents an outbound migration request
type OutboundMigration struct {
MigrationType string `json:"migration_type"`
Topics []Topic `json:"topics"`
ConsumerGroups []string `json:"consumer_groups"`
}

// InboundMigration represents an inbound migration configuration
type InboundMigration struct {
MigrationType string `json:"migration_type"`
Topics []InboundTopic `json:"topics"`
ConsumerGroups []string `json:"consumer_groups"`
}

// InboundTopic represents an inbound migration topic
type InboundTopic struct {
SourceTopic Topic `json:"source_topic"`
Alias *Topic `json:"alias,omitempty"`
Location string `json:"location,omitempty"`
}

// MigrationState represents the state of a migration
type MigrationState struct {
ID int `json:"id"`
State string `json:"state"`
Migration Migration `json:"migration"`
}

// Migration represents a migration
type Migration struct {
MigrationType string `json:"migration_type"`
Topics []Topic `json:"topics"`
}

// Topic represents a namespaced topic
type Topic struct {
Topic string `json:"topic"`
Namespace string `json:"ns"`
}

// AddMigrationResponse is the response from adding a migration
type AddMigrationResponse struct {
ID int `json:"id"`
}
221 changes: 221 additions & 0 deletions rpadmin/api_migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package rpadmin

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAddMigration(t *testing.T) {
type testCase struct {
name string
testFn func(t *testing.T) http.HandlerFunc
input any
expID int
expError bool
}

successfulAddResponse := AddMigrationResponse{
ID: 123,
}

runTest := func(t *testing.T, test testCase) {
server := httptest.NewServer(test.testFn(t))
defer server.Close()

client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil)
assert.NoError(t, err)

id, err := client.AddMigration(context.Background(), test.input)

if test.expError {
assert.Error(t, err)
assert.Equal(t, test.expID, id) // should be -1 for error
} else {
assert.NoError(t, err)
assert.Equal(t, test.expID, id)
}
}

tests := []testCase{
{
name: "should add outbound migration successfully",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/migrations/", r.URL.Path)
assert.Equal(t, http.MethodPut, r.Method)

var migration OutboundMigration
err := json.NewDecoder(r.Body).Decode(&migration)
assert.NoError(t, err)
assert.Equal(t, "outbound", migration.MigrationType)

w.WriteHeader(http.StatusOK)
resp, err := json.Marshal(successfulAddResponse)
assert.NoError(t, err)
w.Write(resp)
}
},
input: OutboundMigration{
MigrationType: "outbound",
Topics: []Topic{{Topic: "test-topic"}},
ConsumerGroups: []string{"test-group"},
},
expID: 123,
},
{
name: "should add inbound migration successfully",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/migrations/", r.URL.Path)
assert.Equal(t, http.MethodPut, r.Method)

var migration InboundMigration
err := json.NewDecoder(r.Body).Decode(&migration)
assert.NoError(t, err)
assert.Equal(t, "inbound", migration.MigrationType)

w.WriteHeader(http.StatusOK)
resp, err := json.Marshal(successfulAddResponse)
assert.NoError(t, err)
w.Write(resp)
}
},
input: InboundMigration{
MigrationType: "inbound",
Topics: []InboundTopic{{SourceTopic: Topic{Topic: "test-topic"}}},
ConsumerGroups: []string{"test-group"},
},
expID: 123,
},
{
name: "should return error for invalid migration type",
testFn: func(t *testing.T) http.HandlerFunc {
return func(_ http.ResponseWriter, _ *http.Request) {
t.Fatal("Server should not be called for invalid migration type")
}
},
input: struct{ MigrationType string }{MigrationType: "invalid"},
expError: true,
},
{
name: "should not panic on nil response with error",
testFn: func(_ *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Internal Server Error"))
}
},
input: OutboundMigration{
MigrationType: "outbound",
Topics: []Topic{{Topic: "test-topic"}},
ConsumerGroups: []string{"test-group"},
},
expError: true,
expID: -1,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
runTest(t, test)
})
}
}

func TestExecuteMigration(t *testing.T) {
type testCase struct {
name string
testFn func(t *testing.T) http.HandlerFunc
id int
action string
expError bool
}

runTest := func(t *testing.T, test testCase) {
server := httptest.NewServer(test.testFn(t))
defer server.Close()

client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil)
assert.NoError(t, err)

err = client.ExecuteMigration(context.Background(), test.id, test.action)

if test.expError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}

tests := []testCase{
{
name: "should execute migration action successfully",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/migrations/123", r.URL.Path)
assert.Equal(t, http.MethodPost, r.Method)
assert.Equal(t, "prepare", r.URL.Query().Get("action"))

w.WriteHeader(http.StatusOK)
}
},
id: 123,
action: "prepare",
},
{
name: "should return error for invalid action",
testFn: func(t *testing.T) http.HandlerFunc {
return func(_ http.ResponseWriter, _ *http.Request) {
t.Fatal("Server should not be called for invalid action")
}
},
id: 123,
action: "invalid",
expError: true,
},
{
name: "should handle server error",
testFn: func(_ *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Internal Server Error"))
}
},
id: 123,
action: "execute",
expError: true,
},
{
name: "should handle all valid actions",
testFn: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
action := r.URL.Query().Get("action")
assert.Contains(t, []string{"prepare", "execute", "finish", "cancel"}, action)
w.WriteHeader(http.StatusOK)
}
},
id: 123,
action: "finish", // We'll test one of the valid actions here
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
runTest(t, test)
})
}
}
2 changes: 1 addition & 1 deletion rpadmin/api_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (a *AdminAPI) TransferLeadership(ctx context.Context, ns, topic string, par
return a.sendOne(ctx, http.MethodPost, path, nil, nil, false)
}

// Trigger on-demand balancer.
// TriggerBalancer Trigger on-demand balancer.
func (a *AdminAPI) TriggerBalancer(ctx context.Context) error {
return a.sendToLeader(ctx, http.MethodPost, "/v1/partitions/rebalance", nil, nil)
}
Loading