messagebroker - Kafka Consumer path must not end with / character -
i using apache kafka 0.8.2.1 stream web events other datasources. kafka producer wrote working great , i'm able see data getting streaming through topic when run kafka-console-consumer.sh. however, have had no luck whatsoever trying kafka consumer retrieve messages. ideas?
the following error improper path being output when code attempts run consumer.createmessagestreams(topiccountmap)
exception in thread "main" java.lang.illegalargumentexception: path must not end / character @ org.apache.zookeeper.common.pathutils.validatepath(pathutils.java:58) @ org.apache.zookeeper.zookeeper.exists(zookeeper.java:1024) @ org.apache.zookeeper.zookeeper.exists(zookeeper.java:1073) @ org.i0itec.zkclient.zkconnection.exists(zkconnection.java:95) @ org.i0itec.zkclient.zkclient$11.call(zkclient.java:827) @ org.i0itec.zkclient.zkclient.retryuntilconnected(zkclient.java:675) @ org.i0itec.zkclient.zkclient.watchfordata(zkclient.java:824) @ org.i0itec.zkclient.zkclient.subscribedatachanges(zkclient.java:136) @ kafka.consumer.zookeeperconsumerconnector$$anonfun$kafka$consumer$zookeeperconsumerconnector$$reinitializeconsumer$4.apply(zookeeperconsume rconnector.scala:901) @ kafka.consumer.zookeeperconsumerconnector$$anonfun$kafka$consumer$zookeeperconsumerconnector$$reinitializeconsumer$4.apply(zookeeperconsume rconnector.scala:898) @ scala.collection.mutable.hashmap$$anonfun$foreach$1.apply(hashmap.scala:98) @ scala.collection.mutable.hashmap$$anonfun$foreach$1.apply(hashmap.scala:98) @ scala.collection.mutable.hashtable$class.foreachentry(hashtable.scala:226) @ scala.collection.mutable.hashmap.foreachentry(hashmap.scala:39) @ scala.collection.mutable.hashmap.foreach(hashmap.scala:98) @ kafka.consumer.zookeeperconsumerconnector.kafka$consumer$zookeeperconsumerconnector$$reinitializeconsumer(zookeeperconsumerconnector.scala: 898) @ kafka.consumer.zookeeperconsumerconnector.consume(zookeeperconsumerconnector.scala:240) @ kafka.javaapi.consumer.zookeeperconsumerconnector.createmessagestreams(zookeeperconsumerconnector.scala:85) @ kafka.javaapi.consumer.zookeeperconsumerconnector.createmessagestreams(zookeeperconsumerconnector.scala:97)
here code kafka consumer.
val consumer: consumerconnector = kafka.consumer.consumer.createjavaconsumerconnector(createconsumerconfig()) var executor: executorservice = null def run(a_numthreads: integer) { var topiccountmap: java.util.map[string, integer] = new java.util.hashmap[string, integer]() topiccountmap.put("testevent", new integer(a_numthreads)) var consumermap = consumer.createmessagestreams(topiccountmap) var streams = consumermap.get("testevent") // launch threads executor = executors.newfixedthreadpool(a_numthreads) // create object consume messages // var threadnumber: integer = 0 var streamsitr = streams.iterator() while (streamsitr.hasnext()) { var stream = streamsitr.next() executor.submit(new eventconsumer(stream, threadnumber)) threadnumber = threadnumber + 1 } } def createconsumerconfig(): consumerconfig = { var props: properties = new properties() props.put("zookeeper.connect", "127.0.0.1:2181") props.put("zk.connect", "127.0.0.1:2181") props.put("group.id", "testconsumer") props.put("groupid", "tesconsumer") props.put("zookeeper.session.timeout.ms", "400") props.put("zookeeper.sync.time.ms", "200") props.put("auto.commit.interval.ms", "1000") return new consumerconfig(props) }
spark checkpointwriter produces exception message when cannot access checkpoint path stored. please ensure checkpointing disabled or provide correct path. exception occurs after success connection in
at org.apache.zookeeper.common.pathutils.validatepath(pathutils.java:58)
it seems writer cannot acces directory checkpoint information saved.
https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing