Source code for GlobalUtils

#!/usr/bin/env python
#------------------------------
"""
:py:class:`GlobalUtils` - a set of utilities
============================================

Usage::

    # Import
    import PSCalib.GlobalUtils as gu

    # Methods
    #resp = gu.<method(pars)>

    dettype = gu.det_type_from_source(source)
    detname = gu.string_from_source(source)

    mmask = gu.merge_masks(mask1=None, mask2=None, dtype=np.uint8)
    mask  = gu.mask_neighbors(mask_in, allnbrs=True, dtype=np.uint8)
    lo,hi = gu.evaluate_limits(arr, nneg=5, npos=5, lim_lo=1, lim_hi=1000, verbos=1, cmt='')

    arr2d = gu.reshape_nda_to_2d(nda)
    arr3d = gu.reshape_nda_to_3d(nda)

    # Get string with time stamp, ex: 2016-01-26T10:40:53
    ts    = gu.str_tstamp(fmt='%Y-%m-%dT%H:%M:%S', time_sec=None)

    usr   = gu.get_enviroment(env='USER')
    usr   = gu.get_login()
    host  = gu.get_hostname()
    cwd   = gu.get_cwd()
    rec   = gu.log_rec_on_start() # e.g. '2017-09-27T10:40:24 user:dubrovin@psanagpu104 cwd:/reg/neh/home4/dubrovin/LCLS/con-jungfrau ...'
    gu.add_rec_to_log(lfname, rec, verbos=False)

    exp  = gu.exp_name(env)
    cdir = gu.calib_dir(env)
    #### tsec, tnsec, fiducial, tsdate, tstime = gu.time_pars(evt) # needs in psana...

    gu.create_directory(dir, verb=False)
    gu.create_directory_with_mode(dir, mode=0777, verb=False)
    exists = gu.create_path(path, depth=6, mode=0777, verb=True)

    arr  = gu.load_textfile(path)
    gu.save_textfile(text, path, mode='w') # mode: 'w'-write, 'a'-append 

    path = gu.replace('/path/#YYYY-MM/fname.txt', '#YYYY-MM', gu.str_tstamp(fmt='%Y/%m'))

    ifname = 'fname.txt'
    ctypedir = '/some-path/calib/Jungfrau::CalibV1/CxiEndstation.0:Jungfrau.0/'
    ctype = 'pedestals'
    ofname = '123-end.data'
    rec = gu.history_record(ifname, ctypedir, ctype, ofname, comment='')
    path_clb = gu.path_to_calib_file(ctypedir, ctype, ofname)
    path_his = gu.path_to_history_file(ctypedir, ctype)

    cmd = gu.command_deploy_file(ifname, path_clb)
    cmd = gu.command_add_record_to_file(rec, path_his)

    tpl = gu.calib_fname_template(exp, runnum, tsec, tnsec, fid, tsdate, tstime, src, nevts, ofname)

    gu.deploy_file(ifname, ctypedir, ctype, ofname, lfname=None, verbos=False)

See: other methods in :py:class:`CalibPars`, :py:class:`CalibParsStore`

This software was developed for the SIT project.
If you use all or part of it, please give an appropriate acknowledgment.

Created: 2013-03-08 by Mikhail Dubrovin
"""
#--------------------------------

import sys
import os
import getpass
import socket
import numpy as np
from time import localtime, strftime

#------------------------------
#------------------------------

# ATTENTION !!!!! ALL LISTS SHOULD BE IN THE SAME ORDER (FOR DICTIONARIES)
# The tuples below are paired positionally with zip(); an entry added to one
# tuple has to be added at the same index in its companion tuples.

# Enumerated and named parameters

# Calibration types (integer identifiers).
PEDESTALS    = 0
PIXEL_STATUS = 1
PIXEL_RMS    = 2
PIXEL_GAIN   = 3
PIXEL_MASK   = 4
PIXEL_BKGD   = 5
COMMON_MODE  = 6
GEOMETRY     = 7
PIXEL_OFFSET = 8
PIXEL_DATAST = 9

# Parallel tuples: identifier, name string and dtype for each calibration type.
calib_types  = ( PEDESTALS,   PIXEL_STATUS,   PIXEL_RMS,   PIXEL_GAIN,   PIXEL_MASK,   PIXEL_BKGD,   COMMON_MODE,   GEOMETRY,   PIXEL_OFFSET,   PIXEL_DATAST)
calib_names  = ('pedestals', 'pixel_status', 'pixel_rms', 'pixel_gain', 'pixel_mask', 'pixel_bkgd', 'common_mode', 'geometry', 'pixel_offset', 'pixel_datast')
calib_dtypes = ( np.float32,  np.uint16,      np.float32,  np.float32,   np.uint8,     np.float32,   np.double,     str,        np.float32,     np.uint16)

# Lookup tables built from the parallel tuples above.
dic_calib_type_to_name  = dict(zip(calib_types, calib_names))
dic_calib_name_to_type  = dict(zip(calib_names, calib_types))
dic_calib_type_to_dtype = dict(zip(calib_types, calib_dtypes))

# Calibration status codes.
# NOTE: the name UNDEFINED is bound to 4 here but is rebound to 0 further down
# in this module (detector types). The status tuple/dictionaries below are
# built before that rebinding, so they keep the value 4, while the module
# attribute UNDEFINED reads 0 once the module is fully imported.
LOADED     = 1
DEFAULT    = 2
UNREADABLE = 3
UNDEFINED  = 4
WRONGSIZE  = 5
NONFOUND   = 6
DCSTORE    = 7

