Functions | Variables
python.larbatch_utilities Namespace Reference

Functions

def ifdh_cp (source, destination)
 
def ifdh_ls (path, depth)
 
def ifdh_ll (path, depth)
 
def ifdh_mkdir (path)
 
def ifdh_rmdir (path)
 
def ifdh_chmod (path, mode)
 
def ifdh_mv (src, dest)
 
def ifdh_rm (path)
 
def posix_cp (source, destination)
 
def wait_for_subprocess (jobinfo, q, input=None)
 
def test_ticket ()
 
def get_kca ()
 
def get_proxy ()
 
def test_kca ()
 
def test_proxy ()
 
def test_jobsub ()
 
def dcache_server ()
 
def dcache_path (path)
 
def xrootd_server_port ()
 
def xrootd_uri (path)
 
def gridftp_uri (path)
 
def srm_uri (path)
 
def nfs_server ()
 
def parse_mode (mode_str)
 
def get_experiment ()
 
def get_role ()
 
def get_ups_products ()
 
def get_setup_script_path ()
 
def dimensions (project, stage, ana=False)
 
def dimensions_datastream (project, stage, ana=False, index=0)
 
def get_prouser ()
 
def get_bluearc_server ()
 
def get_dcache_server ()
 
def get_dropbox (filename)
 
def get_sam_metadata (project, stage)
 
def get_user ()
 
def check_running (xmlname, stagename)
 
def convert_str (s)
 
def convert_bytes (s)
 

Variables

bool ticket_ok = False
 
bool kca_ok = False
 
bool proxy_ok = False
 
string kca_user = ''
 
bool jobsub_ok = False
 

Function Documentation

def python.larbatch_utilities.check_running (   xmlname,
  stagename 
)

Definition at line 1098 of file larbatch_utilities.py.

1098 def check_running(xmlname, stagename):
1099 
1100  result = 0
1101 
1102  # Look over pids in /proc.
1103 
1104  for pid in os.listdir('/proc'):
1105  if pid.isdigit() and int(pid) != os.getpid():
1106  procfile = os.path.join('/proc', pid)
1107  try:
1108  pstat = os.stat(procfile)
1109 
1110  # Only look at processes that match this process uid.
1111 
1112  if pstat.st_uid == os.getuid():
1113 
1114  # Get command line.
1115 
1116  cmdfile = os.path.join('/proc', pid, 'cmdline')
1117  cmd = open(cmdfile).read()
1118  words = cmd.split('\0')
1119 
1120  # Check options.
1121 
1122  project = 0
1123  xml = 0
1124  stage = 0
1125  xmlmatch = 0
1126  stagematch = 0
1127  submit = 0
1128  makeup = 0
1129 
1130  for word in words:
1131 
1132  # Check command.
1133 
1134  if word.endswith('project.py'):
1135  project = 1
1136 
1137  # Check arguments.
1138 
1139  if xml == 1 and word == xmlname:
1140  xmlmatch = 1
1141  elif stage == 1 and word == stagename:
1142  stagematch = 1
1143 
1144  xml = 0
1145  stage = 0
1146 
1147  # Check options.
1148 
1149  if word == '--xml':
1150  xml = 1
1151  elif word == '--stage':
1152  stage = 1
1153  elif word == '--submit':
1154  submit = 1
1155  elif word == '--makeup':
1156  makeup = 1
1157 
1158  if project != 0 and submit+makeup != 0 and xmlmatch != 0 and stagematch != 0:
1159  result = 1
1160  break
1161 
1162  except:
1163  pass
1164 
1165  # Done.
1166 
1167  return result
1168 
1169 
1170 # Convert bytes or unicode string to default python str type.
1171 # Works on python 2 and python 3.
1172 
def check_running(xmlname, stagename)
int open(const char *, int)
Opens a file descriptor.
int read(int, char *, size_t)
Read bytes from a file descriptor.
def python.larbatch_utilities.convert_bytes (   s)

Definition at line 1210 of file larbatch_utilities.py.

1211 
1212  result = ''
1213 
1214  if type(s) == type(b''):
1215 
1216  # Already bytes.
1217  # Return the original.
1218 
1219  result = s
1220 
1221  elif type(s) == type(u''):
1222 
1223  # Unicode to bytes.
1224 
1225  result = s.encode()
1226 
1227  else:
1228 
1229  # Anything else, just return the original.
1230 
1231  result = s
1232 
1233  return result
1234 
1235 
1236 # Import experiment-specific utilities. In this imported module, one can
1237 # override any function or symbol defined above, or add new ones.
1238 
def python.larbatch_utilities.convert_str (   s)

Definition at line 1173 of file larbatch_utilities.py.

