
    `f8                        d Z ddlZddlmZ ddlmZmZ ddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ dZdZ G d de          Z G d d          Z	 	 d%dZd&dZd ZeddfdZd Z	 	 d'dZd Zd Z d Z!d Z" G d d          Z#	 	 	 	 d(dZ$d  Z%d! Z&d" Z'd# Z( eee$          Z) ee%e$          Z* ee&e$          Z+ ee'e$          Z,dS ))z,Message migration tools (Broker <-> Broker).    N)partial)cycleislice)Queue	eventloop)maybe_declare)ensure_bytes)app_or_default)worker_direct)str_to_list)StopFilteringState	republishmigrate_taskmigrate_tasksmove
task_id_eq
task_id_instart_filtermove_task_by_idmove_by_idmapmove_by_taskmapmove_directmove_direct_by_idzGMoving task {state.filtered}/{state.strtotal}: {body[task]}[{body[id]}]c                       e Zd ZdZdS )r   z*Semi-predicate used to signal filter stop.N)__name__
__module____qualname____doc__     H/var/www/html/env/lib/python3.11/site-packages/celery/contrib/migrate.pyr   r      s        4444r!   r   c                   :    e Zd ZdZdZdZdZed             Zd Z	dS )r   zMigration progress state.r   c                 <    | j         sdS t          | j                   S )N?)	total_apxstrselfs    r"   strtotalzState.strtotal&   s!    ~ 	34>"""r!   c                 F    | j         r
d| j          S | j         d| j         S )N^/)filteredcountr*   r(   s    r"   __repr__zState.__repr__,   s4    = 	'&t}&&&*..t}...r!   N)
r   r   r   r   r/   r.   r&   propertyr*   r0   r    r!   r"   r   r      sQ        ##EHI# # X#
/ / / / /r!   r   c                    |sg d}t          |j                  }|j        |j        |j        }}}||d         n|}||d         n|}|j        |j        }
}	|                    dd          }|                    dd          }|t          |          nd}|D ]}|                    |d            | j	        t          |          f|||||	|
|d| dS )zRepublish message.)application_headerscontent_typecontent_encodingheadersNexchangerouting_keycompression
expiration)r7   r8   r9   r6   r4   r5   r:   )
r	   bodydelivery_infor6   
propertiesr4   r5   popfloatpublish)producermessager7   r8   remove_propsr;   infor6   propsctypeencr9   r:   keys                 r"   r   r   2   s1     77 7 7%%D#1#OW-? 'D#+#3tJH)4)<$}%%+K%w'?3E ++mT22K<..J&0&<z"""$J  		#tH\$'' (!,+$5&)j  	    r!   c           	          |j         }|i n|}t          | ||                    |d                   |                    |d                              dS )zMigrate single task message.Nr7   r8   r7   r8   )r<   r   get)rA   body_rB   queuesrD   s        r"   r   r   P   sd     D>RRvFhzz$z"233 **T-%899; ; ; ; ; ;r!   c                       fd}|S )Nc                 8    r| d         vrd S  | |          S Ntaskr    )r;   rB   callbacktaskss     r"   r.   z!filter_callback.<locals>.filtered[   s0     	T&\..Fxg&&&r!   r    )rR   rS   r.   s   `` r"   filter_callbackrT   Y   s)    ' ' ' ' ' '
 Or!   c                     t          |          }t                    |j                            |d          t	          |          }fd}t          || |f|d|S )z)Migrate tasks from one broker to another.F)auto_declarerM   c                     | j                   }                    | j        | j                  |_        |j        | j        k    r%                    | j        |j                  |_        |j        j        | j        k    r*                    | j        | j                  |j        _        |                                 d S N)channelrK   namer8   r7   declare)queue	new_queuerA   rM   s     r"   on_declare_queuez'migrate_tasks.<locals>.on_declare_queuek   s    E(*++	EJ
;;	 EJ..$*JJuz/8/D%F %FI!"ej00&,jjUZ&H&HI#r!   )rM   r_   )r
   prepare_queuesamqpProducerr   r   )sourcedestmigrateapprM   kwargsr_   rA   s       `  @r"   r   r   c   s     

