B
    #=6\D                 @   s  d dl mZ d dl mZ d dl mZ d dl mZ d dlmZmZ d dlm	Z	m
Z
 d dlmZmZ d dlmZmZ d d	lmZmZmZ G d
d deZdZdd Zd%ddZdd Zd&ddZd'ddZdd Zdd Zdd Zd(dd Z G d!d" d"e	Z!G d#d$ d$eZ"dS ))    )absolute_import)division)print_function)unicode_literals)core
queue_util)ReaderWriter)
NetBuilderops)	as_recordField)NodeTask	TaskGroupc               @   s   e Zd ZdZdddZdS )Outputz
    Represents the result of a processor function. A processor can either
    return an Output, or it can return a record, in which case an Output will be
    created for it afterwards.
    Nc             C   s|   t   }|d ks(t|dks(td|d kr4|}t|tjrF|g}|d krRg nt|| _	|d krhd nt
|| _|| _d S )Nr   z7Cannot both use `ops` syntax and return a list of nets.)r
   currentgetlenAssertionError
isinstancer   Netlistnetsr   recordshould_stop)selfr   r   r   Zbuilder_children r   9/tmp/pip-install-l3r2oljg/torch/caffe2/python/pipeline.py__init__   s    zOutput.__init__)NNN)__name__
__module____qualname____doc__r   r   r   r   r   r      s   r   d   c             C   s   | d kr*t j|d k	r|ntd}| }nTt| trN|d ksDtdd }| }n0t| drv|d kshtd| }|  }ntd|	|| ||fS )N)capacityzcapacity would not be used.writerz)output must be a reader, queue or stream.)
r   QueueDEFAULT_QUEUE_CAPACITYr&   r   r	   r   hasattr
ValueErrorsetup_ex)outputr%   global_init_netglobal_exit_net	out_queuer&   r   r   r   _init_output%   s     



r0   Nc                sR    d krdd S t  tjr$t S d k	rJt drJ fdd}| _ S d S )Nc             S   s   | S )Nr   )recr   r   r   <lambda><   s    z make_processor.<locals>.<lambda>schema_funcc                  s
     S )N)r3   r   )	processorreaderr   r   processor_schemaA   s    z(make_processor.<locals>.processor_schema)r   r   r   NetProcessorr)   schema)r4   r5   r6   r   )r4   r5   r   make_processor:   s    r9   c             C   s|   t | tr| S t | tr"t| dS t | trpt| dkoTt | d toTt | d tj}|rftd|  S t|  S nt| S dS )z
    Allow for processors to return results in several formats.
    TODO(azzolini): simplify once all processors use NetBuilder API.
    )r      r      N)N)r   r   r   tupler   r   BlobReference)r,   Zis_record_and_blobr   r   r   normalize_processor_outputH   s    




r>   r;   c       
   	   C   s   t | |||||||\}}	|S )a[
  
    Given a Reader, Queue or DataStream in `input`, and optionally, a Writer,
    Queue or DataStream in `output`, creates a Task that, when run, will
    pipe the input into the output, using multiple parallel threads.
    Additionally, if a processor is given, it will be called between reading
    and writing steps, allowing it to transform the record.

    Args:
        input:       either a Reader, Queue or DataStream that will be read
                     until a stop is signaled either by the reader or the
                     writer.
        output:      either a Writer, a Queue or a DataStream that will be
                     writen to as long as neither reader nor writer signal
                     a stop condition. If output is not provided or is None,
                     a Queue is created with given `capacity` and writen to.
        num_threads: number of concurrent threads used for processing and
                     piping. If set to 0, no Task is created, and a
                     reader is returned instead -- the reader returned will
                     read from the reader passed in and process it.
                     ** DEPRECATED **. Use `num_runtime_threads` instead.
                     This option will be removed once all readers/processors
                     support `num_runtime_threads`.
        processor:   (optional) function that takes an input record and
                     optionally returns a record; this will be called
                     between read and write steps. If the processor does
                     not return a record, a writer will not be instantiated.
                     Processor can also be a core.Net with input and output
                     records properly set. In that case, a NetProcessor is
                     instantiated, cloning the net for each of the threads.
        name:        (optional) name of the task to be created.
        capacity:    when output is not passed, a queue of given `capacity`
                     is created and written to.
        group:       (optional) explicitly add the created Task to this
                     TaskGroup, instead of using the currently active one.
        num_runtime_threads: Similar to `num_threads`, but instead of expanding
                     the tasks with a `for` loop in python, does that at
                     runtime. This is preferable to `num_threads`, but some
                     processors/readers still require to be called multiple
                     times in python.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the parameters
        passed.
    )
