Coverage for /builds/hweiske/ase/ase/optimize/optimize.py: 94.38%

178 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-22 11:22 +0000

1"""Structure optimization. """ 

2import time 

3import warnings 

4from collections.abc import Callable 

5from math import sqrt 

6from os.path import isfile 

7from typing import IO, Any, Dict, List, Optional, Tuple, Union 

8 

9from ase import Atoms 

10from ase.calculators.calculator import PropertyNotImplementedError 

11from ase.filters import UnitCellFilter 

12from ase.parallel import world 

13from ase.utils import IOContext, lazyproperty 

14from ase.utils.abc import Optimizable 

15 

16DEFAULT_MAX_STEPS = 100_000_000 

17 

18 

19class RestartError(RuntimeError): 

20 pass 

21 

22 

23class OptimizableAtoms(Optimizable): 

24 def __init__(self, atoms): 

25 self.atoms = atoms 

26 

27 def get_positions(self): 

28 return self.atoms.get_positions() 

29 

30 def set_positions(self, positions): 

31 self.atoms.set_positions(positions) 

32 

33 def get_forces(self): 

34 return self.atoms.get_forces() 

35 

36 @lazyproperty 

37 def _use_force_consistent_energy(self): 

38 # This boolean is in principle invalidated if the 

39 # calculator changes. This can lead to weird things 

40 # in multi-step optimizations. 

41 try: 

42 self.atoms.get_potential_energy(force_consistent=True) 

43 except PropertyNotImplementedError: 

44 # warnings.warn( 

45 # 'Could not get force consistent energy (\'free_energy\'). ' 

46 # 'Please make sure calculator provides \'free_energy\', even ' 

47 # 'if equal to the ordinary energy. ' 

48 # 'This will raise an error in future versions of ASE.', 

49 # FutureWarning) 

50 return False 

51 else: 

52 return True 

53 

54 def get_potential_energy(self): 

55 force_consistent = self._use_force_consistent_energy 

56 return self.atoms.get_potential_energy( 

57 force_consistent=force_consistent) 

58 

59 def iterimages(self): 

60 # XXX document purpose of iterimages 

61 return self.atoms.iterimages() 

62 

63 def __len__(self): 

64 # TODO: return 3 * len(self.atoms), because we want the length 

65 # of this to be the number of DOFs 

66 return len(self.atoms) 

67 

68 

69class Dynamics(IOContext): 

70 """Base-class for all MD and structure optimization classes.""" 

71 

72 def __init__( 

73 self, 

74 atoms: Atoms, 

75 logfile: Optional[Union[IO, str]] = None, 

76 trajectory: Optional[str] = None, 

77 append_trajectory: bool = False, 

78 master: Optional[bool] = None, 

79 comm=world, 

80 ): 

81 """Dynamics object. 

82 

83 Parameters: 

84 

85 atoms: Atoms object 

86 The Atoms object to operate on. 

87 

88 logfile: file object or str 

89 If *logfile* is a string, a file with that name will be opened. 

90 Use '-' for stdout. 

91 

92 trajectory: Trajectory object or str 

93 Attach trajectory object. If *trajectory* is a string a 

94 Trajectory will be constructed. Use *None* for no 

95 trajectory. 

96 

97 append_trajectory: boolean 

98 Defaults to False, which causes the trajectory file to be 

99 overwriten each time the dynamics is restarted from scratch. 

100 If True, the new structures are appended to the trajectory 

101 file instead. 

102 

103 master: boolean 

104 Defaults to None, which causes only rank 0 to save files. If set to 

105 true, this rank will save files. 

106 

107 comm: Communicator object 

108 Communicator to handle parallel file reading and writing. 

109 """ 

110 

111 self.atoms = atoms 

112 self.optimizable = atoms.__ase_optimizable__() 

113 self.logfile = self.openfile(file=logfile, comm=comm, mode='a') 

114 self.observers: List[Tuple[Callable, int, Tuple, Dict[str, Any]]] = [] 

115 self.nsteps = 0 