CF##Fx  E ::Hgx777G      VW EV)9E E=CE E Er!   c                 T    t          |t                    r| j        j        |         S |S rY   )
isinstancer'   ra   rM   )rf   qs     r"   _maybe_queuerk   y   s(    !S "xq!!Hr!   c	           
      F    t                    fd|pg D             pd}
                    |d          5 j                                      t	                       f	d}t          |fd|
i|	cddd           S # 1 swxY w Y   dS )aG	  Find tasks by filtering them and move the tasks to a new queue.

    Arguments:
        predicate (Callable): Filter function used to decide the messages
            to move.  Must accept the standard signature of ``(body, message)``
            used by Kombu consumer callbacks.  If the predicate wants the
            message to be moved it must return either:

                1) a tuple of ``(exchange, routing_key)``, or

                2) a :class:`~kombu.entity.Queue` instance, or

                3) any other true value means the specified
                    ``exchange`` and ``routing_key`` arguments will be used.
        connection (kombu.Connection): Custom connection to use.
        source: List[Union[str, kombu.Queue]]: Optional list of source
            queues to use instead of the default (queues
            in :setting:`task_queues`).  This list can also contain
            :class:`~kombu.entity.Queue` instances.
        exchange (str, kombu.Exchange): Default destination exchange.
        routing_key (str): Default destination routing key.
        limit (int): Limit number of messages to filter.
        callback (Callable): Callback called after message moved,
            with signature ``(state, body, message)``.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.

    Also supports the same keyword arguments as :func:`start_filter`.

    To demonstrate, the :func:`move_task_by_id` operation can be implemented
    like this:

    .. code-block:: python

        def is_wanted_task(body, message):
            if body['id'] == wanted_id:
                return Queue('foo', exchange=Exchange('foo'),
                             routing_key='foo')

        move(is_wanted_task)

    or with a transform:

    .. code-block:: python

        def transform(value):
            if isinstance(value, str):
                return Queue(value, Exchange(value), value)
            return value

        move(is_wanted_task, transform=transform)

    Note:
        The predicate may also return a tuple of ``(exchange, routing_key)``
        to specify the destination to where the task should be moved,
        or a :class:`~kombu.entity.Queue` instance.
        Any other true value means that the task will be moved to the
        default exchange/routing_key.
    c                 0    g | ]}t          |          S r    )rk   ).0r]   rf   s     r"   
