00001 #include "JobInfoSvc.h"
00002
00003 #include "GaudiKernel/GaudiException.h"
00004 #include "GaudiKernel/MsgStream.h"
00005 #include "Event/JobHeader.h"
00006 #include "Event/JobInfo.h"
00007 #include "RootIOSvc/RootIOAddress.h"
00008 #include "RootIOSvc/IRootIOSvc.h"
00009 #include "GaudiKernel/IConversionSvc.h"
00010 #include "PerJobInfo/PerJobHeader.h"
00011 #include "RootIOSvc/RootIOTypedCnv.h"
00012
00013 JobInfoSvc::JobInfoSvc(const std::string& name,
00014 ISvcLocator* svc) :
00015 Service(name,
00016 svc),
00017 m_currentJobInfo(0),
00018 m_fileIsInitialized(false)
00019 {
00020 declareProperty("ReadFromFile",m_readFromFile=false,
00021 "Load the job info from the current input file?");
00022 declareProperty("ReadFromDatabase",m_readFromDatabase=true,
00023 "Check the database for job info?");
00024 declareProperty("JobInfoLocation",
00025 m_jobInfoLocation = DayaBay::JobHeaderLocation::Default,
00026 "Path for job info if reading from file");
00027 declareProperty("JobId",m_jobIdString,
00028 "Used by nuwa.py to initialize current job ID");
00029 declareProperty("JobConfig",m_jobConfig,
00030 "Use to initialize current job configuration");
00031 }
00032
00033 StatusCode JobInfoSvc::queryInterface(const InterfaceID& id,
00034 void** interface) {
00035 if (IJobInfoSvc::interfaceID().versionMatch(id)) {
00036
00037 *interface = (IJobInfoSvc*)this;
00038 addRef();
00039 } else {
00040
00041 return Service::queryInterface(id,
00042 interface);
00043 }
00044
00045 return StatusCode::SUCCESS;
00046 }
00047
00048 StatusCode JobInfoSvc::initialize() {
00049 StatusCode status = Service::initialize();
00050 if (status.isFailure()) {
00051 throw GaudiException("Could not initialize super class",
00052 name(),
00053 status);
00054 }
00055 m_jobId.set(m_jobIdString);
00056 return StatusCode::SUCCESS;
00057 }
00058
00059 StatusCode JobInfoSvc::finalize() {
00060
00061 std::vector<DayaBay::JobInfo*>::iterator it, done = m_jobInfoList.end();
00062 for (it = m_jobInfoList.begin(); it != done; ++it) {
00063
00064
00065 }
00066 m_jobInfoList.clear();
00067 return Service::finalize();
00068 }
00069
00070 const DayaBay::JobInfo* JobInfoSvc::currentJobInfo() {
00071
00072 MsgStream log(msgSvc(), "JobInfoSvc");
00073 if(!m_currentJobInfo){
00074
00075 m_currentJobInfo = new DayaBay::JobInfo();
00076 m_currentJobInfo->setJobId(m_jobId);
00077 m_currentJobInfo->setParameters(m_jobConfig);
00078 m_jobInfoList.push_back(m_currentJobInfo);
00079 }
00080 return m_currentJobInfo;
00081 }
00082
00083 const DayaBay::JobInfo* JobInfoSvc::jobInfo(const DayaBay::JobId& jobId) {
00084
00085
00086 MsgStream log(msgSvc(), "JobInfoSvc");
00087 StatusCode sc = StatusCode::SUCCESS;
00088
00089 if( m_readFromFile && !m_fileIsInitialized){
00090 sc = this->readDataFromFile();
00091 if(!sc.isSuccess()) return 0;
00092 }
00093
00094
00095 DayaBay::JobInfo* jobInfo = 0;
00096 std::vector<DayaBay::JobInfo*>::iterator jobIter,
00097 jobDone = m_jobInfoList.end();
00098 for(jobIter = m_jobInfoList.begin(); jobIter != jobDone; ++jobIter) {
00099 DayaBay::JobInfo* checkJob = *jobIter;
00100 log << MSG::DEBUG << "Checking job info with ID "
00101 << checkJob->jobId().uuid() << endreq;
00102 if(checkJob->jobId()==jobId){
00103
00104 jobInfo = checkJob;
00105 break;
00106 }
00107 }
00108
00109
00110 if(!jobInfo && m_readFromDatabase ){
00111 sc = this->readDataFromDatabase(jobId);
00112 if(!sc.isSuccess()) return 0;
00113 }
00114
00115 return jobInfo;
00116 }
00117
00118 StatusCode JobInfoSvc::setJobInfo(const DayaBay::JobInfo& jobInfo){
00119
00120 bool updateJob = false;
00121 std::vector<DayaBay::JobInfo*>::iterator jobIter,
00122 jobDone = m_jobInfoList.end();
00123 for(jobIter = m_jobInfoList.begin(); jobIter != jobDone; ++jobIter) {
00124 if( (*jobIter)->jobId() == jobInfo.jobId() ){
00125
00126 *(*jobIter) = jobInfo;
00127 updateJob = true;
00128 break;
00129 }
00130 }
00131
00132 if( !updateJob ) m_jobInfoList.push_back(new DayaBay::JobInfo(jobInfo));
00133 return StatusCode::SUCCESS;
00134 }
00135
00136 const std::vector<DayaBay::JobInfo*>& JobInfoSvc::cachedJobInfo(){
00137
00138
00139
00140
00141 if(!m_currentJobInfo){
00142 this->currentJobInfo();
00143 }
00144
00145 if(m_readFromFile && !m_fileIsInitialized){
00146 StatusCode sc = this->readDataFromFile();
00147 sc.ignore();
00148 }
00149 return m_jobInfoList;
00150 }
00151
00152 StatusCode JobInfoSvc::readDataFromDatabase(const DayaBay::JobId& ){
00153
00154
00155 MsgStream log(msgSvc(), "JobInfoSvc");
00156 log << MSG::WARNING << "readDataFromDatabase: Loading data from database is not yet implemented." << endreq;
00157 return StatusCode::SUCCESS;
00158 }
00159
00160 StatusCode JobInfoSvc::readDataFromFile(){
00161
00162
00163
00164 if( m_fileIsInitialized ) return StatusCode::SUCCESS;
00165
00166 MsgStream log(msgSvc(), "JobInfoSvc");
00167 IService* isvc = 0;
00168 StatusCode sc = this->service("RootIOCnvSvc",isvc,true);
00169 if(sc.isFailure()){
00170 log << MSG::ERROR << "Failed to get RootIOCnvSvc as IService" << endreq;
00171 return sc;
00172 }
00173 IRootIOSvc* m_rioSvc = 0;
00174 sc = isvc->queryInterface(IRootIOSvc::interfaceID(), (void**)&m_rioSvc);
00175 if (sc.isFailure()) {
00176 log << MSG::ERROR << "Conversion service RootIOCnvSvc"
00177 << " does not implement IRootIOCnvSvc" << endreq;
00178 return sc;
00179 }
00180 IConversionSvc* convSvc = 0;
00181 sc = isvc->queryInterface(IConversionSvc::interfaceID(), (void**)&convSvc);
00182 if (sc.isFailure()) {
00183 log << MSG::ERROR << "Conversion service RootIOCnvSvc"
00184 << " does not implement IConversionSvc" << endreq;
00185 return sc;
00186 }
00187 IRootIOSvc::InputStreamMap& ism = m_rioSvc->inputStreams();
00188 IRootIOSvc::InputStreamMap::iterator it, done = ism.end();
00189 for (it = ism.begin(); it != done; ++it) {
00190 log << MSG::DEBUG << "checking input stream: " << it->first << endreq;
00191 if( it->first == m_jobInfoLocation ){
00192
00193 RootInputStream* ris = it->second;
00194 ris->setEntry(0);
00195 log << MSG::DEBUG << "Return status of read: " << ris->read() << "\t"
00196 << ris->obj() << endreq;
00197 PerJobHeader* perJobHeader = (PerJobHeader*)(ris->obj());
00198 if(!perJobHeader){
00199 log << MSG::ERROR << "Failed to get persistent JobHeader" << endreq;
00200 return StatusCode::FAILURE;
00201 }
00202 log << MSG::DEBUG << "Got persistent job info for "
00203 << perJobHeader->jobInfoList.size() << " jobs" << endreq;
00204
00205
00206 DayaBay::JobHeader* jobHeader = new DayaBay::JobHeader();
00207 IConverter* converter = convSvc->converter(jobHeader->classID());
00208 if (!converter) {
00209 log << MSG::ERROR << "Failed to get JobHeader converter" << endreq;
00210 return StatusCode::FAILURE;
00211 }
00212 RootIOTypedCnv<PerJobHeader, DayaBay::JobHeader>* jobHdrCnv = 0;
00213 jobHdrCnv = dynamic_cast< RootIOTypedCnv<PerJobHeader, DayaBay::JobHeader>* >(converter);
00214 if (!jobHdrCnv) {
00215 log << MSG::ERROR << "Failed to cast JobHeader converter" << endreq;
00216 return StatusCode::FAILURE;
00217 }
00218 sc = jobHdrCnv->PerToTran(*perJobHeader, *jobHeader);
00219 if (sc.isFailure()) {
00220 log << MSG::ERROR << "Failed to translate persistent JobHeader"
00221 << endreq;
00222 return sc;
00223 }
00224 const DayaBay::JobHeader::JobInfoContainer& jobInfoList
00225 = jobHeader->jobInfoList();
00226 int nJobs = jobInfoList.size();
00227 log << MSG::DEBUG << "Got job info for " << nJobs << " jobs" << endreq;
00228 for(int jobIdx=0; jobIdx < nJobs; jobIdx++){
00229
00230 m_jobInfoList.push_back(new DayaBay::JobInfo(*(jobInfoList[jobIdx])));
00231 }
00232 }
00233 }
00234 m_fileIsInitialized = true;
00235 return StatusCode::SUCCESS;
00236 }
00237