116 self.max_steps = 0 # to be updated in run or irun 

117 self.comm = comm 

118 

119 if trajectory is not None: 

120 if isinstance(trajectory, str): 

121 from ase.io.trajectory import Trajectory 

122 mode = "a" if append_trajectory else "w" 

123 trajectory = self.closelater(Trajectory( 

124 trajectory, mode=mode, master=master, comm=comm 

125 )) 

126 self.attach(trajectory, atoms=self.optimizable) 

127 

128 self.trajectory = trajectory 

129 

130 def todict(self) -> Dict[str, Any]: 

131 raise NotImplementedError 

132 

133 def get_number_of_steps(self): 

134 return self.nsteps 

135 

136 def insert_observer( 

137 self, function, position=0, interval=1, *args, **kwargs 

138 ): 

139 """Insert an observer. 

140 

141 This can be used for pre-processing before logging and dumping. 

142 

143 Examples 

144 -------- 

145 >>> from ase.build import bulk 

146 >>> from ase.calculators.emt import EMT 

147 >>> from ase.optimize import BFGS 

148 ... 

149 ... 

150 >>> def update_info(atoms, opt): 

151 ... atoms.info["nsteps"] = opt.nsteps 

152 ... 

153 ... 

154 >>> atoms = bulk("Cu", cubic=True) * 2 

155 >>> atoms.rattle() 

156 >>> atoms.calc = EMT() 

157 >>> with BFGS(atoms, logfile=None, trajectory="opt.traj") as opt: 

158 ... opt.insert_observer(update_info, atoms=atoms, opt=opt) 

159 ... opt.run(fmax=0.05, steps=10) 

160 True 

161 """ 

162 if not isinstance(function, Callable): 

163 function = function.write 

164 self.observers.insert(position, (function, interval, args, kwargs)) 

165 

166 def attach(self, function, interval=1, *args, **kwargs): 

167 """Attach callback function. 

168 

169 If *interval > 0*, at every *interval* steps, call *function* with 

170 arguments *args* and keyword arguments *kwargs*. 

171 

172 If *interval <= 0*, after step *interval*, call *function* with 

173 arguments *args* and keyword arguments *kwargs*. This is 

174 currently zero indexed.""" 

175 

176 if hasattr(function, "set_description"): 

177 d = self.todict() 

178 d.update(interval=interval) 

179 function.set_description(d) 

180 if not isinstance(function, Callable): 

181 function = function.write 

182 self.observers.append((function, interval, args, kwargs)) 

183 

184 def call_observers(self): 

185 for function, interval, args, kwargs in self.observers: 

186 call = False 

187 # Call every interval iterations 

188 if interval > 0: 

189 if (self.nsteps % interval) == 0: 

190 call = True 

191 # Call only on iteration interval 

192 elif interval <= 0: 

193 if self.nsteps == abs(interval): 

194 call = True 

195 if call: 

196 function(*args, **kwargs) 

197 

198 def irun(self, steps=DEFAULT_MAX_STEPS): 

199 """Run dynamics algorithm as generator. 

200 

201 Parameters 

202 ---------- 

203 steps : int, default=DEFAULT_MAX_STEPS 

204 Number of dynamics steps to be run. 

205 

206 Yields 

207 ------ 

208 converged : bool 

209 True if the forces on atoms are converged. 

210 

211 Examples 

212 -------- 

213 This method allows, e.g., to run two optimizers or MD thermostats at 

214 the same time. 

215 >>> opt1 = BFGS(atoms) 

216 >>> opt2 = BFGS(StrainFilter(atoms)).irun() 

217 >>> for _ in opt2: 

218 ... opt1.run() 

219 """ 

220 

221 # update the maximum number of steps 

222 self.max_steps = self.nsteps + steps 

223 

224 # compute the initial step 

225 self.optimizable.get_forces() 

226 

227 # log the initial step 

228 if self.nsteps == 0: 

229 self.log() 

230 

231 # we write a trajectory file if it is None 

232 if self.trajectory is None: 

233 self.call_observers() 