# Parallel tuples: status code and its name.
calib_statvalues = ( LOADED,   DEFAULT,   UNREADABLE,   UNDEFINED,   WRONGSIZE,   NONFOUND,   DCSTORE)
calib_statnames  = ('LOADED', 'DEFAULT', 'UNREADABLE', 'UNDEFINED', 'WRONGSIZE', 'NONFOUND', 'DCSTORE')

dic_calib_status_value_to_name = dict(zip(calib_statvalues, calib_statnames))
dic_calib_status_name_to_value = dict(zip(calib_statnames,  calib_statvalues))

#------------------------------
#------------------------------
#------------------------------
#------------------------------

# Enumerated detector types.
# NOTE: this rebinds UNDEFINED, which was bound to 4 in the calibration status
# section earlier in this module; from here on the module attribute is 0.
UNDEFINED   = 0
CSPAD       = 1 
CSPAD2X2    = 2 
PRINCETON   = 3 
PNCCD       = 4 
TM6740      = 5 
OPAL1000    = 6 
OPAL2000    = 7 
OPAL4000    = 8 
OPAL8000    = 9 
ORCAFL40    = 10
EPIX        = 11
EPIX10K     = 12
EPIX100A    = 13
FCCD960     = 14
ANDOR       = 15
ACQIRIS     = 16
IMP         = 17
QUARTZ4A150 = 18
RAYONIX     = 19
EVR         = 20
FCCD        = 21
TIMEPIX     = 22
FLI         = 23
PIMAX       = 24
ANDOR3D     = 25
JUNGFRAU    = 26
ZYLA        = 27

# Detector names that have no enumerated type in this module:
#XAMPS    # N/A data
#FEXAMP   # N/A data
#PHASICS  # N/A data
#OPAL1600 # N/A data
#EPIXS    # N/A data
#GOTTHARD # N/A data
""" Enumetated detector types"""

# The three sequences below are index-aligned: position i of each one refers
# to the same detector, and they are zipped into dictionaries further down.
list_of_det_type = (UNDEFINED, CSPAD, CSPAD2X2, PRINCETON, PNCCD, TM6740, \
                    OPAL1000, OPAL2000, OPAL4000, OPAL8000, \
                    ORCAFL40, EPIX, EPIX10K, EPIX100A, FCCD960, ANDOR, ACQIRIS, IMP, QUARTZ4A150, RAYONIX,
                    EVR, FCCD, TIMEPIX, FLI, PIMAX, ANDOR3D, JUNGFRAU, ZYLA)
""" List of enumetated detector types"""

list_of_det_names = ('UNDEFINED', 'Cspad', 'Cspad2x2', 'Princeton', 'pnCCD', 'Tm6740', \
                     'Opal1000', 'Opal2000', 'Opal4000', 'Opal8000', \
                     'OrcaFl40', 'Epix', 'Epix10k', 'Epix100a', 'Fccd960', 'Andor', 'Acqiris', 'Imp', 'Quartz4A150', 'Rayonix',\
                     'Evr', 'Fccd', 'Timepix', 'Fli', 'Pimax', 'Andor3d', 'Jungfrau', 'Zyla')
""" List of enumetated detector names"""

# Calibration group string per detector type; several detector types share
# the 'Camera::CalibV1' group.
list_of_calib_groups = ('UNDEFINED',
                        'CsPad::CalibV1',
                        'CsPad2x2::CalibV1',
                        'Princeton::CalibV1',
                        'PNCCD::CalibV1',
                        'Camera::CalibV1',
                        'Camera::CalibV1',
                        'Camera::CalibV1',
                        'Camera::CalibV1',
                        'Camera::CalibV1',
                        'Camera::CalibV1',
                        'Epix::CalibV1',
                        'Epix10k::CalibV1',
                        'Epix100a::CalibV1',
                        'Camera::CalibV1',
                        'Andor::CalibV1',
                        'Acqiris::CalibV1',
                        'Imp::CalibV1',
                        'Camera::CalibV1',
                        'Camera::CalibV1',
                        'EvrData::CalibV1',
                        'Camera::CalibV1',
                        'Timepix::CalibV1',
                        'Fli::CalibV1',
                        'Pimax::CalibV1',
                        'Andor3d::CalibV1',
                        'Jungfrau::CalibV1',
                        'Camera::CalibV1'
                        )
""" List of enumetated detector calibration groups"""

# Lookup tables keyed by the enumerated detector type.
dic_det_type_to_name = dict(zip(list_of_det_type, list_of_det_names))
""" Dictionary for detector type : name"""

dic_det_type_to_calib_group = dict(zip(list_of_det_type, list_of_calib_groups))
""" Dictionary for detector type : group"""

