o
    eZh:                     @   s  U d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZ d dl	m
Z
mZmZmZmZmZmZmZmZmZ ddlmZmZ ddlmZ ddlmZ e
rSd d	l	mZ e d
ZG dd dZG dd deZG dd deZ e edZ!ee"ee f e#d< G dd dZ$dS )    N)defaultdict)EmptyPriorityQueue)
TYPE_CHECKINGDictListOptionalSequenceSetTupleTypeUnionoverload   )Lockordered_dict)
NodeConfig)BaseNode)Literalzelastic_transport.node_poolc                   @   s8   e Zd ZdZdee fddZdee defddZ	d	S )
NodeSelectora  
    Simple class used to select a node from a list of currently live
    node instances. In init time it is passed a dictionary containing all
    the nodes options which it can then use during the selection
    process. When the ``select()`` method is called it is given a list of
    *currently* live nodes to choose from.

    The selector is initialized with the list of seed nodes that the
    NodePool was initialized with. This list of seed nodes can be used
    to make decisions within ``select()``

    Example of where this would be useful is a zone-aware selector that would
    only select connections from it's own zones and only fall back to other
    connections where there would be none in its zones.
    node_configsc                 C   s
   || _ dS )zA
        :arg node_configs: List of NodeConfig instances
        N)r   selfr    r   S/var/www/html/lang_env/lib/python3.10/site-packages/elastic_transport/_node_pool.py__init__@   s   
zNodeSelector.__init__nodesreturnc                 C   s   t  )zl
        Select a nodes from the given list.

        :arg nodes: list of live nodes to choose from
        )NotImplementedErrorr   r   r   r   r   selectF   s   zNodeSelector.selectN)
__name__
__module____qualname____doc__r   r   r   r	   r   r    r   r   r   r   r   /   s    r   c                   @   s&   e Zd ZdZdee defddZdS )RandomSelectorzRandomly select a noder   r   c                 C   s
   t |S N)randomchoicer   r   r   r   r    R      
zRandomSelector.selectN)r!   r"   r#   r$   r	   r   r    r   r   r   r   r%   O   s    r%   c                       s@   e Zd ZdZdee f fddZdee defddZ	  Z
S )	RoundRobinSelectorzSelect a node using round-robinr   c                    s   t  | t | _d S r&   )superr   	threadinglocal_thread_localr   	__class__r   r   r   Y   s   zRoundRobinSelector.__init__r   r   c                 C   s*   t | jddd t| | j_|| jj S )Nrrr   )getattrr.   lenr1   r   r   r   r   r    ]   s   zRoundRobinSelector.select)r!   r"   r#   r$   r   r   r   r	   r   r    __classcell__r   r   r/   r   r*   V   s    r*   )Zround_robinr'   _SELECTOR_CLASS_NAMESc                   @   sv  e Zd ZdZddedfdee dee de	de	d	e
eee f d
efddZedee fddZedefddZede	fddZede	fddZd3dedee	 ddfddZdeddfddZed4dddefd d!Zed4dd"dee fd#d!Zd5dedee fd%d!Zd&eddfd'd(Zd&eddfd)d*Zdefd+d,Zdee fd-d.Zdefd/d0Zdefd1d2ZdS )6NodePoolaP  
    Container holding the :class:`~elastic_transport.BaseNode` instances,
    managing the selection process (via a
    :class:`~elastic_transport.NodeSelector`) and dead connections.

    It's only interactions are with the :class:`~elastic_transport.Transport` class
    that drives all the actions within ``NodePool``.

    Initially nodes are stored on the class as a list and, along with the
    connection options, get passed to the ``NodeSelector`` instance for
    future reference.

    Upon each request the ``Transport`` will ask for a ``BaseNode`` via the
    ``get_node`` method. If the connection fails (it's `perform_request`
    raises a `ConnectionError`) it will be marked as dead (via `mark_dead`) and
    put on a timeout (if it fails N times in a row the timeout is exponentially
    longer - the formula is `default_timeout * 2 ** (fail_count - 1)`). When
    the timeout is over the connection will be resurrected and returned to the
    live pool. A connection that has been previously marked as dead and
    succeeds will be marked as live (its fail count will be deleted).
    g      ?g      >@Tr   
node_classdead_node_backoff_factormax_dead_node_backoffnode_selector_classrandomize_nodesc                 C   s   |st dt|}tdd |D rtdt|tr3|tvr/t d|dtt	 f t| }|r:t
| tt|| _t| jt|krNt d|| _||| _t | _|D ]
}| || j|< q\t | _t| jdk| _t| j| _t | _td	d
 | _t | _|| _|| _dS )a  
        :arg node_configs: List of initial NodeConfigs to use
        :arg node_class: Type to use when creating nodes
        :arg dead_node_backoff_factor: Number of seconds used as a factor in
            calculating the amount of "backoff" time we should give a node
            after an unsuccessful request. The formula is calculated as
            follows where N is the number of consecutive failures:
            ``min(dead_backoff_factor * (2 ** (N - 1)), max_dead_backoff)``
        :arg max_dead_node_backoff: Maximum number of seconds to wait
            when calculating the "backoff" time for a dead node.
        :arg node_selector_class: :class:`~elastic_transport.NodeSelector`
            subclass to use if more than one connection is live
        :arg randomize_nodes: shuffle the list of nodes upon instantiation
            to avoid dog-piling effect across processes
        z$Must specify at least one NodeConfigc                 s   s    | ]	}t |t V  qd S r&   )