1174 
1175  result = ''
1176 
1177  if type(s) == type(''):
1178 
1179  # Already a default str.
1180  # Just return the original.
1181 
1182  result = s
1183 
1184  elif type(s) == type(u''):
1185 
1186  # Unicode and not str.
1187  # Convert to bytes.
1188 
1189  result = s.encode()
1190 
1191  elif type(s) == type(b''):
1192 
1193  # Bytes and not str.
1194  # Convert to unicode.
1195 
1196  result = s.decode()
1197 
1198  else:
1199 
1200  # Last resort, use standard str conversion.
1201 
1202  result = str(s)
1203 
1204  return result
1205 
1206 
1207 # Convert bytes or unicode string to bytes.
1208 # Works on python 2 and python 3.
1209 
static QCString str
def python.larbatch_utilities.dcache_path (   path)

Definition at line 746 of file larbatch_utilities.py.

746 def dcache_path(path):
747  if path.startswith('/pnfs/') and not path.startswith('/pnfs/fnal.gov/usr/'):
748  return '/pnfs/fnal.gov/usr/' + path[6:]
749 
750 
751 # Return xrootd server and port.
752 
def python.larbatch_utilities.dcache_server ( )

Definition at line 739 of file larbatch_utilities.py.

740  return "fndca1.fnal.gov"
741 
742 
743 # Convert a local pnfs path to the path on the dCache server.
744 # Return the input path unchanged if it isn't on dCache.
745 
def python.larbatch_utilities.dimensions (   project,
  stage,
  ana = False 
)

Definition at line 940 of file larbatch_utilities.py.

940 def dimensions(project, stage, ana=False):
941  raise RuntimeError('Function dimensions not implemented.')
942 
943 
944 # Function to return dimension string for project, stage, including data stream.
945 
def dimensions(project, stage, ana=False)
def python.larbatch_utilities.dimensions_datastream (   project,
  stage,
  ana = False,
  index = 0 
)

Definition at line 946 of file larbatch_utilities.py.

946 def dimensions_datastream(project, stage, ana=False, index=0):
947 
948  # Default same as no data stream.
949 
950  dim = dimensions(project, stage, ana=ana)
951 
952  # Append data stream dimension, if appropriate.
953 
954  if ana:
955  if stage.ana_data_stream != None and len(stage.ana_data_stream) > 0:
956  dim1 = '( data_stream %s and %s )' % (stage.ana_data_stream[index], dim)
957  dim = dim1
958  else:
959  if stage.data_stream != None and len(stage.data_stream) > 0:
960  dim1 = '( data_stream %s and %s )' % (stage.data_stream[index], dim)
961  dim = dim1
962 
963  # Done.
964 
965  return dim
966 
967 
968 # Function to return the production user name
969 
def dimensions(project, stage, ana=False)
def dimensions_datastream(project, stage, ana=False, index=0)
def python.larbatch_utilities.get_bluearc_server ( )

Definition at line 977 of file larbatch_utilities.py.

978  return get_experiment() + 'data:'
979 
980 
981 # Function to return the fictitious disk server node
982 # name used by sam for dCache disks.
983 
def python.larbatch_utilities.get_dcache_server ( )

Definition at line 984 of file larbatch_utilities.py.

985  return 'fnal-dcache:'
986 
987 
988 # Function to determine dropbox directory based on sam metadata.
989 # Raise an exception if the specified file doesn't have metadata.
990 # This function should be overridden in <experiment>_utilities module.
991 
def python.larbatch_utilities.get_dropbox (   filename)

Definition at line 992 of file larbatch_utilities.py.

992 def get_dropbox(filename):
993  raise RuntimeError('Function get_dropbox not implemented.')
994 
995 
996 # Function to return string containing sam metadata in the form
997 # of an fcl configuraiton. It is intended that this function
998 # may be overridden in experiment_utilities.py.
999 
def python.larbatch_utilities.get_experiment ( )

Definition at line 878 of file larbatch_utilities.py.

879 
880  exp = ''
881  for ev in ('EXPERIMENT', 'SAM_EXPERIMENT'):
882  if ev in os.environ:
883  exp = os.environ[ev]
884  break
885 
886  if not exp:
887  hostname = socket.gethostname()
888  n = hostname.find('gpvm')
889  if n > 0:
890  exp = hostname[:n]
891 
892  if not exp:
893  raise RuntimeError('Unable to determine experiment.')
894 
895  return exp
896 
897 
898 # Get role (normally 'Analysis' or 'Production').
899 
def python.larbatch_utilities.get_kca ( )

Definition at line 561 of file larbatch_utilities.py.

561 def get_kca():
562 
563  global kca_ok
564  kca_ok = False
565 
566  # First, make sure we have a kerberos ticket.
567 
568  krb_ok = test_ticket()
569  if krb_ok:
570 
571  # Get kca certificate.
572 
573  kca_ok = False
574  try:
575  subprocess.check_call(['kx509'], stdout=-1, stderr=-1)
576  kca_ok = True
577  except:
578  pass
579 
580  # Done
581 
582  return kca_ok
583 
584 
585 # Get grid proxy.
586 # This implementation should be good enough for experiments in the fermilab VO.
587 # Experiments not in the fermilab VO (lbne/dune) should override this function
588 # in experiment_utilities.py.
589 
def python.larbatch_utilities.get_prouser ( )