#------------------------------
# List of name strings exposed as a module attribute; nothing in the visible
# part of this module reads it.
# NOTE(review): presumably the names of BLD (beam-line data) sources - confirm.
bld_names = \
['EBeam',
'PhaseCavity',
'FEEGasDetEnergy',
'Nh2Sb1Ipm01',
'HxxUm6Imb01',
'HxxUm6Imb02',
'HfxDg2Imb01',
'HfxDg2Imb02',
'XcsDg3Imb03',
'XcsDg3Imb04',
'HfxDg3Imb01',
'HfxDg3Imb02',
'HxxDg1Cam',
'HfxDg2Cam',
'HfxDg3Cam',
'XcsDg3Cam',
'HfxMonCam',
'HfxMonImb01',
'HfxMonImb02',
'HfxMonImb03',
'MecLasEm01',
'MecTctrPip01',
'MecTcTrDio01',
'MecXt2Ipm02',
'MecXt2Ipm03',
'MecHxmIpm01',
'GMD',
'CxiDg1Imb01',
'CxiDg2Imb01',
'CxiDg2Imb02',
'CxiDg4Imb01',
'CxiDg1Pim',
'CxiDg2Pim',
'CxiDg4Pim',
'XppMonPim0',
'XppMonPim1',
'XppSb2Ipm',
'XppSb3Ipm',
'XppSb3Pim',
'XppSb4Pim',
'XppEndstation0',
'XppEndstation1',
'MecXt2Pim02',
'MecXt2Pim03',
'CxiDg3Spec',
'Nh2Sb1Ipm02',
'FeeSpec0',
'SxrSpec0',
'XppSpec0',
'XcsUsrIpm01',
'XcsUsrIpm02',
'XcsUsrIpm03',
'XcsUsrIpm04',
'XcsSb1Ipm01',
'XcsSb1Ipm02',
'XcsSb2Ipm01',
'XcsSb2Ipm02',
'XcsGonIpm01',
'XcsLamIpm01',
'XppAin01',
'XcsAin01',
'AmoAin01']


#------------------------------

def det_type_from_source(source):
    """Return the enumerated detector type matching a data source.

    The source is converted with ``str()`` and searched for a
    ``':<DetectorName>.'`` marker. Markers are tried in the order listed
    below and the first one found decides the result.

    Parameters
    - source : any object whose ``str()`` contains the detector name

    Returns
    - int : one of the detector type constants of this module,
      UNDEFINED if no marker is found.
    """
    text = str(source)

    # (marker, detector type) pairs, tried top to bottom.
    markers = (
        (':Cspad.',       CSPAD),
        (':Cspad2x2.',    CSPAD2X2),
        (':pnCCD.',       PNCCD),
        (':Princeton.',   PRINCETON),
        (':Andor.',       ANDOR),
        (':Epix100a.',    EPIX100A),
        (':Epix10k.',     EPIX10K),
        (':Epix.',        EPIX),
        (':Opal1000.',    OPAL1000),
        (':Opal2000.',    OPAL2000),
        (':Opal4000.',    OPAL4000),
        (':Opal8000.',    OPAL8000),
        (':Tm6740.',      TM6740),
        (':OrcaFl40.',    ORCAFL40),
        (':Fccd960.',     FCCD960),
        (':Acqiris.',     ACQIRIS),
        (':Imp.',         IMP),
        (':Quartz4A150.', QUARTZ4A150),
        (':Rayonix.',     RAYONIX),
        (':Evr.',         EVR),
        (':Fccd.',        FCCD),
        (':Timepix.',     TIMEPIX),
        (':Fli.',         FLI),
        (':Pimax.',       PIMAX),
        (':DualAndor.',   ANDOR3D),
        (':Jungfrau.',    JUNGFRAU),
        (':Zyla.',        ZYLA),
    )

    for marker, det_type in markers:
        if marker in text:
            return det_type
    return UNDEFINED
#------------------------------ ##----------------------------- #------------------------------
def string_from_source(source):
    """Extract the detector string from the text form of a source object.

    ``str(source)`` is expected to contain a double-quoted part, e.g.
    Source("DetInfo(CxiDs2.0:Cspad.0)") gives 'CxiDs2.0:Cspad.0' and
    Source("DsaCsPad") gives 'DsaCsPad'.

    Parameters
    - source : object whose ``str()`` holds the double-quoted name

    Returns
    - str : the text inside the parentheses of the quoted part if there is
      an opening parenthesis, otherwise the whole quoted part.

    Raises
    - IndexError : if ``str(source)`` contains no double quote.
    """
    quoted = str(source).split('"')[1]
    pieces = quoted.split('(')
    if len(pieces) > 1:
        # 'DetInfo(name)' -> 'name'
        return pieces[1].rstrip(')')
    return quoted
