
    YfW&                       d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZmZ dd	lmZ  G d
 de          Z ee           G d d                      Z G d de          Z G d de          ZdS )z"
Tests for L{twisted.test.iosim}.
    )annotations)Literal)implementer)IPushProducer)Protocol)Clock)FakeTransportconnectconnectedServerAndClient)TestCasec                  *    e Zd ZdZddZddZddZdS )	FakeTransportTestsz%
    Tests for L{FakeTransport}.
    returnNonec                6   t          t                      d          }t          t                      d          }|                     |j        t                     |                     |j        t                     |                     |j        |j                   dS )ze
        Each L{FakeTransport} receives a serial number that uniquely identifies
        it.
        TFN)r	   objectassertIsInstanceserialintassertNotEqual)selfabs      I/var/www/html/env/lib/python3.11/site-packages/twisted/test/test_iosim.pytest_connectionSerialz(FakeTransportTests.test_connectionSerial   sz    
 &((D))&((E**ah,,,ah,,,AHah/////    c                    t          t                      d          }|                    d           |                    g d           |                     d                    |j                  d           dS )zl
        L{FakeTransport.writeSequence} will write a sequence of L{bytes} to the
        transport.
        F   a)   b   c   dr   s   abcdN)r	   r   writewriteSequenceassertEqualjoinstreamr   r   s     r   test_writeSequencez%FakeTransportTests.test_writeSequence$   si    
 &((E**		***+++!(++W55555r   c                   t          t                      d          }|                    d           |                                 |                    d           |                     d                    |j                  d           dS )z
        L{FakeTransport.write} will accept writes after transport was closed,
        but the data will be silently discarded.
        Fs   befores   afterr   N)r	   r   r"   loseConnectionr$   r%   r&   r'   s     r   test_writeAfterClosez'FakeTransportTests.test_writeAfterClose0   sw    
 &((E**				!(++Y77777r   Nr   r   )__name__
__module____qualname____doc__r   r(   r+    r   r   r   r      sZ         	0 	0 	0 	0
6 
6 
6 
6
8 
8 
8 
8 
8 
8r   r   c                  .    e Zd ZdZdZd	dZd	dZd	dZdS )
StrictPushProducerz
    An L{IPushProducer} implementation which produces nothing but enforces
    preconditions on its state transition methods.
    runningr   r   c                H    | j         dk    rt          d          d| _         d S )Nstoppedz)Cannot stop already-stopped IPushProducer_state
ValueErrorr   s    r   stopProducingz StrictPushProducer.stopProducingF   s)    ;)##HIIIr   c                Z    | j         dk    rt          d| j          d          d| _         d S )Nr4   zCannot pause  IPushProducerpausedr7   r:   s    r   pauseProducingz!StrictPushProducer.pauseProducingK   s6    ;)##HT[HHHIIIr   c                Z    | j         dk    rt          d| j          d          d| _         d S )Nr>   zCannot resume r=   r4   r7   r:   s    r   resumeProducingz"StrictPushProducer.resumeProducingP   s6    ;(""IdkIIIJJJr   Nr,   )r-   r.   r/   r0   r8   r;   r?   rA   r1   r   r   r3   r3   =   sa         
 F       
   
           r   r3   c                      e Zd ZdZddZddZddZddZdd