_pipe_step)
inputr,   num_threadsr4   namer%   groupnum_runtime_threadsresult_r   r   r   pipec   s    /
rG   c	          
   C   s\   |dkst t| ||||||||	\}	}
d}|dk	rT|
 }t|ttfkrT|d }|	|fS )aq  
    Similar to `pipe`, with the additional ability for the pipe Task to
    return output values to the `Session` once done.

    Returns:
        Tuple (out_queue, *task_outputs)
            out_queue:    same as return value of `pipe`.
            task_outputs: TaskOutput object, fetchable from the client after
                          session.run() returns.
    r   N)r   r?   outputstyper   r<   )r@   r,   rA   r4   rB   r%   rC   rD   final_outputsrE   taskr   r   r   pipe_and_output   s    rL   c             C   sT   t | dr| jS t | drL| jdkr*| jS t | drFd| jj| jf S | jS | jjS )NrB   	func_namez<lambda>im_classz%s.%s)r)   rB   rM   r!   rN   r    	__class__)r4   r   r   r   processor_name   s    



rP   c          
   C   s  t t }d|d| tr"ttnd|r0t|nd}t| |||d|}	td}
td}|	||
 td}td	}|
||\}}}|jg |gg d
tjjd |d k	rt||||
\}}|||||\}}nd }g }t  t| W d Q R X t  t| W d Q R X td}|jg |d}td}||g  ttjd|gt| t| |g |d t| t  t| W d Q R X t  t|
 W d Q R X W d Q R X ||	fS )Nz{0}/{1}/{2}/{3}/{4}rG   NoInputNoOutput)rB   rC   rH   Znum_instancesz	pipe:exitz	pipe:initzpipe:instance:initzpipe:instance:exitF)shapevaluedtypetimer_start)counter_name	timer_endbody)should_stop_blob)strr   r   formatr@   rP   r   r   r   r+   read_record_exConstantFillDataTypeBOOLr0   write_record_exr   Z	task_initnetZtask_instance_init
TimerBeginTimerEndexecution_stepr   Ztask_instance_exitZ	task_exit)rB   rC   rJ   r5   rA   r,   r%   	node_nameprofiler_namerK   r.   r-   init_netexit_net	read_netsstatusr1   r/   r&   
write_netsrF   timer_start_nettimertimer_end_netr   r   r   _runtime_threads_task   s\    











rp   c             C   s  t t }d|d| tr"ttnd|r0t|nd}t| ||d}	td}
td}|	||
 d }d }g }xPt
|D ]B}td| d	}td}td}|||\}}}|jg |gg d
tjjd |d k	r,|d krt|	jd t||||
\}}W d Q R X |||||\}}ng }td}|jg |d}td}||g  t| ttjd|gt| t| |g |d t| t| W d Q R X |t| qW t| ttjd|dd t|
 W d Q R X ||	fS )Nz{0}/{1}/{2}/{3}/{4}rG   rQ   rR   )rB   rC   rH   exitinitzt:%d)rB   F)rS   rT   rU   )Z	_fullnamerV   )rW   rX   rY   )rZ   T)Zconcurrent_substeps)r[   r   r   r\   r@   rP   r   r   r   r+   ranger
   r]   r^   r_   r`   rB   r0   ra   rc   rd   r   rb   re   r   appendZto_execution_step)rB   rC   rJ   r5   rA   r,   r%   rf   rg   rK   r.   r-   r/   r&   ZstepsZ	thread_idnbrh   ri   rj   rk   r1   rl   rF   rm   rn   ro   r   r   r   _static_threads_task   sf    











rv   c	       
      C   s   |dks|dkst dt| tr(| }	n&t| dr<|  }	ntdt| |dk	r`t|	|}	|dksp|dkr|dks|t |	dfS |dkr|dk	rt	|}|dkr|dk	rdt	| }|dkrdt	|  }|dkrt
