o
    )ZhC                     @   s   d dl Z d dlZd dlZd dlmZmZ d dlmZmZ d dl	m
Z
mZmZmZmZ d dl	Z	e	jr9d dl	mZmZ g dZG dd deZG d	d
 d
eZG dd deZG dd deZG dd deZG dd deZG dd deZdS )    N)genioloop)Future"future_set_result_unless_cancelled)UnionOptionalTypeAny	Awaitable)DequeSet)	ConditionEvent	SemaphoreBoundedSemaphoreLockc                   @   s$   e Zd ZdZdddZdddZdS )	_TimeoutGarbageCollectorzBase class for objects that periodically clean up timed-out waiters.

    Avoids memory leak in a common pattern like:

        while True:
            yield condition.wait(short_timeout)
            print('looping....')
    returnNc                 C   s   t  | _d| _d S )Nr   )collectionsdeque_waiters	_timeoutsself r   D/var/www/html/lang_env/lib/python3.10/site-packages/tornado/locks.py__init__)   s   

z!_TimeoutGarbageCollector.__init__c                 C   s>   |  j d7  _ | j dkrd| _ tdd | jD | _d S d S )N   d   r   c                 s   s    | ]	}|  s|V  qd S N)done).0wr   r   r   	<genexpr>2   s    z<_TimeoutGarbageCollector._garbage_collect.<locals>.<genexpr>)r   r   r   r   r   r   r   r   _garbage_collect-   s
   
z)_TimeoutGarbageCollector._garbage_collectr   N)__name__
__module____qualname____doc__r   r$   r   r   r   r   r      s    
	r   c                   @   sd   e Zd ZdZdefddZ	ddeeee	j
f  dee fddZdd
eddfddZdddZdS )r   a  A condition allows one or more coroutines to wait until notified.

    Like a standard `threading.Condition`, but does not need an underlying lock
    that is acquired and released.

    With a `Condition`, coroutines can wait to be notified by other coroutines:

    .. testcode::

        import asyncio
        from tornado import gen
        from tornado.locks import Condition

        condition = Condition()

        async def waiter():
            print("I'll wait right here")
            await condition.wait()
            print("I'm done waiting")

        async def notifier():
            print("About to notify")
            condition.notify()
            print("Done notifying")

        async def runner():
            # Wait for waiter() and notifier() in parallel
            await gen.multi([waiter(), notifier()])

        asyncio.run(runner())

    .. testoutput::

        I'll wait right here
        About to notify
        Done notifying
        I'm done waiting

    `wait` takes an optional ``timeout`` argument, which is either an absolute
    timestamp::

        io_loop = IOLoop.current()

        # Wait up to 1 second for a notification.
        await condition.wait(timeout=io_loop.time() + 1)

    ...or a `datetime.timedelta` for a timeout relative to the current time::

        # Wait up to 1 second.
        await condition.wait(timeout=datetime.timedelta(seconds=1))

    The method returns False if there's no notification before the deadline.

    .. versionchanged:: 5.0
       Previously, waiters could be notified synchronously from within
       `notify`. Now, the notification will always be received on the
       next iteration of the `.IOLoop`.
    r   c                 C   s.   d| j jf }| jr|dt| j 7 }|d S )Nz<%sz waiters[%s]>)	__class__r&   r   len)r   resultr   r   r   __repr__q   s   zCondition.__repr__Ntimeoutc                    sT   t  j |r(dfdd}tj   || fdd S )zWait for `.notify`.

        Returns a `.Future` that resolves ``True`` if the condition is notified,
        or ``False`` after a timeout.
        r   Nc                      s     s	td    d S NF)r    r   r$   r   r   waiterr   r   
on_timeout   s   
z"Condition.wait.<locals>.on_timeoutc                    
     S r   Zremove_timeout_io_looptimeout_handler   r   <lambda>      
 z Condition.wait.<locals>.<lambda>r%   )r   r   appendr   IOLoopcurrentadd_timeoutadd_done_callbackr   r/   r3   r   r9   r   r:   r2   r   waitw   s   
