#include <fstream>
#include <THashList.h>
#include <TChain.h>
#include <TKey.h>
#include <TH1.h>
#include <THStack.h>
#include "TSystem.h"
#include "TFile.h"
#include "TGrid.h"
#include "TGridResult.h"
#include "TObjString.h"
#include "TObjArray.h"
#include "TMethodCall.h"
#include "Riostream.h"
#include "AliSysInfo.h"
#include "AliFileMerger.h"
#include "AliLog.h"
using std::cerr;
using std::endl;
using std::cout;
using std::ifstream;
// ROOT class-implementation macro for AliFileMerger.
ClassImp(AliFileMerger)
// File-scope buffer filled by gSystem->GetProcInfo(); the resident/virtual memory
// fields are printed in the ">> memory usage" / "<< memory usage" log lines below.
ProcInfo_t procInfo;
// Default constructor.
// Starts without accept/reject masks (they are created on demand by
// AddAccept()/AddReject()), with a limit of 800 simultaneously open files
// and with tree merging enabled (fNoTrees is false).
AliFileMerger::AliFileMerger()
  : TNamed(),
    fRejectMask(0),
    fAcceptMask(0),
    fMaxFilesOpen(800),
    fNoTrees(kFALSE)
{
  // nothing else to set up
}
// Named constructor.
// The given string is used both as the TNamed name and as its title; all
// other members get the same defaults as in the default constructor.
AliFileMerger::AliFileMerger(const char* name)
  : TNamed(name, name),
    fRejectMask(0),
    fAcceptMask(0),
    fMaxFilesOpen(800),
    fNoTrees(kFALSE)
{
  // nothing else to set up
}
// Destructor: frees the accept and reject masks.
// Either pointer may still be null if the corresponding Add*() method was
// never called; deleting a null pointer is a no-op.
AliFileMerger::~AliFileMerger()
{
  delete fAcceptMask;
  delete fRejectMask;
}
void AliFileMerger::IterAlien(const char* outputDir, const char* outputFileName, const char* pattern, Bool_t dontOverwrite){
TString command;
command = Form("find %s/ *%s", outputDir, pattern);
printf("command: %s\n", command.Data());
TGrid::Connect("alien://");
TGridResult *res = gGrid->Command(command);
if (!res) return;
TIter nextmap(res);
TMap *map = 0;
TList sourcelist;
sourcelist.SetOwner(kTRUE);
while((map=(TMap*)nextmap())) {
TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
if (!objs || !objs->GetString().Length()) {
delete res;
break;
}
printf("looking for file %s\n",(objs->GetString()).Data());
AddFile(&sourcelist, (objs->GetString()).Data());;
}
IterList(&sourcelist, outputFileName, dontOverwrite);
delete res;
}
// Merge all files named in namesList into outputFileName.
//   namesList      - list of named objects; each name is a source file
//   outputFileName - target file (path names are expanded)
//   dontOverwrite  - if true the target is opened with "CREATE", otherwise "RECREATE"
// At most fMaxFilesOpen-1 sources are open at the same time. When there are
// more sources than that, they are merged chunk by chunk into two alternating
// temporary files (<output>_TMPMERGE0_ / _TMPMERGE1_, inserted before the
// extension); each step adds the previous temporary result as first source.
// The final chunk plus the last temporary result go into the real target and
// the temporary files are removed afterwards.
void AliFileMerger::IterList(const TList* namesList, const char* outputFileName, Bool_t dontOverwrite)
{
  if (!namesList) {
    cerr << "No list of input files provided, nothing to merge." << endl;
    return;
  }
  gSystem->GetProcInfo(&procInfo);
  AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
  TString outputFile(outputFileName);
  gSystem->ExpandPathName(outputFile);
  const int nFiles = namesList->GetEntries();
  // One slot is kept free (fMaxFilesOpen-1 sources per chunk). The chunk size
  // must stay positive: it is used as a modulus below.
  int maxSrcOpen = fMaxFilesOpen - 1;
  if (maxSrcOpen < 1) maxSrcOpen = 1;
  TList filesList;
  filesList.SetOwner(kTRUE);
  // Names of the two alternating temporary files
  TString tmpDest[2] = {outputFile,outputFile};
  int npl = outputFile.Last('.');
  if (npl<0) npl = outputFile.Length();
  for (int i=0;i<2;i++) tmpDest[i].Insert(npl,Form("_TMPMERGE%d_",i));
  int nsteps = 0, currTmp = 0, start = 0;
  for (int ifl=0;ifl<nFiles;ifl++) {
    // A full chunk [start, ifl-1] is complete each time ifl reaches a multiple of maxSrcOpen
    if (ifl==0 || (ifl%maxSrcOpen)!=0) continue;
    OpenNextChunks(namesList,&filesList,start,ifl-1);
    start = ifl;
    if (nsteps++) {
      // Not the first step: the previous temporary result becomes the first
      // source and the other temporary file becomes the destination
      filesList.AddFirst(TFile::Open(tmpDest[currTmp].Data()));
      currTmp = (currTmp==0) ? 1:0;
    }
    TFile* targetTmp = TFile::Open( tmpDest[currTmp].Data(), "RECREATE");
    if (!targetTmp || targetTmp->IsZombie()) {
      printf("Error opening temporary file %s\n",tmpDest[currTmp].Data());
      delete targetTmp;
      return;
    }
    MergeRootfile(targetTmp, &filesList);
    targetTmp->Close();
    delete targetTmp;
    filesList.Clear();
  }
  TFile* target = TFile::Open( outputFile.Data(), (dontOverwrite ? "CREATE":"RECREATE") );
  if (!target || target->IsZombie()) {
    cerr << "Error opening target file (does " << outputFileName << " exist?)." << endl;
    cerr << "Use dontOverwrite = kFALSE to allow re-creation of the output file." << endl;
    delete target;
    return;
  }
  // Last (possibly partial) chunk, preceded by the accumulated temporary result
  OpenNextChunks(namesList,&filesList,start,nFiles-1);
  if (nsteps) filesList.AddFirst(TFile::Open(tmpDest[currTmp].Data()));
  MergeRootfile( target, &filesList);
  target->Close();
  delete target;
  filesList.Clear();
  for (int i=0;i<2;i++) {
    // AccessPathName() returns kFALSE when the path exists. Removing the file
    // through TSystem avoids building a shell command from the file name.
    if (!gSystem->AccessPathName(tmpDest[i].Data())) gSystem->Unlink(tmpDest[i].Data());
  }
  printf("Merged %d files in %d steps\n",nFiles,++nsteps);
  gSystem->GetProcInfo(&procInfo);
  AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
}
void AliFileMerger::IterTXT( const char * fileList, const char* outputFileName, Bool_t dontOverwrite){
ifstream in;
in.open(fileList);
TString objfile;
Int_t counter=0;
TList sourcelist;
sourcelist.SetOwner(kTRUE);
while(in.good()) {
in >> objfile;
if (!objfile.Contains(".root")) continue;
gSystem->ExpandPathName(objfile);
printf("Add file:Counter\t%d\tMerging file %s\n",counter++,objfile.Data());
AddFile(&sourcelist, objfile.Data());
}
IterList(&sourcelist, outputFileName, dontOverwrite);
}
// Write every object of the array into one newly (re)created ROOT file.
//   array          - objects to store; empty slots are skipped
//   outputFileName - file to create (opened with "recreate")
// Nothing is written if the array is missing or the file cannot be created.
void AliFileMerger::StoreResults(TObjArray * array, const char* outputFileName){
  if (!array) return;
  TFile *f = new TFile(outputFileName,"recreate");
  if (f->IsZombie()) {
    // Writing anyway would put the objects into whatever directory is current
    printf("Error opening output file %s\n",outputFileName);
    delete f;
    return;
  }
  for (Int_t i=0; i<array->GetEntries(); i++){
    TObject *object0 = array->At(i);
    if (!object0) continue;
    f->cd(); // make sure Write() targets this file
    object0->Write();
  }
  f->Close();
  delete f;
}
// Write every object of the array into its own ROOT file.
//   array          - objects to store; empty slots are skipped
//   outputFileName - prefix of the file names; each file is called
//                    "<outputFileName>_<object name>.root" and opened with "recreate"
// An object whose file cannot be created is skipped.
void AliFileMerger::StoreSeparateResults(TObjArray * array, const char* outputFileName){
  if (!array) return;
  for (Int_t i=0; i<array->GetEntries(); i++){
    TObject *object0 = array->At(i);
    if (!object0) continue;
    TString fileName;
    fileName.Form("%s_%s.root",outputFileName,object0->GetName());
    TFile *f = new TFile(fileName.Data(),"recreate");
    if (f->IsZombie()) {
      // Writing anyway would put the object into whatever directory is current
      printf("Error opening output file %s\n",fileName.Data());
      delete f;
      continue;
    }
    f->cd(); // make sure Write() targets this file
    object0->Write();
    f->Close();
    delete f;
  }
}
// Merge the accepted objects of one input file into an array of accumulated objects.
//   fileIn - input file; every key whose name passes IsAccepted() is read
//   array  - accumulated results. An object whose name is not yet present in
//            the array is moved into it; otherwise the array's object is
//            updated by calling its Merge(TCollection*) with the new object.
// Objects without a Merge(TCollection*) method are ignored. Objects that were
// read but not moved into the array are deleted before returning.
void AliFileMerger::Merge(TFile* fileIn, TObjArray * array){
  if (!array || !fileIn) return;
  static Int_t counter=-1; // number of the current call, used in the AliSysInfo stamps
  counter++;
  TList *farr = fileIn->GetListOfKeys();
  if (!farr) return;
  // Owning container: whatever is still inside when it goes out of scope is deleted
  TObjArray carray;
  carray.SetOwner(kTRUE);
  for (Int_t ical=0; ical<farr->GetEntries(); ical++){
    TObject *keyEntry = farr->At(ical);
    if (!keyEntry) continue;
    TString name(keyEntry->GetName());
    if (!IsAccepted(name)) continue;
    TObject *obj = fileIn->Get(name.Data());
    if (obj) carray.AddLast(obj);
    AliSysInfo::AddStamp(name.Data(),1,ical,counter);
  }
  if (carray.GetEntries()==0) return;
  TMethodCall callEnv;
  const Int_t entries = carray.GetEntriesFast();
  for (Int_t i=0; i<entries; i++){
    TObject *currentObject = carray.At(i);
    if (!currentObject) continue;
    printf("%s\n",currentObject->GetName());
    callEnv.InitWithPrototype(currentObject->IsA(), "Merge", "TCollection*");
    if (!callEnv.IsValid()) continue; // class cannot be merged
    TObject *mergedObject = array->FindObject(currentObject->GetName());
    if (!mergedObject) {
      // First object with this name: hand it over to the result array and
      // take it out of the owning container so that it is not deleted here
      array->AddLast(currentObject);
      carray.RemoveAt(i);
      continue;
    }
    // Non-owning one-element list passed as argument of Merge()
    TObjArray templist(1);
    templist.SetOwner(kFALSE);
    templist.AddLast(currentObject);
    callEnv.SetParam((Long_t) &templist);
    callEnv.Execute(mergedObject);
    AliSysInfo::AddStamp(currentObject->GetName(),2,i,counter);
  }
}
// Decide whether an object name passes the accept and reject masks.
// With an accept mask the name must contain at least one of its patterns;
// without one every name passes this first stage. A name that contains any
// pattern of the reject mask is refused in all cases.
Bool_t AliFileMerger::IsAccepted(TString name){
  if (fAcceptMask) {
    Bool_t matched = kFALSE;
    const Int_t nAccept = fAcceptMask->GetEntries();
    for (Int_t idx = 0; idx < nAccept; ++idx) {
      const char *pattern = fAcceptMask->At(idx)->GetName();
      if (name.Contains(pattern)) matched = kTRUE;
    }
    if (!matched) return kFALSE;
  }
  Bool_t vetoed = kFALSE;
  if (fRejectMask) {
    const Int_t nReject = fRejectMask->GetEntries();
    for (Int_t idx = 0; idx < nReject; ++idx) {
      const char *pattern = fRejectMask->At(idx)->GetName();
      if (name.Contains(pattern)) vetoed = kTRUE;
    }
  }
  return vetoed ? kFALSE : kTRUE;
}
// Tell whether an object name contains one of the reject-mask patterns.
// Returns kFALSE when no reject mask has been defined.
Bool_t AliFileMerger::IsRejected(TString name){
  if (!fRejectMask) return kFALSE;
  Int_t idx = 0;
  while (idx < fRejectMask->GetEntries()) {
    const char *pattern = fRejectMask->At(idx)->GetName();
    if (name.Contains(pattern)) return kTRUE; // first match is enough
    ++idx;
  }
  return kFALSE;
}
void AliFileMerger::AddReject(const char *reject)
{
if (!fRejectMask) {
fRejectMask = new TObjArray();
fRejectMask->SetOwner(kTRUE);
}
fRejectMask->AddLast(new TObjString(reject));
}
void AliFileMerger::AddAccept(const char *accept)
{
if (!fAcceptMask) {
fAcceptMask = new TObjArray();
fAcceptMask->SetOwner(kTRUE);
}
fAcceptMask->AddLast(new TObjString(accept));
}
// Merge the content of all source files into the target directory.
//   target        - directory receiving the merged objects
//   sourcelist    - list of open source files
//   nameFiltering - if true, object names must pass IsAccepted(); if false
//                   (used for the recursive calls on sub-directories) only
//                   IsRejected() is applied
// Returns 0 on success, -1 if writing an object failed, or the status of a
// failed recursive call.
// Every source is scanned in turn; an object name is handled only the first
// time it is met and is then combined with the same-named objects of all the
// following sources:
//   - trees are chained and merged into the target file (unless fNoTrees is set),
//   - sub-directories are created in the target and merged recursively,
//   - objects with a Merge(TCollection*) method are merged through it,
//   - THStack objects get their histogram lists merged,
//   - anything else is written once per source that contains it.
int AliFileMerger::MergeRootfile( TDirectory *target, TList *sourcelist, Bool_t nameFiltering)
{
  gSystem->GetProcInfo(&procInfo);
  AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
  int status = 0;
  cout << "Target path: " << target->GetPath() << endl;
  // Keep what follows the first ':' of the target path, without the ':' itself
  // and the character right after it
  TString path( (char*)strstr( target->GetPath(), ":" ) );
  path.Remove( 0, 2 );
  TDirectory *first_source = (TDirectory*)sourcelist->First();
  Int_t nguess = sourcelist->GetSize()+1000;
  THashList allNames(nguess); // names of the objects already merged
  allNames.SetOwner(kTRUE);
  ((THashList*)target->GetList())->Rehash(nguess);
  ((THashList*)target->GetListOfKeys())->Rehash(nguess);
  // listH is handed to Merge() by address: listHargs is the textual form of
  // that pointer as expected by TObject::Execute()
  TList listH;
  TString listHargs;
  listHargs.Form("((TCollection*)0x%lx)", (ULong_t)&listH);
  while(first_source) {
    TDirectory *current_sourcedir = first_source->GetDirectory(path);
    if (!current_sourcedir) {
      // this source has no directory matching the target: try the next one
      first_source = (TDirectory*)sourcelist->After(first_source);
      continue;
    }
    TChain *globChain = 0;
    TIter nextkey( current_sourcedir->GetListOfKeys() );
    TKey *key, *oldkey=0;
    TH1::AddDirectory(kFALSE);
    int counterK = 0;
    int counterF=0;
    while ( (key = (TKey*)nextkey())) {
      if (current_sourcedir == target) break;
      TString nameK(key->GetName());
      if ((!IsAccepted(nameK) && nameFiltering) || (!nameFiltering && IsRejected(nameK))) {
        if (!counterF) printf("Object %s is in rejection list, skipping...\n",nameK.Data());
        continue;
      }
      // skip repeated keys of the same object
      if (oldkey && !strcmp(oldkey->GetName(),key->GetName())) continue;
      if (!strcmp(key->GetClassName(),"TProcessID")) {key->ReadObj(); continue;}
      // already merged when it was met in an earlier source
      if (allNames.FindObject(key->GetName())) continue;
      TClass *cl = TClass::GetClass(key->GetClassName());
      if (!cl || !cl->InheritsFrom(TObject::Class())) {
        cout << "Cannot merge object type, name: "
             << key->GetName() << " title: " << key->GetTitle() << endl;
        continue;
      }
      printf("Merging object %s, anchor directory: %s\n",key->GetName(),key->GetMotherDir()->GetPath());
      allNames.Add(new TObjString(key->GetName()));
      AliSysInfo::AddStamp(nameK.Data(),1,++counterK,counterF++);
      // read the object with its mother directory as current directory, then restore
      TDirectory* currDir = gDirectory;
      key->GetMotherDir()->cd();
      TObject *obj = key->ReadObj();
      currDir->cd();
      if (!obj) {
        AliError(Form("Failed to get the object with key %s from %s",key->GetName(),current_sourcedir->GetFile()->GetName()));
        continue;
      }
      if ( obj->IsA()->InheritsFrom( TTree::Class() ) ) {
        // Trees: build a chain over every source containing a tree with this
        // name; the chain is merged into the target further below
        if (!fNoTrees) {
          TString obj_name;
          if (path.Length()) {
            obj_name = path + "/" + obj->GetName();
          } else {
            obj_name = obj->GetName();
          }
          globChain = new TChain(obj_name);
          globChain->Add(first_source->GetName());
          TFile *nextsource = (TFile*)sourcelist->After( first_source );
          while ( nextsource ) {
            TFile *curf = TFile::Open(nextsource->GetName());
            if (curf) {
              Bool_t mustAdd = kFALSE;
              if (curf->FindKey(obj_name)) {
                mustAdd = kTRUE;
              } else {
                TObject *aobj = curf->Get(obj_name);
                if (aobj) { mustAdd = kTRUE; delete aobj;}
              }
              if (mustAdd) {
                globChain->Add(nextsource->GetName());
              }
            }
            delete curf;
            nextsource = (TFile*)sourcelist->After( nextsource );
          }
        }
      } else if ( obj->IsA()->InheritsFrom( TDirectory::Class() ) ) {
        // Sub-directory: create it in the target and merge its content recursively
        cout << "Found subdirectory " << obj->GetName() << endl;
        target->cd();
        TDirectory *newdir = target->mkdir( obj->GetName(), obj->GetTitle() );
        status = MergeRootfile( newdir, sourcelist, kFALSE);
        if (status) return status;
      } else if ( obj->InheritsFrom(TObject::Class())
                  && obj->IsA()->GetMethodWithPrototype("Merge", "TCollection*") ) {
        // Object with a Merge(TCollection*) method: merge the same-named
        // object of every following source into obj, one at a time
        TFile *nextsource = (TFile*)sourcelist->After( first_source );
        while ( nextsource ) {
          TDirectory *ndir = nextsource->GetDirectory(path);
          if (ndir) {
            ndir->cd();
            TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(key->GetName());
            if (key2) {
              TObject *hobj = key2->ReadObj();
              if (!hobj) {
                cout << "Failed to get the object with key " << key2->GetName() << " from " <<
                  ndir->GetFile()->GetName() << "/" << ndir->GetName() << endl;
                nextsource = (TFile*)sourcelist->After( nextsource );
                continue;
              }
              hobj->ResetBit(kMustCleanup);
              listH.Add(hobj);
              Int_t error = 0;
              obj->Execute("Merge", listHargs.Data(), &error);
              if (error) {
                cerr << "Error calling Merge() on " << obj->GetName()
                     << " with the corresponding object in " << nextsource->GetName() << endl;
              }
              listH.Delete(); // frees hobj and empties the list for the next source
              Double_t numberOfEntries = -1;
              if (obj->IsA()->GetMethodAllAny("GetEntries"))
              {
                TMethodCall getEntries(obj->IsA(), "GetEntries", "");
                getEntries.Execute(obj, numberOfEntries);
              }
              AliSysInfo::AddStamp(nameK.Data(),1,counterK,counterF++,numberOfEntries);
            }
          }
          nextsource = (TFile*)sourcelist->After( nextsource );
        }
      } else if ( obj->IsA()->InheritsFrom( THStack::Class() ) ) {
        // Histogram stack: collect clones of the histogram lists of the
        // following sources and merge them into the list of this stack
        THStack *hstack1 = (THStack*) obj;
        TList* l = new TList();
        TFile *nextsource = (TFile*)sourcelist->After( first_source );
        while ( nextsource ) {
          TDirectory *ndir = nextsource->GetDirectory(path);
          if (ndir) {
            ndir->cd();
            TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(hstack1->GetName());
            if (key2) {
              // The object may be unreadable, of another class, or an empty
              // stack (no histogram list): all of these are skipped
              TObject *readObj = key2->ReadObj();
              THStack *hstack2 = dynamic_cast<THStack*>(readObj);
              if (hstack2 && hstack2->GetHists()) {
                l->Add(hstack2->GetHists()->Clone());
              } else {
                cout << "Failed to get the histogram list of " << key2->GetName() << " from " <<
                  nextsource->GetName() << endl;
              }
              delete readObj;
              AliSysInfo::AddStamp(nameK.Data(),1,counterK,counterF++);
            }
          }
          nextsource = (TFile*)sourcelist->After( nextsource );
        }
        // an empty first stack has no list to merge into
        if (hstack1->GetHists()) hstack1->GetHists()->Merge(l);
        l->Delete();
        delete l;
      } else {
        // No way to merge: write the object of every following source as is
        // (the one of the first source is written further below)
        cout << "Cannot merge object type, name: "
             << obj->GetName() << " title: " << obj->GetTitle() << endl;
        TFile *nextsource = (TFile*)sourcelist->After( first_source );
        while ( nextsource ) {
          TDirectory *ndir = nextsource->GetDirectory(path);
          if (ndir) {
            ndir->cd();
            TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(key->GetName());
            if (key2) {
              TObject *nobj = key2->ReadObj();
              if (nobj) {
                nobj->ResetBit(kMustCleanup);
                int nbytes1 = target->WriteTObject(nobj, key2->GetName(), "SingleKey" );
                if (nbytes1 <= 0) status = -1;
                delete nobj;
              } else {
                cout << "Failed to get the object with key " << key2->GetName() << " from " <<
                  ndir->GetFile()->GetName() << "/" << ndir->GetName() << endl;
              }
            }
          }
          nextsource = (TFile*)sourcelist->After( nextsource );
        }
      }
      // Store the merged result in the target
      target->cd();
      if(obj->IsA()->InheritsFrom( TDirectory::Class() )) {
        // sub-directories were already filled by the recursive call
      } else if(obj->IsA()->InheritsFrom( TTree::Class() )) {
        if (!fNoTrees) {
          globChain->ls();
          globChain->Merge(target->GetFile(),0,"keep fast");
          delete globChain;
        }
      } else {
        int nbytes2 = obj->Write( key->GetName(), TObject::kSingleKey );
        if (nbytes2 <= 0) status = -1;
      }
      oldkey = key;
      delete obj;
    }
    first_source = (TDirectory*)sourcelist->After(first_source);
  }
  target->SaveSelf(kTRUE);
  gSystem->GetProcInfo(&procInfo);
  AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
  return status;
}
int AliFileMerger::OpenNextChunks(const TList* namesList, TList* filesList, Int_t from, Int_t to)
{
gSystem->GetProcInfo(&procInfo);
AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
filesList->Clear();
int nEnt = namesList->GetEntries();
from = from<nEnt ? from : nEnt;
to = to<nEnt ? to : nEnt;
int count = 0;
for (int i=from;i<=to;i++) {
TNamed* fnam = (TNamed*)namesList->At(i);
if (!fnam) continue;
TString fnamS(fnam->GetName());
gSystem->ExpandPathName(fnamS);
if (fnamS.BeginsWith("alien://") && !gGrid) TGrid::Connect("alien");
TFile* source = TFile::Open(fnam->GetName());
if( source==0 ) { printf("Failed to open file %s, will skip\n",fnam->GetName()); continue; }
filesList->Add(source);
printf("Opened file %s\n",fnam->GetName());
count++;
}
gSystem->GetProcInfo(&procInfo);
AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
return count;
}
// Add one entry to the list of source file names.
//   namesList - list receiving a new TNamed per file name
//   entry     - a file name, or "@<file>" to add every line of the text file
//               <file> (lines may themselves be "@" entries)
// Blanks, tabs and line-end characters around the entry are removed, so that
// list files with Windows line endings or trailing blanks give usable names.
// Empty entries are ignored.
// Returns 0 on success, 1 if an indirect file could not be opened.
int AliFileMerger::AddFile(TList* namesList, std::string entry)
{
  static const char* const kBlanks = " \t\r\n";
  const size_t first = entry.find_first_not_of(kBlanks);
  if( first==std::string::npos ) return 0; // empty or blank entry
  const size_t last = entry.find_last_not_of(kBlanks);
  entry = entry.substr(first, last-first+1);
  if( entry[0]=='@' ) {
    const std::string listName = entry.substr(1);
    std::ifstream indirect_file( listName.c_str() );
    if( ! indirect_file.is_open() ) {
      std::cerr<< "Could not open indirect file " << listName << std::endl;
      return 1;
    }
    std::string line;
    while( std::getline(indirect_file, line) ){
      if( AddFile(namesList, line)!=0 ) return 1;
    }
    return 0;
  }
  namesList->Add(new TNamed(entry.c_str(),""));
  return 0;
}