o
    )Zh0                     @   s$  d Z ddlZddlZddlZddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZmZmZmZmZ ddlZejrCddlmZmZmZ edZg d	ZG d
d deZG dd deZdededeejf ddfddZG dd dee ZG dd dee ZG dd deZ G dd deZ!dS )a  Asynchronous queues for coroutines. These classes are very similar
to those provided in the standard library's `asyncio package
<https://docs.python.org/3/library/asyncio-queue.html>`_.

.. warning::

   Unlike the standard library's `queue` module, the classes defined here
   are *not* thread-safe. To use these queues from another thread,
   use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
   before calling any queue methods.

    N)genioloop)Future"future_set_result_unless_cancelled)Event)UnionTypeVarGeneric	AwaitableOptional)DequeTupleAny_T)QueuePriorityQueue	LifoQueue	QueueFull
QueueEmptyc                   @      e Zd ZdZdS )r   z:Raised by `.Queue.get_nowait` when the queue has no items.N__name__
__module____qualname____doc__ r   r   E/var/www/html/lang_env/lib/python3.10/site-packages/tornado/queues.pyr   /       r   c                   @   r   )r   zBRaised by `.Queue.put_nowait` when a queue is at its maximum size.Nr   r   r   r   r   r   5   r   r   futuretimeoutreturnc                    sD   |r d fdd}t j || fdd d S d S )Nr    c                      s      s t  d S d S N)doneZset_exceptionr   TimeoutErrorr   )r   r   r   
on_timeout@   s   z _set_timeout.<locals>.on_timeoutc                    s
     S r!   )Zremove_timeout)_)io_looptimeout_handler   r   <lambda>F   s   
 z_set_timeout.<locals>.<lambda>r    N)r   ZIOLoopcurrentZadd_timeoutZadd_done_callback)r   r   r$   r   )r   r&   r'   r   _set_timeout;   s   
r+   c                   @   s(   e Zd Zd	ddZdee fddZdS )
_QueueIteratorq	Queue[_T]r    Nc                 C   s
   || _ d S r!   )r-   )selfr-   r   r   r   __init__J      
z_QueueIterator.__init__c                 C   
   | j  S r!   )r-   getr/   r   r   r   	__anext__M   r1   z_QueueIterator.__anext__)r-   r.   r    N)r   r   r   r0   r
   r   r5   r   r   r   r   r,   I   s    
r,   c                   @   s  e Zd ZdZdZd1deddfddZedefdd	Zdefd
dZ	de
fddZde
fddZ	d2dedeeeejf  ddfddZdeddfddZ	d2deeeejf  dee fddZdefddZd3ddZ	d2deeeejf  ded fddZdee fdd Zd3d!d"Zdefd#d$Zdeddfd%d&Zdeddfd'd(Zd3d)d*Z de!fd+d,Z"de!fd-d.Z#de!fd/d0Z$dS )4r   a  Coordinate producer and consumer coroutines.

    If maxsize is 0 (the default) the queue size is unbounded.

    .. testcode::

        import asyncio
        from tornado.ioloop import IOLoop
        from tornado.queues import Queue

        q = Queue(maxsize=2)

        async def consumer():
            async for item in q:
                try:
                    print('Doing work on %s' % item)
                    await asyncio.sleep(0.01)
                finally:
                    q.task_done()

        async def producer():
            for item in range(5):
                await q.put(item)
                print('Put %s' % item)

        async def main():
            # Start consumer without waiting (since it never finishes).
            IOLoop.current().spawn_callback(consumer)
            await producer()     # Wait for producer to put all tasks.
            await q.join()       # Wait for consumer to finish all tasks.
            print('Done')

        asyncio.run(main())

    .. testoutput::

        Put 0
        Put 1
        Doing work on 0
        Put 2
        Doing work on 1
        Put 3
        Doing work on 2
        Put 4
        Doing work on 3
        Doing work on 4
        Done


    In versions of Python without native coroutines (before 3.5),
    ``consumer()`` could be written as::

        @gen.coroutine
        def consumer():
            while True:
                item = yield q.get()
                try:
                    print('Doing work on %s' % item)
                    yield gen.sleep(0.01)
                finally:
                    q.task_done()

    .. versionchanged:: 4.3
       Added ``async for`` support in Python 3.5.

    Nr   maxsizer    c                 C   sb   |d u rt d|dk rtd|| _|   tg | _tg | _d| _t	 | _
