elasticsearch - Concurrent file parsing and inserting into Elastic Search -


I have been playing with Go and came up with a small script that parses log files and inserts them into Elasticsearch. For each file, I spawn a goroutine like this:

var wg := sync.waitgroup{} wg.add(len(files)) _, file := range files {     go func(f os.fileinfo){         defer wg.done()         processfile(f.name(), config.originfilepath, config.workingfilepath, config.archivefilepath,fmt.sprintf("http://%v:%v", config.elasticsearch.host, config.elasticsearch.port),config.providerindex, config.networkdata)     }(file) } wg.wait() 

Inside of processfile I have a function that sends the lines to Elasticsearch:

func bulkinsert(lines []string, es *elastic.client) (*elastic.response, error){     r, err := es.performrequest("post", "/_bulk", url.values{}, strings.join(lines, "\n")+"\n")     if err != nil {         return nil, err     }     return r, nil } 

The problem is that I don't fully understand how goroutines work. My understanding is that sending to Elasticsearch blocks one of my goroutines from executing. I tried spawning another goroutine for the Elasticsearch bulk insert, using the same approach:

a WaitGroup, go func() { defer wg.Done(); bulkinsert(elems, es) }(), and wg.Wait() before the function returns. However, I've discovered that in the end not all of my events end up in Elasticsearch. I think this is due to the goroutines returning without ever sending the bulk request, or without waiting for it to finish.

My question is: is my approach to this problem correct? And can I achieve better performance?

> Can I achieve better performance?

That is unclear; it depends on the capabilities of the receiver and of the sender.

> My question is: is my approach to this problem correct?

This might help you to better understand goroutines:

package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"
)

// main starts a local HTTP server whose handler takes one second per request
// and then runs the same ten jobs against it three times, each time with a
// different concurrency strategy, logging when each batch starts and ends.
func main() {

	addr := "127.0.0.1:2074"

	srv := http.Server{
		Addr: addr,
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			log.Println("hit ", r.URL.String())
			<-time.After(time.Second)
			log.Println("done ", r.URL.String())
		}),
	}
	// ListenAndServe blocks for the lifetime of the server, so it is started
	// through unblock, which only reports an error that shows up right away.
	fail(unblock(srv.ListenAndServe))

	jobs := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

	// case 1
	// creates 10 goroutines,
	// triggers 10 concurrent queries
	{
		wg := sync.WaitGroup{}
		wg.Add(len(jobs))
		log.Printf("starting %v jobs\n", len(jobs))
		for _, job := range jobs {
			go func(job int) {
				defer wg.Done()
				fetch(fmt.Sprintf("http://%v/job/%v", addr, job))
			}(job)
		}
		wg.Wait()
		log.Printf("done %v jobs\n", len(jobs))
	}

	log.Println()
	log.Println("=================")
	log.Println()

	// case 2
	// creates 3 goroutines,
	// triggers 3 concurrent queries
	{
		wg := sync.WaitGroup{}
		wg.Add(len(jobs))
		in := make(chan string)
		// limit has room for 3 tokens: a worker is only started once a token
		// could be pushed, and the token is taken back when the worker ends.
		limit := make(chan bool, 3)
		log.Printf("starting %v jobs\n", len(jobs))
		go func() {
			for url := range in {
				limit <- true
				go func(url string) {
					defer wg.Done()
					fetch(url)
					<-limit
				}(url)
			}
		}()
		for _, job := range jobs {
			in <- fmt.Sprintf("http://%v/job/%v", addr, job)
		}
		// Closing in ends the dispatcher's range loop; without it the
		// dispatcher goroutine would stay blocked on the channel forever.
		close(in)
		wg.Wait()
		log.Printf("done %v jobs\n", len(jobs))
	}

	log.Println()
	log.Println("=================")
	log.Println()

	// case 2: rewrite
	// creates 6 goroutines,
	// triggers 6 concurrent queries
	{
		wait, add := parallel(6)
		log.Printf("starting %v jobs\n", len(jobs))
		for _, job := range jobs {
			url := fmt.Sprintf("http://%v/job/%v", addr, job)
			add(func() {
				fetch(url)
			})
		}
		wait()
		log.Printf("done %v jobs\n", len(jobs))
	}
}

// fetch performs a GET on url, logs a failed request and closes the response
// body of a successful one so that the connection is not left dangling.
func fetch(url string) {
	resp, err := http.Get(url)
	if err != nil {
		log.Println("get ", url, ": ", err)
		return
	}
	resp.Body.Close()
}

