
    `f-/                         d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZ dd	lmZ dd
lmZmZ dZdZ G d de          Zd Z G d de          Z G d dej        e          ZdS )zqThe ``RPC`` result backend for AMQP brokers.

RPC-style result backend, using reply-to and one queue per client.
    N)maybe_declare)register_after_fork)cached_property)states)current_tasktask_join_will_block   )base)AsyncBackendMixinBaseResultConsumer)BacklogLimitExceeded
RPCBackendz
The "rpc" result backend does not support chords!

Note that a group chained with a task is also upgraded to be a chord,
as this pattern requires synchronization.

Result backends that supports chords: Redis, Database, Memcached, and more.
c                       e Zd ZdZdS )r   z'Too much state history to fast-forward.N)__name__
__module____qualname____doc__     E/var/www/html/env/lib/python3.11/site-packages/celery/backends/rpc.pyr   r      s        1111r   r   c                 .    |                                   d S N)_after_fork)backends    r   _on_after_fork_cleanup_backendr   "   s    r   c                   \     e Zd Zej        ZdZdZ fdZd
dZddZ	d Z
d Zd Zd	 Z xZS )ResultConsumerNc                 \     t                      j        |i | | j        j        | _        d S r   )super__init__r   _create_bindingselfargskwargs	__class__s      r   r    zResultConsumer.__init__,   s2    $)&)))#|;r   Tc                    | j                                         | _        |                     |          }|                     | j        j        |g| j        g|| j                  | _        | j        	                                 d S )N)	callbacksno_ackaccept)
app
connection_connectionr!   Consumerdefault_channelon_state_changer*   	_consumerconsume)r#   initial_task_idr)   r%   initial_queues        r   startzResultConsumer.start0   s~    8..00,,_==,}o+,V; '     	     r   c                 z    | j         r| j                             |          S |rt          j        |           d S d S )N)timeout)r-   drain_eventstimesleep)r#   r7   s     r   r8   zResultConsumer.drain_events9   sO     	 #000AAA 	 Jw	  	 r   c                     	 | j                                          | j                                         d S # | j                                         w xY wr   )r1   cancelr-   closer#   s    r   stopzResultConsumer.stop?   sQ    	%N!!###""$$$$$D""$$$$s	   6 Ac                 f    d | _         | j        "| j                                         d | _        d S d S r   )r1   r-   collectr>   s    r   on_after_forkzResultConsumer.on_after_forkE   s=    '$$&&&#D ('r   c                    | j         |                     |          S |                     |          }| j                             |          s5| j                             |           | j                                          d S d S r   )r1   r5   r!   consuming_from	add_queuer2   )r#   task_idqueues      r   consume_fromzResultConsumer.consume_fromK   s    >!::g&&&$$W--~,,U33 	%N$$U+++N""$$$$$	% 	%r   c                 |    | j         r4| j                             |                     |          j                   d S d S r   )r1   cancel_by_queuer!   namer#   rF   s     r   
cancel_forzResultConsumer.cancel_forS   sF    > 	ON**4+?+?+H+H+MNNNNN	O 	Or   Tr   )r   r   r   kombur.   r-   r1   r    r5   r8   r?   rB   rH   rM   __classcell__r&   s   @r   r   r   &   s        ~HKI< < < < <! ! ! !       % % %$ $ $% % %O O O O O O Or   r   c                       e Zd ZdZej        Zej        ZeZeZdZ	dZ
dZdddddZ G d d	ej                  Z G d
 dej                  Z	 	 d+ fd	Zd Zd,dZd Zd Zd Zd Zd Zd Zd-dZ	 d.dZd Zd Zd/dZeZd Z	 d0dZd  Z d! Z!d" Z"d# Z#d$ Z$d-d%Z%d& Z&d1 fd(	Z'e(d)             Z)e*d*             Z+ xZ,S )2r   z&Base class for the RPC result backend.FT   r   r	   )max_retriesinterval_startinterval_stepinterval_maxc                       e Zd ZdZdZdS )RPCBackend.Consumerz4Consumer that requires manual declaration of queues.FN)r   r   r   r   auto_declarer   r   r   r.   rY   m   s        BBr   r.   c                       e Zd ZdZdZdS )RPCBackend.Queuez$Queue that never caches declaration.FN)r   r   r   r   can_cache_declarationr   r   r   Queuer\   r   s        22 %r   r^   Nc                     t                      j        |fi | | j        j        }	|| _        i | _        |                     |          | _        | j        rdnd| _        |p|	j	        }|p|	j
        }|                     ||| j                  | _        |p|	j        | _        || _        |                     | | j        | j        | j        | j                  | _        t*          t+          | t,                     d S d S )N   r	   )r   r    r+   confr-   _out_of_bandprepare_persistent
persistentdelivery_moderesult_exchangeresult_exchange_type_create_exchangeexchangeresult_serializer
serializerauto_deleter   r*   _pending_results_pending_messagesresult_consumerr   r   )r#   r+   r,   ri   exchange_typerd   rk   rl   r%   ra   r&   s             r   r    zRPCBackend.__init__w   s   '''''x}%11*=="&/8QQq3t3%B)B--mT%7
 
 %>(>&#22$(DK!4#9 
  
 *&DEEEEE +*r   c                 j    | j                                          | j                                         d S r   )rm   clearro   r   r>   s    r   r   zRPCBackend._after_fork   s2    ##%%%((*****r   directr`   c                 ,    |                      d           S r   )Exchange)r#   rK   typere   s       r   rh   zRPCBackend._create_exchange   s    }}T"""r   c                     | j         S )z$Create new binding for task with id.)bindingrL   s     r   r!   zRPCBackend._create_binding   s     |r   c                 N    t          t                                                    r   )NotImplementedErrorE_NO_CHORD_SUPPORTstripr>   s    r   ensure_chords_allowedz RPCBackend.ensure_chords_allowed   s    !"4":":"<"<===r   c                 x    t                      s+t          |                     |j                  d           d S d S )NT)retry)r   r   rx   channel)r#   producerrF   s      r   on_task_callzRPCBackend.on_task_call   sG    
 $%% 	F$,,x'788EEEEEE	F 	Fr   c                     	 |pt           j        }n # t          $ r t          d|          w xY w|j        |j        p|fS )zGet the destination for result by task id.

        Returns:
            Tuple[str, str]: tuple of ``(reply_to, correlation_id)``.
        z%RPC backend missing task request for )r   requestAttributeErrorRuntimeErrorreply_tocorrelation_id)r#   rF   r   s      r   destination_forzRPCBackend.destination_for   sn    	E5!5GG 	E 	E 	ECCCE E E	E !7!B7BBs    .c                     d S r   r   rL   s     r   on_reply_declarezRPCBackend.on_reply_declare   s	     	r   c                     d S r   r   )r#   results     r   on_result_fulfilledzRPCBackend.on_result_fulfilled   s	     	r   c                     dS )Nzrpc://r   )r#   include_passwords     r   as_urizRPCBackend.as_uri   s    xr   c                 r   |                      ||          \  }}|sdS | j        j        j                            d          5 }	|	                    |                     |||||          | j        ||| j        d| j	        | 
                    |          | j        	  	         ddd           n# 1 swxY w Y   |S )z!Send task return value and state.NTblock)ri   routing_keyr   rk   r   retry_policydeclarere   )r   r+   amqpproducer_poolacquirepublish
