Coverage for /builds/hweiske/ase/ase/optimize/optimize.py: 94.38%
178 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-22 11:22 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-22 11:22 +0000
1"""Structure optimization. """
2import time
3import warnings
4from collections.abc import Callable
5from math import sqrt
6from os.path import isfile
7from typing import IO, Any, Dict, List, Optional, Tuple, Union
9from ase import Atoms
10from ase.calculators.calculator import PropertyNotImplementedError
11from ase.filters import UnitCellFilter
12from ase.parallel import world
13from ase.utils import IOContext, lazyproperty
14from ase.utils.abc import Optimizable
16DEFAULT_MAX_STEPS = 100_000_000
19class RestartError(RuntimeError):
20 pass
23class OptimizableAtoms(Optimizable):
24 def __init__(self, atoms):
25 self.atoms = atoms
27 def get_positions(self):
28 return self.atoms.get_positions()
30 def set_positions(self, positions):
31 self.atoms.set_positions(positions)
33 def get_forces(self):
34 return self.atoms.get_forces()
36 @lazyproperty
37 def _use_force_consistent_energy(self):
38 # This boolean is in principle invalidated if the
39 # calculator changes. This can lead to weird things
40 # in multi-step optimizations.
41 try:
42 self.atoms.get_potential_energy(force_consistent=True)
43 except PropertyNotImplementedError:
44 # warnings.warn(
45 # 'Could not get force consistent energy (\'free_energy\'). '
46 # 'Please make sure calculator provides \'free_energy\', even '
47 # 'if equal to the ordinary energy. '
48 # 'This will raise an error in future versions of ASE.',
49 # FutureWarning)
50 return False
51 else:
52 return True
54 def get_potential_energy(self):
55 force_consistent = self._use_force_consistent_energy
56 return self.atoms.get_potential_energy(
57 force_consistent=force_consistent)
59 def iterimages(self):
60 # XXX document purpose of iterimages
61 return self.atoms.iterimages()
63 def __len__(self):
64 # TODO: return 3 * len(self.atoms), because we want the length
65 # of this to be the number of DOFs
66 return len(self.atoms)
69class Dynamics(IOContext):
70 """Base-class for all MD and structure optimization classes."""
72 def __init__(
73 self,
74 atoms: Atoms,
75 logfile: Optional[Union[IO, str]] = None,
76 trajectory: Optional[str] = None,
77 append_trajectory: bool = False,
78 master: Optional[bool] = None,
79 comm=world,
80 ):
81 """Dynamics object.
83 Parameters:
85 atoms: Atoms object
86 The Atoms object to operate on.
88 logfile: file object or str
89 If *logfile* is a string, a file with that name will be opened.
90 Use '-' for stdout.
92 trajectory: Trajectory object or str
93 Attach trajectory object. If *trajectory* is a string a
94 Trajectory will be constructed. Use *None* for no
95 trajectory.
97 append_trajectory: boolean
98 Defaults to False, which causes the trajectory file to be
99 overwriten each time the dynamics is restarted from scratch.
100 If True, the new structures are appended to the trajectory
101 file instead.
103 master: boolean
104 Defaults to None, which causes only rank 0 to save files. If set to
105 true, this rank will save files.
107 comm: Communicator object
108 Communicator to handle parallel file reading and writing.
109 """
111 self.atoms = atoms
112 self.optimizable = atoms.__ase_optimizable__()
113 self.logfile = self.openfile(file=logfile, comm=comm, mode='a')
114 self.observers: List[Tuple[Callable, int, Tuple, Dict[str, Any]]] = []
115 self.nsteps = 0
116 self.max_steps = 0 # to be updated in run or irun
117 self.comm = comm
119 if trajectory is not None:
120 if isinstance(trajectory, str):
121 from ase.io.trajectory import Trajectory
122 mode = "a" if append_trajectory else "w"
123 trajectory = self.closelater(Trajectory(
124 trajectory, mode=mode, master=master, comm=comm
125 ))
126 self.attach(trajectory, atoms=self.optimizable)
128 self.trajectory = trajectory
130 def todict(self) -> Dict[str, Any]:
131 raise NotImplementedError
133 def get_number_of_steps(self):
134 return self.nsteps
136 def insert_observer(
137 self, function, position=0, interval=1, *args, **kwargs
138 ):
139 """Insert an observer.
141 This can be used for pre-processing before logging and dumping.
143 Examples
144 --------
145 >>> from ase.build import bulk
146 >>> from ase.calculators.emt import EMT
147 >>> from ase.optimize import BFGS
148 ...
149 ...
150 >>> def update_info(atoms, opt):
151 ... atoms.info["nsteps"] = opt.nsteps
152 ...
153 ...
154 >>> atoms = bulk("Cu", cubic=True) * 2
155 >>> atoms.rattle()
156 >>> atoms.calc = EMT()
157 >>> with BFGS(atoms, logfile=None, trajectory="opt.traj") as opt:
158 ... opt.insert_observer(update_info, atoms=atoms, opt=opt)
159 ... opt.run(fmax=0.05, steps=10)
160 True
161 """
162 if not isinstance(function, Callable):
163 function = function.write
164 self.observers.insert(position, (function, interval, args, kwargs))
166 def attach(self, function, interval=1, *args, **kwargs):
167 """Attach callback function.
169 If *interval > 0*, at every *interval* steps, call *function* with
170 arguments *args* and keyword arguments *kwargs*.
172 If *interval <= 0*, after step *interval*, call *function* with
173 arguments *args* and keyword arguments *kwargs*. This is
174 currently zero indexed."""
176 if hasattr(function, "set_description"):
177 d = self.todict()
178 d.update(interval=interval)
179 function.set_description(d)
180 if not isinstance(function, Callable):
181 function = function.write
182 self.observers.append((function, interval, args, kwargs))
184 def call_observers(self):
185 for function, interval, args, kwargs in self.observers:
186 call = False
187 # Call every interval iterations
188 if interval > 0:
189 if (self.nsteps % interval) == 0:
190 call = True
191 # Call only on iteration interval
192 elif interval <= 0:
193 if self.nsteps == abs(interval):
194 call = True
195 if call:
196 function(*args, **kwargs)
198 def irun(self, steps=DEFAULT_MAX_STEPS):
199 """Run dynamics algorithm as generator.
201 Parameters
202 ----------
203 steps : int, default=DEFAULT_MAX_STEPS
204 Number of dynamics steps to be run.
206 Yields
207 ------
208 converged : bool
209 True if the forces on atoms are converged.
211 Examples
212 --------
213 This method allows, e.g., to run two optimizers or MD thermostats at
214 the same time.
215 >>> opt1 = BFGS(atoms)
216 >>> opt2 = BFGS(StrainFilter(atoms)).irun()
217 >>> for _ in opt2:
218 ... opt1.run()
219 """
221 # update the maximum number of steps
222 self.max_steps = self.nsteps + steps
224 # compute the initial step
225 self.optimizable.get_forces()
227 # log the initial step
228 if self.nsteps == 0:
229 self.log()
231 # we write a trajectory file if it is None
232 if self.trajectory is None:
233 self.call_observers()
234 # We do not write on restart w/ an existing trajectory file
235 # present. This duplicates the same entry twice
236 elif len(self.trajectory) == 0:
237 self.call_observers()
239 # check convergence
240 is_converged = self.converged()
241 yield is_converged
243 # run the algorithm until converged or max_steps reached
244 while not is_converged and self.nsteps < self.max_steps:
245 # compute the next step
246 self.step()
247 self.nsteps += 1
249 # log the step
250 self.log()
251 self.call_observers()
253 # check convergence
254 is_converged = self.converged()
255 yield is_converged
257 def run(self, steps=DEFAULT_MAX_STEPS):
258 """Run dynamics algorithm.
260 This method will return when the forces on all individual
261 atoms are less than *fmax* or when the number of steps exceeds
262 *steps*.
264 Parameters
265 ----------
266 steps : int, default=DEFAULT_MAX_STEPS
267 Number of dynamics steps to be run.
269 Returns
270 -------
271 converged : bool
272 True if the forces on atoms are converged.
273 """
275 for converged in Dynamics.irun(self, steps=steps):
276 pass
277 return converged
279 def converged(self):
280 """" a dummy function as placeholder for a real criterion, e.g. in
281 Optimizer """
282 return False
284 def log(self, *args):
285 """ a dummy function as placeholder for a real logger, e.g. in
286 Optimizer """
287 return True
289 def step(self):
290 """this needs to be implemented by subclasses"""
291 raise RuntimeError("step not implemented.")
294class Optimizer(Dynamics):
295 """Base-class for all structure optimization classes."""
297 # default maxstep for all optimizers
298 defaults = {'maxstep': 0.2}
299 _deprecated = object()
301 def __init__(
302 self,
303 atoms: Atoms,
304 restart: Optional[str] = None,
305 logfile: Optional[Union[IO, str]] = None,
306 trajectory: Optional[str] = None,
307 master: Optional[bool] = None,
308 comm=world,
309 append_trajectory: bool = False,
310 force_consistent=_deprecated,
311 ):
312 """Structure optimizer object.
314 Parameters:
316 atoms: Atoms object
317 The Atoms object to relax.
319 restart: str
320 Filename for restart file. Default value is *None*.
322 logfile: file object or str
323 If *logfile* is a string, a file with that name will be opened.
324 Use '-' for stdout.
326 trajectory: Trajectory object or str
327 Attach trajectory object. If *trajectory* is a string a
328 Trajectory will be constructed. Use *None* for no
329 trajectory.
331 master: boolean
332 Defaults to None, which causes only rank 0 to save files. If
333 set to true, this rank will save files.
335 comm: Communicator object
336 Communicator to handle parallel file reading and writing.
338 append_trajectory: boolean
339 Appended to the trajectory file instead of overwriting it.
341 force_consistent: boolean or None
342 Use force-consistent energy calls (as opposed to the energy
343 extrapolated to 0 K). If force_consistent=None, uses
344 force-consistent energies if available in the calculator, but
345 falls back to force_consistent=False if not.
346 """
347 self.check_deprecated(force_consistent)
349 super().__init__(
350 atoms=atoms,
351 logfile=logfile,
352 trajectory=trajectory,
353 append_trajectory=append_trajectory,
354 master=master,
355 comm=comm)
357 self.restart = restart
359 self.fmax = None
361 if restart is None or not isfile(restart):
362 self.initialize()
363 else:
364 self.read()
365 self.comm.barrier()
367 @classmethod
368 def check_deprecated(cls, force_consistent):
369 if force_consistent is cls._deprecated:
370 return False
372 warnings.warn(
373 'force_consistent keyword is deprecated and will '
374 'be ignored. This will raise an error in future versions '
375 'of ASE.',
376 FutureWarning)
378 def read(self):
379 raise NotImplementedError
381 def todict(self):
382 description = {
383 "type": "optimization",
384 "optimizer": self.__class__.__name__,
385 }
386 # add custom attributes from subclasses
387 for attr in ('maxstep', 'alpha', 'max_steps', 'restart'):
388 if hasattr(self, attr):
389 description.update({attr: getattr(self, attr)})
390 return description
392 def initialize(self):
393 pass
395 def irun(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
396 """Run optimizer as generator.
398 Parameters
399 ----------
400 fmax : float
401 Convergence criterion of the forces on atoms.
402 steps : int, default=DEFAULT_MAX_STEPS
403 Number of optimizer steps to be run.
405 Yields
406 ------
407 converged : bool
408 True if the forces on atoms are converged.
409 """
410 self.fmax = fmax
411 return Dynamics.irun(self, steps=steps)
413 def run(self, fmax=0.05, steps=DEFAULT_MAX_STEPS):
414 """Run optimizer.
416 Parameters
417 ----------
418 fmax : float
419 Convergence criterion of the forces on atoms.
420 steps : int, default=DEFAULT_MAX_STEPS
421 Number of optimizer steps to be run.
423 Returns
424 -------
425 converged : bool
426 True if the forces on atoms are converged.
427 """
428 self.fmax = fmax
429 return Dynamics.run(self, steps=steps)
431 def converged(self, forces=None):
432 """Did the optimization converge?"""
433 if forces is None:
434 forces = self.optimizable.get_forces()
435 return self.optimizable.converged(forces, self.fmax)
437 def log(self, forces=None):
438 if forces is None:
439 forces = self.optimizable.get_forces()
440 fmax = sqrt((forces ** 2).sum(axis=1).max())
441 e = self.optimizable.get_potential_energy()
442 T = time.localtime()
443 if self.logfile is not None:
444 name = self.__class__.__name__
445 if self.nsteps == 0:
446 args = (" " * len(name), "Step", "Time", "Energy", "fmax")
447 msg = "%s %4s %8s %15s %12s\n" % args
448 self.logfile.write(msg)
450 args = (name, self.nsteps, T[3], T[4], T[5], e, fmax)
451 msg = "%s: %3d %02d:%02d:%02d %15.6f %15.6f\n" % args
452 self.logfile.write(msg)
453 self.logfile.flush()
455 def dump(self, data):
456 from ase.io.jsonio import write_json
457 if self.comm.rank == 0 and self.restart is not None:
458 with open(self.restart, 'w') as fd:
459 write_json(fd, data)
461 def load(self):
462 from ase.io.jsonio import read_json
463 with open(self.restart) as fd:
464 try:
465 from ase.optimize import BFGS
466 if not isinstance(self, BFGS) and isinstance(
467 self.atoms, UnitCellFilter
468 ):
469 warnings.warn(
470 "WARNING: restart function is untested and may result "
471 "in unintended behavior. Namely orig_cell is not "
472 "loaded in the UnitCellFilter. Please test on your own"
473 " to ensure consistent results."
474 )
475 return read_json(fd, always_array=False)
476 except Exception as ex:
477 msg = ('Could not decode restart file as JSON. '
478 'You may need to delete the restart file '
479 f'{self.restart}')
480 raise RestartError(msg) from ex