from __future__ import annotations

from typing import TYPE_CHECKING, Any, Iterable, List, Optional

if TYPE_CHECKING:
    from pyspark.sql import DataFrame, Row, SparkSession


class SparkSQL:
    """SparkSQL is a utility class for interacting with Spark SQL."""

    def __init__(
        self,
        spark_session: Optional[SparkSession] = None,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        ignore_tables: Optional[List[str]] = None,
        include_tables: Optional[List[str]] = None,
        sample_rows_in_table_info: int = 3,
    ):
        """Initialize a SparkSQL object.

        Args:
            spark_session: A SparkSession object.
              If not provided, one will be created.
            catalog: The catalog to use.
              If not provided, the default catalog will be used.
            schema: The schema to use.
              If not provided, the default schema will be used.
            ignore_tables: A list of tables to ignore.
              If not provided, all tables will be used.
            include_tables: A list of tables to include.
              If not provided, all tables will be used.
            sample_rows_in_table_info: The number of rows to include in the table info.
              Defaults to 3.
        """
        try:
            from pyspark.sql import SparkSession
        except ImportError:
            raise ImportError(
                "pyspark is not installed. Please install it with `pip install pyspark`"
            )

        self._spark = (
            spark_session if spark_session else SparkSession.builder.getOrCreate()
        )
        if catalog is not None:
            self._spark.catalog.setCurrentCatalog(catalog)
        if schema is not None:
            self._spark.catalog.setCurrentDatabase(schema)

        self._all_tables = set(self._get_all_table_names())
        self._include_tables = set(include_tables) if include_tables else set()
        if self._include_tables:
            missing_tables = self._include_tables - self._all_tables
            if missing_tables:
                raise ValueError(
                    f"include_tables {missing_tables} not found in database"
                )
        self._ignore_tables = set(ignore_tables) if ignore_tables else set()
        if self._ignore_tables:
            missing_tables = self._ignore_tables - self._all_tables
            if missing_tables:
                raise ValueError(
                    f"ignore_tables {missing_tables} not found in database"
                )
        usable_tables = self.get_usable_table_names()
        self._usable_tables = set(usable_tables) if usable_tables else self._all_tables
        if not isinstance(sample_rows_in_table_info, int):
            raise TypeError("sample_rows_in_table_info must be an integer")
        self._sample_rows_in_table_info = sample_rows_in_table_info

    @classmethod
    def from_uri(
        cls, database_uri: str, engine_args: Optional[dict] = None, **kwargs: Any
    ) -> SparkSQL:
        """Creating a remote Spark Session via Spark connect.
        For example: SparkSQL.from_uri("sc://localhost:15002")
        """
        try:
            from pyspark.sql import SparkSession
        except ImportError:
            raise ImportError(
                "pyspark is not installed. Please install it with `pip install pyspark`"
            )

        spark = SparkSession.builder.remote(database_uri).getOrCreate()
        return cls(spark, **kwargs)

    def get_usable_table_names(self) -> Iterable[str]:
        """Get names of tables available."""
        if self._include_tables:
            return self._include_tables
        return sorted(self._all_tables - self._ignore_tables)

    def _get_all_table_names(self) -> Iterable[str]:
        rows = self._spark.sql("SHOW TABLES").select("tableName").collect()
        return list(map(lambda row: row.tableName, rows))

    def _get_create_table_stmt(self, table: str) -> str:
        statement = (
            self._spark.sql(f"SHOW CREATE TABLE {table}").collect()[0].createtab_stmt
        )
        # Drop the USING clause (data source provider and options) to save tokens.
        using_clause_index = statement.find("USING")
        return statement[:using_clause_index] + ";"

    def get_table_info(self, table_names: Optional[List[str]] = None) -> str:
        all_table_names = self.get_usable_table_names()
        if table_names is not None:
            missing_tables = set(table_names).difference(all_table_names)
            if missing_tables:
                raise ValueError(f"table_names {missing_tables} not found in database")
            all_table_names = table_names
        tables = []
        for table_name in all_table_names:
            table_info = self._get_create_table_stmt(table_name)
            if self._sample_rows_in_table_info:
                table_info += "\n\n/*"
                table_info += f"\n{self._get_sample_spark_rows(table_name)}\n"
                table_info += "*/"
            tables.append(table_info)
        final_str = "\n\n".join(tables)
        return final_str

    def _get_sample_spark_rows(self, table: str) -> str:
        query = f"SELECT * FROM {table} LIMIT {self._sample_rows_in_table_info}"
        df = self._spark.sql(query)
        columns_str = "\t".join(list(map(lambda f: f.name, df.schema.fields)))
        try:
            sample_rows = self._get_dataframe_results(df)
            # Save the sample rows in string format, tab-separated.
            sample_rows_str = "\n".join(["\t".join(row) for row in sample_rows])
        except Exception:
            sample_rows_str = ""

        return (
            f"{self._sample_rows_in_table_info} rows from {table} table:\n"
            f"{columns_str}\n"
            f"{sample_rows_str}"
        )

    def _convert_row_as_tuple(self, row: Row) -> tuple:
        return tuple(map(str, row.asDict().values()))

    def _get_dataframe_results(self, df: DataFrame) -> list:
        return list(map(self._convert_row_as_tuple, df.collect()))

    def run(self, command: str, fetch: str = "all") -> str:
        df = self._spark.sql(command)
        if fetch == "one":
            df = df.limit(1)
        return str(self._get_dataframe_results(df))

    def get_table_info_no_throw(self, table_names: Optional[List[str]] = None) -> str:
        """Get information about specified tables.

        Follows best practices as specified in: Rajkumar et al, 2022
        (https://arxiv.org/abs/2204.00498)

        If `sample_rows_in_table_info`, the specified number of sample rows will be
        appended to each table description. This can increase performance as
        demonstrated in the paper.
        """
        try:
            return self.get_table_info(table_names)
        except ValueError as e:
            return f"Error: {e}"

    def run_no_throw(self, command: str, fetch: str = "all") -> str:
        """Execute a SQL command and return a string representing the results.

        If the statement returns rows, a string of the results is returned.
        If the statement returns no rows, an empty string is returned.

        If the statement throws an error, the error message is returned.
        """
        try:
            return self.run(command, fetch)
        except Exception as e:
            return f"Error: {e}"
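

# --- Usage sketch (illustrative, not part of the library module) ---
# A minimal example of how this class might be exercised against a local
# SparkSession. The table name "titanic" and the CSV path below are
# hypothetical placeholders; substitute whatever data is actually available.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    session = SparkSession.builder.getOrCreate()
    # Hypothetical setup so the helpers have a table to describe:
    # session.read.csv("/path/to/titanic.csv", header=True).write.saveAsTable("titanic")

    spark_sql = SparkSQL(spark_session=session, sample_rows_in_table_info=3)
    # List tables visible in the current catalog/schema.
    print(spark_sql.get_usable_table_names())
    # CREATE TABLE statements plus sample rows, or an error string on failure.
    print(spark_sql.get_table_info_no_throw())
    # Arbitrary SQL; errors come back as a string rather than raising.
    print(spark_sql.run_no_throw("SELECT 1 AS one"))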