B
    ɒ\C                 @   s"  d dl Z d dlmZmZmZ d dlmZ d dlZd dl	Z	d dl
Z
d dlZd dlmZ d dlZd dlZd dlmZ d dlmZmZ d dlZddlmZ ddlmZmZ dd	lmZ G d
d dZG dd deZG dd deZG dd deZG dd deZ G dd deZ!G dd deZ"dS )    N)DatasetConcatDatasetSampler)glob)colored)CounterOrderedDict   )util)	TraceModePriorInflation)ConcurrentShelfc               @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
Batchc             C   s   || _ t|| _i }d}x^|D ]V}|j}|dkr8td||7 }ddd |jD }||krfg ||< || | qW t|	 | _
|| j | _d S )Nr   zTrace of length zero. c             S   s   g | ]
}|j qS  )address).0variabler   r   U/global/project/projectdirs/dasrepo/etalumis/pyprob_master_mar20/pyprob/nn/dataset.py
<listcomp>   s    z"Batch.__init__.<locals>.<listcomp>)traceslensizelength_controlled
ValueErrorjoinvariables_controlledappendlistvaluessub_batchesZmean_length_controlled)selfr   r    Ztotal_length_controlledtracetlZ
trace_hashr   r   r   __init__   s    

zBatch.__init__c             C   s
   t | jS )N)r   r   )r!   r   r   r   __len__&   s    zBatch.__len__c             C   s
   | j | S )N)r   )r!   keyr   r   r   __getitem__)   s    zBatch.__getitem__c             C   s    x| j D ]}|j|d qW d S )N)device)r   to)r!   r(   r"   r   r   r   r)   ,   s    zBatch.toN)__name__
__module____qualname__r$   r%   r'   r)   r   r   r   r   r      s   r   c               @   s@   e Zd ZdejfddZdd Zdd Zedd	 Z	d
d Z
dS )OnlineDatasetNc             C   s&   || _ |d krtd}|| _|| _d S )Ng    .A)_modelint_length_prior_inflation)r!   modellengthprior_inflationr   r   r   r$   2   s
    zOnlineDataset.__init__c             C   s   | j S )N)r0   )r!   r   r   r   r%   9   s    zOnlineDataset.__len__c             C   s   t | jjtj| jdS )N)
