# Source code for llg3d.llg3d

"""
Solver for the stochastic Landau-Lifshitz-Gilbert equation in 3D
"""

import argparse
import json
import sys
import time
from dataclasses import dataclass

import numpy as np
from mpi4py import MPI

# Global MPI state: world communicator, this process's rank, process count
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# Reusable status object for the Wait() calls on receive requests
status = MPI.Status()

# Parameters: default value and description
parameters = {
    "element": ("Cobalt", "Chemical element of the sample"),
    "N": (5000, "Number of time iterations"),  # default 5000
    "dt": (1.0e-14, "Time step"),  # default 1.e-14
    "Jx": (300, "Number of points in x"),
    "Jy": (21, "Number of points in y"),
    "Jz": (21, "Number of points in z"),
    "dx": (1.0e-9, "Step in x"),  # default 1.e-9
    "T": (0.0, "Temperature"),
    "H_ext": (
        # field expressed as B / mu_0 (mu_0 = 4*pi*1e-7)
        0.0 / (4 * np.pi * 1.0e-7),
        "External field",
    ),  # must be constant, default 0.0
    "n_average": (4000, "Start index of time average"),  # default 4000
    "n_integral": (1, "Spatial average frequency (number of iterations)"),
    "n_profile": (0, "x-profile save frequency (number of iterations)"),
}

# Name of the JSON summary file written at the end of a run
json_file = "run.json"


def progress_bar(rank: int, it, prefix="", size=60, out=sys.stdout):
    """
    Wraps a sized iterable and displays a progress bar on process 0.

    (Source: https://stackoverflow.com/a/34482761/16593179)

    Args:
        rank: MPI rank of the caller; only rank 0 prints anything
        it: iterable supporting ``len()``, whose items are yielded unchanged
        prefix: text shown before the bar
        size: bar width in characters (shadows the module-level MPI ``size``
            inside this function — kept for interface compatibility)
        out: output stream

    Yields:
        The items of ``it``.
    """
    count = len(it)

    def show(j):
        # Guard against an empty iterable (count == 0 previously raised
        # ZeroDivisionError here)
        x = int(size * j / count) if count else size
        if rank == 0:
            # '█' was previously mojibake ('â–ˆ') from a UTF-8/latin-1 mixup
            print(
                f"{prefix}[{'█' * x}{'.' * (size - x)}] {j}/{count}",
                end="\r",
                file=out,
                flush=True,
            )

    show(0)
    i = -1  # so the final show() is valid even if the iterable is empty
    for i, item in enumerate(it):
        yield item
        # To avoid slowing down the computation, we do not display at
        # every iteration
        if i % 5 == 0:
            show(i + 1)
    show(i + 1)
    if rank == 0:
        print("\n", flush=True, file=out)
