Skip to content

Commit c13d6f5

Browse files
committed
init
0 parents  commit c13d6f5

File tree

10 files changed

+316
-0
lines changed

10 files changed

+316
-0
lines changed

README.md

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Flink Monitoring API Golang library
2+
3+
Detail doc: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
4+
5+
```
6+
package main
7+
8+
import (
9+
"fmt"
10+
11+
"github.com/flink-go/api"
12+
)
13+
14+
func main() {
15+
c, err := api.New("127.0.0.1:8081")
16+
if err != nil {
17+
panic(err)
18+
}
19+
20+
// get cluster config
21+
config, err := c.Config()
22+
if err != nil {
23+
panic(err)
24+
}
25+
fmt.Println(config)
26+
}
27+
```
28+
29+
More examples in [example](/example) dir.
30+
### Cluster API
31+
32+
* shutdown cluster
33+
* list config
34+
35+
36+
### Jar File API
37+
38+
* upload jar file
39+
* list jar files
40+
* delete jar file

api.go

+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package api
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"mime/multipart"
9+
"net/http"
10+
"os"
11+
"path/filepath"
12+
"strings"
13+
)
14+
15+
// Client reprents flink REST API client
16+
type Client struct {
17+
// Addr reprents flink job manager server address
18+
Addr string
19+
20+
client *httpClient
21+
}
22+
23+
// New returns a flink client
24+
func New(addr string) (*Client, error) {
25+
return &Client{
26+
Addr: addr,
27+
client: newHttpClient(),
28+
}, nil
29+
}
30+
31+
func (c *Client) url(path string) string {
32+
if strings.HasPrefix(c.Addr, "http") {
33+
return fmt.Sprintf("%s%s", c.Addr, path)
34+
}
35+
return fmt.Sprintf("http://%s%s", c.Addr, path)
36+
}
37+
38+
// Shutdown shutdown the flink cluster
39+
func (c *Client) Shutdown() error {
40+
req, err := http.NewRequest("DELETE", c.url("/cluster"), nil)
41+
if err != nil {
42+
return err
43+
}
44+
_, err = c.client.Do(req)
45+
return err
46+
}
47+
48+
type configResp struct {
49+
RefreshInterval int64 `json:"refresh-interval"`
50+
TimezoneName string `json:"timezone-name"`
51+
TimezoneOffset int64 `json:"timezone-offset"`
52+
FlinkVersion string `json:"flink-version"`
53+
FlinkRevision string `json:"flink-revision"`
54+
Features features `json:"features"`
55+
}
56+
type features struct {
57+
WebSubmit bool `json:"web-submit"`
58+
}
59+
60+
// Config returns the configuration of the WebUI
61+
func (c *Client) Config() (configResp, error) {
62+
var r configResp
63+
req, err := http.NewRequest("GET", c.url("/config"), nil)
64+
if err != nil {
65+
return r, err
66+
}
67+
b, err := c.client.Do(req)
68+
err = json.Unmarshal(b, &r)
69+
return r, err
70+
}
71+
72+
type uploadResp struct {
73+
FileName string `json:"filename"`
74+
Status string `json:"status"`
75+
}
76+
77+
// Upload uploads jar file
78+
func (c *Client) UploadJar(fpath string) (uploadResp, error) {
79+
var r uploadResp
80+
file, err := os.Open(fpath)
81+
if err != nil {
82+
return r, err
83+
}
84+
defer file.Close()
85+
86+
body := &bytes.Buffer{}
87+
writer := multipart.NewWriter(body)
88+
part, err := writer.CreateFormFile("jarfile", filepath.Base(file.Name()))
89+
io.Copy(part, file)
90+
writer.Close()
91+
92+
req, err := http.NewRequest("POST", c.url("/jars/upload"), body)
93+
if err != nil {
94+
return r, err
95+
}
96+
req.Header.Add("Content-Type", writer.FormDataContentType())
97+
b, err := c.client.Do(req)
98+
err = json.Unmarshal(b, &r)
99+
return r, err
100+
}
101+
102+
type jarsResp struct {
103+
Address string `json:"address"`
104+
Files []jarFile `json:"files"`
105+
}
106+
107+
type jarFile struct {
108+
ID string `json:"id"`
109+
Name string `json:"name"`
110+
Uploaded int64 `json:"uploaded"`
111+
Entries []entry `json:"entry"`
112+
}
113+
114+
type entry struct {
115+
Name string `json:"name"`
116+
Description string `json:"description"`
117+
}
118+
119+
// Jars eturns a list of all jars previously uploaded
120+
// via '/jars/upload'
121+
func (c *Client) Jars() (jarsResp, error) {
122+
var r jarsResp
123+
req, err := http.NewRequest("GET", c.url("/jars"), nil)
124+
if err != nil {
125+
return r, err
126+
}
127+
b, err := c.client.Do(req)
128+
err = json.Unmarshal(b, &r)
129+
return r, err
130+
}
131+
132+
// DeleteJar deletes a jar file
133+
func (c *Client) DeleteJar(jarid string) error {
134+
uri := fmt.Sprintf("/jars/%s", jarid)
135+
req, err := http.NewRequest("DELETE", c.url(uri), nil)
136+
if err != nil {
137+
return err
138+
}
139+
_, err = c.client.Do(req)
140+
return err
141+
}

