cache_state.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import division
3 from __future__ import print_function
4 
5 import argparse
6 import os, os.path
7 import re
8 import sys
9 import samweb_client as swc
10 import json
11 import subprocess
12 import shlex
13 import pycurl
14 from io import BytesIO
15 
16 
# Honour a proxy location that the caller has already exported through
# X509_USER_PROXY. Only when that variable is absent do we fall back to the
# per-user /tmp/x509up_u<uid> file.
X509_USER_PROXY = os.environ.get(
    "X509_USER_PROXY",
    "/tmp/x509up_u%d" % os.getuid(),
)

# Captures the first path component below /pnfs as the named group "area".
PNFS_DIR_PATTERN = re.compile(r"/pnfs/(?P<area>[^/]+)")

# An enstore location is written "enstore:/path/to/directory(tape_id)".
# The parenthesised tape id is not always there (for instance it is absent
# from the output of samweb.listFilesAndLocations()), so the second group
# is optional. Group 1 is the directory.
ENSTORE_PATTERN = re.compile(r"^enstore:([^(]+)(\([^)]+\))?")

# Root of the dcache REST API at Fermilab. It is queried for the online
# status of files and used to submit prestage requests. The API itself is
# described in the dcache User Guide:
#
#   https://www.dcache.org/manuals/UserGuide-6.0/frontend.shtml
DCACHE_REST_BASE_URL = "https://fndca.fnal.gov:3880/api/v1/namespace"
33 
34 ################################################################################
class ProgressBar(object):
    """Minimal textual progress indicator written to standard output.

    Nothing is printed unless `total` reaches `announce_threshold`. When
    announcing, a percentage is printed each time a new tenth of `total`
    is reached; for totals of 10 or fewer the raw count is printed instead,
    because tenths are not meaningful there.
    """

    def __init__(self, total, announce_threshold=50):
        """Set up the bar for `total` items and announce the starting point.

        total:              number of items that will be processed
        announce_threshold: smallest `total` for which anything is printed
        """
        self.total = total
        self._total_div10 = total // 10

        self.announce = total >= announce_threshold
        # -1 guarantees that the very first update (decile 0) is announced.
        self._last_announce_decile = -1

        self.Update(0)

    def Update(self, n):
        """Record that `n` items are done and print progress if appropriate."""
        if not self.announce:
            return

        if self.total <= 10:
            # Too few items to split into tenths (and total // 10 could be
            # zero, which must never be used as a divisor): show the count.
            print(" %d" % n, end=" ")
        else:
            current_decile = n // self._total_div10
            # always want to announce 100%
            if current_decile > self._last_announce_decile or n == self.total:
                # Integer arithmetic avoids float truncation artefacts such
                # as int(0.29 * 100) == 28.
                print(" %d%%" % (n * 100 // self.total), end=" ")
            self._last_announce_decile = current_decile

        sys.stdout.flush()
59 
60 ################################################################################
def make_curl():
    """Build a pycurl handle preconfigured for Fermilab authentication.

    The same handle may be passed to several dcache REST API calls in a
    row; curl then keeps the connection open between requests, which
    should make things faster.
    """
    handle = pycurl.Curl()

    # The proxy file is supplied for each of these credential options.
    proxy_options = (
        handle.CAINFO,
        handle.SSLCERT,
        handle.SSLKEY,
        handle.SSH_PRIVATE_KEYFILE,
    )
    for option in proxy_options:
        handle.setopt(option, X509_USER_PROXY)

    handle.setopt(handle.FOLLOWLOCATION, True)
    handle.setopt(handle.CAPATH, "/etc/grid-security/certificates")

    return handle
78 
79 ################################################################################
def filename_to_namespace(filename):
    """Translate `filename` into the path form used in dcache REST URLs.

    Recognised leading forms are rewritten as follows; anything else is
    returned unchanged:

      root://fndca1.fnal.gov:1094<path>  ->  <path>
      /pnfs/dune<rest>                   ->  /pnfs/fnal.gov/usr/dune<rest>
      enstore:/pnfs/dune<rest>           ->  /pnfs/fnal.gov/usr/dune<rest>

    Only the leading prefix is rewritten; the same text occurring later in
    the path is left alone.
    """
    prefix_rewrites = (
        ("root://fndca1.fnal.gov:1094", ""),
        ("/pnfs/dune", "/pnfs/fnal.gov/usr/dune"),
        ("enstore:/pnfs/dune", "/pnfs/fnal.gov/usr/dune"),
    )
    for prefix, replacement in prefix_rewrites:
        if filename.startswith(prefix):
            return replacement + filename[len(prefix):]

    return filename
90 
91 ################################################################################
def get_file_qos(c, filename):
    """Using curl object `c`, look up the locality and target QoS of `filename`.

    Returns: (fileLocality, targetQos). targetQos is non-empty if there is
             an outstanding prestage request. Both are empty strings when
             the reply lacks those fields or cannot be parsed as a JSON
             object (eg, file does not exist).

    Uses the dcache REST API frontend, documented in the dcache User Guide, eg:

    https://www.dcache.org/manuals/UserGuide-6.0/frontend.shtml

    """

    # qos=true in the URL causes dcache to tell us whether the file's
    # on disk or tape, and also the "targetQos", which exists if
    # there's an outstanding prestage request.
    #
    # Update 2020-10-02: it looks like qos is sometimes incorrect, or
    # at least, not what I thought it was, since online files can have
    # fileLocality=ONLINE_AND_NEARLINE but qos=tape. So we use
    # fileLocality for the online-ness of the file, but still request
    # qos because it gives us the target qos if there's an outstanding
    # prestage request
    url = "{host}/{path}?qos=true&locality=true".format(
        host=DCACHE_REST_BASE_URL, path=filename_to_namespace(filename))

    # The handle may have been left in POST mode by an earlier
    # request_prestage() call on the same object; this lookup must be a GET.
    c.setopt(c.HTTPGET, 1)
    c.setopt(c.URL, url)
    response = BytesIO()
    c.setopt(c.WRITEFUNCTION, response.write)
    c.perform()

    # Body is a byte string; decode it before handing it to the JSON parser.
    body = response.getvalue().decode('iso-8859-1')

    try:
        reply = json.loads(body)
    except ValueError:
        # Empty or non-JSON body: report "unknown" as documented rather
        # than raising.
        return ("", "")
    if not isinstance(reply, dict):
        return ("", "")

    # "currentQos" turns out to not quite be right (see comment above), so
    # the locality is reported in its place.
    locality = reply.get("fileLocality", "")
    targetQos = reply.get("targetQos", "")

    return (locality, targetQos)
141 
142 ################################################################################
def is_file_online(c, filename):
    """Tell whether `filename` is online, querying through curl object `c`.

    True when the locality reported by get_file_qos() contains "ONLINE".
    """
    locality, _target_qos = get_file_qos(c, filename)
    return "ONLINE" in locality
146 
147 ################################################################################
def request_prestage(c, filename):
    """Using curl object `c`, request a prestage for `filename`

    Returns whether the request succeeded (according to dcache). A reply
    that is not a JSON object with "status" equal to "success" counts as
    failure.

    Uses a HTTP post request in a very specific format to request a prestage of a file. Adapted from:

    https://github.com/DmitryLitvintsev/scripts/blob/master/bash/bring-online.sh

    Uses the dcache REST API frontend, documented in the dcache User Guide, eg:

    https://www.dcache.org/manuals/UserGuide-6.0/frontend.shtml
    """
    c.setopt(c.POSTFIELDS, """{"action" : "qos", "target" : "disk+tape"}""")
    c.setopt(c.HTTPHEADER, ["Accept: application/json", "Content-Type: application/json"])
    c.setopt(c.POST, 1)
    c.setopt(c.URL, "{host}/{path}".format(host=DCACHE_REST_BASE_URL, path=filename_to_namespace(filename)))
    response = BytesIO()
    c.setopt(c.WRITEFUNCTION, response.write)
    c.perform()

    # Body is a byte string; decode it before handing it to the JSON parser.
    body = response.getvalue().decode('iso-8859-1')
    try:
        reply = json.loads(body)
    except ValueError:
        # An empty or non-JSON body cannot be a success report.
        return False
    return isinstance(reply, dict) and reply.get("status") == "success"
175 
176 ################################################################################
def is_file_online_pnfs(f):
    """Return whether the file at path `f` is reported as online.

    The answer comes from the first line of the special file
    ".(get)(<filename>)(locality)" located in the same directory as `f`:
    the file counts as online when that line contains 'ONLINE'.
    """
    path, filename = os.path.split(f)
    # os.path.join keeps a bare filename relative to the current directory
    # instead of turning it into a path under "/".
    stat_file = os.path.join(path, ".(get)(%s)(locality)" % filename)
    with open(stat_file) as theStatFile:
        state = theStatFile.readline()
    return 'ONLINE' in state
184 
185 ################################################################################
def FilelistCacheCount(files, verbose_flag, METHOD="rest"):
    """Count how many of `files` are online.

    files:        paths to check
    verbose_flag: print one line per file (and suppress the progress bar)
    METHOD:       "rest" to ask the dcache REST API, "pnfs" to use
                  is_file_online_pnfs()

    Returns: (cached, pending, n) where `cached` is the number of online
             files, `pending` the number whose target QoS mentions disk,
             and `n` the number of files examined. `pending` is -1 with the
             "pnfs" method, which does not count pending files.
    """
    assert(METHOD in ("rest", "pnfs"))

    if len(files) > 1:
        print( "Checking %d files:" % len(files) )
    cached = 0
    pending = 0
    n = 0

    # If we're in verbose mode, the per-file output fights with
    # the progress bar, so disable the progress bar
    progbar = None if verbose_flag else ProgressBar(len(files))

    c = make_curl() if METHOD == "rest" else None

    for f in files:
        if METHOD == "rest":
            locality, targetQos = get_file_qos(c, f)
            if "ONLINE" in locality:
                cached += 1
            if "disk" in targetQos:
                pending += 1
            if verbose_flag:
                print( f, locality, "pending" if targetQos else "")
        elif METHOD == "pnfs":
            this_cached = is_file_online_pnfs(f)
            if this_cached:
                cached += 1
            if verbose_flag:
                print( f, "ONLINE" if this_cached else "NEARLINE")

        n += 1
        # The update for the last file already passes n == total, so no
        # extra update is issued after the loop (it would announce the
        # completion a second time).
        if progbar is not None:
            progbar.Update(n)

    # We don't count pending files with the pnfs method, so set it to
    # something meaningless
    if METHOD == "pnfs":
        pending = -1
    return (cached, pending, n)
226 
227 ################################################################################
def FilelistPrestageRequest(files, verbose_flag):
    """Submit a prestage request for every entry of `files`.

    A header line is printed when more than one file is given, and with
    `verbose_flag` set the outcome of each request is printed as well.

    Returns: (number of requests that succeeded, number of files)
    """
    total = len(files)
    if total > 1:
        print( "Prestaging %d files:" % total )

    curl_handle = make_curl()
    succeeded = 0
    for path in files:
        ok = request_prestage(curl_handle, path)
        if ok:
            succeeded += 1
        if verbose_flag:
            print( path, "request succeeded" if ok else "request failed" )

    return (succeeded, total)
243 
244 ################################################################################
def enstore_locations_to_paths(samlist, sparsification=1):
    """Turn (enstore location, filename) pairs into plain pnfs paths.

    `samlist` has the layout returned by samweb.listFilesAndLocations().
    Only every `sparsification`-th entry is considered. Entries whose
    location does not match the enstore pattern are reported on standard
    output and left out of the result.
    """
    paths = []
    for entry in samlist[::sparsification]:
        location = entry[0]
        match = ENSTORE_PATTERN.match(location)
        if not match:
            print( "enstore_locations_to_paths got a non-enstore location", location )
            continue
        paths.append(os.path.join(match.group(1), entry[1]))
    return paths
259 
examples="""
Examples:

  Find the cache state of one file:

    %(prog)s np04_raw_run004513_0008_dl5.root

  Find the cache state of multiple files. With -v, each file's status
  is shown; otherwise just a count is shown. Can mix-and-match full
  paths and SAM filenames:

    %(prog)s -v /pnfs/dune/tape_backed/myfile.root np04_raw_run004513_0008_dl5.root

  Summarize the cache state of a SAM dataset:

    %(prog)s -d protodune-sp_runset_4513_raw_v0

  Show the cache state of each file matching a SAM query:

    %(prog)s -v --dim 'run_type protodune-sp and run_number 4513 and data_tier raw'

  Prestage an individual file, by its SAM filename:

    %(prog)s -p np04_raw_run004513_0008_dl5.root

  (In subsequent queries, the file will show up as "pending" until it
  arrives on disk)

  Prestage an entire dataset (like samweb prestage-dataset):

    %(prog)s -p -d protodune-sp_runset_4513_raw_v0
"""

################################################################################
def _build_parser():
    """Create the command-line parser for this script."""
    parser = argparse.ArgumentParser(epilog=examples, formatter_class=argparse.RawDescriptionHelpFormatter)

    # The file list comes from exactly one of: explicit files, a dataset
    # name, or a dimensions query.
    gp = parser.add_mutually_exclusive_group()
    gp.add_argument("files",
                    nargs="*",
                    default=[],
                    metavar="FILE",
                    help="Files to consider. Can be specified as a full /pnfs path, or just the SAM filename",
    )
    gp.add_argument("-d", "--dataset",
                    metavar="DATASET",
                    dest="dataset_name",
                    help="Name of the SAM dataset to check cache status of",
    )
    gp.add_argument("-q", "--dim",
                    metavar="\"DIMENSION\"",
                    dest="dimensions",
                    help="sam dimensions to check cache status of",
    )

    parser.add_argument("-s","--sparse", type=int, dest='sparse',help="Sparsification factor. This is used to check only a portion of a list of files",default=1)
    parser.add_argument("-ss", "--snapshot", dest="snapshot", help="[Also requires -d] Use this snapshot ID for the dataset. Specify 'latest' for the most recent one.")
    parser.add_argument("-v","--verbose", action="store_true", dest="verbose", default=False, help="Print information about individual files")
    parser.add_argument("-p","--prestage", action="store_true", dest="prestage", default=False, help="Prestage the files specified")
    parser.add_argument("-m", "--method", choices=["rest", "pnfs"], default="rest", help="Use this method to look up file status.")
    return parser


def _require_valid_proxy():
    """Exit with status 2 unless `setup_fnal_security --check` succeeds."""
    try:
        # The sink must be opened for writing: the child writes to it.
        with open(os.devnull, "w") as devnull:
            subprocess.check_call(shlex.split("setup_fnal_security --check"), stdout=devnull, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        print( "Your proxy is expired or missing. Please run `setup_fnal_security` and then try again." )
        sys.exit(2)


def _files_from_dataset(sam, args):
    """Return the pnfs paths of the files in the SAM dataset `args.dataset_name`.

    With --snapshot, the snapshot is queried through dimensions; otherwise
    every file of the definition is located individually and its first
    enstore location is used.
    """
    dimensions = None
    if args.snapshot == "latest":
        dimensions = "dataset_def_name_newest_snapshot %s" % args.dataset_name
    elif args.snapshot:
        dimensions = "snapshot_id %s" % args.snapshot

    if dimensions:
        samlist = sam.listFilesAndLocations(dimensions=dimensions, filter_path="enstore")
    else:
        thislist = sam.listFiles(defname=args.dataset_name)
        print(len(thislist))
        samlist = []
        for a, f in enumerate(thislist):
            if not (a%100): print("Locating files: %i/%i"%(a, len(thislist)), end='\r')
            for l in sam.locateFile(f):
                if l['full_path'].split(':')[0] == 'enstore':
                    samlist.append((l['full_path'], f))
                    break
        print()
        print(len(samlist))

    return enstore_locations_to_paths(list(samlist), args.sparse)


def _files_from_arguments(sam, requested):
    """Resolve the command-line FILE arguments.

    Returns: (pnfs_paths, local_cached) where `pnfs_paths` are the paths
             whose cache state still has to be looked up and `local_cached`
             counts existing files outside /pnfs, which are assumed to be
             on a regular filesystem and therefore always "cached".

    Exits with status 2 when an argument is neither an existing file nor
    known to SAM.
    """
    pnfs_paths = []
    local_cached = 0
    for f in requested:
        if os.path.isfile(f):
            # We got a path to an actual file
            if f.startswith("/pnfs"):
                pnfs_paths.append(f)
            else:
                local_cached += 1
            continue

        # The argument isn't a file on the file system. Assume it's a
        # filename in samweb and ask samweb for the location
        try:
            locs = sam.locateFile(f)
        except (swc.exceptions.FileNotFound, swc.exceptions.HTTPNotFound):
            print("File is not known to SAM and is not a full path:", f, file=sys.stderr)
            sys.exit(2)

        # locateFile potentially produces multiple locations. Take the
        # first enstore one (without the "enstore:" at the front) so that
        # each requested file is counted once, as in the dataset lookup.
        for loc in locs:
            m = ENSTORE_PATTERN.match(loc["location"])
            if m:
                pnfs_paths.append(os.path.join(m.group(1), f))
                break
    return pnfs_paths, local_cached


def _report(cache_count, miss_count, pending_count):
    """Print the cache summary for the given counts.

    `pending_count` below zero means "not counted" and is left out of the
    multi-file summary.
    """
    total = float(cache_count + miss_count)
    cache_frac_str = (" (%d%%)" % round(cache_count/total*100)) if total > 0 else ""
    miss_frac_str = (" (%d%%)" % round(miss_count/total*100)) if total > 0 else ""

    if total > 1:
        print()
        pending_string=""
        if pending_count>=0:
            pending_string="\tPending: %d (%d%%)" % (pending_count, round(pending_count/total*100))
        print( "Cached: %d%s\tTape only: %d%s%s" % (cache_count, cache_frac_str, miss_count, miss_frac_str, pending_string))
    elif total == 1:
        print( "CACHED" if cache_count > 0 else "NOT CACHED", end="")
        print( " PENDING" if pending_count > 0 else "" )


def main():
    """Command-line entry point: report cache state or request prestaging.

    Exit status: 0 when every file is cached (or every prestage request
    succeeded), 1 otherwise, 2 for a missing proxy, bad arguments or an
    unknown file, and -1 when SAM information cannot be retrieved.
    """
    parser = _build_parser()
    args = parser.parse_args()

    # The factor is used as a slice step, so it has to be a positive integer.
    if args.sparse < 1:
        parser.error("--sparse must be a positive integer")

    # gotta make sure you have a valid certificate.
    # otherwise the results may lie...
    if args.method == "rest":
        _require_valid_proxy()

    sam = swc.SAMWebClient("dune")

    # Files outside /pnfs that exist locally; always considered cached.
    local_cached = 0

    # Figure out where we want to get our list of files from
    if args.dataset_name:
        print( "Retrieving file list for SAM dataset definition name: '%s'..." % args.dataset_name, end="" )
        sys.stdout.flush()
        try:
            filelist = _files_from_dataset(sam, args)
            print( " done." )
        except Exception as e:
            print( e )
            print()
            print( 'Unable to retrieve SAM information for dataset: %s' %(args.dataset_name) )
            sys.exit(-1)
    elif args.dimensions:
        print( "Retrieving file list for SAM dimensions: '%s'..." % args.dimensions, end="" )
        sys.stdout.flush()
        try:
            samlist = sam.listFilesAndLocations(dimensions=args.dimensions, filter_path="enstore")
            filelist = enstore_locations_to_paths(list(samlist), args.sparse)
            print( " done." )
        except Exception as e:
            print( e )
            print()
            print( 'Unable to retrieve SAM information for dimensions: %s' %(args.dimensions) )
            sys.exit(-1)
    else:
        # We were passed a list of files. Try to locate each one
        filelist, local_cached = _files_from_arguments(sam, args.files)

    if args.prestage:
        ngood, n = FilelistPrestageRequest(filelist, args.verbose)
        sys.exit(0 if ngood == n else 1)

    cache_count, pending_count, total = FilelistCacheCount(filelist, args.verbose, args.method)
    miss_count = total - cache_count
    # Local files were never looked up, so add them after the misses have
    # been derived from the looked-up files only.
    cache_count += local_cached

    _report(cache_count, miss_count, pending_count)

    sys.exit(0 if miss_count == 0 else 1)


if __name__=="__main__":
    main()
454 
455 # Local Variables:
456 # python-indent-offset: 4
457 # End:
def make_curl()
Definition: cache_state.py:61
def request_prestage(c, filename)
Definition: cache_state.py:148
int open(const char *, int)
Opens a file descriptor.
static bool format(QChar::Decomposition tag, QString &str, int index, int len)
Definition: qstring.cpp:11496
def enstore_locations_to_paths(samlist, sparsification=1)
Definition: cache_state.py:245
def __init__(self, total, announce_threshold=50)
Definition: cache_state.py:36
def is_file_online(c, filename)
Definition: cache_state.py:143
def FilelistPrestageRequest(files, verbose_flag)
Definition: cache_state.py:228
def is_file_online_pnfs(f)
Definition: cache_state.py:177
def filename_to_namespace(filename)
Definition: cache_state.py:80
void decode(std::any const &a, Hep2Vector &result)
Definition: CLHEP_ps.h:12
def get_file_qos(c, filename)
Definition: cache_state.py:92
void split(std::string const &s, char c, OutIter dest)
Definition: split.h:35
def FilelistCacheCount(files, verbose_flag, METHOD="rest")
Definition: cache_state.py:186