o
    TZha                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	Z
ddlZddlmZ zddlmZ W n ey=   dZY nw dd	lmZ d
d Zdd Zdd Zdd Z	dddZdd ZG dd dZG dd dZdd ZdS )zTF-specific utils import.    N)partial)ceil)uuid4)get_context)SharedMemory   )configc                    s   t | tr| S tjrdd l}ntd| d }i }| D ]<\ }t |tjr6t	 fdd| D | < qt ||j
rK|	 fdd| D | < qt fdd| D | < q|S )Nr   FCalled a Tensorflow-specific function but Tensorflow is not installed.c                       g | ]}|  qS  r   .0fkr   N/var/www/html/lang_env/lib/python3.10/site-packages/datasets/utils/tf_utils.py
<listcomp>0       z)minimal_tf_collate_fn.<locals>.<listcomp>c                    r
   r   r   r   r   r   r   r   2   r   c                    r
   r   r   r   r   r   r   r   4   r   )
isinstancedictr   TF_AVAILABLE
tensorflowImportErroritemsnpndarraystackZTensorarray)featurestffirstbatchvr   r   r   minimal_tf_collate_fn$   s   

r#   c                 C   s&   t | }d|v r|d |d< |d= |S )Nlabellabels)r#   )r   r!   r   r   r   #minimal_tf_collate_fn_with_renaming8   s
   r&   c                 C   s:   t j| rt| jS t j| pt j| pt j| S N)patypesZis_listis_numeric_pa_typeZ
value_type
is_integerZis_floatingZ
is_decimal)Zpa_typer   r   r   r*   @   s   
$r*   c                 C   s   ddl m}m}m} ddlm} t| |rt| jS t| t	r%t| d S t| |r0t
|  jS t| |r:t
|  S t| |rAdS dS )Nr   )
ClassLabelSequenceValue)_ArrayXDr   TF) r,   r-   r.   Zfeatures.featuresr/   r   is_numeric_featurefeaturelistr*   Zstorage_dtype)r2   r,   r-   r.   r/   r   r   r   r1   F   s   






r1   Fc                    sb  t | tjs
|  } d}t | tjr||    d}n+tt| dkr2|| d | d d   nt | tjr=||   n	td	t
| d urUfdd  D  |rltt  d } fd	d
t|D  | fi | |ri }	| D ]\}
}t |
 }||}||	|
< q||	S g }	| D ]\}
}t |
 }||}|	| q|	S )NTF   r   zUnexpected type for indices: {}c                    s&   i | ]\}}| v s|d v r||qS ))r$   Z	label_idsr%   r   r   keyvalue)cols_to_retainr   r   
<dictcomp>k   s
    z np_get_batch.<locals>.<dictcomp>c                    s"   g | ]  fd d  D qS )c                    s   i | ]	\}}||  qS r   r   r6   ir   r   r:   t       z+np_get_batch.<locals>.<listcomp>.<dictcomp>r   )r   )r!   r;   r   r   t   s   " z np_get_batch.<locals>.<listcomp>)r   r   r   numpyintegeritemalldiffRuntimeErrorformattyper   lenr3   valuesranger   Zastypeappend)indicesdatasetr9   
collate_fncollate_fn_argscolumns_to_np_typesreturn_dictZ
is_batchedZactual_sizeZ	out_batchcol
cast_dtyper   r   )r!   r9   r   np_get_batchX   s@   




rS   c	              	      sl  t jrddlntdtdrjntjjdr"jjjnt	dkr-t
d dtt||| dd	fd
d  D jdjgd fdd}	jjt	}
|rdurjdjdjdd}fdd}|
||}
n	|r|
|
 }