@dataclass
class Grid:
    """Stores grid data: global sizes plus derived physical/local quantities."""

    # Parameter values correspond to the global grid
    Jx: int
    Jy: int
    Jz: int
    dx: float

    def __post_init__(self) -> None:
        """Compute grid characteristics derived from Jx, Jy, Jz and dx."""
        # Uniform spacing in the three directions: dx = dy = dz
        self.dy = self.dx
        self.dz = self.dx
        # Physical extent of the global domain
        self.Lx = (self.Jx - 1) * self.dx
        self.Ly = (self.Jy - 1) * self.dy
        self.Lz = (self.Jz - 1) * self.dz
        # Shape of the array local to this process (x axis split over ranks)
        self.dims = (self.Jx // size, self.Jy, self.Jz)
        # Elemental volume of one cell, and total domain volume
        self.dV = self.dx * self.dy * self.dz
        self.V = self.Lx * self.Ly * self.Lz
        # Total number of grid points and of cells
        self.ntot = self.Jx * self.Jy * self.Jz
        self.ncell = (self.Jx - 1) * (self.Jy - 1) * (self.Jz - 1)

    def __repr__(self):
        parts = [
            "\t" + "\t\t".join(("x", "y", "z")) + "\n",
            f"J =\t{self.Jx}\t\t{self.Jy}\t\t{self.Jz}\n",
            f"L =\t{self.Lx}\t\t{self.Ly}\t\t{self.Lz}\n",
            f"d =\t{self.dx:.08e}\t{self.dy:.08e}\t{self.dz:.08e}\n\n",
            f"dV = {self.dV:.08e}\n",
            f"V = {self.V:.08e}\n",
            f"ntot = {self.ntot:d}\n",
        ]
        return "".join(parts)

    def get_filename(
        self, T: float, name: str = "m1_integral_space", extension="txt"
    ) -> str:
        """
        Returns the output file name for a given temperature

        >>> g = Grid(Jx=300, Jy=21, Jz=21, dx=1.e-9)
        >>> g.get_filename(1100)
        'm1_integral_space_T1100_300x21x21_np<MPI_size>.txt'
        """
        return (
            f"{name}_T{int(T)}_{self.Jx}x{self.Jy}x{self.Jz}"
            f"_np{size}.{extension}"
        )

    def get_mesh(self, loc: bool = True) -> list:
        """
        Returns a list of 3D arrays with the coordinates of the grid points

        Args:
            loc: if True, returns the local coordinates, otherwise the
                global coordinates
        """
        xglob = np.linspace(0, self.Lx, self.Jx)  # global x coordinates
        # Each rank owns one contiguous slice of the x axis
        x = np.split(xglob, size)[rank] if loc else xglob
        return np.meshgrid(
            x,
            np.linspace(0, self.Ly, self.Jy),
            np.linspace(0, self.Lz, self.Jz),
            indexing="ij",
        )
class Element:
    """Abstract class for chemical elements"""

    # Default material constants; concrete subclasses override all of these
    A = 0.0  # exchange constant
    K = 0.0  # anisotropy constant
    gamma = 0.0  # gyromagnetic ratio
    mu_0 = 0.0  # vacuum permeability
    k_B = 0.0  # Boltzmann constant
    lambda_G = 0.0  # Gilbert damping parameter
    M_s = 0.0  # saturation magnetization
    a_eff = 0.0  # effective lattice parameter

    def __init__(self, T: float, H_ext: float, g: Grid, dt: float) -> None:
        """
        Precompute the LLG coefficients for this material on grid ``g``.

        Args:
            T: temperature
            H_ext: external field (assumed constant)
            g: grid descriptor (provides dx and the cell volume dV)
            dt: time step
        """
        self.H_ext = H_ext
        self.g = g
        self.dt = dt
        self.gamma_0 = self.gamma * self.mu_0
        self.d0 = np.sqrt(self.A / self.K)  # only if K is positive

        # --- Characteristic Scales ---
        # coeff_1 multiplies the exchange (Laplacian) term,
        # coeff_2 the anisotropy term, coeff_3 the external field term
        self.coeff_1 = self.gamma_0 * 2.0 * self.A / (self.mu_0 * self.M_s)
        self.coeff_2 = self.gamma_0 * 2.0 * self.K / (self.mu_0 * self.M_s)
        self.coeff_3 = self.gamma_0 * H_ext

        # corresponds to the temperature actually put into the random field
        T_simu = T * self.g.dx / self.a_eff

        # calculation of the random field related to temperature
        # (we only take the volume over one mesh)
        h_alea = np.sqrt(
            2
            * self.lambda_G
            * self.k_B
            / (self.gamma_0 * self.mu_0 * self.M_s * self.g.dV)
        )
        # Random-field amplitude scales as sqrt(T / dt) (white noise in time)
        H_alea = h_alea * np.sqrt(T_simu) * np.sqrt(1.0 / self.dt)
        self.coeff_4 = H_alea * self.gamma_0

    def get_CFL(self):
        """Returns the CFL-like stability number dt * coeff_1 / dx**2."""
        return self.dt * self.coeff_1 / self.g.dx**2
class Cobalt(Element):
    """Material constants for cobalt (uniaxial anisotropy)."""

    A = 30.0e-12  # exchange constant
    K = 520.0e3  # anisotropy constant
    gamma = 1.76e11  # gyromagnetic ratio
    mu_0 = 1.26e-6  # vacuum permeability
    # Precomputed product, added for consistency with Iron and Nickel
    # (Element.__init__ recomputes the instance attribute anyway)
    gamma_0 = gamma * mu_0
    k_B = 1.38e-23  # Boltzmann constant
    lambda_G = 0.5  # 0.5 by default
    M_s = 1400.0e3  # saturation magnetization
    a_eff = 0.25e-9  # effective lattice parameter
    anisotropy = "uniaxial"
class Iron(Element):
    """Material constants for iron (cubic anisotropy)."""

    A = 21.0e-12  # exchange constant
    K = 48.0e3  # anisotropy constant
    gamma = 1.76e11  # gyromagnetic ratio
    mu_0 = 1.26e-6  # vacuum permeability
    gamma_0 = gamma * mu_0
    k_B = 1.38e-23  # Boltzmann constant
    lambda_G = 0.5  # 0.5 by default
    M_s = 1700.0e3  # saturation magnetization
    a_eff = 0.286e-9  # effective lattice parameter
    anisotropy = "cubic"
class Nickel(Element):
    """Material constants for nickel (cubic anisotropy, negative K)."""

    A = 9.0e-12  # exchange constant
    K = -5.7e3  # anisotropy constant (negative: easy-plane behaviour)
    gamma = 1.76e11  # gyromagnetic ratio
    mu_0 = 1.26e-6  # vacuum permeability
    gamma_0 = gamma * mu_0
    k_B = 1.38e-23  # Boltzmann constant
    lambda_G = 0.5  # 0.5 by default
    M_s = 490.0e3  # saturation magnetization
    a_eff = 0.345e-9  # effective lattice parameter
    anisotropy = "cubic"
def get_boundaries_x(
    g: Grid, m, blocking: bool = False
) -> tuple[np.ndarray, np.ndarray, MPI.Request, MPI.Request]:
    """
    Returns the boundaries asynchronously: allows overlapping communication
    time of boundaries with calculations

    Args:
        g: grid descriptor (gives the y-z face shape)
        m: local 3D array whose x faces are exchanged with neighbour ranks
        blocking: if True use blocking Sendrecv; otherwise Irecv/Isend

    Returns:
        ``(m_start_x, m_end_x, request_start, request_end)``: receive
        buffers for the left/right ghost faces and the pending receive
        requests (both ``None`` in the blocking case, where the buffers
        are already filled on return).
    """
    # Extract slices for Neumann boundary conditions
    m_start_x = np.empty((1, g.Jy, g.Jz))
    m_end_x = np.empty_like(m_start_x)

    # Prepare ring communication:
    # Even if procs 0 and size - 1 shouldn't receive anything from left
    # and right respectively, it's simpler to express it like this
    right = (rank + 1) % size
    left = (rank - 1 + size) % size

    if blocking:
        # Wait for boundaries to be available
        comm.Sendrecv(m[:1, :, :], dest=left, sendtag=0, recvbuf=m_end_x, source=right)
        comm.Sendrecv(
            m[-1:, :, :], dest=right, sendtag=1, recvbuf=m_start_x, source=left
        )
        return m_start_x, m_end_x, None, None
    else:
        # Post receives first; tags pair each Isend with the matching Irecv
        # on the neighbouring rank (201: left face, 202: right face)
        request_start = comm.Irecv(m_start_x, source=left, tag=201)
        request_end = comm.Irecv(m_end_x, source=right, tag=202)
        # NOTE(review): the two Isend requests are discarded without
        # Wait/Free — confirm this is intentional (mpi4py will complete
        # them eventually, but keeping them would be cleaner).
        comm.Isend(m[-1:, :, :], dest=right, tag=201)
        comm.Isend(m[:1, :, :], dest=left, tag=202)
        return m_start_x, m_end_x, request_start, request_end
def calculate_laplacian(
    e: Element,
    g: Grid,
    m: np.ndarray,
    m_start_x: np.ndarray,
    m_end_x: np.ndarray,
    request_end: MPI.Request,
    request_start: MPI.Request,
) -> np.ndarray:
    """
    Returns the Laplacian of m (* coeff_1) in 3D.

    We start by calculating contributions in y and z, to wait for the end
    of communications in x.

    NOTE(review): get_boundaries_x returns (..., request_start, request_end)
    and the call sites unpack positionally, so the two request parameter
    names here receive each other's value. Both are simply waited on, so
    behaviour is unaffected — but the naming should be confirmed/fixed.

    Args:
        e: material (provides coeff_1)
        g: grid descriptor (spacings dx, dy, dz)
        m: local field component, shape g.dims
        m_start_x / m_end_x: ghost faces received from the neighbours
        request_end / request_start: pending receive requests (None when
            the blocking exchange was used)

    Returns:
        coeff_1 times the 7-point finite-difference Laplacian of m.
    """
    # Extract slices for Neumann boundary conditions
    # (mirror: ghost value = first interior neighbour)
    m_start_y = m[:, 1:2, :]
    m_end_y = m[:, -2:-1, :]
    m_start_z = m[:, :, 1:2]
    m_end_z = m[:, :, -2:-1]

    m_y_plus = np.concatenate((m[:, 1:, :], m_end_y), axis=1)
    m_y_minus = np.concatenate((m_start_y, m[:, :-1, :]), axis=1)
    m_z_plus = np.concatenate((m[:, :, 1:], m_end_z), axis=2)
    m_z_minus = np.concatenate((m_start_z, m[:, :, :-1]), axis=2)

    # y and z contributions, plus the full diagonal term (including x)
    laplacian = (
        (m_y_plus + m_y_minus) / g.dy**2
        + (m_z_plus + m_z_minus) / g.dz**2
        - 2 * (1 / g.dx**2 + 1 / g.dy**2 + 1 / g.dz**2) * m
    )

    # Wait for x-boundaries to be available (communications completed)
    try:
        request_end.Wait(status)
        request_start.Wait(status)
    except AttributeError:
        # Blocking path: the requests are None and the buffers are
        # already filled
        pass

    # For extreme procs, apply Neumann boundary conditions in x
    if rank == size - 1:
        m_end_x = m[-2:-1, :, :]
    if rank == 0:
        m_start_x = m[1:2, :, :]

    m_x_plus = np.concatenate((m[1:, :, :], m_end_x), axis=0)
    m_x_minus = np.concatenate((m_start_x, m[:-1, :, :]), axis=0)

    laplacian += (m_x_plus + m_x_minus) / g.dx**2

    return e.coeff_1 * laplacian
def calculate_si(
    e: Element,
    g: Grid,
    m1: np.ndarray,
    m2: np.ndarray,
    m3: np.ndarray,
    R_alea: np.ndarray,
    boundaries,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Returns the s_i = a_i + b_i right-hand sides of the LLG update.

    Args:
        e: material (damping lambda_G, coefficients, anisotropy type)
        m1, m2, m3: magnetization components, shape g.dims
        R_alea: random field samples, shape (3, *g.dims)
        boundaries: one get_boundaries_x() result per component

    Raises:
        ValueError: if e.anisotropy is neither "uniaxial" nor "cubic"
            (previously this fell through to a confusing NameError).
    """
    # Precalculate terms used multiple times
    l_G_m1m2 = e.lambda_G * m1 * m2
    l_G_m1m3 = e.lambda_G * m1 * m3
    l_G_m2m3 = e.lambda_G * m2 * m3
    m1m1 = m1 * m1
    m2m2 = m2 * m2
    m3m3 = m3 * m3

    laplacian_m1 = calculate_laplacian(e, g, m1, *boundaries[0])
    laplacian_m2 = calculate_laplacian(e, g, m2, *boundaries[1])
    laplacian_m3 = calculate_laplacian(e, g, m3, *boundaries[2])

    if e.anisotropy == "uniaxial":
        # Easy axis along x: only the first component contributes
        aniso_1 = m1
        aniso_2 = np.zeros(np.shape(m1))
        aniso_3 = np.zeros(np.shape(m1))
    elif e.anisotropy == "cubic":
        aniso_1 = -(1 - m1m1 + m2m2 * m3m3) * m1
        aniso_2 = -(1 - m2m2 + m1m1 * m3m3) * m2
        aniso_3 = -(1 - m3m3 + m1m1 * m2m2) * m3
    else:
        raise ValueError(f"Unknown anisotropy: {e.anisotropy!r}")

    # Effective field terms; the external field (coeff_3) acts on the
    # first component only
    R_1 = laplacian_m1 + e.coeff_2 * aniso_1 + e.coeff_3 + e.coeff_4 * R_alea[0]
    R_2 = laplacian_m2 + e.coeff_2 * aniso_2 + e.coeff_4 * R_alea[1]
    R_3 = laplacian_m3 + e.coeff_2 * aniso_3 + e.coeff_4 * R_alea[2]

    s1 = (
        (-m2 - l_G_m1m3) * R_3
        + (m3 - l_G_m1m2) * R_2
        + e.lambda_G * (m2m2 + m3m3) * R_1
    )
    s2 = (
        (-m3 - l_G_m1m2) * R_1
        + (m1 - l_G_m2m3) * R_3
        + e.lambda_G * (m1m1 + m3m3) * R_2
    )
    s3 = (
        (-m1 - l_G_m2m3) * R_2
        + (m2 - l_G_m1m3) * R_1
        + e.lambda_G * (m1m1 + m2m2) * R_3
    )
    return s1, s2, s3
def integral(g, m: np.ndarray) -> float:
    """
    Spatial average of ``m`` (shape ``g.dims``) over the whole domain,
    using the midpoint method and an MPI sum reduction.

    Only rank 0 gets the global average; every other rank returns 0.0.
    """
    # Work on a copy so the caller's array is left untouched
    weighted = m.copy()
    # Midpoint rule: boundary faces in y and z count for half a cell
    # (corners end up divided by 4, as required)
    weighted[:, 0, :] *= 0.5
    weighted[:, -1, :] *= 0.5
    weighted[:, :, 0] *= 0.5
    weighted[:, :, -1] *= 0.5
    # The x faces only exist on the first and last rank of the slab split
    if rank == 0:
        weighted[0] *= 0.5
    if rank == size - 1:
        weighted[-1] *= 0.5
    # Default reduce op is SUM, gathered on rank 0
    global_sum = comm.reduce(weighted.sum())
    if rank != 0:
        return 0.0
    return global_sum / g.ncell
def integral_yz(m: np.ndarray) -> np.ndarray:
    """
    Average of ``m`` (shape ``g.dims``) over the y and z directions,
    using the midpoint method.

    Returns:
        1D array of shape ``(m.shape[0],)``: the y-z average at each x index.
    """
    # Work on a copy so the caller's array is left untouched
    weighted = m.copy()
    # Midpoint rule: boundary faces in y and z count for half a cell
    weighted[:, 0, :] *= 0.5
    weighted[:, -1, :] *= 0.5
    weighted[:, :, 0] *= 0.5
    weighted[:, :, -1] *= 0.5
    cells_yz = (m.shape[1] - 1) * (m.shape[2] - 1)
    return weighted.sum(axis=(1, 2)) / cells_yz
def profile(m: np.ndarray, m_xprof: np.ndarray):
    """
    Gathers on rank 0 the x profile of ``m`` averaged over y and z.

    Args:
        m: local array of shape ``g.dims``
        m_xprof: output buffer of shape ``(g.Jx,)``, filled on rank 0
    """
    # Each rank contributes its local y-z averages; Gather concatenates
    # them in rank order, which matches the x-slab decomposition
    comm.Gather(integral_yz(m), m_xprof)
def theta_init(t: float, g: "Grid") -> np.ndarray:
    """
    Initial theta angle field: uniform zero.

    Args:
        t: initial time (unused by the current uniform initialization)
        g: grid descriptor providing the local shape ``g.dims``

    Returns:
        Array of zeros with shape ``g.dims``.
    """
    # The mesh coordinates are only needed by the disabled domain-wall
    # initialization below, so they are no longer built unconditionally:
    # x, y, z = g.get_mesh()
    # return 2.*np.arctan(np.exp(-(x-g.Lx/2+e.d0*e.coeff_3*e.lambda_G*t)/e.d0))
    return np.zeros(g.dims)
def phi_init(t: float, g: "Grid", e: "Element") -> np.ndarray:
    """
    Initial phi angle field: uniform phase ``gamma_0 * H_ext * t``
    induced by the external field.
    """
    # Alternative initialization kept for reference:
    # return np.zeros(shape) + e.coeff_3 * t
    precession_phase = e.gamma_0 * e.H_ext * t
    return np.full(g.dims, precession_phase)
def simulate(
    N: int,
    Jx: int,
    Jy: int,
    Jz: int,
    dx: float,
    T: float,
    H_ext: float,
    dt: float,
    n_average: int,
    n_integral: int,
    n_profile: int,
    element: Element = Cobalt,
    blocking: bool = False,
):
    """
    Simulates the system for N iterations with a predictor-corrector
    (Heun) time scheme.

    Args:
        N: number of time iterations
        Jx, Jy, Jz: global number of grid points (Jx must divide by np)
        dx: grid spacing
        T: temperature
        H_ext: external field (constant)
        dt: time step
        n_average: iteration from which the time average of m1 accumulates
        n_integral: spatial-average output frequency (0 disables it)
        n_profile: x-profile output frequency (0 disables it)
        element: Element subclass describing the material
        blocking: use blocking MPI communications if True

    Returns:
        (total_time, output_filenames, m1_average); the average is only
        meaningful on rank 0.
    """
    if Jx % size != 0:
        if rank == 0:
            # NOTE(review): a space is missing between "processes" and "("
            # in this message
            print(
                f"Error: Jx must be divisible by the number of processes"
                f"({Jx = }, np = {size})"
            )
        comm.barrier()
        MPI.Finalize()
        exit(2)

    # Initialize a sequence of random seeds
    # See: https://numpy.org/doc/stable/reference/random/parallel.html
    ss = np.random.SeedSequence(12345)
    # Deploy size x SeedSequence to pass to child processes
    child_seeds = ss.spawn(size)
    streams = [np.random.default_rng(s) for s in child_seeds]
    rng = streams[rank]

    # Create the grid
    g = Grid(Jx=Jx, Jy=Jy, Jz=Jz, dx=dx)
    if rank == 0:
        print(g)

    e = element(T, H_ext, g, dt)
    if rank == 0:
        print(f"CFL = {e.get_CFL()}")

    # Two time levels per component: index 0 = current, 1 = next
    m1 = np.zeros((2,) + g.dims)
    m2 = np.zeros_like(m1)
    m3 = np.zeros_like(m1)

    # Spherical-angle initialization of the magnetization
    theta = theta_init(0, g)
    phi = phi_init(0, g, e)

    m1[0] = np.cos(theta)
    m2[0] = np.sin(theta) * np.cos(phi)
    m3[0] = np.sin(theta) * np.sin(phi)

    m_xprof = np.zeros(g.Jx)  # global coordinates

    output_filenames = []
    if n_integral != 0:
        output_filenames.append(g.get_filename(T, extension="txt"))
    if n_profile != 0:
        output_filenames.extend(
            [g.get_filename(T, name=f"m{i+1}", extension="npy") for i in range(3)]
        )

    # Output files are opened on rank 0 only
    if rank == 0:
        if n_integral != 0:
            f_integral = open(output_filenames[0], "w")  # integral of m1
        if n_profile != 0:
            f_profiles = [
                open(output_filename, "wb")
                for output_filename in output_filenames[1:]
            ]  # x profiles of m_i

    t = 0.0
    m1_average = 0.0

    start_time = time.perf_counter()

    for n in progress_bar(rank, range(1, N + 1), "Iteration: ", 40):
        t += dt
        # Start the (possibly non-blocking) halo exchange for each component
        x_boundaries = [
            get_boundaries_x(g, m[0], blocking=blocking) for m in (m1, m2, m3)
        ]
        # adding randomness: effect of temperature
        # (same noise sample reused in predictor and corrector)
        R_random = rng.standard_normal((3, *g.dims))

        # prediction phase
        s1_pre, s2_pre, s3_pre = calculate_si(
            e, g, m1[0], m2[0], m3[0], R_random, x_boundaries
        )

        # update
        m1[1] = m1[0] + dt * s1_pre
        m2[1] = m2[0] + dt * s2_pre
        m3[1] = m3[0] + dt * s3_pre

        # correction phase
        # NOTE(review): this second exchange sends m[0] again (identical to
        # the predictor exchange) while calculate_si below is evaluated on
        # the predictor values m[1]; at process boundaries this mixes
        # old-step halo data with predictor interiors — confirm whether
        # m[1] was intended here.
        x_boundaries = [
            get_boundaries_x(g, m[0], blocking=blocking) for m in (m1, m2, m3)
        ]
        s1_cor, s2_cor, s3_cor = calculate_si(
            e, g, m1[1], m2[1], m3[1], R_random, x_boundaries
        )

        # update (Heun average of predictor and corrector slopes)
        m1[1] = m1[0] + dt * 0.5 * (s1_pre + s1_cor)
        m2[1] = m2[0] + dt * 0.5 * (s2_pre + s2_cor)
        m3[1] = m3[0] + dt * 0.5 * (s3_pre + s3_cor)

        # renormalize to verify the constraint of being on the sphere
        norm = np.sqrt(m1[1] ** 2 + m2[1] ** 2 + m3[1] ** 2)
        m1[1] /= norm
        m2[1] /= norm
        m3[1] /= norm

        # Advance the time levels
        m1[0] = m1[1]
        m2[0] = m2[1]
        m3[0] = m3[1]

        # Export the average of m1 to a file
        if n_integral != 0 and n % n_integral == 0:
            m1_integral_global = integral(g, m1[0])
            if rank == 0:
                # NOTE(review): accumulation starts at n >= n_average but
                # the final division uses N - n_average, i.e. one fewer
                # than the number of samples when n_integral == 1 —
                # confirm intended.
                if n >= n_average:
                    m1_average += m1_integral_global * n_integral
                f_integral.write(f"{t:10.8e} {m1_integral_global:10.8e}\n")

        # Export the x profiles of the averaged m_i in y and z
        if n_profile != 0 and n % n_profile == 0:
            for i, m in enumerate((m1[0], m2[0], m3[0])):
                profile(m, m_xprof)
                if rank == 0:
                    # add an x profile to the file
                    np.save(f_profiles[i], m_xprof)

    total_time = time.perf_counter() - start_time

    if rank == 0:
        if n_integral != 0:
            f_integral.close()
        if n_profile != 0:
            for i in range(3):
                f_profiles[i].close()
        if n > n_average:
            m1_average /= N - n_average
            print(f"{m1_average = :e}")

    return total_time, output_filenames, m1_average
class ArgumentParser(argparse.ArgumentParser):
    """An argument parser compatible with MPI: only rank 0 prints, and
    exiting shuts MPI down cleanly on every process."""

    def _print_message(self, message, file=None):
        # Suppress duplicate output: only rank 0 is allowed to write
        if rank != 0 or not message:
            return
        target = sys.stderr if file is None else file
        target.write(message)

    def exit(self, status=0, message=None):
        # Print the message (rank 0 only), synchronize, then finalize MPI
        # before terminating so no process is left hanging
        if message:
            self._print_message(message, sys.stderr)
        comm.barrier()
        MPI.Finalize()
        exit(status)
def get_element_class(element_name: str):
    """Returns the Element subclass named ``element_name``.

    Returns None when no subclass matches (same as the original
    fall-through behaviour).
    """
    matches = (
        cls for cls in Element.__subclasses__() if cls.__name__ == element_name
    )
    return next(matches, None)
def simulate_temperature(params: dict) -> dict:
    """
    Runs a simulation for a given parameter set.

    Args:
        params: keyword arguments for :func:`simulate`;
            ``params["element"]`` arrives as a class name string and is
            replaced in place by the corresponding class.

    Returns:
        A dictionary of the run (parameters and per-temperature results).

    Raises:
        ValueError: if ``params["element"]`` names no known Element
            subclass.
    """
    # Backup copy as run['element'] will be modified
    run = {"params": params.copy(), "results": {}}
    run["params"]["np"] = size

    # Referencing the element class from the string, via get_element_class
    # (restricted to Element subclasses) instead of a raw globals() lookup
    element_class = get_element_class(run["params"]["element"])
    if element_class is None:
        raise ValueError(f"Unknown element: {run['params']['element']!r}")
    params["element"] = element_class

    # Run the simulation
    total_time, filenames, m1_mean = simulate(**params)

    if rank == 0:
        N = run["params"]["N"]
        run["results"] = {params["T"]: {"total_time": total_time}}

        # Export the integral of m1
        if len(filenames) > 0:
            run["results"][params["T"]]["integral_file"] = filenames[0]
            print(f"Integral of m1 in {filenames[0]}")

        # Export the x-profiles of m1, m2 and m3
        if len(filenames) > 1:
            for i in range(1, 4):
                run["results"][params["T"]][f"xprofile_m{i}"] = filenames[i]
                print(f"x-profile of m{i} in {filenames[i]}")

        print(f"N iterations = {N}")
        print(f"total_time [s] = {total_time:.03f}")
        print(f"time/ite [s/iter] = {total_time / N:.03e}")

        if N > run["params"]["n_average"]:
            print(f"m1_mean = {m1_mean:e}")
            run["results"][params["T"]]["m1_mean"] = m1_mean

    return run
def temperature_variation(params: dict):
    """Runs one simulation per value in the temperature list ``params["T"]``
    and merges the per-temperature results."""
    run = {"params": params.copy(), "results": {}}
    for T in params["T"]:
        if rank == 0:
            print("-------------")
            print(f"T[K] = {T}")
            print("-------------")
        # Copy with the temperature list replaced by the scalar T
        params_T = {**params, "T": T}
        run_T = simulate_temperature(params_T)
        # Update the results dictionary (rank 0 holds the results)
        if rank == 0:
            run["results"][T] = run_T["results"][T]
    return run
def parameter_list(d: dict) -> str:
    """
    Returns parameter names and values as an aligned multi-line string.

    String values are rendered with ':' and other values with '='.
    An empty dict yields an empty string (previously ``max()`` raised
    ValueError on an empty dict).
    """
    width = max((len(key) for key in d), default=0)
    lines = []
    for key, value in d.items():
        sep = ":" if isinstance(value, str) else "="
        lines.append(f"{key:<{width}} {sep} {value}\n")
    return "".join(lines)
def write_json(run: dict):
    """Writes the run dictionary to ``json_file`` (rank 0 only)."""
    if rank != 0:
        return
    with open(json_file, "w") as f:
        json.dump(run, f, indent=4)
    print(f"Summary in {json_file}")
def parse_args(args) -> argparse.Namespace:
    """Builds the llg3d argument parser and parses ``args``."""
    parser = ArgumentParser(
        description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    # One option per entry of the module-level parameter dictionary; the
    # option type is inferred from the default value
    for name, (value, description) in parameters.items():
        parser.add_argument(
            f"-{name}",
            type=type(value),
            nargs="*" if name == "T" else None,  # T may be a list
            default=value,
            help=description,
        )
    # Switch between blocking and non-blocking MPI communications
    parser.add_argument(
        "-b",
        "--blocking",
        action="store_true",
        help="Use blocking communications",
    )
    return parser.parse_args(args)
def main(args_main=None):
    """Evaluates the command line and runs the simulation(s)."""
    args = parse_args(args_main)
    options = vars(args)
    if rank == 0:
        # Display parameters as a list
        print(parameter_list(options))
    # A single temperature runs one simulation; a list sweeps them all
    if isinstance(args.T, float) or len(args.T) == 1:
        if isinstance(args.T, list):
            options["T"] = options["T"][0]
        run = simulate_temperature(options)
    else:
        run = temperature_variation(options)
    write_json(run)
# Script entry point
if __name__ == "__main__":
    main()