trace_moder4   )nextr.   _trace_generatorr   PRIORr1   )r!   idxr   r   r   r'   <   s    zOnlineDataset.__getitem__c             C   s   | ` | `| `| `| `| `| `| `| `| `	| `
| `x6| jD ],}|`|`|`	|`|`|`|`|`|`|`q8W xF| j D ]8\}}|`|`|`|`|`	|`|`|`|`|`|`|`qtW d S )N)	variablesvariables_uncontrolledvariables_replacedvariables_observedvariables_observablevariables_taggedvariables_dict_addressvariables_dict_address_baseresultlog_problog_prob_observedexecution_time_secr   address_baseinstancecontrolreplacename
observableobservedreusedtaggednamed_variablesitemsdistributionr   )r"   r   _r   r   r   _prune_trace?   sH    zOnlineDataset._prune_tracec          	   O   s   t || }td||||d d}x||k r||7 }tj|d|tt	
 }tj|dd}	xRt|D ]F}
t| jj|tj| jd|}| | ||	t|
< |
d |	d	< qrW |	  t| q,W t  d S )
Nz?Saving offline dataset, traces:{}, traces per file:{}, files:{}Tracesr   zpyprob_traces_{}_{}c)flag)r5   r4      __length)mathceilr
   progress_bar_initformatospathr   struuiduuid4shelveopenranger6   r.   r7   r   r8   r1   rS   closeprogress_bar_updateprogress_bar_end)r!   dataset_dir
num_tracesnum_traces_per_fileargskwargs	num_filesi	file_nameshelfjr"   r   r   r   save_datasetu   s    
 
zOnlineDataset.save_dataset)r*   r+   r,   r   DISABLEDr$   r%   r'   staticmethodrS   rr   r   r   r   r   r-   1   s
   6r-   c               @   s6   e Zd Ze ZdZdd Zdd Zdd Zdd	 Z	d
S )OfflineDatasetFile   c             C   s"   || _ d| _|  }|d | _d S )NFrX   )
_file_name_closed_openr0   )r!   ro   rp   r   r   r   r$      s    zOfflineDatasetFile.__init__c             C   s~   yt j| j}|t j| j< |S  tk
rx   tt jt jkrXt jjdd\}}|  t	j
| jdd}|t j| j< |S X d S )NF)lastr)rV   )ru   cachepoprw   KeyErrorr   cache_capacitypopitemre   rb   rc   )r!   rp   nsr   r   r   ry      s    zOfflineDatasetFile._openc             C   s   | j S )N)r0   )r!   r   r   r   r%      s    zOfflineDatasetFile.__len__c             C   s   |   }|t| S )N)ry   r_   )r!   r9   rp   r   r   r   r'      s    zOfflineDatasetFile.__getitem__N)
r*   r+   r,   r   r|   r   r$   ry   r%   r'   r   r   r   r   ru      s   ru   c                   s:   e Zd Z fddZedd Zdd Zd
dd	Z  ZS )OfflineDatasetc                s  || _ tttj| j d}t|dkr2d| _nd| _tttj| j d}t|dkrjtd	|g }xh|D ]`}yt
|}|| W qt tk
r } z&t| ttd	|dd	gd
 W d d }~X Y qtX qtW t | td	| j  td	t|  td	| j | jr4ttt| | _nvtj| j d}y t|d}d|k}	|  W n   d}	Y nX |	rtd	| t|d}|d | _|d | _|  t| jr| j  | _t| jt| krLtd	t| jt| |nJtd	| t|d}|  \}
}|
|d< ||d< |  || _|
| _td	tt| j ttt| j  }td x&|  D ]\}}td	|| qW t  d S )Nzpyprob_traces_sorted_*r   TFzpyprob_traces_*z$Cannot find any data set files at {}z7Warning: dataset file potentially corrupt, omitting: {}redbold)attrszOfflineDataset at: {}zNum. traces     : {:,}zSorted on disk  : {}Zpyprob_hashesr{   hashesz Using pre-computed hashes in: {}sorted_indiceszLength of pre-computed hashes ({}) and length of offline dataset ({}) do not match. Dataset files have been altered. Delete and re-generate pre-computed hash file: {}z,No pre-computed hashes found, generating: {}rU   zNum. trace types: {:,}zTrace hash	Countz	{:.8f}	{})!Z_dataset_dirsortedr   r]   r^   r   r   Z_sorted_on_diskRuntimeErrorr\   ru   r   	Exceptionprintr   superr$   r   rd   _sorted_indicesrb   rc   re   Z_hashestorch	is_tensorcpunumpy_compute_hashessetr   r   rP   )r!   rh   filesZdatasetsfiledatasetero   Zhashes_fileZhashes_existr   r   Zhashes_and_countshashcount)	__class__r   r   r$      sj    
0


zOfflineDataset.__init__c             C   s6   t ddd | jD tj d }td| j|S )Nr   c             S   s   g | ]
}|j qS r   )r   )r   r   r   r   r   r      s    z.OfflineDataset._trace_hash.<locals>.<listcomp>rW   z{}.{})r   r   r   sysmaxsizefloatr\   r   )r"   hr   r   r   _trace_hash   s    $zOfflineDataset._trace_hashc             C   s   t t| }tdt| d x0tt| D ] }| | | ||< t| q.W t  t	d t 
|\}}t	d |  |  fS )Nz#Hashing offline dataset for sortingrT   zSorting offline datasetzSorting done)r   zerosr   r
   r[   rd   r   rf   rg   r   sortr   r   )r!   r   rn   rR   r   r   r   r   r      s    zOfflineDataset._compute_hashesNc                s  d k	r|d k	r>t dn$|d kr,t dntt| | tjr~tttjddkr~t	t
dddgd tjdd	 ttt| j}t| }t|}tt|}d
| tt fddt|}	|d krd}|d kr|}|dk s&||ks&||ks&||k r6t d||t	d|| td|||| ||| d d d}
xt||D ]}|
d7 }
|	| }t	| t|}|jdd x*t|| D ]\}}| | |t|< qW t|| |d< |  t|
 q~W t  d S )Nz1Expecting either num_traces_per_file or num_files*r   z+Warning: target directory is not empty: {})r   r   )r   T)	directoryz%pyprob_traces_sorted_{{:d}}_{{:0{}d}}c                s   t j | S )N)r]   r^   r   r\   )x)file_name_templaterj   sorted_dataset_dirr   r   <lambda>
  s    z,OfflineDataset.save_sorted.<locals>.<lambda>z:Invalid indexes begin_file_index:{} and end_file_index: {}zLSorted offline dataset, traces: {}, traces per file: {}, files: {} (overall)zKSaving sorted files with indices in range [{}, {}) ({} of {} files overall)rW   ZFiles)writerX   )r   rY   rZ   r   r]   r^   existsr   r   r   r   r\   r
   create_pathr   chunksr   r_   maprd   r[   r   lock	enumerateunlockrf   rg   )r!   r   rj   rm   begin_file_indexend_file_indexZfile_indicesri   Znum_files_digitsZ