|dur|
j||d}
|
|	}
|durfdd}nfdd}|
|S )a  Create a tf.data.Dataset from the underlying Dataset. This is a single-process method - the multiprocess
    equivalent is multiprocess_dataset_to_tf.

    Args:
        dataset (`Dataset`): Dataset to wrap with tf.data.Dataset.
        cols_to_retain (`List[str]`): Dataset column(s) to load in the
            tf.data.Dataset. It is acceptable to include column names that are created by the `collate_fn` and
            that do not exist in the original dataset.
        collate_fn(`Callable`): A function or callable object (such as a `DataCollator`) that will collate
            lists of samples into a batch.
        collate_fn_args (`Dict`): A  `dict` of keyword arguments to be passed to the
            `collate_fn`. Can be empty.
        columns_to_np_types (`Dict[str, np.dtype]`): A `dict` mapping column names to numpy dtypes.
        output_signature (`Dict[str, tf.TensorSpec]`): A `dict` mapping column names to
            `tf.TensorSpec` objects.
        shuffle(`bool`): Shuffle the dataset order when loading. Recommended True for training, False for
            validation/evaluation.
        batch_size (`int`, default `None`): Size of batches to load from the dataset. Defaults to `None`, which implies that
            the dataset won't be batched, but the returned dataset can be batched later with `tf_dataset.batch(batch_size)`.
        drop_remainder(`bool`, default `None`): Drop the last incomplete batch when loading. If not provided,
            defaults to the same setting as shuffle.

    Returns:
        `tf.data.Dataset`
    r   Nr	   random_index_shuffleindex_shufflei zto_tf_dataset() can be memory-inefficient on versions of TensorFlow older than 2.9. If you are iterating over a dataset with a very large number of samples, consider upgrading to TF >= 2.9.F)rL   r9   rM   rN   rO   rP   c                    s   g | ]} j |qS r   )ZdtypesZas_dtype)r   dtype)r   r   r   r          z!dataset_to_tf.<locals>.<listcomp>)Zinput_signaturec                    s,   j | gd  fddt D S )N)inpZToutc                    s   i | ]	\}}| | qS r   r   )r   r<   r7   outputr   r   r:      r=   z9dataset_to_tf.<locals>.fetch_function.<locals>.<dictcomp>)Zpy_function	enumeratekeys)rK   )rO   	getter_fnr   toutrY   r   fetch_function   s   z%dataset_to_tf.<locals>.fetch_function   r5   )rV   )r8   c                    s@    | dkrjjddjd} || t d d}| |fS )Nr5   r`   l            )shapemaxvalrV   r4   )indexseedZ	max_index)Z
reduce_allrandomuniformint64rG   )staterd   Zshuffled_index)rL   rT   r   r   r   scan_random_index   s   z(dataset_to_tf.<locals>.scan_random_index)drop_remainderc                        fdd|   D S )Nc                    s$   i | ]\}}| | | jqS r   Zensure_shaperb   r   r7   valoutput_signaturer   r   r   r:         $ 8dataset_to_tf.<locals>.ensure_shapes.<locals>.<dictcomp>r>   Z
input_dictrp   r   r   ensure_shapes      z$dataset_to_tf.<locals>.ensure_shapesc                    rl   )Nc              	      s,   i | ]\}}| | | jd d qS )r4   Nrm   rn   rp   r   r   r:      s   , rs   r>   rt   rp   r   r   ru      rv   )r   r   r   r   hasattrrT   rf   experimentalrU   rG   warningswarnr   rS   rH   functionZ
TensorSpecrh   dataDatasetrI   fillcastscanshuffleZcardinalityr!   map)rL   r9   rM   rN   rO   rq   r   
batch_sizerk   r_   
tf_datasetZ	base_seedrj   ru   r   )rO   rL   r]   rq   rT   r   r^   r   dataset_to_tf   sL   $



r   c                   @   s4   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdS )SharedMemoryContextc                 C   s   g | _ g | _d S r'   )created_shmsopened_shmsselfr   r   r   __init__   s   
zSharedMemoryContext.__init__c                 C   s6   t t|||d}|r| j| |S | j| |S )N)sizenamecreate)r   intr   rJ   r   )r   r   r   r   shmr   r   r   get_shm   s   zSharedMemoryContext.get_shmc                 C   s4   | j |t|t|j |d}tj|||jdS )N)r   r   r   )rV   buffer)r   r   prodrV   itemsizer   buf)r   r   rb   rV   r   r   r   r   r   	get_array
  s   "zSharedMemoryContext.get_arrayc                 C      | S r'   r   r   r   r   r   	__enter__     zSharedMemoryContext.__enter__c                 C   s4   | j D ]
}|  |  q| jD ]}|  qd S r'   )r   closeunlinkr   )r   exc_type	exc_value	tracebackr   r   r   r   __exit__  s   



zSharedMemoryContext.__exit__N)__name__
__module____qualname__r   r   r   r   r   r   r   r   r   r      s    
r   c                   @   s<   e Zd Zdd Zdd Zdd Zedd Zed	d
 ZdS )NumpyMultiprocessingGeneratorc                    s~   | _ | _| _| _dd | D  _ fdd| D  _| _| _| _	|	 _
|
 _ fdd| D  _d S )Nc                 S   s$   g | ]\}}|t jt jfv r|qS r   )r   Zunicode_Zstr_r   rQ   rV   r   r   r   r   +  rr   z:NumpyMultiprocessingGenerator.__init__.<locals>.<listcomp>c                    s*   i | ]\}}|| j vr|ntd qS )U1)string_columnsr   rV   r   r   r   r   r:   -  s    z:NumpyMultiprocessingGenerator.__init__.<locals>.<dictcomp>c                    s8   i | ]\}}|| j vrt|jjnt|jjd  qS r4   )r   r   rb   rank)r   rQ   specr   r   r   r:   7  s    &)rL   r9   rM   rN   r   r   rO   rq   r   r   rk   num_workerscolumns_to_ranks)r   rL   r9   rM   rN   rO   rq   r   r   rk   r   r   r   r   r     s    

z&NumpyMultiprocessingGenerator.__init__c              
   #   s   t jtttjj }jjj|j	\}}}t
dg g }g }fddt|D }fddt|D }jjjjjjjd}	t t|D ][tt }
d d|
 d d  fd	d
j D }|| | }|kr|d ur|}nd }||| | d|	}jj|dd}|  || qZd}|s8t|D ]v| jddstd|   | }tdd | D rd} nPt :  fdd
| D }dd
 | D }jD ]}|| d|| j d  !d||< qW d    n	1 s'w   Y  |V  | "  q|r|D ]}|#  q:W d    d S 1 sNw   Y  d S )NZspawnc                       g | ]}   qS r   Eventr   _ctxr   r   r   G  r   z:NumpyMultiprocessingGenerator.__iter__.<locals>.<listcomp>c                    r   r   r   r   r   r   r   r   H  r   )rL   r9   rM   rN   rO   r   r   Zdw_r   
   c              	      4   i | ]\}}| j  d | d|ftjddqS )r   _shapeTrb   rV   r   r   r   rh   r   rQ   r   shm_ctxworker_namer   r   r:   Y      "z:NumpyMultiprocessingGenerator.__iter__.<locals>.<dictcomp>)r   rK   extra_batcharray_ready_eventarray_loaded_eventT)targetkwargsdaemonF<   )timeoutzData loading worker timed out!c                 s   s    | ]
}t |d k V  qdS )r   N)r   any)r   rb   r   r   r   	<genexpr>w  s    z9NumpyMultiprocessingGenerator.__iter__.<locals>.<genexpr>c              	      s8   i | ]\}}| j   d | |j| ddqS )r   Fr   )r   rO   )r   rQ   rb   )batch_shm_ctxr<   namesr   r   r   r:     s    c                 S   s   i | ]
\}}|t |qS r   )r   copy)r   rQ   Zarrr   r   r   r:     s    Ur5   )$minr   r   r   rG   rL   r   distribute_batchesrk   r   r   rI   r9   rM   rN   rO   r   r   r   strr   rJ   r   Processworker_loopstartwaitTimeoutErrorclearr   rH   viewrb   squeezesetjoin)r   r   Zper_worker_batchesZfinal_batchZfinal_batch_workershape_arraysworkersZarray_ready_eventsZarray_loaded_eventsZ	base_argsZworker_random_idZworker_shape_arraysworker_indicesZfinal_batch_argZworker_kwargsZworkerZend_signal_receivedZarray_shapesZarraysZ
string_colr   )r   r   r<   r   r   r   r   r   __iter__<  s    
	



"
)$z&NumpyMultiprocessingGenerator.__iter__c                 C   r   r'   r   r   r   r   r   __call__  r   z&NumpyMultiprocessingGenerator.__call__c              
      s   dt jd< tjrdd l}ntd|jg d  	
f
dd}t 9
fdd	| D |D ]}|| q9|d urH||  D ]
\}}d
|d d < qL	  W d    d S 1 sfw   Y  d S )N3ZTF_CPP_MIN_LOG_LEVELr   r	   ZGPUc              	      s   t | dd}i }t S} D ]9\}}|| }|v r,|d|jd }|j| d d < |j	 d| |j|dd||< ||| d d < q      	  W d    d S 1 sfw   Y  d S )NT)rK   rL   r9   rM   rN   rO   rP   r   )r5   r   r   )
rS   r   r   r   reshaperb   r   r   r   r   )rK   r!   Z
out_arraysr   rQ   rR   r   )
r   r   rM   rN   r9   rO   rL   r   r   r   r   r   send_batch_to_parent  s0   

"zGNumpyMultiprocessingGenerator.worker_loop.<locals>.send_batch_to_parentc              	      r   )r   r   Fr   r   r   r   r   r   r:     r   z=NumpyMultiprocessingGenerator.worker_loop.<locals>.<dictcomp>r5   )
osenvironr   r   r   r   Zset_visible_devicesr   r   r   )rL   r9   rM   rN   rO   r   r   rK   r   r   r   r   r   r   r!   rQ   r   r   )r   r   rM   rN   r9   rO   rL   r   r   r   r   r   r     s$   

!

"z)NumpyMultiprocessingGenerator.worker_loopc                 C   s  t t| }|rt j| t|}|||  }t ||g\}}|s*t|dkr,d }|d|}t|}	|	|	|  }
t ||
g\}}|d||}t j||jd dd}dd |D }tt|D ]}t j	|| || ddgdd||< qd|d urt|}nd }|||fS )Nr   r5   r4   )Zaxisc                 S   s   g | ]}t |d qS r   )r   r   )r   r   r   r   r   r     rW   zDNumpyMultiprocessingGenerator.distribute_batches.<locals>.<listcomp>)
r   ZarangerG   rf   r   splitr   rb   rI   Zconcatenate)rL   r   rk   r   r   rK   Znum_samplesZincomplete_batch_cutoffZlast_incomplete_batchZnum_batchesZfinal_batches_cutoffZfinal_batchesZper_worker_indicesr<   Zincomplete_batch_worker_idxr   r   r   r     s*   (

z0NumpyMultiprocessingGenerator.distribute_batchesN)	r   r   r   r   r   r   staticmethodr   r   r   r   r   r   r     s    "a
Gr   c
                 C   s   t jrddl}
ntdt| |||||||||	d
}|
jjj||d}|r.tt	| | }n
tt
t	| | }||
jj|S )ao  Create a tf.data.Dataset from the underlying Dataset. This is a multi-process method - the single-process
    equivalent is dataset_to_tf.

    Args:
        dataset (`Dataset`): Dataset to wrap with tf.data.Dataset.
        cols_to_retain (`List[str]`): Dataset column(s) to load in the
            tf.data.Dataset. It is acceptable to include column names that are created by the `collate_fn` and
            that do not exist in the original dataset.
        collate_fn(`Callable`): A function or callable object (such as a `DataCollator`) that will collate
            lists of samples into a batch.
        collate_fn_args (`Dict`): A  `dict` of keyword arguments to be passed to the
            `collate_fn`. Can be empty.
        columns_to_np_types (`Dict[str, np.dtype]`): A `dict` mapping column names to numpy dtypes.
        output_signature (`Dict[str, tf.TensorSpec]`): A `dict` mapping column names to
            `tf.TensorSpec` objects.
        shuffle(`bool`): Shuffle the dataset order when loading. Recommended True for training, False for
            validation/evaluation.
        batch_size (`int`, default `None`): Size of batches to load from the dataset. Defaults to `None`, which implies that
            the dataset won't be batched, but the returned dataset can be batched later with `tf_dataset.batch(batch_size)`.
        drop_remainder(`bool`, default `None`): Drop the last incomplete batch when loading. If not provided,
            defaults to the same setting as shuffle.
        num_workers (`int`): Number of workers to use for loading the dataset. Should be >= 1.

    Returns:
        `tf.data.Dataset`
    r   Nr	   )
rL   r9   rM   rN   rO   rq   r   r   rk   r   )rq   )r   r   r   r   r   r|   r}   Zfrom_generatorr   rG   r   applyrx   Zassert_cardinality)rL   r9   rM   rN   rO   rq   r   r   rk   r   r   Zdata_generatorr   Zdataset_lengthr   r   r   multiprocess_dataset_to_tf	  s(   &
r   )F)__doc__r   ry   	functoolsr   mathr   uuidr   r?   r   Zpyarrowr(   Zmultiprocessr   Zmultiprocess.shared_memoryr   r   r0   r   r#   r&   r*   r1   rS   r   r   r   r   r   r   r   r   <module>   s4   
0q  q