java - Converting UnixTimestamp to TIMEUUID for Cassandra


I'm learning Apache Cassandra 3.x.x and I'm trying to develop some stuff to play around with. The problem is that I want to store data in a Cassandra table that contains these columns:

id (uuid - primary key) | message (text) | req_timestamp (timeuuid) | now_timestamp (timeuuid) 

req_timestamp holds the time when the message left the client at the frontend level. now_timestamp, on the other hand, is the time when the message is finally stored in Cassandra. I need both timestamps because I want to measure the amount of time it takes to handle the request, from its origin until the data is safely stored.

Creating now_timestamp is easy: I just use the now() function and it generates the timeuuid automatically. The problem arises with req_timestamp. How can I convert that Unix timestamp to a timeuuid so Cassandra can store it? Is this possible?

The architecture of my backend is this: I get the data as JSON from the frontend in a web service that processes it and stores it in Kafka. Then, a Spark Streaming job takes that Kafka log and puts it in Cassandra.

This is my web service that puts the data in Kafka:

    @Path("/")
    public class MemoIn {

        @POST
        @Path("/in")
        @Consumes(MediaType.APPLICATION_JSON)
        @Produces(MediaType.TEXT_PLAIN)
        public Response goInKafka(InputStream incomingData) {
            // Read the whole request body into a single string
            StringBuilder bld = new StringBuilder();
            try {
                BufferedReader in = new BufferedReader(new InputStreamReader(incomingData));
                String line = null;
                while ((line = in.readLine()) != null) {
                    bld.append(line);
                }
            } catch (Exception e) {
                System.out.println("error parsing: - ");
            }
            System.out.println("data received: " + bld.toString());

            // Build the pipe-separated record that goes into the Kafka log
            JSONObject obj = new JSONObject(bld.toString());
            String line = obj.getString("id_memo") + "|" + obj.getString("id_writer") +
                    "|" + obj.getString("id_diseased")
                    + "|" + obj.getString("memo") + "|" + obj.getLong("req_timestamp");

            try {
                KafkaLogWriter.addToLog(line);
            } catch (Exception e) {
                e.printStackTrace();
            }

            return Response.status(200).entity(line).build();
        }
    }

Here's my Kafka writer:

    package main.java.vcemetery.webservice;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.Producer;

    public class KafkaLogWriter {

        public static void addToLog(String memo) throws Exception {
            String topicName = "memoslog";

            /*
            First, set the properties of the Kafka log
             */
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // Create the producer
            Producer<String, String> producer = new KafkaProducer<>(props);
            // Send the line to the producer
            producer.send(new ProducerRecord<>(topicName, memo));
            // Close the producer
            producer.close();
        }
    }

And here's what I have of the Spark Streaming job:

    public class MemoStream {

        public static void main(String[] args) throws Exception {
            Logger.getLogger("org").setLevel(Level.ERROR);
            Logger.getLogger("akka").setLevel(Level.ERROR);

            // Create the context with a 10 second batch size
            SparkConf sparkConf = new SparkConf().setAppName("kafkasparkexample").setMaster("local[2]");
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "localhost:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "group1");
            kafkaParams.put("auto.offset.reset", "latest");
            kafkaParams.put("enable.auto.commit", false);

            /* Create a collection with the topics to consume; in this case there is only one topic */
            Collection<String> topics = Arrays.asList("memoslog");

            final JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                    KafkaUtils.createDirectStream(
                            ssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                    );

            kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
            // Split each bucket of Kafka data into memos, as a splittable stream
            JavaDStream<String> stream = kafkaStream.map(record -> (record.value().toString()));
            // Then, split each stream into lines or memos
            JavaDStream<String> memos = stream.flatMap(x -> Arrays.asList(x.split("\n")).iterator());
            /*
             Split each memo into sections of ids and messages; we have to use the code \\ plus the character
              */
            JavaDStream<String> sections = memos.flatMap(y -> Arrays.asList(y.split("\\|")).iterator());
            sections.print();

            sections.foreachRDD(rdd -> {
                rdd.foreachPartition(partitionOfRecords -> {
                    // We establish the connection with Cassandra
                    Cluster cluster = null;
                    try {
                        cluster = Cluster.builder()
                                .withClusterName("vcemeterymemos") // cluster name
                                .addContactPoint("127.0.0.1") // host IP
                                .build();

                    } finally {
                        if (cluster != null) cluster.close();
                    }
                    // The insert logic is not written yet; this loop body is still empty
                    while (partitionOfRecords.hasNext()) {

                    }
                });
            });

            ssc.start();
            ssc.awaitTermination();
        }
    }

Thank you in advance.

Cassandra has no function to convert from a Unix timestamp. You have to do the conversion on the client side.

ref: https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timeuuid_functions_r.html
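
For illustration, here is a minimal sketch of what a client-side conversion could look like using only the JDK. It packs a Unix timestamp into the time fields of a version 1 UUID and fills the clock sequence and node with random bits. The class name `TimeUuidSketch` and its methods are hypothetical, and the sketch assumes req_timestamp is in milliseconds (multiply by 1000 first if your frontend sends seconds).

```java
import java.security.SecureRandom;
import java.util.UUID;

// Hypothetical helper, not part of Cassandra or any driver.
final class TimeUuidSketch {

    // Number of 100-nanosecond intervals between the UUID epoch
    // (1582-10-15T00:00:00Z) and the Unix epoch (1970-01-01T00:00:00Z).
    private static final long UUID_EPOCH_OFFSET = 0x01B21DD213814000L;

    private static final SecureRandom RANDOM = new SecureRandom();

    // Builds a version 1 (time-based) UUID for the given Unix time.
    // Assumption: unixMillis is in milliseconds.
    static UUID fromUnixMillis(long unixMillis) {
        long ts = unixMillis * 10_000L + UUID_EPOCH_OFFSET;

        long msb = 0L;
        msb |= (ts & 0xFFFFFFFFL) << 32;        // time_low: lowest 32 bits
        msb |= ((ts >>> 32) & 0xFFFFL) << 16;   // time_mid: next 16 bits
        msb |= (ts >>> 48) & 0x0FFFL;           // time_hi: top 12 bits
        msb |= 0x1000L;                         // version field = 1

        // Random clock sequence and node, since no MAC address is used here.
        long lsb = RANDOM.nextLong();
        lsb = (lsb & 0x3FFFFFFFFFFFFFFFL) | 0x8000000000000000L; // RFC 4122 variant
        lsb |= 0x0000010000000000L;             // multicast bit marks a random node

        return new UUID(msb, lsb);
    }

    // Recovers the Unix time in milliseconds from a version 1 UUID.
    static long toUnixMillis(UUID timeUuid) {
        return (timeUuid.timestamp() - UUID_EPOCH_OFFSET) / 10_000L;
    }

    public static void main(String[] args) {
        long reqTimestamp = 1500000000000L; // example value only
        UUID reqUuid = fromUnixMillis(reqTimestamp);
        System.out.println(reqUuid + " -> " + toUnixMillis(reqUuid));
    }
}
```

The resulting `UUID` can be bound to the req_timestamp column like any other timeuuid value, and `toUnixMillis` applied to both columns gives the elapsed time you want to measure. If you are already on the DataStax Java driver 3.x, its `com.datastax.driver.core.utils.UUIDs` class has `startOf(long)` and `unixTimestamp(UUID)` for the same purpose; note that `startOf` always returns the same UUID for a given millisecond, so it is not suitable where uniqueness matters.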

