o
    ZhT                  	   @   sH  d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddl
mZmZmZmZmZmZmZmZmZmZmZmZ ddlmZmZ ddlmZ ddlmZmZ ddlm Z m!Z!m"Z"m#Z#m$Z$m%Z% erxddl&Z'ddl(m)  m*  m+Z, e-e.Z/d	Z0d
Z1dZ2d+ddZ3G dd dZ4e4 Z5e4 Z6G dd de7e	Z8G dd de!Z9G dd de!Z:G dd dZ;dee7 de<deee7 eej= f fddZ>ddde7fd d!Z?d"ed dee7ddf fd#d$Z@eeAe4f ZBe7ZCG d%d& d&ZDeDZEe7ZFG d'd( d(e9e:eeEeFf ZGee7eef ZHeAZIG d)d* d*e9e:eeHeIf ZJeGZKeJZLeDZMdS ),z*A common module for NVIDIA Riva Runnables.    N)Enum)TYPE_CHECKINGAnyAsyncGeneratorAsyncIteratorDict	GeneratorIteratorListOptionalTupleUnioncast)
AnyMessageBaseMessage)PromptValue)RunnableConfigRunnableSerializable)
AnyHttpUrl	BaseModelFieldparse_obj_asroot_validator	validator      ?i  )
.!?   ¡   ¿returnriva.clientc               
   C   s4   zddl } W | jS  ty } ztd|d}~ww )z5Import the riva client and raise an error on failure.r   NziCould not import the NVIDIA Riva client library. Please install it with `pip install nvidia-riva-client`.)riva.clientImportErrorclient)rivaerr r(   `/var/www/html/lang_env/lib/python3.10/site-packages/langchain_community/utilities/nvidia_riva.py_import_riva_client1   s   
r*   c                   @   s   e Zd ZdZdS )	SentinelTzAn empty Sentinel type.N)__name__
__module____qualname____doc__r(   r(   r(   r)   r+   >   s    r+   c                   @   sL   e Zd ZdZdZdZdZdZdZdZ	e
ded	d fd
dZedddZdS )RivaAudioEncodinga  An enum of the possible choices for Riva audio encoding.

    The list of types exposed by the Riva GRPC Protobuf files can be found
    with the following commands:
    ```python
    import riva.client
    print(riva.client.AudioEncoding.keys())  # noqa: T201
    ```
    ALAWENCODING_UNSPECIFIEDFLAC
LINEAR_PCMMULAWOGGOPUSformat_coder!   c              
   C   sB   z| j | j| jd| W S  ty  } ztd| |d}~ww )zReturn the audio encoding specified by the format code in the wave file.

        ref: https://mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html
        )         z>The following wave file format code is not supported by Riva: N)r4   r1   r5   KeyErrorNotImplementedError)clsr7   r'   r(   r(   r)   from_wave_format_codeX   s   z'RivaAudioEncoding.from_wave_format_coderiva.client.AudioEncodingc                 C   s   t  }t|j| S )z-Returns the Riva API object for the encoding.)r*   getattrZAudioEncodingselfriva_clientr(   r(   r)   riva_pb2f   s   zRivaAudioEncoding.riva_pb2N)r!   r?   )r,   r-   r.   r/   r1   r2   r3   r4   r5   r6   classmethodintr>   propertyrD   r(   r(   r(   r)   r0   F   s    
r0   c                   @   s   e Zd ZU dZeeddddgdZeeef e	d< eddd	Z
ee e	d
< edddZeddddededefddZdS )RivaAuthMixinzBConfiguration for the authentication to a Riva service connection.zhttp://localhost:50051z1The full URL where the Riva service can be found.z"https://user@pass:riva.example.com)descriptionZexamplesurlNz@A full path to the file where Riva's public ssl key can be read.rI   ssl_certr!   riva.client.Authc                 C   sB   t  }tt| j}|jdk}t| jdd }|j| j||dS )z!Return a riva client auth object.https/   )rL   use_ssluri)	r*   r   r   rJ   schemestrsplitZAuthrL   )rB   rC   rJ   rQ   Zurl_no_schemer(   r(   r)   authz   s   
zRivaAuthMixin.authT)preZallow_reusevalc                 C   s$   t |trtttt|S tt|S )z:Do some initial conversations for the URL before checking.)
isinstancerT   r   r   r   )r=   rX   r(   r(   r)   _validate_url   s   

zRivaAuthMixin._validate_url)r!   rM   )r,   r-   r.   r/   r   r   rJ   r   rT   __annotations__rL   r   rG   rV   r   rE   r   rZ   r(   r(   r(   r)   rH   m   s    
 
rH   c                   @   sP   e Zd ZU dZeejddZeed< edddZ	e
ed< edd	dZeed
< dS )RivaCommonConfigMixinz%A collection of common Riva settings.z!The encoding on the audio stream.)defaultrI   encodingi@  z*The sample rate frequency of audio stream.sample_rate_hertzzen-USzaThe [BCP-47 language code](https://www.rfc-editor.org/rfc/bcp/bcp47.txt) for the target language.language_codeN)r,   r-   r.   r/   r   r0   r4   r^   r[   r_   rF   r`   rT   r(   r(   r(   r)   r\      s   
 r\   c                   @   sf   e Zd ZU dZejed< ejed< dddZddd	Z	dd
dZ
defddZdddZdddZdS )_Eventz3A combined event that is threadsafe and async safe._event_aeventr!   Nc                 C   s   t  | _t | _dS )zInitialize the event.N)	threadingEventrb   asynciorc   rB   r(   r(   r)   __init__      
z_Event.__init__c                 C      | j   | j  dS zSet the event.N)rb   setrc   rg   r(   r(   r)   rl      ri   z
_Event.setc                 C   rj   rk   )rb   clearrc   rg   r(   r(   r)   rm      ri   z_Event.clearc                 C   
   | j  S )zIndicate if the event is set.)rb   is_setrg   r(   r(   r)   ro      s   
z_Event.is_setc                 C   s   | j   dS )zWait for the event to be set.N)rb   waitrg   r(   r(   r)   rp      s   z_Event.waitc                    s   | j  I dH  dS )z#Async wait for the event to be set.N)rc   rp   rg   r(   r(   r)   
async_wait   s   z_Event.async_waitr!   N)r,   r-   r.   r/   rd   re   r[   rf   rh   rl   rm   boolro   rp   rq   r(   r(   r(   r)   ra      s   
 





ra   output_directorysample_ratec                 C   sr   | r7t jddd| d}|j}W d   n1 sw   Y  t|d}|d |d || ||fS d	S )
zECreate a new wave file and return the wave write object and filename.bxz.wavF)modesuffixdeletedirNwbr8   rP   )NN)tempfileNamedTemporaryFilenamewaveopenZsetnchannelsZsetsampwidthZsetframerate)rt   ru   fwav_file_namewav_filer(   r(   r)   _mk_wave_file   s   


r   rX   TTSInputTypec                 C   s.   t | tr	|  S t | trt| jS t| S )zAttempt to coerce the input value to a string.

    This is particularly useful for converting LangChain message to strings.
    )rY   r   Z	to_stringr   rT   contentrX   r(   r(   r)   _coerce_string   s
   


r   inputsc                 c   s    d}| D ]D}t |}tD ]}||v r(||d\}}|| | V  d}||v sq||7 }t|tkrItdt|tD ]}|||d  V  q;d}q|rQ|V  dS dS )z9Filter the input chunks are return strings ready for TTS. r8   r      N)r   _SENTENCE_TERMINATORSrU   len_MAX_TEXT_LENGTHrange)r   bufferchunk
terminatorZlast_sentenceidxr(   r(   r)   _process_chunks   s(   
r   c                   @   sZ  e Zd ZU dZejed< ejed< ejed< e	ed< e	ed< e	ed< e
ej ed< d)d
eddfddZdeeddf fddZdee fddZedefddZedefddZedefddZedefddZd*dede
e ddfddZd*dede
e ddfdd Zd*de
e ddfd!d"Zd*de
e ddfd#d$Zd%ed& ddfd'd(ZdS )+AudioStreamz%A message containing streaming audio.	_put_lock_queueoutputhangupuser_talking
user_quiet_workerr   maxsizer!   Nc                 C   sD   t  | _tj|d| _t | _t | _t | _	t | _
d| _dS )zInitialize the queue.)r   N)rd   Lockr   queueQueuer   r   ra   r   r   r   r   )rB   r   r(   r(   r)   rh     s   


zAudioStream.__init__c                 c   sJ    	 z	| j dt}W n
 tjy   Y qw |tkrdS |V  | j   q)zReturn an error.TN)r   get_QUEUE_GET_TIMEOUTr   EmptyHANGUP	task_donerB   Znext_valr(   r(   r)   __iter__  s   
zAudioStream.__iter__c                 C  sZ   	 zt  d| jjdtI dH }W n
 tjy   Y qw |tkr$dS |V  | j	  q)z4Iterate through all items in the queue until HANGUP.TN)
