
    `fE(                        d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ dZi Zd Z ed           G d d                      Z G d de          Z ed           G d de                      Z ed           G d de                      Z G d d          Z G d d          ZdS )z$Async I/O backend support utilities.    N)deque)Empty)sleep)WeakKeyDictionary)detect_environment)states)TimeoutError)THREAD_TIMEOUT_MAX)AsyncBackendMixinBaseResultConsumerDrainerregister_drainerc                       fd}|S )z5Decorator used to register a new result drainer type.c                     | t           <   | S N)drainers)clsnames    N/var/www/html/env/lib/python3.11/site-packages/celery/backends/asynchronous.py_innerz register_drainer.<locals>._inner   s    
     )r   r   s   ` r   r   r      s#         Mr   defaultc                   4    e Zd ZdZd Zd Zd Zd	dZd
dZdS )r   zResult draining service.c                     || _         d S r   )result_consumer)selfr   s     r   __init__zDrainer.__init__$   s    .r   c                     d S r   r   r   s    r   startzDrainer.start'       r   c                     d S r   r   r    s    r   stopzDrainer.stop*   r"   r   N   c              #   6  K   |p| j         j        }t          j                    }	 |r-t          j                    |z
  |k    rt	          j                    	 |                     |||          V  n# t          j        $ r Y nw xY w|r
 |             |j        rd S vNr%   timeout)r   drain_eventstime	monotonicsocketr)   wait_forready)r   pr)   intervalon_intervalwait
time_starts          r   drain_events_untilzDrainer.drain_events_until-   s      8t+8^%%
	 '4>++j8GCCn&&&mmAtXm>>>>>>>    w 	s   A0 0BBc                      ||           d S Nr(   r   r   r0   r3   r)   s       r   r.   zDrainer.wait_for>   s    Wr   )Nr%   NNr   )	__name__
__module____qualname____doc__r   r!   r$   r5   r.   r   r   r   r   r       so        ""/ / /       "     r   r   c                   P     e Zd ZdZdZdZd Zd Z fdZd Z	d Z
d Zd	dZ xZS )
greenletDrainerNc                     dS )z,create new self._drain_complete_event objectNr   r    s    r   _create_drain_complete_eventz,greenletDrainer._create_drain_complete_eventG       r   c                     dS )z5raise self._drain_complete_event for wakeup .wait_forNr   r    s    r   _send_drain_complete_eventz*greenletDrainer._send_drain_complete_eventK   rA   r   c                      t                      j        |i | t          j                    | _        t          j                    | _        t          j                    | _        |                                  d S r   )superr   	threadingEvent_started_stopped	_shutdownr@   )r   argskwargs	__class__s      r   r   zgreenletDrainer.__init__O   sd    $)&)))!))!))"**))+++++r   c                    | j                                          | j                                        ss	 | j                            d           |                                  |                                  n# t          j	        $ r Y nw xY w| j                                        s| j
                                         d S r'   )rH   setrI   is_setr   r*   rC   r@   r-   r)   rJ   r    s    r   runzgreenletDrainer.runV   s    -&&(( 	$11!1<<<//111113333>    -&&(( 	 	s   AA8 8B
	B
c                     | j                                         s:|                     | j                  | _        | j                                          d S d S r   )rH   rP   spawnrQ   _gr3   r    s    r   r!   zgreenletDrainer.starta   sQ    }##%% 	!jj**DGM     	! 	!r   c                     | j                                          |                                  | j                            t
                     d S r   )rI   rO   rC   rJ   r3   r
   r    s    r   r$   zgreenletDrainer.stopf   sE    '')))./////r   c                 v    |                                   |j        s| j                            |           d S d S r7   )r!   r/   _drain_complete_eventr3   r8   s       r   r.   zgreenletDrainer.wait_fork   sC    

