
    ^f!                       d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ 	 ddlZn# e$ r dZY nw xY w ed          ZdZdZ G d dej                  Z G d dej                  ZdS )a  Etcd Transport module for Kombu.

It uses Etcd as a store to transport messages in Queues

It uses python-etcd for talking to Etcd's HTTP API

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following format:

.. code-block::

    'etcd'://SERVER:PORT

    )annotationsN)defaultdict)contextmanager)Empty)ChannelError)
get_logger)dumpsloads)cached_property   )virtualzkombu.transport.etcdiK	  	localhostc                       e Zd ZdZdZdZdZdZdZ fdZ	d Z
ed             Zd	 Zd
 Zd Zd ZddZd Zd Zed             Z xZS )Channelz+Etcd Channel class which talks to the Etcd.kombuN
      c                   t           t          d           t                      j        |i | | j        j        j        p| j        j        }| j        j        j        pt          }t                              d||| j                   t          t                    | _        t          j        |t#          |                    | _        d S )NMissing python-etcd libraryzHost: %s Port: %s Timeout: %shostport)etcdImportErrorsuper__init__
connectionclientr   default_porthostnameDEFAULT_HOSTloggerdebugtimeoutr   dictqueuesClientint)selfargskwargsr   r   	__class__s        F/var/www/html/env/lib/python3.11/site-packages/kombu/transport/etcd.pyr   zChannel.__init__>   s    <;<<<$)&)))%*Jdo.J%.>,4dD$,OOO!$''kt#d))<<<    c                    | j          d| S )zCreate and return the `queue` with the proper prefix.

        Arguments:
        ---------
            queue (str): The name of the queue.
        /)prefix)r)   queues     r-   _key_prefixzChannel._key_prefixM   s     +'''''r.   c              #    K   t          j        | j        |          }| j        |_        t
                              d|j                    |                    d| j	                   	 dV  t
                              d|j                    |
                                 dS # t
                              d|j                    |
                                 w xY w)ay  Try to acquire a lock on the Queue.

        It does so by creating a object called 'lock' which is locked by the
        current session..

        This way other nodes are not able to write to the lock object which
        means that they have to wait before the lock is released.

        Arguments:
        ---------
            queue (str): The name of the queue.
        zAcquiring lock T)blockinglock_ttlNzReleasing lock )r   Lockr   
lock_value_uuidr"   r#   nameacquirer6   release)r)   r2   locks      r-   _queue_lockzChannel._queue_lockV   s       ye,,_
2ty22333dT];;;	EEELL64966777LLNNNNN LL64966777LLNNNNs   (B$ $8Cc                   || j         |<   |                     |          5  	 | j                            |                     |          dd          cddd           S # t
          j        $ r[ t                              d| d           | j        	                    |                     |                    cY cddd           S w xY w# 1 swxY w Y   dS )zCreate a new `queue` if the `queue` doesn't already exist.

        Arguments:
        ---------
            queue (str): The name of the queue.
        TN)keydirvaluezQueue "z" already existsr@   )
r&   r>   r   writer3   r   EtcdNotFiler"   r#   read)r)   r2   _s      r-   
_new_queuezChannel._new_queuen   so    #Ee$$ 	E 	EE{((((//T ) G G	E 	E 	E 	E 	E 	E 	E 	E # E E E>u>>>???{''D,<,<U,C,C'DDDD	E 	E 	E 	E 	E 	E 	E 	EE		E 	E 	E 	E 	E 	E 	E 	E 	E 	Es/   C/AAC9CCCCCc                    	 | j                             |                     |                     dS # t          j        $ r Y dS w xY w)zVerify that queue exists.

        Returns
        -------
            bool: Should return :const:`True` if the queue exists
                or :const:`False` otherwise.
        TF)r   rF   r3   r   EtcdKeyNotFound)r)   r2   r+   s      r-   
_has_queuezChannel._has_queue~   sU    	KT--e445554# 	 	 	55	s   -1 AAc                f    | j                             |d           |                     |           dS )zpDelete a `queue`.

        Arguments:
        ---------
            queue (str): The name of the queue.
        N)r&   pop_purge)r)   r2   r*   rG   s       r-   _deletezChannel._delete   s2     	t$$$Er.   c                   |                      |          5  |                     |          }| j                            |t	          |          d          st          d|d          	 ddd           dS # 1 swxY w Y   dS )zPut `message` onto `queue`.

        This simply writes a key to the Etcd store

        Arguments:
        ---------
            queue (str): The name of the queue.
            payload (dict): Message data which will be dumped to etcd.
        T)r@   rB   appendzCannot add key z to etcdN)r>   r3   r   rD   r	   r   )r)   r2   payloadrG   r@   s        r-   _putzChannel._put   s     e$$ 	F 	F""5))C;$$.. % ! ! F ##DS#D#D#DEEE	F	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	F 	Fs   AA77A;>A;c                   |                      |          5  |                     |          }t                              d|| j                   	 | j                            |d| j        | j                  }|t                      |j	        d         }t                              d
                    |d                              t          |d                   }| j                            |d         	           |cddd           S # t          t          t          j        f$ r7}t                              d
t#          |           d|            Y d}~nd}~ww xY wt                      # 1 swxY w Y   dS )a[  Get the first available message from the queue.

        Before it does so it acquires a lock on the store so
        only one node reads at the same time. This is for read consistency

        Arguments:
        ---------
            queue (str): The name of the queue.
            timeout (int): Optional seconds to wait for a response.
        zFetching key %s with index %sT)r@   	recursiveindexr$   NzRemoving key {}r@   rB   rC   z_get failed: :)r>   r3   r"   r#   rV   r   rF   r$   r   	_childrenformatr
   delete	TypeError
IndexErrorr   EtcdExceptiontype)r)   r2   r$   r@   resultitemmsg_contenterrors           r-   _getzChannel._get   s    e$$ 	 	""5))CLL8#tzJJJD))t*dl * < < >''M'+.55d5kBBCCC#DM22""tE{"333"#	 	 	 	 	 	 	 	$ z4+=> D D DBT%[[BB5BBCCCCCCCCD ''M+	 	 	 	 	 	 	 	 	 	s6   7E.B/D

E&-EE.EE..E25E2c                    |                      |          5  |                     |          }t                              d|            | j                            |d          cddd           S # 1 swxY w Y   dS )zRemove all `message`s from a `queue`.

        Arguments:
        ---------
            queue (str): The name of the queue.
        zPurging queue at key T)r@   rU   N)r>   r3   r"   r#   r   r[   )r)   r2   r@   s      r-   rN   zChannel._purge   s     e$$ 	? 	?""5))CLL666777;%%#%>>	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	?s   AA11A58A5c                   |                      |          5  d}	 |                     |          }t                              d|| j                   | j                            |d| j                  }t          |j                  }n# t          $ r Y nw xY wt                              d||| j                   |cddd           S # 1 swxY w Y   dS )z~Return the size of the `queue`.

        Arguments:
        ---------
            queue (str): The name of the queue.
        r   z)Fetching key recursively %s with index %sT)r@   rU   rV   z$Found %s keys under %s with index %sN)
r>   r3   r"   r#   rV   r   rF   lenrY   r\   )r)   r2   sizer@   r`   s        r-   _sizezChannel._size   s:    e$$ 	 	D	&&u--H $*. . .))t* * & & 6+,,    LL?sDJ0 0 0	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s5   CA,BC
BCB&CCCc                T    t          j                     dt          j                     S )N.)socketgethostnameosgetpid)r)   s    r-   r8   zChannel.lock_value   s%    $&&66666r.   )N)__name__
__module____qualname____doc__r1   rV   r$   session_ttlr6   r   r3   r   r>   rH   rK   rO   rS   rd   rN   ri   r   r8   __classcell__r,   s   @r-   r   r   5   s       55FEGKH= = = = =( ( (   ^.E E E     F F F$       D
? 
? 
?  0 7 7 _7 7 7 7 7r.   r   c                       e Zd ZdZeZeZdZdZdZ	e
j        j                             edg                    Zer*e
j        j        ej        fz   Ze
j        j        ej        fz   Z fdZd Zd	 Z xZS )
	Transportz!Etcd storage Transport for Kombu.r   python-etcd   direct)exchange_typec                f    t           t          d           t                      j        |i | dS )z(Create a new instance of etcd.Transport.Nr   )r   r   r   r   )r)   r*   r+   r,   s      r-   r   zTransport.__init__	  s9    <;<<<$)&)))))r.   c                    |j         j        p| j        }|j         j        pt          }t
                              d||           	 t          j        |t          |                     dS # t          $ r Y nw xY wdS )zVerify the connection works.zVerify Etcd connection to %s:%sr   TF)r   r   r   r    r!   r"   r#   r   r'   r(   
ValueError)r)   r   r   r   s       r-   verify_connectionzTransport.verify_connection  s     %:): )9\6dCCC	KTD		22224 	 	 	D	 us   #A) )
A65A6c                   	 ddl }|j        j                                        D ]4}|                    d          r|                    d          d         c S 5dS # t
          t          f$ r t                              d           Y dS w xY w)zReturn the version of the etcd library.

        .. note::
           python-etcd has no __version__. This is a workaround.
        r   Nry   z==r   z'Unable to find the python-etcd version.Unknown)	pip.commands.freezecommandsfreeze
startswithsplitr   r]   r"   warning)r)   pipxs      r-   driver_versionzTransport.driver_version  s    	&&&&\(//11 , ,<<.. ,774==++++,, , Z( 	 	 	NNDEEE99	s   AA A +B
	B
)rp   rq   rr   rs   r   DEFAULT_PORTr   driver_typedriver_namepolling_intervalr   rx   
implementsextend	frozensetr   connection_errorsr^   channel_errorsr   r   r   ru   rv   s   @r-   rx   rx      s        ++GLKK"-44i
++ 5 - -J  
/43E2HH 	
 ,0B/EE 	* * * * *        r.   rx   )rs   
__future__r   rn   rl   collectionsr   
contextlibr   r2   r   kombu.exceptionsr   	kombu.logr   kombu.utils.jsonr	   r
   kombu.utils.objectsr    r   r   r   r"   r   r!   r   rx    r.   r-   <module>r      s   4 # " " " " " 				  # # # # # # % % % % % %       ) ) ) ) ) )             ) ) ) ) ) ) ) ) / / / / / /      KKKK   DDD 
*	+	+{7 {7 {7 {7 {7go {7 {7 {7|9 9 9 9 9! 9 9 9 9 9s   A	 	AA