rf   get_event_looprun_in_executorr   r   r   r   r   r   r   r   r(   r(   r)   	__aiter__&  s   
zAudioStream.__aiter__c                 C   rn   )z(Indicate if the audio stream has hungup.)r   ro   rg   r(   r(   r)   hungup9     
zAudioStream.hungupc                 C   rn   )z-Indicate in the input stream buffer is empty.)r   emptyrg   r(   r(   r)   r   >  r   zAudioStream.emptyc                 C   s4   | j o| j}| jduo| j  o| j }|o|S )z;Indicate if the audio stream has hungup and been processed.N)r   r   r   is_aliver   )rB   Z
input_doneZoutput_doner(   r(   r)   completeC  s   

zAudioStream.completec                 C   s   | j r| j  S dS )z&Indicate if the ASR stream is running.F)r   r   rg   r(   r(   r)   runningN  s   
zAudioStream.runningitemtimeoutc                 C   s\   | j ! | jrtd|tu r| j  | jj||d W d   dS 1 s'w   Y  dS )zPut a new item into the queue.z?The audio stream has already been hungup. Cannot put more data.r   N)r   r   RuntimeErrorr   r   rl   r   put)rB   r   r   r(   r(   r)   r   U  s   
"zAudioStream.putc                    s,   t  }t |d| j||I dH  dS )z$Async put a new item into the queue.N)rf   r   wait_forr   r   )rB   r   r   loopr(   r(   r)   aput`  s   "zAudioStream.aputc                 C   s   |  t| dS )zSend the hangup signal.N)r   r   rB   r   r(   r(   r)   closee  s   zAudioStream.closec                    s   |  t|I dH  dS )zAsync send the hangup signal.N)r   r   r   r(   r(   r)   aclosei     zAudioStream.aclose	responseszrasr.StreamingRecognizeResponsec                    sZ   j rtdtjddd d fdd}tj|d	_d
j_j     dS )zIDrain the responses from the provided iterator and put them into a queue.z,An ASR instance has already been registered.rP   r   r   r!   Nc                     s       D ]<} | jsq| jD ]2}|jsq|jr2j  j  tt	|jd j
}j| qj sAj  j  qqdS )zConsume the ASR Generator.r   N)rp   resultsZalternativesis_finalr   rm   r   rl   r   rT   
transcriptr   r   ro   )responseresultr   Zhas_startedr   rB   r(   r)   workert  s$   