Definition at line 970 of file larbatch_utilities.py.

971  return get_experiment() + 'pro'
972 
973 
974 # Function to return the fictitious disk server node
975 # name used by sam for bluearc disks.
976 
def python.larbatch_utilities.get_proxy ( )

Definition at line 590 of file larbatch_utilities.py.

590 def get_proxy():
591 
592  global proxy_ok
593  proxy_ok = False
594 
595  # Make sure we have a valid certificate.
596 
597  test_kca()
598 
599  # Get proxy using either specified cert+key or default cert.
600 
601  if 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
602  cmd=['voms-proxy-init',
603  '-rfc',
604  '-cert', os.environ['X509_USER_CERT'],
605  '-key', os.environ['X509_USER_KEY'],
606  '-voms', 'fermilab:/fermilab/%s/Role=%s' % (get_experiment(), get_role())]
607  try:
608  subprocess.check_call(cmd, stdout=-1, stderr=-1)
609  proxy_ok = True
610  except:
611  pass
612  pass
613  else:
614  cmd=['voms-proxy-init',
615  '-noregen',
616  '-rfc',
617  '-voms',
618  'fermilab:/fermilab/%s/Role=%s' % (get_experiment(), get_role())]
619  try:
620  subprocess.check_call(cmd, stdout=-1, stderr=-1)
621  proxy_ok = True
622  except:
623  pass
624 
625  # Done
626 
627  return proxy_ok
628 
629 
630 # Test whether user has a valid kca certificate. If not, try to get a new one.
631 
def python.larbatch_utilities.get_role ( )

Definition at line 900 of file larbatch_utilities.py.

900 def get_role():
901 
902  # If environment variable ROLE is defined, use that. Otherwise, make
903  # an educated guess based on user name.
904 
905  result = 'Analysis' # Default role.
906 
907  # Check environment variable $ROLE.
908 
909  if 'ROLE' in os.environ:
910  result = os.environ['ROLE']
911 
912  # Otherwise, check user.
913 
914  else:
915  prouser = get_experiment() + 'pro'
916  user = getpass.getuser()
917  if user == prouser:
918  result = 'Production'
919 
920  return result
921 
922 
923 # Function to return a comma-separated list of run-time top level ups products.
924 
def python.larbatch_utilities.get_sam_metadata (   project,
  stage 
)

Definition at line 1000 of file larbatch_utilities.py.

1000 def get_sam_metadata(project, stage):
1001  result = ''
1002  return result
1003 
1004 
1005 # Get authenticated user (from kerberos ticket, not $USER).
1006 
def get_sam_metadata(project, stage)
def python.larbatch_utilities.get_setup_script_path ( )

Definition at line 933 of file larbatch_utilities.py.

934  raise RuntimeError('Function get_setup_script_path not implemented.')
935 
936 
937 # Function to return dimension string for project, stage.
938 # This function should be overridden in experiment_utilities.py
939 
def python.larbatch_utilities.get_ups_products ( )

Definition at line 925 of file larbatch_utilities.py.

926  return get_experiment() + 'code'
927 
928 
929 # Function to return path of experiment bash setup script that is valid
930 # on the node where this script is being executed.
931 # This function should be overridden in <experiment>_utilities.py.
932 
def python.larbatch_utilities.get_user ( )

Definition at line 1007 of file larbatch_utilities.py.