zCondition.waitr   nc                 C   sT   g }|r| j r| j  }| s|d8 }|| |r| j s|D ]}t|d q dS )zWake ``n`` waiters.r   TN)r   popleftr    r=   r   )r   rE   waitersr2   r   r   r   notify   s   



zCondition.notifyc                 C   s   |  t| j dS )zWake all waiters.N)rH   r,   r   r   r   r   r   
notify_all   s   zCondition.notify_allr   r   r%   )r&   r'   r(   r)   strr.   r   r   floatdatetime	timedeltar
   boolrD   intrH   rI   r   r   r   r   r   5   s    ;
r   c                   @   sr   e Zd ZdZdddZdefddZdefdd	Zdd
dZ	dddZ
	ddeeeejf  ded fddZdS )r   a  An event blocks coroutines until its internal flag is set to True.

    Similar to `threading.Event`.

    A coroutine can wait for an event to be set. Once it is set, calls to
    ``yield event.wait()`` will not block unless the event has been cleared:

    .. testcode::

        import asyncio
        from tornado import gen
        from tornado.locks import Event

        event = Event()

        async def waiter():
            print("Waiting for event")
            await event.wait()
            print("Not waiting this time")
            await event.wait()
            print("Done")

        async def setter():
            print("About to set the event")
            event.set()

        async def runner():
            await gen.multi([waiter(), setter()])

        asyncio.run(runner())

    .. testoutput::

        Waiting for event
        About to set the event
        Not waiting this time
        Done
    r   Nc                 C   s   d| _ t | _d S r0   )_valuesetr   r   r   r   r   r      s   zEvent.__init__c                 C   s    d| j j|  rdf S df S )Nz<%s %s>rR   clear)r+   r&   is_setr   r   r   r   r.      s   
zEvent.__repr__c                 C   s   | j S )z-Return ``True`` if the internal flag is true.rQ   r   r   r   r   rT      s   zEvent.is_setc                 C   s2   | j sd| _ | jD ]}| s|d q	dS dS )zSet the internal flag to ``True``. All waiters are awakened.

        Calling `.wait` once the flag is set will not block.
        TN)rQ   r   r    
set_result)r   futr   r   r   rR      s   

z	Event.setc                 C   s
   d| _ dS )zkReset the internal flag to ``False``.

        Calls to `.wait` will block until `.set` is called.
        FNrU   r   r   r   r   rS      s   
zEvent.clearr/   c                    sf   t   jr d  S j   fdd |du r" S t| }| fdd |S )zBlock until the internal flag is true.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        Nc                    s    j | S r   )r   removerW   r   r   r   r;      s    zEvent.wait.<locals>.<lambda>c                    s      s  S d S r   )r    cancel)tfrY   r   r   r;      s    )r   rQ   rV   r   addrA   r   Zwith_timeout)r   r/   Ztimeout_futr   )rW   r   r   rD      s   

z
Event.waitr%   r   )r&   r'   r(   r)   r   rK   r.   rO   rT   rR   rS   r   r   rL   rM   rN   r
   rD   r   r   r   r   r      s    
'

r   c                   @   sP   e Zd ZdZdeddfddZdddZd	d
dee dee	j
 ddfddZdS )_ReleasingContextManagerzReleases a Lock or Semaphore at the end of a "with" statement.

    with (yield semaphore.acquire()):
        pass

    # Now semaphore.release() has been called.
    objr   Nc                 C   s
   || _ d S r   )_obj)r   r^   r   r   r   r     s   
