Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add enum and typed functions #15

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions rpadmin/api_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,25 @@ const (
)

// AddMigration adds a migration to the cluster. It accepts one of InboundMigration or OutboundMigration.
func (a *AdminAPI) AddMigration(ctx context.Context, migration any) (int, error) {
func (a *AdminAPI) addMigration(ctx context.Context, migration any) (AddMigrationResponse, error) {
migrationType := reflect.TypeOf(migration)
if migrationType != reflect.TypeOf(InboundMigration{}) && migrationType != reflect.TypeOf(OutboundMigration{}) {
return 0, fmt.Errorf("invalid migration type: must be either InboundMigration or OutboundMigration")
return AddMigrationResponse{}, fmt.Errorf("invalid migration type: must be either InboundMigration or OutboundMigration")
}

var response AddMigrationResponse
if err := a.sendOne(ctx, http.MethodPut, baseMigrationEndpoint, migration, &response, false); err != nil {
return -1, err
return AddMigrationResponse{}, err
}
return response.ID, nil
return response, nil
}

func (a *AdminAPI) AddInboundMigration(ctx context.Context, migration InboundMigration) (AddMigrationResponse, error) {
return a.addMigration(ctx, migration)
}

func (a *AdminAPI) AddOutboundMigration(ctx context.Context, migration OutboundMigration) (AddMigrationResponse, error) {
return a.addMigration(ctx, migration)
}

// GetMigration gets a migration by its ID.
Expand All @@ -53,16 +61,33 @@ func (a *AdminAPI) DeleteMigration(ctx context.Context, id int) error {
return a.sendAny(ctx, http.MethodDelete, fmt.Sprintf("baseMigrationEndpoint%d", id), nil, nil)
}

// ExecuteMigration executes a specific action on a migration identified by its ID. The action must be one of:
// prepare, execute, finish, cancel.
func (a *AdminAPI) ExecuteMigration(ctx context.Context, id int, action string) error {
validActions := map[string]bool{
"prepare": true,
"execute": true,
"finish": true,
"cancel": true,
type MigrationAction int

const (
PrepareMigrationAction MigrationAction = iota
ExecuteMigrationAction
FinishMigrationAction
CancelMigrationAction
)

func (a MigrationAction) String() string {
switch a {
case PrepareMigrationAction:
return "prepare"
case ExecuteMigrationAction:
return "execute"
case FinishMigrationAction:
return "finish"
case CancelMigrationAction:
return "cancel"
default:
return ""
}
if !validActions[action] {
}

// ExecuteMigration executes a specific action on a migration identified by its ID.
func (a *AdminAPI) ExecuteMigration(ctx context.Context, id int, action MigrationAction) error {
if action < PrepareMigrationAction || action > CancelMigrationAction {
return fmt.Errorf("invalid action: %s. Must be one of: prepare, execute, finish, cancel", action)
}
return a.sendAny(ctx, http.MethodPost, fmt.Sprintf("%s%d?action=%s", baseMigrationEndpoint, id, action), nil, nil)
Expand Down
146 changes: 136 additions & 10 deletions rpadmin/api_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestAddMigration(t *testing.T) {
name string
testFn func(t *testing.T) http.HandlerFunc
input any
expID int
expID AddMigrationResponse
expError bool
}

Expand All @@ -39,7 +39,7 @@ func TestAddMigration(t *testing.T) {
client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil)
assert.NoError(t, err)

id, err := client.AddMigration(context.Background(), test.input)
id, err := client.addMigration(context.Background(), test.input)

if test.expError {
assert.Error(t, err)
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestAddMigration(t *testing.T) {
Topics: []Topic{{Topic: "test-topic"}},
ConsumerGroups: []string{"test-group"},
},
expID: 123,
expID: AddMigrationResponse{ID: 123},
},
{
name: "should add inbound migration successfully",
Expand All @@ -99,7 +99,7 @@ func TestAddMigration(t *testing.T) {
Topics: []InboundTopic{{SourceTopic: Topic{Topic: "test-topic"}}},
ConsumerGroups: []string{"test-group"},
},
expID: 123,
expID: AddMigrationResponse{ID: 123},
},
{
name: "should return error for invalid migration type",
Expand All @@ -125,7 +125,7 @@ func TestAddMigration(t *testing.T) {
ConsumerGroups: []string{"test-group"},
},
expError: true,
expID: -1,
expID: AddMigrationResponse{},
},
}

Expand All @@ -141,7 +141,7 @@ func TestExecuteMigration(t *testing.T) {
name string
testFn func(t *testing.T) http.HandlerFunc
id int
action string
action MigrationAction
expError bool
}

Expand Down Expand Up @@ -174,7 +174,7 @@ func TestExecuteMigration(t *testing.T) {
}
},
id: 123,
action: "prepare",
action: PrepareMigrationAction,
},
{
name: "should return error for invalid action",
Expand All @@ -184,7 +184,7 @@ func TestExecuteMigration(t *testing.T) {
}
},
id: 123,
action: "invalid",
action: -1,
expError: true,
},
{
Expand All @@ -196,7 +196,7 @@ func TestExecuteMigration(t *testing.T) {
}
},
id: 123,
action: "execute",
action: ExecuteMigrationAction,
expError: true,
},
{
Expand All @@ -209,7 +209,7 @@ func TestExecuteMigration(t *testing.T) {
}
},
id: 123,
action: "finish", // We'll test one of the valid actions here
action: FinishMigrationAction,
},
}

