java - Converting Unix timestamp to TIMEUUID for Cassandra
I'm learning Apache Cassandra 3.x.x and I'm trying to develop some stuff to play around with. The problem is that I want to store data in a Cassandra table that contains these columns:
id (uuid - primary key) | message (text) | req_timestamp (timeuuid) | now_timestamp (timeuuid)
req_timestamp holds the time when the message left the client at the frontend level. now_timestamp, on the other hand, is the time when the message is stored in Cassandra. I need both timestamps because I want to measure the amount of time it takes to handle a request from its origin until the data is safely stored.
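Roughly, this is the measurement I'm after once both columns are stored (a sketch using the DataStax Java driver's UUIDs helper to pull the milliseconds back out of the two TimeUUIDs; the class and method names here are just mine for illustration):

import com.datastax.driver.core.utils.UUIDs;
import java.util.UUID;

public class LatencyCheck {
    // req and now are the req_timestamp and now_timestamp columns read back from the table
    static long handlingTimeMillis(UUID req, UUID now) {
        return UUIDs.unixTimestamp(now) - UUIDs.unixTimestamp(req);
    }
}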
Creating now_timestamp is easy: I just use the now() function and it generates the TimeUUID automatically. The problem arises with req_timestamp. How can I convert a Unix timestamp to a TimeUUID so Cassandra can store it? Is that even possible?
The architecture of the backend is this: the data comes as JSON from the frontend to a web service that processes it and stores it in Kafka. Then, a Spark Streaming job takes the Kafka log and puts it into Cassandra.
This is the web service that puts the data into Kafka:
@path("/") public class memoin { @post @path("/in") @consumes(mediatype.application_json) @produces(mediatype.text_plain) public response goinkafka(inputstream incomingdata){ stringbuilder bld = new stringbuilder(); try { bufferedreader in = new bufferedreader(new inputstreamreader(incomingdata)); string line = null; while ((line = in.readline()) != null) { bld.append(line); } } catch (exception e) { system.out.println("error parsing: - "); } system.out.println("data received: " + bld.tostring()); jsonobject obj = new jsonobject(bld.tostring()); string line = obj.getstring("id_memo") + "|" + obj.getstring("id_writer") + "|" + obj.getstring("id_diseased") + "|" + obj.getstring("memo") + "|" + obj.getlong("req_timestamp"); try { kafkalogwriter.addtolog(line); } catch (exception e) { e.printstacktrace(); } return response.status(200).entity(line).build(); } }
Here's the Kafka writer:
package main.java.vcemetery.webservice;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;

public class KafkaLogWriter {

    public static void addToLog(String memo) throws Exception {
        // private static Scanner in;
        String topicName = "memoslog";

        /* First, set the properties of the Kafka log */
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the producer
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send the line to the producer
        producer.send(new ProducerRecord<>(topicName, memo));
        // Close the producer
        producer.close();
    }
}
And here's what I have of the Spark Streaming job:
public class MemoStream {

    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);
        Logger.getLogger("akka").setLevel(Level.ERROR);

        // Create the streaming context with a 10 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkExample").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        /* Build the collection of topics to consume, in this case only one topic */
        Collection<String> topics = Arrays.asList("memoslog");

        final JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

        // Turn each batch of Kafka data into a splittable stream of memo strings
        JavaDStream<String> stream = kafkaStream.map(record -> record.value().toString());
        // Then, split each stream into lines, i.e. memos
        JavaDStream<String> memos = stream.flatMap(x -> Arrays.asList(x.split("\n")).iterator());
        /* Split each memo into its sections of ids and message; the pipe character has to be escaped as \\| */
        JavaDStream<String> sections = memos.flatMap(y -> Arrays.asList(y.split("\\|")).iterator());
        sections.print();

        sections.foreachRDD(rdd -> {
            rdd.foreachPartition(partitionOfRecords -> {
                // Establish the connection to the Cassandra cluster
                Cluster cluster = null;
                try {
                    cluster = Cluster.builder()
                            .withClusterName("vcemeterymemos") // cluster name
                            .addContactPoint("127.0.0.1")      // host IP
                            .build();

                    while (partitionOfRecords.hasNext()) {
                        // TODO: convert req_timestamp and write each record to Cassandra
                    }
                } finally {
                    if (cluster != null) cluster.close();
                }
            });
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
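For reference, this is a minimal sketch of what the insert logic inside foreachPartition could look like once the conversion question is solved. It assumes the loop runs over the whole pipe-delimited memo lines (the memos stream rather than sections), a hypothetical memos.memos table with the columns from above, and the DataStax Java driver classes Session, PreparedStatement and UUIDs; none of those names come from my actual code:

// Sketch only: the keyspace/table name memos.memos, the field order and the generated id are assumptions.
// Requires com.datastax.driver.core.*, com.datastax.driver.core.utils.UUIDs and java.util.UUID.
try (Session session = cluster.connect()) {
    PreparedStatement insert = session.prepare(
            "INSERT INTO memos.memos (id, message, req_timestamp, now_timestamp) "
            + "VALUES (?, ?, ?, now())");
    while (partitionOfRecords.hasNext()) {
        String[] fields = partitionOfRecords.next().split("\\|");
        long reqMillis = Long.parseLong(fields[4]);   // Unix timestamp (ms) sent by the frontend
        UUID reqTimeuuid = UUIDs.startOf(reqMillis);  // client-side TimeUUID conversion
        session.execute(insert.bind(UUIDs.random(), fields[3], reqTimeuuid)); // id generated here
    }
}

Note that UUIDs.startOf() always produces the same UUID for a given millisecond, so it is only safe when uniqueness comes from another column, as it does here with id.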
Thanks in advance.
Cassandra has no built-in function to convert a plain Unix timestamp into a timeuuid; you have to do the conversion on the client side. (The CQL functions minTimeuuid() and maxTimeuuid() only produce pseudo-TimeUUIDs intended for range queries, not values to store.)
Ref: https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timeuuid_functions_r.html
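For example, with the DataStax Java driver the conversion can be done with the UUIDs utility class before binding the value (a minimal sketch; the class and variable names are just for illustration). UUIDs.startOf() returns the same UUID for a given millisecond, which is fine here because your rows are keyed by the separate id column:

import com.datastax.driver.core.utils.UUIDs;
import java.util.UUID;

public class TimeuuidConversion {
    public static void main(String[] args) {
        long reqTimestampMillis = 1500000000000L;              // Unix timestamp (ms) coming from the frontend
        UUID reqTimeuuid = UUIDs.startOf(reqTimestampMillis);  // TimeUUID carrying exactly that instant
        UUID nowTimeuuid = UUIDs.timeBased();                  // client-side equivalent of CQL's now()

        System.out.println(reqTimeuuid + " / " + nowTimeuuid);
        // The original millisecond value can be recovered later:
        System.out.println(UUIDs.unixTimestamp(reqTimeuuid));  // 1500000000000
    }
}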