o
    %if;                     @  sz   d dl mZ d dlmZ d dlmZ d dlmZ d dlZd dlm	Z	 d dl
mZ d dlmZ d d	lmZ G d
d dZdS )    )annotations)OrderedDict)NoReturn)SequenceN)Producer)report_collection_diff)parse_spec_config)WorkerControllerc                   @  s   e Zd ZdZd?d@d	d
ZedAddZedBddZedBddZedBddZ	dCddZ
dDddZdEddZ	 dFdGd%d&ZdHd*d+ZdId.d/ZdCd0d1ZdJd3d4ZdKd7d8ZdCd9d:ZdLd;d<ZdBd=d>ZdS )MLoadScopeSchedulinga  Implement load scheduling across nodes, but grouping test by scope.

    This distributes the tests collected across all nodes so each test is run
    just once.  All nodes collect and submit the list of tests and when all
    collections are received it is verified they are identical collections.
    Then the collection gets divided up in work units, grouped by test scope,
    and those work units get submitted to nodes.  Whenever a node finishes an
    item, it calls ``.mark_test_complete()`` which will trigger the scheduler
    to assign more work units if the number of pending tests for the node falls
    below a low-watermark.

    When created, ``numnodes`` defines how many nodes are expected to submit a
    collection. This is used to know when all nodes have finished collection.

    Attributes::

    :numnodes: The expected number of nodes taking part.  The actual number of
       nodes will vary during the scheduler's lifetime as nodes are added by
       the DSession as they are brought up and removed either because of a dead
       node or normal shutdown.  This number is primarily used to know when the
       initial collection is completed.

    :collection: The final list of tests collected by all nodes once it is
       validated to be identical between all the nodes.  It is initialised to
       None until ``.schedule()`` is called.

    :workqueue: Ordered dictionary that maps all available scopes with their
       associated tests (nodeid). Nodeids are in turn associated with their
       completion status. One entry of the workqueue is called a work unit.
       In turn, a collection of work unit is called a workload.

       ::

            workqueue = {
                '<full>/<path>/<to>/test_module.py': {
                    '<full>/<path>/<to>/test_module.py::test_case1': False,
                    '<full>/<path>/<to>/test_module.py::test_case2': False,
                    (...)
                },
                (...)
            }

    :assigned_work: Ordered dictionary that maps worker nodes with their
       assigned work units.

       ::

            assigned_work = {
                '<worker node A>': {
                    '<full>/<path>/<to>/test_module.py': {
                        '<full>/<path>/<to>/test_module.py::test_case1': False,
                        '<full>/<path>/<to>/test_module.py::test_case2': False,
                        (...)
                    },
                    (...)
                },
                (...)
            }

    :registered_collections: Ordered dictionary that maps worker nodes with
       their collection of tests gathered during test discovery.

       ::

            registered_collections = {
                '<worker node A>': [
                    '<full>/<path>/<to>/test_module.py::test_case1',
                    '<full>/<path>/<to>/test_module.py::test_case2',
                ],
                (...)
            }

    :log: A py.log.Producer instance.

    :config: Config object, used for handling hooks.
    Nconfigpytest.ConfiglogProducer | NonereturnNonec                 C  sN   t t|| _d | _t | _i | _i | _|d u rtd| _	n|j
| _	|| _d S )Nloadscopesched)lenr   numnodes
collectionr   	workqueueassigned_workregistered_collectionsr   r   r   r   )selfr   r    r   R/var/www/html/corbot_env/lib/python3.10/site-packages/xdist/scheduler/loadscope.py__init__]   s   
zLoadScopeScheduling.__init__list[WorkerController]c                 C  s   t | j S )z,A list of all active nodes in the scheduler.)listr   keysr   r   r   r   nodesl   s   zLoadScopeScheduling.nodesboolc                 C  s   t | j| jkS )zBoolean indication initial test collection is complete.

        This is a boolean indicating all initial participating nodes have
        finished collection.  The required number of initial nodes is defined
        by ``.numnodes``.
        )r   r   r   r   r   r   r   collection_is_completedq   s   z+LoadScopeScheduling.collection_is_completedc                 C  s<   | j sdS | jr
