o
    "if-                     @  sJ  d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
 ddl	mZ ddl	mZ dd	l	mZ dd
l	mZ ddl	mZ ddl	mZ ddl	mZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ e
rddlmZ e Z G dd dZ!G dd dZ"d$d"d#Z#e! Z$e$j%Z%e$j&Z&dS )%zk
Managing Gateway Groups and interactions with multiple channels.

(c) 2008-2014, Holger Krekel and others
    )annotationsNpartial)Lock)TYPE_CHECKING)Any)Callable)Iterable)Iterator)Literal)Sequence)overload   )gateway_bootstrap)
gateway_io)Channel)	ExecModel)
WorkerPool)get_execmodel)trace)XSpec)Gatewayc                   @  s   e Zd ZdZdZ	d>d?ddZed@ddZed@ddZ	dAdBddZ	dCddZ
dDddZdEdd ZdFd"d#ZdGd%d&ZdAdHd)d*ZdId,d-ZdJd/d0ZdJd1d2ZdKd3d4ZdAdLd7d8ZdMd<d=ZdS )NGroupzGateway Group.popen threadxspecsIterable[XSpec | str | None]	execmodelstrreturnNonec                 C  sH   g | _ d| _t | _g | _| | |D ]}| | qt| j	 dS )z|Initialize a group and make gateways as specified.

        execmodel can be one of the supported execution models.
        r   N)
	_gateways_autoidcounterr   _autoidlock_gateways_to_joinset_execmodelmakegatewayatexitregister_cleanup_atexit)selfr   r   xspecr   r   F/var/www/html/corbot_env/lib/python3.10/site-packages/execnet/multi.py__init__+   s   
zGroup.__init__r   c                 C     | j S N)
_execmodelr+   r   r   r-   r   @      zGroup.execmodelc                 C  r/   r0   )_remote_execmodelr2   r   r   r-   remote_execmodelD   r3   zGroup.remote_execmodelNr5   
str | Nonec                 C  s2   | j rtd|du r|}t|| _t|| _dS )ae  Set the execution model for local and remote site.

        execmodel can be one of the supported execution models.
        It determines the execution model for any newly created gateway.
        If remote_execmodel is not specified it takes on the value of execmodel.

        NOTE: Execution models can only be set before any gateway is created.
        zBcan not set execution models if gateways have been created alreadyN)r"   
ValueErrorr   r1   r4   )r+   r   r5   r   r   r-   r&   H   s   
zGroup.set_execmodelc                 C  s   dd | D }d| S )Nc                 S  s   g | ]}|j qS r   )id.0gwr   r   r-   
<listcomp>]   s    z"Group.__repr__.<locals>.<listcomp>z
<Group %r>r   )r+   
idgatewaysr   r   r-   __repr__\   s   zGroup.__repr__keyint | str | Gatewayr   c                 C  sB   t |tr
| j| S | jD ]}||ks|j|kr|  S qt|r0   )
isinstanceintr"   r8   KeyError)r+   r?   r;   r   r   r-   __getitem__`   s   


zGroup.__getitem__boolc                 C  s$   z| |  W dS  t y   Y dS w )NTF)rC   r+   r?   r   r   r-   __contains__h   s   zGroup.__contains__rB   c                 C  
   t | jS r0   )lenr"   r2   r   r   r-   __len__o      