234 # We do not write on restart w/ an existing trajectory file 

235 # present. This duplicates the same entry twice 

236 elif len(self.trajectory) == 0: 

237 self.call_observers() 

238 

239 # check convergence 

240 is_converged = self.converged() 

241 yield is_converged 

242 

243 # run the algorithm until converged or max_steps reached 

244 while not is_converged and self.nsteps < self.max_steps: 

245 # compute the next step 

246 self.step() 

247 self.nsteps += 1 

248 

249 # log the step 

250 self.log() 

251 self.call_observers() 

252 

253 # check convergence 

254 is_converged = self.converged() 

255 yield is_converged 

256 

257 def run(self, steps=DEFAULT_MAX_STEPS): 

258 """Run dynamics algorithm. 

259 

260 This method will return when the forces on all individual 

261 atoms are less than *fmax* or when the number of steps exceeds 

262 *steps*. 

263 

264 Parameters 

265 ---------- 

266 steps : int, default=DEFAULT_MAX_STEPS 

267 Number of dynamics steps to be run. 

268 

269 Returns 

270 ------- 

271 converged : bool 

272 True if the forces on atoms are converged. 

273 """ 

274 

275 for converged in Dynamics.irun(self, steps=steps): 

276 pass 

277 return converged 

278 

279 def converged(self): 

280 """" a dummy function as placeholder for a real criterion, e.g. in 

281 Optimizer """ 

282 return False 

283 

284 def log(self, *args): 

285 """ a dummy function as placeholder for a real logger, e.g. in 

286 Optimizer """ 

287 return True 

288 

289 def step(self): 

290 """this needs to be implemented by subclasses""" 

291 raise RuntimeError("step not implemented.") 

292 

293 

294class Optimizer(Dynamics): 

295 """Base-class for all structure optimization classes.""" 

296 

297 # default maxstep for all optimizers 

298 defaults = {'maxstep': 0.2} 

299 _deprecated = object() 

300 

301 def __init__( 

302 self, 

303 atoms: Atoms, 

304 restart: Optional[str] = None, 

305 logfile: Optional[Union[IO, str]] = None, 

306 trajectory: Optional[str] = None, 

307 master: Optional[bool] = None, 

308 comm=world, 

309 append_trajectory: bool = False, 

310 force_consistent=_deprecated, 

311 ): 

312 """Structure optimizer object. 

313 

314 Parameters: 

315 

316 atoms: Atoms object 

317 The Atoms object to relax. 

318 

319 restart: str 

320 Filename for restart file. Default value is *None*. 

321 

322 logfile: file object or str 

323 If *logfile* is a string, a file with that name will be opened. 

324 Use '-' for stdout. 

325 

326 trajectory: Trajectory object or str 

327 Attach trajectory object. If *trajectory* is a string a 

328 Trajectory will be constructed. Use *None* for no 

329 trajectory. 

330 

331 master: boolean 

332 Defaults to None, which causes only rank 0 to save files. If 

333 set to true, this rank will save files. 

334 

335 comm: Communicator object 

336 Communicator to handle parallel file reading and writing. 

337 

338 append_trajectory: boolean 

339 Appended to the trajectory file instead of overwriting it. 

340 

341 force_consistent: boolean or None 

342 Use force-consistent energy calls (as opposed to the energy 

343 extrapolated to 0 K). If force_consistent=None, uses 

344 force-consistent energies if available in the calculator, but 

345 falls back to force_consistent=False if not. 

346 """ 

347 self.check_deprecated(force_consistent) 

348 

349 super().__init__( 

350 atoms=atoms, 

351 logfile=logfile, 

352 trajectory=trajectory, 

353 append_trajectory=append_trajectory, 

354 master=master, 

355 comm=comm) 

356 

357 self.restart = restart 

358 

359 self.fmax = None 

360 

361 if restart is None or not isfile(restart): 

362 self.initialize() 

363 else: 

364 self.read() 

365 self.comm.barrier() 

366 

367 @classmethod 

368 def check_deprecated(cls, force_consistent): 