w 	=&++G+<<<<<	= 	=r   r   )r9   r:   r;   rS   rT   rW   r@   rC   r   rQ   r!   r$   r.   __classcell__)rM   s   @r   r>   r>   B   s        E	B     , , , , ,	 	 	! ! !
0 0 0
= = = = = = = =r   r>   eventletc                        e Zd Zd Zd Zd ZdS )eventletDrainerc                 B    ddl m}m}  ||          } |d           |S )Nr   )r   rS   )rY   r   rS   )r   funcr   rS   gs        r   rS   zeventletDrainer.spawnt   s9    ))))))))E$KKar   c                 0    ddl m}  |            | _        d S Nr   )rG   )eventlet.eventrG   rW   r   rG   s     r   r@   z,eventletDrainer._create_drain_complete_eventz   s(    ((((((%*UWW"""r   c                 8    | j                                          d S r   )rW   sendr    s    r   rC   z*eventletDrainer._send_drain_complete_event~   s    "'')))))r   Nr9   r:   r;   rS   r@   rC   r   r   r   r[   r[   q   sA          - - -* * * * *r   r[   geventc                        e Zd Zd Zd Zd ZdS )geventDrainerc                 b    dd l }|                    |          }|                    d           |S )Nr   )rf   rS   r   )r   r]   rf   r^   s       r   rS   zgeventDrainer.spawn   s0    LLQr   c                 0    ddl m}  |            | _        d S r`   )gevent.eventrG   rW   rb   s     r   r@   z*geventDrainer._create_drain_complete_event   s(    &&&&&&%*UWW"""r   c                 `    | j                                          |                                  d S r   )rW   rO   r@   r    s    r   rC   z(geventDrainer._send_drain_complete_event   s/    "&&((())+++++r   Nre   r   r   r   rh   rh      sA          - - -, , , , ,r   rh   c                   z    e Zd ZdZd ZddZddZd ZddZdd	Z	d
 Z
d Zd Z	 ddZ	 ddZed             ZdS )r   z.Mixin for backends that enables the async API.c                 $    || j         j        |<   d S r   )r   buckets)r   resultbuckets      r   _collect_intozAsyncBackendMixin._collect_into   s    /5$V,,,r   Tc              +   D  K   |                                   |j        }|st                      t                      }|D ][}t	          |d          s|                    |           (|j        r|                    |           E|                     ||           \ | j        |fd|i|D ]K}|rG|	                                }t	          |d          s|j
        |j        fV  n|j
        |j        fV  |GL|r(|	                                }|j
        |j        fV  |&d S d S )N_cacheno_ack)_ensure_not_eagerresultsStopIterationr   hasattrappendrt   rr   _wait_for_pendingpopleftidchildren)r   rp   ru   rL   rw   rq   node_s           r   iter_nativezAsyncBackendMixin.iter_native   s        . 	"//!  	1 	1D4** 1d#### 1d####""40000''HHvHHH 	/ 	/A /~~''tX.. /'4=00000'4;....  /  	'>>##D'4;&&&&  	' 	' 	' 	' 	'r   Fc                     |r| j         j                                         	 |                     |           n-# t          $ r  |                     |j        ||           Y nw xY w|S )N)weak)r   drainerr!   _maybe_resolve_from_bufferr   _add_pending_resultr}   )r   rp   r   start_drainers       r   add_pending_resultz$AsyncBackendMixin.add_pending_result   s     	1 (..000	C++F3333 	C 	C 	C$$VYT$BBBBB	Cs   8 'A"!A"c                 j    |                     | j                            |j                             d S r   )_maybe_set_cache_pending_messagestaker}   r   rp   s     r   r   z,AsyncBackendMixin._maybe_resolve_from_buffer   s/     6 ; ;FI F FGGGGGr   c                     | j         \  }}||vr.|j        |vr'||r|n||<   | j                            |           d S d S d S r   )_pending_resultsr}   r   consume_from)r   task_idrp   r   concreteweak_s         r   r   z%AsyncBackendMixin._add_pending_result   sc    /%%FIX$=$=5;d(UU'2 --g66666  $=$=r   c                 `      j         j                                          fd|D             S )Nc                 @    g | ]}                     |d           S )F)r   r   )r   ).0rp   r   r   s     r   
