mule - MUnit testing a flow with collection-aggregator -
i trying write munit tests mule flows. want write test below flow

<flow name="downloadftpfileintolocalflow" processingstrategy="synchronous" tracking:enable-default-events="true"> <quartz:inbound-endpoint jobname="source-file-scheduler" cronexpression="${source.pollingfrequency}" startdelay="10000" responsetimeout="10000" doc:name="quartz"> <quartz:endpoint-polling-job> <quartz:job-endpoint ref="inputsftpendpoint"/> </quartz:endpoint-polling-job> </quartz:inbound-endpoint> <logger message="downloadftpfileintolocalflow #[payload.getclass().getname()]" level="info" doc:name="logger"/> <set-property propertyname="mule_correlation_group_size" value="#[java.lang.integer.max_value]" doc:name="groupsizeforexceptionaggregator"/> <set-property propertyname="mule_correlation_id" value="#[java.util.uuid.randomuuid().tostring()]" doc:name="corelationidforexceptionaggregator"/> <set-variable variablename="originalpayload" value="#[payload]" doc:name="originalpayload"/> <byte-array-to-object-transformer doc:name="byte array object"/> <flow-ref name="processcsvflow" doc:name="processcsvflow" /> <exception-strategy ref="default_exception_strategy" doc:name="downloadftpfileintolocalflow strategy"/> </flow> <sub-flow name="processcsvflow" tracking:enable-default-events="true"> <transformer ref="enrichwithheaderandendoffiletransformer" doc:name="headerandeofenricher" /> <set-variable variablename="outputfilename" value="#['mercury'+server.datetime.year+server.datetime.month+server.datetime.dayofmonth+server.datetime.hours+server.datetime.minutes+server.datetime.seconds+'.csv']" doc:name="outputfilename"/> <!-- <set-variable variablename="outputfilename" value="#['mercury'+server.datetime.year+':'+server.datetime.month+':'+server.datetime.dayofmonth+'::'+server.datetime.hours+':'+server.datetime.minutes+':'+server.datetime.seconds+'.csv']" doc:name="outputfilename"/> --> <sftp:outbound-endpoint exchange-pattern="one-way" connector-ref="destinationsftp" host="${destination.host}" port="22" responsetimeout="10000" doc:name="destinationsftp" outputpattern="#[outputfilename]" path="${destination.path}" user="${destination.username}" password="${destination.password}"/> <gzip-compress-transformer/> <sftp:outbound-endpoint exchange-pattern="one-way" connector-ref="inputsftp" host="${source.host}" port="22" responsetimeout="10000" doc:name="sourcearchivesftp" outputpattern="#[outputfilename].gzip" path="archive" user="${source.username}" password="${source.password}"/> <component doc:name="delete read file"> <singleton-object class="component.deleteprocessedfilecomponent"> <property key="host" value="${source.host}"/> <property key="username" value="${source.username}"/> <property key="password" value="${source.password}"/> <property key="workingdirectory" value="${source.path}"/> </singleton-object> </component> <parse-template location="successmessagetemplate.txt" doc:name="success template"/> <smtp:outbound-endpoint host="${smtp.host}" port="${smtp.port}" user="${smtp.from.address}" password="${smtp.from.password}" to="${smtp.to.address}" from="${smtp.from.address}" subject="${mail.success.subject}" responsetimeout="10000" doc:name="successemail" connector-ref="gmail"/> <logger message="process completed successfully" level="info" doc:name="logger"/> </sub-flow> exception handling block
<catch-exception-strategy name="default_exception_strategy"> <flow-ref name="exceptionhandlingsubflow" doc:name="exceptionhandlingsubflow"/> </catch-exception-strategy> <sub-flow name="exceptionhandlingsubflow" tracking:enable-default-events="true"> <collection-aggregator timeout="60000" failontimeout="false" doc:name="exception aggregator"/> <logger message="exception has occured payload #[payload] , message #[message]" level="error" doc:name="logger"/> <parse-template location="errormessagetemplate.txt" doc:name="error template"/> <smtp:outbound-endpoint host="${smtp.host}" port="${smtp.port}" user="${smtp.from.address}" password="${smtp.from.password}" to="${smtp.to.address}" from="${smtp.from.address}" subject="${mail.failure.subject}" responsetimeout="10000" doc:name="erroremail" connector-ref="gmail"/> </sub-flow> the interesting bit exception sub-flow, collection-aggregator
my unit test
@test public void whenmultipleexceptionsoccurinflow_itshouldsendonlyonefailureemail() throws exception { whenmessageprocessor("collection-aggregator") .withattributes(attribute("name").ofnamespace("doc").withvalue("exception aggregator")).thenreturnsameevent(); destinationsftp.thenthrow(new runtimeexception("dummy exception destinationsftp")); muleevent testevent = propertyenricher.enrich(testevent(ioutils.toinputstream("hello,dummy,payload"))).get(); runflow("downloadftpfileintolocalflow", testevent); verifycallofmessageprocessor("outbound-endpoint").ofnamespace("smtp") .withattributes(attribute("name").ofnamespace("doc").withvalue("erroremail")) .times(1); } now if not mock collection aggregator out test not pass, can understand tricky aggregator has "pause" within , hence not ideal candidate unit test, technical standpoint want understand causing unit test fail (when collection-aggregator not mocked).
my test fails when collection-aggregator not mocked.
junit.framework.assertionfailederror: on smtp:outbound-endpoint.expected 1 got 0 calls @ junit.framework.assert.fail(assert.java:50) @ org.mule.munit.common.mocking.munitverifier.times(munitverifier.java:86) @ nz.co.mightyriver.processcsvtest.whenmultipleexceptionsoccurinflow_itshouldsendonlyonefailureemail(processcsvtest.java:100) @ sun.reflect.nativemethodaccessorimpl.invoke0(native method) @ sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.java:57) @ sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl.java:43) @ java.lang.reflect.method.invoke(method.java:606)
so i've been trying reproduce issue:
production code
<http:listener-config name="http_listener_configuration" host="0.0.0.0" port="9090" doc:name="http listener configuration"/> <flow name="stack-munit-and-aggregationflow"> <http:listener config-ref="http_listener_configuration" path="/" doc:name="http"/> <set-payload value="#['lalero_' + new java.util.date().tostring()]" doc:name="set payload"/> <flow-ref name="stack-munit-and-aggregationsub_flow" doc:name="stack-munit-and-aggregationsub_flow"/> <set-payload doc:name="set payload" value="#[payload.tostring()]"/> </flow> <sub-flow name="stack-munit-and-aggregationsub_flow"> <collection-aggregator failontimeout="true" doc:name="collection aggregator" timeout="10"/> </sub-flow> test code
package org.mule.munit; import org.junit.assert; import org.junit.test; import org.mule.api.muleevent; import org.mule.api.muleexception; import org.mule.munit.runner.functional.functionalmunitsuite; public class thetest extends functionalmunitsuite { @test public void atest() throws muleexception, exception { muleevent event = runflow("stack-munit-and-aggregationflow", testevent("")); string payload = (string) event.getmessage().getpayload(); assert.asserttrue(payload.contains("lalero")); } } if check code you'll notice i'm not mocking out collection aggregator. after few tests wasn't able reproduce error.
i think issue somewhere else. please share you're code can investigate further?
world of warning though, due issue discovered in: how mock java component within mule flow using munit
you may find problem if try directly test sub-flow exceptionhandlingsubflow. not doing in example code don't think 2 related.
cheers!