ZddZ	ddZ
ddZddZddZddZddZddZddZddZddZddZddZddZdS )StrictPushProducerTestsz*
    Tests for L{StrictPushProducer}.
    r   r3   c                    t                      S )zp
        @return: A new L{StrictPushProducer} which has not been through any state
            changes.
        )r3   r:   s    r   _initialz StrictPushProducerTests._initial[   s    
 "###r   c                J    t                      }|                                 |S )z@
        @return: A new, stopped L{StrictPushProducer}.
        )r3   r;   r   producers     r   _stoppedz StrictPushProducerTests._stoppedb   s&     &''   r   c                J    t                      }|                                 |S )z?
        @return: A new, paused L{StrictPushProducer}.
        )r3   r?   rG   s     r   _pausedzStrictPushProducerTests._pausedj   s&     &''!!!r   c                r    t                      }|                                 |                                 |S )zY
        @return: A new L{StrictPushProducer} which has been paused and resumed.
        )r3   r?   rA   rG   s     r   _resumedz StrictPushProducerTests._resumedr   s8     &''!!!  """r   rH   r   c                <    |                      |j        d           dS )z
        Assert that the given producer is in the stopped state.

        @param producer: The producer to verify.
        @type producer: L{StrictPushProducer}
        r6   Nr$   r8   rG   s     r   assertStoppedz%StrictPushProducerTests.assertStopped{   "     	)44444r   c                <    |                      |j        d           dS )z
        Assert that the given producer is in the paused state.

        @param producer: The producer to verify.
        @type producer: L{StrictPushProducer}
        r>   NrO   rG   s     r   assertPausedz$StrictPushProducerTests.assertPaused   s"     	(33333r   c                <    |                      |j        d           dS )z
        Assert that the given producer is in the running state.

        @param producer: The producer to verify.
        @type producer: L{StrictPushProducer}
        r4   NrO   rG   s     r   assertRunningz%StrictPushProducerTests.assertRunning   rQ   r   c                j    |                      t          |                                 j                   dS )zz
        L{StrictPushProducer.stopProducing} raises L{ValueError} if called when
        the producer is stopped.
        N)assertRaisesr9   rI   r;   r:   s    r   test_stopThenStopz)StrictPushProducerTests.test_stopThenStop   s+    
 	*dmmoo&CDDDDDr   c                j    |                      t          |                                 j                   dS )z{
        L{StrictPushProducer.pauseProducing} raises L{ValueError} if called when
        the producer is stopped.
        N)rW   r9   rI   r?   r:   s    r   test_stopThenPausez*StrictPushProducerTests.test_stopThenPause   s+    
 	*dmmoo&DEEEEEr   c                j    |                      t          |                                 j                   dS )z|
        L{StrictPushProducer.resumeProducing} raises L{ValueError} if called when
        the producer is stopped.
        N)rW   r9   rI   rA   r:   s    r   test_stopThenResumez+StrictPushProducerTests.test_stopThenResume   s+    
 	*dmmoo&EFFFFFr   c                    |                                  }|                                 |                     |           dS )zn
        L{StrictPushProducer} is stopped if C{stopProducing} is called on a paused
        producer.
        N)rK   r;   rP   rG   s     r   test_pauseThenStopz*StrictPushProducerTests.test_pauseThenStop   s=    
 <<>>   8$$$$$r   c                n    |                                  }|                     t          |j                   dS )zs
        L{StrictPushProducer.pauseProducing} raises L{ValueError} if called on a
        paused producer.
        N)rK   rW   r9   r?   rG   s     r   test_pauseThenPausez+StrictPushProducerTests.test_pauseThenPause   s0    
 <<>>*h&=>>>>>r   c                    |                                  }|                                 |                     |           dS )zp
        L{StrictPushProducer} is resumed if C{resumeProducing} is called on a
        paused producer.
        N)rK   rA   rU   rG   s     r   test_pauseThenResumez,StrictPushProducerTests.test_pauseThenResume   s=    
 <<>>  """8$$$$$r   c                    |                                  }|                                 |                     |           dS )zo
        L{StrictPushProducer} is stopped if C{stopProducing} is called on a
        resumed producer.
        N)rM   r;   rP   rG   s     r   test_resumeThenStopz+StrictPushProducerTests.test_resumeThenStop   =    
 ==??   8$$$$$r   c                    |                                  }|                                 |                     |           dS )zo
        L{StrictPushProducer} is paused if C{pauseProducing} is called on a
        resumed producer.
        N)rM   r?   rS   rG   s     r   test_resumeThenPausez,StrictPushProducerTests.test_resumeThenPause   =    
 ==??!!!(#####r   c                n    |                                  }|                     t          |j                   dS )zu
        L{StrictPushProducer.resumeProducing} raises L{ValueError} if called on a
        resumed producer.
        N)rM   rW   r9   rA   rG   s     r   test_resumeThenResumez-StrictPushProducerTests.test_resumeThenResume   0    
 ==??*h&>?????r   c                    |                                  }|                                 |                     |           dS )zn
        L{StrictPushProducer} is stopped if C{stopProducing} is called in the
        initial state.
        N)rE   r;   rP   rG   s     r   	test_stopz!StrictPushProducerTests.test_stop   re   r   c                    |                                  }|                                 |                     |           dS )zn
        L{StrictPushProducer} is paused if C{pauseProducing} is called in the
        initial state.
        N)rE   r?   rS   rG   s     r   