z$AudioStream.register.<locals>.worker)targetTrr   )	r   r   rd   BarrierThreadr   daemonstartrp   )rB   r   r   r(   r   r)   registerm  s   
zAudioStream.register)r   N) r,   r-   r.   r/   rd   r   r[   r   r   ra   r   r   rF   rh   r   bytesr   r   StreamInputTyper   rG   rs   r   r   r   r   r   r   r   r   r	   r   r(   r(   r(   r)   r      s2   
 




r   c                	   @   s   e Zd ZU dZdZeed< dZeed< edddZ	e
ed	< ed
ddZeed< ed
ddZeed< ed
dedeeef deeef fddZed ddZd!ddZ	d"dedee dedefddZdS )#RivaASRzNA runnable that performs Automatic Speech Recognition (ASR) using NVIDIA Riva.Znvidia_riva_asrr~   zA Runnable for converting audio bytes to a string.This is useful for feeding an audio stream into a chain andpreprocessing that audio to create an LLM prompt.rI   r8   z7The number of audio channels in the input audio stream.rK   audio_channel_countTz\Controls whether or not Riva should attempt to filter profanity out of the transcribed text.profanity_filterz]Controls whether Riva should attempt to correct senetence puncuation in the transcribed text.enable_automatic_punctuationrW   valuesr!   c                 C   
   t  }|S z4Validate the Python environment and input arguments.r*   r=   r   _r(   r(   r)   _validate_environment     zRivaASR._validate_environment&riva.client.StreamingRecognitionConfigc                 C   s4   t  }|jd|j| j| j| jd| j| j| jddS )z)Create and return the riva config object.Tr8   )r^   r_   r   Zmax_alternativesr   r   r`   )Zinterim_resultsconfig)	r*   ZStreamingRecognitionConfigZRecognitionConfigr^   r_   r   r   r   r`   rA   r(   r(   r)   r     s   zRivaASR.configriva.client.ASRServicec              
   C   8   t  }z|| jW S  ty } ztd|d}~ww );Connect to the riva service and return the a client object.z5Error raised while connecting to the Riva ASR server.N)r*   Z
