diff --git a/aggregator/clean.go b/aggregator/clean.go deleted file mode 100644 index d93e79ffa9fc8d7b52bd2db475ee4f1aca47ee35..0000000000000000000000000000000000000000 --- a/aggregator/clean.go +++ /dev/null @@ -1,33 +0,0 @@ -package aggregator - -import ( - "sync" - "time" -) - -func clean() { - t := getTime() % (bucket * 2) - for i := (t + 2) % (bucket * 2); i != (t+bucket/2)%(bucket*2); i = (i + 1) % (bucket * 2) { - m, ok := data.Load(i) - if !ok { - continue - } - timeMap, ok := m.(*sync.Map) - if !ok { - continue - } - timeMap.Range(func(key, _ interface{}) bool { - timeMap.Delete(key) - return true - }) - } -} - -func init() { - go func() { - for { - time.Sleep(time.Second * time.Duration(bucket/1000/5)) - clean() - } - }() -} diff --git a/aggregator/collect.go b/aggregator/collect.go index 538d246818760ddedafdc6c8b34ceeaf3efbea44..afa8e2d703a067ea0e36626e0bcd67a96c359e48 100644 --- a/aggregator/collect.go +++ b/aggregator/collect.go @@ -1,43 +1,24 @@ package aggregator import ( - "flag" "git.sch.bme.hu/mikewashere/matrix-backend/metrics" "sync" "time" ) -var bucketFlag = flag.Int64("bucket", 10000, "Bucket size") - -var bucket int64 = 10000 - -var _ = func() error { - flag.Parse() - bucket = *bucketFlag - return nil -}() - -func getTime() int64 { - return time.Now().UnixNano() / 1e6 +type Choice struct { + Choice string + Timestamp time.Time } -// data[getTime][user] = choice +// data[user] = choice var data sync.Map -func init() { - for i := int64(0); i < bucket*2; i++ { - data.Store(i, &sync.Map{}) - } -} - -func Vote(UserID, Choice string) { +func Vote(UserID, choice string) { go metrics.Votes.Inc() - t := getTime() % (bucket * 2) - timeMap, ok := data.Load(t) - if ok { - timeMap, ok := timeMap.(*sync.Map) - if ok { - timeMap.Store(UserID, Choice) - } + c := Choice{ + Choice: choice, + Timestamp: time.Now(), } + data.Store(UserID, c) } diff --git a/aggregator/serve.go b/aggregator/serve.go index 32e548dece1ccf5ced913acaa5c892202b37be9d..c9f6b555ae75784421b65241ea1238b16f6e07bf 100644 --- a/aggregator/serve.go +++ b/aggregator/serve.go @@ -1,44 +1,53 @@ package aggregator import ( + "flag" "git.sch.bme.hu/mikewashere/matrix-backend/db" "git.sch.bme.hu/mikewashere/matrix-backend/metrics" "sync" + "time" ) -func GetCurrentChoices() (map[string]map[string]int, error) { +var stale = flag.Int("stale", 10000, "the time in milliseconds after a vote becomes stale, or not counted") + +var staleDuration time.Duration + +func init() { + staleDuration = time.Duration(*stale) * time.Millisecond +} + +func GetCurrentChoices() (map[string]*map[string]int, error) { + t := time.Now() go metrics.Requests.Inc() - t := getTime() % (bucket * 2) - ret := make(map[string]map[string]int) + ret := make(map[string]*map[string]int) teams, e := db.GetTeamsWithMembers() if e != nil { return nil, e } + var wg sync.WaitGroup for _, team := range teams { + wg.Add(1) teamMap := make(map[string]int) - for _, u := range team.Members { - for i := t; i != (t+bucket*2-bucket)%(bucket*2); i = (i + bucket*2 - 1) % (bucket * 2) { - m, ok := data.Load(i) + ret[team.ID] = &teamMap + go func(teamMap *map[string]int, team *db.Team) { + for _, u := range team.Members { + c, ok := data.Load(u.ID) if ok { - m, ok := m.(*sync.Map) + c, ok := c.(Choice) if ok { - d, ok := m.Load(u.ID) - if ok { - d, ok := d.(string) - if ok { - teamMap[d] = teamMap[d] + 1 - } - break + if t.Sub(c.Timestamp) < staleDuration { + (*teamMap)[c.Choice] = (*teamMap)[c.Choice] + 1 } } } } - } - ret[team.ID] = teamMap + wg.Done() + }(&teamMap, team) } + wg.Wait() return ret, nil } diff --git a/bench/main.go b/bench/main.go index cc37737ddd67ac504aaf96f9e451c4cb9a7f7cc2..34a5e6dfa372e37550332638f5a5a33628e3aa2a 100644 --- a/bench/main.go +++ b/bench/main.go @@ -29,7 +29,7 @@ func (s *stringSlice) RandElement() string { } var ( - loc = flag.String("locations", "http://localhost:8080", "location of server") + loc = flag.String("location", "http://localhost:8080", "location of server") prod = flag.Int("producers", 100, "number of producers") cons = flag.Int("consumers", 1, "number of consumers") prodInt = flag.Int("prod-interval", 1000, "interval of clicks in ms") diff --git a/main.go b/main.go index 6952d807201301a9d99a2b8c96c8c42b83d5497e..0f56902a7ccaedb8c581ca4dbe97c41313524165 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "flag" "git.sch.bme.hu/mikewashere/matrix-backend/input" "git.sch.bme.hu/mikewashere/matrix-backend/output" "github.com/igm/sockjs-go/v3/sockjs" @@ -10,6 +11,8 @@ import ( ) func main() { + flag.Parse() + mux := http.NewServeMux() opts := sockjs.DefaultOptions opts.RawWebsocket = true