Concurrent file parsing and inserting into Elasticsearch
I am playing with Go and have come up with a small script that parses log files and inserts them into Elasticsearch. For each file I spawn a goroutine like this:
var wg sync.WaitGroup
wg.Add(len(files))
for _, file := range files {
    go func(f os.FileInfo) {
        defer wg.Done()
        processFile(f.Name(), config.OriginFilePath, config.WorkingFilePath, config.ArchiveFilePath,
            fmt.Sprintf("http://%v:%v", config.ElasticSearch.Host, config.ElasticSearch.Port),
            config.ProviderIndex, config.NetworkData)
    }(file)
}
wg.Wait()
Inside of processFile I have a function that sends the parsed lines to Elasticsearch:
func bulkInsert(lines []string, es *elastic.Client) (*elastic.Response, error) {
    r, err := es.PerformRequest("POST", "/_bulk", url.Values{}, strings.Join(lines, "\n")+"\n")
    if err != nil {
        return nil, err
    }
    return r, nil
}
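For context, the _bulk endpoint expects newline-delimited JSON in which every document is preceded by an action line, and the whole body must end with a trailing newline (which is why the code above appends "\n"). Here is a minimal sketch of how lines might be assembled; the index name "logs" is a placeholder for illustration, not something from the question:

// buildBulkLines pairs each JSON document with an "index" action
// line, producing the newline-delimited body expected by /_bulk.
// "logs" is a placeholder index name.
func buildBulkLines(docs []string) []string {
    lines := make([]string, 0, 2*len(docs))
    for _, doc := range docs {
        lines = append(lines, `{"index":{"_index":"logs"}}`, doc)
    }
    return lines
}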
The problem is that I don't fully understand how goroutines work. My understanding is that the send to Elasticsearch is blocking one of my goroutines from executing. So I tried spawning the Elasticsearch bulk insert in its own goroutine with the same approach: a WaitGroup, a go func(){ defer wg.Done(); bulkInsert(elems, es) }(), and a wg.Wait() before the function returns. However, I've discovered that in the end not all events end up in Elasticsearch. I think this is due to goroutines returning without ever sending, or waiting for, the bulk request to finish.
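For what it's worth, if the goal is simply to guarantee the bulk request completes before processFile returns, a plain synchronous call already does that; the extra goroutine only helps when there is other work to overlap with the request. A minimal sketch of the end of processFile, assuming elems and es are as in the question:

// Synchronous: the calling goroutine blocks until the bulk
// request completes, so nothing is lost when processFile returns.
if _, err := bulkInsert(elems, es); err != nil {
    log.Println("bulk insert failed:", err)
}

// Or, to overlap other work with the request, keep the goroutine
// but make sure to wait for it before returning:
var bulkWG sync.WaitGroup
bulkWG.Add(1)
go func() {
    defer bulkWG.Done()
    if _, err := bulkInsert(elems, es); err != nil {
        log.Println("bulk insert failed:", err)
    }
}()
// ... other work ...
bulkWG.Wait() // must happen before processFile returns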
My question is: is my approach to this problem correct? Can I achieve better performance?
Can I achieve better performance?

That is unclear; it depends on the capabilities of both the receiver and the sender.
Is my approach to this problem correct?

This might help you better understand goroutines:
package main

import (
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
)

func main() {
    addr := "127.0.0.1:2074"
    srv := http.Server{
        Addr: addr,
        Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            log.Println("hit ", r.URL.String())
            <-time.After(time.Second)
            log.Println("done ", r.URL.String())
        }),
    }
    fail(unblock(srv.ListenAndServe))

    jobs := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

    // case 1
    // creates 10 goroutines,
    // triggers 10 concurrent queries
    {
        wg := sync.WaitGroup{}
        wg.Add(len(jobs))
        log.Printf("starting %v jobs\n", len(jobs))
        for _, job := range jobs {
            go func(job int) {
                defer wg.Done()
                http.Get(fmt.Sprintf("http://%v/job/%v", addr, job))
            }(job)
        }
        wg.Wait()
        log.Printf("done %v jobs\n", len(jobs))
    }

    log.Println()
    log.Println("=================")
    log.Println()

    // case 2
    // creates 3 goroutines,
    // triggers 3 concurrent queries
    {
        wg := sync.WaitGroup{}
        wg.Add(len(jobs))
        in := make(chan string)
        // limit acts as a counting semaphore: at most 3 requests in flight
        limit := make(chan bool, 3)
        log.Printf("starting %v jobs\n", len(jobs))
        go func() {
            for url := range in {
                limit <- true
                go func(url string) {
                    defer wg.Done()
                    http.Get(url)
                    <-limit
                }(url)
            }
        }()
        for _, job := range jobs {
            in <- fmt.Sprintf("http://%v/job/%v", addr, job)
        }
        wg.Wait()
        log.Printf("done %v jobs\n", len(jobs))
    }

    log.Println()
    log.Println("=================")
    log.Println()

    // case 2: rewrite
    // creates 6 goroutines,
    // triggers 6 concurrent queries
    {
        wait, add := parallel(6)
        log.Printf("starting %v jobs\n", len(jobs))
        for _, job := range jobs {
            url := fmt.Sprintf("http://%v/job/%v", addr, job)
            add(func() {
                http.Get(url)
            })
        }
        wait()
        log.Printf("done %v jobs\n", len(jobs))
    }
}

// parallel returns a wait func and an add func; blocks passed to
// add run with at most c of them executing concurrently.
func parallel(c int) (func(), func(block func())) {
    wg := sync.WaitGroup{}
    in := make(chan func())
    limit := make(chan bool, c)
    go func() {
        for block := range in {
            limit <- true
            go func(block func()) {
                defer wg.Done()
                block()
                <-limit
            }(block)
        }
    }()
    return wg.Wait, func(block func()) {
        wg.Add(1)
        in <- block
    }
}

// unblock runs block in a goroutine and gives it a millisecond to
// fail fast (e.g. the listener could not bind); otherwise it is
// assumed to be up and running.
func unblock(block func() error) error {
    w := make(chan error)
    go func() {
        w <- block()
    }()
    select {
    case err := <-w:
        return err
    case <-time.After(time.Millisecond):
    }
    return nil
}

func fail(err error) {
    if err != nil {
        panic(err)
    }
}
It outputs:
$ go run main.go
2017/09/14 01:30:50 starting 10 jobs
2017/09/14 01:30:50 hit /job/0
2017/09/14 01:30:50 hit /job/4
2017/09/14 01:30:50 hit /job/5
2017/09/14 01:30:50 hit /job/2
2017/09/14 01:30:50 hit /job/9
2017/09/14 01:30:50 hit /job/1
2017/09/14 01:30:50 hit /job/3
2017/09/14 01:30:50 hit /job/7
2017/09/14 01:30:50 hit /job/8
2017/09/14 01:30:50 hit /job/6
2017/09/14 01:30:51 done /job/5
2017/09/14 01:30:51 done /job/4
2017/09/14 01:30:51 done /job/2
2017/09/14 01:30:51 done /job/0
2017/09/14 01:30:51 done /job/6
2017/09/14 01:30:51 done /job/9
2017/09/14 01:30:51 done /job/1
2017/09/14 01:30:51 done /job/3
2017/09/14 01:30:51 done /job/7
2017/09/14 01:30:51 done /job/8
2017/09/14 01:30:51 done 10 jobs
2017/09/14 01:30:51
2017/09/14 01:30:51 =================
2017/09/14 01:30:51
2017/09/14 01:30:51 starting 10 jobs
2017/09/14 01:30:51 hit /job/0
2017/09/14 01:30:51 hit /job/2
2017/09/14 01:30:51 hit /job/1
2017/09/14 01:30:52 done /job/2
2017/09/14 01:30:52 done /job/0
2017/09/14 01:30:52 done /job/1
2017/09/14 01:30:52 hit /job/3
2017/09/14 01:30:52 hit /job/4
2017/09/14 01:30:52 hit /job/5
2017/09/14 01:30:53 done /job/3
2017/09/14 01:30:53 done /job/4
2017/09/14 01:30:53 done /job/5
2017/09/14 01:30:53 hit /job/6
2017/09/14 01:30:53 hit /job/7
2017/09/14 01:30:53 hit /job/8
2017/09/14 01:30:54 done /job/6
2017/09/14 01:30:54 done /job/7
2017/09/14 01:30:54 done /job/8
2017/09/14 01:30:54 hit /job/9
2017/09/14 01:30:55 done /job/9
2017/09/14 01:30:55 done 10 jobs
2017/09/14 01:30:55
2017/09/14 01:30:55 =================
2017/09/14 01:30:55
2017/09/14 01:30:55 starting 10 jobs
2017/09/14 01:30:55 hit /job/0
2017/09/14 01:30:55 hit /job/1
2017/09/14 01:30:55 hit /job/4
2017/09/14 01:30:55 hit /job/2
2017/09/14 01:30:55 hit /job/3
2017/09/14 01:30:55 hit /job/5
2017/09/14 01:30:56 done /job/0
2017/09/14 01:30:56 hit /job/6
2017/09/14 01:30:56 done /job/1
2017/09/14 01:30:56 done /job/2
2017/09/14 01:30:56 done /job/4
2017/09/14 01:30:56 hit /job/7
2017/09/14 01:30:56 done /job/3
2017/09/14 01:30:56 hit /job/9
2017/09/14 01:30:56 hit /job/8
2017/09/14 01:30:56 done /job/5
2017/09/14 01:30:57 done /job/6
2017/09/14 01:30:57 done /job/7
2017/09/14 01:30:57 done /job/9
2017/09/14 01:30:57 done /job/8
2017/09/14 01:30:57 done 10 jobs
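Applied back to the question, the same limiting pattern caps how many files (and therefore how many concurrent bulk requests) are in flight at once, instead of one goroutine per file. A rough sketch reusing the parallel helper above; processFile, files, and config are the ones from the question, and the limit of 6 is an arbitrary starting point to tune:

wait, add := parallel(6) // at most 6 files processed concurrently
for _, file := range files {
    f := file // capture the loop variable for the closure
    add(func() {
        processFile(f.Name(), config.OriginFilePath, config.WorkingFilePath,
            config.ArchiveFilePath,
            fmt.Sprintf("http://%v:%v", config.ElasticSearch.Host, config.ElasticSearch.Port),
            config.ProviderIndex, config.NetworkData)
    })
}
wait()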