test_pausez"StrictPushProducerTests.test_pause   rh   r   c                n    |                                  }|                     t          |j                   dS )zz
        L{StrictPushProducer} raises L{ValueError} if C{resumeProducing} is called
        in the initial state.
        N)rE   rW   r9   rA   rG   s     r   test_resumez#StrictPushProducerTests.test_resume   rk   r   N)r   r3   )rH   r3   r   r   r,   )r-   r.   r/   r0   rE   rI   rK   rM   rP   rS   rU   rX   rZ   r\   r^   r`   rb   rd   rg   rj   rm   ro   rq   r1   r   r   rC   rC   V   s        $ $ $ $         5 5 5 54 4 4 45 5 5 5E E E EF F F FG G G G% % % %? ? ? ?% % % %% % % %$ $ $ $@ @ @ @% % % %$ $ $ $@ @ @ @ @ @r   rC   c                  2    e Zd ZdZddZddZddZdd	Zd
S )IOPumpTestsz
    Tests for L{IOPump}.
    modeLiteral['server', 'client']r   r   c                h   t                      }t          |d          }t                      }t          |d          }t          ||||d          }t                      }||d|         }|                    |d           |                                 |                     d|j                   dS )	a  
        Connect a couple protocol/transport pairs to an L{IOPump} and then pump
        it.  Verify that a streaming producer registered with one of the
        transports does not receive invalid L{IPushProducer} method calls and
        ends in the right state.

        @param mode: C{u"server"} to test a producer registered with the
            server transport.  C{u"client"} to test a producer registered with
            the client transport.
        T)isServerF)greet)serverclient)	streamingr4   N)r   r	   r
   r3   registerProducerpumpr$   r8   )	r   rt   serverProtoserverTransportclientProtoclientTransportr}   rH   victims	            r   _testStreamingProducerz"IOPumpTests._testStreamingProducer   s     jj'dCCCjj'eDDD
 
 
 &''%%
 
  	D999		HO44444r   c                2    |                      d           dS )z
        L{IOPump.pump} does not call C{resumeProducing} on a L{IPushProducer}
        (stream producer) registered with the server transport.
        ry   rt   Nr   r:   s    r   test_serverStreamingProducerz(IOPumpTests.test_serverStreamingProducer"  !    
 	###22222r   c                2    |                      d           dS )z
        L{IOPump.pump} does not call C{resumeProducing} on a L{IPushProducer}
        (stream producer) registered with the client transport.
        rz   r   Nr   r:   s    r   test_clientStreamingProducerz(IOPumpTests.test_clientStreamingProducer)  r   r   c                   g t                      }t          t          t          |          \  }}}|                    dfd           |                                |                                 |                                dS )zE
        L{IOPump.pump} advances time in the given L{Clock}.
        )clockr   c                 .                          d          S )NT)append)time_passeds   r   <lambda>z/IOPumpTests.test_timeAdvances.<locals>.<lambda>7  s    ;#5#5d#;#; r   N)r   r   r   	callLaterassertFalser}   
assertTrue)r   r   _r}   r   s       @r   test_timeAdvanceszIOPumpTests.test_timeAdvances0  s     -hNNN
1d;;;;<<<%%%		$$$$$r   N)rt   ru   r   r   r,   )r-   r.   r/   r0   r   r   r   r   r1   r   r   rs   rs      so         !5 !5 !5 !5F3 3 3 33 3 3 3
% 
% 
% 
% 
% 
%r   rs   N)r0   
__future__r   typingr   zope.interfacer   twisted.internet.interfacesr   twisted.internet.protocolr   twisted.internet.taskr   twisted.test.iosimr	   r
   r   twisted.trial.unittestr   r   r3   rC   rs   r1   r   r   <module>r      s    # " " " " "       & & & & & & 5 5 5 5 5 5 . . . . . . ' ' ' ' ' ' O O O O O O O O O O + + + + + +&8 &8 &8 &8 &8 &8 &8 &8R ]               0a@ a@ a@ a@ a@h a@ a@ a@H@% @% @% @% @%( @% @% @% @% @%r   