1007 def get_user():
1008 
1009  # See if we have a cached value for user.
1010 
1011  global kca_user
1012  if kca_user != '':
1013  return kca_user
1014 
1015  # Return production user name if Role is Production
1016 
1017  if get_role() == 'Production':
1018  return get_prouser()
1019 
1020  else:
1021 
1022  # First make sure we have a kca certificate (raise exception if not).
1023 
1024  test_kca()
1025 
1026  # Return user name from certificate if Role is Analysis
1027 
1028  subject = ''
1029  if 'X509_USER_PROXY' in os.environ:
1030  subject = convert_str(subprocess.check_output(['voms-proxy-info',
1031  '-file', os.environ['X509_USER_PROXY'],
1032  '-subject'], stderr=-1))
1033  elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
1034  subject = convert_str(subprocess.check_output(['voms-proxy-info',
1035  '-file', os.environ['X509_USER_CERT'],
1036  '-subject'], stderr=-1))
1037  else:
1038  subject = convert_str(subprocess.check_output(['voms-proxy-info', '-subject'],
1039  stderr=-1))
1040 
1041  # Get the last non-numeric CN
1042 
1043  cn = ''
1044  while cn == '':
1045  n = subject.rfind('/CN=')
1046  if n >= 0:
1047  cn = subject[n+4:]
1048  if cn.strip().isdigit():
1049  cn = ''
1050  subject = subject[:n]
1051  else:
1052  break
1053 
1054  # Truncate everything after the first '/'.
1055 
1056  n = cn.find('/')
1057  if n >= 0:
1058  cn = cn[:n]
1059 
1060  # Truncate everything after the first newline.
1061 
1062  n = cn.find('\n')
1063  if n >= 0:
1064  cn = cn[:n]
1065 
1066  # Truncate everything before the first ":" (UID:).
1067 
1068  n = cn.find(':')
1069  if n >= 0:
1070  cn = cn[n+1:]
1071 
1072  # Done (maybe).
1073 
1074  if cn != '':
1075  return cn
1076 
1077  # Something went wrong...
1078 
1079  raise RuntimeError('Unable to determine authenticated user.')
1080 
1081 
1082 # Function to check whether there is a running project.py process on this node
1083 # with the specified xml file and stage.
1084 #
1085 # This function works by checking the contents of /proc. Each process is checked
1086 # for the following properties.
1087 #
1088 # 1. Owned by same uid as this process.
1089 # 2. Command line.
1090 # a) project.py
1091 # b) Matching --xml option (exact match).
1092 # c) Matching --stage option (exact match).
1093 # d) --submit or --makeup option.
1094 #
1095 # Arguments xml and stage should be strings, and must match exactly command
1096 # line arguments.
1097 
def python.larbatch_utilities.gridftp_uri (   path)

Definition at line 770 of file larbatch_utilities.py.

770 def gridftp_uri(path):
771  if path.startswith('/pnfs/'):
772  return 'gsiftp://' + dcache_server() + dcache_path(path)
773  else:
774  return path
775 
776 
777 # Convert a pnfs path to srm uri.
778 # Return the input path unchanged if it isn't on dCache.
779 
def python.larbatch_utilities.ifdh_chmod (   path,
  mode 
)

Definition at line 330 of file larbatch_utilities.py.

330 def ifdh_chmod(path, mode):
331 
332  # Get proxy.
333 
334  test_proxy()
335 
336  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
337  # are not defined (they confuse ifdh, or rather the underlying tools).
338 
339  save_vars = {}
340  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
341  if var in os.environ:
342  save_vars[var] = os.environ[var]
343  del os.environ[var]
344 
345  # Do chmod.
346 
347  cmd = ['ifdh', 'chmod', '%o' % mode, path]
348  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
349 
350  q = queue.Queue()
351  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
352  thread.start()
353  thread.join(timeout=60)
354  if thread.is_alive():
355  print('Terminating subprocess.')
356  jobinfo.terminate()
357  thread.join()
358  rc = q.get()
359  jobout = convert_str(q.get())
360  joberr = convert_str(q.get())
361  if rc != 0:
362  for var in list(save_vars.keys()):
363  os.environ[var] = save_vars[var]
364  raise IFDHError(cmd, rc, jobout, joberr)
365 
366  # Restore environment variables.
367 
368  for var in list(save_vars.keys()):
369  os.environ[var] = save_vars[var]
370 
371  # Done.
372 
373  return
374 
375 
376 # Ifdh mv, with timeout.
377 
def python.larbatch_utilities.ifdh_cp (   source,
  destination 
)

Definition at line 92 of file larbatch_utilities.py.

92 def ifdh_cp(source, destination):
93 
94  # Get proxy.
95 
96  test_proxy()
97 
98  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
99  # are not defined (they confuse ifdh, or rather the underlying tools).
100 
101  save_vars = {}
102  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
103  if var in os.environ:
104  save_vars[var] = os.environ[var]
105  del os.environ[var]
106 
107  # Do copy.
108 
109  cmd = ['ifdh', 'cp', source, destination]
110  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
111 
112  q = queue.Queue()
113  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
114  thread.start()
115  thread.join(timeout=31000000)
116  if thread.is_alive():
117  print('Terminating subprocess.')
118  jobinfo.terminate()
119  thread.join()
120  rc = q.get()
121  jobout = convert_str(q.get())
122  joberr = convert_str(q.get())
123  if rc != 0:
124  for var in list(save_vars.keys()):
125  os.environ[var] = save_vars[var]
126  raise IFDHError(cmd, rc, jobout, joberr)
127 
128  # Restore environment variables.
129 
130  for var in list(save_vars.keys()):
131  os.environ[var] = save_vars[var]
132 
133 
134 # Ifdh ls, with timeout.
135 # Return value is list of lines returned by "ifdh ls" command.
136 
def ifdh_cp(source, destination)
def python.larbatch_utilities.ifdh_ll (   path,
  depth 
)

Definition at line 186 of file larbatch_utilities.py.

