# Source code for dynamo.vectorfield.Bhattacharya

```
from typing import Callable, Tuple, Union
import numpy as np
# from scipy.integrate import odeint
from scipy.interpolate import griddata
def path_integral(
    VecFnc: Callable,
    x_lim: np.ndarray,
    y_lim: np.ndarray,
    xyGridSpacing: Union[int, float],
    dt: float = 1e-2,
    tol: float = 1e-2,
    numTimeSteps: int = 1400,
) -> Tuple[
    np.ndarray,
    Union[np.ndarray, None],
    np.ndarray,
    int,
    int,
    int,
    np.ndarray,
    np.ndarray,
    np.ndarray,
    np.ndarray,
    np.ndarray,
]:
    """Compute a quasi-potential along deterministic paths started from a regular x-y grid.

    Implements the path-integral approach of "A deterministic map of Waddington's epigenetic landscape for cell
    fate specification" (Sudin Bhattacharya, Qiang Zhang and Melvin E. Andersen). From every grid point the vector
    field is integrated with fixed-step forward Euler updates while ``-(dxdt * dx + dydt * dy)`` is accumulated as
    the "potential". Paths are visited column by column (x outer loop, y inner loop) and each path's end point is
    compared with the previous path's end point to assign it to an attractor and to detect separatrices.

    Args:
        VecFnc: The function of vector field that takes in ``[x, y]`` and returns the velocity ``(dxdt, dydt)`` at
            that point.
        x_lim: Lower and upper limits of the x-axis (``x_lim[0]``, ``x_lim[1]``).
        y_lim: Lower and upper limits of the y-axis (``y_lim[0]``, ``y_lim[1]``).
        xyGridSpacing: Grid spacing for "starting points" for each "path" on the potential surface. It is also
            used as the distance tolerance when comparing end points with known attractors.
        dt: Time step for the path integral.
        tol: Tolerance to test for convergence (applied to the potential change of the last time step).
        numTimeSteps: Number of time points stored per path; should be high enough for convergence with given dt.

    Returns:
        A tuple, in this order:

        attractors_num_X_Y: One row ``[attractor number (1-based), x, y]`` per identified attractor.
        sepx_old_new_pathNum: One row ``[tag of previous path, tag of current path, index of previous path]`` per
            recorded separatrix, or ``None`` when no separatrix was recorded.
        numPaths_att: Number of paths per attractor. Shape is ``(1,)`` while only one attractor is known and
            ``(num_attractors, 1)`` otherwise (kept for backward compatibility).
        num_attractors: Number of attractors identified by the path integral approach.
        numPaths: Total number of paths, i.e. the number of grid starting points.
        numTimeSteps: The ``numTimeSteps`` argument, passed through.
        pot_path: Potential along the path. (dimension: numPaths x numTimeSteps)
        path_tag: 0-based attractor index of each path (to denote basin of attraction). (dimension: numPaths x 1)
        attractors_pot: Potential at the end of the path that first reached each attractor. Same shape convention
            as ``numPaths_att``.
        x_path: x-coord along path. (dimension: numPaths x numTimeSteps)
        y_path: y-coord along path. (dimension: numPaths x numTimeSteps)
    """
    # Build the starting grid first and size every array from it. Deriving the path count from
    # ``np.diff(x_lim)`` alone breaks when the y range differs from the x range or when floating-point
    # rounding makes ``np.arange`` yield one point more or less than expected.
    x_starts = np.arange(x_lim[0], x_lim[1] + xyGridSpacing, xyGridSpacing)
    y_starts = np.arange(y_lim[0], y_lim[1] + xyGridSpacing, xyGridSpacing)
    numPaths = len(x_starts) * len(y_starts)

    # Per-path trajectories and potentials (time t = 0 is stored in column 0).
    x_path = np.zeros((numPaths, numTimeSteps))
    y_path = np.zeros((numPaths, numTimeSteps))
    pot_path = np.zeros((numPaths, numTimeSteps))
    # Every entry is overwritten with a 0-based attractor index below.
    path_tag = np.ones((numPaths, 1), dtype="int")

    path_counter = 0
    num_attractors = 0
    num_sepx = 0

    attractors_num_X_Y = None
    attractors_pot = None
    numPaths_att = None
    sepx_old_new_pathNum = None

    # Squared-distance threshold: two grid points closer than this are "adjacent" start points, and two
    # end points farther apart than this are treated as different attractors.
    neighbour_dist_sqr = 2 * (xyGridSpacing**2)

    for x0 in x_starts:
        for y0 in y_starts:
            x_p = x0
            y_p = y0

            # The potential starts at 0 for every path (to facilitate comparison of "potential drop").
            Pot = 0
            Pot_old = 1.0e7  # large sentinel so a path without any step is reported as not converged

            x_path[path_counter, 0] = x_p
            y_path[path_counter, 0] = y_p
            pot_path[path_counter, 0] = Pot

            # Forward-Euler integration from the initial condition towards a stable steady state.
            for n_steps in range(1, numTimeSteps):
                Pot_old = Pot

                dxdt, dydt = VecFnc([x_p, y_p])
                dx = dxdt * dt
                dy = dydt * dt

                x_p = x_p + dx
                y_p = y_p + dy
                x_path[path_counter, n_steps] = x_p
                y_path[path_counter, n_steps] = y_p

                # signs ensure that "potential" decreases as "velocity" increases
                dPot = -(dxdt) * dx - (dydt) * dy
                Pot = Pot_old + dPot
                pot_path[path_counter, n_steps] = Pot

            # check for convergence
            if abs(Pot - Pot_old) > tol:
                print("Warning: not converged!\n")

            # --- assign path tag (to track multiple basins of attraction) ---
            if path_counter == 0:
                # The first path defines the first attractor.
                num_attractors = num_attractors + 1
                attractors_num_X_Y = np.array([[num_attractors, x_p, y_p]])
                attractors_pot = np.array([Pot])
                numPaths_att = np.array([1])
                path_tag[path_counter] = num_attractors - 1
            else:
                prev_path = path_counter - 1
                # Plain ints keep the separatrix rows homogeneous; mixing shape-(1,) array slices with
                # ints creates a ragged sequence that recent NumPy versions refuse to convert.
                prev_tag = int(path_tag[prev_path, 0])

                # Default: same basin as the previous path.
                path_tag[path_counter] = prev_tag

                startPt_dist_sqr = (x0 - x_path[prev_path, 0]) ** 2 + (y0 - y_path[prev_path, 0]) ** 2
                endPt_dist_sqr = (x_p - x_path[prev_path, numTimeSteps - 1]) ** 2 + (
                    y_p - y_path[prev_path, numTimeSteps - 1]
                ) ** 2

                if endPt_dist_sqr > neighbour_dist_sqr:
                    # The path ended somewhere else than the previous one: look for a known attractor
                    # within one grid spacing (per axis) of the end point.
                    new_attr_found = True
                    for k in range(num_attractors):
                        x_att = attractors_num_X_Y[k, 1]
                        y_att = attractors_num_X_Y[k, 2]
                        if (abs(x_p - x_att) < xyGridSpacing) and (abs(y_p - y_att) < xyGridSpacing):
                            new_attr_found = False
                            path_tag[path_counter] = k
                            numPaths_att[k] = numPaths_att[k] + 1
                            break

                    if new_attr_found:
                        num_attractors = num_attractors + 1
                        attractors_num_X_Y = np.vstack((attractors_num_X_Y, [num_attractors, x_p, y_p]))
                        path_tag[path_counter] = num_attractors - 1
                        numPaths_att = np.vstack((numPaths_att, 1))
                        attractors_pot = np.vstack((attractors_pot, Pot))
                        # A new attractor reached from an adjacent start point marks a separatrix.
                        record_sepx = startPt_dist_sqr < neighbour_dist_sqr
                    else:
                        # Known attractor: only record a separatrix if the attractor of the *previous*
                        # path has not been part of any recorded separatrix yet.
                        prev_attr_new = sepx_old_new_pathNum is None or not np.any(
                            sepx_old_new_pathNum[:num_sepx, :2] == prev_tag
                        )
                        record_sepx = prev_attr_new and startPt_dist_sqr < neighbour_dist_sqr

                    if record_sepx:
                        curr_sepx = [prev_tag, int(path_tag[path_counter, 0]), prev_path]
                        sepx_old_new_pathNum = (
                            np.vstack((sepx_old_new_pathNum, curr_sepx))
                            if sepx_old_new_pathNum is not None
                            else np.array([curr_sepx])
                        )
                        num_sepx = num_sepx + 1
                else:
                    # Same end point as the previous path: count it for that attractor. Tags are 0-based,
                    # so the tag indexes the counter array directly.
                    numPaths_att[prev_tag] = numPaths_att[prev_tag] + 1

            path_counter += 1

    return (
        attractors_num_X_Y,
        sepx_old_new_pathNum,
        numPaths_att,
        num_attractors,
        numPaths,
        numTimeSteps,
        pot_path,
        path_tag,
        attractors_pot,
        x_path,
        y_path,
    )
def alignment(
    numPaths: int,
    numTimeSteps: int,
    pot_path: np.ndarray,
    path_tag: np.ndarray,
    attractors_pot: np.ndarray,
    x_path: np.ndarray,
    y_path: np.ndarray,
    grid: int = 100,
    interpolation_method: str = "linear",
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Align potential values so all path-potentials end up at same global min and then generate potential surface with
    interpolation on a grid.

    Every path is shifted by a constant so that its final potential equals the potential of the attractor it is
    tagged with. The shift is written back into ``pot_path`` (the input array is modified in place). All
    (x, y, potential) samples along all paths are then interpolated onto a regular ``grid`` x ``grid`` mesh.

    Args:
        numPaths: Total Number of paths for defined grid spacing.
        numTimeSteps: A high-enough number for convergence with given dt.
        pot_path: Potential along the path. (dimension: numPaths x numTimeSteps). Modified in place.
        path_tag: Tag (0-based attractor index) for given path. (dimension: numPaths x 1 or numPaths)
        attractors_pot: Potential value of each identified attractors by the path integral approach. Any shape
            with one value per attractor (e.g. ``(k,)`` or ``(k, 1)``).
        x_path: x-coord. along path.
        y_path: y-coord. along path.
        grid: No. of grid lines in x- and y- directions
        interpolation_method: Method of interpolation in griddata function. One of

            ``nearest``
                return the value at the data point closest to the point of interpolation.
            ``linear``
                tessellate the input point set to simplices, and interpolate linearly on each simplex.
            ``cubic``
                return the value determined from a piecewise cubic, continuously differentiable (C1), and
                approximately curvature-minimizing polynomial surface.

    Returns:
        Xgrid: x-coordinates of the Grid produced from the meshgrid function.
        Ygrid: y-coordinates of the Grid produced from the meshgrid function.
        Zgrid: z-coordinates or potential at each of the x/y coordinate.
    """
    # Flatten tags and attractor potentials so both column-vector and 1-D inputs index the same way and
    # the subtraction below yields true scalars per path (no size-1 arrays assigned into scalar slots).
    tags = np.asarray(path_tag, dtype=int).ravel()[:numPaths]
    attractor_levels = np.asarray(attractors_pot, dtype=float).ravel()

    # "Align" potential values so all path-potentials end up at same global min: subtract, per path, the
    # gap between its final potential and the potential of its attractor.
    del_pot = pot_path[:numPaths, numTimeSteps - 1] - attractor_levels[tags]
    pot_path[:numPaths, :numTimeSteps] = pot_path[:numPaths, :numTimeSteps] - del_pot[:, np.newaxis]

    # Legacy coordinate shift, kept so the returned grids match earlier results. The original comment
    # describes it as "to generate log-log surface".
    coord_offset = 0.1

    # 1-D sample vectors in path-major order (all steps of path 0, then path 1, ...).
    x_p_list = np.asarray(x_path)[:numPaths, :numTimeSteps].ravel() + coord_offset
    y_p_list = np.asarray(y_path)[:numPaths, :numTimeSteps].ravel() + coord_offset
    pot_p_list = pot_path[:numPaths, :numTimeSteps].ravel()

    # --- Create X,Y grid to interpolate "potential surface" ---
    xlin = np.linspace(x_p_list.min(), x_p_list.max(), grid)
    ylin = np.linspace(y_p_list.min(), y_p_list.max(), grid)
    Xgrid, Ygrid = np.meshgrid(xlin, ylin)

    Zgrid = griddata(
        np.column_stack((x_p_list, y_p_list)),
        pot_p_list,
        np.column_stack((Xgrid.ravel(), Ygrid.ravel())),
        method=interpolation_method,
    )
    Zgrid = Zgrid.reshape(Xgrid.shape)

    return Xgrid, Ygrid, Zgrid
# %%
```