
    ^f=                    $   d Z ddlmZ ddlZddlmZ ddlZddlmZmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZ dZdZ G d d          Z G d dej                   Z  G d dej!                  Z!dS )a  MongoDB transport module for kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
 *Unreviewed*

Transport Options
=================

* ``connect_timeout``,
* ``ssl``,
* ``ttl``,
* ``capped_queue_size``,
* ``default_hostname``,
* ``default_port``,
* ``default_database``,
* ``messages_collection``,
* ``routing_collection``,
* ``broadcast_collection``,
* ``queues_collection``,
* ``calc_queue_size``,
    )annotationsN)Empty)MongoClienterrors
uri_parser)
CursorType)VersionMismatch)_detect_environment)bytes_to_str)dumpsloads)cached_property)maybe_sanitize_url   )virtualto_rabbitmq_queue_argumentsz3Kombu requires MongoDB version 1.3+ (server is {0})zKKombu requires MongoDB version 2.2+ (server is {0}) for TTL indexes supportc                  <    e Zd ZdZd Zd Zd Zd
dZd Zd Z	e	Z
d	S )BroadcastCursorzCursor for broadcast queues.c                N    || _         d| _        |                     d           d S )Nr   F)rewind)_cursor_offsetpurge)selfcursors     I/var/www/html/env/lib/python3.11/site-packages/kombu/transport/mongodb.py__init__zBroadcastCursor.__init__C   s*    

%
         c                P    | j         j                            i           | j        z
  S N)r   
collectioncount_documentsr   r   s    r   get_sizezBroadcastCursor.get_sizeH   s"    |&66r::T\IIr   c                8    | j                                          d S r!   )r   closer$   s    r   r'   zBroadcastCursor.closeK   s    r   Tc                    |r| j                                          | j         j                            i           | _        | j                             | j                  | _         d S r!   )r   r   r"   r#   r   skip)r   r   s     r   r   zBroadcastCursor.purgeN   sW     	"L!!! |.>>rBB|((66r   c                    | S r!    r$   s    r   __iter__zBroadcastCursor.__iter__V   s    r   c                    	 	 t          | j                  }nG# t          j        j        $ r0}dt          |          v r|                                  Y d }~U d }~ww xY w| xj        dz  c_        |S )NTznot valid at serverr   )nextr   pymongor   OperationFailurestrr   r   )r   msgexcs      r   __next__zBroadcastCursor.__next__Y   s    	4<((  >2    )CHH44JJLLLHHHH 	
s    A%AAAN)T)__name__
__module____qualname____doc__r   r%   r'   r   r,   r4   r.   r+   r   r   r   r   @   s}        &&! ! !
J J J  7 7 7 7    & DDDr   r   c                  v    e Zd ZdZdZi ZdZdZdZdZ	dZ
dZdZdZd	Zd
ZdZdZej        j        dz   Z fdZd Zd Z fdZd Zd Zd Zd Zd Z fdZd+dZd Z d Z!d+dZ"d Z#d Z$d Z%e&d              Z'e&d!             Z(e&d"             Z)e&d#             Z*e&d$             Z+d% Z,d& Z-d' Z.d( Z/d) Z0d* Z1 xZ2S ),ChannelzMongoDB Channel.TFNi z	127.0.0.1ii  kombu_defaultmessageszmessages.routingzmessages.broadcastzmessages.queues)connect_timeoutsslttlcapped_queue_sizedefault_hostnamedefault_portdefault_databasemessages_collectionrouting_collectionbroadcast_collectionqueues_collectioncalc_queue_sizec                V     t                      j        |i | i | _        | j         d S r!   )superr   _broadcast_cursorsclient)r   vargskwargs	__class__s      r   r   zChannel.__init__   s4    %*6***"$ 	r   c           
         | j         r;| j                            d|id|||                     |d          did           d S d S )N_id$set	x-expires)rQ   options	expire_atTupsert)r?   queues