| j
  d S )Nzmaxsize can't be Noner   zmaxsize can't be negative)	TypeError
ValueError_maxsize_initcollectionsdeque_getters_putters_unfinished_tasksr   	_finishedset)r/   r6   r   r   r   r0      s   zQueue.__init__c                 C   s   | j S )z%Number of items allowed in the queue.)r9   r4   r   r   r   r6      s   zQueue.maxsizec                 C   s
   t | jS )zNumber of items in the queue.)len_queuer4   r   r   r   qsize   s   
zQueue.qsizec                 C   s   | j  S r!   rC   r4   r   r   r   empty      zQueue.emptyc                 C   s   | j dkrdS |  | j kS )Nr   F)r6   rD   r4   r   r   r   full   s   
z
Queue.fullitemr   zFuture[None]c                 C   sR   t  }z| | W n ty!   | j||f t|| Y |S w |d |S )a  Put an item into the queue, perhaps waiting until there is room.

        Returns a Future, which raises `tornado.util.TimeoutError` after a
        timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.
        N)r   
put_nowaitr   r>   appendr+   
set_result)r/   rI   r   r   r   r   r   put   s   
z	Queue.putc                 C   s^   |    | jr"|  sJ d| j }| | t||   dS |  r(t| | dS )z{Put an item into the queue without blocking.

        If no free slot is immediately available, raise `QueueFull`.
        z)queue non-empty, why are getters waiting?N)	_consume_expiredr=   rF   popleft_Queue__put_internalr   _getrH   r   )r/   rI   getterr   r   r   rJ      s   

zQueue.put_nowaitc                 C   sF   t  }z
||   W |S  ty"   | j| t|| Y |S w )a.  Remove and return an item from the queue.

        Returns an awaitable which resolves once an item is available, or raises
        `tornado.util.TimeoutError` after a timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.

        .. note::

           The ``timeout`` argument of this method differs from that
           of the standard library's `queue.Queue.get`. That method
           interprets numeric values as relative timeouts; this one
           interprets them as absolute deadlines and requires
           ``timedelta`` objects for relative timeouts (consistent
           with other timeouts in Tornado).

        )r   rL   
get_nowaitr   r=   rK   r+   )r/   r   r   r   r   r   r3      s   z	Queue.getc                 C   s\   |    | jr$|  sJ d| j \}}| | t|d |  S |  r,|  S t)zRemove and return an item from the queue without blocking.

        Return an item if one is immediately available, else raise
        `QueueEmpty`.
        z(queue not full, why are putters waiting?N)	rN   r>   rH   rO   rP   r   rQ   rD   r   )r/   rI   Zputterr   r   r   rS      s   

zQueue.get_nowaitc                 C   s<   | j dkr	td|  j d8  _ | j dkr| j  dS dS )a  Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each `.get` used to fetch a task, a
        subsequent call to `.task_done` tells the queue that the processing
        on the task is complete.

        If a `.join` is blocking, it resumes when all items have been
        processed; that is, when every `.put` is matched by a `.task_done`.

        Raises `ValueError` if called more times than `.put`.
        r   z!task_done() called too many times   N)r?   r8   r@   rA   r4   r   r   r   	task_done  s   