ASRServicerV   	Exception
ValueErrorrB   rC   r'   r(   r(   r)   _get_service     zRivaASR._get_serviceNinputr   kwargsc                 K   s   |j s|  }|j|| jd}|| g }|jsl|jj |jjd}W d   n1 s/w   Y  |ri|j	 sZz
||j
 g7 }W n
 tjyO   Y q6w |j  |j	 r;tdt| d| S |jrdS )z3Transcribe the audio bytes into a string with Riva.)Zaudio_chunksZstreaming_configg?NzRiva ASR returning: %s r   )r   r   Zstreaming_response_generatorr   r   r   r   	not_emptyrp   r   
get_nowaitr   r   r   _LOGGERdebugreprjoinstrip)rB   r   r   r   servicer   Zfull_responsereadyr(   r(   r)   invoke  s2   




zRivaASR.invoke)r!   r   )r!   r   r   )r,   r-   r.   r/   r~   rT   r[   rI   r   r   rF   r   rs   r   r   rE   r   r   r   rG   r   r   ASRInputTyper   r   ASROutputTyper   r(   r(   r(   r)   r     s@   
 
&
r   c                   @   s0  e Zd ZU dZdZeed< dZeed< edddZ	eed	< ed
ddZ
ee ed< eddedeeef deeef fddZedededefddZd"ddZ	
d#dedee dedefddZ	
d#dee dee dee dee fddZ	
d#dee dee dee deed
f fd d!Zd
S )$RivaTTSz?A runnable that performs Text-to-Speech (TTS) with NVIDIA Riva.Znvidia_riva_ttsr~   z_A tool for converting text to speech.This is useful for converting LLM output into audio bytes.rI   zEnglish-US.Female-1zThe voice model in Riva to use for speech. Pre-trained models are documented in [the Riva documentation](https://docs.nvidia.com/deeplearning/riva/user-guide/docs/tts/tts-overview.html).rK   
voice_nameNzThe directory where all audio files should be saved. A null value indicates that wave files should not be saved. This is useful for debugging purposes.rt   Tr   r   r!   c                 C   r   r   r   r   r(   r(   r)   r      r   zRivaTTS._validate_environmentvc                 C   s,   |rt |}|jddd t| S |S )NT)parentsexist_ok)pathlibPathmkdirrT   absolute)r=   r   dirpathr(   r(   r)   _output_directory_validator'  s
   
z#RivaTTS._output_directory_validator"riva.client.SpeechSynthesisServicec              
   C   r   )r   z5Error raised while connecting to the Riva TTS server.N)r*   ZSpeechSynthesisServicerV   r   r   r   r(   r(   r)   r   0  r   zRivaTTS._get_servicer   r   r   c                 K   s   d | t|gS )zDPerform TTS by taking a string and outputting the entire audio file.    )r   	transformiter)rB   r   r   r   r(   r(   r)   r   :  s   zRivaTTS.invokec                 k   s    |   }t| j| j\}}t|D ],}td| |j|| j| j	| j
