Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare weights for ilp unwrapping #8

Merged
merged 15 commits into from
Jul 16, 2024
Merged
10 changes: 10 additions & 0 deletions kamui/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import numpy as np
from typing import Optional, Iterable

from .utils import prepare_weights


try:
import maxflow
except ImportError:
Expand Down Expand Up @@ -40,6 +43,7 @@ def calculate_k(
differences: np.ndarray,
weights: Optional[np.ndarray] = None,
adaptive_weighting: bool = True,
merging_method: str = 'mean'
) -> Optional[np.ndarray]:
"""
Args:
Expand All @@ -48,6 +52,7 @@ def calculate_k(
differences: (M,) array of differences, could be float or int
weights: (M,) array of weights
adaptive_weighting: whether to use adaptive weighting
merging_method: how to combine phase weights to yield edge weight
"""

M, N = edges.shape[0], len(simplices)
Expand Down Expand Up @@ -101,6 +106,7 @@ def calculate_k(
else:
c = np.ones((M * 2,), dtype=np.int64)
else:
weights = prepare_weights(weights, edges=edges, merging_method=merging_method)
c = np.tile(weights, 2)

res = linprog(c, A_eq=A_eq, b_eq=b_eq, integrality=1)
Expand All @@ -115,12 +121,14 @@ def calculate_m(
edges: np.ndarray,
differences: np.ndarray,
weights: Optional[np.ndarray] = None,
merging_method: str = 'mean'
) -> Optional[np.ndarray]:
"""
Args:
edges: (M, 2) array of edges
differences: (M,) quantised array of differences, must be int
weights: (M,) array of weights
merging_method: how to combine phase weights to yield edge weight
"""
assert differences.dtype == np.int64, "differences must be int"
M = edges.shape[0]
Expand All @@ -144,6 +152,8 @@ def calculate_m(
)
if weights is None:
weights = np.ones((M,), dtype=np.int64)
else:
weights = prepare_weights(weights, edges=edges, merging_method=merging_method)

c = np.concatenate((np.zeros(N, dtype=np.int64), weights, weights))

Expand Down
47 changes: 47 additions & 0 deletions kamui/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
import numpy.typing as npt
from typing import Tuple, Optional, Iterable, Union

__all__ = ["get_2d_edges_and_simplices", "get_3d_edges_and_simplices"]
Expand Down Expand Up @@ -194,3 +195,49 @@ def get_3d_edges_and_simplices(
).tolist()

return edges, simplices


def prepare_weights(weights: npt.NDArray, edges: npt.NDArray[np.int_], smoothing: float = 0.1,
merging_method: str = 'mean') -> npt.NDArray[np.float_]:
"""Prepare weights for `calculate_m` and `calculate_k` functions.

Assume the weights are the same shape as the phases to be unwrapped.

Scale the weights from 0 to 1. Pick the weights corresponding to the phase pairs connected by the edges.
Compute the mean/max/min (depending on the `merging_method`) of each of those pairs to give a weight for each edge.

Args:
weights : Array of weights of shapr corresponding to the original phases array shape.
edges : Edges connecting the phases. Shape: (N, 2), where N is the number of edges.
smoothing : A positive value between 0 (inclusive) and 1 (not inclusive). This is the minimal value
of the rescaled weights where they are defined. If smoothing > 0, the value of 0 is reserved
for places where the weights are originally NaN. If smoothing == 0, 0 will be used for both
NaN weights and smallest non-NaN ones.
merging_method : Way of combining two phase weights into a single edge weight.

Returns:
Array of weights for the edges, shape: (N,). Rescaled to (0, 1).
"""

if not 0 <= smoothing < 1:
raise ValueError(
"`smoothing` should be a value between 0 (inclusive) and 1 (non inclusive); got " + str(smoothing))

# scale the weights from 0 to 1
weights = weights - np.nanmin(weights)
weights /= np.nanmax(weights)
weights *= (1 - smoothing)
weights += smoothing

# pick the weights corresponding to the phases connected by the edges
# and use `merging_method` to get one weight for each edge
allowed_merging_methods = ['min', 'max', 'mean']
if merging_method not in allowed_merging_methods:
raise ValueError(
"`merging_method` should be one of: " + ', '.join(merging_method) + '; got ' + str(merging_method))
weights_for_edges = getattr(np, merging_method)(weights.ravel()[edges], axis=1)

# make sure there are no NaNs in the weights; replace any with 0s
weights_for_edges[np.isnan(weights_for_edges)] = 0

return weights_for_edges