o
    )Zh(                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZ d dlmZ G d	d
 d
eZeG dd dejZdS )    N)gen)IOStream)app_log)	TCPServer)skipIfNonUnix)AsyncTestCase	ExpectLogbind_unused_portgen_test)Tuplec                   @   s8   e Zd Zedd Zedd Zdd Zedd Zd	S )
TCPServerTestc                 c   s    G dd dt }d  }}zTt \}}| }|| tt }ttd |d|fV  |dV  |	 V  t
jV  W d    n1 sHw   Y  W |d urV|  |d ur`|  d S d S |d urj|  |d urs|  w w )Nc                   @   s   e Zd Zejdd ZdS )zFTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServerc                 s   s&    | tdV  |  dd  d S )N   hello   r   )
read_byteslencloseselfstreamaddress r   R/var/www/html/lang_env/lib/python3.10/site-packages/tornado/test/tcpserver_test.pyhandle_stream   s   zTTCPServerTest.test_handle_stream_coroutine_logging.<locals>.TestServer.handle_streamN__name__
__module____qualname__r   	coroutiner   r   r   r   r   
TestServer   s    r   zException in callback	localhostr   )r   r	   
add_socketr   socketr   r   connectwriteread_until_closer   Zmomentstopr   )r   r   serverclientsockportr   r   r   $test_handle_stream_coroutine_logging   s2   




z2TCPServerTest.test_handle_stream_coroutine_loggingc                 c   sr    G dd dt }t \}}| }|| tt }|d|fV  | V }| |d |  |	  d S )Nc                   @   s   e Zd Zdd ZdS )zETCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServerc                    s   | d |  d S )N   data)r#   r   r   r   r   r   r   3   s   
zSTCPServerTest.test_handle_stream_native_coroutine.<locals>.TestServer.handle_streamN)r   r   r   r   r   r   r   r   r   2   s    r   r   r+   )
r   r	   r    r   r!   r"   r$   assertEqualr%   r   )r   r   r(   r)   r&   r'   resultr   r   r   #test_handle_stream_native_coroutine.   s   


z1TCPServerTest.test_handle_stream_native_coroutinec                 C   s.   t  \}}t }|| |  |  d S N)r	   r   r    r%   )r   r(   r)   r&   r   r   r   test_stop_twiceA   s
   

zTCPServerTest.test_stop_twicec              	   #   s    G fdddt }t \}}| | d|fd}dd t|D }g tjfdd  fd	d|D V  | td
d zt|krR| d W D ]}|	  qUd S D ]}|	  q`w )Nc                       s   e Zd Zej fddZdS )z7TCPServerTest.test_stop_in_callback.<locals>.TestServerc                 3   s        | V  d S r/   )r%   r$   r   r&   r   r   r   O   s   zETCPServerTest.test_stop_in_callback.<locals>.TestServer.handle_streamNr   r   r1   r   r   r   N   s    r   r   (   c                 S   s   g | ]}t t qS r   )r   r!   ).0ir   r   r   
<listcomp>Y   s    z7TCPServerTest.test_stop_in_callback.<locals>.<listcomp>c                 3   s6    z|  V  W n
 ty   Y d S w  |  d S r/   )r"   EnvironmentErrorappend)c)connected_clientsserver_addrr   r   r"   \   s   z4TCPServerTest.test_stop_in_callback.<locals>.connectc                    s   g | ]} |qS r   r   )r3   r8   )r"   r   r   r5   e   s    r   zall clients failed connectingzHat least one client should fail connecting for the test to be meaningful)
r   r	   r    ranger   r   ZassertGreaterr   ZskipTestr   )r   r   r(   r)   NZclientsr8   r   )r"   r9   r&   r:   r   test_stop_in_callbackH   s0   



z#TCPServerTest.test_stop_in_callbackN)r   r   r   r
   r*   r.   r0   r=   r   r   r   r   r      s    

r   c                   @   sF   e Zd Zdedeeef fddZdd Zdd Zd	d
 Zdd Z	dS )TestMultiprocesscodereturnc              
   C   sh   zt jtjdgd|ddd}W n t jy- } ztd|j d|j d|j |d }~ww |j|jfS )Nz-Werror::DeprecationWarningTutf8)capture_outputinputencodingcheckzProcess returned z stdout=z stderr=)	
subprocessrunsys
executableCalledProcessErrorRuntimeError
returncodestdoutstderr)r   r?   r-   er   r   r   run_subproc~   s"   
zTestMultiprocess.run_subprocc                 C   >   t d}| |\}}| dt|d | |d d S )Na  
            import asyncio
            from tornado.tcpserver import TCPServer

            async def main():
                server = TCPServer()
                server.listen(0, address='127.0.0.1')

            asyncio.run(main())
            print('012', end='')
         012textwrapdedentrP   r,   joinsortedr   r?   outerrr   r   r   test_listen_single   s   z#TestMultiprocess.test_listen_singlec                 C   rQ   )Na  
            import warnings

            from tornado.ioloop import IOLoop
            from tornado.process import task_id
            from tornado.tcpserver import TCPServer

            warnings.simplefilter("ignore", DeprecationWarning)

            server = TCPServer()
            server.bind(0, address='127.0.0.1')
            server.start(3)
            IOLoop.current().run_sync(lambda: None)
            print(task_id(), end='')
        rR   rS   rT   rY   r   r   r   test_bind_start      z TestMultiprocess.test_bind_startc                 C   rQ   )Na  
            import asyncio
            from tornado.netutil import bind_sockets
            from tornado.process import fork_processes, task_id
            from tornado.ioloop import IOLoop
            from tornado.tcpserver import TCPServer

            sockets = bind_sockets(0, address='127.0.0.1')
            fork_processes(3)
            async def post_fork_main():
                server = TCPServer()
                server.add_sockets(sockets)
            asyncio.run(post_fork_main())
            print(task_id(), end='')
        rR   rS   rT   rY   r   r   r   test_add_sockets   r^   z!TestMultiprocess.test_add_socketsc                 C   rQ   )Na  
            import asyncio
            import socket
            from tornado.netutil import bind_sockets
            from tornado.process import task_id, fork_processes
            from tornado.tcpserver import TCPServer

            # Pick an unused port which we will be able to bind to multiple times.
            (sock,) = bind_sockets(0, address='127.0.0.1',
                family=socket.AF_INET, reuse_port=True)
            port = sock.getsockname()[1]

            fork_processes(3)

            async def main():
                server = TCPServer()
                server.listen(port, address='127.0.0.1', reuse_port=True)
            asyncio.run(main())
            print(task_id(), end='')
            rR   rS   rT   rY   r   r   r   test_listen_multi_reuse_port   s   z-TestMultiprocess.test_listen_multi_reuse_portN)
r   r   r   strr   rP   r\   r]   r_   r`   r   r   r   r   r>   w   s    r>   )r!   rF   rH   rU   Zunittesttornador   Ztornado.iostreamr   Ztornado.logr   Ztornado.tcpserverr   Ztornado.test.utilr   Ztornado.testingr   r   r	   r
   typingr   r   ZTestCaser>   r   r   r   r   <module>   s    f