import os
import posixpath
import shutil
import uuid
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union

import numpy as np
import pyarrow as pa

import datasets
from datasets.arrow_writer import ArrowWriter, ParquetWriter
from datasets.config import MAX_SHARD_SIZE
from datasets.filesystems import is_remote_filesystem, rename
from datasets.iterable_dataset import _BaseExamplesIterable
from datasets.utils.py_utils import convert_file_size_to_int


logger = datasets.utils.logging.get_logger(__name__)

if TYPE_CHECKING:
    import pyspark


@dataclass
class SparkConfig(datasets.BuilderConfig):
    """BuilderConfig for Spark."""

    features: Optional[datasets.Features] = None


def _reorder_dataframe_by_partition(df: "pyspark.sql.DataFrame", new_partition_order: List[int]):
    df_combined = df.select("*").where(f"part_id = {new_partition_order[0]}")
    for partition_id in new_partition_order[1:]:
        partition_df = df.select("*").where(f"part_id = {partition_id}")
        df_combined = df_combined.union(partition_df)
    return df_combined


def _generate_iterable_examples(df: "pyspark.sql.DataFrame", partition_order: List[int]):
    import pyspark

    def generate_fn():
        df_with_partition_id = df.select("*", pyspark.sql.functions.spark_partition_id().alias("part_id"))
        partition_df = _reorder_dataframe_by_partition(df_with_partition_id, partition_order)
        row_id = 0
        # prefetch the next partition so iteration can overlap with Spark work
        rows = partition_df.toLocalIterator(prefetchPartitions=True)
        curr_partition = -1
        for row in rows:
            row_as_dict = row.asDict()
            part_id = row_as_dict["part_id"]
            row_as_dict.pop("part_id")
            if curr_partition != part_id:
                curr_partition = part_id
                row_id = 0
            yield f"{part_id}_{row_id}", row_as_dict
            row_id += 1

    return generate_fn


class SparkExamplesIterable(_BaseExamplesIterable):
    def __init__(
        self,
        df: "pyspark.sql.DataFrame",
        partition_order=None,
    ):
        self.df = df
        self.partition_order = partition_order or range(self.df.rdd.getNumPartitions())
        self.generate_examples_fn = _generate_iterable_examples(self.df, self.partition_order)

    def __iter__(self):
        yield from self.generate_examples_fn()

    def shuffle_data_sources(self, generator: np.random.Generator) -> "SparkExamplesIterable":
        partition_order = list(range(self.df.rdd.getNumPartitions()))
        generator.shuffle(partition_order)
        return SparkExamplesIterable(self.df, partition_order=partition_order)

    def shard_data_sources(self, worker_id: int, num_workers: int) -> "SparkExamplesIterable":
        partition_order = self.split_shard_indices_by_worker(worker_id, num_workers)
        return SparkExamplesIterable(self.df, partition_order=partition_order)

    @property
    def n_shards(self) -> int:
        return len(self.partition_order)


class Spark(datasets.DatasetBuilder):
    BUILDER_CONFIG_CLASS = SparkConfig

    def __init__(
        self,
        df: "pyspark.sql.DataFrame",
        cache_dir: str = None,
        working_dir: str = None,
        **config_kwargs,
    ):
        import pyspark

        self._spark = pyspark.sql.SparkSession.builder.getOrCreate()
        self.df = df
        self._working_dir = working_dir
        super().__init__(
            cache_dir=cache_dir,
            config_name=str(self.df.semanticHash()),
            **config_kwargs,
        )

    def _validate_cache_dir(self):
        # Copy to a local so that create_cache_and_write_probe does not capture self (pickling self would
        # fail because of the SparkContext it holds).
        cache_dir = self._cache_dir

        def create_cache_and_write_probe(context):
            # makedirs with exist_ok will not raise if the directories already exist.
            os.makedirs(cache_dir, exist_ok=True)
            probe_file = os.path.join(cache_dir, "fs_test" + uuid.uuid4().hex)
            # Opening in append mode creates the file if it does not exist, without changing its contents.
            open(probe_file, "a")
            return [probe_file]

        if self._spark.conf.get("spark.master", "").startswith("local"):
            return

        # On a multi-node cluster, the cache_dir must be on shared storage that both the driver and the
        # executors can reach; write a probe file from an executor and check it from the driver.
        if self._cache_dir:
            probe = (
                self._spark.sparkContext.parallelize(range(1), 1).mapPartitions(create_cache_and_write_probe).collect()
            )
            if os.path.isfile(probe[0]):
                return
        raise ValueError(
            "When using Dataset.from_spark on a multi-node cluster, the driver and all workers should be able to access cache_dir"
        )

    def _info(self):
        return datasets.DatasetInfo(features=self.config.features)

    def _split_generators(self, dl_manager: datasets.download.download_manager.DownloadManager):
        return [datasets.SplitGenerator(name=datasets.Split.TRAIN)]

    def _repartition_df_if_needed(self, max_shard_size):
        import pyspark

        def get_arrow_batch_size(it):
            for batch in it:
                yield pa.RecordBatch.from_pydict({"batch_bytes": [batch.nbytes]})

        df_num_rows = self.df.count()
        sample_num_rows = df_num_rows if df_num_rows <= 100 else 100
        # Approximate the Arrow size of a row by measuring a sample of at most 100 rows.
        approx_bytes_per_row = (
            self.df.limit(sample_num_rows)
            .repartition(1)
            .mapInArrow(get_arrow_batch_size, "batch_bytes: long")
            .agg(pyspark.sql.functions.sum("batch_bytes").alias("sample_bytes"))
            .collect()[0]
            .sample_bytes
            / sample_num_rows
        )
        approx_total_size = approx_bytes_per_row * df_num_rows
        if approx_total_size > max_shard_size:
            # Make sure there is at least one row per partition.
            new_num_partitions = min(df_num_rows, int(approx_total_size / max_shard_size))
            self.df = self.df.repartition(new_num_partitions)

    def _prepare_split_single(
        self,
        fpath: str,
        file_format: str,
        max_shard_size: int,
    ) -> Iterable[Tuple[int, bool, Union[int, tuple]]]:
        import pyspark

        writer_class = ParquetWriter if file_format == "parquet" else ArrowWriter
        working_fpath = os.path.join(self._working_dir, os.path.basename(fpath)) if self._working_dir else fpath
        embed_local_files = file_format == "parquet"

        # Copy to locals so that write_arrow does not capture self (pickling self would fail because of
        # the SparkContext it holds).
        features = self.config.features
        writer_batch_size = self._writer_batch_size
        storage_options = self._fs.storage_options

        def write_arrow(it):
            # Within the same SparkContext, no two task attempts share the same attempt ID.
            task_id = pyspark.TaskContext().taskAttemptId()
            first_batch = next(it, None)
            if first_batch is None:
                # Some partitions might not receive any data.
                return pa.RecordBatch.from_arrays(
                    [[task_id], [0], [0]],
                    names=["task_id", "num_examples", "num_bytes"],
                )
            shard_id = 0
            writer = writer_class(
                features=features,
                path=working_fpath.replace("SSSSS", f"{shard_id:05d}").replace("TTTTT", f"{task_id:05d}"),
                writer_batch_size=writer_batch_size,
                storage_options=storage_options,
                embed_local_files=embed_local_files,
            )
            table = pa.Table.from_batches([first_batch])
            writer.write_table(table)
            for batch in it:
                if max_shard_size is not None and writer._num_bytes >= max_shard_size:
                    # Current shard is full: flush it, report its stats, and start a new shard.
                    num_examples, num_bytes = writer.finalize()
                    writer.close()
                    yield pa.RecordBatch.from_arrays(
                        [[task_id], [num_examples], [num_bytes]],
                        names=["task_id", "num_examples", "num_bytes"],
                    )
                    shard_id += 1
                    writer = writer_class(
                        features=writer._features,
                        path=working_fpath.replace("SSSSS", f"{shard_id:05d}").replace("TTTTT", f"{task_id:05d}"),
                        writer_batch_size=writer_batch_size,
                        storage_options=storage_options,
                        embed_local_files=embed_local_files,
                    )
                table = pa.Table.from_batches([batch])
                writer.write_table(table)
            if writer._num_bytes > 0:
                num_examples, num_bytes = writer.finalize()
                writer.close()
                yield pa.RecordBatch.from_arrays(
                    [[task_id], [num_examples], [num_bytes]],
                    names=["task_id", "num_examples", "num_bytes"],
                )

            if working_fpath != fpath:
                # Move the shards written to the working directory to their final location.
                for file in os.listdir(os.path.dirname(working_fpath)):
                    dest = os.path.join(os.path.dirname(fpath), os.path.basename(file))
                    shutil.move(file, dest)

        stats = (
            self.df.mapInArrow(write_arrow, "task_id: long, num_examples: long, num_bytes: long")
            .groupBy("task_id")
            .agg(
                pyspark.sql.functions.sum("num_examples").alias("total_num_examples"),
                pyspark.sql.functions.sum("num_bytes").alias("total_num_bytes"),
                pyspark.sql.functions.count("num_bytes").alias("num_shards"),
                pyspark.sql.functions.collect_list("num_examples").alias("shard_lengths"),
            )
            .collect()
        )
        for row in stats:
            yield row.task_id, (row.total_num_examples, row.total_num_bytes, row.num_shards, row.shard_lengths)

    def _prepare_split(
        self,
        split_generator: "datasets.SplitGenerator",
        file_format: str = "arrow",
        max_shard_size: Optional[Union[str, int]] = None,
        num_proc: Optional[int] = None,
        **kwargs,
    ):
        self._validate_cache_dir()

        max_shard_size = convert_file_size_to_int(max_shard_size or MAX_SHARD_SIZE)
        self._repartition_df_if_needed(max_shard_size)

        is_local = not is_remote_filesystem(self._fs)
        path_join = os.path.join if is_local else posixpath.join

        SUFFIX = "-TTTTT-SSSSS-of-NNNNN"
        fname = f"{self.name}-{split_generator.name}{SUFFIX}.{file_format}"
        fpath = path_join(self._output_dir, fname)

        total_num_examples = 0
        total_num_bytes = 0
        total_shards = 0
        task_id_and_num_shards = []
        all_shard_lengths = []

        for task_id, content in self._prepare_split_single(fpath, file_format, max_shard_size):
            (num_examples, num_bytes, num_shards, shard_lengths) = content
            if num_bytes > 0:
                total_num_examples += num_examples
                total_num_bytes += num_bytes
                total_shards += num_shards
                task_id_and_num_shards.append((task_id, num_shards))
                all_shard_lengths.extend(shard_lengths)

        split_generator.split_info.num_examples = total_num_examples
        split_generator.split_info.num_bytes = total_num_bytes

        # Rename everything at the end.
        logger.debug(f"Renaming {total_shards} shards.")
        if total_shards > 1:
            split_generator.split_info.shard_lengths = all_shard_lengths

            # Copy to a local so that _rename_shard does not capture self (pickling self would fail because
            # of the SparkContext it holds).
            fs = self._fs

            # Use the -SSSSS-of-NNNNN pattern.
            def _rename_shard(
                task_id: int,
                shard_id: int,
                global_shard_id: int,
            ):
                rename(
                    fs,
                    fpath.replace("SSSSS", f"{shard_id:05d}").replace("TTTTT", f"{task_id:05d}"),
                    fpath.replace("TTTTT-SSSSS", f"{global_shard_id:05d}").replace("NNNNN", f"{total_shards:05d}"),
                )

            args = []
            global_shard_id = 0
            for i in range(len(task_id_and_num_shards)):
                task_id, num_shards = task_id_and_num_shards[i]
                for shard_id in range(num_shards):
                    args.append([task_id, shard_id, global_shard_id])
                    global_shard_id += 1
            self._spark.sparkContext.parallelize(args, len(args)).map(lambda args: _rename_shard(*args)).collect()
        else:
            # Single shard: don't use any pattern.
            shard_id = 0
            task_id = task_id_and_num_shards[0][0]
            self._rename(
                fpath.replace("SSSSS", f"{shard_id:05d}").replace("TTTTT", f"{task_id:05d}"),
                fpath.replace(SUFFIX, ""),
            )

    def _get_examples_iterable_for_split(
        self,
        split_generator: "datasets.SplitGenerator",
    ) -> SparkExamplesIterable:
        return SparkExamplesIterable(self.df)
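# ---------------------------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It assumes a local PySpark
# session and goes through the public entry point `datasets.Dataset.from_spark`, which routes
# through the `Spark` builder defined above; the example DataFrame, its column names, and the
# values are made up for demonstration.
if __name__ == "__main__":  # pragma: no cover
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()
    df = spark.createDataFrame([(1, "foo"), (2, "bar")], schema="id: long, text: string")

    # Materialize the DataFrame into an Arrow-backed cached dataset. On a multi-node cluster,
    # pass a `cache_dir` that the driver and every worker can access (see _validate_cache_dir).
    ds = datasets.Dataset.from_spark(df)
    print(ds)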