
    `f.#                         d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 	 ddl
Z
ddlZ
ddlZ
ddlZ
n# e$ r dZ
Y nw xY wdZ ee          Zd	Zd
ZdZdZdZdZdZdZd Z G d de	          ZdS )z@Apache Cassandra result store backend using the DataStax driver.    N)states)ImproperlyConfigured)
get_logger   )BaseBackend)CassandraBackendz
You need to install the cassandra-driver library to
use the Cassandra backend.  See https://github.com/datastax/python-driver
z
CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
z(Cassandra backend improperly configured.z!Cassandra backend not configured.z
INSERT INTO {table} (
    task_id, status, result, date_done, traceback, children) VALUES (
        %s, %s, %s, %s, %s, %s) {expires};
z]
SELECT status, result, date_done, traceback, children
FROM {table}
WHERE task_id=%s
LIMIT 1
z
CREATE TABLE {table} (
    task_id text,
    status text,
    result blob,
    date_done timestamp,
    traceback blob,
    children blob,
    PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
z
    USING TTL {0}
c                 "    t          | d          S )Nutf8)bytes)xs    K/var/www/html/env/lib/python3.11/site-packages/celery/backends/cassandra.pybuf_tr   C   s    F    c                   `     e Zd ZdZdZdZdZ	 	 d fd	ZddZ	 ddZ	dd	Z
d
 Zd fd	Z xZS )r   aG  Cassandra/AstraDB backend utilizing DataStax driver.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`cassandra-driver` is not available,
            or not-exactly-one of the :setting:`cassandra_servers` and
            the :setting:`cassandra_secure_bundle_path` settings is set.
    NTR#  c                 h    t                      j        di | t          st          t                    | j        j        }|p|                    dd           | _        |p|                    dd           | _	        |p|                    dd           | _
        |p|                    dd           | _        |p|                    dd           | _        |                    di           | _        | j        p| j	        }	|	r| j        r| j        st          t                    | j        r| j	        rt          t                    |p|                    dd           }
