diff --git a/README.md b/README.md index bfccf47..a643d81 100755 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # LSF Python API -These python wrappers allow customers to submit and control jobs and obtain status of queues, hosts and other LSF attributes from Python directly. They work with various versions of LSF and are maintained by LSF developement, though we take contributions from the Open Source community. +These python wrappers allow customers to submit and control jobs and obtain status of queues, hosts and other LSF attributes from Python directly. They work with various versions of LSF and are maintained by LSF development, though we take contributions from the Open Source community. If you plan or would like to contribute to the library, you must follow the DCO process in the attached [DCO Readme file](https://github.com/IBMSpectrumComputing/platform-python-lsf-api/blob/master/IBMDCO.md) in the root of this repository. It essentially requires you to provide a Sign Off line in the notes of your pull request stating that the work is clear of infinging work by others. Again, for more details, please see the DCO Readme file. @@ -42,23 +42,26 @@ Please note you must use lsf.lsb_init before any other LSBLIB library routine in Supported operating systems: Linux 2.6 glibc 2.3 x86 64 bit: RHEL 6.2, RHEL6.4, RHEL6.5, RHEL6.8 - Linux 3.10 glibc 2.17 x86 64 bit: Red Hat 7.4, 7.5 + Linux 3.10 glibc 2.17 x86 64 bit: Red Hat 7.4, 7.5, 8.8, 8.9 Linux for Power Systems Servers 8 Little Endian (Linux 3.10, glibc 2.17): RHEL 7.4 Linux for Power Systems Servers 9 Little Endian (Linux 4.14, glibc 2.17): RHEL 7.5 - SWIG - SWIG version 2.0, or later + SWIG version 2.0, or 3.0 The following versions are tested: SWIG: 2.0.10, 3.0.12 + + If installing SWIG with pip or pip3, please specify 3.0.12 version. + `$ pip3 install swig==3.0.12` - Python Python2 and Python3 are all supported. 
The following versions are tested: - Python 2.6.6, 2.7.15, 3.0, 3.6.0, 3.7.0 + Python 2.6.6, 2.7.15, 3.0, 3.6.0, 3.7.0, 3.12.2 ## Compatibility @@ -74,6 +77,10 @@ Before compiling the library, set the LSF environment variables: `$ source profile.lsf` +If using Python 3.10 or later, where distutils is deprecated (it is removed in Python 3.12), install setuptools first: + +`$ pip3 install setuptools` + To compile and install the library, go to the main source directory and type: @@ -90,6 +97,7 @@ or `$ python3 setup.py bdist_rpm` Resulting RPMs will be placed in the dist directory + ## Release Notes ### Release 1.0.6 @@ -171,3 +179,4 @@ IBM(R), the IBM logo and ibm.com(R) are trademarks of International Business Mac registered in many jurisdictions worldwide. Other product and service names might be trademarks of IBM or other companies. A current list of IBM trademarks is available on the Web at "Copyright and trademark information" at www.ibm.com/legal/copytrade.shtml. + diff --git a/examples/disp_gpu.py b/examples/disp_gpu.py index 096d711..892e1e2 100644 --- a/examples/disp_gpu.py +++ b/examples/disp_gpu.py @@ -7,11 +7,14 @@ num_hosts = lsf.new_intp() lsf.intp_assign(num_hosts, 0) host_data = lsf.lsb_hostinfo_ex(host_names, num_hosts, "", 0) - all_host_data = lsf.hostInfoEntArray_frompointer(host_data) + all_host_data = lsf.hostInfoEntArray(lsf.intp_value(num_hosts)) + all_host_data = all_host_data.frompointer(host_data) for i in range(0, lsf.intp_value(num_hosts)): hostname = all_host_data[i].host print(hostname) gpudata = all_host_data[i].gpuData + if gpudata is None: + continue print("ngpus avail_shared_gpus avail_excl_gpus") print(" {} {} {}". 
\ format(gpudata.ngpus, gpudata.avail_shared_ngpus, gpudata.avail_excl_ngpus)) diff --git a/examples/disp_gpu_host.py b/examples/disp_gpu_host.py new file mode 100644 index 0000000..cae2e3c --- /dev/null +++ b/examples/disp_gpu_host.py @@ -0,0 +1,27 @@ +from pythonlsf import lsf +if __name__ == '__main__': + if lsf.lsb_init("test") > 0: + exit(-1) # call exit(); the bare expression "exit -1" raised TypeError instead of exiting + num_hosts = lsf.new_intp() + lsf.intp_assign(num_hosts, 0) + hosts = lsf.ls_gethostgpuinfo(None, num_hosts, None, 0, 0) + + hostGpuInfos = lsf.hostGpuInfoArray(lsf.intp_value(num_hosts)) + hostGpuInfos = hostGpuInfos.frompointer(hosts) + for i in range(0, lsf.intp_value(num_hosts)): + hostGpuInfo = hostGpuInfos[i] + print("Host: {}". format(hostGpuInfo.hostName)) + numGpus = hostGpuInfos[i].gpuC + if numGpus <= 0: + continue + gpuAttrData = hostGpuInfo.gpuAttrV + gpuLoadData = hostGpuInfo.gpuLoadV + gpuAttrs = lsf.hostGpuAttrArray(numGpus) + gpuAttrs = gpuAttrs.frompointer(gpuAttrData) + gpuLoads = lsf.hostGpuLoadArray(numGpus) + gpuLoads = gpuLoads.frompointer(gpuLoadData) + print(" gBrand gModel gBusId gMode gUsedMem gStatus") + for j in range(0, numGpus): + print(" {} {} {} {} {} {} ". 
\ + format(gpuAttrs[j].gBrand, gpuAttrs[j].gModel, gpuAttrs[j].gBusId, + gpuLoads[j].gMode, gpuLoads[j].gUsedMem, gpuLoads[j].gStatus)) diff --git a/examples/disp_limit.py b/examples/disp_limit.py index c7bcead..ab715e8 100644 --- a/examples/disp_limit.py +++ b/examples/disp_limit.py @@ -28,4 +28,5 @@ def printLimit(): if __name__ == '__main__': print("LSF Clustername is : {}".format(lsf.ls_getclustername())) + lsf.set_limit_filter_flag(True) printLimit() diff --git a/examples/disp_limit_all.py b/examples/disp_limit_all.py index cc2fa2a..56ea773 100644 --- a/examples/disp_limit_all.py +++ b/examples/disp_limit_all.py @@ -3,12 +3,14 @@ def printLimitItem(name, item): print(name+' :') print(' consumerC : {}'.format(item.consumerC)) - consumers = lsf.limitConsumerArray_frompointer(item.consumerV) + consumers = lsf.limitConsumerArray(item.consumerC) + consumers = consumers.frompointer(item.consumerV) for j in range (item.consumerC) : print(' [{}] type : {}'.format(j, consumers[j].type)) print(' [{}] name : {}'.format(j, consumers[j].name)) print(' resourceC : {}'.format(item.resourceC)) - resources = lsf.limitResourceArray_frompointer(item.resourceV) + resources = lsf.limitResourceArray(item.resourceC) + resources= resources.frompointer(item.resourceV) for j in range (item.resourceC) : print(' [{}] name : {}'.format(j, resources[j].name)) print(' [{}] type : {}'.format(j, resources[j].type)) @@ -47,7 +49,8 @@ def printLimit(): # print usageC in the limit print('usageC : {}'.format(ent.usageC)) # print usageInfo in the limit - all_usageInfo = lsf.limitItemArray_frompointer(ent.usageInfo) + all_usageInfo = lsf.limitItemArray(ent.usageC) + all_usageInfo = all_usageInfo.frompointer(ent.usageInfo) for j in range (ent.usageC) : printLimitItem('usageInfo', all_usageInfo[j]) diff --git a/examples/get_job_pendreason.py b/examples/get_job_pendreason.py new file mode 100644 index 0000000..a1319e9 --- /dev/null +++ b/examples/get_job_pendreason.py @@ -0,0 +1,106 @@ +from pythonlsf 
import lsf +import sys + +def showJobStat(stat) : + if stat == "" or stat == lsf.JOB_STAT_NULL : + return "NULL" + elif stat == lsf.JOB_STAT_PEND : + return "PEND" + elif stat == lsf.JOB_STAT_PSUSP : + return "PSUSP" + elif stat == lsf.JOB_STAT_RUN : + return "RUN" + elif stat == lsf.JOB_STAT_SSUSP : + return "SSUSP" + elif stat == lsf.JOB_STAT_USUSP : + return "USUSP" + elif stat == lsf.JOB_STAT_EXIT : + return "EXIT" + elif stat == lsf.JOB_STAT_DONE or stat == (lsf.JOB_STAT_DONE + lsf.JOB_STAT_PDONE) or \ +stat == (lsf.JOB_STAT_DONE + lsf.JOB_STAT_PERR) : + return "DONE" + elif stat == lsf.JOB_STAT_UNKWN : + return "UNKWN" + elif stat == (lsf.JOB_STAT_RUN + lsf.JOB_STAT_PROV) : + return "PROV" + else : + return "ERROR" + + +def get_job_info(list) : + if lsf.lsb_init("test") > 0: + print("failed to initialize") + return + if len(list) > 0 : + print("request below job's info: {}".format(list)) + else : + print("request all job's info") + clusterName = lsf.ls_getclustername() + print("retrieve cluster name : {}".format(clusterName)) + reasonLevel = 3 + more = lsf.new_intp() + jobInfoPtr = lsf.jobInfoHeadExt() + req = lsf.jobInfoReq() + req.reasonLevel = reasonLevel + req.sourceClusterName = clusterName + req.userName = "all" + req.options = lsf.ALL_JOB + if len(list) > 1 : + req.jobId = 0 + jobList = "" + cluList = "" + for j in range(len(list)): + jobList += list[j] + "," + cluList += clusterName + "," + req.submitExt = lsf.submit_ext() + submitDict = {} + submitDict[lsf.JDATA_EXT_JOBIDS] = jobList + submitDict[lsf.JDATA_EXT_SOURCECLUSTERS] = cluList + submitDict[lsf.JDATA_EXT_AFFINITYINFO] = "" + submitDict[lsf.JDATA_EXT_DATAINFO] = "" + submitDict[lsf.JDATA_EXT_EST_RESULTS] = "" + submitDict[lsf.JDATA_EXT_APS_DETAIL] = "" + req.submitExt.keys = lsf.new_intArray(6) + req.submitExt.values = lsf.new_stringArray(6) + for p, (key, value) in enumerate(submitDict.items()): + lsf.intArray_setitem(req.submitExt.keys, p, key) + lsf.stringArray_setitem(req.submitExt.values, 
p, value) + req.submitExt.num = 6 + elif len(list) == 1 : + req.jobId = int(list[0]) + else : + req.jobId = 0 + jobInfoPtr = lsf.lsb_openjobinfo_req(req) + foundJob = False + if jobInfoPtr != None : + if jobInfoPtr.jobInfoHead != None : + foundJob = True + if not foundJob : + print("failed to query jobs") + else : + print("found job number : {}".format(jobInfoPtr.jobInfoHead.numJobs)) + if jobInfoPtr.jobInfoHead.numJobs > 0 : + job = lsf.jobInfoEnt() + job = lsf.lsb_readjobinfo_cond(more, jobInfoPtr); + if job == None: + print("no job found") + while job != None: + if job.status != lsf.JOB_STAT_PEND and job.status != lsf.JOB_STAT_PSUSP : + print("job <{}> from user ({}) status is <{}>".format(lsf.lsb_jobid2str(job.jobId),job.user, showJobStat(job.status))) + else : + pendreasons = lsf.lsb_pendreason_ex(reasonLevel, job, jobInfoPtr.jobInfoHead, job.clusterId) + print("job <{}> from user ({}) pending in status <{}>\n with pending reason:\n {}".format(lsf.lsb_jobid2str(job.jobId),job.user, showJobStat(job.status), pendreasons)) + + job = lsf.lsb_readjobinfo_cond(more, jobInfoPtr); + + +if __name__ == "__main__": + joblist = [] + if len(sys.argv) >= 2: + for jobid in sys.argv[1:]: + joblist.append(jobid) + if len(joblist) > 0 : + print("get job pending reason for: ", joblist) + else : + print("get all job pending reason info") + get_job_info(joblist) diff --git a/examples/get_jobs_info.py b/examples/get_jobs_info.py new file mode 100755 index 0000000..50b54e7 --- /dev/null +++ b/examples/get_jobs_info.py @@ -0,0 +1,62 @@ +from pythonlsf import lsf +import sys + +def get_job_info(list) : + if lsf.lsb_init("test") > 0: + print("failed to initialize") + return + if len(list) == 0 : + print("no valid job id given") + return + print("request below job's info: {}".format(list)) + clusterName = lsf.ls_getclustername() + print("retrieve cluster name : {}".format(clusterName)) + query = "jobIds=(" + for l in list: + query += l +"," + query = query[:-1] + query += 
")jobSouceClusterNames=(" + for i in range(len(list)): + query += clusterName + "," + query = query[:-1] + query += ") options=" + str(lsf.ALL_JOB) + " " + + jobQuery = lsf.jobInfoQuery() + jobQuery.nCols = 10 + jobQuery.colIndexs = lsf.buildQueryColIndexs() + + jobQuery.query = query + jobQuery.submitExt = lsf.submit_ext() + more = lsf.new_intp() + print("request: {}".format(jobQuery.query)) + jobInfoPtr = lsf.jobInfoHeadExt() + jobInfoPtr = lsf.lsb_queryjobinfo_ext_2(jobQuery, clusterName) + foundJob = False + if jobInfoPtr != None : + if jobInfoPtr.jobInfoHead != None : + foundJob = True + if not foundJob : + print("failed to query jobs") + else : + print("found job number : {}".format(jobInfoPtr.jobInfoHead.numJobs)) + if jobInfoPtr.jobInfoHead.numJobs > 0 : + job = lsf.jobInfoEnt() + job = lsf.lsb_fetchjobinfo(more, jobQuery.nCols, jobQuery.colIndexs, jobQuery.query) + if job == None: + print("no job found") + while job != None: + print("job <{}> from user ({}) status is {}".format(lsf.lsb_jobid2str(job.jobId),job.user, job.status)) + job = lsf.lsb_fetchjobinfo(more, jobQuery.nCols, jobQuery.colIndexs, jobQuery.query) + + +if __name__ == "__main__": + joblist = [] + if len(sys.argv) >= 2: + for jobid in sys.argv[1:]: + joblist.append(jobid) + if len(joblist) == 0 : + print("usage: \n {} job1\n {} job1 job2 job3 ...\n".format(sys.argv[0],sys.argv[0])) + exit() + print("get job info for: ", joblist) + get_job_info(joblist) + diff --git a/examples/job_cputime.py b/examples/job_cputime.py new file mode 100644 index 0000000..2a29398 --- /dev/null +++ b/examples/job_cputime.py @@ -0,0 +1,43 @@ +from pythonlsf import lsf + +def print_job_cputime(jobid): + + if lsf.lsb_init("job_info") > 0: + print("failed to initialise the api") + return + + num_jobs_found = lsf.lsb_openjobinfo(jobid, "", "all", "", "", 0x2000) + print('num_jobs_found: {}'.format(num_jobs_found)) + + if num_jobs_found == -1: + print("no job was found") + return + + int_ptr = lsf.new_intp() + 
lsf.intp_assign(int_ptr, num_jobs_found) + + job_info = lsf.lsb_readjobinfo(int_ptr) + + lsf.lsb_closejobinfo() + + print('jobId: {}'.format(job_info.jobId)) + print('jobStatus: {}'.format(job_info.status)) + + # Get CPU time from run Rusage + time_sum = float(job_info.runRusage.utime) + float(job_info.runRusage.stime) + if time_sum > 0: + print('The CPU time used is: {} seconds'.format(time_sum)) + + # Get CPU time from host Rusage of each execution host + if job_info.numhRusages > 0: + hRusages = job_info.hostRusage + for i in range(0,job_info.numhRusages): + hRusage = lsf.hRusageArray_getitem(hRusages, i) + cpu_time = float(hRusage.utime) + float(hRusage.stime) + print('HOST: {}, CPU_TIME: {} seconds'.format(hRusage.name, cpu_time)) + + return + +if __name__ == "__main__": + id = input("Enter a running job id:\n") + print_job_cputime(int(id)) diff --git a/examples/limcontrol.py b/examples/limcontrol.py new file mode 100644 index 0000000..ca5a61e --- /dev/null +++ b/examples/limcontrol.py @@ -0,0 +1,18 @@ +from pythonlsf import lsf + + +# the hostname you want to operate lim running on +host = "your_hostname" + +# set opCode to 1 if you need to reboot lim +#set to 2 if you want to shutdown it +opCode = 1 + +if lsf.lsb_init("test") > 0: + print("failed to initialize") + exit(1) # bare "exit" was a no-op expression; stop here so ls_limcontrol is not called after a failed init +if lsf.ls_limcontrol(host, opCode) == 0 : + print("host operated successfully") +else : + print("host operated failed") + diff --git a/examples/pack_submit.py b/examples/pack_submit.py new file mode 100644 index 0000000..e05956c --- /dev/null +++ b/examples/pack_submit.py @@ -0,0 +1,51 @@ +from pythonlsf import lsf + +def sub_pack_job(): + + + limits1 = [] + for i in range(0, lsf.LSF_RLIM_NLIMITS): + limits1.append(lsf.DEFAULT_RLIMIT) + + submitreq1 = lsf.submit() + submitreq1.command = "sleep 10" + submitreq1.options = 0 + submitreq1.options2 = 0 + submitreq1.rLimits = limits1 + + limits2 = [] + for i in range(0, lsf.LSF_RLIM_NLIMITS): + limits2.append(lsf.DEFAULT_RLIMIT) + + submitreq2 = 
lsf.submit() + submitreq2.command = "sleep 20" + submitreq2.options = 0 + submitreq2.options2 = 0 + submitreq2.rLimits = limits2 + + + pack_submitreq = lsf.packSubmit() + pack_submitreq.num = 2 + submits = lsf.new_submitArray(2) + lsf.submitArray_setitem(submits, 0, submitreq1) + lsf.submitArray_setitem(submits, 1, submitreq2) + pack_submitreq.reqs = submits + + pack_submitreply = lsf.packSubmitReply() + intp_acceptedNum = lsf.copy_intp(0) + intp_rejectedNum = lsf.copy_intp(0) + + + if lsf.lsb_init("test") > 0: + exit(1) + + result = lsf.lsb_submitPack(pack_submitreq, pack_submitreply, intp_acceptedNum, intp_rejectedNum) + print(result) + print(lsf.intp_value(intp_acceptedNum)) + print(lsf.intp_value(intp_rejectedNum)) + + return result + +if __name__ == '__main__': + print("LSF Clustername is :", lsf.ls_getclustername()) + sub_pack_job() diff --git a/examples/readstream.py b/examples/readstream.py index 5ba9b57..60a4722 100644 --- a/examples/readstream.py +++ b/examples/readstream.py @@ -27,7 +27,17 @@ def display(eventrec): for i in range(0,numhosts): hoststr += lsf.stringArray_getitem(exechosts, i) + "" print("EVENT_JOB_FORCE jobid<%d>, execHost<%s>, username<%s>" %(jobid, hoststr, username)) - + elif eventrec.type == lsf.EVENT_JOB_RUN_RUSAGE: + jobid = eventrec.eventLog.jobRunRusageLog.jobid + jrusage = eventrec.eventLog.jobRunRusageLog.jrusage + # Join every id: assigning inside the loop kept only the last one and left the name unbound when the count was 0. + pgids = " ".join( + str(lsf.intArray_getitem(jrusage.pgid, i)) + for i in range(jrusage.npgids)) + pids = " ".join( + str(lsf.pidInfoArray_getitem(jrusage.pidInfo, i).pid) + for i in range(jrusage.npids)) + print("EVENT_JOB_RUN_RUSAGE jobid<%d> pgids<%s> pids<%s>" %(jobid, pgids, pids)) else: print("event type is %d" %(eventrec.type)) diff --git a/examples/restartMbd.py b/examples/restartMbd.py new file mode 100644 index 0000000..dc6a29f --- /dev/null +++ 
b/examples/restartMbd.py @@ -0,0 +1,15 @@ +from pythonlsf import lsf + +req = lsf.mbdCtrlReq() +req.opCode = 0 +req.message = "" +req.name = "mbd" + +if lsf.lsb_init("test") > 0 : + print("failed to initialize") + exit(1) # bare "exit" was a no-op expression; stop here so lsb_reconfig is not called after a failed init +if lsf.lsb_reconfig(req) == 0 : + print("mbd restarted successfully") +else : + print("failed to restart mbd") + diff --git a/examples/restartRES.py b/examples/restartRES.py new file mode 100644 index 0000000..7556a3b --- /dev/null +++ b/examples/restartRES.py @@ -0,0 +1,28 @@ +import sys,socket +from pythonlsf import lsf + +def restartres(hostlist): + if lsf.ls_initdebug("resrestart") < 0: + print("ls_initdebug failed!") + return -1 + num_port = len(hostlist) * 2 + if lsf.ls_initrex(num_port, 0) < num_port : + lsf.ls_perror("ls_initrex") + return -1 + for host in hostlist: + rc=lsf.ls_rescontrol(host, lsf.RES_CMD_REBOOT, 0) + + if rc < 0: + lsf.ls_perror("lsf.ls_rescontrol") + print("failed restart res on {}".format(host)) + else : + print("res on {} restarted".format(host)) + return + +if __name__ == '__main__': + if len(sys.argv) > 1 : + hostlist = sys.argv[1:] + else : + hostlist = [socket.gethostname()] + restartres(hostlist) + diff --git a/examples/restartSBD.py b/examples/restartSBD.py new file mode 100755 index 0000000..d1a6462 --- /dev/null +++ b/examples/restartSBD.py @@ -0,0 +1,42 @@ +from pythonlsf import lsf +import sys + + +def restartSBD(opCode, message, hosts) : + if lsf.lsb_init("test") > 0: + print("failed to initialize") + return + req = lsf.hostCtrlReq() + req.opCode = opCode + req.message = message + if len(hosts) > 0 : + for h in hosts: + print("restarting sbatchd daemon on host <{}> ...".format(h)) + req.host = h + cc = lsf.lsb_hostcontrol(req) + if cc == 0 : + print("sbatchd daemon restarted successfully.") + elif cc == -1 : + print("ERROR: sbatchd daemon failed to restart.") + else : + print("ERROR: return {} while trying to restart sbatchd.".format(cc)) + else : + print("restarting sbatchd daemon on local host ...") + cc = 
lsf.lsb_hostcontrol(req) + if cc == 0 : + print("sbatchd daemon restarted successfully.") + elif cc == -1 : + print("ERROR: sbatchd daemon failed to restart.") + else : + print("ERROR: return {} while trying to restart sbatchd.".format(cc)) + + +if __name__ == "__main__": + opCode = lsf.HOST_REBOOT + message = "reboot according to python-api" + if len(sys.argv) > 1 : + hosts = sys.argv[1:] + else : + hosts = [] + restartSBD(opCode, message, hosts) + diff --git a/examples/submit_gpu_job.py b/examples/submit_gpu_job.py new file mode 100755 index 0000000..b67ae0e --- /dev/null +++ b/examples/submit_gpu_job.py @@ -0,0 +1,69 @@ +from pythonlsf import lsf + + +def run_job(command): + """ + Run a job... + """ + submitreq = lsf.submit() + submitreq.command = command + #submitreq.options = 0 + submitreq.options2 = 0 + + limits = [] + for i in range(0, lsf.LSF_RLIM_NLIMITS): + limits.append(lsf.DEFAULT_RLIMIT) + + submitreq.rLimits = limits + + submitreq.beginTime = 0 + submitreq.termTime = 0 + submitreq.numProcessors = 1 + submitreq.maxNumProcessors = 1 + + # below 2 lines section is for using -R rusage to request GPU + #submitreq.resReq = "rusage[ngpus_physical=2:gmodel=K80#12G:nvlink=yes]" + #submitreq.options = lsf.SUB_RES_REQ + + # below section is for using -gpu to request GPU + submit_ext = lsf.submit_ext() + submitreq.options2 = lsf.SUB2_MODIFY_PEND_JOB + submitreq.options4 = lsf.SUB4_GPU_REQ + gpuOpt = {} + gpuOpt[lsf.JDATA_EXT_GPU_NUM] = "2" + gpuOpt[lsf.JDATA_EXT_GPU_MODE] = "3" + #gpuOpt[lsf.JDATA_EXT_GPU_MPS_ENABLE] = "-1" + #gpuOpt[lsf.JDATA_EXT_GPU_JOB_EXCLUSIVE] = "-1" + #gpuOpt[lsf.JDATA_EXT_GPU_MODEL] = "" + #gpuOpt[lsf.JDATA_EXT_GPU_MEM] = "" + #gpuOpt[lsf.JDATA_EXT_GPU_TILE] = "" + #gpuOpt[lsf.JDATA_EXT_GPU_AFFBIND] = "-1" + #gpuOpt[lsf.JDATA_EXT_GPU_BLOCK] = "-1" + #gpuOpt[lsf.JDATA_EXT_GPU_GPACK] = "-1" + #gpuOpt[lsf.JDATA_EXT_GPU_GVENDOR] = "-1" + #gpuOpt[lsf.JDATA_EXT_GPU_RSRC_TYPE] = "0" + #gpuOpt[lsf.JDATA_EXT_GPU_GI_SLICE] = "-1" + 
#gpuOpt[lsf.JDATA_EXT_GPU_CI_SLICE] = "-1" + + submit_ext.keys = lsf.new_intArray(len(gpuOpt)) + submit_ext.values = lsf.new_stringArray(len(gpuOpt)) + for i, (key, value) in enumerate(gpuOpt.items()): + lsf.intArray_setitem(submit_ext.keys, i, key) + lsf.stringArray_setitem(submit_ext.values, i, value) + submit_ext.num = len(gpuOpt) + submitreq.submitExt = submit_ext + print(submitreq.submitExt.num) + + # submit the job request + submitreply = lsf.submitReply() + + if lsf.lsb_init("test") > 0: + exit(1) + + job_id = lsf.lsb_submit(submitreq, submitreply) + return job_id + + +if __name__ == '__main__': + print("LSF Clustername is :", lsf.ls_getclustername()) + print(run_job("/bin/sleep 10")) diff --git a/pythonlsf/Makefile b/pythonlsf/Makefile index b48b893..30c8e6a 100755 --- a/pythonlsf/Makefile +++ b/pythonlsf/Makefile @@ -13,8 +13,8 @@ LSF_INCLUDE = $(LSF_LIBDIR)/../../include/lsf/ PROJECT = _lsf.so OBJECTS = lsf_wrap.o -CFLAGS = -m64 -fPIC -I$(PYTHON_INCLUDE) -I$(LSF_INCLUDE) -LDFLAGS = $(LSF_LIBDIR)/liblsf.a $(LSF_LIBDIR)/libbat.a $(LSF_LIBDIR)/libfairshareadjust.so $(LSF_LIBDIR)/liblsbstream.so -lc -lnsl +CFLAGS = -fPIC -I$(PYTHON_INCLUDE) -I$(LSF_INCLUDE) +LDFLAGS = $(LSF_LIBDIR)/liblsf.a $(LSF_LIBDIR)/libbat.a $(LSF_LIBDIR)/libfairshareadjust.so $(LSF_LIBDIR)/liblsbstream.so -lc -lnsl -lz all: $(PROJECT) diff --git a/pythonlsf/lsf.i b/pythonlsf/lsf.i index 9238fb0..a18028d 100644 --- a/pythonlsf/lsf.i +++ b/pythonlsf/lsf.i @@ -25,7 +25,11 @@ int fclose(FILE *f); #include "lsf.h" #include "lsbatch.h" #include "lib.table.h" +extern struct gpuJobData* str2GpuJobData(char *str); +typedef int bool_t; +extern void setbConfigInfoFlag4Lib(bool_t bConfigInfo); %} +typedef long off_t; %pointer_functions(int, intp) %pointer_functions(float, floatp) @@ -47,11 +51,19 @@ int fclose(FILE *f); %array_functions(struct shareAcctInfoEnt, shareAcctInfoEntArray) #ifdef LSF_VERSION_101 %array_functions(struct gpuRusage, gpuRusageArray) +%array_functions(struct gpuJobHostData, 
gpuJobHostDataArray) +%array_functions(struct gpuTaskData, gpuTaskDataArray) +%array_functions(struct gpuData *, gpuDataArray) +%array_functions(struct migData, migDataArray) +%array_functions(struct pidInfo, pidInfoArray) #endif %array_functions(LS_LONG_INT, LS_LONG_INTArray) %array_functions(guaranteedResourcePoolEnt, guaranteedResourcePoolEntArray) %array_functions(struct rsvInfoEnt, rsvInfoEntArray) %array_functions(struct hostRsvInfoEnt, hostRsvInfoEntArray) +%array_functions(struct hRusage, hRusageArray) +%array_functions(struct hostInfo, hostInfoArray) +%array_functions(struct submit*, submitArray) //helper function for transforming char** to python list %inline %{ @@ -190,6 +202,11 @@ static void stringArray_setitem(char * *ary, size_t index, char * value) { %array_class(struct _limitItem, limitItemArray) %array_class(struct _limitConsumer, limitConsumerArray) %array_class(struct _limitResource, limitResourceArray) +#ifdef LSF_VERSION_101 +%array_class(struct hostGpuInfo, hostGpuInfoArray) +%array_class(struct hostGpuAttr, hostGpuAttrArray) +%array_class(struct hostGpuLoad, hostGpuLoadArray) +#endif // handle int arrays %typemap(in) int [ANY] (int temp[$1_dim0]) { @@ -232,6 +249,16 @@ static void stringArray_setitem(char * *ary, size_t index, char * value) { $1 = 0; } +%typemap(in) bool_t { + if (PyBool_Check($input)) { + $1 = (bool_t)(($input == Py_True) ? 
1 : 0); + } else if (PyLong_Check($input)) { + $1 = (bool_t)PyLong_AsLong($input); + } else { + $1 = (bool_t)0; + } +} + /* The following routines are not wrapped because SWIG has issues generating proper code for them @@ -700,6 +727,9 @@ int get_lsb_errno() { char * get_lsb_sysmsg() { return lsb_sysmsg(); } +struct gpuJobData* get_str2GpuJobData(char *str) { + return str2GpuJobData(str); +} PyObject * get_pids_from_stream(struct jRusage * jrusage) { struct pidInfo *pidInfo; @@ -714,6 +744,20 @@ PyObject * get_pids_from_stream(struct jRusage * jrusage) { return result; } +long * buildQueryColIndexs() { + long * colIndexs = NULL; + int i = 0; + + colIndexs = calloc(113, sizeof(long)); + for (i= 0; i < 113; i++) { + colIndexs[i] = i; + } + colIndexs[1] = 2; + colIndexs[2] = 1; + colIndexs[9] = 10; + return colIndexs; +} + PyObject * get_host_info_all() { struct hostInfoEnt *hostinfo; char **hosts = NULL; @@ -733,4 +777,8 @@ PyObject * get_host_info_all() { return result; } +void set_limit_filter_flag(bool_t bConfigInfo) { + setbConfigInfoFlag4Lib(bConfigInfo); +} + %} diff --git a/read_JOB_FINISH_submitExt.py b/read_JOB_FINISH_submitExt.py new file mode 100644 index 0000000..fba8299 --- /dev/null +++ b/read_JOB_FINISH_submitExt.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 + +from pythonlsf import lsf +import sys + +# Define function to get key and pair values from the eventrec.eventLog.jobFinishLog.submitExt +def get_pair_from_submit_ext(submit_ext, id): + if submit_ext is None: + return None + + + values_as_list = lsf.string_array_to_pylist(submit_ext.values, submit_ext.num) + for i in range(submit_ext.num): + key = lsf.intArray_getitem(submit_ext.keys, i) + if key == id: + return values_as_list[i] + + return None + + +def display(eventrec): + """ + display event record, this example to get the user group (-G) value from the JOB_FINISH record + """ + if eventrec.type == lsf.EVENT_JOB_FINISH: + jobid = eventrec.eventLog.jobFinishLog.jobId + fromHost = 
eventrec.eventLog.jobFinishLog.fromHost + submit_ext=eventrec.eventLog.jobFinishLog.submitExt + userGroup = get_pair_from_submit_ext(submit_ext,lsf.JDATA_EXT_USRGROUP) +# jobgroup = eventrec.eventLog.jobFinishLog.jgroup +# jobgroup = eventrec.eventLog.jobFinishLog.submitExt.values[1075] + print("EVENT_JOB_FINISH jobid<%d>, fromHost<%s>, to jobgroup <%s>" %(jobid, fromHost, userGroup)) + else: + print("event type is %d" %(eventrec.type)) + +def read_eventrec(path): + """ + read lsb.events + """ + lineNum = lsf.new_intp() + lsf.intp_assign(lineNum, 0) + fp = lsf.fopen(path, "r") + if fp is None: + print("The file %s does not exist." % path) + sys.exit(1) + + flag = 1 + + if lsf.lsb_init("test") > 0: + exit(1) + + while flag > 0: + log = lsf.lsb_geteventrec(fp, lineNum) + if log: + display(log) + else: + flag = 0 + + +if __name__ == '__main__': + if len(sys.argv) == 1: + print("Usage: %s full_path_lsb.events_file" % (sys.argv[0])) + sys.exit(0) + + print("LSF Clustername is :", lsf.ls_getclustername()) + #read_eventrec("/opt/lsf8.0.1/work/cluster1/logdir/lsb.events") + read_eventrec(sys.argv[1]) diff --git a/setup.py b/setup.py index 9650790..b3da42e 100755 --- a/setup.py +++ b/setup.py @@ -8,8 +8,12 @@ # import os, sys, re import time -from distutils.core import setup, Extension +if sys.version_info >= (3,10): + from setuptools import setup, Extension +else: + from distutils.core import setup, Extension from distutils.command.bdist_rpm import bdist_rpm +from distutils.command.build import build from distutils.command.install import INSTALL_SCHEMES class bdist_rpm_custom(bdist_rpm): @@ -25,6 +29,21 @@ def finalize_package_data (self): self.no_autoreq = 1 bdist_rpm.finalize_package_data(self) +# Build extensions before python modules, so the generated pythonlsf/lsf.py +# file is created before attempting to install it. This makes building with +# "pip" easier. 
+# See: +# https://bugs.python.org/issue2624 +# https://bugs.python.org/issue1016626 +# https://stackoverflow.com/questions/12491328/python-distutils-not-include-the-swig-generated-module +# https://stackoverflow.com/questions/50239473/building-a-module-with-setuptools-and-swig + +class build_ext_first(build): + def finalize_options(self): + build.finalize_options(self) # explicit base call (as in bdist_rpm_custom): zero-argument super() is Python 3 only + new_order = list(filter(lambda x: x[0] == 'build_ext', self.sub_commands)) + list(filter(lambda x: x[0] != 'build_ext', self.sub_commands)) + self.sub_commands[:] = new_order + def get_lsf_libdir(): try: _lsf_envdir = os.environ['LSF_ENVDIR'] @@ -51,7 +70,7 @@ def is_keyvalue_defined(lsbatch_h_path): def set_gccflag_lsf_version(): global gccflag_lsfversion _lsf_envdir = os.environ['LSF_ENVDIR'] - with open('{}/lsf.conf'.format(_lsf_envdir), 'r') as f: + with open('{0}/lsf.conf'.format(_lsf_envdir), 'r') as f: _lsf_version = re.search('LSF_VERSION=(.*)', f.read()).group(1).strip() if _lsf_version == '10.1' : gccflag_lsfversion= '-DLSF_VERSION_101' @@ -63,7 +82,7 @@ def set_gccflag_lsf_version(): xlc_path = os.path.join(path, 'xlc') if os.access(xlc_path, os.F_OK): found_xlc = True - os.environ["LDSHARED"] = "%s -pthread -shared -Wl,-z,relro" % xlc_path + os.environ["LDSHARED"] = "%s -pthread -shared -Wl,-z,relro -lz" % xlc_path # keep "-z relro" together; -lz is a separate link option break if found_xlc == False: print(''' @@ -87,11 +106,11 @@ def set_gccflag_lsf_version(): if os.access(LSF_LIBDIR + "/liblsbstream.a", os.F_OK): lsf_static_lib = [ LSF_LIBDIR + '/liblsbstream.a'] - lsf_dynamic_lib = ['c', 'nsl', 'rt'] + lsf_dynamic_lib = ['c', 'nsl', 'rt', 'z'] warning_msg = "" else: lsf_static_lib = [] - lsf_dynamic_lib = ['c', 'nsl', 'lsbstream', 'lsf', 'bat', 'rt'] + lsf_dynamic_lib = ['c', 'nsl', 'lsbstream', 'lsf', 'bat', 'rt', 'z'] warning_msg = ''' Warning: The compatibility of the LSF Python API package is not guaranteed if you update LSF at a later time. 
This is because your current @@ -131,18 +150,18 @@ def set_gccflag_lsf_version(): # '-DLSF_SIMULATOR', '-DOS_HAS_THREAD -D_REENTRANT', gccflag_keyvaluet, gccflag_lsfversion], - extra_compile_args=['-m64', + extra_compile_args=[ '-I' + LSF_LIBDIR + '/../../include/lsf/', '-Wno-strict-prototypes', gccflag_keyvaluet, gccflag_lsfversion, '-DOS_HAS_THREAD -D_REENTRANT', #For multi-thread lib, lserrno '-Wp,-U_FORTIFY_SOURCE', #The flag needs -O option. Undefine it for warning. '-O0'], - extra_link_args=['-m64'], extra_objects=lsf_static_lib, libraries=lsf_dynamic_lib)], py_modules=['pythonlsf.lsf'], - cmdclass = { 'bdist_rpm': bdist_rpm_custom }, + cmdclass = { 'bdist_rpm': bdist_rpm_custom, + 'build': build_ext_first }, classifiers=["Development Status :: 2 - Pre-Alpha", "License :: OSI Approved :: Eclipse Public License", "Operating System :: OS Independent",