example/config.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/flink-go/api"
7+
)
8+
9+
func main() {
10+
c, err := api.New("127.0.0.1:8081")
11+
if err != nil {
12+
panic(err)
13+
}
14+
15+
// config test
16+
config, err := c.Config()
17+
if err != nil {
18+
panic(err)
19+
}
20+
fmt.Println(config)
21+
}

example/deleteJar.go

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package main
2+
3+
import (
4+
"github.com/flink-go/api"
5+
)
6+
7+
func main() {
8+
c, err := api.New("127.0.0.1:8081")
9+
if err != nil {
10+
panic(err)
11+
}
12+
13+
// delete test
14+
err = c.DeleteJar("efb8367e-aa0d-4ceb-957f-0e8f46ed4b10_test.jar")
15+
if err != nil {
16+
panic(err)
17+
}
18+
}

example/listJars.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/flink-go/api"
7+
)
8+
9+
func main() {
10+
c, err := api.New("127.0.0.1:8081")
11+
if err != nil {
12+
panic(err)
13+
}
14+
15+
// upload test
16+
resp, err := c.Jars()
17+
if err != nil {
18+
panic(err)
19+
}
20+
fmt.Println(resp)
21+
}

example/shutdown.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package main
2+
3+
import (
4+
"github.com/flink-go/api"
5+
)
6+
7+
func main() {
8+
c, err := api.New("127.0.0.1:8081")
9+
if err != nil {
10+
panic(err)
11+
}
12+
13+
// shutdown test
14+
if err := c.Shutdown(); err != nil {
15+
panic(err)
16+
}
17+
}

example/testdata/test.jar

3.4 MB
Binary file not shown.

example/uploadJar.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/flink-go/api"
7+
)
8+
9+
func main() {
10+
c, err := api.New("127.0.0.1:8081")
11+
if err != nil {
12+
panic(err)
13+
}
14+
15+
// upload test
16+
resp, err := c.UploadJar("./testdata/test.jar")
17+
if err != nil {
18+
panic(err)
19+
}
20+
fmt.Println(resp)
21+
}

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/flink-go/api
2+
3+
go 1.14

request.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package api
2+
3+
import (
4+
"fmt"
5+
"io/ioutil"
6+
"net/http"
7+
)
8+
9+
type httpClient struct {
10+
client http.Client
11+
}
12+
13+
func newHttpClient() *httpClient {
14+
return &httpClient{
15+
client: http.Client{},
16+
}
17+
}
18+
19+
func (c *httpClient) Do(req *http.Request) ([]byte, error) {
20+
resp, err := c.client.Do(req)
21+
if err != nil {
22+
return nil, err
23+
}
24+
25+
defer resp.Body.Close()
26+
body, err := ioutil.ReadAll(resp.Body)
27+
if err != nil {
28+
return nil, err
29+
}
30+
if resp.StatusCode != 200 {
31+
return nil, fmt.Errorf("http status not 200: %d %s", resp.StatusCode, string(body))
32+
}
33+
return body, nil
34+
}

0 commit comments

Comments
 (0)