diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 14369869..4d6695ec 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -3,11 +3,9 @@ name: Build and test on: workflow_dispatch: push: - branches: - - "**" + branches: [devel, main] pull_request: - branches: - - "main" + branches: [devel, main] jobs: pre_job: @@ -36,6 +34,8 @@ jobs: defaults: run: shell: ${{ matrix.platform.shell }} + env: + ENVIRONMENT_FILE: ${{ matrix.platform.name == 'macos' && 'environment_macos.yaml' || 'environment.yaml' }} steps: # - name: Checkout the repository (commit) @@ -53,7 +53,7 @@ jobs: auto-update-conda: true python-version: ${{ matrix.python-version }} activate-environment: somd2 - environment-file: environment.yaml + environment-file: ${{ env.ENVIRONMENT_FILE }} miniforge-version: latest run-post: ${{ matrix.platform.name != 'windows' }} # diff --git a/README.md b/README.md index 72f55293..52d90089 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,9 @@ conda env create -f environment.yaml (We recommend using [Miniforge](https://github.com/conda-forge/miniforge).) +> [!NOTE] +> On macOS, you will need to use the `environment_macos.yaml` file instead. + Now install `somd2` into the environment: ``` @@ -33,8 +36,10 @@ somd2 --help In order to run an alchemical free-energy simulation you will need to first create a stream file containing the _perturbable_ system of interest. -This can be created using [BioSimSpace](https://github.com/OpenBioSim/biosimspace). For example, following the tutorial -[here](https://biosimspace.openbiosim.org/versions/2023.4.0/tutorials/hydration_freenrg.html). Once the system is created, it can be streamed to file using, e.g.: +This can be created using [BioSimSpace](https://github.com/OpenBioSim/biosimspace). +For example, following the tutorial +[here](https://biosimspace.openbiosim.org/versions/2023.4.0/tutorials/hydration_freenrg.html). +Once the system is created, it can be streamed to file using, e.g.: ```python import BioSimSpace as BSS @@ -112,12 +117,31 @@ the value of `--rest2-scale`. By passing multiple values for `--rest2-scale`, th user can fully control the schedule. When doing so, the number of values must match the number of lambda windows. +## GCMC + +SOMD2 also supports grand canonical Monte Carlo (GCMC) water sampling using +the [loch](https://github.com/OpenBioSim/loch) package. This can be enabled +using the `--gcmc` option. To define a GCMC region, use the `--gcmc-selection` +option, which should be a `Sire` selection string that specifies the atoms +defining the centre of geometry for the GCMC region. The radius of the GCMC +sphere can be controlled using the `--gcmc-radius` option. To see all GCMC +related options, run: + +``` +somd2 --help | grep -A2 ' --gcmc' +``` + +> [!NOTE] +> GCMC is currently only supported when using the CUDA platform and isn't +> available on macOS, where the `pycuda` package is not available. + ## Analysis Simulation output will be written to the directory specified using the `--output-directory` parameter. This will contain a number of files, including -Parquet files for the energy trajectories of each λ window. These can be -processed using [BioSimSpace](https://github.com/OpenBioSim/biosimspace) as follows: +[Parquet files](https://en.wikipedia.org/wiki/Apache_Parquet) for the energy +trajectories of each λ window. These can be processed using +[BioSimSpace](https://github.com/OpenBioSim/biosimspace) as follows: ```python import BioSimSpace as BSS @@ -149,6 +173,15 @@ can be controlled via the `--null-energy` option. The number of neighbours shoul be chosen as a trade off between accuracy and computational cost. A value of around 20% of the number of replicas has been found to be a good starting point. +## Ghost atom modifications + +We support modification of ghost atom bonded terms to avoid spurious coupling +to the physical system using the approach described in +[this](https://pubs.acs.org/doi/10.1021/acs.jctc.0c01328) paper. +These are enabled by default, but can be disabled using the ``--no-ghost-modifications`` +option. Modifications are implemented using the [ghostly](https://gitbub.com/OpenBioSim/ghostly) +package. + ## Note for SOMD1 users For existing users of `somd1`, it's possible to generate input for `somd2` by passing @@ -186,5 +219,42 @@ somd2 somd1.bss --pert-file somd1.pert --somd1-compatibility (This only shows the limited options required. Others will take default values and can be set accordingly.) If you want to load an existing system from a perturbation file and use the -new `somd2` ghost atom bonded-term modifications, then simply omit the -`--somd1-compatibility` option. +new `somd2` [ghost atom bonded-term modifications](https://github.com/OpenBioSim/ghostly), +then simply omit the `--somd1-compatibility` option. + +## GPU oversubscription + +If you have an NVIDIA GPU that supports the multi-process service (MPS), you can +oversubscibe the GPU to run multiple OpenMM contexts on the same GPU at once, +increasing the throughput of your simulation. To do this, you will need to first +enable MPS by running the following command: + +``` +nvidia-cuda-mps-control -d +``` + +The number of contexts that can be run in parallel is then controlled by the +`--oversubscription-factor` option, which defaults to 1. + +More details on MPS, including tuning options, can be found in the following +[techical blog](https://developer.nvidia.com/blog/maximizing-openmm-molecular-dynamics-throughput-with-nvidia-multi-process-service/). + +## Python API + +``SOMD2`` can also be used as a Python API, allowing it to be embedded +within other Python scripts. When doing so, it is important to to wrap +code within a ``if __name__ == "__main__":`` block since multiprocessing +is used with the ``spawn`` start method. + +**## Known issues** + +If using the regular `Runner` class via the Python API, then you will need to +guard calls to its `run()` method within a `if __name__ == "__main__":` block +since it uses multiprocessing with the `spawn` start method. + +During a checkpoint cycle trajectory frames are stored in memory before being +paged to disk. When running replica exchange simulations with a large number +of replicas this can lead to exceeding the temporary file storage limit on +some systems, causing the simulation to hang. This can be resolved by either +reducing the frequency at which frames are stored, or checkpointing more. +(Frames are written to disk and cleared from memory at each checkpoint.) diff --git a/environment.yaml b/environment.yaml index d7f280a9..be77b6cd 100644 --- a/environment.yaml +++ b/environment.yaml @@ -7,5 +7,10 @@ channels: dependencies: - biosimspace - git + - filelock - loguru - numba + - pycuda + - pip: + - git+https://github.com/openbiosim/ghostly + - git+https://github.com/openbiosim/loch diff --git a/environment_macos.yaml b/environment_macos.yaml new file mode 100644 index 00000000..750a5f24 --- /dev/null +++ b/environment_macos.yaml @@ -0,0 +1,14 @@ +name: somd2 + +channels: + - conda-forge + - openbiosim/label/dev + +dependencies: + - biosimspace + - git + - filelock + - loguru + - numba + - pip: + - git+https://github.com/openbiosim/ghostly diff --git a/pyproject.toml b/pyproject.toml index 8e147696..44249a66 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dynamic = ["version"] license-files = ["LICENSE"] [project.scripts] -somd2 = "somd2.app:cli" +somd2 = "somd2.app:somd2" [project.urls] repository = "https://github.com/OpenBioSim/somd2" diff --git a/src/somd2/_utils/_ghosts.py b/src/somd2/_utils/_ghosts.py deleted file mode 100644 index dc40391f..00000000 --- a/src/somd2/_utils/_ghosts.py +++ /dev/null @@ -1,1131 +0,0 @@ -###################################################################### -# SOMD2: GPU accelerated alchemical free-energy engine. -# -# Copyright: 2023-2025 -# -# Authors: The OpenBioSim Team -# -# SOMD2 is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# SOMD2 is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with SOMD2. If not, see . -##################################################################### - -__all__ = ["boresch"] - -from sire.system import System as _System -from sire.legacy.System import System as _LegacySystem - -import sire.legacy.MM as _SireMM -import sire.legacy.Mol as _SireMol -import sire.morph as _morph - -from somd2 import _logger - -from . import _is_ghost -from . import _lam_sym - - -def boresch(system, k_hard=100, k_soft=5, optimise_angles=True, num_optimise=10): - """ - Apply Boresch modifications to ghost atom bonded terms to avoid non-physical - coupling between the ghost atoms and the physical region. - - Parameters - ---------- - - system : sire.system.System, sire.legacy.System.System - The system containing the molecules to be perturbed. - - k_hard : float, optional - The force constant to use to when setting angle terms involving ghost - atoms to 90 degrees to avoid flapping. (In kcal/mol/rad^2) - - k_soft : float, optional - The force constant to use when setting angle terms involving ghost atoms - for non-planar triple junctions. (In kcal/mol/rad^2) - - optimise_angles : bool, optional - Whether to optimise the equilibrium value of the angle terms involving - ghost atoms for non-planar triple junctions. - - num_optimise : int, optional - The number of repeats to average over when optimising the equilibrium - value of the angle terms involving ghost atoms for non-planar triple - junctions. - - Returns - ------- - - system : sire.system.System - The updated system. - - Notes - ----- - - For technical details, please refer to the original publication: - https://pubs.acs.org/doi/10.1021/acs.jctc.0c01328 - """ - - _logger.info(f"Applying Boresch modifications to ghost atom bonded terms") - - # Check the system is a Sire system. - if not isinstance(system, (_System, _LegacySystem)): - raise TypeError( - "'system' must of type 'sire.system.System' or 'sire.legacy.System.System'" - ) - - # Extract the legacy system. - if isinstance(system, _LegacySystem): - system = _System(system) - - # Clone the system. - system = system.clone() - - # Search for perturbable molecules. - try: - pert_mols = system.molecules("property is_perturbable") - except KeyError: - raise KeyError("No perturbable molecules in the system") - - for mol in pert_mols: - # Store the molecule info. - info = mol.info() - - # Generate the end state connectivity objects. - connectivity0 = _create_connectivity(_morph.link_to_reference(mol)) - connectivity1 = _create_connectivity(_morph.link_to_perturbed(mol)) - - # Find the indices of the ghost atoms at each end state. - ghosts0 = [ - _SireMol.AtomIdx(i) - for i, x in enumerate( - _is_ghost(mol, [_SireMol.AtomIdx(i) for i in range(mol.num_atoms())]) - ) - if x - ] - ghosts1 = [ - _SireMol.AtomIdx(i) - for i, x in enumerate( - _is_ghost( - mol, - [_SireMol.AtomIdx(i) for i in range(mol.num_atoms())], - is_lambda1=True, - ) - ) - if x - ] - - # Work out the physical bridge atoms at lambda = 0. These are the atoms - # that connect ghost atoms to the physical region. - bridges0 = {} - for ghost in ghosts0: - for c in connectivity0.connections_to(ghost): - if not _is_ghost(mol, [c])[0]: - if c not in bridges0: - bridges0[c] = [ghost] - else: - bridges0[c].append(ghost) - # Work out the indices of the other physical atoms that are connected to - # the bridge atoms, sorted by the atom index. - physical0 = {} - for b in bridges0: - physical0[b] = [] - for c in connectivity0.connections_to(b): - if not _is_ghost(mol, [c])[0]: - physical0[b].append(c) - for b in physical0: - physical0[b].sort(key=lambda x: x.value()) - - # Repeat the above for lambda = 1. - bridges1 = {} - for ghost in ghosts1: - for c in connectivity1.connections_to(ghost): - if not _is_ghost(mol, [c], is_lambda1=True)[0]: - if c not in bridges1: - bridges1[c] = [ghost] - else: - bridges1[c].append(ghost) - physical1 = {} - for b in bridges1: - physical1[b] = [] - for c in connectivity1.connections_to(b): - if not _is_ghost(mol, [c], is_lambda1=True)[0]: - physical1[b].append(c) - for b in physical1: - physical1[b].sort(key=lambda x: x.value()) - - # Log the results for each end state. - - if len(bridges0) > 0: - _logger.debug("Ghost atom bridges at lambda = 0") - for i, b in enumerate(bridges0): - _logger.debug(f" Bridge {i}: {b.value()}") - _logger.debug( - f" ghosts: [{','.join([str(x.value()) for x in bridges0[b]])}]" - ) - _logger.debug( - f" physical: [{','.join([str(x.value()) for x in physical0[b]])}]" - ) - _logger.debug(f" type: {len(physical0[b])}") - - if len(bridges1) > 0: - _logger.debug("Ghost atom bridges at lambda = 1") - for i, b in enumerate(bridges1): - _logger.debug(f" Bridge {i}: {b.value()}") - _logger.debug( - f" ghosts: [{','.join([str(x.value()) for x in bridges1[b]])}]" - ) - _logger.debug( - f" physical: [{','.join([str(x.value()) for x in physical1[b]])}]" - ) - _logger.debug(f" type: {len(physical1[b])}") - - # Now process the bridges. - - # First lambda = 0. - for b in bridges0: - # Determine the type of junction. - junction = len(physical0[b]) - - # Terminal junction. - if junction == 1: - mol = _terminal(mol, b, bridges0[b], physical0[b]) - - # Dual junction. - elif junction == 2: - mol = _dual(mol, b, bridges0[b], physical0[b], k_hard=k_hard) - - # Triple junction. - elif junction == 3: - mol = _triple( - mol, - b, - bridges0[b], - physical0[b], - k_hard=k_hard, - k_soft=k_soft, - optimise_angles=optimise_angles, - num_optimise=num_optimise, - ) - - # Higher order junction. - else: - mol = _higher( - mol, - b, - bridges0[b], - physical0[b], - k_hard=k_hard, - k_soft=k_soft, - optimise_angles=optimise_angles, - num_optimise=num_optimise, - ) - - # Now lambda = 1. - for b in bridges1: - junction = len(physical1[b]) - - if junction == 1: - mol = _terminal(mol, b, bridges1[b], physical1[b], is_lambda1=True) - - elif junction == 2: - mol = _dual( - mol, b, bridges1[b], physical1[b], k_hard=k_hard, is_lambda1=True - ) - - elif junction == 3: - mol = _triple( - mol, - b, - bridges1[b], - physical1[b], - k_hard=k_hard, - k_soft=k_soft, - optimise_angles=optimise_angles, - num_optimise=num_optimise, - is_lambda1=True, - ) - - # Higher order junction. - else: - mol = _higher( - mol, - b, - bridges1[b], - physical1[b], - k_hard=k_hard, - k_soft=k_soft, - optimise_angles=optimise_angles, - num_optimise=num_optimise, - is_lambda1=True, - ) - - # Update the molecule in the system. - system.update(mol) - - # Return the updated system. - return system - - -def _terminal(mol, bridge, ghosts, physical, is_lambda1=False): - r""" - Apply Boresch modifications to a terminal junction. - - An example terminal junction with three ghost branches. Here X is the - physical bridge atom. - - DR1 - / - / - R---X---DR2 - \ - \ - DR3 - - Parameters - ---------- - - mol : sire.mol.Molecule - The perturbable molecule. - - bridge : sire.legacy.Mol.AtomIdx - The physical bridge atom. - - ghosts : List[sire.legacy.Mol.AtomIdx] - The list of ghost atoms connected to the bridge atom. - - physical : List[sire.legacy.Mol.AtomIdx] - The list of physical atoms connected to the bridge atom. - - is_lambda1 : bool, optional - Whether the junction is at lambda = 1. - - Returns - ------- - - mol : sire.mol.Molecule - The updated molecule. - """ - - _logger.debug( - f"Applying Boresch modifications to terminal ghost junction at " - f"{_lam_sym} = {int(is_lambda1)}:" - ) - - # Store the molecular info. - info = mol.info() - - # Get the end state connectivity property. - if is_lambda1: - connectivity = _create_connectivity(_morph.link_to_perturbed(mol)) - else: - connectivity = _create_connectivity(_morph.link_to_reference(mol)) - - # First, we need to work out the physical atoms two atoms away from the - # bridge atom. - physical2 = [] - # Loop over the physical atoms connected to the bridge atom. - for p in physical: - # Loop over the atoms connected to the physical atom. - for c in connectivity.connections_to(p): - # If the atom is not a ghost atom or the bridge atom itself, we have - # found a physical atom two atoms away from the bridge atom. - if not _is_ghost(mol, [c], is_lambda1)[0] and c != bridge: - if c not in physical2: - physical2.append(c) - - # Sort based on the atom indices. - physical2.sort(key=lambda x: x.value()) - - # Get the end state dihedral functions. - prop = "dihedral0" if not is_lambda1 else "dihedral1" - dihedrals = mol.property(prop) - - # Initialise a container to store the updated dihedrals. - new_dihedrals = _SireMM.FourAtomFunctions(mol.info()) - - # Remove all dihedral terms for all but one of the physical atoms two atoms - # from the physical bridge atom. - physical2.pop(0) - for p in dihedrals.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - idx3 = info.atom_idx(p.atom3()) - if (idx0 in physical2 and idx3 in ghosts) or ( - idx3 in physical2 and idx0 in ghosts - ): - _logger.debug( - f" Removing dihedral: [{idx0.value()}-{idx1.value()}-{idx2.value()}-{idx3.value()}], {p.function()}" - ) - else: - new_dihedrals.set(idx0, idx1, idx2, idx3, p.function()) - - # Set the updated dihedrals. - mol = mol.edit().set_property(prop, new_dihedrals).molecule().commit() - - # Return the updated molecule. - return mol - - -def _dual(mol, bridge, ghosts, physical, k_hard=100, is_lambda1=False): - r""" - Apply Boresch modifications to a dual junction. - - An example dual junction with two ghost branches. Here X is the physical - bridge atom. - - R1 DR1 - \ / - \ / - X - / \ - / \ - R2 DR2 - - Parameters - ---------- - - mol : sire.mol.Molecule - The perturbable molecule. - - bridge : sire.legacy.Mol.AtomIdx - The physical bridge atom. - - ghosts : List[sire.legacy.Mol.AtomIdx] - The list of ghost atoms connected to the bridge atom. - - physical : List[sire.legacy.Mol.AtomIdx] - The list of physical atoms connected to the bridge atom. - - k_hard : float, optional - The force constant to use when setting angle terms involving ghost - atoms to 90 degrees to avoid flapping. (In kcal/mol/rad^2) - - is_lambda1 : bool, optional - Whether the junction is at lambda = 1. - - Returns - ------- - - mol : sire.mol.Molecule - The updated molecule. - """ - - _logger.debug( - f"Applying Boresch modifications to dual ghost junction at " - f"{_lam_sym} = {int(is_lambda1)}:" - ) - - # Store the molecular info. - info = mol.info() - - # Property suffix based on the end state. - suffix = "0" if not is_lambda1 else "1" - - # Get the end state connectivity property. - try: - connectivity = mol.property("connectivity" + suffix) - except: - connectivity = mol.property("connectivity") - - # Single branch. - if len(ghosts) == 1: - _logger.debug(" Single branch:") - - # First remove all dihedrals starting from the ghost atom and ending in - # physical system. - - # Get the end state bond functions. - angles = mol.property("angle" + suffix) - dihedrals = mol.property("dihedral" + suffix) - - # Initialise a container to store the updated bonded terms. - new_dihedrals = _SireMM.FourAtomFunctions(mol.info()) - - # Dihedrals. - for p in dihedrals.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - idx3 = info.atom_idx(p.atom3()) - - # Dihedral terminates at the ghost bridge. - if ( - not _is_ghost(mol, [idx0], is_lambda1)[0] - and idx3 in ghosts - or not _is_ghost(mol, [idx3], is_lambda1)[0] - and idx0 in ghosts - ): - _logger.debug( - f" Removing dihedral: [{idx0.value()}-{idx1.value()}-{idx2.value()}-{idx3.value()}], {p.function()}" - ) - # Dihedral terminates at the second physical atom. - elif (_is_ghost(mol, [idx0], is_lambda1)[0] and idx3 == physical[1]) or ( - _is_ghost(mol, [idx3], is_lambda1)[0] and idx0 == physical[1] - ): - _logger.debug( - f" Removing dihedral: [{idx0.value()}-{idx1.value()}-{idx2.value()}-{idx3.value()}], {p.function()}" - ) - else: - new_dihedrals.set(idx0, idx1, idx2, idx3, p.function()) - - # Next we modify the angle terms between the physical and - # ghost atom so that the equilibrium angle is 90 degrees. - new_angles = _SireMM.ThreeAtomFunctions(mol.info()) - for p in angles.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - - if ( - idx0 in ghosts - and idx2 in physical - or idx0 in physical - and idx2 in ghosts - ): - from math import pi - from sire.legacy.CAS import Symbol - - theta0 = pi / 2.0 - - # Create the new angle function. - amber_angle = _SireMM.AmberAngle(k_hard, theta0) - - # Generate the new angle expression. - expression = amber_angle.to_expression(Symbol("theta")) - - # Set the equilibrium angle to 90 degrees. - new_angles.set(idx0, idx1, idx2, expression) - - _logger.debug( - f" Stiffening angle: [{idx0.value()}-{idx1.value()}-{idx2.value()}], " - f"{p.function()} --> {expression}" - ) - - else: - new_angles.set(idx0, idx1, idx2, p.function()) - - # Update the molecule. - mol = mol.edit().set_property("angle" + suffix, new_angles).molecule().commit() - mol = ( - mol.edit() - .set_property("dihedral" + suffix, new_dihedrals) - .molecule() - .commit() - ) - - # Dual branch. - else: - _logger.debug(" Dual branch:") - - # First, delete all bonded terms between atoms in two ghost branches. - - # Get the end state bond functions. - angles = mol.property("angle" + suffix) - dihedrals = mol.property("dihedral" + suffix) - - # Initialise containers to store the updated bonded terms. - new_angles = _SireMM.ThreeAtomFunctions(mol.info()) - new_dihedrals = _SireMM.FourAtomFunctions(mol.info()) - - # Angles. - for p in angles.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - - if idx0 in ghosts and idx2 in ghosts: - _logger.debug( - f" Removing angle: [{idx0.value()}-{idx1.value()}-{idx2.value()}], {p.function()}" - ) - - else: - new_angles.set(idx0, idx1, idx2, p.function()) - - # Dihedrals. - for p in dihedrals.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - idx3 = info.atom_idx(p.atom3()) - - # Work out the number of ghosts in the dihedral. - num_ghosts = len([idx for idx in [idx0, idx1, idx2, idx3] if idx in ghosts]) - - # If there is more than one ghost, then this dihedral must bridge the - # ghost branches. - if num_ghosts > 1: - _logger.debug( - f" Removing dihedral: [{idx0.value()}-{idx1.value()}-{idx2.value()}-{idx3.value()}], {p.function()}" - ) - else: - new_dihedrals.set(idx0, idx1, idx2, idx3, p.function()) - - # Set the updated bonded terms. - mol = mol.edit().set_property("angle" + suffix, new_angles).molecule().commit() - mol = ( - mol.edit() - .set_property("dihedral" + suffix, new_dihedrals) - .molecule() - .commit() - ) - - # Now treat the ghost branches individually. - for d in ghosts: - mol = _dual( - mol, bridge, [d], physical, k_hard=k_hard, is_lambda1=is_lambda1 - ) - - # Return the updated molecule. - return mol - - -def _triple( - mol, - bridge, - ghosts, - physical, - k_hard=100, - k_soft=5, - optimise_angles=True, - num_optimise=10, - is_lambda1=False, -): - r""" - Apply Boresch modifications to a triple junction. - - An example triple junction. Here X is the physical bridge atom. - - R1 - \ - \ - R2---X---DR - / - / - R3 - - Parameters - ---------- - - mol : sire.mol.Molecule - The perturbable molecule. - - bridge : sire.legacy.Mol.AtomIdx - The physical bridge atom. - - ghosts : List[sire.legacy.Mol.AtomIdx] - The list of ghost atoms connected to the bridge atom. - - physical : List[sire.legacy.Mol.AtomIdx] - The list of physical atoms connected to the bridge atom. - - k_hard : float, optional - The force constant to use when setting angle terms involving ghost - atoms to 90 degrees to avoid flapping. (In kcal/mol/rad^2) - - k_soft : float, optional - The force constant to use when setting angle terms involving ghost - atoms for non-planar triple junctions. (In kcal/mol/rad^2) - - optimise_angles : bool, optional - Whether to optimise the equilibrium value of the angle terms involving - ghost atoms for non-planar triple junctions. - - num_optimise : int, optional - The number of repeats to use when optimising the angle terms involving - ghost atoms for non-planar triple junctions. - - is_lambda1 : bool, optional - Whether the junction is at lambda = 1. - - Returns - ------- - - mol : sire.mol.Molecule - The updated molecule. - """ - - _logger.debug( - f"Applying Boresch modifications to triple ghost junction at " - f"{_lam_sym} = {int(is_lambda1)}:" - ) - - # Store the molecular info. - info = mol.info() - - # Property suffix based on the end state. - suffix = "0" if not is_lambda1 else "1" - - # Get the end state connectivity property. - try: - connectivity = mol.property("connectivity" + suffix) - except: - connectivity = mol.property("connectivity") - - # Store the element of the bridge atom. - element = mol.atom(bridge).property("element" + suffix) - - # Planar junction. - if element == _SireMol.Element("C"): - _logger.debug(" Planar junction.") - - # First remove all bonded terms between one of the physical atoms - # and the ghost group. - - # Store the index of the first physical atom. - idx = physical[0] - - # Get the end state bond functions. - angles = mol.property("angle" + suffix) - dihedrals = mol.property("dihedral" + suffix) - - # Initialise a container to store the updated bonded terms. - new_angles = _SireMM.ThreeAtomFunctions(mol.info()) - new_dihedrals = _SireMM.FourAtomFunctions(mol.info()) - - # Angles. - for p in angles.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - idxs = [idx0, idx1, idx2] - - if idx in idxs and any([x in ghosts for x in idxs]): - _logger.debug( - f" Removing angle: [{idx0.value()}-{idx1.value()}-{idx2.value()}], {p.function()}" - ) - - else: - new_angles.set(idx0, idx1, idx2, p.function()) - - # Dihedrals. - for p in dihedrals.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - idx3 = info.atom_idx(p.atom3()) - idxs = [idx0, idx1, idx2, idx3] - - if idx in idxs and any([x in ghosts for x in idxs]): - _logger.debug( - f" Removing dihedral: [{idx0.value()}-{idx1.value()}-{idx2.value()}-{idx3.value()}], {p.function()}" - ) - - else: - new_dihedrals.set(idx0, idx1, idx2, idx3, p.function()) - - # Next we modify the angle terms between the remaining physical and - # ghost atoms so that the equilibrium angle is 90 degrees. - new_new_angles = _SireMM.ThreeAtomFunctions(mol.info()) - for p in new_angles.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - - if ( - idx0 in ghosts - and idx2 in physical[1:] - or idx0 in physical[1:] - and idx2 in ghosts - ): - from math import pi - from sire.legacy.CAS import Symbol - - theta0 = pi / 2.0 - - # Create the new angle function. - amber_angle = _SireMM.AmberAngle(k_hard, theta0) - - # Generate the new angle expression. - expression = amber_angle.to_expression(Symbol("theta")) - - # Set the equilibrium angle to 90 degrees. - new_new_angles.set(idx0, idx1, idx2, expression) - - _logger.debug( - f" Stiffening angle: [{idx0.value()}-{idx1.value()}-{idx2.value()}], " - f"{p.function()} --> {expression}" - ) - - else: - new_new_angles.set(idx0, idx1, idx2, p.function()) - - # Update the molecule. - mol = ( - mol.edit() - .set_property("angle" + suffix, new_new_angles) - .molecule() - .commit() - ) - mol = ( - mol.edit() - .set_property("dihedral" + suffix, new_dihedrals) - .molecule() - .commit() - ) - - # Non-planar junction. - elif element == _SireMol.Element("N"): - _logger.debug(" Non-planar junction.") - - # First, modify the force constants of the angle terms between the ghost - # atoms and the physical system to be very low. - - # Get the end state angle functions. - angles = mol.property("angle" + suffix) - - # Initialise a container to store the updated angle functions. - new_angles = _SireMM.ThreeAtomFunctions(mol.info()) - - # Indices for the softened angle terms. - angle_idxs = [] - - for p in angles.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - - if ( - idx0 in ghosts - and idx2 in physical - or idx2 in ghosts - and idx0 in physical - ): - from sire.legacy.CAS import Symbol - - theta = Symbol("theta") - - # Cast the angle to an Amber angle to get the expression. - amber_angle = _SireMM.AmberAngle(p.function(), theta) - - # Create a new Amber angle with a modified force constant. - - # We'll optimise the equilibrium angle for the softened angle term. - if optimise_angles: - amber_angle = _SireMM.AmberAngle(0.05, amber_angle.theta0()) - angle_idxs.append((idx0, idx1, idx2)) - # Use the existing equilibrium angle. - else: - amber_angle = _SireMM.AmberAngle(k_soft, amber_angle.theta0()) - - # Generate the new angle expression. - expression = amber_angle.to_expression(theta) - - # Set the force constant to a very low value. - new_angles.set(idx0, idx1, idx2, expression) - - _logger.debug( - f" Softening angle: [{idx0.value()}-{idx1.value()}-{idx2.value()}], " - f"{p.function()} --> {expression}" - ) - - else: - new_angles.set(idx0, idx1, idx2, p.function()) - - # Next, remove all dihedral starting from the ghost atoms and ending in - # the physical system. Also, only preserve dihedrals terminating at one - # of the physical atoms. - - # Get the end state dihedral functions. - dihedrals = mol.property("dihedral" + suffix) - - # Initialise containers to store the updated dihedral functions. - new_dihedrals = _SireMM.FourAtomFunctions(mol.info()) - - for p in dihedrals.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - idx3 = info.atom_idx(p.atom3()) - idxs = [idx0, idx1, idx2, idx3] - - # If there is one ghost atom, then this dihedral must begin or terminate - # at the ghost atom. - num_ghosts = len([x for x in idxs if x in ghosts]) - if num_ghosts == 1: - _logger.debug( - f" Removing dihedral: [{idx0.value()}-{idx1.value()}-{idx2.value()}-{idx3.value()}], {p.function()}" - ) - # Remove the dihedral if includes a ghost and doesn't terminate at the first - # physical atom. - elif (_is_ghost(mol, [idx0], is_lambda1)[0] and idx3 in physical[1:]) or ( - _is_ghost(mol, [idx3], is_lambda1)[0] and idx0 in physical[1:] - ): - _logger.debug( - f" Removing dihedral: [{idx0.value()}-{idx1.value()}-{idx2.value()}-{idx3.value()}], {p.function()}" - ) - else: - new_dihedrals.set(idx0, idx1, idx2, idx3, p.function()) - - # Update the molecule. - mol = mol.edit().set_property("angle" + suffix, new_angles).molecule().commit() - mol = ( - mol.edit() - .set_property("dihedral" + suffix, new_dihedrals) - .molecule() - .commit() - ) - - # Optimise the equilibrium value of theta0 for the softened angle terms. - if optimise_angles: - _logger.debug(" Optimising equilibrium values for softened angles.") - - import sire.morph as _morph - from sire.units import radian as _radian - - # Initialise the equilibrium angle values. - theta0s = {} - for idx in angle_idxs: - theta0s[idx] = [] - - # Perform multiple minimisations to get an average for the theta0 values. - for _ in range(num_optimise): - # Minimise the molecule. - min_mol = _morph.link_to_reference(mol) - minimiser = min_mol.minimisation( - lambda_value=1.0 if is_lambda1 else 0.0, - constraint="none", - platform="cpu", - ) - minimiser.run() - - # Commit the changes. - min_mol = minimiser.commit() - - # Get the equilibrium angle values. - for idx in angle_idxs: - try: - theta0s[idx].append(min_mol.angles(*idx).sizes()[0].to(_radian)) - except: - raise ValueError(f"Could not find optimised angle term: {idx}") - - # Compute the mean and standard error. - import numpy as _np - - theta0_means = {} - theta0_stds = {} - for idx in theta0s: - theta0_means[idx] = _np.mean(theta0s[idx]) - theta0_stds[idx] = _np.std(theta0s[idx]) / _np.sqrt(len(theta0s[idx])) - - # Get the existing angles. - angles = mol.property("angle" + suffix) - - # Initialise a container to store the updated angle functions. - new_angles = _SireMM.ThreeAtomFunctions(mol.info()) - - # Update the angle potentials. - for p in angles.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - idx = (idx0, idx1, idx2) - - # This is the softened angle term. - if idx in angle_idxs: - # Get the optimised equilibrium angle. - theta0 = theta0_means[idx] - std = theta0_stds[idx] - - # Create the new angle function. - amber_angle = _SireMM.AmberAngle(k_soft, theta0) - - # Generate the new angle expression. - expression = amber_angle.to_expression(Symbol("theta")) - - # Set the equilibrium angle to 90 degrees. - new_angles.set(idx0, idx1, idx2, expression) - - _logger.debug( - f" Optimising angle: [{idx0.value()}-{idx1.value()}-{idx2.value()}], " - f"{p.function()} --> {expression} (std err: {std:.3f} radian)" - ) - - else: - new_angles.set(idx0, idx1, idx2, p.function()) - - # Update the molecule. - mol = ( - mol.edit() - .set_property("angle" + suffix, new_angles) - .molecule() - .commit() - ) - - # Return the updated molecule. - return mol - - -def _higher( - mol, - bridge, - ghosts, - physical, - k_hard=100, - k_soft=5, - optimise_angles=True, - num_optimise=10, - is_lambda1=False, -): - r""" - Apply Boresch modifications to higher order junctions. - - Parameters - ---------- - - mol : sire.mol.Molecule - The perturbable molecule. - - bridge : sire.legacy.Mol.AtomIdx - The physical bridge atom. - - ghosts : List[sire.legacy.Mol.AtomIdx] - The list of ghost atoms connected to the bridge atom. - - physical : List[sire.legacy.Mol.AtomIdx] - The list of physical atoms connected to the bridge atom. - - k_hard : float, optional - The force constant to use when setting angle terms involving ghost - atoms to 90 degrees to avoid flapping. (In kcal/mol/rad^2) - - k_soft : float, optional - The force constant to use when setting angle terms involving ghost - atoms for non-planar triple junctions. (In kcal/mol/rad^2) - - optimise_angles : bool, optional - Whether to optimise the equilibrium value of the angle terms involving - ghost atoms for non-planar triple junctions. - - num_optimise : int, optional - The number of repeats to use when optimising the angle terms involving - ghost atoms for non-planar triple junctions. - - is_lambda1 : bool, optional - Whether the junction is at lambda = 1. - - Returns - ------- - - mol : sire.mol.Molecule - The updated molecule. - """ - - _logger.debug( - f"Applying Boresch modifications to higher order junction at " - f"{_lam_sym} = {int(is_lambda1)}:" - ) - - # Store the molecular info. - info = mol.info() - - # Property suffix based on the end state. - suffix = "0" if not is_lambda1 else "1" - - # Get the end state connectivity property. - try: - connectivity = mol.property("connectivity" + suffix) - except: - connectivity = mol.property("connectivity") - - # Now remove all bonded interactions between the ghost atoms and one of the - # physical atoms connected to the bridge atom, hence reducing the problem to - # that of a triple junction. - while len(physical) > 3: - # Pop the first physical atom index from the list. - idx = physical.pop(0) - - # Get the end state bond functions. - angles = mol.property("angle" + suffix) - dihedrals = mol.property("dihedral" + suffix) - - # Initialise containers to store the updated bonded terms. - new_angles = _SireMM.ThreeAtomFunctions(mol.info()) - new_dihedrals = _SireMM.FourAtomFunctions(mol.info()) - - # Angles. - for p in angles.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - - if idx == idx0 and idx2 in ghosts or idx == idx2 and idx0 in ghosts: - _logger.debug( - f" Removing angle: [{idx0.value()}-{idx1.value()}-{idx2.value()}], {p.function()}" - ) - else: - new_angles.set(idx0, idx1, idx2, p.function()) - - # Dihedrals. - for p in dihedrals.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - idx3 = info.atom_idx(p.atom3()) - idxs = [idx0, idx1, idx2, idx3] - - if idx in idxs and any([x in ghosts for x in idxs]): - _logger.debug( - f" Removing dihedral: [{idx0.value()}-{idx1.value()}-{idx2.value()}-{idx3.value()}], {p.function()}" - ) - else: - new_dihedrals.set(idx0, idx1, idx2, idx3, p.function()) - - # Update the molecule. - mol = mol.edit().set_property("angle" + suffix, new_angles).molecule().commit() - mol = ( - mol.edit() - .set_property("dihedral" + suffix, new_dihedrals) - .molecule() - .commit() - ) - - # Now treat the triple junction. - return _triple( - mol, - bridge, - ghosts, - physical, - k_hard=k_hard, - k_soft=k_soft, - optimise_angles=optimise_angles, - num_optimise=num_optimise, - is_lambda1=is_lambda1, - ) - - -def _create_connectivity(mol): - """ - Create a connectivity object for an end state molecule. - - Parameters - ---------- - - mol : sire.mol.Molecule - The molecule at the end state. - - Returns - - connectivity : sire.legacy.Mol.Connectivity - The connectivity object. - """ - - # Create an editable connectivity object. - connectivity = _SireMol.Connectivity(mol.info()).edit() - - # Loop over the bonds in the molecule and connect the atoms. - for bond in mol.bonds(): - connectivity.connect(bond.atom0().index(), bond.atom1().index()) - - # Commit the changes and return the connectivity object. - return connectivity.commit() diff --git a/src/somd2/app/__init__.py b/src/somd2/app/__init__.py index 45c2fbc2..5bbf9e5a 100644 --- a/src/somd2/app/__init__.py +++ b/src/somd2/app/__init__.py @@ -28,7 +28,8 @@ .. autosummary:: :toctree: generated/ - cli + somd2 + ghostly """ -from .run import * +from ._cli import * diff --git a/src/somd2/app/run.py b/src/somd2/app/_cli.py similarity index 94% rename from src/somd2/app/run.py rename to src/somd2/app/_cli.py index be63c782..5f74c846 100644 --- a/src/somd2/app/run.py +++ b/src/somd2/app/_cli.py @@ -20,17 +20,13 @@ ##################################################################### """ -The somd2 command line program. - -Usage: - To get the help for this program and list all of the - arguments (with defaults) use: - - somd2 --help +SOMD2 command line interface. """ +__all__ = ["somd2"] + -def cli(): +def somd2(): """ SOMD2: Command line interface. """ diff --git a/src/somd2/config/_config.py b/src/somd2/config/_config.py index 0978ee82..7cf88873 100644 --- a/src/somd2/config/_config.py +++ b/src/somd2/config/_config.py @@ -87,6 +87,7 @@ def __init__( timestep="4 fs", temperature="300 K", pressure="1 atm", + surface_tension=None, barostat_frequency=25, integrator="langevin_middle", cutoff_type="pme", @@ -101,7 +102,7 @@ def __init__( swap_end_states=False, coulomb_power=0.0, shift_coulomb="1 A", - shift_delta="2.25 A", + shift_delta="1.5 A", restraints=None, constraint="h_bonds", perturbable_constraint="h_bonds_not_heavy_perturbed", @@ -116,9 +117,10 @@ def __init__( equilibration_constraints=False, energy_frequency="1 ps", save_trajectories=True, - frame_frequency="20 ps", + frame_frequency="100 ps", save_velocities=False, checkpoint_frequency="100 ps", + num_checkpoint_workers=None, num_energy_neighbours=None, null_energy="10000 kcal/mol", platform="auto", @@ -126,15 +128,26 @@ def __init__( max_gpus=None, oversubscription_factor=1, replica_exchange=False, + perturbed_system=None, + gcmc=False, + gcmc_selection=None, + gcmc_excess_chemical_potential="-6.09 kcal/mol", + gcmc_standard_volume="30.543 A^3", + gcmc_num_waters=20, + gcmc_radius="4 A", + gcmc_bulk_sampling_probability=0.1, + gcmc_tolerance=0.0, rest2_scale=1.0, rest2_selection=None, output_directory="output", restart=False, + use_backup=False, write_config=True, overwrite=False, somd1_compatibility=False, pert_file=None, save_energy_components=False, + page_size=None, timeout="300 s", ): """ @@ -156,6 +169,9 @@ def __init__( Simulation pressure. (Simulations will run in the NVT ensemble unless a pressure is specified.) + surface_tension: str + Surface tension to use for NPT simulations with a membrane barostat. + barostat_frequency: int The number of integration steps between barostat updates. @@ -196,9 +212,9 @@ def __init__( Factor by which to scale charges for charge scaled morph. swap_end_states: bool - Whether to perform the perturbation in the reverse direction. + Whether to swap the end states of the alchemical system. - couloumb_power : float + coulomb_power : float Power to use for the soft-core Coulomb interaction. This is used to soften the electrostatic interaction. @@ -266,6 +282,9 @@ def __init__( energy_frequency: str Frequency at which to output energy data. If running using 'replica_exchange', then this will also be the frequency at which replica swaps are attempted. + When performing Grand Canonical Monte Carlo (GCMC) water insertions/deletions + via 'gcmc=True', this will also be the frequency at which GCMC moves are + attempted. save_trajectories: bool Whether to save trajectory files @@ -281,6 +300,14 @@ def __init__( min(energy_frequency, frame_frequency). If zero, then no checkpointing will be performed. + num_checkpoint_workers: int + The number of parallel workers to use when checkpointing during a replica + exchange simulation. By default, this is set to the number of concurrent + GPU contexts, i.e. the number of GPUs multiplied by the oversubscription + factor. The option can be used to reduce the number of workers, which + can be useful when the system size is large, i.e. when many large + trajectory files could be written simultaneously. + platform: str Platform to run simulation on. @@ -293,12 +320,52 @@ def __init__( Does nothing if platform is set to CPU. oversubscription_factor: int - Factor by which to oversubscribe jobs on GPUs during replica exchange simulations. + The number of OpenMM contexts that can be run on a single GPU at the same time. replica_exchange: bool Whether to run replica exchange simulation. Currently this can only be used when GPU resources are available. + perturbed_system: str + The path to a stream file containing a Sire system for the equilibrated perturbed + end state (lambda = 1). This will be used as the starting conformation all lambda + windows > 0.5 when performing a replica exchange simulation. + + gcmc: bool + Whether to perform Grand Canonical Monte Carlo (GCMC) water insertions/deletions. + + gcmc_selection: str + A sire sslection string specifying the atoms that define the centre of geometry + of the GCMC sphere. If None, then GCMC moves will be attempted within the entire + simulation volume. + + gcmc_excess_chemical_potential: str + The excess chemical potential of water in kcal/mol. The default value is calibrated + for the TIP3P water model. This can be calculated from the free energy of decoupling + a single water molecule from bulk. + + gcmc_standard_volume: str + The standard volume of a water molecule in A^3. The default value is calibrated + from NPT simulation of TIP3P water. + + gcmc_num_waters: int + The additional number of ghost water molecules to add to the system. These are + used as placeholders for GCMC insertion moves. + + gcmc_radius: str + The radius of the GCMC sphere. + + gcmc_bulk_sampling_probability: float + The probability of performing bulk GCMC moves, i.e. within the entire simulation + box rather than the GCMC sphere. These can be used to maintain a constant bulk + density, i.e. acting as a barostat. (This option has no affect when + 'gcmc_selection=None'.) + + gcmc_tolerance: float + The tolerance for the GCMC acceptance probability, i.e. the minimum probability + of acceptance for a move. This can be used to exclude low probability candidates + that can cause instabilities or crashes for the MD engine. + rest2_scale: float, list(float) The scaling factor for Replica Exchange with Solute Tempering (REST) simulations. This is the factor by which the temperature of the solute is scaled with respect to @@ -324,6 +391,12 @@ def __init__( restart: bool Whether to restart from a previous simulation using files found in 'output-directory'. + use_backup: bool + Whether to use backup files when restarting a simulation. If True, then + files from the last but one checkpoint will be used, rather than the most + recent checkpoint files. This can be useful if the most recent checkpoint + files are corrupted, or incomplete, e.g. you are recovering from a crash. + write_config: bool Whether to write the configuration options to a YAML file in the output directory. @@ -348,8 +421,12 @@ def __init__( Whether to save the energy contribution for each force when checkpointing. This is useful when debugging crashes. + page_size: int + The page size for trajectory handling in megabytes. If None, then Sire + will automatically set the page size. + timeout: str - Timeout for the minimiser. + Timeout for the minimiser and file lock. num_energy_neighbours: int The number of neighbouring windows to use when computing the energy @@ -372,6 +449,7 @@ def __init__( self.runtime = runtime self.temperature = temperature self.pressure = pressure + self.surface_tension = surface_tension self.barostat_frequency = barostat_frequency self.integrator = integrator self.cutoff_type = cutoff_type @@ -405,20 +483,32 @@ def __init__( self.frame_frequency = frame_frequency self.save_velocities = save_velocities self.checkpoint_frequency = checkpoint_frequency + self.num_checkpoint_workers = num_checkpoint_workers self.platform = platform self.max_threads = max_threads self.max_gpus = max_gpus self.oversubscription_factor = oversubscription_factor self.replica_exchange = replica_exchange + self.perturbed_system = perturbed_system + self.gcmc = gcmc + self.gcmc_selection = gcmc_selection + self.gcmc_excess_chemical_potential = gcmc_excess_chemical_potential + self.gcmc_standard_volume = gcmc_standard_volume + self.gcmc_num_waters = gcmc_num_waters + self.gcmc_radius = gcmc_radius + self.gcmc_bulk_sampling_probability = gcmc_bulk_sampling_probability + self.gcmc_tolerance = gcmc_tolerance self.rest2_scale = rest2_scale self.rest2_selection = rest2_selection self.restart = restart + self.use_backup = use_backup self.somd1_compatibility = somd1_compatibility self.pert_file = pert_file self.save_energy_components = save_energy_components self.timeout = timeout self.num_energy_neighbours = num_energy_neighbours self.null_energy = null_energy + self.page_size = page_size self.write_config = write_config @@ -510,6 +600,13 @@ def as_dict(self, sire_compatible=False): self._charge_scale_factor ): d["lambda_schedule"] = "charge_scaled_morph" + + # Use the path for the perturbed_system option, since the system + # isn't serializable. + if self.perturbed_system is not None: + d["perturbed_system"] = str(self._perturbed_system_file) + d.pop("perturbed_system_file", None) + return d @property @@ -607,6 +704,35 @@ def barostat_frequency(self, barostat_frequency): self._barostat_frequency = barostat_frequency + @property + def surface_tension(self): + return self._surface_tension + + @surface_tension.setter + def surface_tension(self, surface_tension): + if surface_tension is not None and not isinstance(surface_tension, str): + raise TypeError("'surface_tension' must be of type 'str'") + + from sire.units import atm, angstrom + + if surface_tension is not None: + try: + st = _sr.u(surface_tension) + except: + raise ValueError( + f"Unable to parse 'surface_tension' as a Sire GeneralUnit: {surface_tension}" + ) + # Make sure we can handle a value of zero. + if st == 0: + st = 0 * atm * angstrom + elif not st.has_same_units(atm * angstrom): + raise ValueError("'surface_tension' units are invalid.") + + self._surface_tension = st + + else: + self._surface_tension = surface_tension + @property def integrator(self): return self._integrator @@ -1193,12 +1319,28 @@ def checkpoint_frequency(self, checkpoint_frequency): "Checkpoint frequency is low. Should be greater min(energy_frequency, frame_frequency)" ) if t.value() > self._runtime.value(): - _logger.debug( + _logger.warning( "Checkpoint frequency < runtime, checkpointing will not occur before runtime is reached." ) t = _sr.u("0ps") self._checkpoint_frequency = t + @property + def num_checkpoint_workers(self): + return self._num_checkpoint_workers + + @num_checkpoint_workers.setter + def num_checkpoint_workers(self, num_checkpoint_workers): + if num_checkpoint_workers is not None: + if not isinstance(num_checkpoint_workers, int): + try: + num_checkpoint_workers = int(num_checkpoint_workers) + except: + raise ValueError("'num_checkpoint_workers' must be of type 'int'") + if num_checkpoint_workers < 1: + raise ValueError("'num_checkpoint_workers' must be greater than 0") + self._num_checkpoint_workers = num_checkpoint_workers + @property def platform(self): return self._platform @@ -1336,6 +1478,183 @@ def replica_exchange(self, replica_exchange): raise ValueError("'replica_exchange' must be of type 'bool'") self._replica_exchange = replica_exchange + @property + def perturbed_system(self): + return self._perturbed_system + + @perturbed_system.setter + def perturbed_system(self, perturbed_system): + if perturbed_system is not None: + if isinstance(perturbed_system, str): + import os + + if not os.path.exists(perturbed_system): + raise ValueError( + f"'perturbed_system' stream file does not exist: {perturbed_system}" + ) + + try: + self._perturbed_system = _sr.stream.load(perturbed_system) + self._perturbed_system_file = perturbed_system + except Exception as e: + raise ValueError( + f"Unable to load 'perturbed_system' stream file: {e}" + ) + else: + raise TypeError("'perturbed_system' must be of type 'str'") + else: + self._perturbed_system = None + self._perturbed_system_file = None + + @property + def gcmc(self): + return self._gcmc + + @gcmc.setter + def gcmc(self, gcmc): + if not isinstance(gcmc, bool): + raise ValueError("'gcmc' must be of type 'bool'") + + # GCMC isn't supported on macOS. + if gcmc: + import platform as _platform + + if _platform.system() == "Darwin": + raise ValueError("GCMC is not supported on macOS systems.") + + self._gcmc = gcmc + + @property + def gcmc_selection(self): + return self._gcmc_selection + + @gcmc_selection.setter + def gcmc_selection(self, gcmc_selection): + if gcmc_selection is not None: + if not isinstance(gcmc_selection, str): + raise TypeError("'gcmc_selection' must be of type 'str'") + self._gcmc_selection = gcmc_selection + + @property + def gcmc_excess_chemical_potential(self): + return self._gcmc_excess_chemical_potential + + @gcmc_excess_chemical_potential.setter + def gcmc_excess_chemical_potential(self, gcmc_excess_chemical_potential): + if not isinstance(gcmc_excess_chemical_potential, str): + raise TypeError("'gcmc_excess_chemical_potential' must be of type 'str'") + + from sire.units import kcal_per_mol + + try: + gcmc_e = _sr.u(gcmc_excess_chemical_potential) + except: + raise ValueError( + "Unable to parse 'gcmc_excess_chemical_potential' " + f"as a Sire GeneralUnit: {gcmc_excess_chemical_potential}" + ) + + if not gcmc_e.has_same_units(kcal_per_mol): + raise ValueError("'gcmc_excess_chemical_potential' units are invalid.") + + self._gcmc_excess_chemical_potential = gcmc_e + + @property + def gcmc_standard_volume(self): + return self._gcmc_standard_volume + + @gcmc_standard_volume.setter + def gcmc_standard_volume(self, gcmc_standard_volume): + if not isinstance(gcmc_standard_volume, str): + raise TypeError("'gcmc_standard_volume' must be of type 'str'") + + from sire.units import angstrom3 + + try: + gcmc_v = _sr.u(gcmc_standard_volume) + except: + raise ValueError( + "Unable to parse 'gcmc_standard_volume' " + f"as a Sire GeneralUnit: {gcmc_standard_volume}" + ) + + if not gcmc_v.has_same_units(angstrom3): + raise ValueError("'gcmc_standard_volume' units are invalid.") + + self._gcmc_standard_volume = gcmc_v + + @property + def gcmc_num_waters(self): + return self._gcmc_num_waters + + @gcmc_num_waters.setter + def gcmc_num_waters(self, gcmc_num_waters): + if gcmc_num_waters is not None: + if not isinstance(gcmc_num_waters, int): + try: + gcmc_num_waters = int(gcmc_num_waters) + except: + raise ValueError("'gcmc_num_waters' must be an integer") + + if gcmc_num_waters < 0: + raise ValueError("'gcmc_num_waters' must be greater than or equal to 0") + self._gcmc_num_waters = gcmc_num_waters + + @property + def gcmc_radius(self): + return self._gcmc_radius + + @gcmc_radius.setter + def gcmc_radius(self, gcmc_radius): + if not isinstance(gcmc_radius, str): + raise TypeError("'gcmc_radius' must be of type 'str'") + + from sire.units import angstrom + + try: + gcmc_r = _sr.u(gcmc_radius) + except: + raise ValueError( + "Unable to parse 'gcmc_radius' " f"as a Sire GeneralUnit: {gcmc_radius}" + ) + + if not gcmc_r.has_same_units(angstrom): + raise ValueError("'gcmc_radius' units are invalid.") + + self._gcmc_radius = gcmc_r + + @property + def gcmc_bulk_sampling_probability(self): + return self._gcmc_bulk_sampling_probability + + @gcmc_bulk_sampling_probability.setter + def gcmc_bulk_sampling_probability(self, gcmc_bulk_sampling_probability): + if not isinstance(gcmc_bulk_sampling_probability, float): + try: + gcmc_bulk_sampling_probability = float(gcmc_bulk_sampling_probability) + except Exception: + raise ValueError("'gcmc_bulk_sampling_probability' must be a float") + if gcmc_bulk_sampling_probability < 0.0 or gcmc_bulk_sampling_probability > 1.0: + raise ValueError( + "'gcmc_bulk_sampling_probability' must be between 0.0 and 1.0" + ) + self._gcmc_bulk_sampling_probability = gcmc_bulk_sampling_probability + + @property + def gcmc_tolerance(self): + return self._gcmc_tolerance + + @gcmc_tolerance.setter + def gcmc_tolerance(self, gcmc_tolerance): + if not isinstance(gcmc_tolerance, float): + try: + gcmc_tolerance = float(gcmc_tolerance) + except Exception: + raise ValueError("'gcmc_tolerance' must be a float") + if gcmc_tolerance < 0.0: + raise ValueError("'gcmc_tolerance' must be greater than or equal to 0.0") + self._gcmc_tolerance = gcmc_tolerance + @property def rest2_scale(self): return self._rest2_scale @@ -1382,6 +1701,16 @@ def restart(self, restart): raise ValueError("'restart' must be of type 'bool'") self._restart = restart + @property + def use_backup(self): + return self._use_backup + + @use_backup.setter + def use_backup(self, use_backup): + if not isinstance(use_backup, bool): + raise ValueError("'use_backup' must be of type 'bool'") + self._use_backup = use_backup + @property def somd1_compatibility(self): return self._somd1_compatibility @@ -1418,6 +1747,24 @@ def save_energy_components(self, save_energy_components): raise ValueError("'save_energy_components' must be of type 'bool'") self._save_energy_components = save_energy_components + @property + def page_size(self): + return self._page_size + + @page_size.setter + def page_size(self, page_size): + if page_size is not None: + if not isinstance(page_size, int): + try: + page_size = int(page_size) + except: + raise ValueError("'page_size' must be of type 'int'") + + if page_size < 1: + raise ValueError("'page_size' must be greater than 0") + + self._page_size = page_size + @property def timeout(self): return self._timeout @@ -1656,3 +2003,20 @@ def _create_parser(cls): ) return parser + + def _reset_logger(self, logger): + """ + Internal method to reset the logger. + + This can be used when a parallel process is spawned to ensure that + the logger is correctly configured. + """ + + import sys + + logger.remove() + logger.add(sys.stderr, level=self.log_level.upper(), enqueue=True) + if self.log_file is not None and self.output_directory is not None: + logger.add( + self.output_directory / self.log_file, level=self.log_level.upper() + ) diff --git a/src/somd2/runner/_base.py b/src/somd2/runner/_base.py index b523706a..ded857cc 100644 --- a/src/somd2/runner/_base.py +++ b/src/somd2/runner/_base.py @@ -21,7 +21,9 @@ __all__ = ["RunnerBase"] +from glob import glob as _glob from pathlib import Path as _Path +from shutil import copyfile as _copyfile import sire as _sr from sire.system import System as _System @@ -79,12 +81,69 @@ def __init__(self, system, config): self._config = config self._config._extra_args = {} + if self._config.replica_exchange and self._config.perturbed_system is not None: + # Make sure the number of positions is correct. + num_atoms = self._system.num_atoms() + num_pert_atoms = self._config.perturbed_system.num_atoms() + + if num_atoms != num_pert_atoms: + msg = ( + f"Number of atoms in 'perturbed_system' ({num_pert_atoms}) does not match " + f"the number of atoms in the 'system' ({num_atoms})." + ) + _logger.error(msg) + raise ValueError(msg) + + # Make sure the coordinates property is linked. + perturbed_system = _sr.morph.link_to_perturbed( + self._config.perturbed_system + ) + + # Store the positions. + self._perturbed_positions = _sr.io.get_coords_array(perturbed_system) + + # Store the box vectors. + cell = self._config.perturbed_system.space().box_matrix() + c0 = cell.column0() + c1 = cell.column1() + c2 = cell.column2() + self._perturbed_box = ( + (c0.x().value(), c0.y().value(), c0.z().value()), + (c1.x().value(), c1.y().value(), c1.z().value()), + (c2.x().value(), c2.y().value(), c2.z().value()), + ) + else: + self._perturbed_positions = None + self._perturbed_box = None + # Log the versions of somd2 and sire. from somd2 import __version__, _sire_version, _sire_revisionid _logger.info(f"somd2 version: {__version__}") _logger.info(f"sire version: {_sire_version}+{_sire_revisionid}") + # Flag whether frames are being saved. + if ( + self._config.frame_frequency > 0 + and self._config.frame_frequency <= self._config.runtime + ): + self._save_frames = True + else: + self._save_frames = False + + # Make sure the frame frequency doesn't exceed the checkpoint frequency. + # This constraint is currently required to avoid issues with missing + # frames when restarting from a checkpoint. This could be fixed by + # temporarily adjusting the frame frequency for the first checkpoint + # interval after a restart. + if ( + self._save_frames + and self._config.frame_frequency > self._config.checkpoint_frequency + ): + msg = "'frame_frequency' cannot be greater than 'checkpoint_frequency'." + _logger.error(msg) + raise ValueError(msg) + # Check whether we need to apply a perturbation to the reference system. if self._config.pert_file is not None: _logger.info( @@ -158,13 +217,26 @@ def __init__(self, system, config): num_atoms = waters[0].num_atoms() if num_atoms == 3: - # Here we assume TIP3p for any 3-point water model. + # Here we assume TIP3P for any 3-point water model. model = "tip3p" elif num_atoms == 4: - model = "tip4p" + # Check for OPC water. + try: + if ( + waters[0] + .search("element Xx") + .atoms()[0] + .charge() + .value() + < -1.1 + ): + model = "opc" + else: + model = "tip4p" + except: + model = "tip4p" elif num_atoms == 5: model = "tip5p" - try: self._system = _System( _setAmberWater(self._system._system, model) @@ -181,16 +253,25 @@ def __init__(self, system, config): # Ghost atoms are considered light when adding bond constraints. self._config._extra_args["ghosts_are_light"] = True - # Apply Boresch modifications to bonded terms involving ghost atoms to - # avoid spurious couplings to the physical system at the end states. + # Apply modifications to bonded terms involving ghost atoms to avoid + # spurious couplings to the physical system at the end states. elif self._config.ghost_modifications: - from .._utils._ghosts import boresch + from ghostly import modify - self._system = boresch(self._system) + _logger.info("Applying modifications to ghost atom bonded terms") + self._system = modify(self._system) # Check for a periodic space. self._has_space = self._check_space() + # Check for water. + try: + # The search will fail if there are no water molecules. + water = self._system["water"].molecules() + self._has_water = True + except: + self._has_water = False + # Check the end state contraints. self._check_end_state_constraints() @@ -304,6 +385,13 @@ def __init__(self, system, config): # Work out the current hydrogen mass factor. factor_non_water, factor_water = self._get_h_mass_factor(self._system) + # If using SOMD1 compatibility, then adjust the default value. + if self._config.somd1_compatibility and self._config.h_mass_factor == 3.0: + self._config.h_mass_factor = 1.5 + _logger.info( + "Using hydrogen mass repartitioning factor of 1.5 for SOMD1 compatibility." + ) + # We don't support repartiioning water molecules, so check those first. if factor_water is not None: if not isclose(factor_water, 1.0, abs_tol=1e-4): @@ -360,27 +448,154 @@ def __init__(self, system, config): # Flag whether this is a GPU simulation. self._is_gpu = self._config.platform in ["cuda", "opencl", "hip"] - # Need to verify before doing any directory checks + # Need to verify before doing any directory checks. if self._config.restart: self._verify_restart_config() # Check the output directories and create names of output files. self._filenames = self._prepare_output() + # Store the current system as a reference. + self._reference_system = self._system.clone() + # Check for a valid restart. if self._config.restart: + if self._config.use_backup: + self._restore_backup_files() self._is_restart, self._system = self._check_restart() else: self._is_restart = False + self._cleanup() - # Save config whenever 'configure' is called to keep it up to date + # Save config whenever 'configure' is called to keep it up to date. if self._config.write_config: _dict_to_yaml( self._config.as_dict(), self._filenames[0]["config"], ) - # Save the end state topologies to the output directory. + # Append only this number of lines from the end of the dataframe during checkpointing. + self._energy_per_block = int( + self._config.checkpoint_frequency / self._config.energy_frequency + ) + + # Zero the energy sample. + self._nrg_sample = 0 + + # GCMC specific validation. + if self._config.gcmc: + if self._config.platform != "cuda": + msg = "GCMC simulations require the CUDA platform." + _logger.error(msg) + raise ValueError(msg) + + if not self._has_space: + msg = "GCMC simulations require a periodic space." + _logger.error(msg) + raise ValueError(msg) + + if self._config.pressure != None: + msg = "GCMC simulations must be run in the NVT ensemble." + _logger.error(msg) + raise ValueError(msg) + + if isinstance(self._system, list): + mols = self._system[0] + else: + mols = self._system + + # Check that the system is solvated with water molecules. This + # is required for GCMC simulations since the existing waters + # provide a template for the ghost waters. + try: + water = mols["water"].molecules()[0] + except: + msg = "No water molecules in the system. Cannot perform GCMC." + _logger.error(msg) + raise ValueError(msg) + + # Make sure the frame frequency is a multiple of the energy frequency. + + # Get the ratio. + ratio = ( + self._config.frame_frequency / self._config.energy_frequency + ).value() + + # Make sure it's an integer. + if not isclose(ratio, round(ratio), abs_tol=1e-4): + msg = "'frame_frequency' must be a multiple of 'energy_frequency'." + _logger.error(msg) + raise ValueError(msg) + + # Make sure the checkpoint frequency is a multiple of the frame frequency. + + # Get the ratio. + ratio = ( + self._config.checkpoint_frequency / self._config.frame_frequency + ).value() + + # Make sure it's an integer. + if not isclose(ratio, round(ratio), abs_tol=1e-4): + msg = "'checkpoint_frequency' must be a multiple of 'frame_frequency'." + _logger.error(msg) + raise ValueError(msg) + + # Make sure the runtime is a multiple of the frame frequency. + + # Get the ratio. + ratio = (self._config.runtime / self._config.frame_frequency).value() + + # Make sure it's an integer. + if not isclose(ratio, round(ratio), abs_tol=1e-4): + msg = "'runtime' must be a multiple of 'frame_frequency'." + _logger.error(msg) + raise ValueError(msg) + + # Make sure the selection is valid. + if self._config.gcmc_selection is not None: + try: + atoms = _sr.mol.selection_to_atoms( + self._system, self._config.gcmc_selection + ) + except: + msg = "Invalid 'gcmc_selection' value." + _logger.error(msg) + raise ValueError(msg) + + # Store the excess chemcical potential value. + self._mu_ex = self._config.gcmc_excess_chemical_potential.value() + + # Store the initial system time. + if isinstance(self._system, list): + self._initial_time = [] + for system in self._system: + if system is None: + self._initial_time.append(_sr.u("0 ps")) + else: + self._initial_time.append(system.time()) + else: + self._initial_time = [self._system.time()] * len(self._lambda_values) + + # Check for missing systems in a multi-system simulation. + if isinstance(self._system, list): + ref_system = None + missing_systems = [] + for i, system in enumerate(self._system): + if system is not None: + ref_system = None + else: + missing_systems.append(i) + if ref_system is None: + ref_system = self._reference_system + + # Fill in any missing systems. + for i in missing_systems: + self._system[i] = ref_system.clone() + + # Create the lock file name. + self._lock_file = str(self._config.output_directory / "somd2.lock") + + # Write the end-state topologies to the output directory. if isinstance(self._system, list): mols = self._system[0] else: @@ -390,20 +605,21 @@ def __init__(self, system, config): _sr.save(mols0, self._filenames["topology0"]) _sr.save(mols1, self._filenames["topology1"]) - # Append only this number of lines from the end of the dataframe during checkpointing. - self._energy_per_block = int( - self._config.checkpoint_frequency / self._config.energy_frequency - ) + # Update the tajectory page size. + if self._config.page_size is not None: + # Convert from MB to bytes. + page_size = int(self._config.page_size * 1024 * 1024) - # Zero the energy sample. - self._nrg_sample = 0 + # Set the new page size. + _sr.base.PageCache.set_max_page_size(page_size) # Create the default dynamics kwargs dictionary. These can be overloaded # as needed. self._dynamics_kwargs = { "integrator": config.integrator, "temperature": config.temperature, - "pressure": config.pressure if self._has_space else None, + "pressure": config.pressure if self._has_water else None, + "surface_tension": config.surface_tension, "barostat_frequency": config.barostat_frequency, "timestep": config.timestep, "restraints": config.restraints, @@ -425,6 +641,32 @@ def __init__(self, system, config): "map": config._extra_args, } + # Create the GCMC specific kwargs dictionary. + if self._config.gcmc: + self._gcmc_kwargs = { + "reference": self._config.gcmc_selection, + "excess_chemical_potential": str( + self._config.gcmc_excess_chemical_potential + ), + "standard_volume": str(self._config.gcmc_standard_volume), + "radius": str(self._config.gcmc_radius), + "num_ghost_waters": self._config.gcmc_num_waters, + "bulk_sampling_probability": self._config.gcmc_bulk_sampling_probability, + "cutoff_type": self._config.cutoff_type, + "cutoff": str(self._config.cutoff), + "temperature": str(self._config.temperature), + "lambda_schedule": self._config.lambda_schedule, + "coulomb_power": self._config.coulomb_power, + "shift_coulomb": str(self._config.shift_coulomb), + "shift_delta": str(self._config.shift_delta), + "swap_end_states": self._config.swap_end_states, + "tolerance": self._config.gcmc_tolerance, + "overwrite": self._config.overwrite, + "no_logger": True, + } + else: + self._gcmc_kwargs = None + def _check_space(self): """ Check if the system has a periodic space. @@ -442,11 +684,11 @@ def _check_space(self): return True else: _logger.info("No periodic space detected. Assuming vacuum simulation.") - if self._config.cutoff_type == "pme": + if self._config.cutoff_type != "none": _logger.info( - "Cannot use PME for non-periodic simulations. Using RF cutoff instead." + "Cannot use PME for non-periodic simulations. Using no cutoff instead." ) - self._config.cutoff_type = "rf" + self._config.cutoff_type = "none" return False def _check_end_state_constraints(self): @@ -712,6 +954,7 @@ def increment_filename(base_filename, suffix): filenames["energy_components"] = str( output_directory / f"energy_components_{lam}.txt" ) + filenames["gcmc_ghosts"] = str(output_directory / f"gcmc_ghosts_{lam}.txt") if restart: filenames["config"] = str( output_directory / increment_filename("config", "yaml") @@ -761,6 +1004,8 @@ def _prepare_output(self): for file in list(set(deleted)): file.unlink() + # File names for end-state topologies. This can be used for trajectory + # visulation and analysis. filenames["topology0"] = str(self._config.output_directory / "system0.prm7") filenames["topology1"] = str(self._config.output_directory / "system1.prm7") @@ -781,17 +1026,22 @@ def _check_restart(self): """ # List to store systems for each lambda value. - systems = [] + systems = [None] * len(self._lambda_values) for i, lambda_value in enumerate(self._lambda_values): # Try to load the checkpoint file. try: system = _sr.stream.load(self._filenames[i]["checkpoint"]) except: - _logger.warning( - f"Unable to load checkpoint file for {_lam_sym}={lambda_value:.5f}, starting from scratch." - ) - return False, self._system + if not self._config.replica_exchange: + _logger.warning( + f"Unable to load checkpoint file for {_lam_sym}={lambda_value:.5f}, starting from scratch." + ) + # Repex requires all files to be present. + else: + msg = f"Unable to load checkpoint file for {_lam_sym}={lambda_value:.5f}." + _logger.error(msg) + raise ValueError(msg) else: # Check the system is the same as the reference system. are_same, reason = self._systems_are_same(self._system, system) @@ -804,15 +1054,14 @@ def _check_restart(self): self._compare_configs( self._last_config, dict(system.property("config")) ) - except: + except Exception as e: config = dict(system.property("config")) _logger.debug( f"last config: {self._last_config}, current config: {config}" ) - _logger.error( - f"Config for {_lam_sym}={lambda_value} does not match previous config." - ) - raise + msg = f"Config for {_lam_sym}={lambda_value} does not match previous config: {str(e)}" + _logger.error(msg) + raise ValueError(msg) # Make sure the lambda value is consistent. else: lambda_restart = system.property("lambda") @@ -820,13 +1069,49 @@ def _check_restart(self): lambda_restart == lambda_value except: filename = self._filenames[i]["checkpoint"] - raise ValueError( - f"Lambda value from checkpoint file {filename} for {lambda_restart} " - f"does not match expected value {lambda_value}." + msg = ( + f"Lambda value from checkpoint file {filename} for {_lam_sym}={lambda_restart} " + f"does not match expected value {_lam_sym}={lambda_value}." ) + _logger.error(msg) + raise ValueError(msg) - # Append the system to the list. - systems.append(_sr.morph.link_to_reference(system)) + # Store the system to the list. + systems[i] = _sr.morph.link_to_perturbed(system) + + # If this is a GCMC simulation, then remove all ghost waters from each of the systems. + if self._config.gcmc: + # List to store the indices of the current ghost waters. + self._restart_ghost_waters = [] + # List to store the current positions. + self._restart_positions = [] + _logger.info("Removing existing ghost waters from GCMC checkpoint systems") + for i, system in enumerate(systems): + # Store the positions of all atoms. + self._restart_positions.append(_sr.io.get_coords_array(system)) + if system is not None: + # Remove the ghost waters from the system. + try: + # Get the water molecule indices. + waters = system.molecules().find(system["water"].molecules()) + + # Get the ghost waters and their indices. + ghost_waters = system["property is_ghost_water"].molecules() + ghost_waters = system.molecules().find(ghost_waters) + + # Store the indices of the ghost waters in the waters list. + idxs = [] + for index in ghost_waters: + idxs.append(waters.index(index)) + self._restart_ghost_waters.append(idxs) + + for mol in system["property is_ghost_water"].molecules(): + _logger.debug( + f"Removing ghost water molecule {mol.number()} for {_lam_sym}={self._lambda_values[i]:.5f}" + ) + system.remove(mol) + except: + pass return True, systems @@ -856,7 +1141,6 @@ def _compare_configs(config1, config2): "save_trajectory", "frame_frequency", "save_velocities", - "checkpoint_frequency", "platform", "max_threads", "max_gpus", @@ -988,7 +1272,7 @@ def _systems_are_same(system0, system1): return True, None @staticmethod - def _get_gpu_devices(platform): + def _get_gpu_devices(platform, oversubscription_factor=1): """ Get list of available GPUs from CUDA_VISIBLE_DEVICES, OPENCL_VISIBLE_DEVICES, or HIP_VISIBLE_DEVICES. @@ -999,6 +1283,9 @@ def _get_gpu_devices(platform): platform: str The GPU platform to be used for simulations. + oversubscription_factor: int + The number of concurrent workers per GPU. Default is 1. + Returns -------- @@ -1037,6 +1324,7 @@ def _get_gpu_devices(platform): num_gpus = len(available_devices) _logger.info(f"Number of GPUs available: {num_gpus}") + _logger.info(f"Number of concurrent workers per GPU: {oversubscription_factor}") return available_devices @@ -1163,6 +1451,27 @@ def _checkpoint( from somd2 import __version__, _sire_version, _sire_revisionid + # Save the end-state GCMC topologies for trajectory analysis and visualisation. + if self._config.gcmc: + # Only save for first block. + if block == 0: + mols0 = _sr.morph.link_to_reference(system) + mols1 = _sr.morph.link_to_perturbed(system) + + # Save to AMBER format. + _sr.save(mols0, self._filenames["topology0"]) + _sr.save(mols1, self._filenames["topology1"]) + + # Save to PDB format. + _sr.save( + mols0, + self._filenames["topology0"].replace(".prm7", ".pdb"), + ) + _sr.save( + mols1, + self._filenames["topology1"].replace(".prm7", ".pdb"), + ) + # Get the lambda value. lam = self._lambda_values[index] @@ -1179,7 +1488,6 @@ def _checkpoint( "somd2 version": __version__, "sire version": f"{_sire_version}+{_sire_revisionid}", "lambda": str(lam), - "lambda_array": lambda_energy, "speed": speed, "temperature": str(self._config.temperature.value()), } @@ -1192,7 +1500,7 @@ def _checkpoint( # Assemble and save the final trajectory. if self._config.save_trajectories: # Save the final trajectory chunk to file. - if system.num_frames() > 0: + if self._save_frames and system.num_frames() > 0: traj_filename = ( self._filenames[index]["trajectory_chunk"] + f"{block:05d}.dcd" ) @@ -1209,10 +1517,8 @@ def _checkpoint( traj_filename = self._filenames[index]["trajectory"] # Glob for the trajectory chunks. - from glob import glob - traj_chunks = sorted( - glob(f"{self._filenames[index]['trajectory_chunk']}*") + _glob(f"{self._filenames[index]['trajectory_chunk']}*") ) # If this is a restart, then we need to check for an existing @@ -1221,10 +1527,8 @@ def _checkpoint( if self._config.restart: path = _Path(traj_filename) if path.exists() and path.stat().st_size > 0: - from shutil import copyfile - - copyfile(traj_filename, f"{traj_filename}.bak") - traj_chunks = [f"{traj_filename}.bak"] + traj_chunks + _copyfile(traj_filename, f"{traj_filename}.prev") + traj_chunks = [f"{traj_filename}.prev"] + traj_chunks # Load the topology and chunked trajectory files. mols = _sr.load([topology0] + traj_chunks) @@ -1257,7 +1561,7 @@ def _checkpoint( # Save the current trajectory chunk to file. if self._config.save_trajectories: - if system.num_frames() > 0: + if self._save_frames and system.num_frames() > 0: traj_filename = ( self._filenames[index]["trajectory_chunk"] + f"{block:05d}.dcd" ) @@ -1287,6 +1591,42 @@ def _checkpoint( df.iloc[-self._energy_per_block :], ) + def _backup_checkpoint(self, index): + """ + Create a backup of the previous checkpoint files. + + Parameters + ---------- + + index : int + The index of the window or replica. + """ + + try: + # Backup the existing checkpoint file, if it exists. + path = _Path(self._filenames[index]["checkpoint"]) + if path.exists() and path.stat().st_size > 0: + _copyfile( + self._filenames[index]["checkpoint"], + str(self._filenames[index]["checkpoint"]) + ".bak", + ) + traj_filename = self._filenames[index]["trajectory"] + except Exception as e: + return False, e + + try: + # Backup the existing energy trajectory file, if it exists. + path = _Path(self._filenames[index]["energy_traj"]) + if path.exists() and path.stat().st_size > 0: + _copyfile( + self._filenames[index]["energy_traj"], + str(self._filenames[index]["energy_traj"]) + ".bak", + ) + except Exception as e: + return False, e + + return True, None + def _save_energy_components(self, index, context): """ Internal function to save the energy components for each force group to file. @@ -1321,8 +1661,10 @@ def _save_energy_components(self, index, context): # Process the records. for i, f in enumerate(system.getForces()): state = new_context.getState(getEnergy=True, groups={i}) - header += f"{f.getName():>25}" - record += f"{state.getPotentialEnergy().value_in_unit(openmm.unit.kilocalories_per_mole):>25.2f}" + name = f.getName() + name_len = len(name) + header += f"{f.getName():>{name_len+2}}" + record += f"{state.getPotentialEnergy().value_in_unit(openmm.unit.kilocalories_per_mole):>{name_len+2}.2f}" # Write to file. if self._nrg_sample == 0: @@ -1335,3 +1677,37 @@ def _save_energy_components(self, index, context): # Increment the sample number. self._nrg_sample += 1 + + def _restore_backup_files(self): + """ + Restore backup files in the working directory. + """ + + # Find all files with a .bak extension in the working directory. + backup_files = _glob(str(self._config.output_directory / "*.bak")) + + # Strip the .bak extension and copy to the original file name. + for file in backup_files: + path = _Path(file) + new_path = _Path(str(path)[:-4]) + try: + _copyfile(file, new_path) + except Exception as e: + msg = f"Unable to restore backup file {file}: {str(e)}" + _logger.error(msg) + raise IOError(msg) + + def _cleanup(self): + """ + Clean up backup files from the working directory. + """ + + # Find all files with a .bak extension in the working directory. + backup_files = _glob(str(self._config.output_directory / "*.bak")) + + for file in backup_files: + path = _Path(file) + try: + path.unlink() + except Exception as e: + _logger.warning(f"Unable to delete backup file {file}: {str(e)}") diff --git a/src/somd2/runner/_repex.py b/src/somd2/runner/_repex.py index 6d2e3ddc..9e1f18fd 100644 --- a/src/somd2/runner/_repex.py +++ b/src/somd2/runner/_repex.py @@ -21,7 +21,9 @@ __all__ = ["RepexRunner"] +from filelock import FileLock as _FileLock from numba import njit as _njit +from shutil import copyfile as _copyfile import numpy as _np import pickle as _pickle @@ -41,7 +43,18 @@ class DynamicsCache: A class for caching dynamics objects. """ - def __init__(self, system, lambdas, rest2_scale_factors, num_gpus, dynamics_kwargs): + def __init__( + self, + system, + lambdas, + rest2_scale_factors, + num_gpus, + dynamics_kwargs, + gcmc_kwargs=None, + output_directory=None, + perturbed_positions=None, + perturbed_box=None, + ): """ Constructor. @@ -62,6 +75,20 @@ def __init__(self, system, lambdas, rest2_scale_factors, num_gpus, dynamics_kwar dynamics_kwargs: dict A dictionary of default dynamics keyword arguments. + + gcmc_kwargs: dict + GCMC specific keyword arguments. If None, then GCMC is not used. + + output_directory: pathlib.Path + The directory for simulation output. + + perturbed_positions: numpy.ndarray + The positions for the perturbed state. If None, then the perturbed state + is not used. + + perturbed_box: numpy.ndarray + The box vectors for the perturbed state. If None, then the perturbed state + is not used. """ # Warn if the number of replicas is not a multiple of the number of GPUs. @@ -77,14 +104,23 @@ def __init__(self, system, lambdas, rest2_scale_factors, num_gpus, dynamics_kwar self._states = _np.array(range(len(lambdas))) self._old_states = _np.array(range(len(lambdas))) self._openmm_states = [None] * len(lambdas) - self._openmm_volumes = [None] * len(lambdas) + self._gcmc_samplers = [None] * len(lambdas) + self._gcmc_states = [None] * len(lambdas) self._num_proposed = _np.matrix(_np.zeros((len(lambdas), len(lambdas)))) self._num_accepted = _np.matrix(_np.zeros((len(lambdas), len(lambdas)))) self._num_swaps = _np.matrix(_np.zeros((len(lambdas), len(lambdas)))) # Create the dynamics objects. self._create_dynamics( - system, lambdas, rest2_scale_factors, num_gpus, dynamics_kwargs + system, + lambdas, + rest2_scale_factors, + num_gpus, + dynamics_kwargs, + gcmc_kwargs=gcmc_kwargs, + output_directory=output_directory, + perturbed_positions=perturbed_positions, + perturbed_box=perturbed_box, ) def __setstate__(self, state): @@ -106,7 +142,9 @@ def __getstate__(self): "_states": self._states, "_old_states": self._old_states, "_openmm_states": self._openmm_states, - "_openmm_volumes": self._openmm_volumes, + # Don't pickle the GCMC samplers since they need to be recreated. + "_gcmc_samplers": len(self._gcmc_samplers) * [None], + "_gcmc_states": self._gcmc_states, "_num_proposed": self._num_proposed, "_num_accepted": self._num_accepted, "_num_swaps": self._num_swaps, @@ -115,7 +153,16 @@ def __getstate__(self): return d def _create_dynamics( - self, system, lambdas, rest2_scale_factors, num_gpus, dynamics_kwargs + self, + system, + lambdas, + rest2_scale_factors, + num_gpus, + dynamics_kwargs, + gcmc_kwargs=None, + output_directory=None, + perturbed_positions=None, + perturbed_box=None, ): """ Create the dynamics objects. @@ -137,11 +184,29 @@ def _create_dynamics( dynamics_kwargs: dict A dictionary of default dynamics keyword arguments. + + gcmc_kwargs: dict + GCMC specific keyword arguments. If None, then GCMC is not used. + + output_directory: pathlib.Path + The directory for simulation output. + + perturbed_positions: numpy.ndarray + The positions for the perturbed state. If None, then the perturbed state + is not used. + + perturbed_box: numpy.ndarray + The box vectors for the perturbed state. If None, then the perturbed state + is not used. """ # Copy the dynamics keyword arguments. dynamics_kwargs = dynamics_kwargs.copy() + # Copy the GCMC keyword arguments. + if gcmc_kwargs is not None: + gcmc_kwargs = gcmc_kwargs.copy() + # Initialise the dynamics object list. self._dynamics = [] @@ -162,6 +227,35 @@ def _create_dynamics( dynamics_kwargs["lambda_value"] = lam dynamics_kwargs["rest2_scale"] = scale + if gcmc_kwargs is not None: + try: + from loch import GCMCSampler + except: + msg = "loch is not installed. GCMC sampling cannot be performed." + _logger.error(msg) + + ghost_file = str(output_directory / f"gcmc_ghosts_{lam:.5f}.txt") + + # Create the GCMC sampler. + gcmc_sampler = GCMCSampler( + mols, + device=device, + lambda_value=lam, + rest2_scale=scale, + ghost_file=ghost_file, + **gcmc_kwargs, + ) + + # Get the modified GCMC system. + mols = gcmc_sampler.system() + + # Store the GCMC sampler. + self._gcmc_samplers[i] = gcmc_sampler + + _logger.info( + f"Created GCMC sampler for lambda {lam:.5f} on device {device}" + ) + # Create the dynamics object. try: dynamics = mols.dynamics(**dynamics_kwargs) @@ -170,6 +264,23 @@ def _create_dynamics( _logger.error(msg) raise RuntimeError(msg) from e + # Update the box vectors and positions if the perturbed state is used. + if ( + perturbed_positions is not None + and perturbed_box is not None + and lam > 0.5 + ): + from openmm.unit import angstrom + + dynamics.context().setPeriodicBoxVectors(*perturbed_box * angstrom) + dynamics.context().setPositions(perturbed_positions * angstrom) + + # Bind the GCMC sampler to the dynamics object. This allows the + # dynamics object to reset the water state in its internal OpenMM + # context following a crash recovery. + if gcmc_kwargs is not None: + gcmc_sampler.bind_dynamics(dynamics) + # Append the dynamics object. self._dynamics.append(dynamics) @@ -179,7 +290,7 @@ def _create_dynamics( def get(self, index): """ - Get the dynamics object for a given index. + Get the dynamics object (and GCMC sampler) for a given index. Parameters ---------- @@ -191,9 +302,9 @@ def get(self, index): ------- tuple - The dynamics object for the replica. + The dynamics object for the replica and its GCMC sampler. """ - return self._dynamics[index] + return self._dynamics[index], self._gcmc_samplers[index] def set(self, index, dynamics): """ @@ -235,17 +346,30 @@ def save_openmm_state(self, index): from openmm.unit import angstrom # Get the current OpenMM state. - state = self._dynamics[index]._d._omm_mols.getState( - getPositions=True, getVelocities=True + state = ( + self._dynamics[index] + .context() + .getState(getPositions=True, getVelocities=True) ) # Store the state. self._openmm_states[index] = state - # Store the volume. - self._openmm_volumes[index] = state.getPeriodicBoxVolume().value_in_unit( - angstrom**3 - ) + def save_gcmc_state(self, index): + """ + Save the current GCMC water state for the replica. + + Parameters + ---------- + + index: int + The index of the replica. + """ + # Get the GCMC sampler. + gcmc_sampler = self._gcmc_samplers[index] + + # Store the state. + self._gcmc_states[index] = gcmc_sampler.water_state() def get_states(self): """ @@ -280,7 +404,23 @@ def mix_states(self): # The state has changed. if i != state: _logger.debug(f"Replica {i} seeded from state {state}") - self._dynamics[i]._d._omm_mols.setState(self._openmm_states[state]) + self._dynamics[i].context().setState(self._openmm_states[state]) + + # Swap the water state in the GCMCSamplers. + if self._gcmc_samplers[i] is not None: + # Find the indices of the water states that differ. + water_idxs = _np.where( + self._gcmc_states[i] != self._gcmc_states[state] + )[0] + + # Update the water state in the GCMCSampler. + self._gcmc_samplers[i].push() + self._gcmc_samplers[i]._set_water_state( + self._dynamics[i].context(), + indices=water_idxs, + states=self._gcmc_states[state][water_idxs], + ) + self._gcmc_samplers[i].pop() # Update the swap matrix. old_state = self._old_states[i] @@ -321,7 +461,7 @@ def __init__(self, system, config): ---------- system: str, :class: `System ` - The perturbable system to be simulated. This can be either a path + The perturbable system to be simulated. This can either be a path to a stream file, or a Sire system object. config: :class: `Config ` @@ -353,7 +493,9 @@ def __init__(self, system, config): # Get the number of available GPUs. try: - gpu_devices = self._get_gpu_devices("cuda") + gpu_devices = self._get_gpu_devices( + "cuda", self._config.oversubscription_factor + ) except Exception as e: _logger.error(f"Could not determine available GPU devices: {e}") raise e @@ -407,6 +549,10 @@ def __init__(self, system, config): self._rest2_scale_factors, self._num_gpus, dynamics_kwargs, + gcmc_kwargs=self._gcmc_kwargs, + perturbed_positions=self._perturbed_positions, + perturbed_box=self._perturbed_box, + output_directory=self._config.output_directory, ) else: # Check to see if the simulation is already complete. @@ -438,25 +584,43 @@ def __init__(self, system, config): self._rest2_scale_factors, self._num_gpus, self._dynamics_kwargs, + gcmc_kwargs=self._gcmc_kwargs, + output_directory=self._config.output_directory, ) + # Reset the state of the OpenMM contexts and GCMC samplers. + for i in range(len(self._lambda_values)): + dynamics, gcmc_sampler = self._dynamics_cache.get(i) + + # Reset the OpenMM state. + dynamics.context().setState(self._dynamics_cache._openmm_states[i]) + + # Reset the GCMC water state. + if gcmc_sampler is not None: + gcmc_sampler.push() + gcmc_sampler._set_water_state( + dynamics.context(), + states=self._dynamics_cache._gcmc_states[i], + force=True, + ) + gcmc_sampler.pop() + # Conversion factor for reduced potential. kT = (_sr.units.k_boltz * self._config.temperature).to(_sr.units.kcal_per_mol) self._beta = 1.0 / kT - # Store the pressure times Avaogadro's number. - NA = 6.02214076e23 / _sr.units.mole - self._pressure = (self._config.pressure * NA).value() - # If restarting, subtract the time already run from the total runtime if self._config.restart: time = self._system[0].time() self._config.runtime = str(self._config.runtime - time) # Work out the current block number. - self._start_block = int( - round(time.value() / self._config.checkpoint_frequency.value(), 12) - ) + if self._config.checkpoint_frequency.value() > 0.0: + self._start_block = int( + round(time.value() / self._config.checkpoint_frequency.value(), 12) + ) + else: + self._start_block = 0 else: self._start_block = 0 @@ -488,13 +652,18 @@ def run(self): start = time() # Work out the number of repex cycles. - cycles = ceil(self._config.runtime / self._config.energy_frequency) + cycles = (self._config.runtime / self._config.energy_frequency).value() + + # Handle rounding errors to to internal time representation. + if abs(cycles - round(cycles)) < 1e-6: + cycles = int(round(cycles)) + # Round up to ensure we run at least the requested time. + else: + cycles = int(ceil(cycles)) if self._config.checkpoint_frequency.value() > 0.0: # Calculate the number of blocks and the remainder time. - frac = ( - self._config.runtime.value() / self._config.checkpoint_frequency.value() - ) + frac = (self._config.runtime / self._config.checkpoint_frequency).value() # Handle the case where the runtime is less than the checkpoint frequency. if frac < 1.0: @@ -502,7 +671,7 @@ def run(self): self._config.checkpoint_frequency = str(self._config.runtime) num_blocks = int(frac) - rem = frac - num_blocks + rem = round(frac - num_blocks, 12) # Work out the number of repex cycles per block. frac = ( @@ -527,8 +696,17 @@ def run(self): # Store the number of concurrent workers. num_workers = self._num_gpus * self._config.oversubscription_factor + # Store the number of workers to use for checkpointing. + if self._config.num_checkpoint_workers is None: + num_checkpoint_workers = num_workers + else: + num_checkpoint_workers = min( + self._config.num_checkpoint_workers, num_workers + ) + # Work out the required number of batches. num_batches = ceil(self._config.num_lambda / num_workers) + num_checkpoint_batches = ceil(self._config.num_lambda / num_checkpoint_workers) # Create the replica list. replica_list = list(range(self._config.num_lambda)) @@ -575,6 +753,14 @@ def run(self): # Record the start time for the production block. prod_start = time() + # Store the number of blocks per-frame. For GCMC, we need to write the + # indices of the current ghost water residues each time a frame is saved. + # For GCMC simulations, the frame frequency is guaranteed to be a multiple + # of the energy frequency. + cycles_per_frame = int( + self._config.frame_frequency / self._config.energy_frequency + ) + # Perform the replica exchange simulation. for i in range(cycles): _logger.info(f"Running dynamics for cycle {i+1} of {cycles}") @@ -594,6 +780,9 @@ def run(self): # Whether to checkpoint. is_checkpoint = i > 0 and i % cycles_per_checkpoint == 0 + # Whether a frame is saved at the end of the cycle. + write_gcmc_ghosts = i > 0 and i % cycles_per_frame == 0 + # Run a dynamics block for each replica, making sure only each GPU is only # oversubscribed by a factor of self._config.oversubscription_factor. for j in range(num_batches): @@ -604,11 +793,8 @@ def run(self): self._run_block, replicas, repeat(self._lambda_values), - repeat(is_checkpoint), repeat(i == 0), - repeat(i == cycles - 1), - repeat(block), - repeat(num_blocks + int(rem > 0)), + repeat(write_gcmc_ghosts), ): if not result: _logger.error( @@ -620,6 +806,66 @@ def run(self): _logger.error("Dynamics cancelled. Exiting.") _sys.exit(1) + # Checkpoint. + if is_checkpoint or i == cycles - 1: + # Create the lock. + lock = _FileLock(self._lock_file) + + # Acquire the file lock to ensure that the checkpoint files are + # in a consistent state if read by another process. + with lock.acquire(timeout=self._config.timeout.to("seconds")): + # First backup existing checkpoint files. + for j in range(num_checkpoint_batches): + # Get the indices of the replicas in this batch. + replicas = replica_list[ + j + * num_checkpoint_workers : (j + 1) + * num_checkpoint_workers + ] + with ThreadPoolExecutor(max_workers=num_workers) as executor: + try: + for result, error in executor.map( + self._backup_checkpoint, + replicas, + ): + if not result: + _logger.error( + f"Backup failed for {_lam_sym} = " + f"{self._lambda_values[index]:.5f}: {error}" + ) + raise error + except KeyboardInterrupt: + _logger.error("Backup cancelled. Exiting.") + _sys.exit(1) + + # Now write the new checkpoint files. + for j in range(num_checkpoint_batches): + # Get the indices of the replicas in this batch. + replicas = replica_list[ + j + * num_checkpoint_workers : (j + 1) + * num_checkpoint_workers + ] + with ThreadPoolExecutor(max_workers=num_workers) as executor: + try: + for result, error in executor.map( + self._checkpoint, + replicas, + repeat(self._lambda_values), + repeat(block), + repeat(num_blocks + int(rem > 0)), + repeat(i == cycles - 1), + ): + if not result: + _logger.error( + f"Checkpoint failed for {_lam_sym} = " + f"{self._lambda_values[index]:.5f}: {error}" + ) + raise error + except KeyboardInterrupt: + _logger.error("Checkpoint cancelled. Exiting.") + _sys.exit(1) + if i < cycles: # Assemble and energy matrix from the results. _logger.info("Assembling energy matrix") @@ -642,26 +888,45 @@ def run(self): # Update the block number. block += 1 - # Save the transition matrix. - _logger.info("Saving replica exchange transition matrix") - self._save_transition_matrix() - - # Pickle the dynamics cache. - _logger.info("Saving replica exchange state") - with open(self._repex_state, "wb") as f: - _pickle.dump(self._dynamics_cache, f) + # Guard the repex state and transition matrix saving with a file lock. + lock = _FileLock(self._lock_file) + with lock.acquire(timeout=self._config.timeout.to("seconds")): + # Save the transition matrix. + _logger.info("Saving replica exchange transition matrix") + self._save_transition_matrix() + + # Backup the dynamics cache pickle file, if it exists. + if self._repex_state.exists(): + _copyfile( + self._repex_state, + self._repex_state.with_suffix(".pkl.bak"), + ) + + # Pickle the dynamics cache. + _logger.info("Saving replica exchange state") + with open(self._repex_state, "wb") as f: + _pickle.dump(self._dynamics_cache, f) # Record the end time for the production block. prod_end = time() - # Save the final transition matrix. - _logger.info("Saving final replica exchange transition matrix") - self._save_transition_matrix() + lock = _FileLock(self._lock_file) + with lock.acquire(timeout=self._config.timeout.to("seconds")): + # Save the final transition matrix. + _logger.info("Saving final replica exchange transition matrix") + self._save_transition_matrix() + + # Backup the dynamics cache pickle file, if it exists. + if self._repex_state.exists(): + _copyfile( + self._repex_state, + self._repex_state.with_suffix(".pkl.bak"), + ) - # Pickle final state of the dynamics cache. - _logger.info("Saving final replica exchange state") - with open(self._repex_state, "wb") as f: - _pickle.dump(self._dynamics_cache, f) + # Pickle final state of the dynamics cache. + _logger.info("Saving final replica exchange state") + with open(self._repex_state, "wb") as f: + _pickle.dump(self._dynamics_cache, f) # Record the end time. end = time() @@ -673,22 +938,22 @@ def run(self): prod_speed = self._config.runtime.to("ns") / prod_time # Record the average production speed. - _logger.info(f"Average replica speed: {prod_speed:.2f} ns day-1") + _logger.info(f"Overall performance: {prod_speed:.2f} ns day-1") # Log the run time in minutes. _logger.success( f"Simulation finished. Run time: {(end - start) / 60:.2f} minutes" ) + # Delete all backup files from the working directory. + self._cleanup() + def _run_block( self, index, lambdas, - is_checkpoint, is_first_block, - is_final_block, - block, - num_blocks, + write_gcmc_ghosts=False, ): """ Run a dynamics block for a given replica. @@ -702,24 +967,16 @@ def _run_block( lambdas: np.ndarray The lambda values for each replica. - rest2_scale: np.ndarray - The REST2 scaling factor for each replica. - - is_checkpoint: bool - Whether to checkpoint. - is_first_block: bool Whether this is the first block. - is_final_block: bool - Whether this is the final block. - - block: int - The block number. - num_blocks: int The total number of blocks. + write_gcmc_ghosts: bool + Whether to write the indices of GCMC ghost residues to + file. + Returns ------- @@ -738,8 +995,8 @@ def _run_block( lam = lambdas[index] try: - # Get the dynamics object. - dynamics = self._dynamics_cache.get(index) + # Get the dynamics object (and GCMC sampler). + dynamics, gcmc_sampler = self._dynamics_cache.get(index) # Minimise the system if this is a restart simulation and this is # the first block. @@ -748,11 +1005,25 @@ def _run_block( _logger.info(f"Minimising restart at {_lam_sym} = {lam:.5f}") dynamics.minimise(timeout=self._config.timeout) - _logger.info(f"Running dynamics for {_lam_sym} = {lam:.5f}") + _logger.info(f"Running dynamics at {_lam_sym} = {lam:.5f}") # Draw new velocities from the Maxwell-Boltzmann distribution. dynamics.randomise_velocities() + # Perform a GCMC move. For repex this needs to be done before the + # dynamics block so that the final energies, which are used in the + # repex acceptance criteria, are correct. + if gcmc_sampler is not None: + # Push the PyCUDA context on top of the stack. + gcmc_sampler.push() + + # Perform the GCMC move. + _logger.info(f"Performing GCMC move at {_lam_sym} = {lam:.5f}") + gcmc_sampler.move(dynamics.context()) + + # Remove the PyCUDA context from the stack. + gcmc_sampler.pop() + # Run the dynamics. dynamics.run( self._config.energy_frequency, @@ -764,12 +1035,29 @@ def _run_block( auto_fix_minimise=True, num_energy_neighbours=self._config.num_energy_neighbours, null_energy=self._config.null_energy, + # GCMC specific options. + excess_chemical_potential=( + self._mu_ex if gcmc_sampler is not None else None + ), + num_waters=( + _np.sum(gcmc_sampler.water_state()) + if gcmc_sampler is not None + else None + ), ) # Set the state. self._dynamics_cache.save_openmm_state(index) - # Get the energies at each lambda value. + # Save the GCMC state. + if gcmc_sampler is not None: + self._dynamics_cache.save_gcmc_state(index) + # The frame frequency was hit, so write the indices of the + # current ghost water residues to file. + if write_gcmc_ghosts: + gcmc_sampler.write_ghost_residues() + + # Get the energy at each lambda value. energies = ( dynamics._d.energy_trajectory() .to_pandas(to_alchemlyb=True, energy_unit="kcal/mol") @@ -777,33 +1065,12 @@ def _run_block( .to_numpy() ) - # Checkpoint. - if is_checkpoint or is_final_block: - # Commit the current system. - system = dynamics.commit() - - # Get the simulation speed. - speed = dynamics.time_speed() - - # Checkpoint. - with self._lock: - self._checkpoint( - system, index, block, speed, is_final_block=is_final_block - ) - - # Delete all trajectory frames from the Sire system within the - # dynamics object. - dynamics._d._sire_mols.delete_all_frames() - - _logger.info( - f"Finished block {block+1} of {self._start_block + num_blocks} " - f"for {_lam_sym} = {lam:.5f}" - ) - - if is_final_block: - _logger.success(f"{_lam_sym} = {lam:.5f} complete") - except Exception as e: + try: + # Save the energy components for debugging purposes. + self._save_energy_components(index, dynamics.context()) + except: + pass return False, index, e # Return the index and the energies. @@ -838,8 +1105,21 @@ def _minimise(self, index): _logger.info(f"Minimising at {_lam_sym} = {self._lambda_values[index]:.5f}") try: - # Get the dynamics object. - dynamics = self._dynamics_cache.get(index) + # Get the dynamics object (and GCMC sampler). + dynamics, gcmc_sampler = self._dynamics_cache.get(index) + + if gcmc_sampler is not None: + # Push the PyCUDA context on top of the stack. + gcmc_sampler.push() + + _logger.info( + f"Pre-equilibrating with GCMC moves at {_lam_sym} = {self._lambda_values[index]:.5f}" + ) + for i in range(100): + gcmc_sampler.move(dynamics.context()) + + # Remove the PyCUDA context from the stack. + gcmc_sampler.pop() # Minimise. dynamics.minimise(timeout=self._config.timeout) @@ -874,8 +1154,24 @@ def _equilibrate(self, index): _logger.info(f"Equilibrating at {_lam_sym} = {self._lambda_values[index]:.5f}") try: - # Get the dynamics object. - dynamics = self._dynamics_cache.get(index) + # Get the dynamics object (and GCMC sampler). + dynamics, gcmc_sampler = self._dynamics_cache.get(index) + + if gcmc_sampler is not None: + # Push the PyCUDA context on top of the stack. + gcmc_sampler.push() + + _logger.info( + f"Equilibrating with GCMC moves at {_lam_sym} = {self._lambda_values[index]:.5f}" + ) + for i in range(100): + gcmc_sampler.move(dynamics.context()) + + # Remove the PyCUDA context from the stack. + gcmc_sampler.pop() + + # Store the current water state. + water_state = gcmc_sampler.water_state() # Equilibrate. dynamics.run( @@ -889,8 +1185,9 @@ def _equilibrate(self, index): # Commit the system. system = dynamics.commit() - # Reset the timer to zero. - system.set_time(_sr.u("0ps")) + # Reset the timer. + if self._initial_time[index].value() != 0: + system.set_time(self._initial_time[index]) # Delete the dynamics object. self._dynamics_cache.delete(index) @@ -909,6 +1206,28 @@ def _equilibrate(self, index): # Create the production dynamics object. dynamics = system.dynamics(**dynamics_kwargs) + # Reset the GCMC water state. The dynamics object is created from + # the original Sire system, so the water state in the context does + # not match the current GCMC water state. + if gcmc_sampler is not None: + # Reset the GCMC sampler. This resets the sampling statistics and + # clears the associated OpenMM forces. This is required since a new + # context is created following equilibration, e.g. because constraints + # or the timestep are different for the production phase. + gcmc_sampler.reset() + + # Push the PyCUDA context on top of the stack. + gcmc_sampler.push() + + # Set the water state. + gcmc_sampler._set_water_state(dynamics.context()) + + # Remove the PyCUDA context from the stack. + gcmc_sampler.pop() + + # Re-bind the GCMC sampler to the dynamics object. + dynamics._d._gcmc_sampler = gcmc_sampler + # Perform minimisation at the end of equilibration only if the # timestep is increasing, or the constraint is changing. if (self._config.timestep > self._config.equilibration_timestep) or ( @@ -928,6 +1247,11 @@ def _equilibrate(self, index): ) except Exception as e: + try: + # Save the energy components for debugging purposes. + self._save_energy_components(index, dynamics.context()) + except: + pass return False, index, e return True, index, None @@ -962,7 +1286,7 @@ def _compute_energies(self, index): ) # Get the dynamics object. - dynamics = self._dynamics_cache.get(index) + dynamics, _ = self._dynamics_cache.get(index) # Create an array to hold the energies. energies = _np.zeros(self._config.num_lambda) @@ -970,14 +1294,14 @@ def _compute_energies(self, index): # Loop over the states. for i in range(self._config.num_lambda): # Set the state. - dynamics._d._omm_mols.setState(self._dynamics_cache._openmm_states[i]) + dynamics.context().setState(self._dynamics_cache._openmm_states[i]) dynamics._d._clear_state() # Compute and store the energy for this state. energies[i] = dynamics.current_potential_energy().value() # Reset the state. - dynamics._d._omm_mols.setState(self._dynamics_cache._openmm_states[index]) + dynamics.context().setState(self._dynamics_cache._openmm_states[index]) return index, energies @@ -994,15 +1318,89 @@ def _assemble_results(self, results): # Create the matrix. matrix = _np.zeros((len(results), len(results))) - # Fill the matrix. + # Fill the matrix. The energy returned by the dynamics block already + # includes the pressure and grand canonical contributions. for i, energies in results: for j, energy in enumerate(energies): - matrix[i, j] = self._beta * ( - energy + self._pressure * self._dynamics_cache._openmm_volumes[i] - ) + matrix[i, j] = self._beta * energy return matrix + def _checkpoint(self, index, lambdas, block, num_blocks, is_final_block=False): + """ + Checkpoint the simulation. + + Parameters + ---------- + + index: int + The index of the replica. + + lambdas: np.ndarray + The lambda values for each replica. + + block: int + The current block number. + + num_blocks: int + The total number of blocks in the simulation. + + is_final_block: bool + Whether this is the final block. + """ + try: + # Get the lambda value. + lam = lambdas[index] + + # Get the dynamics object (and GCMC sampler). + dynamics, gcmc_sampler = self._dynamics_cache.get(index) + + # Commit the current system. + system = dynamics.commit() + + # If performing GCMC, then we need to flag the ghost waters. + if gcmc_sampler is not None: + system = gcmc_sampler._flag_ghost_waters(system) + + # Get the simulation speed. + speed = dynamics.time_speed() + + # Call the base class checkpoint method to save the system state. + with self._lock: + super()._checkpoint( + system, index, block, speed, is_final_block=is_final_block + ) + + # Delete all trajectory frames from the Sire system within the + # dynamics object. + dynamics._d._sire_mols.delete_all_frames() + + _logger.info( + f"Finished block {block+1} of {self._start_block + num_blocks} " + f"for {_lam_sym} = {lam:.5f}" + ) + + # Log the number of waters within the GCMC sampling volume. + if gcmc_sampler is not None: + # Push the PyCUDA context on top of the stack. + gcmc_sampler.push() + + _logger.info( + f"Current number of waters in GCMC volume at {_lam_sym} = {lam:.5f} " + f"is {gcmc_sampler.num_waters()}" + ) + + # Remove the PyCUDA context from the stack. + gcmc_sampler.pop() + + if is_final_block: + _logger.success(f"{_lam_sym} = {lam:.5f} complete") + + return True, None + + except Exception as e: + return False, e + @staticmethod @_njit def _mix_replicas(num_replicas, energy_matrix, proposed, accepted): @@ -1082,6 +1480,13 @@ def _save_transition_matrix(self): else: t[i_state, i_state] = 1.0 + # Backup the existing transition matrix, if it exists. + if self._repex_matrix.exists(): + _copyfile( + self._repex_matrix, + self._repex_matrix.with_suffix(".txt.bak"), + ) + # Save the replica exchange swap acceptance matrix. _np.savetxt( self._repex_matrix, diff --git a/src/somd2/runner/_runner.py b/src/somd2/runner/_runner.py index bbad57cb..76f6b543 100644 --- a/src/somd2/runner/_runner.py +++ b/src/somd2/runner/_runner.py @@ -21,6 +21,11 @@ __all__ = ["Runner"] +from filelock import FileLock as _FileLock +from time import time as _timer + +import numpy as _np + import sire as _sr from somd2 import _logger @@ -88,22 +93,29 @@ def _create_shared_resources(self): Also intialises the list with all available GPUs. """ if self._is_gpu: - if self._config.max_gpus is None: - self._gpu_pool = self._manager.list( - self._zero_gpu_devices(self._get_gpu_devices(self._config.platform)) - ) - else: - self._gpu_pool = self._manager.list( - self._zero_gpu_devices( - self._get_gpu_devices(self._config.platform)[ - : self._config.max_gpus - ] + devices = self._get_gpu_devices( + self._config.platform, self._config.oversubscription_factor + ) + if self._config.max_gpus is not None: + if self._config.max_gpus > len(devices): + _logger.warning( + f"Requested {self._config.max_gpus} GPUs, but only {len(devices)} are available." ) + num_devices = min(len(devices), self._config.max_gpus) + else: + num_devices = len(devices) + + # Create the GPU pool from the available devices. + self._gpu_pool = self._manager.list( + self._initialise_gpu_devices( + num_devices, + self._config.oversubscription_factor, ) + ) def _update_gpu_pool(self, gpu_num): """ - Updates the GPU pool to remove the GPU that has been assigned to a worker. + Updates the GPU pool to add the GPU assigned to a worker that has finished. Parameters ---------- @@ -115,7 +127,7 @@ def _update_gpu_pool(self, gpu_num): def _remove_gpu_from_pool(self, gpu_num): """ - Removes a GPU from the GPU pool. + Removes a GPU from the GPU pool when it is assigned to a worker. Parameters ---------- @@ -126,28 +138,39 @@ def _remove_gpu_from_pool(self, gpu_num): self._gpu_pool.remove(gpu_num) @staticmethod - def _zero_gpu_devices(devices): + def _initialise_gpu_devices(num_devices, oversubscription_factor=1): """ - Set all device numbers relative to the lowest (the device number becomes - equal to its index in the list). + Create the list of avaiable GPU devices. + + Parameters + ---------- + + num_devices: int + The number of GPU devices to use. + + oversubscription_factor: int + The oversubscription factor for the GPUs. This is the number of + workers that can use a single GPU at the same time. Returns ------- - devices : [int] - List of zeroed available device numbers. + devices : [(str, int)] + List of available device numbers with oversubscription factor. """ - return [str(devices.index(value)) for value in devices] + devices = [] + for i in range(oversubscription_factor): + for j in range(num_devices): + devices.append((str(j), i)) + return devices def run(self): """ Use concurrent.futures to run lambda windows in parallel """ - from time import time - # Record the start time. - start = time() + start = _timer() # Create shared resources. self._create_shared_resources() @@ -176,8 +199,12 @@ def run(self): self._max_workers = 1 import concurrent.futures as _futures + import multiprocessing as _mp - with _futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor: + success = True + with _futures.ProcessPoolExecutor( + max_workers=self.max_workers, mp_context=_mp.get_context("spawn") + ) as executor: jobs = {} for index, lambda_value in enumerate(self._lambda_values): jobs[executor.submit(self.run_window, index)] = lambda_value @@ -185,26 +212,40 @@ def run(self): for job in _futures.as_completed(jobs): lambda_value = jobs[job] try: - result = job.result() + success, time = job.result() except Exception as e: - result = False _logger.error( f"Exception raised for {_lam_sym} = {lambda_value}: {e}" ) + success = False # Kill all current and future jobs if keyboard interrupt. except KeyboardInterrupt: _logger.error("Cancelling job...") for pid in executor._processes: executor._processes[pid].terminate() + success = False - # Record the end time. - end = time() + if success: + # Record the end time. + end = _timer() - # Log the run time in minutes. - _logger.success( - f"Simulation finished. Run time: {(end - start) / 60:.2f} minutes" - ) + # Work how many fractional days the simulation took. + days = (end - start) / 86400 + + # Calculate the speed in nanoseconds per day. + speed = time.to("ns") / days + + # Log the speed. + _logger.info(f"Overall performance: {speed:.2f} ns day-1") + + # Log the run time in minutes. + _logger.success( + f"Simulation finished. Run time: {(end - start) / 60:.2f} minutes" + ) + + # Cleanup backup files. + self._cleanup() def run_window(self, index): """ @@ -221,8 +262,15 @@ def run_window(self, index): success: bool Whether the simulation was successful. + + time: sire.units.GeneralUnit + The duration of the simulation. """ + # Since this method is called in a separate process with the "spawn" + # method, we need to re-set the logger. + self._config._reset_logger(_logger) + # Get the lambda value. lambda_value = self._lambda_values[index] @@ -235,7 +283,7 @@ def run_window(self, index): _logger.success( f"{_lam_sym} = {lambda_value} already complete. Skipping." ) - return True + return True, time else: _logger.info( f"Restarting {_lam_sym} = {lambda_value} at time {time}, " @@ -248,16 +296,17 @@ def run_window(self, index): if self._is_gpu: # Get a GPU from the pool. with self._lock: - gpu_num = self._gpu_pool[0] - self._remove_gpu_from_pool(gpu_num) + gpu = self._gpu_pool[0] + gpu_num = gpu[0] + self._remove_gpu_from_pool(gpu) if lambda_value is not None: _logger.info( f"Running {_lam_sym} = {lambda_value} on GPU {gpu_num}" ) - # Run the smullation. + # Run the simulation. try: - self._run( + time = self._run( system, index, device=gpu_num, @@ -265,11 +314,12 @@ def run_window(self, index): ) with self._lock: - self._update_gpu_pool(gpu_num) + self._update_gpu_pool(gpu) except Exception as e: with self._lock: - self._update_gpu_pool(gpu_num) + self._update_gpu_pool(gpu) _logger.error(f"Error running {_lam_sym} = {lambda_value}: {e}") + return False, _sr.u("0ps") # All other platforms. else: @@ -277,11 +327,12 @@ def run_window(self, index): # Run the simulation. try: - self._run(system, index, is_restart=self._is_restart) + time = self._run(system, index, is_restart=self._is_restart) except Exception as e: _logger.error(f"Error running {_lam_sym} = {lambda_value}: {e}") + return False, _sr.u("0ps") - return True + return True, time def _run( self, system, index, device=None, lambda_minimisation=None, is_restart=False @@ -310,8 +361,8 @@ def _run( Returns ------- - df : pandas dataframe - Dataframe containing the sire energy trajectory. + time: sire.units.GeneralUnit + The duration of the simulation. """ # Get the lambda value. @@ -330,12 +381,15 @@ def _run( _logger.success( f"{_lam_sym} = {lambda_value} already complete. Skipping." ) - return + return _sr.u("0ps") # Work out the current block number. - self._start_block = int( - round(time.value() / self._config.checkpoint_frequency.value(), 12) - ) + if self._config.checkpoint_frequency.value() > 0.0: + self._start_block = int( + round(time.value() / self._config.checkpoint_frequency.value(), 12) + ) + else: + self._start_block = 0 # Subtract the current time from the runtime. time = self._config.runtime - time @@ -355,6 +409,31 @@ def generate_lam_vals(lambda_base, increment=0.001): lam_vals = [lambda_base - increment, lambda_base + increment] return lam_vals + # Prepare the GCMC sampler. + if self._config.gcmc: + _logger.info(f"Preparing GCMC sampler at {_lam_sym} = {lambda_value:.5f}") + + try: + from loch import GCMCSampler + except: + msg = "loch is not installed. GCMC sampling cannot be performed." + _logger.error(msg) + + gcmc_sampler = GCMCSampler( + system, + device=int(device), + lambda_value=lambda_value, + rest2_scale=rest2_scale, + ghost_file=self._filenames[index]["gcmc_ghosts"], + **self._gcmc_kwargs, + ) + + # Get the GCMC system. + system = gcmc_sampler.system() + + else: + gcmc_sampler = None + # Minimisation. if self._config.minimise: # Minimise with no constraints if we need to equilibrate first. @@ -379,7 +458,9 @@ def generate_lam_vals(lambda_base, increment=0.001): raise RuntimeError(f"Minimisation failed: {e}") # Equilibration. + is_equilibrated = False if self._config.equilibration_time.value() > 0.0 and not is_restart: + is_equilibrated = True try: # Run without saving energies or frames. _logger.info(f"Equilibrating at {_lam_sym} = {lambda_value:.5f}") @@ -410,6 +491,18 @@ def generate_lam_vals(lambda_base, increment=0.001): # Create the dynamics object. dynamics = system.dynamics(**dynamics_kwargs) + # Equilibrate with GCMC moves. + if gcmc_sampler is not None: + # Bind the GCMC sampler to the dynamics object. + gcmc_sampler.bind_dynamics(dynamics) + + _logger.info( + f"Equilibrating with GCMC moves at {_lam_sym} = {lambda_value:.5f}" + ) + + for i in range(100): + gcmc_sampler.move(dynamics.context()) + # Run without saving energies or frames. dynamics.run( self._config.equilibration_time, @@ -422,8 +515,9 @@ def generate_lam_vals(lambda_base, increment=0.001): # Commit the system. system = dynamics.commit() - # Reset the timer to zero. - system.set_time(_sr.u("0ps")) + # Reset the timer. + if self._initial_time[index].value() != 0: + system.set_time(self._initial_time[index]) # Perform minimisation at the end of equilibration only if the # timestep is increasing, or the constraint is changing. @@ -431,7 +525,7 @@ def generate_lam_vals(lambda_base, increment=0.001): not self._config.equilibration_constraints and self._config.perturbable_constraint != "none" ): - self._minimisation( + system = self._minimisation( system, lambda_value=lambda_value, rest2_scale=rest2_scale, @@ -440,6 +534,10 @@ def generate_lam_vals(lambda_base, increment=0.001): perturbable_constraint=self._config.perturbable_constraint, ) except Exception as e: + try: + self._save_energy_components(index, dynamics.context()) + except: + pass raise RuntimeError(f"Equilibration failed: {e}") # Work out the lambda values for finite-difference gradient analysis. @@ -489,6 +587,47 @@ def generate_lam_vals(lambda_base, increment=0.001): # Create the dynamics object. dynamics = system.dynamics(**dynamics_kwargs) + # Reset the GCMC sampler. This resets the sampling statistics and clears + # the associated OpenMM forces. + if gcmc_sampler is not None: + gcmc_sampler.reset() + + # Bind the GCMC sampler to the dynamics object. + gcmc_sampler.bind_dynamics(dynamics) + + # If this is a restart, then we need to reset the GCMC water state + # to match that of the restart system. + if self._is_restart: + from openmm.unit import angstrom + + # First set all waters to non-ghosts. + gcmc_sampler._set_water_state( + dynamics.context(), + states=_np.ones(len(gcmc_sampler._water_indices)), + force=True, + ) + + # Now set the ghost waters. + gcmc_sampler._set_water_state( + dynamics.context(), + self._restart_ghost_waters[index], + states=_np.zeros(len(gcmc_sampler._water_indices)), + force=True, + ) + + # Finally, reset the context positions to match the restart system. + dynamics.context().setPositions( + self._restart_positions[index] * angstrom + ) + # Otherwise, if we've performed equilibration, then we need to reset + # the water state in the new context to match the equilibrated system. + elif is_equilibrated: + # Reset the water state. + gcmc_sampler._set_water_state( + dynamics.context(), + force=True, + ) + # Set the number of neighbours used for the energy calculation. # If not None, then we add one to account for the extra windows # used for finite-difference gradient analysis. @@ -497,6 +636,9 @@ def generate_lam_vals(lambda_base, increment=0.001): else: num_energy_neighbours = None + # Store the checkpoint time in nanoseconds. + checkpoint_interval = self._config.checkpoint_frequency.to("ns") + # Run the simulation, checkpointing in blocks. if self._config.checkpoint_frequency.value() > 0.0: @@ -511,25 +653,84 @@ def generate_lam_vals(lambda_base, increment=0.001): num_blocks = int(frac) rem = round(frac - num_blocks, 12) + # Store the star time. + start = _timer() + # Run the dynamics in blocks. for block in range(int(num_blocks)): # Add the start block number. block += self._start_block + # Record the start time. + block_start = _timer() + # Run the dynamics. try: - dynamics.run( - self._config.checkpoint_frequency, - energy_frequency=self._config.energy_frequency, - frame_frequency=self._config.frame_frequency, - lambda_windows=lambda_array, - rest2_scale_factors=rest2_scale_factors, - save_velocities=self._config.save_velocities, - auto_fix_minimise=True, - num_energy_neighbours=num_energy_neighbours, - null_energy=self._config.null_energy, - ) + # GCMC specific handling. Note that the frame and checkpoint + # frequencies are multiples of the energy frequency so we can + # run in energy frequency blocks with no remainder. + if self._config.gcmc: + # Initialise the run time and time at which the next frame is saved. + runtime = _sr.u("0ps") + save_frames = self._config.frame_frequency > 0 + next_frame = self._config.frame_frequency + + # Loop until we reach the runtime. + while runtime <= self._config.checkpoint_frequency: + # Run the dynamics in blocks of the energy frequency. + dynamics.run( + self._config.energy_frequency, + energy_frequency=self._config.energy_frequency, + frame_frequency=self._config.frame_frequency, + lambda_windows=lambda_array, + rest2_scale_factors=rest2_scale_factors, + save_velocities=self._config.save_velocities, + auto_fix_minimise=True, + num_energy_neighbours=num_energy_neighbours, + null_energy=self._config.null_energy, + # GCMC specific options. + excess_chemical_potential=( + self._mu_ex if gcmc_sampler is not None else None + ), + num_waters=( + _np.sum(gcmc_sampler.water_state()) + if gcmc_sampler is not None + else None + ), + ) + + # Perform a GCMC move. + _logger.info( + f"Performing GCMC move at {_lam_sym} = {lambda_value:.5f}" + ) + gcmc_sampler.move(dynamics.context()) + + # Update the runtime. + runtime += self._config.energy_frequency + + # If a frame is saved, then we need to save current indices + # of the ghost water residues. + if save_frames and runtime >= next_frame: + gcmc_sampler.write_ghost_residues() + next_frame += self._config.frame_frequency + + else: + dynamics.run( + self._config.checkpoint_frequency, + energy_frequency=self._config.energy_frequency, + frame_frequency=self._config.frame_frequency, + lambda_windows=lambda_array, + rest2_scale_factors=rest2_scale_factors, + save_velocities=self._config.save_velocities, + auto_fix_minimise=True, + num_energy_neighbours=num_energy_neighbours, + null_energy=self._config.null_energy, + ) except Exception as e: + try: + self._save_energy_components(index, dynamics.context()) + except: + pass raise RuntimeError( f"Dynamics block {block+1} for {_lam_sym} = {lambda_value:.5f} failed: {e}" ) @@ -543,24 +744,43 @@ def generate_lam_vals(lambda_base, increment=0.001): # Commit the current system. system = dynamics.commit() - # Get the simulation speed. - speed = dynamics.time_speed() + # If performing GCMC, then we need to flag the ghost waters. + if gcmc_sampler is not None: + system = gcmc_sampler._flag_ghost_waters(system) + + # Record the end time. + block_end = _timer() + + # Work how many fractional days the block took. + block_time = (block_end - block_start) / 86400 + + # Calculate the speed in nanoseconds per day. + speed = checkpoint_interval / block_time # Check if this is the final block. is_final_block = ( block - self._start_block ) == num_blocks - 1 and rem == 0 - # Checkpoint. - self._checkpoint( - system, - index, - block, - speed, - lambda_energy=lambda_energy, - lambda_grad=lambda_grad, - is_final_block=is_final_block, - ) + # Create the lock. + lock = _FileLock(self._lock_file) + + # Acquire the file lock to ensure that the checkpoint files are + # in a consistent state if read by another process. + with lock.acquire(timeout=self._config.timeout.to("seconds")): + # Backup any existing checkpoint files. + self._backup_checkpoint(index) + + # Write the checkpoint files. + self._checkpoint( + system, + index, + block, + speed, + lambda_energy=lambda_energy, + lambda_grad=lambda_grad, + is_final_block=is_final_block, + ) # Delete all trajectory frames from the Sire system within the # dynamics object. @@ -571,6 +791,13 @@ def generate_lam_vals(lambda_base, increment=0.001): f"for {_lam_sym} = {lambda_value:.5f}" ) + # Log the number of waters within the GCMC sampling volume. + if gcmc_sampler is not None: + _logger.info( + f"Current number of waters in GCMC volume at {_lam_sym} = {lambda_value:.5f} " + f"is {gcmc_sampler.num_waters()}" + ) + if is_final_block: _logger.success( f"{_lam_sym} = {lambda_value:.5f} complete, speed = {speed:.2f} ns day-1" @@ -580,9 +807,10 @@ def generate_lam_vals(lambda_base, increment=0.001): f"Checkpoint failed for {_lam_sym} = {lambda_value:.5f}: {e}" ) - # Handle the remainder time. + # Handle the remainder time. (There will be no remainer when GCMC sampling.) if rem > 0: block += 1 + block_start = _timer() try: dynamics.run( rem, @@ -603,19 +831,30 @@ def generate_lam_vals(lambda_base, increment=0.001): # Commit the current system. system = dynamics.commit() - # Get the simulation speed. - speed = dynamics.time_speed() - - # Checkpoint. - self._checkpoint( - system, - index, - block, - speed, - lambda_energy=lambda_energy, - lambda_grad=lambda_grad, - is_final_block=True, - ) + # Record the end time. + block_end = _timer() + + # Work how many fractional days the block took. + block_time = (block_end - block_start) / 86400 + + # Calculate the speed in nanoseconds per day. + speed = checkpoint_interval / block_time + + # Create the lock. + lock = _FileLock(self._lock_file) + + # Acquire the file lock to ensure that the checkpoint files are + # in a consistent state if read by another process. + with lock.acquire(timeout=self._config.timeout.to("seconds")): + self._checkpoint( + system, + index, + block, + speed, + lambda_energy=lambda_energy, + lambda_grad=lambda_grad, + is_final_block=True, + ) # Delete all trajectory frames from the Sire system within the # dynamics object. @@ -630,23 +869,67 @@ def generate_lam_vals(lambda_base, increment=0.001): f"{_lam_sym} = {lambda_value:.5f} complete, speed = {speed:.2f} ns day-1" ) except Exception as e: + try: + self._save_energy_components(index, dynamics.context()) + except: + pass raise RuntimeError( f"Final dynamics block for {lam_sym} = {lambda_value:.5f} failed: {e}" ) else: try: - dynamics.run( - time, - energy_frequency=self._config.energy_frequency, - frame_frequency=self._config.frame_frequency, - lambda_windows=lambda_array, - rest2_scale_factors=rest2_scale_factors, - save_velocities=self._config.save_velocities, - auto_fix_minimise=True, - num_energy_neighbours=num_energy_neighbours, - null_energy=self._config.null_energy, - ) + if gcmc_sampler is not None: + # Initialise the run time and time at which the next frame is saved. + runtime = _sr.u("0ps") + save_frames = self._config.frame_frequency > 0 + next_frame = self._config.frame_frequency + + # Loop until we reach the runtime. + while runtime <= time: + # Run the dynamics in blocks of the energy frequency. + dynamics.run( + self._config.energy_frequency, + energy_frequency=self._config.energy_frequency, + frame_frequency=self._config.frame_frequency, + lambda_windows=lambda_array, + rest2_scale_factors=rest2_scale_factors, + save_velocities=self._config.save_velocities, + auto_fix_minimise=True, + num_energy_neighbours=num_energy_neighbours, + null_energy=self._config.null_energy, + ) + + # Perform a GCMC move. + _logger.info( + f"Performing GCMC move at {_lam_sym} = {lambda_value:.5f}" + ) + gcmc_sampler.move(dynamics.context()) + + # Update the runtime. + runtime += self._config.energy_frequency + + # If a frame is saved, then we need to save current indices + # of the ghost water residues. + if save_frames and runtime >= next_frame: + gcmc_sampler.write_ghost_residues() + next_frame += self._config.frame_frequency + else: + dynamics.run( + time, + energy_frequency=self._config.energy_frequency, + frame_frequency=self._config.frame_frequency, + lambda_windows=lambda_array, + rest2_scale_factors=rest2_scale_factors, + save_velocities=self._config.save_velocities, + auto_fix_minimise=True, + num_energy_neighbours=num_energy_neighbours, + null_energy=self._config.null_energy, + ) except Exception as e: + try: + self._save_energy_components(index, dynamics.context()) + except: + pass raise RuntimeError( f"Dynamics for {_lam_sym} = {lambda_value:.5f} failed: {e}" ) @@ -654,8 +937,15 @@ def generate_lam_vals(lambda_base, increment=0.001): # Commit the current system. system = dynamics.commit() - # Get the simulation speed. - speed = dynamics.time_speed() + # Record the end time. + end = _timer() + + # Work how many fractional days the simulation took. + days = (end - start) / 86400 + + # Calculate the speed in nanoseconds per day. + speed = time.to("ns") / days + # Checkpoint. self._checkpoint( system, @@ -663,14 +953,16 @@ def generate_lam_vals(lambda_base, increment=0.001): 0, speed, is_final_block=True, - lambda_grad=lambda_grad, lambda_energy=lambda_energy, + lambda_grad=lambda_grad, ) _logger.success( f"{_lam_sym} = {lambda_value:.5f} complete, speed = {speed:.2f} ns day-1" ) + return time + def _minimisation( self, system, @@ -703,6 +995,12 @@ def _minimisation( perturbable_constraint: str The constraint for perturbable molecules. + + Returns + ------- + + system: :class: `System ` + The minimised system. """ _logger.info(f"Minimising at {_lam_sym} = {lambda_value:.5f}") diff --git a/tests/_utils/test_ghosts.py b/tests/_utils/test_ghosts.py deleted file mode 100644 index 8eabfbbe..00000000 --- a/tests/_utils/test_ghosts.py +++ /dev/null @@ -1,324 +0,0 @@ -import sire as sr - -from somd2._utils._ghosts import boresch - - -def test_hexane_to_propane(): - """ - Test ghost atom modifications for hexane to propane. This has a terminal - junction at lambda = 1. - """ - - # Load the system. - mols = sr.load_test_files("hex2prp.s3") - - # Store the orginal angles and dihedrals at lambda = 1. - angles = mols[0].property("angle1") - dihedrals = mols[0].property("dihedral1") - - # Apply the ghost atom modifications. - new_mols = boresch(mols) - - # Get the new angles and dihedrals. - new_angles = new_mols[0].property("angle1") - new_dihedrals = new_mols[0].property("dihedral1") - - # No angles should be removed. - assert angles.num_functions() == new_angles.num_functions() - - # Six dihedrals should be removed. - assert dihedrals.num_functions() - 6 == new_dihedrals.num_functions() - - # Create dihedral IDs for the missing dihedrals. - - from sire.legacy.Mol import AtomIdx - - missing_dihedrals = [ - (AtomIdx(4), AtomIdx(3), AtomIdx(2), AtomIdx(11)), - (AtomIdx(4), AtomIdx(3), AtomIdx(2), AtomIdx(12)), - (AtomIdx(11), AtomIdx(2), AtomIdx(3), AtomIdx(13)), - (AtomIdx(11), AtomIdx(2), AtomIdx(3), AtomIdx(14)), - (AtomIdx(12), AtomIdx(2), AtomIdx(3), AtomIdx(14)), - (AtomIdx(12), AtomIdx(2), AtomIdx(3), AtomIdx(13)), - ] - - # Store the molecular info. - info = mols[0].info() - - # Check that the missing dihedrals are in the original dihedrals. - assert ( - all( - check_dihedral(info, dihedrals.potentials(), *dihedral) - for dihedral in missing_dihedrals - ) - == True - ) - - # Check that the missing dihedrals are not the new dihedrals. - assert ( - all( - check_dihedral(info, new_dihedrals.potentials(), *dihedral) - for dihedral in missing_dihedrals - ) - == False - ) - - -def test_toluene_to_pyridine(): - """ - Test ghost atom modifications for toluene to pyridine. This has a dual - junction with a single branch at lambda = 1. - """ - - # Load the system. - mols = sr.load_test_files("tol2pyr.s3") - - # Store the orginal angles and dihedrals at lambda = 1. - angles = mols[0].property("angle1") - dihedrals = mols[0].property("dihedral1") - - # Apply the ghost atom modifications. - new_mols = boresch(mols) - - # Get the new angles and dihedrals. - new_angles = new_mols[0].property("angle1") - new_dihedrals = new_mols[0].property("dihedral1") - - # The number of angles should remain the same. - assert angles.num_functions() == new_angles.num_functions() - - # There should be seven fewer dihedrals. - assert dihedrals.num_functions() - 7 == new_dihedrals.num_functions() - - # Create dihedral IDs for the missing dihedrals. - - from sire.legacy.Mol import AtomIdx - - missing_dihedrals = [ - (AtomIdx(0), AtomIdx(1), AtomIdx(2), AtomIdx(3)), - (AtomIdx(0), AtomIdx(1), AtomIdx(2), AtomIdx(10)), - (AtomIdx(0), AtomIdx(1), AtomIdx(6), AtomIdx(5)), - (AtomIdx(0), AtomIdx(1), AtomIdx(6), AtomIdx(14)), - (AtomIdx(6), AtomIdx(1), AtomIdx(0), AtomIdx(7)), - (AtomIdx(6), AtomIdx(1), AtomIdx(0), AtomIdx(8)), - (AtomIdx(6), AtomIdx(1), AtomIdx(0), AtomIdx(9)), - ] - - # Store the molecular info. - info = mols[0].info() - - # Check that the missing dihedrals are in the original dihedrals. - assert ( - all( - check_dihedral(info, dihedrals.potentials(), *dihedral) - for dihedral in missing_dihedrals - ) - == True - ) - - # Check that the missing dihedrals are not in the new dihedrals. - assert ( - all( - check_dihedral(info, new_dihedrals.potentials(), *dihedral) - for dihedral in missing_dihedrals - ) - == False - ) - - # Create a list of angle IDs for the modified angles. - modified_angles = [ - (AtomIdx(0), AtomIdx(1), AtomIdx(2)), - (AtomIdx(0), AtomIdx(1), AtomIdx(6)), - ] - - # Functional form of the modified angles. - expression = "100 [theta - 1.5708]^2" - - # Check that the original angles don't have the modified functional form. - for p in angles.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - - if (idx0, idx1, idx2) in modified_angles: - assert str(p.function()) != expression - - # Check that the modified angles have the correct functional form. - for p in new_angles.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - - if (idx0, idx1, idx2) in modified_angles: - assert str(p.function()) == expression - - -def test_acetone_to_propenol(): - """ - Test ghost atom modifications for acetone to propenol. This is a more - complex perturbation with a terminal junction at lambda = 0 and a planar - triple junction at lambda = 1. - """ - - # Load the system. - mols = sr.load_test_files("acepol.s3") - - # Store the orginal angles and dihedrals at lambda = 0 and lambda = 1. - angles0 = mols[0].property("angle0") - angles1 = mols[0].property("angle1") - dihedrals0 = mols[0].property("dihedral0") - dihedrals1 = mols[0].property("dihedral1") - - # Apply the ghost atom modifications. - new_mols = boresch(mols) - - # Get the new angles and dihedrals. - new_angles0 = new_mols[0].property("angle0") - new_angles1 = new_mols[0].property("angle1") - new_dihedrals0 = new_mols[0].property("dihedral0") - new_dihedrals1 = new_mols[0].property("dihedral1") - - # The number of angles should remain the same at lambda = 0. - assert angles0.num_functions() == new_angles0.num_functions() - - # The number of dihedrals should be one fewer at lambda = 0. - assert dihedrals0.num_functions() - 1 == new_dihedrals0.num_functions() - - # The number of angles should be one fewer at lambda = 1. - assert angles1.num_functions() - 1 == new_angles1.num_functions() - - # The number of dihedrals should be two fewer at lambda = 1. - assert dihedrals1.num_functions() - 2 == new_dihedrals1.num_functions() - - # Create dihedral IDs for the missing dihedrals at lambda = 0. - - from sire.legacy.Mol import AtomIdx - - missing_dihedrals0 = [ - (AtomIdx(8), AtomIdx(3), AtomIdx(9), AtomIdx(10)), - ] - - # Store the molecular info. - info = mols[0].info() - - # Check that the missing dihedrals are in the original dihedrals at lambda = 0. - assert ( - all( - check_dihedral(info, dihedrals0.potentials(), *dihedral) - for dihedral in missing_dihedrals0 - ) - == True - ) - - # Check that the missing dihedrals are not in the new dihedrals at lambda = 0. - assert ( - all( - check_dihedral(info, new_dihedrals0.potentials(), *dihedral) - for dihedral in missing_dihedrals0 - ) - == False - ) - - # Create dihedral IDs for the missing dihedrals at lambda = 1. - missing_dihedrals1 = [ - (AtomIdx(0), AtomIdx(1), AtomIdx(3), AtomIdx(7)), - (AtomIdx(2), AtomIdx(1), AtomIdx(3), AtomIdx(7)), - ] - - # Check that the missing dihedrals are in the original dihedrals at lambda = 1. - assert ( - all( - check_dihedral(info, dihedrals1.potentials(), *dihedral) - for dihedral in missing_dihedrals1 - ) - == True - ) - - # Check that the missing dihedrals are not in the new dihedrals at lambda = 1. - assert ( - all( - check_dihedral(info, new_dihedrals1.potentials(), *dihedral) - for dihedral in missing_dihedrals1 - ) - == False - ) - - # Create angle IDs for the removed angles at lambda = 1. - removed_angles = [ - (AtomIdx(1), AtomIdx(3), AtomIdx(7)), - ] - - # Check that the removed angles are in the original angles at lambda = 1. - assert ( - all(check_angle(info, angles1.potentials(), *angle) for angle in removed_angles) - == True - ) - - # Check that the removed angles are not in the new angles at lambda = 1. - assert ( - all( - check_angle(info, new_angles1.potentials(), *angle) - for angle in removed_angles - ) - == False - ) - - # Create angle IDs for the modified angles at lambda = 1. - modified_angles = [ - (AtomIdx(7), AtomIdx(3), AtomIdx(8)), - (AtomIdx(7), AtomIdx(3), AtomIdx(9)), - ] - - # Functional form of the modified angles. - expression = "100 [theta - 1.5708]^2" - - # Check that the original angles don't have the modified functional form. - for p in angles1.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - - if (idx0, idx1, idx2) in modified_angles: - assert str(p.function()) != expression - - # Check that the modified angles have the correct functional form. - for p in new_angles1.potentials(): - idx0 = info.atom_idx(p.atom0()) - idx1 = info.atom_idx(p.atom1()) - idx2 = info.atom_idx(p.atom2()) - - if (idx0, idx1, idx2) in modified_angles: - assert str(p.function()) == expression - - -def check_angle(info, potentials, idx0, idx1, idx2): - """ - Check if an angle potential is in a list of potentials. - """ - - for p in potentials: - if ( - idx0 == info.atom_idx(p.atom0()) - and idx1 == info.atom_idx(p.atom1()) - and idx2 == info.atom_idx(p.atom2()) - ): - return True - - return False - - -def check_dihedral(info, potentials, idx0, idx1, idx2, idx3): - """ - Check if a dihedral potential is in a list of potentials. - """ - - for p in potentials: - if ( - idx0 == info.atom_idx(p.atom0()) - and idx1 == info.atom_idx(p.atom1()) - and idx2 == info.atom_idx(p.atom2()) - and idx3 == info.atom_idx(p.atom3()) - ): - return True - - return False diff --git a/tests/runner/test_lambda_values.py b/tests/runner/test_lambda_values.py index c19f5b28..ec6267f1 100644 --- a/tests/runner/test_lambda_values.py +++ b/tests/runner/test_lambda_values.py @@ -41,9 +41,13 @@ def test_lambda_values(ethane_methanol): Path(tmpdir) / "energy_traj_0.00000.parquet" ) - # Make sure the lambda_array in the metadata is correct. This is the - # lambda_values list in the config. - assert meta["lambda_array"] == [0.0, 0.5, 1.0] + # Make sure the energy trajectory has the expected columns. + cols = energy_traj.columns + found = 0 + for col in cols: + if col in config["lambda_values"]: + found += 1 + assert found == len(config["lambda_values"]) # Make sure the second dimension of the energy trajectory is the correct # size. This is one for the current lambda value, one for its gradient, @@ -84,9 +88,13 @@ def test_lambda_energy(ethane_methanol): Path(tmpdir) / "energy_traj_0.00000.parquet" ) - # Make sure the lambda_array in the metadata is correct. This is the - # sampled lambda_values plus the lambda_energy values in the config. - assert meta["lambda_array"] == [0.0, 0.5, 1.0] + # Make sure the energy trajectory has the expected columns. + cols = energy_traj.columns + found = 0 + for col in cols: + if col in config["lambda_values"] or col in config["lambda_energy"]: + found += 1 + assert found == len(config["lambda_values"]) + len(config["lambda_energy"]) # Make sure the second dimension of the energy trajectory is the correct. # This is the sampled lambda values, i.e. unique entries from lambda_values