-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathgithub_contents.py
161 lines (149 loc) · 5.8 KB
/
github_contents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import base64
from requests import Session
class GithubContents:
    """Read and write files in a GitHub repository through the REST API.

    Requests go to the "contents" endpoints first. When GitHub rejects a
    request with a ``too_large`` error, the Git data endpoints (blobs,
    trees, commits, refs) are used instead.
    """

    class NotFound(Exception):
        """Raised when the requested file cannot be found."""

    class UnknownError(Exception):
        """Raised when GitHub returns a response this client cannot handle."""

    def __init__(self, owner, repo, token, branch="main"):
        """Store the repository coordinates and create an HTTP session.

        Args:
            owner: Account or organisation that owns the repository.
            repo: Repository name.
            token: Token sent in the ``Authorization`` header.
            branch: Branch used by the Git data fallbacks and
                ``branch_exists``.
        """
        self.owner = owner
        self.repo = repo
        self.token = token
        self.session = Session()
        self.branch = branch

    def base_url(self):
        """Return the API root URL for this repository."""
        return "https://api.github.com/repos/{}/{}".format(self.owner, self.repo)

    def headers(self):
        """Return the HTTP headers that authenticate each request."""
        return {"Authorization": "token {}".format(self.token)}

    @staticmethod
    def _is_too_large(response):
        """Return True if *response* is a 403 carrying a ``too_large`` error.

        The body is probed defensively: a 403 may have no ``errors`` list
        (or no JSON body at all), and such a response must end up as an
        ``UnknownError`` in the caller rather than a ``KeyError`` here.
        """
        if response.status_code != 403:
            return False
        try:
            body = response.json()
        except ValueError:
            # Body is not JSON, so it cannot be the structured error.
            return False
        errors = body.get("errors") if isinstance(body, dict) else None
        if not isinstance(errors, list):
            return False
        return any(
            isinstance(error, dict) and error.get("code") == "too_large"
            for error in errors
        )

    def _json_or_raise(self, response):
        """Return the decoded JSON body, or raise UnknownError on HTTP >= 400."""
        if response.status_code >= 400:
            raise self.UnknownError(
                str(response.status_code) + ":" + repr(response.content)
            )
        return response.json()

    def read(self, filepath):
        """Return ``(file_contents_in_bytes, sha1)`` for *filepath*.

        Raises:
            NotFound: GitHub answered 404 for the path.
            UnknownError: GitHub answered with any other unexpected status.
        """
        # NOTE(review): this request does not send self.branch, so GitHub
        # presumably serves the repository's default branch, while
        # read_large() uses self.branch. Confirm whether ``ref`` should be
        # passed here.
        content_url = "{}/contents/{}".format(self.base_url(), filepath)
        response = self.session.get(content_url, headers=self.headers())
        if response.status_code == 200:
            data = response.json()
            # GitHub documents that larger files can come back as a 200 with
            # an empty ``content`` and ``encoding`` set to "none". Decoding
            # that would silently return b"", so fetch the blob instead.
            if isinstance(data, dict) and data.get("encoding") == "none":
                return self.read_large(filepath)
            return base64.b64decode(data["content"]), data["sha"]
        if response.status_code == 404:
            raise self.NotFound(filepath)
        if self._is_too_large(response):
            return self.read_large(filepath)
        raise self.UnknownError(response.content)

    def read_large(self, filepath):
        """Return ``(file_contents_in_bytes, sha1)`` using the Git data API.

        Fetches the recursive tree of ``self.branch``, finds the entry whose
        path equals *filepath*, and downloads it from that entry's URL.

        Raises:
            NotFound: No tree entry has the requested path.
            UnknownError: One of the API calls failed.
        """
        tree = self._json_or_raise(
            self.session.get(
                self.base_url() + "/git/trees/{}?recursive=1".format(self.branch),
                headers=self.headers(),
            )
        )
        entry = next(
            (item for item in tree.get("tree", []) if item["path"] == filepath),
            None,
        )
        if entry is None:
            raise self.NotFound(filepath)
        data = self._json_or_raise(
            self.session.get(entry["url"], headers=self.headers())
        )
        return base64.b64decode(data["content"]), data["sha"]

    def write(
        self, filepath, content_bytes, sha=None, commit_message="", committer=None
    ):
        """Create or update *filepath* and return ``(content_sha, commit_sha)``.

        Args:
            filepath: Path of the file inside the repository.
            content_bytes: New file contents; must be ``bytes``.
            sha: Sha of the file being replaced. When omitted and GitHub
                asks for it, the current sha is looked up and the write is
                retried once with it.
            commit_message: Message for the resulting commit.
            committer: Optional committer object, passed through to the API
                unchanged.

        Raises:
            TypeError: *content_bytes* is not ``bytes``.
            UnknownError: GitHub answered with an unexpected status.
        """
        if not isinstance(content_bytes, bytes):
            raise TypeError("content_bytes must be a bytestring")
        # NOTE(review): no ``branch`` is sent in this payload, so the commit
        # presumably lands on the repository's default branch, while
        # write_large() targets self.branch. Confirm the intended behavior.
        github_url = "{}/contents/{}".format(self.base_url(), filepath)
        payload = {
            "path": filepath,
            "content": base64.b64encode(content_bytes).decode("latin1"),
            "message": commit_message,
        }
        if sha:
            payload["sha"] = sha
        if committer:
            payload["committer"] = committer
        response = self.session.put(github_url, json=payload, headers=self.headers())
        if self._is_too_large(response):
            return self.write_large(filepath, content_bytes, commit_message, committer)
        if (
            sha is None
            and response.status_code == 422
            and "sha" in response.json().get("message", "")
        ):
            # Missing sha: the file already exists, so fetch its current sha
            # and try again. The retry passes a sha, so it cannot loop.
            _, old_sha = self.read(filepath)
            return self.write(
                filepath,
                content_bytes,
                sha=old_sha,
                commit_message=commit_message,
                committer=committer,
            )
        if response.status_code in (201, 200):
            updated = response.json()
            return updated["content"]["sha"], updated["commit"]["sha"]
        raise self.UnknownError(
            str(response.status_code) + ":" + repr(response.content)
        )

    def write_large(self, filepath, content_bytes, commit_message="", committer=None):
        """Write *filepath* using the Git data API.

        Creates a blob, a tree based on the current state of ``self.branch``,
        and a commit, then points ``self.branch`` at that commit.

        Returns:
            ``(blob_sha, commit_sha)`` of the objects that were created.

        Raises:
            TypeError: *content_bytes* is not ``bytes``.
            UnknownError: Any step failed, including the final branch update.
        """
        if not isinstance(content_bytes, bytes):
            raise TypeError("content_bytes must be a bytestring")
        # Create a new blob with the file contents
        created_blob = self._json_or_raise(
            self.session.post(
                self.base_url() + "/git/blobs",
                json={
                    "encoding": "base64",
                    "content": base64.b64encode(content_bytes).decode("latin1"),
                },
                headers=self.headers(),
            )
        )
        # Look up the sha GitHub reports for the branch; it is reused below
        # as both the base tree and the parent of the new commit.
        branch_sha = self._json_or_raise(
            self.session.get(
                self.base_url() + "/git/trees/{}?recursive=1".format(self.branch),
                headers=self.headers(),
            )
        )["sha"]
        # Construct a new tree that adds or replaces just this one file
        created_tree = self._json_or_raise(
            self.session.post(
                self.base_url() + "/git/trees",
                json={
                    "base_tree": branch_sha,
                    "tree": [
                        {
                            "mode": "100644",  # file (blob)
                            "path": filepath,
                            "type": "blob",
                            "sha": created_blob["sha"],
                        }
                    ],
                },
                headers=self.headers(),
            )
        )
        # Create a commit which references the new tree
        payload = {
            "message": commit_message,
            "parents": [branch_sha],
            "tree": created_tree["sha"],
        }
        if committer:
            payload["committer"] = committer
        created_commit = self._json_or_raise(
            self.session.post(
                self.base_url() + "/git/commits", json=payload, headers=self.headers()
            )
        )
        # Move the branch reference to the new commit. The response is
        # checked so that a rejected update is not reported as success.
        self._json_or_raise(
            self.session.patch(
                self.base_url() + "/git/refs/heads/{}".format(self.branch),
                json={"sha": created_commit["sha"]},
                headers=self.headers(),
            )
        )
        return created_blob["sha"], created_commit["sha"]

    def branch_exists(self):
        """Return True if GitHub answers 200 for the ref of ``self.branch``."""
        assert self.branch
        return (
            self.session.get(
                self.base_url() + "/git/refs/heads/{}".format(self.branch),
                headers=self.headers(),
            ).status_code
            == 200
        )