Skip to content

Commit 8c34d27

Browse files
committed
add savepoints API
1 parent 5c5cc95 commit 8c34d27

19 files changed

+178
-19
lines changed

README.md

+25-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
Detail doc: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
44

5+
Status: Beta
6+
7+
58
```
69
package main
710
@@ -50,4 +53,25 @@ More examples in [example](/example) dir.
5053
* stop a job
5154
* job overview
5255
* job detail
53-
* checkpoints
56+
57+
### checkpoints
58+
59+
* get all checkpoints of a job
60+
* stop a job with a savepoint
61+
62+
### TODO:
63+
64+
* vertices
65+
* checkpoints/config
66+
* /jobs/:jobid/checkpoints/details/:checkpointid
67+
* /jobs/:jobid/config
68+
* /jobs/:jobid/exceptions
69+
* /jobs/:jobid/execution-result
70+
* /jobs/:jobid/metrics
71+
* /jobs/:jobid/plan
72+
* /jobs/:jobid/rescaling
73+
* /jobs/:jobid/rescaling/:triggerid
74+
* overview
75+
* /savepoint-disposal
76+
* /taskmanagers
77+

example/checkpoints.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}
1515

1616
// Checkpoints test
17-
v, err := c.Checkpoints("8355c4efb63ddd4ea26a7829156a4c58")
17+
v, err := c.Checkpoints("2bd452ba193d1575a4acc9ed09f896ea")
1818
if err != nil {
1919
panic(err)
2020
}

example/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

example/deleteJar.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
func main() {
10-
c, err := api.New(os.Getenv("FILNK_API"))
10+
c, err := api.New(os.Getenv("FLINK_API"))
1111
if err != nil {
1212
panic(err)
1313
}

example/jobDetail.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

example/jobManagerConfig.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

example/jobManagerMetrics.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

example/jobs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

example/jobsOverview.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

example/listJars.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

example/planJar.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

example/runJar.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

example/savepoints.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/flink-go/api"
8+
)
9+
10+
func main() {
11+
c, err := api.New(os.Getenv("FLINK_API"))
12+
if err != nil {
13+
panic(err)
14+
}
15+
16+
// save points test
17+
v, err := c.SavePoints("2bd452ba193d1575a4acc9ed09f896ea", "test", false)
18+
if err != nil {
19+
panic(err)
20+
}
21+
fmt.Println(v)
22+
}

example/shutdown.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
func main() {
10-
c, err := api.New(os.Getenv("FILNK_API"))
10+
c, err := api.New(os.Getenv("FLINK_API"))
1111
if err != nil {
1212
panic(err)
1313
}

example/stopJob.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
func main() {
10-
c, err := api.New(os.Getenv("FILNK_API"))
10+
c, err := api.New(os.Getenv("FLINK_API"))
1111
if err != nil {
1212
panic(err)
1313
}

example/stopJobWithSavepoint.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/flink-go/api"
8+
)
9+
10+
func main() {
11+
c, err := api.New(os.Getenv("FLINK_API"))
12+
if err != nil {
13+
panic(err)
14+
}
15+
16+
v, err := c.StopJobWithSavepoint("2bd452ba193d1575a4acc9ed09f896ea", "test", false)
17+
if err != nil {
18+
panic(err)
19+
}
20+
fmt.Println(v)
21+
}

example/uploadJar.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func main() {
11-
c, err := api.New(os.Getenv("FILNK_API"))
11+
c, err := api.New(os.Getenv("FLINK_API"))
1212
if err != nil {
1313
panic(err)
1414
}

job.go

+93-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package api
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"fmt"
67
"net/http"
@@ -292,7 +293,7 @@ type statics struct {
292293

293294
type latest struct {
294295
Completed completedCheckpointsStatics `json:"completed"`
295-
Savepoint completedCheckpointsStatics `json:"savepoint"`
296+
Savepoint savepointsStatics `json:"savepoint"`
296297
Failed failedCheckpointsStatics `json:"failed"`
297298
Restored restoredCheckpointsStatics `json:"restored"`
298299
}
@@ -313,6 +314,21 @@ type completedCheckpointsStatics struct {
313314
Discarded bool `json:"discarded"`
314315
}
315316

317+
type savepointsStatics struct {
318+
ID int `json:"id"`
319+
Status string `json:"status"`
320+
IsSavepoint bool `json:"is_savepoint"`
321+
TriggerTimestamp int64 `json:"trigger_timestamp"`
322+
LatestAckTimestamp int64 `json:"latest_ack_timestamp"`
323+
StateSize int64 `json:"state_size"`
324+
End2EndDuration int64 `json:"end_to_end_duration"`
325+
AlignmentBuffered int64 `json:"alignment_buffered"`
326+
NumSubtasks int64 `json:"num_subtasks"`
327+
NumAcknowledgedSubtasks int64 `json:"num_acknowledged_subtasks"`
328+
tasks taskCheckpointsStatics `json:"tasks"`
329+
ExternalPath string `json:"external_path"`
330+
Discarded bool `json:"discarded"`
331+
}
316332
type taskCheckpointsStatics struct {
317333
ID string `json:"id"`
318334
Status string `json:"status"`
@@ -369,3 +385,79 @@ func (c *Client) Checkpoints(jobID string) (checkpointsResp, error) {
369385
err = json.Unmarshal(b, &r)
370386
return r, err
371387
}
388+
389+
type savePointsResp struct {
390+
RequestID string `json:"request-id"`
391+
}
392+
393+
// SavePoints triggers a savepoint, and optionally cancels the
394+
// job afterwards. This async operation would return a
395+
// 'triggerid' for further query identifier.
396+
func (c *Client) SavePoints(jobID string, saveDir string, cancleJob bool) (savePointsResp, error) {
397+
var r savePointsResp
398+
399+
type savePointsReq struct {
400+
SaveDir string `json:"target-directory"`
401+
CancleJob bool `json:"cancel-job"`
402+
}
403+
404+
d := savePointsReq{
405+
SaveDir: saveDir,
406+
CancleJob: cancleJob,
407+
}
408+
data := new(bytes.Buffer)
409+
json.NewEncoder(data).Encode(d)
410+
uri := fmt.Sprintf("/jobs/%s/savepoints", jobID)
411+
req, err := http.NewRequest(
412+
"POST",
413+
c.url(uri),
414+
data,
415+
)
416+
if err != nil {
417+
return r, err
418+
}
419+
b, err := c.client.Do(req)
420+
if err != nil {
421+
return r, err
422+
}
423+
err = json.Unmarshal(b, &r)
424+
return r, err
425+
}
426+
427+
type stopJobResp struct {
428+
RequestID string `json:"request-id"`
429+
}
430+
431+
// StopJob stops a job with a savepoint. Optionally, it can also
432+
// emit a MAX_WATERMARK before taking the savepoint to flush out
433+
// any state waiting for timers to fire. This async operation
434+
// would return a 'triggerid' for further query identifier.
435+
func (c *Client) StopJobWithSavepoint(jobID string, saveDir string, drain bool) (stopJobResp, error) {
436+
var r stopJobResp
437+
type stopJobReq struct {
438+
SaveDir string `json:"targetDirectory"`
439+
Drain bool `json:"drain"`
440+
}
441+
442+
d := stopJobReq{
443+
SaveDir: saveDir,
444+
Drain: drain,
445+
}
446+
data := new(bytes.Buffer)
447+
json.NewEncoder(data).Encode(d)
448+
uri := fmt.Sprintf("/jobs/%s/stop", jobID)
449+
req, err := http.NewRequest(
450+
"POST",
451+
c.url(uri),
452+
data,
453+
)
454+
if err != nil {
455+
return r, err
456+
}
457+
b, err := c.client.Do(req)
458+
if err != nil {
459+
return r, err
460+
}
461+
err = json.Unmarshal(b, &r)
462+
return r, err
463+
}

request.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ func (c *httpClient) Do(req *http.Request) ([]byte, error) {
2727
if err != nil {
2828
return nil, err
2929
}
30-
if resp.StatusCode != 200 {
31-
return nil, fmt.Errorf("http status not 200: %d %s", resp.StatusCode, string(body))
30+
if int(resp.StatusCode/100) != 2 {
31+
return nil, fmt.Errorf("http status not 2xx: %d %s", resp.StatusCode, string(body))
3232
}
3333
return body, nil
3434
}

0 commit comments

Comments
 (0)