Skip to content

Commit 19e8a1e

Browse files
committed
Introduce network functions
Generic network functions are implemented that allow for verification of signed data against the network. Signed data can come from keys with specific NFTs; stake against a specific CNT; and against alias stake keys following the Orcfax aliasing protocol.
1 parent f2d7541 commit 19e8a1e

File tree

2 files changed

+108
-40
lines changed

2 files changed

+108
-40
lines changed

src/simple_sign/backend.py

+36-26
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import logging
1111
from dataclasses import dataclass
12-
from typing import Callable, Final
12+
from typing import Callable, Final, Optional
1313

1414
import cachetools.func
1515
import pycardano as pyc
@@ -73,15 +73,15 @@ def retrieve_staked_holders(self, token_policy: str) -> list:
7373
raise NotImplementedError()
7474

7575
def retrieve_nft_holders(
76-
self, policy: str, deny_list: list, seek_addr: str = None
76+
self, policy: str, deny_list: list, addr: str = None
7777
) -> list:
7878
"""Retrieve a list of NFT holders, e.g. a license to operate
7979
a decentralized node.
8080
"""
8181
raise NotImplementedError()
8282

8383
def retrieve_metadata(
84-
self, value: int, policy: str, tag: str, callback: Callable = None
84+
self, value: int, policy: str, tag: str, callback: Optional[Callable | None]
8585
) -> list:
8686
"""Retrieve metadata from the backend."""
8787
raise NotImplementedError()
@@ -100,20 +100,25 @@ def __init__(
100100
self._port = port
101101

102102
@cachetools.func.ttl_cache(ttl=60)
103-
def _retrieve_unspent_utxos(self, addr: str = "") -> dict:
103+
def _retrieve_unspent_utxos(self, addr: str = None) -> dict:
104104
"""Retrieve unspent utxos from Kupo.
105105
106-
NB. Kupo must be configured to capture sparingly.
106+
NB. Kupo must be configured to capture sparingly, i.e. the
107+
policies and addresses it is watching and slot from which it is
108+
watching must be as specific as possible for this function to
109+
perform well.
107110
"""
108-
if not addr:
109-
resp = requests.get(
110-
f"{self._base_url}:{self._port}/matches?unspent", timeout=30
111-
)
112-
return resp.json()
113-
resp = requests.get(
114-
f"{self._base_url}:{self._port}/matches/{addr}?unspent", timeout=30
115-
)
116-
return resp.json()
111+
kupo_err: Final[str] = "hint"
112+
request_string = f"{self._base_url}:{self._port}/matches?unspent"
113+
if addr:
114+
request_string = f"{self._base_url}:{self._port}/matches/{addr}?unspent"
115+
logger.info("requesting unspent: '%s'", request_string)
116+
resp = requests.get(request_string, timeout=30)
117+
ret = resp.json()
118+
if kupo_err in ret:
119+
logger.error("unable to retrieve data due to Kupo request error: %s", ret)
120+
return []
121+
return ret
117122

118123
def _retrieve_metadata(self, tag: str, tx_list: list[ValidTx]):
119124
"""Return metadata based on slot and transaction ID. This is
@@ -145,25 +150,26 @@ def _retrieve_metadata(self, tag: str, tx_list: list[ValidTx]):
145150
md_list.append(md_dict[0])
146151
return md_list
147152

148-
def retrieve_staked_holders(self, token_policy: str, seek_addr: str = None) -> list:
153+
def retrieve_staked_holders(self, token_policy: str, addr: str = None) -> list:
149154
"""Retrieve a list of staked holders against a given CNT."""
150155
unspent = self._retrieve_unspent_utxos()
151156
addresses_with_fact = {}
152157
for item in unspent:
153-
addr = item["address"]
154-
if seek_addr and addr != seek_addr:
158+
unspent_addr = item["address"]
159+
unspent_staking = _get_staking_from_addr(unspent_addr)
160+
if addr and addr not in (unspent_addr, unspent_staking):
155161
# don't process further than we have to if we're only
156162
# looking for a single address.
157163
continue
158-
staking = _get_staking_from_addr(addr)
164+
staking = unspent_staking
159165
assets = item["value"]["assets"]
160166
for key, value in assets.items():
161167
if token_policy in key:
162168
addresses_with_fact = _sum_dict(staking, value, addresses_with_fact)
163169
return addresses_with_fact
164170

165171
def retrieve_nft_holders(
166-
self, policy: str, deny_list: list = None, seek_addr: str = None
172+
self, policy: str, deny_list: list = None, addr: str = None
167173
) -> list:
168174
"""Retrieve a list of NFT holders, e.g. a license to operate
169175
a decentralized node.
@@ -172,17 +178,21 @@ def retrieve_nft_holders(
172178
to remove some results that are unhelpful, e.g. the minting
173179
address if desired.
174180
"""
175-
unspent = self._retrieve_unspent_utxos()
181+
if addr:
182+
unspent = self._retrieve_unspent_utxos(addr)
183+
else:
184+
unspent = self._retrieve_unspent_utxos()
176185
holders = {}
177186
for item in unspent:
178-
addr = item["address"]
179-
if seek_addr and addr != seek_addr:
187+
unspent_addr = item["address"]
188+
unspent_staking = _get_staking_from_addr(unspent_addr)
189+
if addr and addr not in (unspent_addr, unspent_staking):
180190
# don't process further than we have to if we're only
181191
# looking for a single address.
182192
continue
183-
staking = _get_staking_from_addr(addr)
184-
if addr in deny_list:
193+
if deny_list and unspent_addr in deny_list:
185194
continue
195+
staking = unspent_staking
186196
assets = item["value"]["assets"]
187197
for key, _ in assets.items():
188198
if not key.startswith(policy):
@@ -195,6 +205,7 @@ def _get_valid_txs(unspent: list[dict], value: int, policy: str) -> list[ValidTx
195205
"""Retrieve a list of valid transactions according to our
196206
policy rules.
197207
"""
208+
logger.info("getting valid txs for policy: '%s'", policy)
198209
valid_txs = []
199210
if not unspent:
200211
return valid_txs
@@ -206,7 +217,6 @@ def _get_valid_txs(unspent: list[dict], value: int, policy: str) -> list[ValidTx
206217
for asset in assets:
207218
if policy not in asset:
208219
continue
209-
logger.error(policy)
210220
slot = item["created_at"]["slot_no"]
211221
tx_id = item["transaction_id"]
212222
address = item["address"]
@@ -225,7 +235,7 @@ def retrieve_metadata(
225235
value: int,
226236
policy: str,
227237
tag: str,
228-
callback: Callable = None,
238+
callback: Optional[Callable | None],
229239
) -> list:
230240
"""Retrieve a list of aliased signing addresses. An aliased
231241
signing address is an address that has been setup using a

src/simple_sign/sign.py

+72-14
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,25 @@
33
# pylint: disable=W0613
44

55
import argparse
6+
import binascii
7+
import copy
68
import logging
79
import os
810
import sys
911
import time
10-
from typing import Final
12+
from typing import Callable, Final
1113

1214
import pycardano as pyc
1315

1416
try:
17+
from src.simple_sign.backend import KupoContext
1518
from src.simple_sign.version import get_version
1619
except ModuleNotFoundError:
1720
try:
21+
from backend import KupoContext
1822
from version import get_version
1923
except ModuleNotFoundError:
24+
from simple_sign.backend import KupoContext
2025
from simple_sign.version import get_version
2126

2227
# Set up logging.
@@ -44,35 +49,88 @@ class UnknownSigningKey(Exception):
4449
"""Exception to raise when the signing key is unknown."""
4550

4651

47-
def retrieve_aliased(pkey: str) -> str:
52+
def retrieve_aliased(
53+
context: KupoContext,
54+
policy_id: str,
55+
tag: str,
56+
value: int,
57+
callback: Callable,
58+
) -> str:
4859
"""Retrieve another public key aliased by the given lookup.
4960
5061
The result might then be used to verify using one of the other
51-
methods in this library, e.g.
62+
methods in this library, e.g. given an staking key returned for an
63+
alias, verify if the staking key also holds the correct amount
64+
of stake for a given token.
5265
53-
1. lookup aliased staking key.
54-
2. lookup staking key in license pool.
55-
3. if not exists, raise exception, else, pass.
66+
NB. to keep in mind, does aliasing already guarantee a license is
67+
held? If the policy is supplied?
5668
57-
We want to do this on an address by address basis. The difficulty
58-
is consistent parsing of metadata that allows this function to be
59-
broadly applicable across functions.
69+
Aliasing can potentially be a generic process, it exists in this
70+
library by way of helping realize that. It could be removed in
71+
future, and so any feedback is appreciated if it works for you.
72+
73+
For more information; https://docs.orcfax.io/signing-key-aliasing
6074
"""
61-
raise NotImplementedError("reading staked values is not yet implemented")
75+
if not policy_id:
76+
policy_id = ""
77+
if not value or not tag:
78+
raise NotImplementedError("function requires a lovelace value and metadata tag")
79+
aliases = context.retrieve_metadata(
80+
value=value,
81+
tag=tag,
82+
policy=policy_id,
83+
callback=callback,
84+
)
85+
return aliases
6286

6387

64-
def signature_in_staked_pool(pkey: str, token_policy_id: str, min_stake: int) -> bool:
88+
def signature_in_staked_pool(
89+
context: KupoContext, pkey: str, token_policy_id: str, min_stake: int
90+
) -> bool:
6591
"""Validate whether the signing key belongs to a someone who has
6692
enough stake in a given token.
6793
"""
68-
raise NotImplementedError("reading staked values is not yet implemented")
94+
staking = context.retrieve_staked_holders(
95+
addr=pkey,
96+
token_policy=token_policy_id,
97+
)
98+
for key, value in copy.deepcopy(staking).items():
99+
if value > min_stake:
100+
continue
101+
del staking[key]
102+
try:
103+
staked = staking[pkey]
104+
if not int(staked) >= min_stake:
105+
raise UnknownSigningKey(
106+
f"addr: '{pkey}', does not have enough stake: '{min_stake}'",
107+
)
108+
except IndexError:
109+
raise UnknownSigningKey(
110+
f"addr: '{pkey}', is not knonwn to the network",
111+
) from IndexError
112+
return True
69113

70114

71-
def signature_in_license_pool(pkey: str, policy_id: str) -> bool:
115+
def signature_in_license_pool(
116+
context: KupoContext, pkey: str, policy_id: str, suffix: str = ""
117+
) -> bool:
72118
"""Validate whether signing key matches one of those in a pool of
73119
licenses associated with the project and return True if so.
74120
"""
75-
raise NotImplementedError("reading from license pool is not yet implemented")
121+
md = context.retrieve_nft_holders(
122+
policy=policy_id,
123+
addr=pkey,
124+
)
125+
holding = {}
126+
for k, v in md.items():
127+
license_name = k.replace(policy_id, "").replace(".", "").replace(suffix, "")
128+
license_name = binascii.unhexlify(license_name).decode()
129+
holding[license_name] = v
130+
if not holding:
131+
raise UnknownSigningKey(f"addr '{pkey}' is not in possession of a license")
132+
logger.info("information in license pool: '%s'", holding)
133+
return True
76134

77135

78136
def signature_in_constitution_datum_utxo(pkey: str) -> bool:

0 commit comments

Comments
 (0)