zGroup.__len__Iterator[Gateway]c                 C  s   t t| jS r0   )iterlistr"   r2   r   r   r-   __iter__r   s   zGroup.__iter__specXSpec | str | Nonec                 C  sZ  |s| j }t|tst|}| | |jdu r| jj|_|jrD|jr%J | |j }|	t
}|t| t
|| j}t||}n9|jsM|jsM|jr\t
j|| jd}t||}n!|jruddlm} |j|| | jd}t||}ntd|j||_| | |js|js|jr|	d}	|jrt|jpd}
|	|j|
|jf |	  |S )a  Create and configure a gateway to a Python interpreter.

        The ``spec`` string encodes the target gateway type
        and configuration information. The general format is::

            key1=value1//key2=value2//...

        If you leave out the ``=value`` part a True value is assumed.
        Valid types: ``popen``, ``ssh=hostname``, ``socket=host:port``.
        Valid configuration::

            id=<string>     specifies the gateway id
            python=<path>   specifies which python interpreter to execute
            execmodel=model 'thread', 'main_thread_only', 'eventlet', 'gevent' execution model
            chdir=<path>    specifies to which directory to change
            nice=<path>     specifies process priority of new process
            env:NAME=value  specifies a remote environment variable setting.

        If no spec is given, self.defaultspec is used.
        N)r   r   )gateway_socketzno gateway type found for a  
                import os
                path, nice, env = channel.receive()
                if path:
                    if not os.path.exists(path):
                        os.mkdir(path)
                    os.chdir(path)
                if nice and hasattr(os, 'nice'):
                    os.nice(nice)
                if env:
                    for name, value in env.items():
                        os.environ[name] = value
            r   )defaultspecrA   r   allocate_idr   r5   backendviasocketremote_execr   sendvarsProxyIOr   	bootstrapr   sshvagrant_ssh	create_io rR   r7   _specrP   	_registerchdirniceenvrB   	waitclose)r+   rP   masterproxy_channelproxy_io_masterr;   iorR   siochannelrd   r   r   r-   r'   u   s@   







zGroup.makegatewayr   c                 C  sr   |j du r7| j% dt| j }|  jd7  _|| v r"td|||_ W d   dS 1 s0w   Y  dS dS )z4(re-entrant) allocate id for the given xspec object.Nr;   r   zalready have gateway with id )r8   r$   r   r#   r7   )r+   rP   r8   r   r   r-   rT      s   
"zGroup.allocate_idgatewayc                 C  s<   t |drJ |jsJ |j| vsJ | j| | |_d S )N_group)hasattrr8   r"   appendrn   r+   rm   r   r   r-   rb      s
   

zGroup._registerc                 C  s   | j | | j| d S r0   )r"   remover%   rp   rq   r   r   r-   _unregister   s   zGroup._unregisterc                 C  s    t d| d | jdd d S )Nz=== atexit cleanup z ===g      ?timeout)r   	terminater2   r   r   r-   r*      s   zGroup._cleanup_atexitru   float | Nonec                   s   | rHt  }| D ]}|jjr||jj q| D ]}|j|vr"|  qddd dddt| j| fd	d
| jD  g | jdd< | sdS dS )aB  Trigger exit of member gateways and wait for termination
        of member gateways and associated subprocesses.

        After waiting timeout seconds try to to kill local sub processes of
        popen- and ssh-gateways.

        Timeout defaults to None meaning open-ended waiting and no kill
        attempts.
        r;   r   r    r!   c                 S  s   |    | j  d S r0   )join_iowaitr;   r   r   r-   	join_wait   s   z"Group.terminate.<locals>.join_waitc                 S  s   t d|   | j  d S )Nz,Gateways did not come down after timeout: %r)r   ry   killr{   r   r   r-   r}      s   zGroup.terminate.<locals>.killc                   s    g | ]}t  |t |fqS r   r   r9   r|   r}   r   r-   r<      s    z#Group.terminate.<locals>.<listcomp>N)r;   r   r    r!   )	setrP   rV   addr8   exitsafe_terminater   r%   )r+   ru   viasr;   r   r~   r-   rv      s*   



zGroup.terminatesourceCstr | types.FunctionType | Callable[..., object] | types.ModuleTypeMultiChannelc                 K  s.   g }| D ]}| |j|fi | qt|S )zlremote_exec source on all member gateways and return
        a MultiChannel connecting to all sub processes.)rp   rX   r   )r+   r   kwargschannelsr;   r   r   r-   rX      s   zGroup.remote_exec)r   r   )r   r   r   r   r    r!   )r    r   r0   )r   r   r5   r6   r    r!   )r    r   )r?   r@   r    r   )r?   r   r    rE   r    rB   )r    rL   )rP   rQ   r    r   )rP   r   r    r!   )rm   r   r    r!   r    r!   )ru   rw   r    r!   )r   r   r    r   )__name__
