o
    Zh#!                     @   s   d dl mZ d dlmZ d dlmZ d dlmZ d dlm	Z	 e	r4d dl
mZ d dl
mZ d dlmZmZ G d	d
 d
eZdd Zdd Zdd ZG dd deZG dd deZdS )    )configure_scope)Hub)Integration)capture_internal_exceptions)TYPE_CHECKING)Any)Optional)EventHintc                   @   s   e Zd ZdZedd ZdS )SparkIntegrationZsparkc                   C   s
   t   d S N)patch_spark_context_init r   r   a/var/www/html/lang_env/lib/python3.10/site-packages/sentry_sdk/integrations/spark/spark_driver.py
setup_once   s   
zSparkIntegration.setup_onceN)__name__
__module____qualname__
identifierstaticmethodr   r   r   r   r   r      s    r   c                  C   s:   ddl m}  | j}|r|d|j |d|j dS dS )z
    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
    This allows worker integration to have access to app_name and application_id.
    r   SparkContextZsentry_app_nameZsentry_application_idN)pysparkr   Z_active_spark_contextZsetLocalPropertyappNameapplicationId)r   Zspark_contextr   r   r   _set_app_properties   s   r   c                 C   s4   ddl m} | j}|| t }| j | dS )zA
    Start java gateway server to add custom `SparkListener`
    r   )ensure_callback_server_startedN)Zpyspark.java_gatewayr   Z_gatewaySentryListenerZ_jscscZaddSparkListener)r   r   ZgwZlistenerr   r   r   _start_sentry_listener(   s
   r   c                     s(   ddl m}  | j  fdd}|| _d S )Nr   r   c                    sv    g|R i |}t jtd u r|S t  t  t }|j fdd}W d    |S 1 s4w   Y  |S )Nc                    s:  t   tjtd u r| W  d    S | di d   | di d jd | d d jd | d d jd	 | d d
 jd | d d j	 | d d j
 | d d j | d d j | d d j | di d j W d    | S 1 sw   Y  | S )Nuseridtagszexecutor.idzspark.executor.idzspark-submit.deployModezspark.submit.deployModezdriver.hostzspark.driver.hostzdriver.portzspark.driver.portZspark_versionZapp_nameZapplication_idmasterZ
spark_homeextraZweb_url)r   r   currentget_integrationr   
setdefaultZ	sparkUserZ_confgetversionr   r   r#   Z	sparkHomeZuiWebUrl)eventhintselfr   r   process_eventG   s8   