dS | j D ]}| |dkr dS qdS )z9Return True if all tests have been executed by the nodes.F   T)r"   r   r   values_pending_ofr   assigned_unitr   r   r   tests_finished{   s   z"LoadScopeScheduling.tests_finishedc                 C  s2   | j rdS | j D ]}| |dkr dS q
dS )zReturn True if there are pending test items.

        This indicates that collection has finished and nodes are still
        processing test items, so this can be thought of as
        "the scheduler is active".
        Tr   F)r   r   r$   r%   r&   r   r   r   has_pending   s   zLoadScopeScheduling.has_pendingnoder	   c                 C  s   || j vsJ i | j |< dS )zAdd a new node to the scheduler.

        From now on the node will be assigned work units to be executed.

        Called by the ``DSession.worker_workerready`` hook when it successfully
        bootstraps a new node.
        N)r   r   r*   r   r   r   add_node   s   zLoadScopeScheduling.add_node
str | Nonec                 C  sx   | j |}| |sdS | D ]}| D ]
\}}|s!|} nqq ntd| j| | j D ]}| | q2|S )a  Remove a node from the scheduler.

        This should be called either when the node crashed or at shutdown time.
        In the former case any pending items assigned to the node will be
        re-scheduled.

        Called by the hooks:

        - ``DSession.worker_workerfinished``.
        - ``DSession.worker_errordown``.

        Return the item being executed while the node crashed or None if the
        node has no more pending items.
        Nz=Unable to identify crashitem on a workload with pending items)	r   popr%   r$   itemsRuntimeErrorr   update_reschedule)r   r*   workload	work_unitnodeid	completed	crashitemr   r   r   remove_node   s$   

zLoadScopeScheduling.remove_noder   Sequence[str]c                 C  sr   || j v sJ | jr0| jsJ || jkr0tt| j }t| j||jj	|jj	}| 
| dS t|| j|< dS )zAdd the collected test items from a node.

        The collection is stored in the ``.registered_collections`` dictionary.

        Called by the hook:

        - ``DSession.worker_collectionfinish``.
        N)r   r"   r   nextiterr   r   r   gatewayidr   r   )r   r*   r   
other_nodemsgr   r   r   add_node_collection   s   


z'LoadScopeScheduling.add_node_collectionr   
item_indexintdurationfloatc                 C  s8   | j | | }| |}d| j| | |< | | dS )zuMark test item as completed by node.

        Called by the hook:

        - ``DSession.worker_testreport``.
        TN)r   _split_scoper   r2   )r   r*   rA   rC   r5   scoper   r   r   mark_test_complete   s   	