186 def ifdh_ll(path, depth):
187 
188  # Get proxy.
189 
190  test_proxy()
191 
192  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
193  # are not defined (they confuse ifdh, or rather the underlying tools).
194 
195  save_vars = {}
196  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
197  if var in os.environ:
198  save_vars[var] = os.environ[var]
199  del os.environ[var]
200 
201  # Do listing.
202 
203  cmd = ['ifdh', 'll', path, '%d' % depth]
204  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
205 
206  q = queue.Queue()
207  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
208  thread.start()
209  thread.join(timeout=60)
210  if thread.is_alive():
211  print('Terminating subprocess.')
212  jobinfo.terminate()
213  thread.join()
214  rc = q.get()
215  jobout = convert_str(q.get())
216  joberr = convert_str(q.get())
217  if rc != 0:
218  for var in list(save_vars.keys()):
219  os.environ[var] = save_vars[var]
220  raise IFDHError(cmd, rc, jobout, joberr)
221 
222  # Restore environment variables.
223 
224  for var in list(save_vars.keys()):
225  os.environ[var] = save_vars[var]
226 
227  # Done.
228 
229  return jobout.splitlines()
230 
231 
232 # Ifdh mkdir, with timeout.
233 
def python.larbatch_utilities.ifdh_ls (   path,
  depth 
)

Definition at line 137 of file larbatch_utilities.py.

137 def ifdh_ls(path, depth):
138 
139  # Get proxy.
140 
141  test_proxy()
142 
143  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
144  # are not defined (they confuse ifdh, or rather the underlying tools).
145 
146  save_vars = {}
147  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
148  if var in os.environ:
149  save_vars[var] = os.environ[var]
150  del os.environ[var]
151 
152  # Do listing.
153 
154  cmd = ['ifdh', 'ls', path, '%d' % depth]
155  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
156 
157  q = queue.Queue()
158  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
159  thread.start()
160  thread.join(timeout=600)
161  if thread.is_alive():
162  print('Terminating subprocess.')
163  jobinfo.terminate()
164  thread.join()
165  rc = q.get()
166  jobout = convert_str(q.get())
167  joberr = convert_str(q.get())
168  if rc != 0:
169  for var in list(save_vars.keys()):
170  os.environ[var] = save_vars[var]
171  raise IFDHError(cmd, rc, jobout, joberr)
172 
173  # Restore environment variables.
174 
175  for var in list(save_vars.keys()):
176  os.environ[var] = save_vars[var]
177 
178  # Done.
179 
180  return jobout.splitlines()
181 
182 
183 # Ifdh ll, with timeout.
184 # Return value is list of lines returned by "ifdh ls" command.
185 
def python.larbatch_utilities.ifdh_mkdir (   path)

Definition at line 234 of file larbatch_utilities.py.

234 def ifdh_mkdir(path):
235 
236  # Get proxy.
237 
238  test_proxy()
239 
240  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
241  # are not defined (they confuse ifdh, or rather the underlying tools).
242 
243  save_vars = {}
244  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
245  if var in os.environ:
246  save_vars[var] = os.environ[var]
247  del os.environ[var]
248 
249  # Do mkdir.
250 
251  cmd = ['ifdh', 'mkdir', path]
252  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
253 
254  q = queue.Queue()
255  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
256  thread.start()
257  thread.join(timeout=60)
258  if thread.is_alive():
259  print('Terminating subprocess.')
260  jobinfo.terminate()
261  thread.join()
262  rc = q.get()
263  jobout = convert_str(q.get())
264  joberr = convert_str(q.get())
265  if rc != 0:
266  for var in list(save_vars.keys()):
267  os.environ[var] = save_vars[var]
268  raise IFDHError(cmd, rc, jobout, joberr)
269 
270  # Restore environment variables.
271 
272  for var in list(save_vars.keys()):
273  os.environ[var] = save_vars[var]
274 
275  # Done.
276 
277  return
278 
279 
280 # Ifdh rmdir, with timeout.
281 
def python.larbatch_utilities.ifdh_mv (   src,
  dest 
)

Definition at line 378 of file larbatch_utilities.py.

378 def ifdh_mv(src, dest):
379 
380  # Get proxy.
381 
382  test_proxy()
383 
384  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
385  # are not defined (they confuse ifdh, or rather the underlying tools).
386 
387  save_vars = {}
388  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
389  if var in os.environ:
390  save_vars[var] = os.environ[var]
391  del os.environ[var]
392 
393  # Do rename.
394 
395  cmd = ['ifdh', 'mv', src, dest]
396  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
397 
398  q = queue.Queue()
399  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
400  thread.start()
401  thread.join(timeout=60)
402  if thread.is_alive():
403  print('Terminating subprocess.')
404  jobinfo.terminate()
405  thread.join()
406  rc = q.get()
407  jobout = convert_str(q.get())
408  joberr = convert_str(q.get())
409  if rc != 0:
410  for var in list(save_vars.keys()):
411  os.environ[var] = save_vars[var]
412  raise IFDHError(cmd, rc, jobout, joberr)
413 
414  # Restore environment variables.
415 
416  for var in list(save_vars.keys()):
417  os.environ[var] = save_vars[var]
418 
419  # Done.
420 
421  return
422 
423 
424 # Ifdh rm, with timeout.
425 
def python.larbatch_utilities.ifdh_rm (   path)

