123 from __future__
import absolute_import
124 from __future__
import print_function
126 import stat
as statmod
132 import Queue
as queue
134 import larbatch_utilities
135 from larbatch_utilities
import convert_str
136 from project_modules.ifdherror
import IFDHError
140 pnfs_is_mounted = os.path.isdir(
'/pnfs')
141 prefer_grid =
'LARBATCH_GRID' in os.environ
142 debug =
'LARBATCH_DEBUG' in os.environ
144 print(
'*** Larbatch_posix: Debugging enabled.')
172 if path.startswith(
'/pnfs/'):
179 if path != self.
local_path and (mode.find(
'r') >= 0 or mode.find('a') >= 0):
180 larbatch_utilities.ifdh_cp(path, self.
local_path)
195 if self.
local_file and not self.local_file.closed:
199 self.local_file.close()
207 if self.mode.find(
'w') >= 0
or self.mode.find(
'a') >= 0
or self.mode.find(
'+') >= 0:
218 self.local_file.flush()
223 return self.local_file.fileno()
233 return self.local_file.read(size)
238 return self.local_file.readline(size)
243 return self.local_file.readlines()
248 return self.local_file.tell()
253 self.local_file.truncate()
258 self.local_file.truncate(pos)
263 self.local_file.write(str)
268 self.local_file.writelines(strs)
275 def open(path, mode='r', buf=-1):
276 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
278 print(
'*** Larbatch_posix: Opening dcache_file %s using mode %s.' % (path, mode))
282 print(
'*** Larbatch_posix: Opening normal file %s using mode %s.' % (path, mode))
283 return __builtins__[
'open'](path, mode, buf)
297 if (src.startswith(
'/pnfs/')
or dest.startswith(
'/pnfs/')):
298 if prefer_grid
or not pnfs_is_mounted:
300 print(
'*** Larbatch_posix: Copy %s to %s using ifdh.' % (src, dest))
301 larbatch_utilities.ifdh_cp(src, dest)
304 print(
'*** Larbatch_posix: Copy %s to %s using posix with timeout.' % (src, dest))
305 larbatch_utilities.posix_cp(src, dest)
308 print(
'*** Larbatch_posix: Copy %s to %s using posix.' % (src, dest))
309 shutil.copy(src, dest)
321 raise OSError(
'%s is not a directory.' % path)
323 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
325 print(
'*** Larbatch_posix: Listdir %s using ifdh.' % path)
329 tail = os.path.normpath(path[-6:])
333 contents = larbatch_utilities.ifdh_ls(path, 1)
341 nc = os.path.normpath(c.strip())
342 if not nc.endswith(tail):
343 result.append(os.path.basename(nc))
347 print(
'*** Larbatch_posix: Listdir %s using posix.' % path)
354 jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
357 thread = threading.Thread(target=larbatch_utilities.wait_for_subprocess, args=[jobinfo, q])
359 thread.join(timeout=60)
360 if thread.is_alive():
362 print(
'*** Larbatch_posix: Terminating subprocess.')
369 for word
in jobout.split():
382 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
384 print(
'*** Larbatch_posix: Check existence of %s using ifdh.' % path)
389 larbatch_utilities.ifdh_ls(path, 0)
396 print(
'*** Larbatch_posix: Check existence of %s using posix.' % path)
402 npath = os.path.normpath(path)
403 dir = os.path.dirname(npath)
404 base = os.path.basename(npath)
409 for filename
in files:
426 if path[-5:] ==
'.list' or \
427 path[-5:] ==
'.root' or \
428 path[-5:] ==
'.json' or \
429 path[-4:] ==
'.txt' or \
430 path[-4:] ==
'.fcl' or \
431 path[-4:] ==
'.out' or \
432 path[-4:] ==
'.err' or \
433 path[-3:] ==
'.sh' or \
434 path[-5:] ==
'.stat':
437 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
439 print(
'*** Larbatch_posix: Check existence of directory %s using ifdh.' % path)
448 npath = os.path.normpath(path)
449 name = os.path.basename(npath)
450 dir = os.path.dirname(npath)
451 lines = larbatch_utilities.ifdh_ll(dir, 1)
454 if len(words) > 5
and words[-1] == name:
455 if words[0][0] ==
'd':
461 print(
'*** Larbatch_posix: Check existence of directory %s using posix.' % path)
462 result = os.path.isdir(path)
485 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
487 print(
'*** Larbatch_posix: Stat %s using ifdh.' % path)
492 npath = os.path.normpath(path)
493 name = os.path.basename(npath)
494 dir = os.path.dirname(npath)
495 lines = larbatch_utilities.ifdh_ll(dir, 1)
498 if len(words) >5
and words[-1] == name:
504 mode = larbatch_utilities.parse_mode(words[0])
508 nlinks =
int(words[1])
510 result = os.stat_result((mode,
523 print(
'*** Larbatch_posix: Stat %s using posix.' % path)
524 result = os.stat(path)
527 raise OSError(
'No such file or directory.')
541 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
543 print(
'*** Larbatch_posix: Check access for %s using ifdh.' % path)
554 if not sr.st_mode & statmod.S_IRUSR:
557 if not sr.st_mode & statmod.S_IWUSR:
560 if not sr.st_mode & statmod.S_IXUSR:
565 print(
'*** Larbatch_posix: Check access for %s using posix.' % path)
566 result = os.access(path, mode)
596 if top.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
598 print(
'*** Larbatch_posix: Walk directory tree for %s using ifdh.' % top)
602 lines = larbatch_utilities.ifdh_ll(top, 1)
605 if len(words) > 5
and words[-1] !=
'__pycache__':
606 if words[0][0] ==
'd':
607 dirs.append(words[-1])
609 files.append(words[-1])
612 print(
'*** Larbatch_posix: Walk directory tree for %s using posix.' % top)
615 if obj !=
'__pycache__':
616 if isdir(os.path.join(top, obj)):
622 yield top, dirs, files
627 for result
in walk(os.path.join(top, dir), topdown):
631 yield top, dirs, files
643 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
645 print(
'*** Larbatch_posix: Make directory for %s using ifdh.' % path)
646 larbatch_utilities.ifdh_mkdir(path)
649 print(
'*** Larbatch_posix: Make directory for %s using posix.' % path)
659 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
661 print(
'*** Larbatch_posix: Make directory recursively for %s using ifdh.' % path)
665 np = os.path.normpath(path)
666 parent = os.path.dirname(np)
667 if not isdir(parent):
672 larbatch_utilities.ifdh_mkdir(path)
675 print(
'*** Larbatch_posix: Make directory recursively for %s using posix.' % path)
676 os.makedirs(path, mode)
683 if (src.startswith(
'/pnfs/')
or 684 dest.startswith(
'/pnfs/'))
and (prefer_grid
or not pnfs_is_mounted):
686 print(
'*** Larbatch_posix: Rename %s to %s using ifdh.' % (src, dest))
688 src_uri = larbatch_utilities.gridftp_uri(src)
689 dest_path = larbatch_utilities.dcache_path(dest)
690 cmd = [
'uberftp',
'-rename', src_uri, dest_path]
691 jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
694 thread = threading.Thread(target=larbatch_utilities.wait_for_subprocess, args=[jobinfo, q])
696 thread.join(timeout=60)
697 if thread.is_alive():
699 print(
'*** Larbatch_posix: Terminating subprocess.')
706 raise IFDHError(cmd, rc, jobout, joberr)
709 print(
'*** Larbatch_posix: Rename %s to %s using posix.' % (src, dest))
716 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
718 print(
'*** Larbatch_posix: Delete file %s using ifdh.' % path)
719 larbatch_utilities.ifdh_rm(path)
722 print(
'*** Larbatch_posix: Delete file %s using posix.' % path)
734 newpath = path +
'_' +
str(uuid.uuid4())
736 os.rename(path, newpath)
737 os.system(
'rm -f %s &' % newpath)
746 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
748 print(
'*** Larbatch_posix: Delete directoroy %s using ifdh.' % path)
749 larbatch_utilities.ifdh_rmdir(path)
752 print(
'*** Larbatch_posix: Delete directoroy %s using posix.' % path)
759 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
761 print(
'*** Larbatch_posix: Delete directoroy tree %s using ifdh.' % path)
765 lines = larbatch_utilities.ifdh_ll(path, 1)
769 if words[0][0] ==
'd':
770 rmtree(os.path.join(path, words[-1]))
772 remove(os.path.join(path, words[-1]))
780 print(
'*** Larbatch_posix: Delete directoroy tree %s using posix.' % path)
792 npath = os.path.normpath(path)
793 newpath = npath +
'_' +
str(uuid.uuid4())
794 os.rename(npath, newpath)
795 os.system(
'rm -rf %s &' % newpath)
805 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
807 print(
'*** Larbatch_posix: Change mode for %s using ifdh.' % path)
808 larbatch_utilities.ifdh_chmod(path, mode)
811 print(
'*** Larbatch_posix: Change mode for %s using posix.' % path)
822 if src.startswith(
'/pnfs/')
and not pnfs_is_mounted:
824 print(
'*** Larbatch_posix: Make symbolic link from %s to %s using nfs server.' % (src, dest))
825 larbatch_utilities.test_ticket()
826 cmd = [
'ssh', larbatch_utilities.nfs_server(),
'ln',
'-s', src, dest]
827 jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
830 thread = threading.Thread(target=larbatch_utilities.wait_for_subprocess, args=[jobinfo, q])
832 thread.join(timeout=60)
833 if thread.is_alive():
835 print(
'*** Larbatch_posix: Terminating subprocess.')
842 raise IFDHError(cmd, rc, jobout, joberr)
846 print(
'*** Larbatch_posix: Make symbolic link from %s to %s using posix.' % (src, dest))
847 os.symlink(src, dest)
859 if path.startswith(
'/pnfs/')
and not_pnfs_is_mounted:
861 print(
'*** Larbatch_posix: Read symbolic link %s using nfs server.' % path)
862 larbatch_utilities.test_ticket()
863 cmd = [
'ssh', larbatch_utilities.nfs_server(),
'readlink', path]
864 jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
867 thread = threading.Thread(target=larbatch_utilities.wait_for_subprocess, args=[jobinfo, q])
869 thread.join(timeout=60)
870 if thread.is_alive():
872 print(
'*** Larbatch_posix: Terminating subprocess.')
879 raise IFDHError(cmd, rc, jobout, joberr)
880 result = jobout.strip()
884 print(
'*** Larbatch_posix: Read symbolic link %s using posix.' % path)
885 result = os.readlink(path)
900 if path.startswith(
'/pnfs/')
and (prefer_grid
or not pnfs_is_mounted):
902 print(
'*** Larbatch_posix: Stream path %s using xrootd.' % path)
903 larbatch_utilities.test_proxy()
904 stream = larbatch_utilities.xrootd_uri(path)
907 print(
'*** Larbatch_posix: Stream path %s as normal file.' % path)
def readlines(self, sizehint=-1)
def mkdir(path, mode=0o777)
def writelines(self, strs)
def readline(self, size=-1)
def walk(top, topdown=True)
def open(path, mode='r', buf=-1)
def makedirs(path, mode=0o777)