z&LoadScopeScheduling.mark_test_completeitemstrr   c                 C     t  NNotImplementedError)r   rH   r   r   r   mark_test_pending   s   z%LoadScopeScheduling.mark_test_pendingindicesSequence[int]c                 C  rJ   rK   rL   )r   r*   rO   r   r   r   remove_pending_tests_from_node   s   z2LoadScopeScheduling.remove_pending_tests_from_nodec                   s`   | j sJ | j jdd\}}| j|i }|||< | j|   fdd| D }|| dS )zAssign a work unit to a node.F)lastc                   s   g | ]\}}|s  |qS r   )index).0r5   r6   worker_collectionr   r   
<listcomp>  s    z9LoadScopeScheduling._assign_work_unit.<locals>.<listcomp>N)r   popitemr   
setdefaultr   r/   send_runtest_some)r   r*   rF   r4   assigned_to_nodenodeids_indexesr   rU   r   _assign_work_unit  s   


z%LoadScopeScheduling._assign_work_unitr5   c                 C  s   | ddd S )av  Determine the scope (grouping) of a nodeid.

        There are usually 3 cases for a nodeid::

            example/loadsuite/test/test_beta.py::test_beta0
            example/loadsuite/test/test_delta.py::Delta1::test_delta0
            example/loadsuite/epsilon/__init__.py::epsilon.epsilon

        #. Function in a test module.
        #. Method of a class in a test module.
        #. Doctest in a function in a package.

        This function will group tests with the scope determined by splitting
        the first ``::`` from the right. That is, classes will be grouped in a
        single work unit, and functions from a test module will be grouped by
        their module. In the above example, scopes will be::

            example/loadsuite/test/test_beta.py
            example/loadsuite/test/test_delta.py::Delta1
            example/loadsuite/epsilon/__init__.py
        z::   r   )rsplit)r   r5   r   r   r   rE     s   z LoadScopeScheduling._split_scoper3   dict[str, dict[str, bool]]c                 C  s   t dd | D }|S )z1Return the number of pending tests in a workload.c                 s  s"    | ]}t | d V  qdS )FN)r   r$   count)rT   rF   r   r   r   	<genexpr>6  s     z2LoadScopeScheduling._pending_of.<locals>.<genexpr>)sumr$   )r   r3   pendingr   r   r   r%   4  s   zLoadScopeScheduling._pending_ofc                 C  sT   |j rdS | js|  dS | dt| j | | j| dkr#dS | | dS )zMaybe schedule new items on the node.

        If there are any globally pending work units left then this will check
        if the given node should be given any more tests.
        Nz!Number of units waiting for node:r#   )shutting_downr   shutdownr   r   r%   r   r]   r+   r   r   r   r2   9  s   zLoadScopeScheduling._reschedulec                 C  sl  | j sJ | jdur| jD ]}| | qdS |  s"| d dS ttt| j	
 | _| js3dS i }| jD ]}| |}||i }d||< q8t| dd dD ]	\}}|| j|< qTt| jt| j }|dkr| d| d	 t|D ]}| j \}	}
| d
|	  |	  qy| jD ]}| | q| jD ]}| | q| js| jD ]}|  qdS dS )a  Initiate distribution of the test collection.

        Initiate scheduling of the items across the nodes.  If this gets called
        again later it behaves the same as calling ``._reschedule()`` on all
        nodes so that newly added nodes will start to be used.

        If ``.collection_is_completed`` is True, this is called by the hook:

        - ``DSession.worker_collectionfinish``.
        Nz+**Different tests collected, aborting run**Fc                 S  s   t | d  S )Nr^   )r   )rH   r   r   r   <lambda>x  s    z.LoadScopeScheduling.schedule.<locals>.<lambda>)keyr   zShutting down z nodeszShutting down unused node )r"   r   r    r2   !_check_nodes_have_same_collectionr   r   r:   r;   r   r$   rE   rY   sortedr/   r   r   ranger   rX   rf   r]   )r   r*   unsorted_workqueuer5   rF   r4   nodeidsextra_nodes_unused_nodeassignedr   r   r   scheduleR  sH   











zLoadScopeScheduling.schedulec           	      C  s   t | j }|d \}}d}|dd D ],\}}t|||jj|jj}|s'qd}| | tj|jjd|g d}| j	j
j|d q|S )	zReturn True if all nodes have collected the same items.

        If collections differ, this method returns False while logging
        the collection differences and posting collection errors to
        pytest_collectreport hook.
        r   Tr^   NFfailed)r5   outcomelongreprresult)report)r   r   r/   r   r<   r=   r   pytestCollectReportr   hookpytest_collectreport)	r   node_collection_items
first_nodecolsame_collectionr*   r   r?   repr   r   r   ri     s&   
z5LoadScopeScheduling._check_nodes_have_same_collectionrK   )r   r   r   r   r   r   )r   r   )r   r!   )r*   r	   r   r   )r*   r	   r   r-   )r*   r	   r   r9   r   r   )r   )r*   r	   rA   rB   rC   rD   r   r   )rH   rI   r   r   )r*   r	   rO   rP   r   r   )r5   rI   r   rI   )r3   r`   r   rB   )r   r   )__name__
__module____qualname____doc__r   propertyr    r"   r(   r)   r,   r8   r@   rG   rN   rQ   r]   rE   r%   r2   rr   ri   r   r   r   r   r
      s0    M	


) 






Cr
   )
__future__r   collectionsr   typingr   r   rx   xdist.remoter   xdist.reportr   xdist.workermanager   r	   r
   r   r   r   r   <module>   s    