Definition at line 426 of file larbatch_utilities.py.

426 def ifdh_rm(path):
427 
428  # Get proxy.
429 
430  test_proxy()
431 
432  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
433  # are not defined (they confuse ifdh, or rather the underlying tools).
434 
435  save_vars = {}
436  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
437  if var in os.environ:
438  save_vars[var] = os.environ[var]
439  del os.environ[var]
440 
441  # Do delete.
442 
443  cmd = ['ifdh', 'rm', path]
444  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
445 
446  q = queue.Queue()
447  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
448  thread.start()
449  thread.join(timeout=60)
450  if thread.is_alive():
451  print('Terminating subprocess.')
452  jobinfo.terminate()
453  thread.join()
454  rc = q.get()
455  jobout = convert_str(q.get())
456  joberr = convert_str(q.get())
457  if rc != 0:
458  for var in list(save_vars.keys()):
459  os.environ[var] = save_vars[var]
460  raise IFDHError(cmd, rc, jobout, joberr)
461 
462  # Restore environment variables.
463 
464  for var in list(save_vars.keys()):
465  os.environ[var] = save_vars[var]
466 
467  # Done.
468 
469  return
470 
471 
472 # Posix copy with timeout.
473 
def python.larbatch_utilities.ifdh_rmdir (   path)

Definition at line 282 of file larbatch_utilities.py.

282 def ifdh_rmdir(path):
283 
284  # Get proxy.
285 
286  test_proxy()
287 
288  # Make sure environment variables X509_USER_CERT and X509_USER_KEY
289  # are not defined (they confuse ifdh, or rather the underlying tools).
290 
291  save_vars = {}
292  for var in ('X509_USER_CERT', 'X509_USER_KEY'):
293  if var in os.environ:
294  save_vars[var] = os.environ[var]
295  del os.environ[var]
296 
297  # Do rmdir.
298 
299  cmd = ['ifdh', 'rmdir', path]
300  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
301 
302  q = queue.Queue()
303  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
304  thread.start()
305  thread.join(timeout=60)
306  if thread.is_alive():
307  print('Terminating subprocess.')
308  jobinfo.terminate()
309  thread.join()
310  rc = q.get()
311  jobout = convert_str(q.get())
312  joberr = convert_str(q.get())
313  if rc != 0:
314  for var in list(save_vars.keys()):
315  os.environ[var] = save_vars[var]
316  raise IFDHError(cmd, rc, jobout, joberr)
317 
318  # Restore environment variables.
319 
320  for var in list(save_vars.keys()):
321  os.environ[var] = save_vars[var]
322 
323  # Done.
324 
325  return
326 
327 
328 # Ifdh chmod, with timeout.
329 
def python.larbatch_utilities.nfs_server ( )

Definition at line 791 of file larbatch_utilities.py.

792  return '%sgpvm01.fnal.gov' % os.environ['EXPERIMENT']
793 
794 
795 # Parse the ten-character file mode string as returned by "ls -l"
796 # and return mode bit masek.
797 
def python.larbatch_utilities.parse_mode (   mode_str)

Definition at line 798 of file larbatch_utilities.py.