__module____qualname____doc__rS   r.   propertyr   r5   r&   r>   rD   rG   rJ   rO   r'   rT   rb   rs   r*   rv   rX   r   r   r   r-   r   &   s.    





D



%r   c                   @  s   e Zd Zd+ddZd,dd	Zd-ddZd.ddZd/ddZd0ddZe	d1d2ddZ
e	d3d!dZ
	"d4d5d$dZ
efd6d&d'Zd7d(d)Zd*S )8r   r   Sequence[Channel]r    r!   c                 C  s
   || _ d S r0   	_channels)r+   r   r   r   r-   r.     rK   zMultiChannel.__init__rB   c                 C  rH   r0   )rI   r   r2   r   r   r-   rJ     rK   zMultiChannel.__len__Iterator[Channel]c                 C  rH   r0   )rM   r   r2   r   r   r-   rO     rK   zMultiChannel.__iter__r?   r   c                 C  s
   | j | S r0   r   rF   r   r   r-   rD     rK   zMultiChannel.__getitem__chanrE   c                 C  s
   || j v S r0   r   )r+   r   r   r   r-   rG     rK   zMultiChannel.__contains__itemobjectc                 C  s   | j D ]}|| qd S r0   )r   rY   )r+   r   chr   r   r-   	send_each  s   
zMultiChannel.send_each.withchannelLiteral[False]	list[Any]c                 C     d S r0   r   r+   r   r   r   r-   receive_each     zMultiChannel.receive_eachLiteral[True]list[tuple[Channel, Any]]c                 C  r   r0   r   r   r   r   r-   r     r   F%list[tuple[Channel, Any]] | list[Any]c                 C  sH   t | drJ g }| jD ]}| }|r|||f q|| q|S )N_queue)ro   r   receiverp   )r+   r   lr   objr   r   r-   r      s   
	endmarkerc              	     s   z j W S  ty@   d  _  jD ])} j d u r |jjj  _ |fd fdd}|tu r3|| q|j||d q j  Y S w )	Nrl   r   r    r!   c                   s    j || f d S r0   )r   put)r   rl   r2   r   r-   putreceived6  s   z4MultiChannel.make_receive_queue.<locals>.putreceived)r   )rl   r   r    r!   )	r   AttributeErrorr   rm   r   queueQueueNO_ENDMARKER_WANTEDsetcallback)r+   r   r   r   r   r2   r-   make_receive_queue-  s   


zMultiChannel.make_receive_queuec                 C  sZ   d }| j D ]!}z|  W q |jy& } z|d u r|}W Y d }~qd }~ww |r+|d S r0   )r   rf   RemoteError)r+   firstr   excr   r   r-   rf   ?  s   
zMultiChannel.waitcloseN)r   r   r    r!   r   )r    r   )r?   rB   r    r   )r   r   r    rE   )r   r   r    r!   ).)r   r   r    r   )r   r   r    r   )F)r   rE   r    r   )r   r   r   )r   r   r   r.   rJ   rO   rD   rG   r   r   r   r   r   rf   r   r   r   r-   r     s    





r   r   r   ru   rw   r    r!   c                   sd   t | d fdd}g }|D ]\}}|||}|| q|D ]}|  q#j d d S )Nr    r!   c                   s8    | }z	|j d W d S  ty   |  Y d S w )Nrt   )spawngetOSError)termfunckillfunc	termreplyru   
workerpoolr   r-   termkillP  s   
z safe_terminate.<locals>.termkillrt   r   )r   r   rp   r   waitall)r   ru   list_of_paired_functionsr   	replylistr   r   replyr   r   r-   r   K  s   
r   )r   r   ru   rw   r    r!   )'r   
__future__r   r(   types	functoolsr   	threadingr   typingr   r   r   r	   r
   r   r   r   r`   r   r   gateway_baser   r   r   r   r   r,   r   rm   r   r   r   r   r   r   default_groupr'   r&   r   r   r   r-   <module>   s@     _
G