file_namesrq   rn   ro   rp   Znew_iZold_ir   )r   rj   r   r   save_sorted   sJ    


(&zOfflineDataset.save_sorted)NNNN)	r*   r+   r,   r$   rt   r   r   r   __classcell__r   r   )r   r   r      s   ;r   c               @   s$   e Zd Zdd Zdd Zdd ZdS )TraceSamplerc             C   s   t |tstd|j| _d S )Nz%Expecting an OfflineDataset instance.)
isinstancer   	TypeErrorr   )r!   offline_datasetr   r   r   r$   $  s    
zTraceSampler.__init__c             C   s
   t | jS )N)iterr   )r!   r   r   r   __iter__)  s    zTraceSampler.__iter__c             C   s
   t | jS )N)r   Z_offline_dataset)r!   r   r   r   r%   ,  s    zTraceSampler.__len__N)r*   r+   r,   r$   r   r%   r   r   r   r   r   #  s   r   c               @   s&   e Zd Zd	ddZdd Zdd ZdS )
TraceBatchSamplerTc             C   s0   t |tstdtt|j|| _|| _d S )Nz%Expecting an OfflineDataset instance.)	r   r   r   r   r
   r   r   _batches_shuffle_batches)r!   r   
batch_sizeshuffle_batchesr   r   r   r$   1  s    
zTraceBatchSampler.__init__c             C   s   | j rtj| j t| jS )N)r   nprandomshuffler   r   )r!   r   r   r   r   7  s    zTraceBatchSampler.__iter__c             C   s
   t | jS )N)r   r   )r!   r   r   r   r%   <  s    zTraceBatchSampler.__len__N)T)r*   r+   r,   r$   r   r%   r   r   r   r   r   0  s   
r   c               @   s&   e Zd Zd
ddZdd Zdd Zd	S )DistributedTraceBatchSamplerTrW   c             C   sd  t |tstdt s"tdt | _t | _	t
t|j| | j }|| }tttt|j||| _t| jd |k r| jd= || _t
t| j| | _| j| jk rtdt|||| j| jtt| j| j| _t| jd | jk rHt| jdk r*tdt|||| jd | jd  | jd= || _|| _d| _d| _d S )	Nz%Expecting an OfflineDataset instance.zExpecting distributed training.zfoffline_dataset:{}, batch_size:{} and num_buckets:{} imply a bucket_size:{} smaller than world_size:{}r	   zGoffline_dataset:{} too small for given batch_size:{} and num_buckets:{}r   )r   r   r   distis_availabler   get_world_size_world_sizeget_rank_rankrY   floorr   r   r   r
   r   
drop_itemsr   Z_num_bucketsrZ   Z_bucket_sizer\   _bucketsextendr   _shuffle_buckets_epoch_current_bucket_id)r!   r   r   r   num_bucketsZshuffle_bucketsZnum_batches_to_dropZnum_traces_to_dropr   r   r   r$   A  s2    


 z%DistributedTraceBatchSampler.__init__c             c   s   |  j d7  _ | jrFtj }tj| j  tj| j tj| xtt	| jD ]f\}}|| _
tt|| j }|| jt|| j d | }| jrtj| x|D ]
}|V  qW qRW d S )NrW   )r   r   r   r   	get_stateseedr   r   	set_stater   r   rY   r   r   r   r   r   )r!   stZ	bucket_idbucketZnum_batchesZbatchesbatchr   r   r   r   a  s    

z%DistributedTraceBatchSampler.__iter__c             C   s
   t | jS )N)r   r   )r!   r   r   r   r%   w  s    z$DistributedTraceBatchSampler.__len__N)TrW   T)r*   r+   r,   r$   r   r%   r   r   r   r   r   @  s   
 r   )#r   Ztorch.utils.datar   r   r   torch.distributeddistributedr   rY   r]   r   rb   r   r   r   r`   	termcolorr   collectionsr   r   r   r   r
   r   r   concurrencyr   r   r-   ru   r   r   r   r   r   r   r   r   <module>   s,   V#y