Source code for llg3d.solver.mpi

"""
LLG3D Solver using MPI.

The parallelization is done by domain decomposition along the x direction.
"""

import argparse
import sys
import time

import numpy as np
from mpi4py import MPI

from .. import comm, rank, size, status
from ..output import progress_bar, get_output_files, close_output_files
from ..grid import Grid
from ..element import Element
from .solver import cross_product, compute_H_anisotropy


[docs]
def get_boundaries_x(
    g: Grid, m: np.ndarray, blocking: bool = False
) -> tuple[np.ndarray, np.ndarray, MPI.Request, MPI.Request]:
    """
    Exchanges the x boundaries with the neighboring processes.

    By default the exchange is asynchronous, which allows overlapping the
    boundary communication with computation.

    Args:
        g: Grid object
        m: One component of the magnetization array (shape (nx, ny, nz))
        blocking: Whether to use blocking communication for boundaries

    Returns:
        - m_i_x_start: Start boundary in x direction
        - m_i_x_end: End boundary in x direction
        - request_start: Request for start boundary (None if blocking)
        - request_end: Request for end boundary (None if blocking)
    """
    # Allocate receive buffers for the x boundaries
    m_i_x_start = np.empty((1, g.Jy, g.Jz))
    m_i_x_end = np.empty_like(m_i_x_start)

    # Prepare ring communication:
    # Even if procs 0 and size - 1 shouldn't receive anything from left
    # and right respectively, it's simpler to express it like this
    right = (rank + 1) % size
    left = (rank - 1 + size) % size

    if blocking:
        # Wait for boundaries to be available
        comm.Sendrecv(
            m[:1, :, :], dest=left, sendtag=0, recvbuf=m_i_x_end, source=right
        )
        comm.Sendrecv(
            m[-1:, :, :], dest=right, sendtag=1, recvbuf=m_i_x_start, source=left
        )
        return m_i_x_start, m_i_x_end, None, None
    else:
        request_start = comm.Irecv(m_i_x_start, source=left, tag=201)
        request_end = comm.Irecv(m_i_x_end, source=right, tag=202)
        comm.Isend(m[-1:, :, :], dest=right, tag=201)
        comm.Isend(m[:1, :, :], dest=left, tag=202)
        return m_i_x_start, m_i_x_end, request_start, request_end
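
# Illustrative sketch, not called by the solver: the ring-neighbor pattern
# used above, extracted as a pure function. With 4 processes, rank 0 sends to
# rank 1 and receives from rank 3, so every Isend has a matching Irecv even
# on the extreme processes (whose received slices are later overwritten by
# the Neumann conditions in laplacian3D).
def _ring_neighbors(r: int, n: int) -> tuple[int, int]:
    """Return the (left, right) neighbor ranks of rank r on a ring of n."""
    return (r - 1 + n) % n, (r + 1) % n


assert _ring_neighbors(0, 4) == (3, 1)
assert _ring_neighbors(3, 4) == (2, 0)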

[docs]
def laplacian3D(
    m_i: np.ndarray,
    dx2_inv: float,
    dy2_inv: float,
    dz2_inv: float,
    center_coeff: float,
    m_i_x_start: np.ndarray,
    m_i_x_end: np.ndarray,
    request_start: MPI.Request,
    request_end: MPI.Request,
) -> np.ndarray:
    """
    Returns the Laplacian of m_i in 3D.

    The contributions in y and z are computed first, so that the
    communications in x have time to complete.

    Args:
        m_i: i-component of the magnetization array (shape (nx, ny, nz))
        dx2_inv: inverse of the squared grid spacing in x direction
        dy2_inv: inverse of the squared grid spacing in y direction
        dz2_inv: inverse of the squared grid spacing in z direction
        center_coeff: center coefficient for the Laplacian
        m_i_x_start: start boundary in x direction
        m_i_x_end: end boundary in x direction
        request_start: request for start boundary
        request_end: request for end boundary

    Returns:
        Laplacian of m_i (shape (nx, ny, nz))
    """
    # Extract slices for Neumann boundary conditions
    m_i_y_start = m_i[:, 1:2, :]
    m_i_y_end = m_i[:, -2:-1, :]
    m_i_z_start = m_i[:, :, 1:2]
    m_i_z_end = m_i[:, :, -2:-1]

    m_i_y_plus = np.concatenate((m_i[:, 1:, :], m_i_y_end), axis=1)
    m_i_y_minus = np.concatenate((m_i_y_start, m_i[:, :-1, :]), axis=1)
    m_i_z_plus = np.concatenate((m_i[:, :, 1:], m_i_z_end), axis=2)
    m_i_z_minus = np.concatenate((m_i_z_start, m_i[:, :, :-1]), axis=2)

    laplacian = (
        dy2_inv * (m_i_y_plus + m_i_y_minus)
        + dz2_inv * (m_i_z_plus + m_i_z_minus)
        + center_coeff * m_i
    )

    # Wait for x-boundaries to be available (communications completed)
    try:
        request_end.Wait(status)
        request_start.Wait(status)
    except AttributeError:
        pass  # Blocking case: the requests are None

    # For extreme procs, apply Neumann boundary conditions in x
    if rank == size - 1:
        m_i_x_end = m_i[-2:-1, :, :]
    if rank == 0:
        m_i_x_start = m_i[1:2, :, :]

    m_i_x_plus = np.concatenate((m_i[1:, :, :], m_i_x_end), axis=0)
    m_i_x_minus = np.concatenate((m_i_x_start, m_i[:-1, :, :]), axis=0)

    laplacian += dx2_inv * (m_i_x_plus + m_i_x_minus)

    return laplacian
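
# Illustrative sketch, not called by the solver: the same ghost-slice trick in
# 1D. Mirroring the first and last interior values onto the boundaries gives a
# homogeneous Neumann (zero normal derivative) stencil. Here the -2 diagonal
# term is written explicitly; in the 3D function above it is assumed to be
# folded into center_coeff together with the y and z contributions.
def _laplacian1D_neumann(u: np.ndarray, dx2_inv: float) -> np.ndarray:
    """Second difference of a 1D array with mirrored (Neumann) boundaries."""
    u_plus = np.concatenate((u[1:], u[-2:-1]))  # u[i+1], mirrored at the end
    u_minus = np.concatenate((u[1:2], u[:-1]))  # u[i-1], mirrored at the start
    return dx2_inv * (u_plus - 2.0 * u + u_minus)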

[docs]
def compute_laplacian(
    g: Grid,
    m: np.ndarray,
    boundaries: list[tuple[np.ndarray, np.ndarray, MPI.Request, MPI.Request]],
) -> np.ndarray:
    """
    Compute the Laplacian of m in 3D.

    Args:
        g: Grid object
        m: Magnetization array (shape (3, nx, ny, nz))
        boundaries: x-direction boundaries and requests, one tuple per component

    Returns:
        Laplacian of m (shape (3, nx, ny, nz))
    """
    dx2_inv, dy2_inv, dz2_inv, center_coeff = g.get_laplacian_coeff()
    return np.stack(
        [
            laplacian3D(m[0], dx2_inv, dy2_inv, dz2_inv, center_coeff, *boundaries[0]),
            laplacian3D(m[1], dx2_inv, dy2_inv, dz2_inv, center_coeff, *boundaries[1]),
            laplacian3D(m[2], dx2_inv, dy2_inv, dz2_inv, center_coeff, *boundaries[2]),
        ],
        axis=0,
    )

[docs]
def compute_slope(
    e: Element,
    g: Grid,
    m: np.ndarray,
    R_random: np.ndarray,
    boundaries: list[tuple[np.ndarray, np.ndarray, MPI.Request, MPI.Request]],
) -> np.ndarray:
    """
    Compute the slope of the LLG equation.

    Args:
        e: Element object
        g: Grid object
        m: Magnetization array (shape (3, nx, ny, nz))
        R_random: Random field array (shape (3, nx, ny, nz))
        boundaries: x-direction boundaries and requests, one tuple per component

    Returns:
        Slope array (shape (3, nx, ny, nz))
    """
    # Precalculate terms used multiple times
    H_aniso = compute_H_anisotropy(e, m)
    laplacian_m = compute_laplacian(g, m, boundaries)
    R_eff = e.coeff_1 * laplacian_m + R_random + H_aniso
    # The external field contributes along x only
    R_eff[0] += e.coeff_3

    m_cross_R_eff = cross_product(m, R_eff)
    m_cross_m_cross_R_eff = cross_product(m, m_cross_R_eff)

    s = -(m_cross_R_eff + e.lambda_G * m_cross_m_cross_R_eff)
    return s
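
# Illustrative sketch, not called by the solver: the slope above has the
# Landau-Lifshitz form s = -(m x R_eff + lambda_G m x (m x R_eff)), which is
# orthogonal to m everywhere, so |m| = 1 is preserved up to time-stepping
# error (hence the renormalization in simulate). A quick self-check, assuming
# cross_product operates on the leading axis of length 3:
def _check_slope_orthogonal_to_m() -> float:
    """Return max |m . s| over a random field (should be ~0 up to rounding)."""
    rng = np.random.default_rng(0)
    m = rng.standard_normal((3, 4, 4, 4))
    m /= np.sqrt((m**2).sum(axis=0))
    R_eff = rng.standard_normal((3, 4, 4, 4))
    m_cross_R = cross_product(m, R_eff)
    s = -(m_cross_R + 0.5 * cross_product(m, m_cross_R))  # 0.5 stands in for e.lambda_G
    return float(np.abs((m * s).sum(axis=0)).max())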

[docs]
def space_average(g: Grid, m: np.ndarray) -> float:
    """
    Returns the spatial average of m (shape g.dims) using the trapezoidal
    rule (edge contributions are halved).

    Performs the local sum on each process, then reduces it to process 0.

    Args:
        g: Grid object
        m: Array to be integrated (shape (nx, ny, nz))

    Returns:
        Spatial average of m on process 0 (0.0 on the other processes)
    """
    # Make a copy of m to avoid modifying its value
    mm = m.copy()

    # On y and z edges, divide the contribution by 2
    mm[:, 0, :] /= 2
    mm[:, -1, :] /= 2
    mm[:, :, 0] /= 2
    mm[:, :, -1] /= 2

    # On x edges (only on extreme procs), divide the contribution by 2
    if rank == 0:
        mm[0] /= 2
    if rank == size - 1:
        mm[-1] /= 2

    local_sum = mm.sum()
    # Sum across all processes, gathered by process 0
    global_sum = comm.reduce(local_sum)

    # Spatial average is the global sum divided by the number of cells
    return global_sum / g.ncell if rank == 0 else 0.0
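
# Illustrative sketch, not called by the solver: halving the edge
# contributions and dividing by the number of cells (points minus one per
# direction) is exactly the composite trapezoidal rule. In 1D with unit
# spacing:
def _trapezoid_average_1d(u: np.ndarray) -> float:
    """Average of u over its extent; matches np.trapz(u) / (u.size - 1)."""
    uu = u.copy()
    uu[0] /= 2
    uu[-1] /= 2
    return uu.sum() / (u.size - 1)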

[docs]
def integral_yz(m: np.ndarray) -> np.ndarray:
    """
    Returns the spatial average of m along y and z using the trapezoidal rule.

    Args:
        m: Array to be integrated

    Returns:
        np.ndarray: Spatial average of m in y and z, of shape (g.dims[0],)
    """
    # Make a copy of m to avoid modifying its value
    mm = m.copy()

    # On y and z edges, divide the contribution by 2
    mm[:, 0, :] /= 2
    mm[:, -1, :] /= 2
    mm[:, :, 0] /= 2
    mm[:, :, -1] /= 2

    n_cell_yz = (mm.shape[1] - 1) * (mm.shape[2] - 1)
    return mm.sum(axis=(1, 2)) / n_cell_yz

[docs]
def profile(m: np.ndarray, m_xprof: np.ndarray):
    """
    Retrieves the x profile of the average of m in y and z.

    Args:
        m: Array to be integrated
        m_xprof: Array receiving the global x profile (filled on process 0)
    """
    # Gather the local y-z averages into m_xprof on process 0
    m_mean_yz = integral_yz(m)
    comm.Gather(m_mean_yz, m_xprof)

[docs]
def theta_init(t: float, g: Grid) -> np.ndarray:
    """Initialization of theta."""
    x, y, z = g.get_mesh(local=True)
    return np.zeros(g.dims)

[docs]
def phi_init(t: float, g: Grid, e: Element) -> np.ndarray:
    """Initialization of phi."""
    # return np.zeros(g.dims) + e.coeff_3 * t
    return np.zeros(g.dims) + e.gamma_0 * e.H_ext * t

[docs]
def simulate(
    N: int,
    Jx: int,
    Jy: int,
    Jz: int,
    dx: float,
    T: float,
    H_ext: float,
    dt: float,
    start_averaging: int,
    n_mean: int,
    n_profile: int,
    element_class: type[Element],
    seed: int,
    blocking: bool,
    **_,
) -> tuple[float, str, float]:
    """
    Simulates the system for N iterations.

    Args:
        N: Number of iterations
        Jx: Number of grid points in x direction
        Jy: Number of grid points in y direction
        Jz: Number of grid points in z direction
        dx: Grid spacing
        T: Temperature in Kelvin
        H_ext: External magnetic field strength
        dt: Time step for the simulation
        start_averaging: Iteration at which time-averaging of m1 starts
        n_mean: Output interval (in iterations) for the spatial average
        n_profile: Output interval (in iterations) for the x profiles
        element_class: Element class of the sample (default: Cobalt)
        seed: Random seed for temperature fluctuations
        blocking: Whether to use blocking communication for boundaries

    Returns:
        - The time taken for the simulation
        - The output filenames
        - The time-averaged value of m1
    """
    if Jx % size != 0:
        if rank == 0:
            print(
                f"Error: Jx must be divisible by the number of processes "
                f"({Jx = }, np = {size})"
            )
        comm.barrier()
        MPI.Finalize()
        exit(2)

    # Initialize a sequence of random seeds
    # See: https://numpy.org/doc/stable/reference/random/parallel.html
    ss = np.random.SeedSequence(seed)
    # Spawn one child SeedSequence per process
    child_seeds = ss.spawn(size)
    streams = [np.random.default_rng(s) for s in child_seeds]
    rng = streams[rank]

    # Create the grid
    g = Grid(Jx, Jy, Jz, dx)
    if rank == 0:
        print(g)

    e = element_class(T, H_ext, g, dt)
    if rank == 0:
        print(f"CFL = {e.get_CFL()}")

    m_n = np.zeros((3,) + g.dims)
    theta = theta_init(0, g)
    phi = phi_init(0, g, e)
    m_n[0] = np.cos(theta)
    m_n[1] = np.sin(theta) * np.cos(phi)
    m_n[2] = np.sin(theta) * np.sin(phi)

    m_xprof = np.zeros(g.Jx)  # global coordinates

    f_mean, f_profiles, output_filenames = get_output_files(g, T, n_mean, n_profile)

    t = 0.0
    m1_average = 0.0
    start_time = time.perf_counter()

    for n in progress_bar(range(1, N + 1), "Iteration: ", 40):
        t += dt

        # Prediction phase
        x_boundaries = [
            get_boundaries_x(g, m_n[i], blocking=blocking) for i in range(3)
        ]
        # Adding randomness: effect of temperature
        R_random = e.coeff_4 * rng.standard_normal((3, *g.dims))
        s_pre = compute_slope(e, g, m_n, R_random, x_boundaries)
        m_pre = m_n + dt * s_pre

        # Correction phase
        x_boundaries = [
            get_boundaries_x(g, m_pre[i], blocking=blocking) for i in range(3)
        ]
        s_cor = compute_slope(e, g, m_pre, R_random, x_boundaries)
        m_n += dt * 0.5 * (s_pre + s_cor)

        # Renormalize to enforce the constraint of staying on the unit sphere
        norm = np.sqrt(m_n[0] ** 2 + m_n[1] ** 2 + m_n[2] ** 2)
        m_n /= norm

        # Export the average of m1 to a file
        if n_mean != 0 and n % n_mean == 0:
            m1_mean = space_average(g, m_n[0])
            if rank == 0:
                if n >= start_averaging:
                    m1_average += m1_mean * n_mean
                f_mean.write(f"{t:10.8e} {m1_mean:10.8e}\n")

        # Export the x profiles of the averaged m_i in y and z
        if n_profile != 0 and n % n_profile == 0:
            for i in 0, 1, 2:
                profile(m_n[i], m_xprof)
                if rank == 0:
                    # Append an x profile to the file
                    np.save(f_profiles[i], m_xprof)

    total_time = time.perf_counter() - start_time

    if rank == 0:
        close_output_files(f_mean, f_profiles)
        if n > start_averaging:
            m1_average /= N - start_averaging

    return total_time, output_filenames, m1_average
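
# Illustrative sketch, not part of the module: a hypothetical driver script.
# It assumes an element class llg3d.element.Cobalt (the default named in the
# docstring) and uses placeholder numeric values throughout; Jx must be
# divisible by the process count, e.g. when launched with
# `mpiexec -n 4 python driver.py`.
#
#     from llg3d.element import Cobalt
#     total_time, output_filenames, m1_average = simulate(
#         N=1000, Jx=64, Jy=32, Jz=32, dx=1e-9,
#         T=300.0, H_ext=0.0, dt=1e-15,
#         start_averaging=500, n_mean=10, n_profile=100,
#         element_class=Cobalt, seed=42, blocking=False,
#     )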

[docs]
class ArgumentParser(argparse.ArgumentParser):
    """An argument parser compatible with MPI."""

    def _print_message(self, message, file=None):
        if rank == 0 and message:
            if file is None:
                file = sys.stderr
            file.write(message)

[docs]
    def exit(self, status=0, message=None):
        """
        Exit the program using MPI.Finalize.

        Args:
            status: Exit status code
            message: Optional exit message
        """
        if message:
            self._print_message(message, sys.stderr)
        comm.barrier()
        MPI.Finalize()
        exit(status)
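
# Illustrative sketch, not part of the module: with this subclass, usage and
# error messages are printed once (by rank 0) instead of once per process,
# and every rank reaches the barrier in exit() before MPI is finalized. The
# option below is hypothetical:
#
#     parser = ArgumentParser(description="LLG3D solver (MPI)")
#     parser.add_argument("-N", type=int, default=1000)  # hypothetical option
#     args = parser.parse_args()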