798 def parse_mode(mode_str):
799 
800  mode = 0
801 
802  # File type.
803 
804  if mode_str[0] == 'b':
805  mode += stat.S_IFBLK
806  elif mode_str[0] == 'c':
807  mode += stat.S_IFCHR
808  elif mode_str[0] == 'd':
809  mode += stat.S_IFDIR
810  elif mode_str[0] == 'l':
811  mode += stat.S_IFLNK
812  elif mode_str[0] == 'p':
813  mode += stat.S_IFIFO
814  elif mode_str[0] == 's':
815  mode += stat.S_IFSOCK
816  elif mode_str[0] == '-':
817  mode += stat.S_IFREG
818 
819  # File permissions.
820 
821  # User triad (includes setuid).
822 
823  if mode_str[1] == 'r':
824  mode += stat.S_IRUSR
825  if mode_str[2] == 'w':
826  mode += stat.S_IWUSR
827  if mode_str[3] == 'x':
828  mode += stat.S_IXUSR
829  elif mode_str[3] == 's':
830  mode += stat.S_ISUID
831  mode += stat.S_IXUSR
832  elif mode_str[3] == 'S':
833  mode += stat.S_ISUID
834 
835  # Group triad (includes setgid).
836 
837  if mode_str[4] == 'r':
838  mode += stat.S_IRGRP
839  if mode_str[5] == 'w':
840  mode += stat.S_IWGRP
841  if mode_str[6] == 'x':
842  mode += stat.S_IXGRP
843  elif mode_str[6] == 's':
844  mode += stat.S_ISGID
845  mode += stat.S_IXGRP
846  elif mode_str[6] == 'S':
847  mode += stat.S_ISGID
848 
849  # World triad (includes sticky bit).
850 
851  if mode_str[7] == 'r':
852  mode += stat.S_IROTH
853  if mode_str[8] == 'w':
854  mode += stat.S_IWOTH
855  if mode_str[9] == 'x':
856  mode += stat.S_IXOTH
857  elif mode_str[9] == 't':
858  mode += stat.S_ISVTX
859  mode += stat.S_IXOTH
860  elif mode_str[9] == 'T':
861  mode += stat.S_ISVTX
862 
863  # Done
864 
865  return mode
866 
867 # Function to return the current experiment.
868 # The following places for obtaining this information are
869 # tried (in order):
870 #
871 # 1. Environment variable $EXPERIMENT.
872 # 2. Environment variable $SAM_EXPERIMENT.
873 # 3. Hostname (up to "gpvm").
874 #
875 # Raise an exception if none of the above methods works.
876 #
877 
def python.larbatch_utilities.posix_cp (   source,
  destination 
)

Definition at line 474 of file larbatch_utilities.py.

474 def posix_cp(source, destination):
475 
476  cmd = ['cp', source, destination]
477 
478  # Fork buffer process.
479 
480  buffer_pid = os.fork()
481  if buffer_pid == 0:
482 
483  # In child process.
484  # Launch cp subprocess.
485 
486  jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
487 
488  q = queue.Queue()
489  thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
490  thread.start()
491  thread.join(timeout=600)
492  if thread.is_alive():
493 
494  # Subprocess did not finish (may be hanging and unkillable).
495  # Try to kill the subprocess and exit process.
496  # Unkillable process will become detached.
497 
498  print('Terminating subprocess.')
499  jobinfo.kill()
500  os._exit(1)
501 
502  else:
503 
504  # Subprocess finished normally.
505 
506  rc = q.get()
507  jobout = convert_str(q.get())
508  joberr = convert_str(q.get())
509  os._exit(rc)
510 
511  else:
512 
513  # In parent process.
514  # Wait for buffer subprocess to finish.
515 
516  buffer_result = os.waitpid(buffer_pid, 0)
517  rc = buffer_result[1]/256
518  if rc != 0:
519  raise IFDHError(cmd, rc, '', '')
520 
521  # Done.
522 
523  return
524 
525 
526 # Function to wait for a subprocess to finish and fetch return code,
527 # standard output, and standard error.
528 # Call this function like this:
529 #
530 # q = Queue.Queue()
531 # jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
532 # wait_for_subprocess(jobinfo, q, input)
533 # rc = q.get() # Return code.
534 # jobout = q.get() # Standard output
535 # joberr = q.get() # Standard error
536 
537 
def posix_cp(source, destination)
def python.larbatch_utilities.srm_uri (   path)

Definition at line 780 of file larbatch_utilities.py.

780 def srm_uri(path):
781  if path.startswith('/pnfs/'):
782  return 'srm://fndca1.fnal.gov:8443/srm/managerv2?SFN=/pnfs/fnal.gov/usr/' + path[6:]
783  else:
784  return path
785 
786 
787 # Return the name of a computer with login access that has the /pnfs
788 # filesystem nfs-mounted. This function makes use of the $EXPERIMENT
789 # environment variable (as does ifdh), which must be set.
790 
def python.larbatch_utilities.test_jobsub ( )

Definition at line 712 of file larbatch_utilities.py.

713  global jobsub_ok
714  if not jobsub_ok:
715 
716  # Look for command jobsub_submit on execution path.
717 
718  try:
719  jobinfo = subprocess.Popen(['which', 'jobsub_submit'],
720  stdout=subprocess.PIPE,
721  stderr=subprocess.PIPE)
722  jobout, joberr = jobinfo.communicate()
723  jobout = convert_str(jobout)
724  joberr = convert_str(joberr)
725  jobsub_path = jobout.splitlines()[0].strip()
726  if jobsub_path != '':
727  jobsub_ok = True
728  except:
729  pass
730 
731  if not jobsub_ok:
732  print('Please set up jobsub_client')
733  sys.exit(1)
734 
735  return jobsub_ok
736 
737 # Return dCache server.
738 
def python.larbatch_utilities.test_kca ( )

Definition at line 632 of file larbatch_utilities.py.