z!_ReleasingContextManager.__init__c                 C   s   d S r   r   r   r   r   r   	__enter__  s   z"_ReleasingContextManager.__enter__exc_typeOptional[Type[BaseException]]exc_valexc_tbc                 C   s   | j   d S r   )r_   release)r   ra   rc   rd   r   r   r   __exit__  s   z!_ReleasingContextManager.__exit__r%   )r&   r'   r(   r)   r	   r   r`   r   BaseExceptiontypesTracebackTyperf   r   r   r   r   r]     s    
r]   c                       s   e Zd ZdZddeddf fddZdef fdd	Zdd
dZ	dde	e
eejf  dee fddZdddZddde	e de	ej ddfddZdddZddde	e de	ej ddfddZ  ZS )r   a  A lock that can be acquired a fixed number of times before blocking.

    A Semaphore manages a counter representing the number of `.release` calls
    minus the number of `.acquire` calls, plus an initial value. The `.acquire`
    method blocks if necessary until it can return without making the counter
    negative.

    Semaphores limit access to a shared resource. To allow access for two
    workers at a time:

    .. testsetup:: semaphore

       from collections import deque

       from tornado import gen
       from tornado.ioloop import IOLoop
       from tornado.concurrent import Future

       inited = False

       async def simulator(futures):
           for f in futures:
               # simulate the asynchronous passage of time
               await gen.sleep(0)
               await gen.sleep(0)
               f.set_result(None)

       def use_some_resource():
           global inited
           global futures_q
           if not inited:
               inited = True
               # Ensure reliable doctest output: resolve Futures one at a time.
               futures_q = deque([Future() for _ in range(3)])
               IOLoop.current().add_callback(simulator, list(futures_q))

           return futures_q.popleft()

    .. testcode:: semaphore

        import asyncio
        from tornado import gen
        from tornado.locks import Semaphore

        sem = Semaphore(2)

        async def worker(worker_id):
            await sem.acquire()
            try:
                print("Worker %d is working" % worker_id)
                await use_some_resource()
            finally:
                print("Worker %d is done" % worker_id)
                sem.release()

        async def runner():
            # Join all workers.
            await gen.multi([worker(i) for i in range(3)])

        asyncio.run(runner())

    .. testoutput:: semaphore

        Worker 0 is working
        Worker 1 is working
        Worker 0 is done
        Worker 2 is working
        Worker 1 is done
        Worker 2 is done

    Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
    the semaphore has been released once, by worker 0.

    The semaphore can be used as an async context manager::

        async def worker(worker_id):
            async with sem:
                print("Worker %d is working" % worker_id)
                await use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    For compatibility with older versions of Python, `.acquire` is a
    context manager, so ``worker`` could also be written as::

        @gen.coroutine
        def worker(worker_id):
            with (yield sem.acquire()):
                print("Worker %d is working" % worker_id)
                yield use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    r   valuer   Nc                    s$   t    |dk rtd|| _d S )Nr   z$semaphore initial value must be >= 0)superr   
ValueErrorrQ   r   rj   r+   r   r   r     s   

zSemaphore.__init__c                    sP   t   }| jdkrdnd| j}| jrd|t| j}d|dd |S )Nr   lockedzunlocked,value:{0}z{0},waiters:{1}z<{0} [{1}]>r   )rk   r.   rQ   formatr   r,   )r   resextrarn   r   r   r.     s   
zSemaphore.__repr__c                 C   sT   |  j d7  _ | jr(| j }| s#|  j d8  _ |t|  dS | js
dS dS )*Increment the counter and wake one waiter.r   N)rQ   r   rF   r    rV   r]   r1   r   r   r   re     s   
zSemaphore.releaser/   c                    s~   t  jdkr jd8  _t S j |r=d	fdd}tj   	||
 fdd S )
zDecrement the counter. Returns an awaitable.

        Block if the counter is zero and wait for a `.release`. The awaitable
        raises `.TimeoutError` after the deadline.
        r   r   r   Nc                      s"     st     d S r   )r    Zset_exceptionr   TimeoutErrorr$   r   r1   r   r   r3     s   z%Semaphore.acquire.<locals>.on_timeoutc                    r4   r   r5   r6   r8   r   r   r;     r<   z#Semaphore.acquire.<locals>.<lambda>r%   )r   rQ   rV   r]   r   r=   r   r>   r?   r@   rA   rB   r   rC   r   acquire  s   