##-----------------------------
def shape_nda_to_2d(arr):
    """Return the shape to use for viewing an array as 2-d.

    Parameters
    - arr : numpy array

    Returns
    - tuple : ``arr.shape`` unchanged for arrays with fewer than 3 dimensions,
      otherwise ``(arr.size // ncols, ncols)`` where ncols is the last
      dimension.
    """
    sh = arr.shape
    if len(sh) < 3:
        return sh
    # Floor division keeps the entry an int under true division as well.
    return (arr.size // sh[-1], sh[-1])
##-----------------------------
def shape_nda_to_3d(arr):
    """Return the shape to use for viewing an array as 3-d.

    Parameters
    - arr : numpy array

    Returns
    - tuple : ``arr.shape`` unchanged for arrays with fewer than 4 dimensions,
      otherwise ``(arr.size // (nrows*ncols), nrows, ncols)`` where nrows and
      ncols are the last two dimensions.
    """
    sh = arr.shape
    if len(sh) < 4:
        return sh
    # Floor division keeps the entry an int under true division as well.
    return (arr.size // sh[-1] // sh[-2], sh[-2], sh[-1])
##-----------------------------
def reshape_nda_to_2d(arr):
    """Reshape a numpy array to 2-d in place and return it.

    Arrays with fewer than 3 dimensions are returned untouched. Otherwise the
    ``shape`` attribute of ``arr`` itself is reassigned, so the caller's
    array is modified.

    Parameters
    - arr : numpy array

    Returns
    - the same array object, with shape ``(arr.size // ncols, ncols)``.
    """
    sh = arr.shape
    if len(sh) < 3:
        return arr
    # Floor division: a float in the shape tuple is rejected by numpy.
    arr.shape = (arr.size // sh[-1], sh[-1])
    return arr
##-----------------------------
def reshape_nda_to_3d(arr):
    """Reshape a numpy array to 3-d in place and return it.

    Arrays with fewer than 4 dimensions are returned untouched. Otherwise the
    ``shape`` attribute of ``arr`` itself is reassigned, so the caller's
    array is modified.

    Parameters
    - arr : numpy array

    Returns
    - the same array object, with shape
      ``(arr.size // (nrows*ncols), nrows, ncols)``.
    """
    sh = arr.shape
    if len(sh) < 4:
        return arr
    # Floor division: a float in the shape tuple is rejected by numpy.
    arr.shape = (arr.size // sh[-1] // sh[-2], sh[-2], sh[-1])
    return arr
#------------------------------
def merge_masks(mask1=None, mask2=None, dtype=np.uint8):
    """Merge two masks with the np.logical_and rule.

    (0,1,0,1)^(0,0,1,1) = (0,0,0,1)

    Parameters
    - mask1, mask2 : numpy arrays or None. If one of them is None the other
      one is returned as is (no dtype conversion).
    - dtype : dtype of the returned array.

    Returns
    - numpy array of the requested dtype. When the input shapes differ the
      result takes the shape of the mask with more dimensions (of mask2 if
      both have the same number of dimensions).

    The input arrays are not modified: a shape mismatch is resolved on a
    reshaped view, not by reassigning the ``shape`` of the caller's array.
    """
    if mask1 is None:
        return mask2
    if mask2 is None:
        return mask1

    shape1, shape2 = mask1.shape, mask2.shape
    if shape1 != shape2:
        if len(shape1) > len(shape2):
            mask2 = mask2.reshape(shape1)
        else:
            mask1 = mask1.reshape(shape2)

    merged = np.logical_and(mask1, mask2)
    # np.bool was removed from NumPy; comparing dtypes accepts both the
    # builtin bool and np.bool_.
    if np.dtype(dtype) == np.dtype(bool):
        return merged
    return np.asarray(merged, dtype)
#------------------------------
def mask_neighbors(mask, allnbrs=True, dtype=np.uint8):
    """Return a mask in which the neighbors of each 0 (bad) pixel are masked too.

    Neighbors are taken along the last two dimensions; arrays with more than
    two dimensions are handled as a stack of 2-d segments.

    Parameters
    - mask : numpy array with at least 2 dimensions, 0 marks a bad pixel
    - allnbrs : bool - False/True masks 4/8 neighbor pixels
    - dtype : dtype of the returned array

    Returns
    - new numpy array of the same shape as ``mask``. The input is never
      modified, also when it already has the requested dtype.

    Raises
    - ValueError : if ``mask`` has fewer than 2 dimensions.
    """
    shape_in = mask.shape
    if len(shape_in) < 2:
        raise ValueError('Input mask has less then 2-d, shape = %s' % str(shape_in))

    rows, cols = shape_in[-2:]
    nsegs = int(np.prod(shape_in[:-2], dtype=np.int64))  # 1 for a 2-d input

    src = mask.reshape((nsegs, rows, cols))
    # Explicit copy: with np.asarray the output would alias an input of the
    # same dtype, and each step below would then read already-dilated values.
    out = np.array(src, dtype=dtype)

    # mask nearest neighbors
    out[:, 0:-1, :] = np.logical_and(out[:, 0:-1, :], src[:, 1:, :])
    out[:, 1:, :] = np.logical_and(out[:, 1:, :], src[:, 0:-1, :])
    out[:, :, 0:-1] = np.logical_and(out[:, :, 0:-1], src[:, :, 1:])
    out[:, :, 1:] = np.logical_and(out[:, :, 1:], src[:, :, 0:-1])

    if allnbrs:
        # mask diagonal neighbors
        out[:, 0:-1, 0:-1] = np.logical_and(out[:, 0:-1, 0:-1], src[:, 1:, 1:])
        out[:, 1:, 0:-1] = np.logical_and(out[:, 1:, 0:-1], src[:, 0:-1, 1:])
        out[:, 0:-1, 1:] = np.logical_and(out[:, 0:-1, 1:], src[:, 1:, 0:-1])
        out[:, 1:, 1:] = np.logical_and(out[:, 1:, 1:], src[:, 0:-1, 0:-1])

    return out.reshape(shape_in)
#------------------------------
def mask_edges(mask, mrows=1, mcols=1, dtype=np.uint8):
    """Return a mask with the requested number of edge rows and columns set to 0.

    Edges are taken along the last two dimensions; arrays with more than two
    dimensions are handled as a stack of 2-d segments.

    Parameters
    - mask : numpy array with at least 2 dimensions
    - mrows : int - number of edge rows to mask on each side
    - mcols : int - number of edge columns to mask on each side
    - dtype : dtype of the returned array

    Returns
    - new numpy array of the same shape as ``mask``. The input is never
      modified, also when it already has the requested dtype.

    Raises
    - ValueError : if ``mask`` has fewer than 2 dimensions or if mrows/mcols
      exceed the number of rows/columns.
    """
    sh = mask.shape
    if len(sh) < 2:
        raise ValueError('Input mask has less then 2-d, shape = %s' % str(sh))

    rows, cols = sh[-2:]
    if mrows > rows:
        raise ValueError('Requested number of edge rows=%d to mask exceeds 2-d, shape=%s' % (mrows, str(sh)))
    if mcols > cols:
        raise ValueError('Requested number of edge columns=%d to mask exceeds 2-d, shape=%s' % (mcols, str(sh)))

    nsegs = int(np.prod(sh[:-2], dtype=np.int64))  # 1 for a 2-d input
    # Explicit copy: np.asarray would alias an input of the same dtype and
    # the edge assignment below would then zero the caller's array.
    out = np.array(mask, dtype=dtype).reshape((nsegs, rows, cols))

    if mrows > 0:
        # mask edge rows
        out[:, :mrows, :] = 0
        out[:, -mrows:, :] = 0

    if mcols > 0:
        # mask edge columns
        out[:, :, :mcols] = 0
        out[:, :, -mcols:] = 0

    return out.reshape(sh)
##-----------------------------
def evaluate_limits(arr, nneg=5, npos=5, lim_lo=1, lim_hi=1000, verbos=1, cmt=''):
    """Evaluate low and high limits of the array, which are used to find bad pixels.

    Parameters
    - arr : numpy array of values
    - nneg : number of standard deviations below the mean for the low limit;
      if not positive, lim_lo is used
    - npos : number of standard deviations above the mean for the high limit;
      if not positive, lim_hi is used
    - lim_lo : lower bound applied to the low limit
    - lim_hi : upper bound applied to the high limit
    - verbos : int - if bit 1 is set, a summary line is printed
    - cmt : str - comment inserted in the printed line

    Returns
    - (lo, hi) : tuple of limits, lo >= lim_lo and hi <= lim_hi.
    """
    report = bool(verbos & 1)

    # Mean and std are also needed for the report, even when both limits are
    # taken from lim_lo/lim_hi; formatting None with %.3f would raise.
    ave, std = (arr.mean(), arr.std()) if (nneg > 0 or npos > 0 or report) else (None, None)

    lo = ave - nneg * std if nneg > 0 else lim_lo
    hi = ave + npos * std if npos > 0 else lim_hi
    lo, hi = max(lo, lim_lo), min(hi, lim_hi)

    if report:
        print(' %s: %s ave, std = %.3f, %.3f low, high limits = %.3f, %.3f' %
              (sys._getframe().f_code.co_name, cmt, ave, std, lo, hi))

    return lo, hi
##-----------------------------
def str_tstamp(fmt='%Y-%m-%dT%H:%M:%S', time_sec=None):
    """Format a time as a string in the local time zone.

    Parameters
    - fmt : str - strftime format
    - time_sec : time in seconds since the epoch, None for the current time

    Returns
    - str : formatted time stamp, ex: 2016-01-26T10:40:53
    """
    local_time = localtime(time_sec)
    return strftime(fmt, local_time)
#------------------------------
def get_enviroment(env='USER'):
    """Look up an environment variable by name.

    Parameters
    - env : str - name of the environment variable

    Returns
    - str : value of the variable

    Raises
    - KeyError : if the variable is not set.
    """
    value = os.environ[env]
    return value
#------------------------------
def get_login():
    """Return the login name of the current user, as given by getpass.getuser().
    """
    # os.getlogin() is deliberately not used here.
    login = getpass.getuser()
    return login
#------------------------------
def get_hostname():
    """Return the host name, as given by socket.gethostname().
    """
    # os.uname()[1] is deliberately not used here.
    host = socket.gethostname()
    return host
#------------------------------
def get_cwd():
    """Return the path of the current working directory.
    """
    cwd = os.getcwd()
    return cwd
#------------------------------

def create_directory(dir, verb=False):
    """Create a directory, including missing parents, unless it already exists.

    Parameters
    - dir : str - path of the directory
    - verb : bool - if True, print what has been done
    """
    if os.path.exists(dir):
        if verb:
            print('Directory exists: %s' % dir)
        return

    try:
        os.makedirs(dir)
    except OSError:
        # Another process may have created the directory between the check
        # above and makedirs; anything else is a real error.
        if not os.path.isdir(dir):
            raise
        if verb:
            print('Directory exists: %s' % dir)
        return

    if verb:
        print('Directory created: %s' % dir)

#------------------------------
def create_directory_with_mode(dir, mode=0o777, verb=False):
    """Create a directory and set its mode, unless it already exists.

    Missing parent directories are created as well; the mode is applied with
    os.chmod to the requested directory only.

    Parameters
    - dir : str - path of the directory
    - mode : int - permission bits passed to os.chmod
    - verb : bool - if True, print what has been done
    """
    if os.path.exists(dir):
        if verb:
            print('Directory exists: %s' % dir)
        return

    try:
        os.makedirs(dir)
    except OSError:
        # Another process may have created the directory between the check
        # above and makedirs; it is then treated as pre-existing.
        if not os.path.isdir(dir):
            raise
        if verb:
            print('Directory exists: %s' % dir)
        return

    os.chmod(dir, mode)
    if verb:
        print('Directory created: %s' % dir)
#------------------------------
def create_path(path, depth=6, mode=0o777, verb=False):
    """Create missing directories of the path to a file, starting at a given depth.

    e.g. for '/reg/g/psdm/logs/calibman/2016/07/log-file-name.txt'
    or '/reg/d/psdm/cxi/cxi11216/calib/Jungfrau::CalibV1/CxiEndstation.0:Jungfrau.0/pedestals/9-end.data'

    The path is split at '/'; the last field is taken as the file name and is
    not created. Directories whose index in the split path is lower than
    ``depth`` are not created explicitly.

    Parameters
    - path : str - path to a file
    - depth : int - index of the first path field to create
    - mode : int - mode passed to create_directory_with_mode
    - verb : bool - verbosity

    Returns
    - bool : True if the directory part of the path exists at the end,
      False otherwise.
    """
    if verb:
        print('create_path: %s' % path)

    subdirs = path.split('/')
    cpath = subdirs[0]
    for i, sd in enumerate(subdirs[:-1]):
        if i > 0:
            cpath += '/%s' % sd
        if i < depth:
            continue
        if cpath == '':
            # leading '/' of an absolute path
            continue
        create_directory_with_mode(cpath, mode, verb)

    return os.path.exists(cpath)
#------------------------------
def save_textfile(text, path, mode='w'):
    """Save text in the file specified by path.

    Parameters
    - text : str - text to write
    - path : str - path to the file
    - mode : str - 'w'-write, 'a'-append
    """
    # The with-statement closes the file also when write() raises.
    with open(path, mode) as f:
        f.write(text)
#------------------------------
def load_textfile(path):
    """Return the content of a text file as a str object.

    Parameters
    - path : str - path to the file

    Returns
    - str : entire content of the file
    """
    # The with-statement closes the file also when read() raises.
    with open(path, 'r') as f:
        return f.read()
#------------------------------

def calib_dir(env):
    """Return env.calibDir() if that path exists, None otherwise.

    Parameters
    - env : object providing the method calibDir()
    """
    cdir = env.calibDir()
    #if cdir == '/reg/d/psdm///calib' :
    #    return None
    return cdir if os.path.exists(cdir) else None

#------------------------------

def exp_name(env):
    """Return env.experiment(), or None if it is an empty string.

    Parameters
    - env : object providing the method experiment()
    """
    name = env.experiment()
    return None if name == '' else name

#------------------------------
def log_rec_on_start():
    """Compose the log record written at the start of a job.

    Returns
    - str : record with time stamp, login, host name, current working
      directory and the command line taken from sys.argv.
    """
    fields = (str_tstamp(fmt='%Y-%m-%dT%H:%M:%S'),
              get_login(),
              get_hostname(),
              get_cwd(),
              ' '.join(sys.argv))
    return '\n%s user:%s@%s cwd:%s\n command:%s' % fields
#------------------------------
def add_rec_to_log(lfname, rec, verbos=False):
    """Append record rec to the log file with path lfname.

    The pattern '#YYYY-MM' in lfname is replaced by the current year/month.
    Missing directories are created beginning from depth=6. Nothing is
    written if the directory of the log file does not exist afterwards.

    Parameters
    - lfname : str - path to the log file, may contain '#YYYY-MM'
    - rec : str - record to append; a newline is added after it
    - verbos : bool - verbosity
    """
    path = replace(lfname, '#YYYY-MM', str_tstamp(fmt='%Y/%m'))

    if not create_path(path, depth=6, mode=0o777, verb=verbos):
        return

    if verbos:
        # Shell equivalent of the append below, printed in the same format
        # as before.
        print('command: %s' % ('echo "%s" >> %s' % (rec, path)))

    # Append directly; passing the record through `echo "..."` in a shell
    # would mangle records with quotes, $ or backticks.
    with open(path, 'a') as f:
        f.write('%s\n' % rec)

    os.chmod(path, 0o666)
#------------------------------

def alias_for_src_name(env):
    """Print source and alias for each key of env.configStore().

    Parameters
    - env : object providing configStore().keys(); each key provides the
      methods src() and alias()
    """
    ckeys = env.configStore().keys()
    srcs = [k.src() for k in ckeys]
    alias = [k.alias() for k in ckeys]

    # Going through a dict prints each distinct source once.
    d = dict(zip(srcs, alias))
    for s, a in d.items():
        print('src: %40s alias: %s' % (s, a))

#------------------------------
def replace(template, pattern, subst):
    """Replace the first occurrence of pattern in template with subst.

    Parameters
    - template : str - input string
    - pattern : str - sub-string to look for
    - subst : replacement, formatted with %s

    Returns
    - str : string with the first occurrence replaced, or template itself
      if it does not contain the pattern.
    """
    head, found, tail = template.partition(pattern)
    if not found:
        return template
    return '%s%s%s' % (head, subst, tail)
#------------------------------
def calib_fname_template(exp, runnum, tsec, tnsec, fid, tsdate, tstime, src, nevts, ofname):
    """Build a file name template from ofname by substituting its #-fields.

    The fields #src, #exp, #run, #type, #date, #time, #fid, #sec, #nsec and
    #evts of ofname are replaced (first occurrence of each) by actual values.
    #type is replaced by '%s', to be filled in later with the type.

    RETURNS
    - template (str) - file name template, e.g.:
      nda-cxi11216-r0009-CxiEndstation.0:Jungfrau.0-e000010-%s.txt.
      If the result holds no '%s', '-%s' is appended.
    """
    # Substitutions are applied in this order.
    substitutions = (
        ('#src',  src),
        ('#exp',  exp),
        ('#run',  'r%04d' % runnum),
        ('#type', '%s'),
        ('#date', tsdate),
        ('#time', tstime),
        ('#fid',  '%06d' % fid),
        ('#sec',  '%d' % tsec),
        ('#nsec', '%09d' % tnsec),
        ('#evts', 'e%06d' % nevts),
    )

    template = ofname
    for field, value in substitutions:
        template = replace(template, field, value)

    if '%s' not in template:
        template += '-%s'
    return template
#------------------------------
def history_record(ifname, ctypedir, ctype, ofname, comment=''):
    """Compose the history record about deployed constants.

    Parameters
    - ifname : str - input file name, e.g.: 'fname.txt'
    - ctypedir : str - path to calibtype directory, e.g. '/some-path/calib/Jungfrau::CalibV1/CxiEndstation.0:Jungfrau.0/'
      (not used in the record)
    - ctype : str - calibration type, e.g.: 'pedestals'
    - ofname : str - output file name, e.g.: '123-end.data'
    - comment : str - any comment

    Returns
    - str : record with file names, calibration type, login, host name,
      time stamp, current working directory and the comment.
    """
    fields = (ofname.ljust(14),
              ifname,
              ctype,
              get_login(),
              get_hostname(),
              str_tstamp(fmt='%Y-%m-%dT%H:%M:%S'),
              get_cwd(),
              comment)
    return 'file:%s copy_of:%s ctype:%s user:%s host:%s cptime:%s cwd:%s cmt:%s' % fields
#------------------------------
def path_to_history_file(ctypedir, ctype):
    """Compose the path to the HISTORY file in the calib store.

    e.g. /some-path/calib/Jungfrau::CalibV1/CxiEndstation.0:Jungfrau.0/pedestals/HISTORY

    Parameters
    - ctypedir : str - path to calibtype directory
    - ctype : str - calibration type, e.g.: 'pedestals'
    """
    parts = (ctypedir, ctype)
    return '%s/%s/HISTORY' % parts
#------------------------------
def path_to_calib_file(ctypedir, ctype, ofname):
    """Compose the path to a file with calibration constants in the calib store.

    e.g. /some-path/calib/Jungfrau::CalibV1/CxiEndstation.0:Jungfrau.0/pedestals/9-end.data

    Parameters
    - ctypedir : str - path to calibtype directory
    - ctype : str - calibration type, e.g.: 'pedestals'
    - ofname : str - output file name, e.g.: '123-end.data'
    """
    parts = (ctypedir, ctype, ofname)
    return '%s/%s/%s' % parts
#------------------------------
def command_deploy_file(ifname, ofname):
    """Compose the shell command which copies the content of ifname to ofname.

    Parameters
    - ifname : str - input file name
    - ofname : str - output file name

    Returns
    - str : command string; '>' overwrites the content of ofname.
    """
    names = (ifname, ofname)
    return 'cat %s > %s' % names
#------------------------------
def command_add_record_to_file(rec, fname):
    """Compose the shell command which appends a record to a file.

    Parameters
    - rec : str - record, put in double quotes in the command
    - fname : str - path to the file

    Returns
    - str : command string; '>>' appends to fname.
    """
    args = (rec, fname)
    return 'echo "%s" >> %s' % args
#------------------------------
def deploy_file(ifname, ctypedir, ctype, ofname, lfname=None, verbos=False):
    """Deploy a file with calibration constants in the calib store and add a
    history record to the HISTORY file and, optionally, to the log file.

    Nothing is done if the directory for the output file does not exist
    after the attempt to create it.

    Parameters
    - ifname : str - input file name, e.g.: 'fname.txt'
    - ctypedir : str - path to calibtype directory, e.g. '/some-path/calib/Jungfrau::CalibV1/CxiEndstation.0:Jungfrau.0/'
    - ctype : str - calibration type, e.g.: 'pedestals'
    - ofname : str - output file name, e.g.: '123-end.data'
    - lfname : str - log file path or None, to add history record
    - verbos : bool - verbosity
    """
    path_clb = path_to_calib_file(ctypedir, ctype, ofname)
    path_his = path_to_history_file(ctypedir, ctype)

    # Under /reg/d/psdm/ directories are created beginning from field 6 of
    # the path, elsewhere from its beginning.
    dep = 6 if '/reg/d/psdm/' in path_clb else 0

    # mode=0o2770 makes drwxrws---+
    if not create_path(path_clb, depth=dep, mode=0o2770, verb=verbos):
        return

    # NOTE(review): both commands are run through the shell and file names
    # are not escaped; names with spaces or shell metacharacters will
    # misbehave.
    cmd = command_deploy_file(ifname, path_clb)
    print('cmd: %s' % cmd)
    os.system(cmd)

    rec = history_record(ifname, ctypedir, ctype, ofname, comment='')
    cmd = command_add_record_to_file(rec, path_his)
    if verbos:
        print('cmd: %s' % cmd)
    os.system(cmd)

    if lfname is not None:
        add_rec_to_log(lfname, ' %s' % rec, verbos)
#------------------------------
#------------------------------
#------------------------------
#------------------------------

def _show_two_images(title, img1, img2):
    """Draw two images with color bars side by side and show the figure."""
    import pyimgalgos.Graphics as gr

    fig = gr.figure(figsize=(16,6), title=title)
    axim1 = gr.add_axes(fig, axwin=(0.05,  0.05, 0.40, 0.91))
    axcb1 = gr.add_axes(fig, axwin=(0.452, 0.05, 0.01, 0.91))
    axim2 = gr.add_axes(fig, axwin=(0.55,  0.05, 0.40, 0.91))
    axcb2 = gr.add_axes(fig, axwin=(0.952, 0.05, 0.01, 0.91))

    gr.imshow_cbar(fig, axim1, axcb1, img1, amin=0, amax=10, orientation='vertical')
    gr.imshow_cbar(fig, axim2, axcb2, img2, amin=0, amax=10, orientation='vertical')
    gr.show(mode=None)

#------------------------------

def _show_one_image(title, img):
    """Draw a single image with color bar and show the figure."""
    import pyimgalgos.Graphics as gr

    fig = gr.figure(figsize=(8,6), title=title)
    axim1 = gr.add_axes(fig, axwin=(0.05,  0.05, 0.87, 0.91))
    axcb1 = gr.add_axes(fig, axwin=(0.922, 0.05, 0.01, 0.91))

    gr.imshow_cbar(fig, axim1, axcb1, img, amin=0, amax=10, orientation='vertical')
    gr.show(mode=None)

#------------------------------

def test_mask_neighbors_2d(allnbrs=True):
    """Show a random 2-d mask next to the result of mask_neighbors."""
    from pyimgalgos.NDArrGenerators import random_exponential

    randexp = random_exponential(shape=(40,60), a0=1)
    mask = np.select((randexp>6,), (0,), default=1)
    mask_nbrs = mask_neighbors(mask, allnbrs)

    _show_two_images('Random 2-d mask', mask, mask_nbrs)

#------------------------------

def test_mask_neighbors_3d(allnbrs=True):
    """Show a random 3-d mask next to the result of mask_neighbors."""
    from pyimgalgos.NDArrGenerators import random_exponential

    #randexp = random_exponential(shape=(2,2,30,80), a0=1)
    randexp = random_exponential(shape=(2,30,80), a0=1)
    mask = np.select((randexp>6,), (0,), default=1)
    mask_nbrs = mask_neighbors(mask, allnbrs)

    _show_two_images('Random > 2-d mask', reshape_nda_to_2d(mask), reshape_nda_to_2d(mask_nbrs))

#------------------------------

def test_mask_edges_2d(mrows=1, mcols=1):
    """Show the result of mask_edges for a 2-d mask of ones."""
    mask = np.ones((20,30))
    mask_out = mask_edges(mask, mrows, mcols)

    _show_one_image('Mask edges 2-d', mask_out)

#------------------------------

def test_mask_edges_3d(mrows=1, mcols=1):
    """Show the result of mask_edges for a 3-d mask of ones."""
    #mask = np.ones((2,2,20,30))
    mask = np.ones((2,20,30))
    mask_out = mask_edges(mask, mrows, mcols)

    _show_one_image('Mask edges 3-d', reshape_nda_to_2d(mask_out))

#------------------------------

#def src_name_from_alias(env, alias='') :
#    amap = env.aliasMap()
#    for s in amap.srcs() :
#        str_s = str(s) # 'MfxEndstation.0:Rayonix.0'
#        print s, amap.src(str_s), ' alias ="%s"' % amap.alias(amap.src(str_s))
#
#    #psasrc = amap.src(str_src)
#    #source = src if amap.alias(psasrc) == '' else amap.src(str_src)

#------------------------------
#------------------------------

def do_test():
    """Print user/host info and run the test selected by the first command
    line argument ('1'-'8'); without an argument only the info is printed.
    """
    print('get_enviroment(USER) : %s' % get_enviroment())
    print('get_login() : %s' % get_login())
    print('get_hostname() : %s' % get_hostname())
    print('get_cwd() : %s' % get_cwd())
    #print ': %s' %

    if len(sys.argv) > 1:
        tests = {
            '1': lambda: test_mask_neighbors_2d(allnbrs=False),
            '2': lambda: test_mask_neighbors_2d(allnbrs=True),
            '3': lambda: test_mask_neighbors_3d(allnbrs=False),
            '4': lambda: test_mask_neighbors_3d(allnbrs=True),
            '5': lambda: test_mask_edges_2d(mrows=5, mcols=1),
            '6': lambda: test_mask_edges_2d(mrows=0, mcols=5),
            '7': lambda: test_mask_edges_3d(mrows=1, mcols=2),
            '8': lambda: test_mask_edges_3d(mrows=5, mcols=0),
        }
        test = tests.get(sys.argv[1])
        if test is not None:
            test()

#------------------------------

if __name__ == "__main__":
    do_test()

#------------------------------