<listcomp>z9AsyncBackendMixin.add_pending_results.<locals>.<listcomp>   s>     ' ' ' ''T'OO ' ' 'r   )r   r   r!   )r   rw   r   s   ` `r   add_pending_resultsz%AsyncBackendMixin.add_pending_results   sL    $**,,,' ' ' ' '%' ' ' 	'r   c                 d    |                      |j                   |                     |           |S r   )_remove_pending_resultr}   on_result_fulfilledr   s     r   remove_pending_resultz'AsyncBackendMixin.remove_pending_result   s1    ##FI...  (((r   c                 F    | j         D ]}|                    |d            d S r   )r   popr   r   mappings      r   r   z(AsyncBackendMixin._remove_pending_result   s5    , 	' 	'GKK&&&&	' 	'r   c                 D    | j                             |j                   d S r   )r   
cancel_forr}   r   s     r   r   z%AsyncBackendMixin.on_result_fulfilled   s!    ''	22222r   Nc                 z    |                                    | j        |fi |D ]}|                    ||          S )N)callback	propagate)rv   r{   maybe_throw)r   rp   r   r   rL   r   s         r   wait_for_pendingz"AsyncBackendMixin.wait_for_pending   sT       ''99&99 	 	A!!8y!IIIr   c                 0     | j         j        |f|||d|S )N)r)   r2   
on_message)r   r{   )r   rp   r)   r2   r   rL   s         r   r{   z#AsyncBackendMixin._wait_for_pending   s=     6t#5
##

 
 
 
 	
r   c                     dS NTr   r    s    r   is_asynczAsyncBackendMixin.is_async   s    tr   )T)FT)Fr   NNN)r9   r:   r;   r<   rr   r   r   r   r   r   r   r   r   r   r{   propertyr   r   r   r   r   r      s       886 6 6' ' ' ':   H H H7 7 7 7' ' ' '
  
' ' '3 3 3 37J J J J FJ
 
 
 
   X  r   r   c                   p    e Zd ZdZd Zd Zd ZddZd Zd Z	d	 Z
d
 ZddZ	 ddZddZd Zd Zd ZdS )r   z2Manager responsible for consuming result messages.c                     || _         || _        || _        || _        || _        d | _        t                      | _        t          t                               |           | _
        d S r   )backendappacceptr   r   r   r   ro   r   r   r   )r   r   r   r   pending_resultspending_messagess         r   r   zBaseResultConsumer.__init__   sZ     /!1(** 2 4 45d;;r   c                     t                      r   NotImplementedError)r   initial_task_idrL   s      r   r!   zBaseResultConsumer.start       !###r   c                     d S r   r   r    s    r   r$   zBaseResultConsumer.stop   r"   r   Nc                     t                      r   r   )r   r)   s     r   r*   zBaseResultConsumer.drain_events  r   r   c                     t                      r   r   r   r   s     r   r   zBaseResultConsumer.consume_from  r   r   c                     t                      r   r   r   s     r   r   zBaseResultConsumer.cancel_for	  r   r   c                     | j                                          t                      | _         d | _        |                                  d S r   )ro   clearr   r   on_after_forkr    s    r   _after_forkzBaseResultConsumer._after_fork  sB    (**r   c                     d S r   r   r    s    r   r   z BaseResultConsumer.on_after_fork  r"   r   c                 <    | j                             |||          S )Nr)   r2   )r   r5   )r   r0   r)   r2   s       r   r5   z%BaseResultConsumer.drain_events_until  s(    |..wK / 9 9 	9r   c              +     K    | j         |fd|i| | j        |c}| _        	 |                     |j        ||          D ]}d V  t	          d           n"# t
          j        $ r t          d          w xY w	 || _        d S # || _        w xY w)Nr)   r   r   zThe operation timed out.)on_wait_for_pendingr   r5   on_readyr   r-   r)   r	   )r   rp   r)   r2   r   rL   	prev_on_mr   s           r   r{   z$BaseResultConsumer._wait_for_pending  s       	! CCCFCCC%)_j"	4?		(,,OW + - - -   a	
 ~ 	; 	; 	;9:::	; (DOOOiDO''''s   3A B A66B 	Bc                     d S r   r   )r   rp   r)   rL   s       r   r   z&BaseResultConsumer.on_wait_for_pending)  r"   r   c                 <    |                      |j        |           d S r   )on_state_changepayload)r   messages     r   on_out_of_band_resultz(BaseResultConsumer.on_out_of_band_result,  s     W_g66666r   c                 h    | j         D ]}	 ||         c S # t          $ r Y w xY wt          |          r   )r   KeyErrorr   s      r   _get_pending_resultz&BaseResultConsumer._get_pending_result/  sX    , 	 	Gw''''   ws   
""c                    | j         r|                      |           |d         t          j        v r|d         }	 |                     |          }|                    |           | j        }	 |                    |          }|                    |           n;# t          $ r Y n/w xY w# t          $ r | j	        
                    ||           Y nw xY wt          d           d S )Nstatusr   r   )r   r   READY_STATESr   r   ro   r   rz   r   r   putr   )r   metar   r   rp   ro   rq   s          r   r   z"BaseResultConsumer.on_state_change7  s   ? 	"OOD!!!>V0009oG*11':: ''---,*$[[00F
 MM&))))	     D  : : : &**7D99999: 	as#   B' ,B 
B$#B$'%CCr   )NNr   )r9   r:   r;   r<   r   r!   r$   r*   r   r   r   r   r5   r{   r   r   r   r   r   r   r   r   r      s       <<	< 	< 	<$ $ $  $ $ $ $$ $ $$ $ $    9 9 9 9
 FJ( ( ( (    7 7 7         r   r   )r<   r-   rF   r+   collectionsr   queuer   r   weakrefr   kombu.utils.compatr   celeryr   celery.exceptionsr	   celery.utils.threadsr
   __all__r   r   r   r>   r[   rh   r   r   r   r   r   <module>r      s3   * *                         % % % % % % 1 1 1 1 1 1       * * * * * * 3 3 3 3 3 3
    )       B,= ,= ,= ,= ,=g ,= ,= ,=^ ** * * * *o * * *  (, , , , ,O , , ,"X X X X X X X Xv^ ^ ^ ^ ^ ^ ^ ^ ^ ^r   