369 if force_consistent is cls._deprecated: 

370 return False 

371 

372 warnings.warn( 

373 'force_consistent keyword is deprecated and will ' 

374 'be ignored. This will raise an error in future versions ' 

375 'of ASE.', 

376 FutureWarning) 

377 

378 def read(self): 

379 raise NotImplementedError 

380 

381 def todict(self): 

382 description = { 

383 "type": "optimization", 

384 "optimizer": self.__class__.__name__, 

385 } 

386 # add custom attributes from subclasses 

387 for attr in ('maxstep', 'alpha', 'max_steps', 'restart'): 

388 if hasattr(self, attr): 

389 description.update({attr: getattr(self, attr)}) 

390 return description 

391 

392 def initialize(self): 

393 pass 

394 

395 def irun(self, fmax=0.05, steps=DEFAULT_MAX_STEPS): 

396 """Run optimizer as generator. 

397 

398 Parameters 

399 ---------- 

400 fmax : float 

401 Convergence criterion of the forces on atoms. 

402 steps : int, default=DEFAULT_MAX_STEPS 

403 Number of optimizer steps to be run. 

404 

405 Yields 

406 ------ 

407 converged : bool 

408 True if the forces on atoms are converged. 

409 """ 

410 self.fmax = fmax 

411 return Dynamics.irun(self, steps=steps) 

412 

413 def run(self, fmax=0.05, steps=DEFAULT_MAX_STEPS): 

414 """Run optimizer. 

415 

416 Parameters 

417 ---------- 

418 fmax : float 

419 Convergence criterion of the forces on atoms. 

420 steps : int, default=DEFAULT_MAX_STEPS 

421 Number of optimizer steps to be run. 

422 

423 Returns 

424 ------- 

425 converged : bool 

426 True if the forces on atoms are converged. 

427 """ 

428 self.fmax = fmax 

429 return Dynamics.run(self, steps=steps) 

430 

431 def converged(self, forces=None): 

432 """Did the optimization converge?""" 

433 if forces is None: 

434 forces = self.optimizable.get_forces() 

435 return self.optimizable.converged(forces, self.fmax) 

436 

437 def log(self, forces=None): 

438 if forces is None: 

439 forces = self.optimizable.get_forces() 

440 fmax = sqrt((forces ** 2).sum(axis=1).max()) 

441 e = self.optimizable.get_potential_energy() 

442 T = time.localtime() 

443 if self.logfile is not None: 

444 name = self.__class__.__name__ 

445 if self.nsteps == 0: 

446 args = (" " * len(name), "Step", "Time", "Energy", "fmax") 

447 msg = "%s %4s %8s %15s %12s\n" % args 

448 self.logfile.write(msg) 

449 

450 args = (name, self.nsteps, T[3], T[4], T[5], e, fmax) 

451 msg = "%s: %3d %02d:%02d:%02d %15.6f %15.6f\n" % args 

452 self.logfile.write(msg) 

453 self.logfile.flush() 

454 

455 def dump(self, data): 

456 from ase.io.jsonio import write_json 

457 if self.comm.rank == 0 and self.restart is not None: 

458 with open(self.restart, 'w') as fd: 

459 write_json(fd, data) 

460 

461 def load(self): 

462 from ase.io.jsonio import read_json 

463 with open(self.restart) as fd: 

464 try: 

465 from ase.optimize import BFGS 

466 if not isinstance(self, BFGS) and isinstance( 

467 self.atoms, UnitCellFilter 

468 ): 

469 warnings.warn( 

470 "WARNING: restart function is untested and may result " 

471 "in unintended behavior. Namely orig_cell is not " 

472 "loaded in the UnitCellFilter. Please test on your own" 

473 " to ensure consistent results." 

474 ) 

475 return read_json(fd, always_array=False) 

476 except Exception as ex: 

477 msg = ('Could not decode restart file as JSON. ' 

478 'You may need to delete the restart file ' 

479 f'{self.restart}') 

480 raise RestartError(msg) from ex