update_one_get_queue_expire)r   queuerN   s      r   
_new_queuezChannel._new_queue   s{    8 	K""$#)%)%;%;"K& &   #     	 	r   c                r   || j         v r6	 t          |                     |                    }n=# t          $ r d }Y n/w xY w| j                            d|idt          j        fg          }| j        r| 	                    |           |t                      t          t          |d                             S )Nr[   priority)sortpayload)_fanout_queuesr.   _get_broadcast_cursorStopIterationr<   find_one_and_deleter/   	ASCENDINGr?   _update_queues_expirer   r   r   )r   r[   r2   s      r   _getzChannel._get   s    D'''455e<<==     -33% !7#456 4  C
 8 	.&&u---;''M\#i.11222s   ". ==c                    | j         s!t                                          |          S || j        v r'|                     |                                          S | j                            d|i          S Nr[   )rH   rJ   _sizera   rb   r%   r<   r#   )r   r[   rO   s     r   rj   zChannel._size   so     # 	(77=='''D'''--e44==???},,gu-=>>>r   c                *   t          |          ||                     |d          d}| j        rI|                     |d          |d<   |                     |          }||d         ||d         k     r||d<   | j                            |           d S )NT)reverse)r`   r[   r^   zx-message-ttlrU   )r   _get_message_priorityr?   rZ   _get_message_expirer<   
insert_one)r   r[   messagerN   data
msg_expires         r   _putzChannel._put   s    W~~227D2II
 
 8 	/ $ 6 6uo N ND11'::J%[!)Z${:K-K-K$.[!  &&&&&r   c                Z    | j                             t          |          |d           d S )N)r`   r[   )	broadcastro   r   )r   exchangerp   routing_keyrN   s        r   _put_fanoutzChannel._put_fanout   s;    !!eGnn+3#5 #5 	6 	6 	6 	6 	6r   c                    |                      |          }|| j        v r(|                     |                                           n| j                            d|i           |S ri   )rj   ra   rb   r   r<   delete_many)r   r[   sizes      r   _purgezChannel._purge   sd    zz%  D'''&&u--335555M%%w&6777r   c                    t          | j        j        |         d                   }| j                            d|i          }|t          d |D                       z  S )Ntablerv   c              3  D   K   | ]}|d          |d         |d         fV  dS )rw   patternr[   Nr+   ).0rs     r   	<genexpr>z$Channel.get_table.<locals>.<genexpr>   sJ       '
 '
 }q|QwZ8'
 '
 '
 '
 '
 '
