
    `f                         d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZ ee          Zej        ej        cZZ G d dej                  ZdS )z)Worker <-> Worker communication Bootstep.    )defaultdict)partial)heappush)
itemgetter)Consumer)	DummyLock)ContentDisallowedDecodeError)	bootsteps)
get_logger)Bunch   )Mingle)Gossipc            	            e Zd ZdZd ZefZ eddddddd          Zd	d
hZ		 	 d fd	Z
d ZddZd Zd Z fdZd Zd Zd Zd Zd Zd Zd Zd Zd Z xZS ) r   zfBootstep consuming events from other workers.

    This keeps the logical clock value up to date.
    idclockhostnamepidtopicactioncveramqpredisF      @       @c                 t   | o|                      |j                  | _        |j        | _        | |_        |j        j        j        | _        |j        | _        d                    | j        t          |j	                  g          | _
        t          t                      t                      t                                | _        |j        | _        | j        r\|j        j                            | j        | j        d          | _        |j        rt)                      |_        | j        j        | _        || _        || _        d | _        t7          t8                    | _        i | _        | j        | j         d| _!        |j        j"        | _"        d| j#        i| _$         tK                      j&        |fi | d S )N.)	node_join
node_leave	node_lostr   )on_node_joinon_node_leavemax_tasks_in_memory)zworker.electzworker.elect.acktask)'compatible_transportappenabledgossipeventsReceiverr   joinstrr   full_hostnamer   setontimerStater"   r#   statehubr   _mutexeventupdate_stateintervalheartbeat_interval_trefr   listconsensus_requestsconsensus_replieson_electon_elect_ackevent_handlersr   	call_taskelection_handlerssuper__init__)selfcwithout_gossipr8   r9   kwargs	__class__s         O/var/www/html/env/lib/python3.11/site-packages/celery/worker/consumer/gossip.pyrD   zGossip.__init__$   s   ))Nd.G.G.N.N5-
 XXt}c!%jj&ABBeeuuee
 
 
 W
< 	1++!."0$% ,  DJ
 u '$;; $
 0D "4
"-d"3"3!# M $ 1
 
 U[
 DN"
 	%%f%%%%%    c                     |                                 5 }|j        j        | j        v cd d d            S # 1 swxY w Y   d S N)connection_for_read	transportdriver_typecompatible_transports)rE   r'   conns      rJ   r&   zGossip.compatible_transportM   s    $$&& 	L$>-1KK	L 	L 	L 	L 	L 	L 	L 	L 	L 	L 	L 	L 	L 	L 	L 	L 	L 	Ls   599Nc                 X    g | j         |<   | j                            d|||d           d S )Nzworker-electr   )r   r   r   r   )r=   
dispatchersend)rE   r   r   r   s       rJ   electionzGossip.electionQ   sD    %'r"vA 	 	
 	
 	
 	
 	
rK   c                     	 | j                             |                                           d S # t          $ r&}t                              d|           Y d }~d S d }~ww xY w)NzCould not call task: %r)r'   	signatureapply_async	Exceptionlogger	exception)rE   r%   excs      rJ   rA   zGossip.call_taskX   sw    	=Ht$$0022222 	= 	= 	=6<<<<<<<<<	=s   ,0 
A AA c                 (   	 |                      |          \  }}}}}}}n2# t          $ r%}	t                              d|	          cY d }	~	S d }	~	ww xY wt	          | j        |         || d| ||f           | j                            d|           d S )Nz!election request missing field %sr   zworker-elect-ack)r   )_cons_stamp_fieldsKeyErrorr[   r\   r   r<   rT   rU   )
rE   r6   id_r   r   r   r   r   _r]   s
             rJ   r>   zGossip.on_elect^   s    	N!%!8!8!?!?S%3FAA 	N 	N 	N##$GMMMMMMMM	N#C(x''#''7	
 	
 	
 	/C88888s     
AA
A
Ac                 b    t                                          |           |j        | _        d S rM   )rC   startevent_dispatcherrT   )rE   rF   rI   s     rJ   rd   zGossip.startj   s'    a,rK   c                    |d         }	 | j         |         }n# t          $ r Y d S w xY wt          | j                                                  }|                    |d                    t          |          t          |          k    r| j                            | j	        |                   \  }}}}|| j
        k    rUt          d|           	 | j        |         }	 |	|           n<# t          $ r t                              d|           Y nw xY wt          d||           | j	                            |d            | j                             |d            d S d S )Nr   r   zI won the election %rzUnknown election topic %rznode %s elected for %r)r=   r`   r/   r3   alive_workersappendlenr   	sort_heapr<   r.   inforB   r[   r\   pop)