_to_resultri   rk   r   r   re   )
r#   rF   r   state	tracebackr   r%   r   r   r   s
             r   store_resultzRPCBackend.store_result   s    '+&:&:7G&L&L#^ 	FX](00t0<< 
		7KK'-?):--g66"0  	 	 	
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 
	 s   AB,,B03B0c                 `    |||                      ||          ||                     |          dS )N)rF   statusr   r   children)encode_resultcurrent_task_children)r#   rF   r   r   r   r   s         r   r   zRPCBackend._to_result   s>    ((77"227;;
 
 	
r   c                 \    | j         r| j                             |           || j        |<   d S r   )ro   on_out_of_band_resultrb   )r#   rF   messages      r   r   z RPCBackend.on_out_of_band_result   s:    
  	@ 66w???%,'"""r     c                 |   | j                             |d           }|r|                     ||          S i }d }|                     || j        |          D ]J}|                     |          }|                    |          |c}||<   |r|                                 d }K|                    |d           }|                                D ]\  }}	| 	                    ||	           |r*|
                                 |                     ||          S 	 | j        |         S # t          $ r t          j        d dcY S w xY w)N)r   r   )rb   pop_set_cache_by_message_slurp_from_queuer*   _get_message_task_idgetackitemsr   requeue_cacheKeyErrorr   PENDING)
r#   rF   backlog_limitbufferedlatest_by_idprevacctidlatestmsgs
             r   get_task_metazRPCBackend.get_task_meta   s{   $(($77 	A--gx@@@ ))'4;NN 	 	C++C00C&2&6&6s&;&;S#D,s#  


!!'400$**,, 	1 	1HC&&sC0000 		BNN--gv>>>B{7++ B B B"(.DAAAAABs   D D;:D;c                 N    |                      |j                  x}| j        |<   |S r   )meta_from_decodedpayloadr   )r#   rF   r   r   s       r   r   z RPCBackend._set_cache_by_message	  s/    )-)?)?O* * 	$+g&r   c              #   t  K   | j         j                            d          5 \  }} |                     |          |          }|                                 t          |          D ]!}|                    ||          }|s n|V  "|                     |          	 d d d            d S # 1 swxY w Y   d S )NTr   )r*   r)   )r+   poolacquire_channelr!   r   ranger   r   )	r#   rF   r*   limitr)   _r   rx   r   s	            r   r   zRPCBackend._slurp_from_queue  s     X]***66 
	9,1g3d**733G<<GOO5\\ 9 9kkk?? E				//888 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9 
	9s   A<B--B14B1c                 f    	 |j         d         S # t          t          f$ r |j        d         cY S w xY w)Nr   rF   )
propertiesr   r   r   )r#   r   s     r   r   zRPCBackend._get_message_task_id  sK    	. %&677) 	. 	. 	.?9----	.s    00c                     d S r   r   )r#   r   s     r   revivezRPCBackend.revive%  s    r   c                      t          d          )Nz4reload_task_result is not supported by this backend.rz   rL   s     r   reload_task_resultzRPCBackend.reload_task_result(  s    !BD D 	Dr   c                      t          d          )z<Reload group result, even if it has been previously fetched.z5reload_group_result is not supported by this backend.r   rL   s     r   reload_group_resultzRPCBackend.reload_group_result,  s    !CE E 	Er   c                      t          d          )Nz,save_group is not supported by this backend.r   )r#   group_idr   s      r   
save_groupzRPCBackend.save_group1  s    !:< < 	<r   c                      t          d          )Nz/restore_group is not supported by this backend.r   )r#   r   caches      r   restore_groupzRPCBackend.restore_group5  s    !=? ? 	?r   c                      t          d          )Nz.delete_group is not supported by this backend.r   )r#   r   s     r   delete_groupzRPCBackend.delete_group9  s    !<> > 	>r   r   c                     |si n|}t                                          |t          || j        | j        j        | j        j        | j        | j        | j	        | j
                            S )N)r,   ri   rp   rd   rk   rl   expires)r   
__reduce__dictr-   ri   rK   rv   rd   rk   rl   r   r"   s      r   r   zRPCBackend.__reduce__=  sn    !-vww!!$']'-,(L	)
 	)
 	)
 	 	 		r   c                 `    |                      | j        | j        | j        dd| j                  S )NFT)durablerl   r   )r^   oidri   r   r>   s    r   rx   zRPCBackend.bindingJ  s5    zzHdmTXL	  
 
 	
