messagebroker - Kafka Consumer path must not end with / character


I am using Apache Kafka 0.8.2.1 to stream web events to other datasources. The Kafka producer I wrote is working great, and I'm able to see the data streaming through my topic when I run kafka-console-consumer.sh. However, I have had no luck whatsoever getting my Kafka consumer to retrieve the messages. Any ideas?

The following error about an improper path is output when my code attempts to run consumer.createMessageStreams(topicCountMap):

    Exception in thread "main" java.lang.IllegalArgumentException: Path must not end with / character
        at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)
        at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1024)
        at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073)
        at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:95)
        at org.I0Itec.zkclient.ZkClient$11.call(ZkClient.java:827)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:824)
        at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:136)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:901)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:898)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:898)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:240)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
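The trace shows the exception coming from ZooKeeper's path validation, reached through ZkClient.subscribeDataChanges inside the consumer's reinitializeConsumer. Below is a minimal sketch of how a watched path can end up with a trailing slash. It assumes the consumer builds that path by appending the topic name to a base path such as /brokers/topics. The names BrokerTopicsPath, topicPath and validatePath are hypothetical stand-ins that only mimic the trailing-slash rule; they are not the ZooKeeper or Kafka API.

```scala
object TrailingSlashSketch {
  // Assumed base path under which topics are registered; illustrative only.
  val BrokerTopicsPath = "/brokers/topics"

  // Hypothetical helper: build the path that would be watched for a topic.
  def topicPath(topic: String): String = BrokerTopicsPath + "/" + topic

  // Hypothetical stand-in for the trailing-slash rule reported in the trace.
  def validatePath(path: String): Unit = {
    if (path.length > 1 && path.endsWith("/"))
      throw new IllegalArgumentException("Path must not end with / character")
  }

  def main(args: Array[String]): Unit = {
    // A non-empty topic name gives a path that passes the check.
    validatePath(topicPath("testevent"))
    try {
      // An empty topic name yields "/brokers/topics/", which is rejected.
      validatePath(topicPath(""))
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}
```

If this assumption holds, it is worth checking whether the topic name that reaches createMessageStreams is empty at runtime.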

Here is the code for my Kafka consumer.

    import java.util.Properties
    import java.util.concurrent.{ExecutorService, Executors}

    import kafka.consumer.ConsumerConfig
    import kafka.javaapi.consumer.ConsumerConnector

    val consumer: ConsumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig())
    var executor: ExecutorService = null

    def run(a_numThreads: Integer): Unit = {
      // Map each topic to the number of streams (threads) requested for it
      val topicCountMap: java.util.Map[String, Integer] = new java.util.HashMap[String, Integer]()
      topicCountMap.put("testevent", a_numThreads)

      val consumerMap = consumer.createMessageStreams(topicCountMap)
      val streams = consumerMap.get("testevent")

      // launch all the threads
      executor = Executors.newFixedThreadPool(a_numThreads)

      // create an object to consume the messages
      // (EventConsumer is my own Runnable, not shown here)
      var threadNumber: Integer = 0
      val streamsItr = streams.iterator()
      while (streamsItr.hasNext()) {
        val stream = streamsItr.next()
        executor.submit(new EventConsumer(stream, threadNumber))
        threadNumber = threadNumber + 1
      }
    }

    def createConsumerConfig(): ConsumerConfig = {
      val props: Properties = new Properties()
      props.put("zookeeper.connect", "127.0.0.1:2181")
      props.put("zk.connect", "127.0.0.1:2181")
      props.put("group.id", "testconsumer")
      props.put("groupid", "tesconsumer")
      props.put("zookeeper.session.timeout.ms", "400")
      props.put("zookeeper.sync.time.ms", "200")
      props.put("auto.commit.interval.ms", "1000")

      new ConsumerConfig(props)
    }
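Since the exception is raised inside createMessageStreams, one defensive step is to check the topic names before handing the map to the connector. The sketch below assumes that an empty or slash-terminated topic name is what you want to rule out. invalidTopics is a hypothetical helper, not part of Kafka.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters._

object TopicMapCheck {
  // Hypothetical helper: returns the topic names that are null, blank,
  // or end with a slash.
  def invalidTopics(topicCountMap: JMap[String, Integer]): Seq[String] =
    topicCountMap.keySet.asScala.toSeq.filter { t =>
      t == null || t.trim.isEmpty || t.endsWith("/")
    }

  def main(args: Array[String]): Unit = {
    val topicCountMap = new JHashMap[String, Integer]()
    topicCountMap.put("testevent", Integer.valueOf(1))

    val bad = invalidTopics(topicCountMap)
    require(bad.isEmpty, s"Invalid topic names: $bad")
    // From here it should be reasonable to call
    // consumer.createMessageStreams(topicCountMap).
  }
}
```

Failing early with the offending names in the message is usually easier to diagnose than the ZooKeeper stack trace.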

Spark's CheckpointWriter produces this exception message when it cannot access the path where the checkpoint is stored. Please ensure that checkpointing is disabled, or provide a correct path. The exception occurs after a successful connection, in

    at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)

It seems the writer cannot access the directory where the checkpoint information is saved.

https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing
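To act on the suggestion about providing a correct path, you could verify up front that the checkpoint location exists and is writable. This is a sketch that assumes the checkpoint directory is on the local filesystem; for HDFS or another store you would need that store's own client. checkpointDirUsable and the example path are hypothetical.

```scala
import java.nio.file.{Files, Paths}

object CheckpointDirCheck {
  // Hypothetical helper: true when dir is an existing, writable local directory.
  def checkpointDirUsable(dir: String): Boolean = {
    val p = Paths.get(dir)
    Files.isDirectory(p) && Files.isWritable(p)
  }

  def main(args: Array[String]): Unit = {
    // Example path only; pass the real checkpoint directory as the first argument.
    val dir = if (args.nonEmpty) args(0) else "/tmp/checkpoints"
    if (checkpointDirUsable(dir)) println(s"checkpoint directory OK: $dir")
    else println(s"checkpoint directory missing or not writable: $dir")
  }
}
```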