Expand All @@ -219,3 +219,129 @@ func TestExecuteMigration(t *testing.T) {
})
}
}

func TestAddInboundMigration(t *testing.T) {
tests := []struct {
name string
migration InboundMigration
serverResponse AddMigrationResponse
serverStatus int
expectError bool
}{
{
name: "successful inbound migration",
migration: InboundMigration{
MigrationType: "inbound",
Topics: []InboundTopic{{SourceTopic: Topic{Topic: "test-topic"}}},
ConsumerGroups: []string{"test-group"},
},
serverResponse: AddMigrationResponse{ID: 456},
serverStatus: http.StatusOK,
expectError: false,
},
{
name: "server error",
migration: InboundMigration{
MigrationType: "inbound",
Topics: []InboundTopic{{SourceTopic: Topic{Topic: "test-topic"}}},
},
serverStatus: http.StatusInternalServerError,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/migrations/", r.URL.Path)
assert.Equal(t, http.MethodPut, r.Method)

var receivedMigration InboundMigration
err := json.NewDecoder(r.Body).Decode(&receivedMigration)
assert.NoError(t, err)
assert.Equal(t, tt.migration, receivedMigration)

w.WriteHeader(tt.serverStatus)
if tt.serverStatus == http.StatusOK {
json.NewEncoder(w).Encode(tt.serverResponse)
}
}))
defer server.Close()

client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil)
assert.NoError(t, err)

resp, err := client.AddInboundMigration(context.Background(), tt.migration)

if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.serverResponse, resp)
}
})
}
}

func TestAddOutboundMigration(t *testing.T) {
tests := []struct {
name string
migration OutboundMigration
serverResponse AddMigrationResponse
serverStatus int
expectError bool
}{
{
name: "successful outbound migration",
migration: OutboundMigration{
MigrationType: "outbound",
Topics: []Topic{{Topic: "test-topic"}},
ConsumerGroups: []string{"test-group"},
},
serverResponse: AddMigrationResponse{ID: 789},
serverStatus: http.StatusOK,
expectError: false,
},
{
name: "server error",
migration: OutboundMigration{
MigrationType: "outbound",
Topics: []Topic{{Topic: "test-topic"}},
},
serverStatus: http.StatusInternalServerError,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/v1/migrations/", r.URL.Path)
assert.Equal(t, http.MethodPut, r.Method)

var receivedMigration OutboundMigration
err := json.NewDecoder(r.Body).Decode(&receivedMigration)
assert.NoError(t, err)
assert.Equal(t, tt.migration, receivedMigration)

w.WriteHeader(tt.serverStatus)
if tt.serverStatus == http.StatusOK {
json.NewEncoder(w).Encode(tt.serverResponse)
}
}))
defer server.Close()

client, err := NewAdminAPI([]string{server.URL}, new(NopAuth), nil)
assert.NoError(t, err)

resp, err := client.AddOutboundMigration(context.Background(), tt.migration)

if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.serverResponse, resp)
}
})
}
}
Loading