zSemaphore.acquirec                 C      t d)Nz0Use 'async with' instead of 'with' for SemaphoreRuntimeErrorr   r   r   r   r`        zSemaphore.__enter__typrb   	tracebackc                 C      |    d S r   r`   )r   r{   rj   r|   r   r   r   rf        zSemaphore.__exit__c                       |   I d H  d S r   rv   r   r   r   r   
__aenter__     zSemaphore.__aenter__tbc                       |    d S r   re   r   r{   rj   r   r   r   r   	__aexit__     zSemaphore.__aexit__rJ   r%   r   )r&   r'   r(   r)   rP   r   rK   r.   re   r   r   rL   rM   rN   r
   r]   rv   r`   rg   rh   ri   rf   r   r   __classcell__r   r   rn   r   r     s>    d
	



r   c                       s:   e Zd ZdZd
deddf fddZd fdd	Z  ZS )r   a:  A semaphore that prevents release() being called too many times.

    If `.release` would increment the semaphore's value past the initial
    value, it raises `ValueError`. Semaphores are mostly used to guard
    resources with limited capacity, so a semaphore released too many times
    is a sign of a bug.
    r   rj   r   Nc                    s   t  j|d || _d S )Nrj   )rk   r   _initial_valuerm   rn   r   r   r     s   
zBoundedSemaphore.__init__c                    s"   | j | jkr
tdt   dS )rt   z!Semaphore released too many timesN)rQ   r   rl   rk   re   r   rn   r   r   re     s   zBoundedSemaphore.releaserJ   r%   )r&   r'   r(   r)   rP   r   re   r   r   r   rn   r   r     s    r   c                   @   s   e Zd ZdZdddZdefddZ	ddeee	e
jf  dee fd	d
ZdddZdddZdddee deej ddfddZdddZdddee deej ddfddZdS )r   a  A lock for coroutines.

    A Lock begins unlocked, and `acquire` locks it immediately. While it is
    locked, a coroutine that yields `acquire` waits until another coroutine
    calls `release`.

    Releasing an unlocked lock raises `RuntimeError`.

    A Lock can be used as an async context manager with the ``async
    with`` statement:

    >>> from tornado import locks
    >>> lock = locks.Lock()
    >>>
    >>> async def f():
    ...    async with lock:
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    For compatibility with older versions of Python, the `.acquire`
    method asynchronously returns a regular context manager:

    >>> async def f2():
    ...    with (yield lock.acquire()):
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    r   Nc                 C   s   t dd| _d S )Nr   r   )r   _blockr   r   r   r   r     s   zLock.__init__c                 C   s   d| j j| jf S )Nz<%s _block=%s>)r+   r&   r   r   r   r   r   r.     s   zLock.__repr__r/   c                 C   s   | j |S )zAttempt to lock. Returns an awaitable.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        )r   rv   )r   r/   r   r   r   rv     s   zLock.acquirec                 C   s(   z| j   W dS  ty   tdw )zUnlock.

        The first coroutine in line waiting for `acquire` gets the lock.

        If not locked, raise a `RuntimeError`.
        zrelease unlocked lockN)r   re   rl   ry   r   r   r   r   re     s
   zLock.releasec                 C   rw   )Nz+Use `async with` instead of `with` for Lockrx   r   r   r   r   r`   (  rz   zLock.__enter__r{   rb   rj   r   c                 C   r}   r   r~   r   r   r   r   rf   +  r   zLock.__exit__c                    r   r   r   r   r   r   r   r   3  r   zLock.__aenter__c                    r   r   r   r   r   r   r   r   6  r   zLock.__aexit__r%   r   )r&   r'   r(   r)   r   rK   r.   r   r   rL   rM   rN   r
   r]   rv   re   r`   rg   rh   ri   rf   r   r   r   r   r   r   r     s>    
$





r   )r   rM   rh   tornador   r   Ztornado.concurrentr   r   typingr   r   r   r	   r
   TYPE_CHECKINGr   r   __all__objectr   r   r   r]   r   r   r   r   r   r   r   <module>   s$   id :