r   c                     | j         j        S r   )r+   
thread_oidr>   s    r   r   zRPCBackend.oidS  s     x""r   )NNNNNT)rs   r`   rN   )NN)r   )r   F)r   N)-r   r   r   r   rO   ru   Producerr   r   rd   supports_autoexpiresupports_native_joinr   r.   r^   r    r   rh   r!   r}   r   r   r   r   r   r   r   r   r   pollr   r   r   r   r   r   r   r   r   r   propertyrx   r   r   rP   rQ   s   @r   r   r   X   s       00~H~H#N 0J 	 L    5>   
& & & & & & & &
 KO?CF F F F F F,+ + +
# # # #  
> > >F F FC C C    
    .2   &
 
 
- - -B B B B> D   .39 9 9 9. . .  D D DE E E
< < <? ? ? ?> > >      
 
 X
 # # _# # # # #r   r   )r   r9   rO   kombu.commonr   kombu.utils.compatr   kombu.utils.objectsr   celeryr   celery._stater   r    r
   asynchronousr   r   __all__r{   	Exceptionr   r   r   Backendr   r   r   r   <module>r      sj      & & & & & & 2 2 2 2 2 2 / / / / / /       < < < < < < < <       ? ? ? ? ? ? ? ?
0 2 2 2 2 29 2 2 2  /O /O /O /O /O' /O /O /Od~# ~# ~# ~# ~#0 ~# ~# ~# ~# ~#r   