zQueue.task_donec                 C   s   | j |S )zBlock until all items in the queue are processed.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        )r@   wait)r/   r   r   r   r   join$  s   z
Queue.joinc                 C   s   t | S r!   )r,   r4   r   r   r   	__aiter__.  rG   zQueue.__aiter__c                 C   s   t  | _d S r!   )r;   r<   rC   r4   r   r   r   r:   2  s   zQueue._initc                 C   r2   r!   )rC   rO   r4   r   r   r   rQ   5  r1   z
Queue._getc                 C      | j | d S r!   rC   rK   r/   rI   r   r   r   _put8     z
Queue._putc                 C   s&   |  j d7  _ | j  | | d S )NrT   )r?   r@   clearr\   r[   r   r   r   Z__put_internal=  s   
zQueue.__put_internalc                 C   s|   | j r| j d d  r| j   | j r| j d d  s| jr8| jd  r<| j  | jr:| jd  s'd S d S d S d S )Nr   rT   )r>   r"   rO   r=   r4   r   r   r   rN   B  s   

$zQueue._consume_expiredc                 C   s    dt | jtt| |  f S )Nz<%s at %s %s>)typer   hexid_formatr4   r   r   r   __repr__J  s    zQueue.__repr__c                 C   s   dt | j|  f S )Nz<%s %s>)r_   r   rb   r4   r   r   r   __str__M  s   zQueue.__str__c                 C   sn   d| j f }t| dd r|d| j 7 }| jr|dt| j 7 }| jr+|dt| j 7 }| jr5|d| j 7 }|S )Nz
maxsize=%rrC   z	 queue=%rz getters[%s]z putters[%s]z	 tasks=%s)r6   getattrrC   r=   rB   r>   r?   )r/   resultr   r   r   rb   P  s   zQueue._format)r   r!   r)   )%r   r   r   r   rC   intr0   propertyr6   rD   boolrF   rH   r   r   r   floatdatetime	timedeltarM   rJ   r
   r3   rS   rU   rW   r,   rX   r:   rQ   r\   rP   rN   strrc   rd   rb   r   r   r   r   r   Q   sR    E






r   c                   @   :   e Zd ZdZdddZdeddfddZdefd	d
ZdS )r   a  A `.Queue` that retrieves entries in priority order, lowest first.

    Entries are typically tuples like ``(priority number, data)``.

    .. testcode::

        import asyncio
        from tornado.queues import PriorityQueue

        async def main():
            q = PriorityQueue()
            q.put((1, 'medium-priority item'))
            q.put((0, 'high-priority item'))
            q.put((10, 'low-priority item'))

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        (0, 'high-priority item')
        (1, 'medium-priority item')
        (10, 'low-priority item')
    r    Nc                 C   
   g | _ d S r!   rE   r4   r   r   r   r:   z  r1   zPriorityQueue._initrI   c                 C   s   t | j| d S r!   )heapqheappushrC   r[   r   r   r   r\   }  s   zPriorityQueue._putc                 C   s   t | jS r!   )rp   heappoprC   r4   r   r   r   rQ     s   zPriorityQueue._getr)   r   r   r   r   r:   r   r\   rQ   r   r   r   r   r   ]  s
    
r   c                   @   rn   )r   a  A `.Queue` that retrieves the most recently put items first.

    .. testcode::

        import asyncio
        from tornado.queues import LifoQueue

        async def main():
            q = LifoQueue()
            q.put(3)
            q.put(2)
            q.put(1)

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        1
        2
        3
    r    Nc                 C   ro   r!   rE   r4   r   r   r   r:     r1   zLifoQueue._initrI   c                 C   rY   r!   rZ   r[   r   r   r   r\     r]   zLifoQueue._putc                 C   r2   r!   )rC   popr4   r   r   r   rQ     r1   zLifoQueue._getr)   rs   r   r   r   r   r     s
    
r   )"r   r;   rk   rp   tornador   r   Ztornado.concurrentr   r   Ztornado.locksr   typingr   r   r	   r
   r   TYPE_CHECKINGr   r   r   r   __all__	Exceptionr   r   rj   rl   r+   r,   r   r   r   r   r   r   r   <module>   s8   
  '