
    `fq                         d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ dd
lmZ ddlmZ dZ ee          ZdZd Zd Z G d d          Z G d d          ZdS )a-  Worker Remote Control Client.

Client for worker remote control commands.
Server implementation is in :mod:`celery.worker.control`.
There are two types of remote control commands:

* Inspect commands: Does not have side effects, will usually just return some value
  found in the worker, like the list of currently registered tasks, the list of active tasks, etc.
  Commands are accessible via :class:`Inspect` class.

* Control commands: Performs side effects, like adding a new queue to consume from.
  Commands are accessible via :class:`Control` class.
    N)TERM_SIGNAMEmatch)Mailbox)register_after_fork)lazy)cached_property)DuplicateNodenameWarning)
get_logger)	pluralize)InspectControlflatten_replyzReceived multiple replies from node {0}: {1}.
Please make sure you give each node a unique nodename using
the celery worker `-n` option.c                 l   i t                      c| D ]&}fd|D                                  |           'rut          j        t	          t
                              t          t                    d          d	                    t                                                             S )zFlatten node replies.

    Convert from a list of replies in this format::

        [{'a@example.com': reply},
         {'b@example.com': reply}]

    into this format::

        {'a@example.com': reply,
         'b@example.com': reply}
    c                 B    g | ]}|v                      |          S  )add).0namedupesnodess     D/var/www/html/env/lib/python3.11/site-packages/celery/app/control.py
<listcomp>z!flatten_reply.<locals>.<listcomp>5   s(    ;;;TTU]]4]]]    r   z, )setupdatewarningswarnr
   	W_DUPNODEformatr   lenjoinsorted)replyitemr   r   s     @@r   r   r   &   s     suuLE5  ;;;;;T;;;;T .#e**f--tyy/G/G 
 
 	 	 	
 Lr   c                     	 |                                   d S # t          $ r(}t                              d|d           Y d }~d S d }~ww xY w)Nzafter fork raised exception: %r   )exc_info)_after_fork	Exceptionloggerinfo)controlexcs     r   _after_fork_cleanup_controlr/   @   sn    H H H H5sQGGGGGGGGGHs    
A
AA
c                       e Zd ZdZdZ	 	 	 ddZd Zd Zd Zd Z	dd	Z
dd
ZddZd Zd Zd ZeZddZd Zd ZddZddZd ZddZddZdS ) r   zAPI for inspecting workers.

    This class provides proxy for accessing Inspect API of workers. The API is
    defined in :py:mod:`celery.worker.control`
    N      ?c	                     |p| j         | _         || _        || _        || _        || _        || _        || _        || _        d S N)appdestinationtimeoutcallback
connectionlimitpatternmatcher)	selfr5   r6   r7   r8   r4   r9   r:   r;   s	            r   __init__zInspect.__init__P   sG     ?$(& $
r   c                     |rt          |          }| j        r;t          | j        t          t          f          s|                    | j                  S | j        r/| j        | j        fd|                                D             S |S d S )Nc                 >    i | ]\  }}t          |          ||S r   r   )r   noder$   r;   r:   s      r   
<dictcomp>z$Inspect._prepare.<locals>.<dictcomp>e   s?     : : :e w88:e : : :r   )	r   r5   
isinstancelisttuplegetr:   r;   items)r<   r$   by_noder;   r:   s      @@r   _preparezInspect._prepare\   s     
	#E**G  5"4#3dE]CC5{{4#3444| :,,: : : : :w}} : : : :N
	 
	r   c                     |                      | j        j                            ||| j        | j        | j        | j        | j        d| j	        | j
        
  
                  S )NT)		argumentsr5   r7   r8   r9   r6   r$   r:   r;   )rH   r4   r-   	broadcastr5   r7   r8   r9   r6   r:   r;   )r<   commandkwargss      r   _requestzInspect._requesti   s_    }}TX-77(]*LL$, 8 	
 	
 	 	 		r   c                 ,    |                      d          S )zReturn human readable report for each worker.

        Returns:
            Dict: Dictionary ``{HOSTNAME: {'ok': REPORT_STRING}}``.
        reportrN   r<   s    r   rP   zInspect.reportu   s     }}X&&&r   c                 ,    |                      d          S )zGet the Clock value on workers.

        >>> app.control.inspect().clock()
        {'celery@node1': {'clock': 12}}

        Returns:
            Dict: Dictionary ``{HOSTNAME: CLOCK_VALUE}``.
        clockrQ   rR   s    r   rT   zInspect.clock}   s     }}W%%%r   c                 0    |                      d|          S )aC  Return list of tasks currently executed by workers.

        Arguments:
            safe (Boolean): Set to True to disable deserialization.

        Returns:
            Dict: Dictionary ``{HOSTNAME: [TASK_INFO,...]}``.

        See Also:
            For ``TASK_INFO`` details see :func:`query_task` return value.

        active)saferQ   r<   rW   s     r   rV   zInspect.active   s     }}XD}111r   c                 ,    |                      d          S )a  Return list of scheduled tasks with details.

        Returns:
            Dict: Dictionary ``{HOSTNAME: [TASK_SCHEDULED_INFO,...]}``.

        Here is the list of ``TASK_SCHEDULED_INFO`` fields:

        * ``eta`` - scheduled time for task execution as string in ISO 8601 format
        * ``priority`` - priority of the task
        * ``request`` - field containing ``TASK_INFO`` value.

        See Also:
            For more details about ``TASK_INFO``  see :func:`query_task` return value.
        	scheduledrQ   rX   s     r   rZ   zInspect.scheduled   s     }}[)))r   c                 ,    |                      d          S )zReturn list of currently reserved tasks, not including scheduled/active.

        Returns:
            Dict: Dictionary ``{HOSTNAME: [TASK_INFO,...]}``.

        See Also:
            For ``TASK_INFO`` details see :func:`query_task` return value.
        reservedrQ   rX   s     r   r\   zInspect.reserved   s     }}Z(((r   c                 ,    |                      d          S )u  Return statistics of worker.

        Returns:
            Dict: Dictionary ``{HOSTNAME: STAT_INFO}``.

        Here is the list of ``STAT_INFO`` fields:

        * ``broker`` - Section for broker information.
            * ``connect_timeout`` - Timeout in seconds (int/float) for establishing a new connection.
            * ``heartbeat`` - Current heartbeat value (set by client).
            * ``hostname`` - Node name of the remote broker.
            * ``insist`` - No longer used.
            * ``login_method`` - Login method used to connect to the broker.
            * ``port`` - Port of the remote broker.
            * ``ssl`` - SSL enabled/disabled.
            * ``transport`` - Name of transport used (e.g., amqp or redis)
            * ``transport_options`` - Options passed to transport.
            * ``uri_prefix`` - Some transports expects the host name to be a URL.
              E.g. ``redis+socket:///tmp/redis.sock``.
              In this example the URI-prefix will be redis.
            * ``userid`` - User id used to connect to the broker with.
            * ``virtual_host`` - Virtual host used.
        * ``clock`` - Value of the workers logical clock. This is a positive integer
          and should be increasing every time you receive statistics.
        * ``uptime`` - Numbers of seconds since the worker controller was started
        * ``pid`` - Process id of the worker instance (Main process).
        * ``pool`` - Pool-specific section.
            * ``max-concurrency`` - Max number of processes/threads/green threads.
            * ``max-tasks-per-child`` - Max number of tasks a thread may execute before being recycled.
            * ``processes`` - List of PIDs (or thread-id’s).
            * ``put-guarded-by-semaphore`` - Internal
            * ``timeouts`` - Default values for time limits.
            * ``writes`` - Specific to the prefork pool, this shows the distribution
              of writes to each process in the pool when using async I/O.
        * ``prefetch_count`` - Current prefetch count value for the task consumer.
        * ``rusage`` - System usage statistics. The fields available may be different on your platform.
          From :manpage:`getrusage(2)`:

            * ``stime`` - Time spent in operating system code on behalf of this process.
            * ``utime`` - Time spent executing user instructions.
            * ``maxrss`` - The maximum resident size used by this process (in kilobytes).
            * ``idrss`` - Amount of non-shared memory used for data (in kilobytes times
              ticks of execution)
            * ``isrss`` - Amount of non-shared memory used for stack space
              (in kilobytes times ticks of execution)
            * ``ixrss`` - Amount of memory shared with other processes
              (in kilobytes times ticks of execution).
            * ``inblock`` - Number of times the file system had to read from the disk
              on behalf of this process.
            * ``oublock`` - Number of times the file system has to write to disk
              on behalf of this process.
            * ``majflt`` - Number of page faults that were serviced by doing I/O.
            * ``minflt`` - Number of page faults that were serviced without doing I/O.
            * ``msgrcv`` - Number of IPC messages received.
            * ``msgsnd`` - Number of IPC messages sent.
            * ``nvcsw`` - Number of times this process voluntarily invoked a context switch.
            * ``nivcsw`` - Number of times an involuntary context switch took place.
            * ``nsignals`` - Number of signals received.
            * ``nswap`` - The number of times this process was swapped entirely
              out of memory.
        * ``total`` - Map of task names and the total number of tasks with that type
          the worker has accepted since start-up.
        statsrQ   rR   s    r   r^   zInspect.stats   s    @ }}W%%%r   c                 ,    |                      d          S )zReturn list of revoked tasks.

        >>> app.control.inspect().revoked()
        {'celery@node1': ['16f527de-1c72-47a6-b477-c472b92fef7a']}

        Returns:
            Dict: Dictionary ``{HOSTNAME: [TASK_ID, ...]}``.
        revokedrQ   rR   s    r   r`   zInspect.revoked   s     }}Y'''r   c                 0    |                      d|          S )aD  Return all registered tasks per worker.

        >>> app.control.inspect().registered()
        {'celery@node1': ['task1', 'task1']}
        >>> app.control.inspect().registered('serializer', 'max_retries')
        {'celery@node1': ['task_foo [serializer=json max_retries=3]', 'tasb_bar [serializer=json max_retries=3]']}

        Arguments:
            taskinfoitems (Sequence[str]): List of :class:`~celery.app.task.Task`
                                           attributes to include.

        Returns:
            Dict: Dictionary ``{HOSTNAME: [TASK1_INFO, ...]}``.
        
registered)taskinfoitemsrQ   )r<   rc   s     r   rb   zInspect.registered   s     }}\}GGGr   c                 >    |r|| _         |                     d          S )aG  Ping all (or specific) workers.

        >>> app.control.inspect().ping()
        {'celery@node1': {'ok': 'pong'}, 'celery@node2': {'ok': 'pong'}}
        >>> app.control.inspect().ping(destination=['celery@node1'])
        {'celery@node1': {'ok': 'pong'}}

        Arguments:
            destination (List): If set, a list of the hosts to send the
                command to, when empty broadcast to all workers.

        Returns:
            Dict: Dictionary ``{HOSTNAME: {'ok': 'pong'}}``.

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        ping)r5   rN   )r<   r5   s     r   re   zInspect.ping  s&    $  	+*D}}V$$$r   c                 ,    |                      d          S )a  Return information about queues from which worker consumes tasks.

        Returns:
            Dict: Dictionary ``{HOSTNAME: [QUEUE_INFO, QUEUE_INFO,...]}``.

        Here is the list of ``QUEUE_INFO`` fields:

        * ``name``
        * ``exchange``
            * ``name``
            * ``type``
            * ``arguments``
            * ``durable``
            * ``passive``
            * ``auto_delete``
            * ``delivery_mode``
            * ``no_declare``
        * ``routing_key``
        * ``queue_arguments``
        * ``binding_arguments``
        * ``consumer_arguments``
        * ``durable``
        * ``exclusive``
        * ``auto_delete``
        * ``no_ack``
        * ``alias``
        * ``bindings``
        * ``no_declare``
        * ``expires``
        * ``message_ttl``
        * ``max_length``
        * ``max_length_bytes``
        * ``max_priority``

        See Also:
            See the RabbitMQ/AMQP documentation for more details about
            ``queue_info`` fields.
        Note:
            The ``queue_info`` fields are RabbitMQ/AMQP oriented.
            Not all fields applies for other transports.
        active_queuesrQ   rR   s    r   rg   zInspect.active_queues(  s    T }}_---r   c                     t          |          dk    r*t          |d         t          t          f          r|d         }|                     d|          S )a  Return detail of tasks currently executed by workers.

        Arguments:
            *ids (str): IDs of tasks to be queried.

        Returns:
            Dict: Dictionary ``{HOSTNAME: {TASK_ID: [STATE, TASK_INFO]}}``.

        Here is the list of ``TASK_INFO`` fields:
            * ``id`` - ID of the task
            * ``name`` - Name of the task
            * ``args`` - Positinal arguments passed to the task
            * ``kwargs`` - Keyword arguments passed to the task
            * ``type`` - Type of the task
            * ``hostname`` - Hostname of the worker processing the task
            * ``time_start`` - Time of processing start
            * ``acknowledged`` - True when task was acknowledged to broker
            * ``delivery_info`` - Dictionary containing delivery information
                * ``exchange`` - Name of exchange where task was published
                * ``routing_key`` - Routing key used when task was published
                * ``priority`` - Priority used when task was published
                * ``redelivered`` - True if the task was redelivered
            * ``worker_pid`` - PID of worker processing the task

        r'   r   
query_task)ids)r!   rB   rC   rD   rN   )r<   rj   s     r   ri   zInspect.query_taskT  sH    8 s88q==ZAu>>=a&C}}\s}333r   Fc                 0    |                      d|          S )a  Return configuration of each worker.

        Arguments:
            with_defaults (bool): if set to True, method returns also
                                   configuration options with default values.

        Returns:
            Dict: Dictionary ``{HOSTNAME: WORKER_CONFIGURATION}``.

        See Also:
            ``WORKER_CONFIGURATION`` is a dictionary containing current configuration options.
            See :ref:`configuration` for possible values.
        conf)with_defaultsrQ   )r<   rm   s     r   rl   zInspect.conft  s     }}V=}AAAr   c                 2    |                      d||          S )Nhello)	from_noder`   rQ   )r<   rp   r`   s      r   ro   zInspect.hello  s    }}W	7}KKKr   c                 ,    |                      d          S )ziReturn sample current RSS memory usage.

        Note:
            Requires the psutils library.
        	memsamplerQ   rR   s    r   rr   zInspect.memsample  s     }}[)))r   
   c                 0    |                      d|          S )zqDump statistics of previous memsample requests.

        Note:
            Requires the psutils library.
        memdump)samplesrQ   )r<   rv   s     r   ru   zInspect.memdump  s     }}Y}888r   Request   c                 4    |                      d|||          S )a  Create graph of uncollected objects (memory-leak debugging).

        Arguments:
            n (int): Max number of objects to graph.
            max_depth (int): Traverse at most n levels deep.
            type (str): Name of object to graph.  Default is ``"Request"``.

        Returns:
            Dict: Dictionary ``{'filename': FILENAME}``

        Note:
            Requires the objgraph library.
        objgraph)num	max_depthtyperQ   )r<   r}   nr|   s       r   rz   zInspect.objgraph  s     }}ZQ)$}OOOr   )Nr1   NNNNNNr3   )F)rs   )rw   rx   rs   )__name__
__module____qualname____doc__r4   r=   rH   rN   rP   rT   rV   rZ   r\   r^   r`   rb   registered_tasksre   rg   ri   rl   ro   rr   ru   rz   r   r   r   r   r   G   s         C?C@D
 
 
 
  
 
 
' ' '	& 	& 	&2 2 2 2* * * *"	) 	) 	) 	)@& @& @&D	( 	( 	(H H H  "% % % %,*. *. *.X4 4 4@B B B B L L L L* * *9 9 9 9P P P P P Pr   r   c                       e Zd ZdZeZddZd Zed             ZddZ	e	Z
ddZddefd	Zddefd
ZdefdZddZddZ	 	 d dZddZ	 	 d!dZddZddZd"dZd"dZddZddZ	 	 d#dZddZ	 	 	 d$dZdS )%r   zWorker remote control client.Nc                 2    | _                              |j        j        d|j        j        |j        j        t           fd          |j        j        |j        j        |j        j        |j        j        	  	         _	        t           t                     d S )Nfanoutc                  &     j         j        j        S r3   )r4   amqpproducer_poolrR   s   r   <lambda>z"Control.__init__.<locals>.<lambda>  s    tx}'B r   )r}   accept
serializerr   	queue_ttlreply_queue_ttlqueue_expiresreply_queue_expires)r4   r   rl   control_exchangeaccept_contenttask_serializerr   control_queue_ttlcontrol_queue_expiresmailboxr   r/   )r<   r4   s   ` r   r=   zControl.__init__  s    ||H%8*x/BBBBCCh0H6(8 # > $ 

 

 	D"=>>>>>r   c                     | j         `d S r3   )r   r   rR   s    r   r)   zControl._after_fork  s    L&&&r   c                 D    | j                             t          d          S )z%Create new :class:`Inspect` instance.zcontrol.inspect)reverse)r4   subclass_with_selfr   rR   s    r   inspectzControl.inspect  s      x**7<M*NNNr   c                     | j                             |          5 }| j         j                            |                                          cddd           S # 1 swxY w Y   dS )a  Discard all waiting tasks.

        This will ignore all tasks waiting for execution, and they will
        be deleted from the messaging server.

        Arguments:
            connection (kombu.Connection): Optional specific connection
                instance to use.  If not provided a connection will
                be acquired from the connection pool.

        Returns:
            int: the number of tasks discarded.
        N)r4   connection_or_acquirer   TaskConsumerpurge)r<   r8   conns      r   r   zControl.purge  s     X++J77 	<48=--d3399;;	< 	< 	< 	< 	< 	< 	< 	< 	< 	< 	< 	< 	< 	< 	< 	< 	< 	<s   1AA Ac           	      @    |                      d|d |||d           d S )Nelection)idtopicaction)r8   r5   rJ   rK   )r<   r   r   r   r8   s        r   r   zControl.election  s?    :45F  	 	
 	
 	
 	
 	
r   Fc                 *     | j         d||||dd|S )a[  Tell all (or specific) workers to revoke a task by id (or list of ids).

        If a task is revoked, the workers will ignore the task and
        not execute it after all.

        Arguments:
            task_id (Union(str, list)): Id of the task to revoke
                (or list of ids).
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        revoke)task_id	terminatesignalr5   rJ   r   r   )r<   r   r5   r   r   rM   s         r   r   zControl.revoke  sE    $ t~ K"L
 L
   	  	r   c                      | j         d||||dd|}t                      }|r7|D ]4}|                                D ]}	|                    |	d                    5|r | j        t          |          f|||d|S |S )aN  
        Tell all (or specific) workers to revoke a task by headers.

        If a task is revoked, the workers will ignore the task and
        not execute it after all.

        Arguments:
            headers (dict[str, Union(str, list)]): Headers to match when revoking tasks.
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Default is TERM.

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        revoke_by_stamped_headers)headersr   r   r   okr5   r   r   N)r   )rK   r   valuesr   r   rC   )
r<   r   r5   r   r   rM   resulttask_idshostresponses
             r   r   z!Control.revoke_by_stamped_headers  s    $   "a
 a
   	  55 	4 4 4 $ 4 4HOOHTN33334  	4;tH~~u;R[djuuntuuuMr   c                 &     | j         |f|d|d|S )zTell all (or specific) workers to terminate a task by id (or list of ids).

        See Also:
            This is just a shortcut to :meth:`revoke` with the terminate
            argument enabled.
        Tr   r   )r<   r   r5   r   rM   s        r   r   zControl.terminate  s<     t{N#tFN NFLN N 	Nr   r1   c                 (     | j         	 ddi ||d|S )a  Ping all (or specific) workers.

        >>> app.control.ping()
        [{'celery@node1': {'ok': 'pong'}}, {'celery@node2': {'ok': 'pong'}}]
        >>> app.control.ping(destination=['celery@node2'])
        [{'celery@node2': {'ok': 'pong'}}]

        Returns:
            List[Dict]: List of ``{HOSTNAME: {'ok': 'pong'}}`` dictionaries.

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        re   T)r$   rJ   r5   r6   )re   r   )r<   r5   r6   rM   s       r   re   zControl.ping%  s9     t~'"+' '%' ' 	'r   c                 *     | j         	 d|||dd|S )a  Tell workers to set a new rate limit for task by type.

        Arguments:
            task_name (str): Name of task to change rate limit for.
            rate_limit (int, str): The rate limit as tasks per second,
                or a rate limit string (`'100/m'`, etc.
                see :attr:`celery.app.task.Task.rate_limit` for
                more information).

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        
rate_limit)	task_namer   r   )r   r   )r<   r   r   r5   rM   s        r   r   zControl.rate_limit7  sD     t~#&(     	r   directc                 H     | j         	 d|t          ||||dfi |pi d|S )a  Tell all (or specific) workers to start consuming from a new queue.

        Only the queue name is required as if only the queue is specified
        then the exchange/routing key will be set to the same name (
        like automatic queues do).

        Note:
            This command does not respect the default queue/exchange
            options in the configuration.

        Arguments:
            queue (str): Name of queue to start consuming from.
            exchange (str): Optional name of exchange.
            exchange_type (str): Type of exchange (defaults to 'direct')
                command to, when empty broadcast to all workers.
            routing_key (str): Optional routing key.
            options (Dict): Additional options as supported
                by :meth:`kombu.entity.Queue.from_dict`.

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        add_consumer)queueexchangeexchange_typerouting_keyr   )r   )rK   dict)r<   r   r   r   r   optionsr5   rM   s           r   r   zControl.add_consumerM  sj    2 t~

#$!.*	     
 B   

 

 

 

 
	
r   c                 (     | j         	 d|d|id|S )zTell all (or specific) workers to stop consuming from ``queue``.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        cancel_consumerr   r   )r   r   )r<   r   r5   rM   s       r   r   zControl.cancel_consumerr  s:     t~2+6&2 2*02 2 	2r   c                 ,     | j         	 d|||d|d|S )aS  Tell workers to set time limits for a task by type.

        Arguments:
            task_name (str): Name of task to change time limits for.
            soft (float): New soft time limit (in seconds).
            hard (float): New hard time limit (in seconds).
            **kwargs (Any): arguments passed on to :meth:`broadcast`.
        
time_limit)r   hardsoftrJ   r5   )r   r   )r<   r   r   r   r5   rM   s         r   r   zControl.time_limit|  sK     t~ ' 
 $    	r   c                 $     | j         	 di |d|S )zTell all (or specific) workers to enable events.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        enable_eventsr   )r   r   r<   r5   rM   s      r   r   zControl.enable_events  s:     t~N'){N NFLN N 	Nr   c                 $     | j         	 di |d|S )zTell all (or specific) workers to disable events.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        disable_eventsr   )r   r   r   s      r   r   zControl.disable_events  s:     t~O(*O OGMO O 	Or   r'   c                 (     | j         	 dd|i|d|S )zTell all (or specific) workers to grow the pool by ``n``.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        	pool_growr~   r   )r   r   r<   r~   r5   rM   s       r   r   zControl.pool_grow  s>     t~P$'8P PHNP P 	Pr   c                 (     | j         	 dd|i|d|S )zTell all (or specific) workers to shrink the pool by ``n``.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        pool_shrinkr~   r   )r   r   r   s       r   r   zControl.pool_shrink  s9     t~/&)1X#/ /'-/ / 	/r   c                 *     | j         	 d||d|d|S )z}Change worker(s) autoscale setting.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        	autoscale)maxminr   )r   r   )r<   r   r   r5   rM   s        r   r   zControl.autoscale  s=     t~/+.s#;#;#/ /'-/ / 	/r   c                 $     | j         	 di |d|S )zlShutdown worker(s).

        See Also:
            Supports the same arguments as :meth:`broadcast`
        shutdownr   )r   r   r   s      r   r   zControl.shutdown  s:     t~I"$+I IAGI I 	Ir   c                 ,     | j         	 d|||d|d|S )a  Restart the execution pools of all or specific workers.

        Keyword Arguments:
            modules (Sequence[str]): List of modules to reload.
            reload (bool): Flag to enable module reloading.  Default is False.
            reloader (Any): Function to reload a module.
            destination (Sequence[str]): List of worker names to send this
                command to.

        See Also:
            Supports the same arguments as :meth:`broadcast`
        pool_restart)modulesreloadreloaderr   )r   r   )r<   r   r   r   r5   rM   s         r   r   zControl.pool_restart  sK     t~/ # $ 
 $/ / (./ / 	/r   c                 $     | j         	 di |d|S )zTell worker(s) to send a heartbeat immediately.

        See Also:
            Supports the same arguments as :meth:`broadcast`
        	heartbeatr   )r   r   r   s      r   r   zControl.heartbeat  s:     t~J#%;J JBHJ J 	Jr   c                 l   | j                             |          5 }t          |pi fi |}|
r@|r>|                     |                              ||||||||	|
|
  
        cddd           S |                     |                              ||||||||	          cddd           S # 1 swxY w Y   dS )a  Broadcast a control command to the celery workers.

        Arguments:
            command (str): Name of command to send.
            arguments (Dict): Keyword arguments for the command.
            destination (List): If set, a list of the hosts to send the
                command to, when empty broadcast to all workers.
            connection (kombu.Connection): Custom broker connection to use,
                if not set, a connection will be acquired from the pool.
            reply (bool): Wait for and return the reply.
            timeout (float): Timeout in seconds to wait for the reply.
            limit (int): Limit number of replies.
            callback (Callable): Callback called immediately for
                each reply received.
            pattern (str): Custom pattern string to match
            matcher (Callable): Custom matcher to run the pattern to match
        )channelr:   r;   N)r   )r4   r   r   r   
_broadcast)r<   rL   rJ   r5   r8   r$   r6   r9   r7   r   r:   r;   extra_kwargsr   s                 r   rK   zControl.broadcast  s@   * X++J77 	4Y_"====I 7  ||D))44YUG8W#W 5  	 	 	 	 	 	 	 	 ||D))44YUG8W 5  	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   AB)-/B))B-0B-r3   )NN)Nr1   )Nr   NNN)NNN)r'   N)NFNN)
NNNFr1   NNNNN)r   r   r   r   r   r=   r)   r	   r   r   discard_allr   r   r   r   r   re   r   r   r   r   r   r   r   r   r   r   r   r   rK   r   r   r   r   r     sA       ''G? ? ? ?' ' ' O O _O< < < <  K
 
 
 
 +/%"   0 >BU)5! ! ! !H #<
N 
N 
N 
N' ' ' '$   . IM/3#
 #
 #
 #
J2 2 2 2 59#   (N N N NO O O OP P P P/ / / // / / /I I I I AE!%/ / / /.J J J J >BCGEI# # # # # #r   r   )r   r   billiard.commonr   kombu.matcherr   kombu.pidboxr   kombu.utils.compatr   kombu.utils.functionalr   kombu.utils.objectsr	   celery.exceptionsr
   celery.utils.logr   celery.utils.textr   __all__r   r+   r   r   r/   r   r   r   r   r   <module>r      sm     ( ( ( ( ( (                   2 2 2 2 2 2 ' ' ' ' ' ' / / / / / / 6 6 6 6 6 6 ' ' ' ' ' ' ' ' ' ' ' '
1	H			  4H H H^P ^P ^P ^P ^P ^P ^P ^PBc c c c c c c c c cr   