|
t                               |
          nd| _        |                    d	          pd
}|                    d          pd
}t'          t          j        |t          j        j                  | _        t'          t          j        |t          j        j                  | _        d | _        |                    dd           }|                    dd           }|r@|r>t'          t          j        |d           }|st          t4                     |di || _        d | _        d | _        d | _        d | _        t?          j                     | _!        d S )Ncassandra_serverscassandra_secure_bundle_pathcassandra_portcassandra_keyspacecassandra_tablecassandra_optionscassandra_entry_ttl cassandra_read_consistencyLOCAL_QUORUMcassandra_write_consistencycassandra_auth_providercassandra_auth_kwargs )"super__init__	cassandrar   E_NO_CASSANDRAappconfgetserversbundle_pathportkeyspacetabler   E_CASSANDRA_NOT_CONFIGUREDE_CASSANDRA_MISCONFIGURED	Q_EXPIRESformat
cqlexpiresgetattrConsistencyLevelr   read_consistencywrite_consistencyauth_providerauth!E_NO_SUCH_CASSANDRA_AUTH_PROVIDER_cluster_session_write_stmt
_read_stmt	threadingRLock_lock)selfr(   r+   r,   	entry_ttlr*   r)   kwargsr&   db_directionsexpires	read_cons
write_consr6   auth_kwargsauth_provider_class	__class__s                   r   r"   zCassandraBackend.__init__X   s   ""6""" 	7&~666x}E$((+>"E"E& 2$((*D+2 +2<DHH%5t<<	 HDHH-A4$H$H?dhh'8$??
!%*=r!B!B 8(8 	CDM 	C 	C&'ABBB< 	BD, 	B&'@AAADtxx(=tDD *1)<IW%%%" 	 HH9::Ln	XX;<<N
 '&	&3!5 !5 ")&
&3"5 "5 "!:DAAhh6== 	D[ 	D")).-"N"N& N*+LMMM!4!4!C!C{!C!CD_&&


r   Fc                    | j         dS | j                                         	 | j         	 | j                                         dS | j        r4t          j        j        | j        f| j        | j	        d| j
        | _        n/t          j        j        dd| j        i| j	        d| j
        | _        | j                            | j                  | _         t
          j                            t"                              | j        | j                            | _        | j        | j        _        t
          j                            t0                              | j                            | _        | j        | j        _        |rzt
          j                            t6                              | j                            }| j        |_        	 | j                             |           n# t
          j        $ r Y nw xY wnB# t
          j        $ r0 | j        | j                                         d| _        d| _          w xY w| j                                         dS # | j                                         w xY w)zjPrepare the connection for action.

        Arguments:
            write (bool): are we a writer?
        N)r*   r6   secure_connect_bundle)cloudr6   )r,   rD   )r,   r    ) r:   r?   acquirereleaser(   r#   clusterClusterr*   r6   r   r9   r)   connectr+   querySimpleStatementQ_INSERT_RESULTr0   r,   r1   r;   r5   consistency_levelQ_SELECT_RESULTr<   r4   Q_CREATE_RESULT_TABLEexecuteAlreadyExistsOperationTimedOutshutdown)r@   write	make_stmts      r   _get_connectionz CassandraBackend._get_connection   s    =$F
=	!}(v J     s | . ) 1 9L!.'+y"&"4!. !. ,!. !. !* 1 9 !./1A #'"4	!. !.
 ,!. !. !M11$-@@DM  )>>&&*do ' ? ?   D 261GD.'o==&&TZ&88 DO 150EDO-  &O;;)00tz0BB 	 /3.D	+M)))4444 .   D * 	 	 	 }(&&((( DM DM	 J     DJ    sH   G> FG> G( 'G> (G:7G> 9G::G> =I >?H==I I6c                    |                      d           | j                            | j        ||t	          |                     |                    | j                                        t	          |                     |                    t	          |                     |                     |                              f           dS )z1Store return value and state of an executed task.T)r\   N)	r^   r:   rX   r;   r   encoder%   nowcurrent_task_children)r@   task_idresultstate	tracebackrequestrB   s          r   _store_resultzCassandraBackend._store_result   s     	4(((d.$++f%%&&HLLNN$++i(())$++d88AABBCC1
 	 	 	 	 	r   c                     dS )Nzcassandra://r    )r@   include_passwords     r   as_urizCassandraBackend.as_uri   s    ~r   c           
      l   |                                   | j                            | j        |f                                          }|st
          j        ddS |\  }}}}}|                     |||                     |          ||                     |          |                     |          d          S )z$Get task meta-data for a task by id.N)statusrd   )rc   rm   rd   	date_donerf   children)	r^   r:   rX   r<   oner   PENDINGmeta_from_decodeddecode)r@   rc   resrm   rd   rn   rf   ro   s           r   _get_task_meta_forz#CassandraBackend._get_task_meta_for   s    m##DOg[AAEEGG 	>$n===9<6	9h%%kk&))"Y//H--'
 '
   	r   r    c                     |si n|}|                     | j        | j        | j        d           t	                                          ||          S )N)r(   r+   r,   )updater(   r+   r,   r!   
__reduce__)r@   argsrB   rI   s      r   rx   zCassandraBackend.__reduce__   s\    !-vj" "	# 	# 	# ww!!$///r   )NNNNr   N)F)NN)T)r    N)__name__
__module____qualname____doc__r(   r)   supports_autoexpirer"   r^   rh   rk   ru   rx   __classcell__)rI   s   @r   r   r   G   s          GKJN(,4' 4' 4' 4' 4' 4'lF! F! F! F!R /3        &0 0 0 0 0 0 0 0 0 0r   r   )r}   r=   celeryr   celery.exceptionsr   celery.utils.logr   baser   r#   cassandra.authcassandra.clustercassandra.queryImportError__all__rz   loggerr$   r8   r.   r-   rT   rV   rW   r/   r   r   r    r   r   <module>r      sY   F F           2 2 2 2 2 2 ' ' ' ' ' '         III  	H		
% !
 G @ 
 	
  y0 y0 y0 y0 y0{ y0 y0 y0 y0 y0s   1 ;;