<listcomp>zmove.<locals>.<listcomp>   s#    AAA5l3&&AAAr!   NF)poolc                   	  	| |          }|rr |          }t          |t                    r)t          |j                   |j        j        |j        }}nt          |          \  }}t          
|||           |	                                 xj
        dz  c_
        r | |           rj
        k    rt                      d S d S d S )NrJ      )ri   r   r   default_channelr7   r[   r8   expand_destr   ackr.   r   )r;   rB   retexrkrR   connr7   limit	predicaterA   r8   state	transforms        r"   on_taskzmove.<locals>.on_task   s   )D'**C * )#)C..Cc5)) E!#t';<<< \.BB(hDDFB(G#%27 7 7 7!# 3HUD'222 *U^u44'//)!* ** *44r!   consume_from)r
   connection_or_acquirera   rb   r   r   )r{   
connectionr7   r8   rc   rf   rR   rz   r}   rg   rM   r~   ry   rA   r|   s   ` `` ````   @@@r"   r   r      sH   | 

CAAAAFLbAAAITF		"	":E	"	:	: Od8$$T**	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	*( CwNNVNvNN1O O O O O O O O O O O O O O O O O Os   ABBBc                 N    	 | \  }}n# t           t          f$ r ||}}Y nw xY w||fS rY   )	TypeError
ValueError)rv   r7   r8   rw   rx   s        r"   rt   rt      sH    'BBz" ' ' ';B'r6Ms      c                     |d         | k    S )z'Return true if task id equals task_id'.idr    )task_idr;   rB   s      r"   r   r      s    :  r!   c                     |d         | v S )z-Return true if task id is member of set ids'.r   r    )idsr;   rB   s      r"   r   r      s    :r!   c                     t          | t                    r|                     d          } t          | t                    rt	          d | D                       } | i } | S )N,c           
   3      K   | ]B}t          t          t          |                    d                     dd                    V  CdS ):N   )tupler   r   split)rn   rj   s     r"   	<genexpr>z!prepare_queues.<locals>.<genexpr>   s\       ' ' F5#6#6a@@AA ' ' ' ' ' 'r!   )ri   r'   r   listdictrW   s    r"   r`   r`      sq    &# #c""&$ ' ' '%' ' ' ' '~Mr!   c                   B    e Zd Z	 	 	 	 ddZd Zd Zd Zd Zd	 Zd
 Z	dS )FiltererN      ?Fc                 z    | _         | _        | _        | _        | _        | _        t          t          |          pg            _        t          |           _
        |	 _        |
 _        | _         fd|pt           j
                  D              _        |pt!                       _        | _        d S )Nc                 :    g | ]}t          j        |          S r    )rk   rf   )rn   rj   r)   s     r"   ro   z%Filterer.__init__.<locals>.<listcomp>	  s5     
 
 
 1%%
 
 
r!   )rf   ry   filterrz   timeoutack_messagessetr   rS   r`   rM   rR   foreverr_   r   r   r   r|   accept)r)   rf   ry   r   rz   r   r   rS   rM   rR   r   r_   r   r|   r   rg   s   `               r"   __init__zFilterer.__init__   s    
 	
(U++1r22
$V,,  0
 
 
 
!6T$+%6%6
 
 
 %egg
r!   c                    |                      |                                           5  	 t          | j        | j        | j                  D ]}n # t          j        $ r Y nt          $ r Y nw xY wd d d            n# 1 swxY w Y   | j        S )N)r   ignore_timeouts)	prepare_consumercreate_consumerr   ry   r   r   socketr   r|   )r)   _s     r"   startzFilterer.start  s    ""4#7#7#9#9:: 		 		"49+/<37<A A A  A  >       		 		 		 		 		 		 		 		 		 		 		 		 		 		 		 zs@   A;$AA;A,A; 	A,)A;+A,,A;;A?A?c                     | j         xj        dz  c_        | j        r#| j         j        | j        k    rt                      d S d S )Nrr   )r|   r/   rz   r   r)   r;   rB   s      r"   update_statezFilterer.update_state  sM    
A: 	"$**dj88//!	" 	"88r!   c                 .    |                                  d S rY   )ru   r   s      r"   ack_messagezFilterer.ack_message#  s    r!   c                 d    | j         j                            | j        | j        | j                  S )N)rM   r   )rf   ra   TaskConsumerry   r   r   r(   s    r"   r   zFilterer.create_consumer&  s3    x}))I$; * 
 
 	
r!   c                     | j         }| j        }| j        }| j        r?t	          || j                  }t	          || j                  }t	          || j                  }|                    |           |                    |           | j        r|                    | j                   | j        Kt          | j        | j	                  }| j        rt	          || j                  }|                    |           | 
                    |           |S rY   )r   r   r   rS   rT   register_callbackr   rR   r   r|   declare_queues)r)   consumerr   r   r   rR   s         r"   r   zFilterer.prepare_consumer-  s   (&: 	C$VTZ88F*<DDL)+tzBBK""6***""<000 	9&&t'7888=$t}dj99Hz A*8TZ@@&&x000H%%%r!   c                 (   |j         D ]}| j         r|j        | j         vr| j        |                     |           	  ||j                                      d          \  }}}|r| j        xj        |z  c_        u# | j        j        $ r Y w xY wd S )NT)passive)	rM   r[   r_   rZ   queue_declarer|   r&   ry   channel_errors)r)   r   r]   r   mcounts        r"   r   zFilterer.declare_queuesA  s    _ 	 	E{ uz<<$0%%e,,,$u$ &  &&3mDm&A&A 61 3J((F2((9+   	 	s   ?A==BBNr   FNNNFNNNN)
r   r   r   r   r   r   r   r   r   r   r    r!   r"   r   r      s         &)8<@D7;	   .  " " "
  
 
 
  (    r!   r   r   Fc                 \    t          | ||f|||||||	|
|||d|                                S )zFilter tasks.)rz   r   r   rS   rM   rR   r   r_   r   r|   r   )r   r   )rf   ry   r   rz   r   r   rS   rM   rR   r   r_   r   r|   r   rg   s                  r"   r   r   Q  s]    
 T6!)!    %''r!   c                      t          | |ifi |S )a  Find a task by id and move it to another queue.

    Arguments:
        task_id (str): Id of task to find and move.
        dest: (str, kombu.Queue): Destination queue.
        transform (Callable): Optional function to transform the return
            value (destination) of the filter function.
        **kwargs (Any): Also supports the same keyword
            arguments as :func:`move`.
    )r   )r   rd   rg   s      r"   r   r   f  s     '433F333r!   c                 F      fd}t          |fdt                     i|S )a  Move tasks by matching from a ``task_id: queue`` mapping.

    Where ``queue`` is a queue to move the task to.

    Example:
        >>> move_by_idmap({
        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
        ...   queues=['hipri'])
    c                 D                         |j        d                   S )Ncorrelation_id)rK   r=   r;   rB   maps     r"   task_id_in_mapz%move_by_idmap.<locals>.task_id_in_map  s    www)*:;<<<r!   rz   )r   len)r   rg   r   s   `  r"   r   r   t  s?    = = = = =
 99c#hh9&999r!   c                 (      fd}t          |fi |S )a  Move tasks by matching from a ``task_name: queue`` mapping.

    ``queue`` is the queue to move the task to.

    Example:
        >>> move_by_taskmap({
        ...     'tasks.add': Queue('name'),
        ...     'tasks.mul': Queue('name'),
        ... })
    c                 :                         | d                   S rP   )rK   r   s     r"   task_name_in_mapz)move_by_taskmap.<locals>.task_name_in_map  s    wwtF|$$$r!   )r   )r   rg   r   s   `  r"   r   r     s5    % % % % %  ++F+++r!   c                 H    t          t          j        d| |d|           d S )N)r|   r;   r    )printMOVING_PROGRESS_FMTformat)r|   r;   rB   rg   s       r"   filter_statusr     s/    	

$
F5t
F
Fv
F
FGGGGGr!   )r}   )NNNrY   )NNNNNNNNr   )-r   r   	functoolsr   	itertoolsr   r   kombur   r   kombu.commonr   kombu.utils.encodingr	   
celery.appr
   celery.utils.nodenamesr   celery.utils.textr   __all__r   	Exceptionr   r   r   r   rT   r   rk   r   rt   r   r   r`   r   r   r   r   r   r   r   r   move_direct_by_idmapmove_direct_by_taskmapr    r!   r"   <module>r      s   2 2        # # # # # # # # " " " " " " " " & & & & & & - - - - - - % % % % % % 0 0 0 0 0 0 ) ) ) ) ) ) 5 5 5 5 5I 5 5 5/ / / / / / / /& =A   <; ; ; ;   )5$E E E E,   AEEIXO XO XO XOv  ! ! !
  
  W W W W W W W Wt 9<8<@D7;   *4 4 4: : :(, , ,"H H H gdm444GO}EEE w}FFF  MJJJ   r!   