z[patch_spark_context_init.<locals>._sentry_patched_spark_context_init.<locals>.process_event)r   r%   r&   r   r   r   r   Zadd_event_processor)r-   argskwargsinitscoper.   Zspark_context_initr,   r   "_sentry_patched_spark_context_init;   s   
""zDpatch_spark_context_init.<locals>._sentry_patched_spark_context_init)r   r   Z_do_init)r   r4   r   r3   r   r   5   s   
.r   c                   @   s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 ZG d1d2 d2Zd3S )4SparkListenerc                 C      d S r   r   )r-   ZapplicationEndr   r   r   onApplicationEndm      zSparkListener.onApplicationEndc                 C   r6   r   r   )r-   ZapplicationStartr   r   r   onApplicationStartq   r8   z SparkListener.onApplicationStartc                 C   r6   r   r   )r-   ZblockManagerAddedr   r   r   onBlockManagerAddedu   r8   z!SparkListener.onBlockManagerAddedc                 C   r6   r   r   )r-   ZblockManagerRemovedr   r   r   onBlockManagerRemovedy   r8   z#SparkListener.onBlockManagerRemovedc                 C   r6   r   r   )r-   ZblockUpdatedr   r   r   onBlockUpdated}   r8   zSparkListener.onBlockUpdatedc                 C   r6   r   r   )r-   ZenvironmentUpdater   r   r   onEnvironmentUpdate   r8   z!SparkListener.onEnvironmentUpdatec                 C   r6   r   r   )r-   ZexecutorAddedr   r   r   onExecutorAdded   r8   zSparkListener.onExecutorAddedc                 C   r6   r   r   )r-   ZexecutorBlacklistedr   r   r   onExecutorBlacklisted   r8   z#SparkListener.onExecutorBlacklistedc                 C   r6   r   r   )r-   ZexecutorBlacklistedForStager   r   r   onExecutorBlacklistedForStage   s   z+SparkListener.onExecutorBlacklistedForStagec                 C   r6   r   r   )r-   ZexecutorMetricsUpdater   r   r   onExecutorMetricsUpdate   r8   z%SparkListener.onExecutorMetricsUpdatec                 C   r6   r   r   )r-   ZexecutorRemovedr   r   r   onExecutorRemoved   r8   zSparkListener.onExecutorRemovedc                 C   r6   r   r   )r-   jobEndr   r   r   onJobEnd   r8   zSparkListener.onJobEndc                 C   r6   r   r   )r-   jobStartr   r   r   
onJobStart   r8   zSparkListener.onJobStartc                 C   r6   r   r   )r-   ZnodeBlacklistedr   r   r   onNodeBlacklisted   r8   zSparkListener.onNodeBlacklistedc                 C   r6   r   r   )r-   ZnodeBlacklistedForStager   r   r   onNodeBlacklistedForStage   r8   z'SparkListener.onNodeBlacklistedForStagec                 C   r6   r   r   )r-   ZnodeUnblacklistedr   r   r   onNodeUnblacklisted   r8   z!SparkListener.onNodeUnblacklistedc                 C   r6   r   r   )r-   r*   r   r   r   onOtherEvent   r8   zSparkListener.onOtherEventc                 C   r6   r   r   )r-   ZspeculativeTaskr   r   r   onSpeculativeTaskSubmitted   r8   z(SparkListener.onSpeculativeTaskSubmittedc                 C   r6   r   r   )r-   stageCompletedr   r   r   onStageCompleted   r8   zSparkListener.onStageCompletedc                 C   r6   r   r   )r-   stageSubmittedr   r   r   onStageSubmitted   r8   zSparkListener.onStageSubmittedc                 C   r6   r   r   )r-   ZtaskEndr   r   r   	onTaskEnd   r8   zSparkListener.onTaskEndc                 C   r6   r   r   )r-   ZtaskGettingResultr   r   r   onTaskGettingResult   r8   z!SparkListener.onTaskGettingResultc                 C   r6   r   r   )r-   Z	taskStartr   r   r   onTaskStart   r8   zSparkListener.onTaskStartc                 C   r6   r   r   )r-   ZunpersistRDDr   r   r   onUnpersistRDD   r8   zSparkListener.onUnpersistRDDc                   @   s   e Zd ZdgZdS )zSparkListener.Javaz1org.apache.spark.scheduler.SparkListenerInterfaceN)r   r   r   Z
implementsr   r   r   r   Java   s    
rT   N)r   r   r   r7   r9   r:   r;   r<   r=   r>   r?   r@   rA   rB   rD   rF   rG   rH   rI   rJ   rK   rM   rO   rP   rQ   rR   rS   rT   r   r   r   r   r5   l   s4    r5   c                   @   s4   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdS )r   c                 C   s   t j| _d S r   )r   r%   hubr,   r   r   r   __init__   s   zSentryListener.__init__c                 C   s(   d | }| jjd|d t  d S )NzJob {} Startedinfo)levelmessage)formatjobIdrU   add_breadcrumbr   )r-   rE   rY   r   r   r   rF      s   
zSentryListener.onJobStartc                 C   sd   d}d}d|   i}|   dkrd}d| }n	d}d| }| jj|||d d S )	N resultZJobSucceededrW   zJob {} EndedwarningzJob {} FailedrX   rY   data)Z	jobResultZtoStringrZ   r[   rU   r\   )r-   rC   rX   rY   ra   r   r   r   rD      s   zSentryListener.onJobEndc                 C   sD   |  }d| }| | d}| jjd||d t  d S )NzStage {} Submitted	attemptIdnamerW   r`   )	stageInforZ   stageIdrc   rd   rU   r\   r   )r-   rN   
stage_inforY   ra   r   r   r   rO      s
   
zSentryListener.onStageSubmittedc                 C   s   ddl m} | }d}d}| | d}z|  |d< d| }d}W n |y<   d| }d	}Y nw | j	j
|||d
 d S )Nr   )Py4JJavaErrorr]   rb   reasonzStage {} Failedr_   zStage {} CompletedrW   r`   )Zpy4j.protocolrh   re   rc   rd   ZfailureReasonr(   rZ   rf   rU   r\   )r-   rL   rh   rg   rY   rX   ra   r   r   r   rM      s   zSentryListener.onStageCompletedN)r   r   r   rV   rF   rD   rO   rM   r   r   r   r   r      s    r   N)Z
sentry_sdkr   Zsentry_sdk.hubr   Zsentry_sdk.integrationsr   Zsentry_sdk.utilsr   Zsentry_sdk._typesr   typingr   r   r	   r
   r   r   r   r   objectr5   r   r   r   r   r   <module>   s    	7g