rE   r6   r   repliesrg   rb   leaderr   r   handlers
             rJ   r?   zGossip.on_elect_ackn   s   4[	,R0GG 	 	 	FF	DJ446677uZ()))w<<3}----'+z';';'+( ($Avuf +++,b111$"4U;G GFOOOO   I I I$$%@%HHHHHI
 -vr:::#''D111"&&r400000 .-s    
&&C) )%DDc                 p    t          d|j                   |                     | j        j        |           d S )Nz%s joined the party)debugr   _call_handlersr0   r   rE   workers     rJ   r"   zGossip.on_node_join   s6    #V_555DG-v66666rK   c                 p    t          d|j                   |                     | j        j        |           d S )Nz%s left)rq   r   rr   r0   r    rs   s     rJ   r#   zGossip.on_node_leave   s5    i)))DG.77777rK   c                 p    t          d|j                   |                     | j        j        |           d S )Nzmissed heartbeat from %s)rk   r   rr   r0   r!   rs   s     rJ   on_node_lostzGossip.on_node_lost   s6    '999DG-v66666rK   c                     |D ]>}	  ||i | # t           $ r&}t                              d||           Y d }~7d }~ww xY wd S )Nz!Ignored error from handler %r: %r)rZ   r[   r\   )rE   handlersargsrH   ro   r]   s         rJ   rr   zGossip._call_handlers   s     	G 	GGG((((( G G G  7#G G G G G G G GG	G 	Gs   
?:?c                     | j         | j                                          | j                            | j        | j                  | _         d S rM   )r:   cancelr1   call_repeatedlyr8   periodic)rE   s    rJ   register_timerzGossip.register_timer   s?    :!JZ//t}MM


rK   c                 
   | j         j        }t                      }|                                D ]3}|j        s*|                    |           |                     |           4|D ]}|                    |j        d            d S rM   )	r3   workersr/   valuesaliveaddrw   rl   r   )rE   r   dirtyrt   s       rJ   r~   zGossip.periodic   s    *$nn&& 	* 	*F< *		&!!!!!&))) 	/ 	/FKK....	/ 	/rK   c                     |                                   |                     |d| j                  }t          ||j        gt          | j        |j                  |j        d          gS )Nzworker.#)routing_key	queue_ttlT)queues
on_messageacceptno_ack)	r   r+   r9   r   queuer   r   event_from_messager   )rE   channelevs      rJ   get_consumerszGossip.get_consumers   sv    ]]7
%)%<  > >H:t0EFF9
 
 
  	rK   c                 *   |j         d         }|                    dd          d         dk    rd S 	 | j        |         } ||j                  S # t          $ r Y nw xY w|j                            d          p|j        d         }|| j        k    rj	  ||j                  \  }}|                     |           d S # t          t          t          f$ r%}t                              |           Y d }~d S d }~ww xY w| j                                         d S )Nr   r   r   r   r%   r   )delivery_infosplitr@   payloadr`   headersgetr   r7   r
   r	   	TypeErrorr[   errorr   forward)	rE   preparemessage_typero   r   rb   r6   r]   s	            rJ   r   zGossip.on_message   sM   %m4 ;;sAq!V++F	,)%0G 77?+++  	 	 	D	 O''
33 0OJ/ 	t}$$""77?335!!%(((((!2I> " " "S!!!!!!!!!" J     s)   A 
AA(B; ;C7C22C7)Fr   r   rM   )__name__
__module____qualname____doc__labelr   requiresr   r_   rQ   rD   r&   rV   rA   r>   rd   r?   r"   r#   rw   rr   r   r~   r   r   __classcell__)rI   s   @rJ   r   r      st        
 EyH#gz5'8V  $W-).25'& '& '& '& '& '&RL L L
 
 
 
= = =
9 
9 
9- - - - -1 1 147 7 78 8 87 7 7G G GN N N
/ / /
 
 
! ! ! ! ! ! !rK   r   N)r   collectionsr   	functoolsr   heapqr   operatorr   kombur   kombu.asynchronous.semaphorer   kombu.exceptionsr	   r
   celeryr   celery.utils.logr   celery.utils.objectsr   mingler   __all__r   r[   rq   rk   ConsumerStepr    rK   rJ   <module>r      s;   / / # # # # # #                         2 2 2 2 2 2 ; ; ; ; ; ; ; ;       ' ' ' ' ' ' & & & & & &      
	H		lFKtw! w! w! w! w!Y# w! w! w! w! w!rK   