Skip to content

Commit

Permalink
rework S3 object ACL (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
viren-nadkarni committed Apr 11, 2024
1 parent b9a8410 commit db05ad7
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 17 deletions.
26 changes: 17 additions & 9 deletions moto/s3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,17 @@ def public_read(self) -> bool:
return True
return False

def __eq__(self, other: Any) -> bool:
"""
User-defined classes always compare unequal except to themselves by default. Using the `config_dict` allows
to check for equality in ACLs.
:param other: another FakeAcl object to compare to
:return: a boolean indicating equality
"""
if not isinstance(other, FakeAcl):
return False
return self.to_config_dict() == other.to_config_dict()

def __repr__(self) -> str:
return f"FakeAcl(grants: {self.grants})"

Expand All @@ -595,9 +606,7 @@ def to_config_dict(self) -> Dict[str, Any]:
if grantee.uri:
grant_list.append(
{
"grantee": grantee.uri.split(
"http://acs.amazonaws.com/groups/s3/"
)[1],
"grantee": grantee.uri.rsplit("/", maxsplit=1)[1],
"permission": CAMEL_CASED_PERMISSIONS[permission],
}
)
Expand Down Expand Up @@ -628,9 +637,8 @@ def get_canned_acl(acl: str) -> FakeAcl:
elif acl == "public-read":
grants.append(FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ]))
elif acl == "public-read-write":
grants.append(
FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ, PERMISSION_WRITE])
)
grants.append(FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ]))
grants.append(FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_WRITE]))
elif acl == "authenticated-read":
grants.append(FakeGrant([AUTHENTICATED_USERS_GRANTEE], [PERMISSION_READ]))
elif acl == "bucket-owner-read":
Expand All @@ -640,9 +648,9 @@ def get_canned_acl(acl: str) -> FakeAcl:
elif acl == "aws-exec-read":
pass # TODO: bucket owner, EC2 Read
elif acl == "log-delivery-write":
grants.append(
FakeGrant([LOG_DELIVERY_GRANTEE], [PERMISSION_READ_ACP, PERMISSION_WRITE])
)
grants.append(FakeGrant([LOG_DELIVERY_GRANTEE], [PERMISSION_READ_ACP]))
grants.append(FakeGrant([LOG_DELIVERY_GRANTEE], [PERMISSION_WRITE]))

else:
assert False, f"Unknown canned acl: {acl}"
return FakeAcl(grants=grants)
Expand Down
19 changes: 13 additions & 6 deletions moto/s3/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,10 @@ def _key_response_put(
key_name: str,
) -> TYPE_RESPONSE:
self._set_action("KEY", "PUT", query)
# TODO: work on ACL, anonymous users are allowed to upload objects to public bucket, not currently supported
# because of unsupported "anonymous" user.
# An object created by an anon user in a public bucket will be public. If created by an authenticated user,
# by default those objects will be private and owned by the creating user
self._authenticate_and_authorize_s3_action(
bucket_name=bucket_name, key_name=key_name
)
Expand Down Expand Up @@ -1595,8 +1599,6 @@ def _key_response_put(
lock_mode = bucket.default_lock_mode

acl = self._acl_from_headers(request.headers)
if acl is None:
acl = bucket.acl
tagging = self._tagging_from_headers(request.headers)

if "versionId" in query:
Expand All @@ -1623,6 +1625,8 @@ def _key_response_put(
return 200, response_headers, ""

if "acl" in query:
if not acl:
acl = self._acl_from_body()
self.backend.put_object_acl(bucket_name, key_name, acl)
return 200, response_headers, ""

Expand Down Expand Up @@ -1690,8 +1694,7 @@ def _key_response_put(

new_key: FakeKey = self.backend.get_object(bucket_name, key_name) # type: ignore

if acl is not None:
new_key.set_acl(acl)
new_key.set_acl(acl or get_canned_acl("private"))

tdirective = request.headers.get("x-amz-tagging-directive")
if tdirective == "REPLACE":
Expand Down Expand Up @@ -1740,7 +1743,9 @@ def _key_response_put(
metadata = metadata_from_headers(request.headers)
metadata.update(metadata_from_headers(query))
new_key.set_metadata(metadata)
new_key.set_acl(acl)
# TODO: if the user is anonymous in a public bucket, the canned ACL should be `public`
# not currently supported (always-on default credentials)
new_key.set_acl(acl or get_canned_acl("private"))
new_key.website_redirect_location = request.headers.get(
"x-amz-website-redirect-location"
)
Expand Down Expand Up @@ -2221,7 +2226,9 @@ def _key_response_post(
metadata = metadata_from_headers(request.headers)
tagging = self._tagging_from_headers(request.headers)
storage_type = request.headers.get("x-amz-storage-class", "STANDARD")
acl = self._acl_from_headers(request.headers)
# TODO: if the user is anonymous in a public bucket, the canned ACL should be `public`
# not currently supported (always-on default credentials)
acl = self._acl_from_headers(request.headers) or get_canned_acl("private")

multipart_id = self.backend.create_multipart_upload(
bucket_name,
Expand Down
7 changes: 5 additions & 2 deletions tests/test_s3/test_s3_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ def test_s3_object_in_public_bucket():
bucket.create(
ACL="public-read", CreateBucketConfiguration={"LocationConstraint": "us-west-1"}
)
bucket.put_object(Body=b"ABCD", Key="file.txt")
# by default, a file created by an authenticated user in a public bucket will be private
bucket.put_object(Body=b"ABCD", Key="file.txt", ACL="public-read")

s3_anonymous = boto3.resource("s3")
s3_anonymous.meta.client.meta.events.register("choose-signer.s3.*", disable_signing)
Expand All @@ -98,7 +99,9 @@ def test_s3_object_in_public_bucket():
)
assert contents == b"ABCD"

bucket.put_object(ACL="private", Body=b"ABCD", Key="file.txt")
# the object will be created private by default
# TODO: support an anonymous user to create an object in a public bucket, it should have `public` ACL
bucket.put_object(Body=b"ABCD", Key="file.txt")

with pytest.raises(ClientError) as exc:
s3_anonymous.Object(key="file.txt", bucket_name="test-bucket").get()
Expand Down

0 comments on commit db05ad7

Please sign in to comment.