| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

StageDataManager.cc

Go to the documentation of this file.
00001 #include "StageDataManager.h"
00002 #include "Stage/HeaderStageData.h"
00003 
00004 #include "Event/HeaderObject.h"
00005 
00006 #include "GaudiKernel/IDataProviderSvc.h"
00007 
00008 #include <vector>
00009 
00010 using namespace DayaBay;
00011 using namespace std;
00012 
00013 StageDataManager::StageDataManager(const std::string& name, ISvcLocator* svc)
00014     : Service(name,svc)
00015 {
00016 }
00017 
00018 StageDataManager::~StageDataManager()
00019 {
00020 }
00021     
00023 StatusCode StageDataManager::initialize()
00024 {
00025     StatusCode sc = this->Service::initialize();
00026     if (sc.isFailure()) return sc;
00027 
00028     sc = this->service("EventDataSvc",m_evtsvc);
00029     if (sc.isFailure()) return sc;
00030 
00031     MsgStream log(msgSvc(), "StageDataManager");
00032     log << MSG::DEBUG << "Initialized" << endreq;
00033 
00034     return sc;
00035 }
00036 
00037 StatusCode StageDataManager::finalize()
00038 {
00039     MsgStream log(msgSvc(), "StageDataManager");
00040     log << MSG::DEBUG << "Finalizing" << endreq;
00041 
00042     return this->Service::finalize();
00043 }
00044 
00045 StatusCode StageDataManager::queryInterface(const InterfaceID& riid, void** ppint)
00046 {
00047     if (IID_IStageDataManager.versionMatch(riid)) {
00048         *ppint = (IStageDataManager*)this;
00049     }
00050     else {
00051         return this->Service::queryInterface(riid,ppint);
00052     }
00053     addRef();
00054     return StatusCode::SUCCESS;
00055 }
00056 
00057 StatusCode StageDataManager::registerData(const std::string& location, IStageData& sd)
00058 {
00059     MsgStream log(msgSvc(), "StageDataManager");
00060 
00061     HeaderObject& ho = sd.header();
00062 
00063     log << MSG::DEBUG << "Registering data at " << location 
00064         << " id#" << ho.clID() << " @ " << ho.context().AsString()
00065         << endreq;
00066 
00067     HeaderStageData<HeaderObject>* hsd = new HeaderStageData<HeaderObject>(ho);
00068     ho.addRef();
00069     m_cache[location].insert(IStageDataListPair(hsd->time(),hsd));
00070     return StatusCode::SUCCESS;
00071 }
00072 
00073 // depth first collecting of HOs
00074 static bool find_hos(const HeaderObject* ho, vector<const HeaderObject*> &hos)
00075 {
00076     if (!ho) return false;
00077 
00078     const vector<const IHeader*>& inHOs = ho->inputHeaders();
00079 
00080     size_t nhos = inHOs.size();
00081 
00082     for (size_t ind=0; ind<nhos; ++ind) {
00083         const IHeader* iho = inHOs[ind];
00084         const HeaderObject* other_ho = dynamic_cast<const HeaderObject*>(iho);
00085         bool ok = find_hos(other_ho,hos);
00086         if (!ok) return false;
00087     }
00088 
00089     hos.push_back(ho);
00090 
00091     return true;
00092 }
00093 
00094 // Return the IStageData corresponding to the given HO, if it exists
00095 // in the list.
00096 static IStageDataList::iterator find_ho(IStageDataList& sdlist, const HeaderObject* ho)
00097 {
00098     IStageDataList::iterator it, done = sdlist.end();
00099     for (it = sdlist.begin(); it != done; ++it) {
00100         const HeaderObject* other_ho = &(it->second->header());
00101         if (ho == other_ho) return it;
00102     }
00103     return done;
00104 }
00105 
00106 // Remove all items from the front of sdlist, up to but not including
00107 // last.  Return a vector of all the HOs from the SDs removed.
00108 static vector<const HeaderObject*> strip_sdlist(IStageDataList& sdlist, IStageDataList::iterator last)
00109 {
00110     vector<const HeaderObject*> dead_hos;
00111     vector<IStageData*> dead_data;
00112     
00113     for (IStageDataList::iterator it = sdlist.begin(); it != last; ++it) {
00114         dead_data.push_back(it->second);
00115         dead_hos.push_back(&(it->second->header()));
00116     }
00117     sdlist.erase(sdlist.begin(), last);
00118     for (size_t ind=0; ind<dead_data.size(); ++ind) {
00119         delete dead_data[ind];
00120     }
00121     dead_data.clear();
00122 
00123     return dead_hos;
00124 }
00125 
00126 // Walk the cache.  If the HO is found, write it out and all others in
00127 // the same location that are earlier
00128 void StageDataManager::find_and_flush(const HeaderObject* ho)
00129 {
00130     MsgStream log(msgSvc(), "StageDataManager");
00131 
00132     LocDataListMap::iterator it, done = m_cache.end();
00133     for (it = m_cache.begin(); it != done; ++it) {
00134         
00135         const string& loc = it->first;
00136         IStageDataList& sdlist = it->second;
00137 
00138         IStageDataList::iterator sdit = find_ho(sdlist,ho);
00139         if (sdit == sdlist.end()) continue;
00140 
00141         ++sdit;                 // always up-to-but-not-including
00142         size_t before = sdlist.size();
00143         vector<const HeaderObject*> dead_hos = strip_sdlist(sdlist, sdit);
00144         size_t after = sdlist.size();
00145 
00146         size_t siz = dead_hos.size();
00147         log << MSG::DEBUG << "Loading " << siz << " HeaderObjects into TES, stripped from "
00148             << before << " to " << after
00149             << endreq;
00150 
00151         for (size_t ind=0; ind<siz; ++ind) {
00152             // gotta break const'ness here because the TES must call
00153             // addRef/release.  This const-incorectness is ultimately
00154             // because HeaderObject::inputHeaders() forces constness
00155             // on us, even thought the original HO isn't const.  This
00156             // is probably a deficiency of HeaderObject.
00157             HeaderObject* malleable_ho = const_cast<HeaderObject*>(dead_hos[ind]);
00158             m_evtsvc->registerObject(loc,malleable_ho);
00159 
00160             log << MSG::DEBUG << "Releasing HeaderObject clID: " << malleable_ho->clID() << endreq;
00161             malleable_ho->release();
00162         }
00163 
00164         // in principle, we could bail from the function at this point
00165         // as no one would ever think to put a HO in more than one
00166         // spot.  But, lets be paranoid and keep checking.
00167     }
00168 }
00169 
00170 StatusCode StageDataManager::consumeData(IStageData& sd)
00171 {
00172     MsgStream log(msgSvc(), "StageDataManager");
00173     log << MSG::DEBUG << "consumeData()" << endreq;
00174 
00175     // need to:
00176     // 
00177     // 3. add sd->header() to TES, if not yet already added
00178     // 
00179     // 2. add all input headers to TES, inyaa
00180     // 
00181     // 1. add all cached headers to TES, inyaa and if earlier than any
00182     // in the category above on a per-location basis
00183 
00184     const HeaderObject& ho = sd.header();
00185 
00186     if (1) {                    // some debugging
00187         log << MSG::DEBUG << "Consuming with " << m_cache.size() << " locations cached"
00188             << endreq;
00189         LocDataListMap::iterator it, done = m_cache.end();
00190         for (it = m_cache.begin(); it != done; ++it) {
00191             IStageDataList& sdlist = it->second;
00192             log << MSG::DEBUG << "\t" << it->first 
00193                 << " with " << sdlist.size()
00194                 << ":" << endreq;
00195             IStageDataList::iterator lit, ldone = sdlist.end();
00196             for (lit = sdlist.begin(); lit != ldone; ++lit) {
00197                 HeaderObject& ho = lit->second->header();
00198                 log << MSG::DEBUG << "\t\t id#"
00199                     << ho.clID() << " at \""
00200                     << ho.defLoc() << "\" @ " 
00201                     << ho.context().AsString()
00202                     << endreq;
00203             }
00204         }
00205     }
00206     vector<const HeaderObject*> hos;
00207     bool ok = find_hos(&ho,hos);
00208     if (!ok) {
00209         log << MSG::ERROR << "Got IHeader that isn't a HeaderObject! WTF?" << endreq;
00210         return StatusCode::FAILURE;        
00211     }
00212 
00213     size_t nhos = hos.size();
00214 
00215     log << MSG::DEBUG << "Consuming id#" << ho.clID() 
00216         << " \"" << ho.defLoc() << "\" @ " 
00217         << ho.context().AsString()
00218         << " along with " << nhos << " other HeaderObjects" << endreq;
00219 
00220     for (size_t ind=0; ind<nhos; ++ind) {
00221         log << MSG::DEBUG << "\t" << ind << ": id#" << hos[ind]->clID() 
00222             << " \"" << hos[ind]->defLoc() << "\" @ " 
00223             << hos[ind]->context().AsString() << endreq;
00224         this->find_and_flush(hos[ind]);
00225     }
00226 
00227     return StatusCode::SUCCESS;
00228 }
00229 
00230     
00231     
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Mon Apr 11 20:36:09 2011 for Stage by doxygen 1.4.7