r   )	frozensetstate	exchangesroutingfind)r   rv   localRoutesbrokerRoutess       r   	get_tablezChannel.get_table   st    
 4X >w GHH|(("
 
 Y '
 '
!'
 '
 '
 
 
 
 	
r   c                :   |                      |          j        dk    r"|                     ||||           || j        |<   ||||d}|                                }| j        r|                     |d          |d<   | j                            |d|id           d S )Nfanout)rv   r[   rw   r   rS   rU   rR   TrV   )	typeoftype_create_broadcast_cursorra   copyr?   rZ   r   rY   )r   rv   rw   r   r[   lookuprq   s          r   _queue_bindzChannel._queue_bind   s    ;;x  %11))+w7 7 7)1D& !&	
 
 {{}}8 	K $ 6 6uk J JDtDDDDDr   c                   | j                             d|i           | j        r| j                            d|i            t                      j        |fi | || j        v r\	 | j        	                    |          }|
                                 | j        	                    |           d S # t          $ r Y d S w xY wd S )Nr[   rQ   )r   rz   r?   rX   
delete_onerJ   queue_deletera   rK   popr'   KeyError)r   r[   rN   r   rO   s       r   r   zChannel.queue_delete  s      '5!12228 	3K""E5>222U--f---D'''/044U;; #''.....     ('s   %B/ /
B=<B=
mongodb://c                   | j         j        }|j        }|                    d          rd}d|z   }|                    |          s||z   }|t	          |          d          s
|| j        z  }|j        rEd|vrA|                    d          \  }}|j        }|j        r|d|j        z   z  }|dz   |z   dz   |z   }|j	        r|j	        n| j
        }t          j        ||          }|d         p|j        }	|	dv r| j        }	d	| j        | j        rt#          | j        d
z            nd d}
|
                    |d                    |                     |
          }
d|
v r|
                    d           ||	|
fS )Nzsrv://zmongodb+srv://zmongodb+@z://:database)/NTi  )auto_start_requestr>   connectTimeoutMSrT   tlsr>   )
connectionrL   hostname
startswithlenrA   useridsplitpasswordportrB   r   	parse_urivirtual_hostrC   r>   r=   intupdate_prepare_client_optionsr   )r   schemerL   r   headtailcredentialsr   parseddbnamerT   s              r   
_parse_urizChannel._parse_uri  s    '?x(( 	-%F!H,H""6** 	)(HF% 	.--H= 	?S00!..JD$ -K 5sV_44e|k1C7$>H$k@v{{t/@%h55
#:v':[  *F #'8$($8"CT%9D%@!A!A!A>B	
 
 	vi()))..w77GKK((r   c                    t           j        dk    r`|                    dd            t          |                    d          t
                    r"t           j        j        }||d                  |d<   |S )N   r   readpreference)r/   version_tupler   
isinstancegetr   read_preferences_MONGOS_MODES)r   rT   modess      r   r   zChannel._prepare_client_optionsI  sk     D((KK,d333'++&677== M0>,1':J2K,L()r   c                    t          |fi |S r!   r   )r   	argumentsrN   s      r   prepare_queue_argumentszChannel.prepare_queue_argumentsQ  s    *9?????r   c                   |                      |          \  }}}||d<   t                      }|dk    rddlm} |                                 n|dk    rddlm}  |             t          di |}||         }	|                                d         }
|
	                    d	          d         }
t          t          t          |
	                    d
                              }|dk     r't          t                              |
                    | j        r-|dk     r't          t"                              |
                    |	S )N)r   hostgeventr   )monkeyeventlet)monkey_patchversion-.)r   r   )   r   r+   )r   r
   r   r   	patch_allr   r   r   server_infor   tuplemapr   r	   E_SERVER_VERSIONformatr?   E_NO_TTL_INDEXES)r   r   r   r   confenvr   r   	mongoconnr   version_strr   s               r   _openzChannel._openT  sW   !%!?!?&$V!##(??%%%%%%J------LNNN''$''	V$++--i8!'',,Q/C!2!23!7!78899V!"2"9"9+"F"FGGGX 	H'F**!"2"9"9+"F"FGGGr   c                    | j         |                                v rdS |                    | j         | j        d           dS )z0Create capped collection for broadcast messages.NT)r{   capped)rF   list_collection_namescreate_collectionr@   r   r   s     r   _create_broadcastzChannel._create_broadcasto  sV    $(F(F(H(HHHF""4#<(,(>*. 	# 	0 	0 	0 	0 	0r   c                   || j                  }|                    g dd           || j                                     dg           || j                 }|                    ddg           | j        rU|                    dgd           |                    dgd           || j                                     dgd           d	S d	S )
zEnsure indexes on collections.)r[   r   )r^   r   )rQ   r   T)
backgroundr   )rv   r   )rU   r   r   )expireAfterSecondsN)rD   create_indexrF   rE   r?   rG   )r   r   r<   r   s       r   _ensure_indexeszChannel._ensure_indexesx  s   D45777D 	 	
 	
 	
 	*+88,HHH423lO<===8 	:!!#3"4!KKK  "2!3 JJJT+,99!"q : : : : : :		: 	:r   c                    |                                  }|                     |           |                     |           |S )zActually creates connection.)r   r   r   r   s     r   _create_clientzChannel._create_client  s<    ::<<x(((X&&&r   c                *    |                                  S r!   )r   r$   s    r   rL   zChannel.client  s    ""$$$r   c                &    | j         | j                 S r!   )rL   rD   r$   s    r   r<   zChannel.messages  s    {4344r   c                &    | j         | j                 S r!   )rL   rE   r$   s    r   r   zChannel.routing  s    {4233r   c                &    | j         | j                 S r!   )rL   rF   r$   s    r   ru   zChannel.broadcast  s    {4455r   c                &    | j         | j                 S r!   )rL   rG   r$   s    r   rX   zChannel.queues  s    {4122r   c                    	 | j         |         S # t          $ r& |                     | j        |         d d |          cY S w xY wr!   )rK   r   r   ra   )r   r[   s     r   rb   zChannel._get_broadcast_cursor  se    	*511 	 	 	 00#E*D$    		s    -??c                    t           j        dk    rd|it          j        d}nd|idd} | j        j        di |}t          |          x}| j        |<   |S )Nr   r[   )filtercursor_typeT)querytailabler+   )r/   r   r   TAILABLEru   r   r   rK   )r   rv   rw   r   r[   r   r   rets           r   r   z Channel._create_broadcast_cursor  s     E))"H-)2 EE "8,  E
 %$--u--/>v/F/FFd%e,
r   c                    |                     di                                d          }|7|                                 t          j        t	          |                    z   S d S )N
properties
expirationmilliseconds)r   get_nowdatetime	timedeltar   )r   rp   values      r   rn   zChannel._get_message_expire  sX    L"--11,??<<>>H$6CJJ$O$O$OOO r   c                &   t          |t                    r)| j                            d|i          }|sdS |d         }n|}	 |d         |         }n# t          t
          f$ r Y dS w xY w|                                 t          j        |          z   S )zGet expiration header named `argument` of queue definition.

        Note:
        ----
            `queue` must be either queue name or options itself.
        rQ   NrT   r   r   )	r   r1   rX   find_oner   	TypeErrorr   r   r   )r   r[   argumentdocrq   r   s         r   rZ   zChannel._get_queue_expire  s     eS!! 	+&&u~66C y>DDD	%h/EE)$ 	 	 	FF	 ||~~ 2 F F FFFs   A A&%A&c                    |                      |d          }|sdS | j                            d|idd|ii           | j                            d|idd|ii           dS )z,Update expiration field on queues documents.rS   Nr[   rR   rU   rQ   )rZ   r   update_manyrX   )r   r[   rU   s      r   rf   zChannel._update_queues_expire  s    **5+>>	 	F  evY'?@	B 	B 	BENVk9%=>	@ 	@ 	@ 	@ 	@r   c                >    t           j                                         S )zReturn current time in UTC.)r   utcnowr$   s    r   r   zChannel.get_now  s     '')))r   )r   )3r5   r6   r7   r8   supports_fanoutra   r>   r?   r=   r@   rH   rA   rB   rC   rD   rE   rF   rG   r   r:   from_transport_optionsr   r\   rg   rj   rs   rx   r|   r   r   r   r   r   r   r   r   r   r   r   rL   r<   r   ru   rX   rb   r   rn   rZ   rf   r   __classcell__)rO   s   @r   r:   r:   o   s       O N C
COO"L&$+/)%oD H       3 3 3(	? 	? 	? 	? 	?' ' '"6 6 6  	
 	
 	
E E E(/ / / / /(.) .) .) .)`  @ @ @   60 0 0: : :&   % % _% 5 5 _5 4 4 _4 6 6 _6 3 3 _3	 	 	   P P P
G G G0
@ 
@ 
@* * * * * * *r   r:   c                      e Zd ZdZeZdZdZej        Zej	        j
        ej        fz   Z
ej	        j        ej        ej        fz   ZdZdZej	        j                             eg d                    Zd ZdddZdS )	TransportzMongoDB Transport.Tr   mongodbr/   )directtopicr   )exchange_typec                    t           j        S r!   )r/   r   r$   s    r   driver_versionzTransport.driver_version  s
    r   F**urir1   returnc                    |sdS |r|S d|vrt          |          S |                    dd          \  }}d                    t          |          |g          S )Nr   ,r   )r   r   join)r   r  include_passwordmaskuri1	remainders         r   as_urizTransport.as_uri
  sj     	 < 	Jc>>%c***))C++ixx+D119=>>>r   N)Fr  )r  r1   r  r1   )r5   r6   r7   r8   r:   can_parse_urlpolling_intervalrB   r   r
  connection_errorsr   ConnectionFailurechannel_errorsr0   driver_typedriver_name
implementsextendr   r  r  r+   r   r   r
  r
    s        GM'L+v/G.II  	($#,% 	% 
 KK"-44i = = =>> 5  J  
? 
? 
? 
? 
? 
? 
?r   r
  )"r8   
__future__r   r   r[   r   r/   r   r   r   pymongo.cursorr   kombu.exceptionsr	   kombu.utils.compatr
   kombu.utils.encodingr   kombu.utils.jsonr   r   kombu.utils.objectsr   kombu.utils.urlr    r   baser   r   r   r   r:   r
  r+   r   r   <module>r/     s   @ # " " " " "         3 3 3 3 3 3 3 3 3 3 % % % % % % , , , , , , 2 2 2 2 2 2 - - - - - - ) ) ) ) ) ) ) ) / / / / / / . . . . . .       - - - - - -  
, , , , , , , ,^~* ~* ~* ~* ~*go ~* ~* ~*B$? $? $? $? $?! $? $? $? $? $?r   