||||	|||S t||||	|||S dS )	z
    r;   z;Only one of num_threads or num_runtime_threads must be set.r5   z/Input must be a reader, queue or stream. Got {}Nr   zpipe_into:%szpipe_from:%s)r   r   r   r)   r5   r*   r\   rI   ProcessingReaderrP   rv   rp   )
r@   r,   rA   r4   rB   r%   rC   rD   rJ   r5   r   r   r   r?   :  s2    



r?   c               @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )rw   zo
    Reader that reads from an upstream reader, calls the processor, and returns
    the processed record.
    c             C   s    t |  || _t||| _d S )N)r   r   r5   r9   r4   )r   r5   r4   r   r   r   r   f  s    
zProcessingReader.__init__c             C   s
   | j  S )N)r4   r8   )r   r   r   r   r8   k  s    zProcessingReader.schemac             C   s   | j || d S )N)r5   r+   )r   rh   Z
finish_netr   r   r   r+   n  s    zProcessingReader.setup_exc       
   	   C   s   | j ||\}}}t }t| |}W d Q R X ||j7 }|jsJ|jrt	d}|jrn|
||jg|g |jr|
||jg|g || t| jdr|tj| j | |j |jr|j nd }	|||	fS )Nstop_netsetup)r5   r]   r
   r>   r4   r   r   Z
_stop_blobr   r   Orrt   r)   Zadd_attributer   ZLOCAL_SETUPZ_set_schemar   Zfield_blobs)
r   rh   ri   rj   rk   r1   ru   rE   rx   fieldsr   r   r   read_exq  s     


zProcessingReader.read_exN)r    r!   r"   r#   r   r8   r+   r|   r   r   r   r   rw   a  s
   rw   c               @   s:   e Zd ZdZdddZdd Zdd Zd	d
 Zdd ZdS )r7   z
    Processor that clones a core.Net each time it's called, executing
    the cloned net as the processor. It requires the Net to have input
    and (optionally) output records set, with net.set_input_record() and
    net.set_output_record().
    Nc             C   sb   t |tjst|d ks(t |tjs(t|p2t|| _|p<g | _|| _|| _	g | _
d| _g | _d S )NF)r   r   r   r   r=   r[   rB   thread_init_netsrb   _stop_signal
_blob_maps_frozen_cloned_init_nets)r   rb   stop_signalr}   rB   r   r   r   r     s    

zNetProcessor.__init__c             C   s
   | j  S )N)rb   output_record)r   r   r   r   r8     s    zNetProcessor.schemac             C   s   d| _ | j}g | _|S )NT)r   r   )r   rh   Zcloned_init_netsr   r   r   ry     s    zNetProcessor.setupc       	      C   s   | j r
tt jd }i }x6| jD ],}t|t|| ||\}}| j	
| q$W t| jt| j| |||\}}| jd krd }n.t| j|krtj|t| j |d}n| j}| j
| t|g| |S )N/)rb   )r   r   r
   r   rB   r}   r   Zclone_and_bind_netr[   r   rt   rb   r~   r=   r   r   r   )	r   r1   prefixZ
blob_remaprb   Znew_netrF   Z
remappingsr   r   r   r   __call__  s$    


zNetProcessor.__call__c             C   s   d| _ | jS )NT)r   r   )r   r   r   r   	blob_maps  s    zNetProcessor.blob_maps)NNN)	r    r!   r"   r#   r   r8   ry   r   r   r   r   r   r   r7     s   
r7   )N)Nr;   NNNNr;   )Nr;   NNNNr;   N)Nr;   NNNNNN)#
__future__r   r   r   r   Zcaffe2.pythonr   r   Zcaffe2.python.dataior   r	   Zcaffe2.python.net_builderr
   r   Zcaffe2.python.schemar   r   Zcaffe2.python.taskr   r   r   objectr   r(   r0   r9   r>   rG   rL   rP   rp   rv   r?   rw   r7   r   r   r   r   <module>   s0   
 
4 
<B 
%,