#include "AliHLTMessage.h"
#include "TVirtualStreamerInfo.h"
#include "Bytes.h"
#include "TFile.h"
#include "TProcessID.h"
#include "TClass.h"
extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
const Int_t kMAXBUF = 0xffffff;
Bool_t AliHLTMessage::fgEvolution = kFALSE;
ClassImp(AliHLTMessage)
AliHLTMessage::AliHLTMessage(UInt_t what)
:
# ifdef ROOT_TBufferFile
TBufferFile(kWrite),
# else
TBuffer(kWrite),
# endif
AliHLTLogging(),
fWhat(what),
fClass(0),
fCompress(0),
fBufComp(0),
fBufCompCur(0),
fCompPos(0)
, fBufUncompressed(0)
, fBitsPIDs(0)
, fInfos(NULL)
, fEvolution(kFALSE)
{
UInt_t reserved = 0;
*this << reserved;
*this << what;
SetBit(kCannotHandleMemberWiseStreaming);
}
const Int_t AliHLTMessage::fgkMinimumSize=30;
UInt_t AliHLTMessage::fgkDefaultBuffer[2]={0,0};
AliHLTMessage::AliHLTMessage(void *buf, Int_t bufsize)
:
# if defined(ROOT_TBufferFile)
TBufferFile(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
# else
TBuffer(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
# endif
AliHLTLogging(),
fWhat(0),
fClass(0),
fCompress(0),
fBufComp(0),
fBufCompCur(0),
fCompPos(0)
, fBufUncompressed(0)
, fBitsPIDs(0)
, fInfos(NULL)
, fEvolution(kFALSE)
{
fBufCur += sizeof(UInt_t);
*this >> fWhat;
if (fWhat & kMESS_ZIP) {
fBufComp = fBuffer;
fBufCompCur = fBuffer + bufsize;
fBuffer = 0;
Uncompress();
fBufComp = NULL;
fBufCompCur = 0;
}
if (fWhat == kMESS_OBJECT) {
InitMap();
fClass = ReadClass();
SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
ResetMap();
} else {
fClass = 0;
}
}
AliHLTMessage::~AliHLTMessage()
{
Reset();
}
void AliHLTMessage::EnableSchemaEvolutionForAll(Bool_t enable)
{
fgEvolution = enable;
}
Bool_t AliHLTMessage::UsesSchemaEvolutionForAll()
{
return fgEvolution;
}
void AliHLTMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t )
{
if (fgEvolution || fEvolution) {
if (!fInfos) fInfos = new TList();
if (fInfos->FindObject(info->GetName())==NULL) {
fInfos->Add(info);
}
}
}
void AliHLTMessage::Forward()
{
if (IsReading()) {
SetWriteMode();
SetBufferOffset(fBufSize);
SetBit(kCannotHandleMemberWiseStreaming);
if (fBufComp) {
fCompPos = fBufCur;
}
}
}
void AliHLTMessage::IncrementLevel(TVirtualStreamerInfo *info)
{
TBufferFile::IncrementLevel(info);
if (!info) return;
if (fgEvolution || fEvolution) {
if (!fInfos) fInfos = new TList();
if (fInfos->FindObject(info->GetName())==NULL) {
fInfos->Add(info);
}
}
}
void AliHLTMessage::Reset()
{
SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
ResetMap();
if (fBufComp) {
delete [] fBufComp;
fBufComp = 0;
fBufCompCur = 0;
fCompPos = 0;
}
if (fBufUncompressed) {
delete [] fBufUncompressed;
fBufUncompressed=NULL;
}
}
void AliHLTMessage::SetLength() const
{
if (IsWriting()) {
char *buf = Buffer();
*((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
if (fBufComp) {
buf = fBufComp;
*((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t));
}
}
}
void AliHLTMessage::SetWhat(UInt_t what)
{
fWhat = what;
char *buf = Buffer();
buf += sizeof(UInt_t);
tobuf(buf, what);
if (fBufComp) {
buf = fBufComp;
buf += sizeof(UInt_t);
tobuf(buf, what | kMESS_ZIP);
}
}
void AliHLTMessage::SetCompressionLevel(Int_t level)
{
if (level < 0) level = 0;
if (level > 9) level = 9;
if (level != fCompress && fBufComp) {
delete [] fBufComp;
fBufComp = 0;
fBufCompCur = 0;
fCompPos = 0;
}
fCompress = level;
}
Int_t AliHLTMessage::Compress()
{
if (fCompress == 0) {
if (fBufComp) {
delete [] fBufComp;
fBufComp = 0;
fBufCompCur = 0;
fCompPos = 0;
}
return 0;
}
if (fBufComp && fCompPos == fBufCur) {
return 0;
}
if (fBufComp) {
delete [] fBufComp;
fBufComp = 0;
fBufCompCur = 0;
fCompPos = 0;
}
if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
return 0;
}
Int_t hdrlen = 2*sizeof(UInt_t);
Int_t messlen = Length() - hdrlen;
Int_t nbuffers = messlen / kMAXBUF;
Int_t chdrlen = 3*sizeof(UInt_t);
Int_t buflen = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
fBufComp = new char[buflen];
char *messbuf = Buffer() + hdrlen;
char *bufcur = fBufComp + chdrlen;
Int_t noutot = 0;
Int_t nzip = 0;
Int_t nout, bufmax;
for (Int_t i = 0; i <= nbuffers; i++) {
if (i == nbuffers)
bufmax = messlen - nzip;
else
bufmax = kMAXBUF;
R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
if (nout == 0 || nout >= messlen) {
delete [] fBufComp;
fBufComp = 0;
fBufCompCur = 0;
fCompPos = 0;
return -1;
}
bufcur += nout;
noutot += nout;
messbuf += kMAXBUF;
nzip += kMAXBUF;
}
fBufCompCur = bufcur;
fCompPos = fBufCur;
bufcur = fBufComp;
tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
Int_t what = fWhat | kMESS_ZIP;
tobuf(bufcur, what);
tobuf(bufcur, Length());
return 0;
}
Int_t AliHLTMessage::Uncompress()
{
if (!fBufComp || !(fWhat & kMESS_ZIP))
return -1;
Int_t buflen;
Int_t hdrlen = 2*sizeof(UInt_t);
char *bufcur1 = fBufComp + hdrlen;
frombuf(bufcur1, &buflen);
UChar_t *bufcur = (UChar_t*)bufcur1;
fBuffer = new char[buflen];
fBufUncompressed = fBuffer;
fBufSize = buflen;
fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
fBufMax = fBuffer + fBufSize;
char *messbuf = fBuffer + hdrlen;
Int_t nin, nout, nbuf;
Int_t noutot = 0;
while (1) {
nin = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);
R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
if (!nout) break;
noutot += nout;
if (noutot >= buflen - hdrlen) break;
bufcur += nin;
messbuf += nout;
}
fWhat &= ~kMESS_ZIP;
fCompress = 1;
return 0;
}
void AliHLTMessage::WriteObject(const TObject *obj)
{
if (fgEvolution || fEvolution) {
if (fInfos)
fInfos->Clear();
else
fInfos = new TList();
}
fBitsPIDs.ResetAllBits();
WriteObjectAny(obj, TObject::Class());
}
UShort_t AliHLTMessage::WriteProcessID(TProcessID *pid)
{
if (fBitsPIDs.TestBitNumber(0)) return 0;
if (!pid)
pid = TProcessID::GetPID();
if (!pid) return 0;
fBitsPIDs.SetBitNumber(0);
UInt_t uid = pid->GetUniqueID();
fBitsPIDs.SetBitNumber(uid+1);
return 1;
}
AliHLTMessage* AliHLTMessage::Stream(TObject* pSrc, Int_t compression, unsigned verbosity)
{
if (!pSrc) return NULL;
AliHLTLogging log;
AliHLTMessage* pMsg=new AliHLTMessage(kMESS_OBJECT);
if (!pMsg) {
log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "memory allocation failed");
return NULL;
}
pMsg->SetCompressionLevel(compression);
pMsg->WriteObject(pSrc);
if (pMsg->Length()>0) {
pMsg->SetLength();
pMsg->Compress();
if (pMsg->CompBuffer()) {
pMsg->SetLength();
if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->CompLength());
} else {
if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->Length());
}
}
return pMsg;
}
TObject* AliHLTMessage::Extract(const void* pBuffer, unsigned bufferSize, unsigned verbosity)
{
AliHLTLogging log;
if (!pBuffer || bufferSize<sizeof(AliHLTUInt32_t)) {
if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "invalid input buffer %p %d", pBuffer, bufferSize);
return NULL;
}
AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)pBuffer);
if (firstWord==bufferSize-sizeof(AliHLTUInt32_t) &&
firstWord>=34 ) {
AliHLTMessage msg((AliHLTUInt8_t*)pBuffer, bufferSize);
TClass* objclass=msg.GetClass();
TObject* pObject=msg.ReadObject(objclass);
if (pObject && objclass) {
if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "object %p type %s created", pObject, objclass->GetName());
return pObject;
} else {
if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed to create object from buffer of size %d", bufferSize);
}
} else {
if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "not a streamed TObject: block size %d, indicated %d", bufferSize, firstWord+sizeof(AliHLTUInt32_t));
}
return NULL;
}
TObject* AliHLTMessage::Extract(const char* filename, unsigned verbosity)
{
if (!filename) return NULL;
AliHLTLogging log;
TString input=filename;
input+="?filetype=raw";
TFile* pFile=new TFile(input);
if (!pFile) return NULL;
TObject* pObject=NULL;
if (!pFile->IsZombie()) {
pFile->Seek(0);
TArrayC buffer;
buffer.Set(pFile->GetSize());
if (pFile->ReadBuffer(buffer.GetArray(), buffer.GetSize())==0) {
pObject=Extract(buffer.GetArray(), buffer.GetSize(), verbosity);
} else {
log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed reading %d byte(s) from file %s", pFile->GetSize(), filename);
}
}
delete pFile;
return pObject;
}