632 def test_kca():
633  global kca_ok
634  if not kca_ok:
635  try:
636  if 'X509_USER_PROXY' in os.environ:
637  subprocess.check_call(['voms-proxy-info',
638  '-file', os.environ['X509_USER_PROXY'],
639  '-exists'], stdout=-1, stderr=-1)
640  elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
641  subprocess.check_call(['voms-proxy-info',
642  '-file', os.environ['X509_USER_CERT'],
643  '-exists'], stdout=-1, stderr=-1)
644  else:
645  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
646 
647  # Workaround jobsub bug by setting environment variable X509_USER_PROXY to
648  # point to the default location of the kca certificate.
649 
650  x509_path = convert_str(subprocess.check_output(['voms-proxy-info', '-path'], stderr=-1))
651  os.environ['X509_USER_PROXY'] = x509_path.strip()
652 
653  kca_ok = True
654  except:
655  pass
656 
657  # If at this point we don't have a kca certificate, try to get one.
658 
659  if not kca_ok:
660  get_kca()
661 
662  # Final checkout.
663 
664  if not kca_ok:
665  try:
666  if 'X509_USER_PROXY' in os.environ:
667  subprocess.check_call(['voms-proxy-info',
668  '-file', os.environ['X509_USER_PROXY'],
669  '-exists'], stdout=-1, stderr=-1)
670  elif 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
671  subprocess.check_call(['voms-proxy-info',
672  '-file', os.environ['X509_USER_CERT'],
673  '-exists'], stdout=-1, stderr=-1)
674  else:
675  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
676  kca_ok = True
677  except:
678  raise RuntimeError('Please get a kca certificate.')
679  return kca_ok
680 
681 
682 # Test whether user has a valid grid proxy. If not, try to get a new one.
683 
def python.larbatch_utilities.test_proxy ( )

Definition at line 684 of file larbatch_utilities.py.

685  global proxy_ok
686  if not proxy_ok:
687  try:
688  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
689  subprocess.check_call(['voms-proxy-info', '-exists', '-acissuer'], stdout=-1, stderr=-1)
690  proxy_ok = True
691  except:
692  pass
693 
694  # If at this point we don't have a grid proxy, try to get one.
695 
696  if not proxy_ok:
697  get_proxy()
698 
699  # Final checkout.
700 
701  if not proxy_ok:
702  try:
703  subprocess.check_call(['voms-proxy-info', '-exists'], stdout=-1, stderr=-1)
704  subprocess.check_call(['voms-proxy-info', '-exists', '-acissuer'], stdout=-1, stderr=-1)
705  proxy_ok = True
706  except:
707  raise RuntimeError('Please get a grid proxy.')
708  return proxy_ok
709 
710 # Test whether jobsub_client has been set up.
711 
def python.larbatch_utilities.test_ticket ( )

Definition at line 549 of file larbatch_utilities.py.

550  global ticket_ok
551  if not ticket_ok:
552  ok = subprocess.call(['klist', '-s'], stdout=-1, stderr=-1)
553  if ok != 0:
554  raise RuntimeError('Please get a kerberos ticket.')
555  ticket_ok = True
556  return ticket_ok
557 
558 
559 # Get kca certificate.
560 
def python.larbatch_utilities.wait_for_subprocess (   jobinfo,
  q,
  input = None 
)

Definition at line 538 of file larbatch_utilities.py.

538 def wait_for_subprocess(jobinfo, q, input=None):
539  jobout, joberr = jobinfo.communicate(input)
540  rc = jobinfo.poll()
541  q.put(rc)
542  q.put(jobout)
543  q.put(joberr)
544  return
545 
546 
547 # Test whether user has a valid kerberos ticket. Raise exception if no.
548 
def wait_for_subprocess(jobinfo, q, input=None)
def python.larbatch_utilities.xrootd_server_port ( )

Definition at line 753 of file larbatch_utilities.py.

754  return dcache_server() + ':1094'
755 
756 
757 # Convert a pnfs path to xrootd uri.
758 # Return the input path unchanged if it isn't on dCache.
759 
def python.larbatch_utilities.xrootd_uri (   path)

Definition at line 760 of file larbatch_utilities.py.

760 def xrootd_uri(path):
761  if path.startswith('/pnfs/'):
762  return 'root://' + xrootd_server_port() + dcache_path(path)
763  else:
764  return path
765 
766 
767 # Convert a pnfs path to gridftp uri.
768 # Return the input path unchanged if it isn't on dCache.
769 

Variable Documentation

bool python.larbatch_utilities.jobsub_ok = False

Definition at line 88 of file larbatch_utilities.py.

bool python.larbatch_utilities.kca_ok = False

Definition at line 85 of file larbatch_utilities.py.

string python.larbatch_utilities.kca_user = ''

Definition at line 87 of file larbatch_utilities.py.

bool python.larbatch_utilities.proxy_ok = False

Definition at line 86 of file larbatch_utilities.py.

bool python.larbatch_utilities.ticket_ok = False

Definition at line 84 of file larbatch_utilities.py.