j| jd}|D ]}	tt|	j}
|r:||
 |
V  q+q|rM|  td| dS dS )zHPerform TTS by taking a stream of characters and streaming output bytes.zRiva TTS chunk: %s)textr   r`   r^   Zsample_rate_hzzRiva TTS wrote file: %sN)r   r   rt   r_   r   r   r   Zsynthesize_onliner   r`   r^   rD   r   r   audioZwriteframesrawr   )rB   r   r   r   r   r   r   r   r   respr  r(   r(   r)   r  C  s0   	
zRivaTTS.transformc           	        s   t  t t  dfdd}dtt ffdddfdd d fd	d
}| }| }	 zt  dI dH }W n t j	j
yV   Y q=w   |tu r`n|V  q>|I dH  |I dH  dS )zGIntercept async transforms and route them to the synchronous transform.r!   Nc                     s.    2 z3 dH W }  |  q6  t dS )z#Produce input into the input queue.N)
put_nowait_TRANSFORM_ENDr   )r   input_queuer(   r)   	_produceru  s
   z%RivaTTS.atransform.<locals>._producerc                  3   s>    	 z j dd} W n
 tjy   Y qw | tkrdS | V  q)zIterate over the input_queue.Tr   r   N)r   r   r   r  r   )r  r(   r)   _input_iterator{  s   z+RivaTTS.atransform.<locals>._input_iteratorc                     s*      D ]} |  qt dS )z!Consume the input with transform.N)r  r  r  r   )r
  	out_queuerB   r(   r)   	_consumer  s   z%RivaTTS.atransform.<locals>._consumerc                      s    d I dH  dS )z"Coroutine that wraps the consumer.N)r   r(   )r  r   r(   r)   _consumer_coro  r   z*RivaTTS.atransform.<locals>._consumer_coroTr   rr   )rf   get_running_loopr   r   r	   r   create_taskr   r   
exceptionsTimeoutErrorr   r  )	rB   r   r   r   r	  r  ZproducerZconsumerrX   r(   )r  r
  r   r  r   r  rB   r)   
atransformj  s.   
zRivaTTS.atransform)r!   r   r   )r,   r-   r.   r/   r~   rT   r[   rI   r   r   rt   r   r   rE   r   r   r   r   r   r   r   r   TTSOutputTyper   r	   r  r   r   r  r(   r(   r(   r)   r      sd   
 
		&


*
r   )r!   r"   )Nr/   rf   loggingr   r   r|   rd   r   enumr   typingr   r   r   r   r   r   r	   r
   r   r   r   r   Zlangchain_core.messagesr   r   Zlangchain_core.prompt_valuesr   Zlangchain_core.runnablesr   r   Zpydanticr   r   r   r   r   r   r#   r&   Zriva.client.proto.riva_asr_pb2r%   protoZriva_asr_pb2Zrasr	getLoggerr,   r   r   r   r   r*   r+   r   r  rT   r0   rH   r\   ra   floatZ
Wave_writer   r   r   r   r   ZStreamOutputTyper   r   r   r   r   r  r   ZNVIDIARivaASRZNVIDIARivaTTSZNVIDIARivaStreamr(   r(   r(   r)   <module>   sx    8 	

'!"
  

h

 $