// parallel returns a wait function and an add function that together run
// submitted blocks with at most c of them executing at the same time.
//
// add registers block and starts it in its own goroutine as soon as one of
// the c slots is free; it blocks the caller until then. wait blocks until
// every block registered so far has returned. A value of c below 1 is
// treated as 1, because a zero-capacity limiter could never hand out a slot.
func parallel(c int) (func(), func(block func())) {
	if c < 1 {
		c = 1
	}
	var wg sync.WaitGroup
	limit := make(chan struct{}, c)
	add := func(block func()) {
		wg.Add(1)
		// Take the slot before starting the goroutine so that no more than c
		// blocks are ever running; no long-lived dispatcher goroutine is needed.
		limit <- struct{}{}
		go func() {
			defer wg.Done()
			defer func() { <-limit }()
			block()
		}()
	}
	return wg.Wait, add
}

// unblock runs block in a separate goroutine and waits one millisecond for
// it to return. If block returns within that time its error is returned;
// otherwise unblock returns nil and block keeps running in the background.
func unblock(block func() error) error {
	// Buffered so the goroutine can still deliver its result and exit after
	// the timeout branch below has stopped listening.
	w := make(chan error, 1)
	go func() { w <- block() }()
	select {
	case err := <-w:
		return err
	case <-time.After(time.Millisecond):
	}
	return nil
}

// fail panics with err when err is not nil.
func fail(err error) {
	if err != nil {
		panic(err)
	}
}

Output:

$ go run main.go  2017/09/14 01:30:50 starting 10 jobs 2017/09/14 01:30:50 hit  /job/0 2017/09/14 01:30:50 hit  /job/4 2017/09/14 01:30:50 hit  /job/5 2017/09/14 01:30:50 hit  /job/2 2017/09/14 01:30:50 hit  /job/9 2017/09/14 01:30:50 hit  /job/1 2017/09/14 01:30:50 hit  /job/3 2017/09/14 01:30:50 hit  /job/7 2017/09/14 01:30:50 hit  /job/8 2017/09/14 01:30:50 hit  /job/6 2017/09/14 01:30:51 done  /job/5 2017/09/14 01:30:51 done  /job/4 2017/09/14 01:30:51 done  /job/2 2017/09/14 01:30:51 done  /job/0 2017/09/14 01:30:51 done  /job/6 2017/09/14 01:30:51 done  /job/9 2017/09/14 01:30:51 done  /job/1 2017/09/14 01:30:51 done  /job/3 2017/09/14 01:30:51 done  /job/7 2017/09/14 01:30:51 done  /job/8 2017/09/14 01:30:51 done 10 jobs 2017/09/14 01:30:51  2017/09/14 01:30:51 ================= 2017/09/14 01:30:51  2017/09/14 01:30:51 starting 10 jobs 2017/09/14 01:30:51 hit  /job/0 2017/09/14 01:30:51 hit  /job/2 2017/09/14 01:30:51 hit  /job/1 2017/09/14 01:30:52 done  /job/2 2017/09/14 01:30:52 done  /job/0 2017/09/14 01:30:52 done  /job/1 2017/09/14 01:30:52 hit  /job/3 2017/09/14 01:30:52 hit  /job/4 2017/09/14 01:30:52 hit  /job/5 2017/09/14 01:30:53 done  /job/3 2017/09/14 01:30:53 done  /job/4 2017/09/14 01:30:53 done  /job/5 2017/09/14 01:30:53 hit  /job/6 2017/09/14 01:30:53 hit  /job/7 2017/09/14 01:30:53 hit  /job/8 2017/09/14 01:30:54 done  /job/6 2017/09/14 01:30:54 done  /job/7 2017/09/14 01:30:54 done  /job/8 2017/09/14 01:30:54 hit  /job/9 2017/09/14 01:30:55 done  /job/9 2017/09/14 01:30:55 done 10 jobs 2017/09/14 01:30:55  2017/09/14 01:30:55 ================= 2017/09/14 01:30:55  2017/09/14 01:30:55 starting 10 jobs 2017/09/14 01:30:55 hit  /job/0 2017/09/14 01:30:55 hit  /job/1 2017/09/14 01:30:55 hit  /job/4 2017/09/14 01:30:55 hit  /job/2 2017/09/14 01:30:55 hit  /job/3 2017/09/14 01:30:55 hit  /job/5 2017/09/14 01:30:56 done  /job/0 2017/09/14 01:30:56 hit  /job/6 2017/09/14 01:30:56 done  /job/1 2017/09/14 01:30:56 done  /job/2 2017/09/14 01:30:56 
done  /job/4 2017/09/14 01:30:56 hit  /job/7 2017/09/14 01:30:56 done  /job/3 2017/09/14 01:30:56 hit  /job/9 2017/09/14 01:30:56 hit  /job/8 2017/09/14 01:30:56 done  /job/5 2017/09/14 01:30:57 done  /job/6 2017/09/14 01:30:57 done  /job/7 2017/09/14 01:30:57 done  /job/9 2017/09/14 01:30:57 done  /job/8 2017/09/14 01:30:57 done 10 jobs 

Comments

Popular posts from this blog

ZeroMQ on Windows, with Qt Creator -

unity3d - Unity SceneManager.LoadScene quits application -

python - Error while using APScheduler: 'NoneType' object has no attribute 'now' -