To anyone that finds this thread later and has a similar need: I wrote a python script to accomplish my original goal of generating image flags. It implements the algorithm suggested by Axel where image flags are generated on the first time step based on minimum image distance then updated at later times if the particle moves more than half a cell length. It writes the new dump in the ADIOS format for fast IO.
The script assumes a constant box and number of particles, but should work for triclinic boxes. I did not implement exception handling/error checking, so robustness is probably lacking.
This implementation seems to be about 5x faster than the LAMMPS-based approach I discussed originally. The rate-limiting step is reading the plain text dump; all other operations are reasonably fast due to numpy vectorization and ADIOS2 writing. I’m sure other optimizations are possible, but are likely marginal.
Feel free to use/modify/etc to suit your needs.
import numpy as np
import scipy as sp
from operator import itemgetter
from itertools import islice
import adios2
input_filename = 'dump.txt'
output_filename = 'dump_adios.bp'
# helper function to write ADIOS attributes
def writeAttr(file,dump):
# TODO: make sure the keys are sorted correctly
out_colnames = list(dump.colnames.keys())
out_colnames.extend(['ix','iy','iz'])
bnd_list = [int(x=='f') for x in dump.boundary_str.replace(' ','')]
file.write_attribute('LAMMPS/dump_style','custom')
file.write_attribute('LAMMPS/num_ver','20250722')
file.write_attribute('LAMMPS/version','22 Jul 2025')
file.write_attribute('boundary',bnd_list)
file.write_attribute('boundarystr',dump.boundary_str)
file.write_attribute('columns',out_colnames)
file.write_attribute('columnstr',' '.join(out_colnames))
file.write_attribute('triclinic',np.array(int(dump.triclinic),dtype=np.int32))
# helper function to write ADIOS timestep info
def writeStep(file,dump,header,atomdata):
file.begin_step()
file.write('atoms',atomdata)
file.write('boxxhi',dump.box.lammps_box[0,1])
file.write('boxxlo',dump.box.lammps_box[0,0])
file.write('boxyhi',dump.box.lammps_box[1,1])
file.write('boxylo',dump.box.lammps_box[1,0])
file.write('boxzhi',dump.box.lammps_box[2,1])
file.write('boxzlo',dump.box.lammps_box[2,0])
file.write('natoms',np.array(dump.natoms,dtype=np.uint64))
file.write('ncolumns',dump.ncol)
file.write('nme',dump.natoms)
file.write('ntimestep',np.array(int(header[1]),dtype=np.uint64))
file.write('nprocs',1)
file.write('offset',1)
file.end_step()
# helper class to store box from lammps dump and compute cell tensor, etc.
class Box:
def __init__(self,inbox): # must pass in an array of proper shape
assert(inbox.shape[0] == 3)
self.lammps_box = np.array(inbox)
self.tensor = np.zeros((3,3))
if inbox.shape[1] == 3:
# need to modify the first two boundary values for triclinic cells to get correct cell
inbox[0,0] -= np.amin([0.0,inbox[0,2],inbox[1,2],inbox[0,2]+inbox[1,2]])
inbox[0,1] -= np.amax([0.0,inbox[0,2],inbox[1,2],inbox[0,2]+inbox[1,2]])
inbox[1,0] -= np.minimum(0,inbox[2,2])
inbox[1,1] -= np.maximum(0,inbox[2,2])
self.tensor[0,1] = inbox[0,2]
self.tensor[0,2] = inbox[1,2]
self.tensor[1,2] = inbox[2,2]
for j in range(3):
self.tensor[j,j] = inbox[j,1] - inbox[j,0]
self.origin = inbox[:,0]
self.length = np.sqrt(np.sum(self.tensor**2,1))
self.volume = np.linalg.det(self.tensor)
self.tensorinv = np.linalg.inv(self.tensor)
# class to read dump
class Dump:
def __init__(self,file):
self.file = file
self.format_unset = True
self.colnames = {}
self.ncol = 0
self.usedir = 3*[False] # keep track of whether this dimension is periodic
self.dirlabels = ('x','y','z')
def readHeader(self):
try:
lines = [ x.strip() for x in islice(self.file,9) ]
except:
return # TODO: introduce error handling
if self.format_unset:
self.natoms = int(lines[3])
if ("ITEM: BOX BOUNDS" in lines[4]):
self.boundary_str = ' '.join(lines[4].split()[3:6])
self.usedir = [ x == 'pp' for x in lines[4].split()[3:6] ]
if ("xy xz yz" in lines[4]): # this is a triclinic box
self.triclinic = True
boxtmp = np.zeros((3,3))
else:
self.triclinic = False
boxtmp = np.zeros((3,2))
for j in range(3): # loop over all dimensions
linetmp = lines[5+j].split()
if self.triclinic:
assert(len(linetmp)==3) # three items for triclinic
else:
assert(len(linetmp)==2) # only two for orthorhombic
boxtmp[j,:] = np.asarray(linetmp,dtype=float)
self.box = Box(boxtmp)
if ("ITEM: ATOMS" in lines[8]):
keys = lines[8].split()[2:]
self.ncol = len(keys)
for j in range(self.ncol):
self.colnames[keys[j]] = j
self.format_unset = False
return lines
def readFrame(self):
header = self.readHeader()
if len(header) > 0:
data = np.loadtxt(self.file,max_rows=self.natoms)
else:
data = np.array([])
return header, data
# TODO: create main() fnc.
# initialize adios2 to be compatible with lammps
adios = adios2.Adios()
bpIO = adios.declare_io('Binary_output')
bpIO.set_engine('BP4')
# open input and output files
with open(input_filename,'r') as ifile, adios2.Stream(bpIO,output_filename,"w") as ofile:
dmp = Dump(ifile)
hdr, data = dmp.readFrame()
data = data[data[:,dmp.colnames["id"]].argsort(),:]
# create a cyclic buffer to store positions for the current and previous time
posarray = np.zeros((dmp.natoms,3,2))
posarray[:,:,0] = data[:,itemgetter(*dmp.dirlabels)(dmp.colnames)]
# construct initial image flags
molids = data[:,dmp.colnames['mol']].astype(int)
# pick an atom in each molecule as the starting point
r0_ids = np.unique(molids,return_index = True)[1]
r0 = posarray[r0_ids,:,0]
# create a sparse array to assign atoms to molecules
molmat = sp.sparse.csr_array((np.ones(dmp.natoms),(np.array(range(dmp.natoms)),molids)))
img_flags = -np.round(np.dot(posarray[:,:,0] - molmat.dot(r0),dmp.box.tensorinv.T))
img_flags[:,np.logical_not(dmp.usedir)] = 0
writeAttr(ofile,dmp)
out_array = np.hstack((data,img_flags))
writeStep(ofile,dmp,hdr,out_array)
idx = 1
while True:
hdr, data = dmp.readFrame()
if len(data)==0:
break
data = data[data[:,dmp.colnames["id"]].argsort(),:]
posarray[:,:,idx] = data[:,itemgetter(*dmp.dirlabels)(dmp.colnames)]
# update image flags based on delta from previous time step
img_flags[:,dmp.usedir] -= np.round(np.dot(posarray[:,:,idx] - posarray[:,:,1-idx],dmp.box.tensorinv.T))[:,dmp.usedir]
# write output data
out_array = np.hstack((data,img_flags))
writeStep(ofile,dmp,hdr,out_array)
# update indices of cyclic buffer
idx += 1
idx %= 2