isinstancer   ).0node_configr   r   r   	<genexpr>   s    z$NodePool.__init__.<locals>.<genexpr>z6NodePool must be passed a list of NodeConfig instanceszDUnknown option for selector_class: '%s'. Available options are: '%s'z', 'z2Cannot use duplicate NodeConfigs within a NodePoolr   c                   S      dS )Nr   r   r   r   r   r   <lambda>   s    z#NodePool.__init__.<locals>.<lambda>N)
ValueErrorlistany	TypeErrorr=   strr6   joinsortedkeysr'   shuffletupleset_seed_nodesr4   _node_class_node_selectorr   
_all_nodesr   _all_nodes_write_lock_all_nodes_len_1_alive_nodesr   _dead_nodesr   _dead_consecutive_failures_removed_nodes_dead_node_backoff_factor_max_dead_node_backoff)r   r   r8   r9   r:   r;   r<   r?   r   r   r   r      sF   



zNodePool.__init__r   c                 C      | j S r&   )rO   r   r   r   r   r8         zNodePool.node_classc                 C   rZ   r&   )rP   r[   r   r   r   node_selector   r\   zNodePool.node_selectorc                 C   rZ   r&   )rX   r[   r   r   r   r9      r\   z!NodePool.dead_node_backoff_factorc                 C   rZ   r&   )rY   r[   r   r   r   r:      r\   zNodePool.max_dead_node_backoffNnode_nowc                 C   s   |dur|nt   }z| j|j= W n	 ty   Y nw | j|j d }|| j|j< zt| jd|d   | j}W n tyD   | j}Y nw | j	
|| |f td||| dS )z
        Mark the node as dead (failed). Remove it from the live pool and put it on a timeout.

        :arg node: The failed node.
        Nr      zFNode %r has failed for %i times in a row, putting on %i second timeout)timerT   configKeyErrorrV   minrX   rY   OverflowErrorrU   put_loggerwarning)r   r^   r_   nowZconsecutive_failurestimeoutr   r   r   	mark_dead   s.   
zNodePool.mark_deadc                 C   sD   z| j |j= W n
 ty   Y dS w | j|j| td| dS )z
        Mark node as healthy after a resurrection. Resets the fail counter for the node.

        :arg node: The ``BaseNode`` instance to mark as alive.
        z8Node %r has been marked alive after a successful requestN)rV   rb   rc   rT   
setdefaultrg   rh   )r   r^   r   r   r   	mark_live   s   zNodePool.mark_live.forcezLiteral[True]c                 C      d S r&   r   r   rn   r   r   r   	resurrect     zNodePool.resurrectzLiteral[False]c                 C   ro   r&   r   rp   r   r   r   rq     rr   Fc                 C   s   d}z| j jdd\}}W n ty&   |r"tt| j  Y S d}Y nw |dur=|s=|t kr=| j 	||f d}|durN|| j
|j< td|| |S )a  
        Attempt to resurrect a node from the dead queue. It will try to
        locate one (not all) eligible (it's timeout is over) node to
        return to the live pool. Any resurrected node is also returned.

        :arg force: resurrect a node even if there is none eligible (used
            when we have no live nodes). If force is 'True'' resurrect
            always returns a node.
        g        F)blockNzResurrected node %r (force=%s))rU   getr   r'   r(   rD   rQ   valuesra   rf   rT   rb   rg   info)r   rn   Zmark_node_alive_afterr^   r   r   r   rq     s   r?   c                 C   s   z| j | W n	 ty   Y nw | j4 || jvrA| |}|| j|j< d| _d| j|j< | j	
t |f W d    d S W d    d S 1 sLw   Y  d S )NFr   )rW   removerc   rR   rQ   rO   rb   rS   rV   rU   rf   ra   )r   r?   r^   r   r   r   add9  s   

"zNodePool.addc                 C   s   || j vr| j| d S d S r&   )rN   rW   rx   )r   r?   r   r   r   rw   Q  s   
zNodePool.removec                    sf        jr j jd  S  fdd j D }|s# j ddS t|dkr/ j|S |d S )a  
        Return a node from the pool using the ``NodeSelector`` instance.

        It tries to resurrect eligible nodes, forces a resurrection when
        no nodes are available and passes the list of live nodes to
        the selector instance to choose from.
        r   c                    s   g | ]\}}| j vr|qS r   )rW   )r>   r?   r^   r[   r   r   
<listcomp>i  s
    
z NodePool.get.<locals>.<listcomp>T)rn   r   )	rq   rS   rQ   rN   rT   itemsr4   rP   r    r   r   r[   r   rt   V  s   

zNodePool.getc                 C   s   t | j S r&   )rD   rQ   ru   r[   r   r   r   allx  s   zNodePool.allc                 C   rA   )Nz
<NodePool>r   r[   r   r   r   __repr__{  rr   zNodePool.__repr__c                 C   s
   t | jS r&   )r4   rQ   r[   r   r   r   __len__~  r)   zNodePool.__len__r&   ).)F) r!   r"   r#   r$   r*   r   r   r   r   floatr   rG   r   boolr   propertyr8   r]   r9   r:   r   rk   rm   r   rq   rx   rw   rt   r{   r|   intr}   r   r   r   r   r7   h   sP    
U!"r7   )%loggingr'   r,   ra   collectionsr   queuer   r   typingr   r   r   r   r	   r
   r   r   r   r   _compatr   r   Z_modelsr   _noder   r   	getLoggerrg   r   r%   r*   r6   rG   __annotations__r7   r   r   r   r   <module>   s(   
0
 