larbatch_utilities.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 ######################################################################
3 #
4 # Name: larbatch_utilities.py
5 #
6 # Purpose: This module contains low level utilities that are used in
7 # either modules project_utilities or larbatch_posix.
8 #
9 # Created: 13-Jun-2016 Herbert Greenlee
10 #
11 # The following functions are provided as interfaces to ifdh. These
12 # functions are equipped with authentication checking, timeouts and
13 # other protections.
14 #
15 # ifdh_cp - Interface for "ifdh cp."
16 # ifdh_ls - Interface for "ifdh ls."
17 # ifdh_ll - Interface for "ifdh ll."
18 # ifdh_mkdir - Interface for "ifdh mkdir."
19 # ifdh_rmdir - Interface for "ifdh rmdir."
20 # ifdh_mv - Interface for "ifdh mv."
21 # ifdh_rm - Interface for "ifdh rm."
22 # ifdh_chmod - Interface for "ifdh chmod."
23 #
24 # The following functions are provided as interfaces to posix tools
25 # with additional protections or timeouts.
26 #
27 # posix_cp - Copy file with timeout.
28 #
29 # Authentication functions.
30 #
31 # test_ticket - Raise an exception of user does not have a valid kerberos ticket.
32 # get_kca - Get a kca certificate.
33 # get_proxy - Get a grid proxy.
34 # test_kca - Get a kca certificate if necessary.
35 # text_proxy - Get a grid proxy if necessary.
36 # get_experiment - Get standard experiment name.
37 # get_user - Get authenticated user.
38 # get_prouser - Get production user.
39 # get_role - Get VO role.
40 #
41 # SAM functions.
42 #
43 # dimensions - Return sam query dimensions for stage.
44 # get_sam_metadata - Return sam metadata fcl parameters for stage.
45 # get_bluearc_server - Sam fictitious server for bluearc.
46 # get_dcache_server - Sam fictitious server for dCache.
47 # get_dropbox - Return dropbox based on sam metadata.
48 #
49 # Other functions.
50 #
51 # get_ups_products - Top level ups products.
52 # get_setup_script_path - Full path of experiment setup script.
53 # wait_for_subprocess - For use with subprocesses with timeouts.
54 # dcache_server - Return dCache server.
55 # dcache_path - Convert dCache local path to path on server.
56 # xrootd_server_port - Return xrootd server and port (as <server>:<port>).
57 # xrootd_uri - Convert dCache path to xrootd uri.
58 # gridftp_uri - Convert dCache path to gridftp uri.
59 # srm_uri - Convert dCache path to srm uri.
60 # nfs_server - Node name of a computer in which /pnfs filesystem is nfs-mounted.
61 # parse_mode - Parse the ten-character file mode string ("ls -l").
62 # check_running - Check for running project.py submission process.
63 # convert_str - Accepting unicode or bytes as input, convert to default python str.
64 # convert_bytes - Accepting unicode or bytes as input, convert to bytes.
65 # test_jobsub - Test whether jobsub_client is set up.
66 #
67 ######################################################################
68 
69 from __future__ import absolute_import
70 from __future__ import print_function
71 import sys, os
72 import stat
73 import subprocess
74 import getpass
75 import threading
76 try:
77  import queue
78 except ImportError:
79  import Queue as queue
80 from project_modules.ifdherror import IFDHError
81 
82 # Global variables.
83 
84 ticket_ok = False
85 kca_ok = False
86 proxy_ok = False
87 kca_user = ''
88 jobsub_ok = False
89 
90 # Copy file using ifdh, with timeout.
91 
92 def ifdh_cp(source, destination):
93 
94  # Get proxy.
95 
96  test_proxy()
97 
98  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
99  # are not defined (they confuse ifdh, or rather the underlying tools).
100 
101  save_vars = {}
102  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
103  if var in os.environ:
104  save_vars[var] = os.environ[var]
105  del os.environ[var]
106 
107  # Do copy.
108 
109  cmd = ['ifdh', 'cp', source, destination]
110  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
111 
112  q = queue.Queue()
113  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
114  thread.start()
115  thread.join(timeout=31000000)
116  if thread.is_alive():
117  print('Terminating subprocess.')
118  jobinfo.terminate()
119  thread.join()
120  rc = q.get()
121  jobout = convert_str(q.get())
122  joberr = convert_str(q.get())
123  if rc != 0:
124  for var in list(save_vars.keys()):
125  os.environ[var] = save_vars[var]
126  raise IFDHError(cmd, rc, jobout, joberr)
127 
128  # Restore environment variables.
129 
130  for var in list(save_vars.keys()):
131  os.environ[var] = save_vars[var]
132 
133 
134 # Ifdh ls, with timeout.
135 # Return value is list of lines returned by "ifdh ls" command.
136 
137 def ifdh_ls(path, depth):
138 
139  # Get proxy.
140 
141  test_proxy()
142 
143  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
144  # are not defined (they confuse ifdh, or rather the underlying tools).
145 
146  save_vars = {}
147  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
148  if var in os.environ:
149  save_vars[var] = os.environ[var]
150  del os.environ[var]
151 
152  # Do listing.
153 
154  cmd = ['ifdh', 'ls', path, '%d' % depth]
155  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
156 
157  q = queue.Queue()
158  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
159  thread.start()
160  thread.join(timeout=600)
161  if thread.is_alive():
162  print('Terminating subprocess.')
163  jobinfo.terminate()
164  thread.join()
165  rc = q.get()
166  jobout = convert_str(q.get())
167  joberr = convert_str(q.get())
168  if rc != 0:
169  for var in list(save_vars.keys()):
170  os.environ[var] = save_vars[var]
171  raise IFDHError(cmd, rc, jobout, joberr)
172 
173  # Restore environment variables.
174 
175  for var in list(save_vars.keys()):
176  os.environ[var] = save_vars[var]
177 
178  # Done.
179 
180  return jobout.splitlines()
181 
182 
183 # Ifdh ll, with timeout.
184 # Return value is list of lines returned by "ifdh ls" command.
185 
186 def ifdh_ll(path, depth):
187 
188  # Get proxy.
189 
190  test_proxy()
191 
192  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
193  # are not defined (they confuse ifdh, or rather the underlying tools).
194 
195  save_vars = {}
196  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
197  if var in os.environ:
198  save_vars[var] = os.environ[var]
199  del os.environ[var]
200 
201  # Do listing.
202 
203  cmd = ['ifdh', 'll', path, '%d' % depth]
204  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
205 
206  q = queue.Queue()
207  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
208  thread.start()
209  thread.join(timeout=60)
210  if thread.is_alive():
211  print('Terminating subprocess.')
212  jobinfo.terminate()
213  thread.join()
214  rc = q.get()
215  jobout = convert_str(q.get())
216  joberr = convert_str(q.get())
217  if rc != 0:
218  for var in list(save_vars.keys()):
219  os.environ[var] = save_vars[var]
220  raise IFDHError(cmd, rc, jobout, joberr)
221 
222  # Restore environment variables.
223 
224  for var in list(save_vars.keys()):
225  os.environ[var] = save_vars[var]
226 
227  # Done.
228 
229  return jobout.splitlines()
230 
231 
232 # Ifdh mkdir, with timeout.
233 
234 def ifdh_mkdir(path):
235 
236  # Get proxy.
237 
238  test_proxy()
239 
240  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
241  # are not defined (they confuse ifdh, or rather the underlying tools).
242 
243  save_vars = {}
244  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
245  if var in os.environ:
246  save_vars[var] = os.environ[var]
247  del os.environ[var]
248 
249  # Do mkdir.
250 
251  cmd = ['ifdh', 'mkdir', path]
252  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
253 
254  q = queue.Queue()
255  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
256  thread.start()
257  thread.join(timeout=60)
258  if thread.is_alive():
259  print('Terminating subprocess.')
260  jobinfo.terminate()
261  thread.join()
262  rc = q.get()
263  jobout = convert_str(q.get())
264  joberr = convert_str(q.get())
265  if rc != 0:
266  for var in list(save_vars.keys()):
267  os.environ[var] = save_vars[var]
268  raise IFDHError(cmd, rc, jobout, joberr)
269 
270  # Restore environment variables.
271 
272  for var in list(save_vars.keys()):
273  os.environ[var] = save_vars[var]
274 
275  # Done.
276 
277  return
278 
279 
280 # Ifdh rmdir, with timeout.
281 
282 def ifdh_rmdir(path):
283 
284  # Get proxy.
285 
286  test_proxy()
287 
288  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
289  # are not defined (they confuse ifdh, or rather the underlying tools).
290 
291  save_vars = {}
292  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
293  if var in os.environ:
294  save_vars[var] = os.environ[var]
295  del os.environ[var]
296 
297  # Do rmdir.
298 
299  cmd = ['ifdh', 'rmdir', path]
300  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
301 
302  q = queue.Queue()
303  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
304  thread.start()
305  thread.join(timeout=60)
306  if thread.is_alive():
307  print('Terminating subprocess.')
308  jobinfo.terminate()
309  thread.join()
310  rc = q.get()
311  jobout = convert_str(q.get())
312  joberr = convert_str(q.get())
313  if rc != 0:
314  for var in list(save_vars.keys()):
315  os.environ[var] = save_vars[var]
316  raise IFDHError(cmd, rc, jobout, joberr)
317 
318  # Restore environment variables.
319 
320  for var in list(save_vars.keys()):
321  os.environ[var] = save_vars[var]
322 
323  # Done.
324 
325  return
326 
327 
328 # Ifdh chmod, with timeout.
329 
330 def ifdh_chmod(path, mode):
331 
332  # Get proxy.
333 
334  test_proxy()
335 
336  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
337  # are not defined (they confuse ifdh, or rather the underlying tools).
338 
339  save_vars = {}
340  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
341  if var in os.environ:
342  save_vars[var] = os.environ[var]
343  del os.environ[var]
344 
345  # Do chmod.
346 
347  cmd = ['ifdh', 'chmod', '%o' % mode, path]
348  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
349 
350  q = queue.Queue()
351  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
352  thread.start()
353  thread.join(timeout=60)
354  if thread.is_alive():
355  print('Terminating subprocess.')
356  jobinfo.terminate()
357  thread.join()
358  rc = q.get()
359  jobout = convert_str(q.get())
360  joberr = convert_str(q.get())
361  if rc != 0:
362  for var in list(save_vars.keys()):
363  os.environ[var] = save_vars[var]
364  raise IFDHError(cmd, rc, jobout, joberr)
365 
366  # Restore environment variables.
367 
368  for var in list(save_vars.keys()):
369  os.environ[var] = save_vars[var]
370 
371  # Done.
372 
373  return
374 
375 
376 # Ifdh mv, with timeout.
377 
378 def ifdh_mv(src, dest):
379 
380  # Get proxy.
381 
382  test_proxy()
383 
384  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
385  # are not defined (they confuse ifdh, or rather the underlying tools).
386 
387  save_vars = {}
388  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
389  if var in os.environ:
390  save_vars[var] = os.environ[var]
391  del os.environ[var]
392 
393  # Do rename.
394 
395  cmd = ['ifdh', 'mv', src, dest]
396  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
397 
398  q = queue.Queue()
399  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
400  thread.start()
401  thread.join(timeout=60)
402  if thread.is_alive():
403  print('Terminating subprocess.')
404  jobinfo.terminate()
405  thread.join()
406  rc = q.get()
407  jobout = convert_str(q.get())
408  joberr = convert_str(q.get())
409  if rc != 0:
410  for var in list(save_vars.keys()):
411  os.environ[var] = save_vars[var]
412  raise IFDHError(cmd, rc, jobout, joberr)
413 
414  # Restore environment variables.
415 
416  for var in list(save_vars.keys()):
417  os.environ[var] = save_vars[var]
418 
419  # Done.
420 
421  return
422 
423 
424 # Ifdh rm, with timeout.
425 
426 def ifdh_rm(path):
427 
428  # Get proxy.
429 
430  test_proxy()
431 
432  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
433  # are not defined (they confuse ifdh, or rather the underlying tools).
434 
435  save_vars = {}
436  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
437  if var in os.environ:
438  save_vars[var] = os.environ[var]
439  del os.environ[var]
440 
441  # Do delete.
442 
443  cmd = ['ifdh', 'rm', path]
444  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
445 
446  q = queue.Queue()
447  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
448  thread.start()
449  thread.join(timeout=60)
450  if thread.is_alive():
451  print('Terminating subprocess.')
452  jobinfo.terminate()
453  thread.join()
454  rc = q.get()
455  jobout = convert_str(q.get())
456  joberr = convert_str(q.get())
457  if rc != 0:
458  for var in list(save_vars.keys()):
459  os.environ[var] = save_vars[var]
460  raise IFDHError(cmd, rc, jobout, joberr)
461 
462  # Restore environment variables.
463 
464  for var in list(save_vars.keys()):
465  os.environ[var] = save_vars[var]
466 
467  # Done.
468 
469  return
470 
471 
472 # Posix copy with timeout.
473 
474 def posix_cp(source, destination):
475 
476  cmd = ['cp', source, destination]
477 
478  # Fork buffer process.
479 
480  buffer_pid = os.fork()
481  if buffer_pid == 0:
482 
483  # In child process.
484  # Launch cp subprocess.
485 
486  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
487 
488  q = queue.Queue()
489  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
490  thread.start()
491  thread.join(timeout=600)
492  if thread.is_alive():
493 
494  # Subprocess did not finish (may be hanging and unkillable).
495  # Try to kill the subprocess and exit process.
496  # Unkillable process will become detached.
497 
498  print('Terminating subprocess.')
499  jobinfo.kill()
500  os._exit(1)
501 
502  else:
503 
504  # Subprocess finished normally.
505 
506  rc = q.get()
507  jobout = convert_str(q.get())
508  joberr = convert_str(q.get())
509  os._exit(rc)
510 
511  else:
512 
513  # In parent process.
514  # Wait for buffer subprocess to finish.
515 
516  buffer_result = os.waitpid(buffer_pid, 0)
517  rc = buffer_result[1]/256
518  if rc != 0:
519  raise IFDHError(cmd, rc, '', '')
520 
521  # Done.
522 
523  return
524 
525 
526 # Function to wait for a subprocess to finish and fetch return code,
527 # standard output, and standard error.
528 # Call this function like this:
529 #
530 # q = Queue.Queue()
531 # jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
532 # wait_for_subprocess(jobinfo, q, input)
533 # rc = q.get() # Return code.
534 # jobout = q.get() # Standard output
535 # joberr = q.get() # Standard error
536 
537 
538 def wait_for_subprocess(jobinfo, q, input=None):
539  jobout, joberr = jobinfo.communicate(input)
540  rc = jobinfo.poll()
541  q.put(rc)
542  q.put(jobout)
543  q.put(joberr)
544  return
545 
546 
547 # Test whether user has a valid kerberos ticket. Raise exception if no.
548 
550  global ticket_ok
551  if not ticket_ok:
552  ok = subprocess.call(['klist', '-s'], stdout=-1, stderr=-1)
553  if ok != 0:
554  raise RuntimeError('Please get a kerberos ticket.')
555  ticket_ok = True
556  return ticket_ok
557 
558 
559 # Get kca certificate.
560 
561 def get_kca():
562 
563  global kca_ok
564  kca_ok = False
565 
566  # First, make sure we have a kerberos ticket.
567 
568  krb_ok = test_ticket()
569  if krb_ok:
570 
571  # Get kca certificate.
572 
573  kca_ok = False
574  try:
575  subprocess.check_call(['kx509'], stdout=-1, stderr=-1)
576  kca_ok = True
577  except:
578  pass
579 
580  # Done
581 
582  return kca_ok
583 
584 
585 # Get grid proxy.
586 # This implementation should be good enough for experiments in the fermilab VO.
587 # Experiments not in the fermilab VO (lbne/dune) should override this function
588 # in experiment_utilities.py.
589 
590 def get_proxy():
591 
592  global proxy_ok
593  proxy_ok = False
594 
595  # Make sure we have a valid certificate.
596 
597  test_kca()
598 
599  # Get proxy using either specified cert+key or default cert.
600 
601  if 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
602  cmd=['voms-proxy-init',
603  '-rfc',
604  '-cert', os.environ['X509_USER_CERT'],
605  '-key', os.environ['X509_USER_KEY'],
606  '-voms', 'fermilab:/fermilab/%s/Role=%s' % (get_experiment(), get_role())]
607  try:
608  subprocess.check_call(cmd, stdout=-1, stderr=-1)
609  proxy_ok = True
610  except:
611  pass
612  pass
613  else:
614  cmd=['voms-proxy-init',
615  '-noregen',
616  '-rfc',
617  '-voms',
618  'fermilab:/fermilab/%s/Role=%s' % (get_experiment(), get_role())]
619  try:
620  subprocess.check_call(cmd, stdout=-1, stderr=-1)
621  proxy_ok = True
622  except:
623  pass
624 
625  # Done
626 
627  return proxy_ok
628 
629 
630 # Test whether user has a valid kca certificate. If not, try to get a new one.
631 
632 def test_kca():
633  global kca_ok
634  if not kca_ok:
635  try:
636  if 'X509_USER_PROXY' in os.environ:
637  subprocess.check_call(['voms-proxy-info',
638  '-file', os.environ['X509_USER_PROXY'],
639  '-exists'], stdout=-1, stderr=-1)
640  elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
641  subprocess.check_call(['voms-proxy-info',
642  '-file', os.environ['X509_USER_CERT'],
643  '-exists'], stdout=-1, stderr=-1)
644  else:
645  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
646 
647  # Workaround jobsub bug by setting environment variable X509_USER_PROXY to
648  # point to the default location of the kca certificate.
649 
650  x509_path = convert_str(subprocess.check_output(['voms-proxy-info', '-path'], stderr=-1))
651  os.environ['X509_USER_PROXY'] = x509_path.strip()
652 
653  kca_ok = True
654  except:
655  pass
656 
657  # If at this point we don't have a kca certificate, try to get one.
658 
659  if not kca_ok:
660  get_kca()
661 
662  # Final checkout.
663 
664  if not kca_ok:
665  try:
666  if 'X509_USER_PROXY' in os.environ:
667  subprocess.check_call(['voms-proxy-info',
668  '-file', os.environ['X509_USER_PROXY'],
669  '-exists'], stdout=-1, stderr=-1)
670  elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
671  subprocess.check_call(['voms-proxy-info',
672  '-file', os.environ['X509_USER_CERT'],
673  '-exists'], stdout=-1, stderr=-1)
674  else:
675  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
676  kca_ok = True
677  except:
678  raise RuntimeError('Please get a kca certificate.')
679  return kca_ok
680 
681 
682 # Test whether user has a valid grid proxy. If not, try to get a new one.
683 
685  global proxy_ok
686  if not proxy_ok:
687  try:
688  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
689  subprocess.check_call(['voms-proxy-info', '-exists', '-acissuer'], stdout=-1, stderr=-1)
690  proxy_ok = True
691  except:
692  pass
693 
694  # If at this point we don't have a grid proxy, try to get one.
695 
696  if not proxy_ok:
697  get_proxy()
698 
699  # Final checkout.
700 
701  if not proxy_ok:
702  try:
703  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
704  subprocess.check_call(['voms-proxy-info', '-exists', '-acissuer'], stdout=-1, stderr=-1)
705  proxy_ok = True
706  except:
707  raise RuntimeError('Please get a grid proxy.')
708  return proxy_ok
709 
710 # Test whether jobsub_client has been set up.
711 
713  global jobsub_ok
714  if not jobsub_ok:
715 
716  # Look for command jobsub_submit on execution path.
717 
718  try:
719  jobinfo = subprocess.Popen(['which', 'jobsub_submit'],
720  stdout=subprocess.PIPE,
721  stderr=subprocess.PIPE)
722  jobout, joberr = jobinfo.communicate()
723  jobout = convert_str(jobout)
724  joberr = convert_str(joberr)
725  jobsub_path = jobout.splitlines()[0].strip()
726  if jobsub_path != '':
727  jobsub_ok = True
728  except:
729  pass
730 
731  if not jobsub_ok:
732  print('Please set up jobsub_client')
733  sys.exit(1)
734 
735  return jobsub_ok
736 
737 # Return dCache server.
738 
740  return "fndca1.fnal.gov"
741 
742 
743 # Convert a local pnfs path to the path on the dCache server.
744 # Return the input path unchanged if it isn't on dCache.
745 
746 def dcache_path(path):
747  if path.startswith('/pnfs/') and not path.startswith('/pnfs/fnal.gov/usr/'):
748  return '/pnfs/fnal.gov/usr/' + path[6:]
749 
750 
751 # Return xrootd server and port.
752 
754  return dcache_server() + ':1094'
755 
756 
757 # Convert a pnfs path to xrootd uri.
758 # Return the input path unchanged if it isn't on dCache.
759 
760 def xrootd_uri(path):
761  if path.startswith('/pnfs/'):
762  return 'root://' + xrootd_server_port() + dcache_path(path)
763  else:
764  return path
765 
766 
767 # Convert a pnfs path to gridftp uri.
768 # Return the input path unchanged if it isn't on dCache.
769 
770 def gridftp_uri(path):
771  if path.startswith('/pnfs/'):
772  return 'gsiftp://' + dcache_server() + dcache_path(path)
773  else:
774  return path
775 
776 
777 # Convert a pnfs path to srm uri.
778 # Return the input path unchanged if it isn't on dCache.
779 
780 def srm_uri(path):
781  if path.startswith('/pnfs/'):
782  return 'srm://fndca1.fnal.gov:8443/srm/managerv2?SFN=/pnfs/fnal.gov/usr/' + path[6:]
783  else:
784  return path
785 
786 
787 # Return the name of a computer with login access that has the /pnfs
788 # filesystem nfs-mounted. This function makes use of the $EXPERIMENT
789 # environment variable (as does ifdh), which must be set.
790 
792  return '%sgpvm01.fnal.gov' % os.environ['EXPERIMENT']
793 
794 
795 # Parse the ten-character file mode string as returned by "ls -l"
796 # and return mode bit masek.
797 
798 def parse_mode(mode_str):
799 
800  mode = 0
801 
802  # File type.
803 
804  if mode_str[0] == 'b':
805  mode += stat.S_IFBLK
806  elif mode_str[0] == 'c':
807  mode += stat.S_IFCHR
808  elif mode_str[0] == 'd':
809  mode += stat.S_IFDIR
810  elif mode_str[0] == 'l':
811  mode += stat.S_IFLNK
812  elif mode_str[0] == 'p':
813  mode += stat.S_IFIFO
814  elif mode_str[0] == 's':
815  mode += stat.S_IFSOCK
816  elif mode_str[0] == '-':
817  mode += stat.S_IFREG
818 
819  # File permissions.
820 
821  # User triad (includes setuid).
822 
823  if mode_str[1] == 'r':
824  mode += stat.S_IRUSR
825  if mode_str[2] == 'w':
826  mode += stat.S_IWUSR
827  if mode_str[3] == 'x':
828  mode += stat.S_IXUSR
829  elif mode_str[3] == 's':
830  mode += stat.S_ISUID
831  mode += stat.S_IXUSR
832  elif mode_str[3] == 'S':
833  mode += stat.S_ISUID
834 
835  # Group triad (includes setgid).
836 
837  if mode_str[4] == 'r':
838  mode += stat.S_IRGRP
839  if mode_str[5] == 'w':
840  mode += stat.S_IWGRP
841  if mode_str[6] == 'x':
842  mode += stat.S_IXGRP
843  elif mode_str[6] == 's':
844  mode += stat.S_ISGID
845  mode += stat.S_IXGRP
846  elif mode_str[6] == 'S':
847  mode += stat.S_ISGID
848 
849  # World triad (includes sticky bit).
850 
851  if mode_str[7] == 'r':
852  mode += stat.S_IROTH
853  if mode_str[8] == 'w':
854  mode += stat.S_IWOTH
855  if mode_str[9] == 'x':
856  mode += stat.S_IXOTH
857  elif mode_str[9] == 't':
858  mode += stat.S_ISVTX
859  mode += stat.S_IXOTH
860  elif mode_str[9] == 'T':
861  mode += stat.S_ISVTX
862 
863  # Done
864 
865  return mode
866 
867 # Function to return the current experiment.
868 # The following places for obtaining this information are
869 # tried (in order):
870 #
871 # 1. Environment variable $EXPERIMENT.
872 # 2. Environment variable $SAM_EXPERIMENT.
873 # 3. Hostname (up to "gpvm").
874 #
875 # Raise an exception if none of the above methods works.
876 #
877 
879 
880  exp = ''
881  for ev in ('EXPERIMENT', 'SAM_EXPERIMENT'):
882  if ev in os.environ:
883  exp = os.environ[ev]
884  break
885 
886  if not exp:
887  hostname = socket.gethostname()
888  n = hostname.find('gpvm')
889  if n > 0:
890  exp = hostname[:n]
891 
892  if not exp:
893  raise RuntimeError('Unable to determine experiment.')
894 
895  return exp
896 
897 
898 # Get role (normally 'Analysis' or 'Production').
899 
900 def get_role():
901 
902  # If environment variable ROLE is defined, use that. Otherwise, make
903  # an educated guess based on user name.
904 
905  result = 'Analysis' # Default role.
906 
907  # Check environment variable $ROLE.
908 
909  if 'ROLE' in os.environ:
910  result = os.environ['ROLE']
911 
912  # Otherwise, check user.
913 
914  else:
915  prouser = get_experiment() + 'pro'
916  user = getpass.getuser()
917  if user == prouser:
918  result = 'Production'
919 
920  return result
921 
922 
923 # Function to return a comma-separated list of run-time top level ups products.
924 
926  return get_experiment() + 'code'
927 
928 
929 # Function to return path of experiment bash setup script that is valid
930 # on the node where this script is being executed.
931 # This function should be overridden in <experiment>_utilities.py.
932 
934  raise RuntimeError('Function get_setup_script_path not implemented.')
935 
936 
937 # Function to return dimension string for project, stage.
938 # This function should be overridden in experiment_utilities.py
939 
940 def dimensions(project, stage, ana=False):
941  raise RuntimeError('Function dimensions not implemented.')
942 
943 
944 # Function to return dimension string for project, stage, including data stream.
945 
946 def dimensions_datastream(project, stage, ana=False, index=0):
947 
948  # Default same as no data stream.
949 
950  dim = dimensions(project, stage, ana=ana)
951 
952  # Append data stream dimension, if appropriate.
953 
954  if ana:
955  if stage.ana_data_stream != None and len(stage.ana_data_stream) > 0:
956  dim1 = '( data_stream %s and %s )' % (stage.ana_data_stream[index], dim)
957  dim = dim1
958  else:
959  if stage.data_stream != None and len(stage.data_stream) > 0:
960  dim1 = '( data_stream %s and %s )' % (stage.data_stream[index], dim)
961  dim = dim1
962 
963  # Done.
964 
965  return dim
966 
967 
968 # Function to return the production user name
969 
971  return get_experiment() + 'pro'
972 
973 
974 # Function to return the fictitious disk server node
975 # name used by sam for bluearc disks.
976 
978  return get_experiment() + 'data:'
979 
980 
981 # Function to return the fictitious disk server node
982 # name used by sam for dCache disks.
983 
985  return 'fnal-dcache:'
986 
987 
988 # Function to determine dropbox directory based on sam metadata.
989 # Raise an exception if the specified file doesn't have metadata.
990 # This function should be overridden in <experiment>_utilities module.
991 
992 def get_dropbox(filename):
993  raise RuntimeError('Function get_dropbox not implemented.')
994 
995 
996 # Function to return string containing sam metadata in the form
997 # of an fcl configuraiton. It is intended that this function
998 # may be overridden in experiment_utilities.py.
999 
1000 def get_sam_metadata(project, stage):
1001  result = ''
1002  return result
1003 
1004 
1005 # Get authenticated user (from kerberos ticket, not $USER).
1006 
1007 def get_user():
1008 
1009  # See if we have a cached value for user.
1010 
1011  global kca_user
1012  if kca_user != '':
1013  return kca_user
1014 
1015  # Return production user name if Role is Production
1016 
1017  if get_role() == 'Production':
1018  return get_prouser()
1019 
1020  else:
1021 
1022  # First make sure we have a kca certificate (raise exception if not).
1023 
1024  test_kca()
1025 
1026  # Return user name from certificate if Role is Analysis
1027 
1028  subject = ''
1029  if 'X509_USER_PROXY' in os.environ:
1030  subject = convert_str(subprocess.check_output(['voms-proxy-info',
1031  '-file', os.environ['X509_USER_PROXY'],
1032  '-subject'], stderr=-1))
1033  elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
1034  subject = convert_str(subprocess.check_output(['voms-proxy-info',
1035  '-file', os.environ['X509_USER_CERT'],
1036  '-subject'], stderr=-1))
1037  else:
1038  subject = convert_str(subprocess.check_output(['voms-proxy-info', '-subject'],
1039  stderr=-1))
1040 
1041  # Get the last non-numeric CN
1042 
1043  cn = ''
1044  while cn == '':
1045  n = subject.rfind('/CN=')
1046  if n >= 0:
1047  cn = subject[n+4:]
1048  if cn.strip().isdigit():
1049  cn = ''
1050  subject = subject[:n]
1051  else:
1052  break
1053 
1054  # Truncate everything after the first '/'.
1055 
1056  n = cn.find('/')
1057  if n >= 0:
1058  cn = cn[:n]
1059 
1060  # Truncate everything after the first newline.
1061 
1062  n = cn.find('\n')
1063  if n >= 0:
1064  cn = cn[:n]
1065 
1066  # Truncate everything before the first ":" (UID:).
1067 
1068  n = cn.find(':')
1069  if n >= 0:
1070  cn = cn[n+1:]
1071 
1072  # Done (maybe).
1073 
1074  if cn != '':
1075  return cn
1076 
1077  # Something went wrong...
1078 
1079  raise RuntimeError('Unable to determine authenticated user.')
1080 
1081 
1082 # Function to check whether there is a running project.py process on this node
1083 # with the specified xml file and stage.
1084 #
1085 # This function works by checking the contents of /proc. Each process is checked
1086 # for the following properties.
1087 #
1088 # 1. Owned by same uid as this process.
1089 # 2. Command line.
1090 # a) project.py
1091 # b) Matching --xml option (exact match).
1092 # c) Matching --stage option (exact match).
1093 # d) --submit or --makeup option.
1094 #
1095 # Arguments xml and stage should be strings, and must match exactly command
1096 # line arguments.
1097 
1098 def check_running(xmlname, stagename):
1099 
1100  result = 0
1101 
1102  # Look over pids in /proc.
1103 
1104  for pid in os.listdir('/proc'):
1105  if pid.isdigit() and int(pid) != os.getpid():
1106  procfile = os.path.join('/proc', pid)
1107  try:
1108  pstat = os.stat(procfile)
1109 
1110  # Only look at processes that match this process uid.
1111 
1112  if pstat.st_uid == os.getuid():
1113 
1114  # Get command line.
1115 
1116  cmdfile = os.path.join('/proc', pid, 'cmdline')
1117  cmd = open(cmdfile).read()
1118  words = cmd.split('\0')
1119 
1120  # Check options.
1121 
1122  project = 0
1123  xml = 0
1124  stage = 0
1125  xmlmatch = 0
1126  stagematch = 0
1127  submit = 0
1128  makeup = 0
1129 
1130  for word in words:
1131 
1132  # Check command.
1133 
1134  if word.endswith('project.py'):
1135  project = 1
1136 
1137  # Check arguments.
1138 
1139  if xml == 1 and word == xmlname:
1140  xmlmatch = 1
1141  elif stage == 1 and word == stagename:
1142  stagematch = 1
1143 
1144  xml = 0
1145  stage = 0
1146 
1147  # Check options.
1148 
1149  if word == '--xml':
1150  xml = 1
1151  elif word == '--stage':
1152  stage = 1
1153  elif word == '--submit':
1154  submit = 1
1155  elif word == '--makeup':
1156  makeup = 1
1157 
1158  if project != 0 and submit+makeup != 0 and xmlmatch != 0 and stagematch != 0:
1159  result = 1
1160  break
1161 
1162  except:
1163  pass
1164 
1165  # Done.
1166 
1167  return result
1168 
1169 
1170 # Convert bytes or unicode string to default python str type.
1171 # Works on python 2 and python 3.
1172 
1174 
1175  result = ''
1176 
1177  if type(s) == type(''):
1178 
1179  # Already a default str.
1180  # Just return the original.
1181 
1182  result = s
1183 
1184  elif type(s) == type(u''):
1185 
1186  # Unicode and not str.
1187  # Convert to bytes.
1188 
1189  result = s.encode()
1190 
1191  elif type(s) == type(b''):
1192 
1193  # Bytes and not str.
1194  # Convert to unicode.
1195 
1196  result = s.decode()
1197 
1198  else:
1199 
1200  # Last resort, use standard str conversion.
1201 
1202  result = str(s)
1203 
1204  return result
1205 
1206 
1207 # Convert bytes or unicode string to bytes.
1208 # Works on python 2 and python 3.
1209 
1211 
1212  result = ''
1213 
1214  if type(s) == type(b''):
1215 
1216  # Already bytes.
1217  # Return the original.
1218 
1219  result = s
1220 
1221  elif type(s) == type(u''):
1222 
1223  # Unicode to bytes.
1224 
1225  result = s.encode()
1226 
1227  else:
1228 
1229  # Anything else, just return the original.
1230 
1231  result = s
1232 
1233  return result
1234 
1235 
1236 # Import experiment-specific utilities. In this imported module, one can
1237 # override any function or symbol defined above, or add new ones.
1238 
1239 from experiment_utilities import *
def get_sam_metadata(project, stage)
def dimensions(project, stage, ana=False)
def check_running(xmlname, stagename)
int open(const char *, int)
Opens a file descriptor.
def ifdh_cp(source, destination)
def dimensions_datastream(project, stage, ana=False, index=0)
def posix_cp(source, destination)
def wait_for_subprocess(jobinfo, q, input=None)
int read(int, char *, size_t)
Read bytes from a file descriptor.
static QCString str