From 944699c491e4f8aff86e4e046ad46428c54de292 Mon Sep 17 00:00:00 2001 From: Joe Krause Date: Tue, 12 Sep 2017 00:49:58 -0700 Subject: [PATCH 1/6] [wip] User cache support. --- modules/_all/import.go | 1 + modules/usercache/database.go | 115 +++++++++++++++++++++++++++++++++ modules/usercache/usercache.go | 64 ++++++++++++++++++ slack/controller/team.go | 9 ++- slack/rtm/events.go | 14 ++++ slack/rtm/membership_info.go | 13 +++- 6 files changed, 213 insertions(+), 3 deletions(-) create mode 100644 modules/usercache/database.go create mode 100644 modules/usercache/usercache.go diff --git a/modules/_all/import.go b/modules/_all/import.go index 5faf6e5..8833ca7 100644 --- a/modules/_all/import.go +++ b/modules/_all/import.go @@ -15,5 +15,6 @@ import ( _ "github.com/riking/marvin/modules/restart" _ "github.com/riking/marvin/modules/rss" _ "github.com/riking/marvin/modules/timedpin" + _ "github.com/riking/marvin/modules/usercache" _ "github.com/riking/marvin/modules/weblogin" ) diff --git a/modules/usercache/database.go b/modules/usercache/database.go new file mode 100644 index 0000000..b8f34c2 --- /dev/null +++ b/modules/usercache/database.go @@ -0,0 +1,115 @@ +package usercache + +import ( + "encoding/json" + + "github.com/pkg/errors" + "github.com/riking/marvin/slack" + "github.com/riking/marvin/slack/rtm" +) + +const ( + sqlMigrate1 = `CREATE TABLE module_user_cache ( + user_id varchar(15) PRIMARY KEY NOT NULL, + data text + )` + + sqlGetAllEntries = `SELECT * FROM module_user_cache` + + // $1 = slack.UserID + sqlGetEntry = `SELECT data FROM module_user_cache WHERE user_id = $1` + + // $1 = slack.UserID + // $2 = data (json encoded) + sqlAddEntry = `INSERT INTO module_user_cache (user_id,data) VALUES ($1, $2)` + + // $1 = data (json encoded) + // $2 = slack.UserID + sqlUpdateEntry = `UPDATE module_user_cache SET data = $1 WHERE user_id = $2` +) + +func (mod *UserCacheModule) GetEntry(userid slack.UserID) (slack.User, error) { + var entry slack.User + + var data string + stmt, err := mod.team.DB().Prepare(sqlGetEntry) + if err != nil { + return entry, nil + } + defer stmt.Close() + row := stmt.QueryRow(userid) + err = row.Scan(&data) + if err != nil { + return entry, nil + } + err = json.Unmarshal([]byte(userid), &entry) + if err != nil { + return entry, nil + } + return entry, nil +} + +func (mod *UserCacheModule) LoadEntries() error { + stmt, err := mod.team.DB().Query(sqlGetAllEntries) + if err != nil { + return err + } + + defer stmt.Close() + for stmt.Next() { + var id string + var data string + var user slack.User + + err = stmt.Scan(&id, &data) + if err != nil { + return errors.Wrap(err, "error in user cache: obtaining row info") + continue + } + err = json.Unmarshal([]byte(data), &user) + if err != nil { + return errors.Wrap(err, "error in user cache: unmarshal user object") + } + rtmClient := mod.team.GetRTMClient().(*rtm.Client) + rtmClient.ReplaceUserObject(&user) + } + return stmt.Err() +} + +func (mod *UserCacheModule) UpdateEntry(userobject slack.User) error { + _, exists := mod.GetEntry(userobject.ID) + + var entrydata []byte + entrydata, err := json.Marshal(&userobject) + if err != nil { + return err + } + + var query = sqlAddEntry + if exists != nil { + query = sqlUpdateEntry + } + + stmt, err := mod.team.DB().Prepare(query) + if err != nil { + return err + } + + defer stmt.Close() + row := stmt.QueryRow(userobject.ID, entrydata) + var id slack.UserID + err = row.Scan(&id) + return err +} + +func (mod *UserCacheModule) UpdateEntries(userobjects []*slack.User) error { + for _, obj := range userobjects { + if obj != nil { + err := mod.UpdateEntry(*obj) + if err != nil { + return err + } + } + } + return nil +} diff --git a/modules/usercache/usercache.go b/modules/usercache/usercache.go new file mode 100644 index 0000000..70dad2b --- /dev/null +++ b/modules/usercache/usercache.go @@ -0,0 +1,64 @@ +package usercache + +import ( + "sync" + + "fmt" + + "github.com/riking/marvin" + "github.com/riking/marvin/slack" +) + +type API interface { + marvin.Module + + GetEntry(userid slack.UserID) (slack.User, error) + UpdateEntry(userobject slack.User) error + UpdateEntries(userobjects []*slack.User) error +} + +var _ API = &UserCacheModule{} + +// --- +func init() { + marvin.RegisterModule(NewUserCacheModule) +} + +const Identifier = "usercache" + +type UserCacheModule struct { + team marvin.Team + + cacheLock sync.Mutex + cacheMap map[slack.UserID]slack.User +} + +func NewUserCacheModule(t marvin.Team) marvin.Module { + mod := &UserCacheModule{ + team: t, + cacheMap: make(map[slack.UserID]slack.User), + } + return mod +} + +func (mod *UserCacheModule) Identifier() marvin.ModuleID { + return Identifier +} + +func (mod *UserCacheModule) Load(t marvin.Team) { + t.DB().MustMigrate(Identifier, 1505192548, sqlMigrate1) + t.DB().SyntaxCheck(sqlGetAllEntries, sqlGetEntry, sqlAddEntry, sqlUpdateEntry) +} + +func (mod *UserCacheModule) Enable(team marvin.Team) { + go func() { + err := mod.LoadEntries() + if err != nil { + fmt.Errorf("Error whilst updating entries: %s", err.Error()) + return + } + }() +} + +func (mod *UserCacheModule) Disable(t marvin.Team) { +} diff --git a/slack/controller/team.go b/slack/controller/team.go index 8022a18..07ef4f3 100644 --- a/slack/controller/team.go +++ b/slack/controller/team.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "os" + "strconv" "strings" "sync" "time" @@ -318,7 +319,13 @@ func (t *Team) SlackAPIPostJSON(method string, form url.Values, result interface util.LogBadf("Slack API %s error: %s", method, err) util.LogBadf("Form for %s: %v", method, form) if slackResponse.SlackError == "ratelimited" { - time.Sleep(1*time.Second) + retryafter := resp.Header.Get("Retry-After") + intp, err := strconv.ParseInt(retryafter, 10, 64) + if err == nil { + time.Sleep(time.Duration(intp) * time.Second) + } else { + time.Sleep(1 * time.Second) + } } return errors.Wrapf(err, "Slack API %s", method) } diff --git a/slack/rtm/events.go b/slack/rtm/events.go index f3047a0..3b4bd42 100644 --- a/slack/rtm/events.go +++ b/slack/rtm/events.go @@ -92,11 +92,16 @@ func (c *Client) onChannelJoin(msg slack.RTMRawMessage) { func (c *Client) ReplaceUserObject(obj *slack.User) { c.MetadataLock.Lock() defer c.MetadataLock.Unlock() + cacheApi := c.team.GetModule("usercache").(userCacheAPI) obj.CacheTS = time.Now() for i, v := range c.Users { if v.ID == obj.ID { c.Users[i] = obj + + if cacheApi != nil { + cacheApi.UpdateEntry(*v) + } return } } @@ -106,6 +111,7 @@ func (c *Client) ReplaceUserObject(obj *slack.User) { func (c *Client) ReplaceManyUserObjects(objs []*slack.User) { c.MetadataLock.Lock() defer c.MetadataLock.Unlock() + cacheApi := c.team.GetModule("usercache").(userCacheAPI) now := time.Now() for ci, cv := range c.Users { @@ -114,6 +120,10 @@ func (c *Client) ReplaceManyUserObjects(objs []*slack.User) { iv.CacheTS = now c.Users[ci] = iv objs[ii] = nil + + if cacheApi != nil { + cacheApi.UpdateEntry(*iv) + } } } } @@ -121,6 +131,10 @@ func (c *Client) ReplaceManyUserObjects(objs []*slack.User) { if iv != nil { iv.CacheTS = now c.Users = append(c.Users, iv) + + if cacheApi != nil { + cacheApi.UpdateEntry(*iv) + } } } } diff --git a/slack/rtm/membership_info.go b/slack/rtm/membership_info.go index 542897d..2b05b62 100644 --- a/slack/rtm/membership_info.go +++ b/slack/rtm/membership_info.go @@ -5,6 +5,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/riking/marvin" "github.com/riking/marvin/slack" "github.com/riking/marvin/util" ) @@ -16,6 +17,14 @@ type membershipRequest struct { C chan interface{} } +type userCacheAPI interface { + marvin.Module + + GetEntry(userid slack.UserID) (slack.User, error) + UpdateEntry(userobject slack.User) error + UpdateEntries(userobjects []*slack.User) error +} + func (c *Client) membershipWorker() { for req := range c.membershipCh { req.C <- req.F(c.channelMembers) @@ -167,13 +176,13 @@ func (c *Client) fillUsersList() { for response.PageInfo.NextCursor != "" { c.ReplaceManyUserObjects(response.Members) - time.Sleep(2*time.Second) + time.Sleep(2 * time.Second) form.Set("cursor", response.PageInfo.NextCursor) err := c.team.SlackAPIPostJSON("users.list", form, &response) if err != nil { util.LogError(errors.Wrapf(err, "[%s] Could not retrieve users list", c.Team.Domain)) - break + continue } } } From dcb4b4005b78386ec139051e351f17b7c3273084 Mon Sep 17 00:00:00 2001 From: Joe Krause Date: Tue, 12 Sep 2017 01:25:01 -0700 Subject: [PATCH 2/6] Actually make sure it will continue to fill users if the user cache is disabled. --- slack/rtm/events.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/slack/rtm/events.go b/slack/rtm/events.go index 3b4bd42..58d1542 100644 --- a/slack/rtm/events.go +++ b/slack/rtm/events.go @@ -92,7 +92,12 @@ func (c *Client) onChannelJoin(msg slack.RTMRawMessage) { func (c *Client) ReplaceUserObject(obj *slack.User) { c.MetadataLock.Lock() defer c.MetadataLock.Unlock() - cacheApi := c.team.GetModule("usercache").(userCacheAPI) + + var cacheApi userCacheAPI + moduleCacheApi := c.team.GetModule("usercache") + if moduleCacheApi != nil { + cacheApi = moduleCacheApi.(userCacheAPI) + } obj.CacheTS = time.Now() for i, v := range c.Users { @@ -111,7 +116,12 @@ func (c *Client) ReplaceUserObject(obj *slack.User) { func (c *Client) ReplaceManyUserObjects(objs []*slack.User) { c.MetadataLock.Lock() defer c.MetadataLock.Unlock() - cacheApi := c.team.GetModule("usercache").(userCacheAPI) + + var cacheApi userCacheAPI + moduleCacheApi := c.team.GetModule("usercache") + if moduleCacheApi != nil { + cacheApi = moduleCacheApi.(userCacheAPI) + } now := time.Now() for ci, cv := range c.Users { From d5ef49abd797ebc3d3081445fb1363fe20ee5c34 Mon Sep 17 00:00:00 2001 From: Joe Krause Date: Tue, 12 Sep 2017 01:31:20 -0700 Subject: [PATCH 3/6] Fix sql migration query formatting --- modules/usercache/database.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/usercache/database.go b/modules/usercache/database.go index b8f34c2..662e06e 100644 --- a/modules/usercache/database.go +++ b/modules/usercache/database.go @@ -10,9 +10,9 @@ import ( const ( sqlMigrate1 = `CREATE TABLE module_user_cache ( - user_id varchar(15) PRIMARY KEY NOT NULL, - data text - )` + user_id varchar(15) PRIMARY KEY NOT NULL, + data text + )` sqlGetAllEntries = `SELECT * FROM module_user_cache` From b098afa026ca3e663bf7c09913498ba06b47d219 Mon Sep 17 00:00:00 2001 From: Joe Krause Date: Wed, 13 Sep 2017 16:38:34 -0700 Subject: [PATCH 4/6] Fixed performance issues identified. --- modules/usercache/database.go | 71 +++++++++++++++++----------------- modules/usercache/usercache.go | 18 ++++----- slack/rtm/events.go | 19 +++------ slack/rtm/membership_info.go | 8 ++-- 4 files changed, 52 insertions(+), 64 deletions(-) diff --git a/modules/usercache/database.go b/modules/usercache/database.go index 662e06e..87a968e 100644 --- a/modules/usercache/database.go +++ b/modules/usercache/database.go @@ -2,8 +2,8 @@ package usercache import ( "encoding/json" + "fmt" - "github.com/pkg/errors" "github.com/riking/marvin/slack" "github.com/riking/marvin/slack/rtm" ) @@ -12,6 +12,8 @@ const ( sqlMigrate1 = `CREATE TABLE module_user_cache ( user_id varchar(15) PRIMARY KEY NOT NULL, data text + + UNIQUE(user_id) )` sqlGetAllEntries = `SELECT * FROM module_user_cache` @@ -21,11 +23,8 @@ const ( // $1 = slack.UserID // $2 = data (json encoded) - sqlAddEntry = `INSERT INTO module_user_cache (user_id,data) VALUES ($1, $2)` - - // $1 = data (json encoded) - // $2 = slack.UserID - sqlUpdateEntry = `UPDATE module_user_cache SET data = $1 WHERE user_id = $2` + sqlUpsertEntry = `INSERT INTO module_user_cache (user_id,data) VALUES ($1, $2) + ON CONFLICT (user_id) DO UPDATE SET data = EXCLUDED.data` ) func (mod *UserCacheModule) GetEntry(userid slack.UserID) (slack.User, error) { @@ -55,59 +54,59 @@ func (mod *UserCacheModule) LoadEntries() error { return err } + rtmClient := mod.team.GetRTMClient().(*rtm.Client) + defer stmt.Close() + var arr = make([]*slack.User, 200) for stmt.Next() { var id string var data string - var user slack.User + var user *slack.User = &slack.User{} err = stmt.Scan(&id, &data) if err != nil { - return errors.Wrap(err, "error in user cache: obtaining row info") - continue + return err } - err = json.Unmarshal([]byte(data), &user) + err = json.Unmarshal([]byte(data), user) if err != nil { - return errors.Wrap(err, "error in user cache: unmarshal user object") + return err + } + arr = append(arr, user) + if len(arr) >= 199 { + go rtmClient.ReplaceManyUserObjects(arr, false) + arr = make([]*slack.User, 200) } - rtmClient := mod.team.GetRTMClient().(*rtm.Client) - rtmClient.ReplaceUserObject(&user) } + if len(arr) >= 0 { + go rtmClient.ReplaceManyUserObjects(arr, false) + arr = nil + } + return stmt.Err() } -func (mod *UserCacheModule) UpdateEntry(userobject slack.User) error { - _, exists := mod.GetEntry(userobject.ID) - - var entrydata []byte - entrydata, err := json.Marshal(&userobject) - if err != nil { - return err - } - - var query = sqlAddEntry - if exists != nil { - query = sqlUpdateEntry - } +func (mod *UserCacheModule) UpdateEntry(userobject *slack.User) error { + var objarray = make([]*slack.User, 1) + objarray[0] = userobject + return mod.UpdateEntries(objarray) +} - stmt, err := mod.team.DB().Prepare(query) +func (mod *UserCacheModule) UpdateEntries(userobjects []*slack.User) error { + stmt, err := mod.team.DB().Prepare(sqlUpsertEntry) if err != nil { return err } defer stmt.Close() - row := stmt.QueryRow(userobject.ID, entrydata) - var id slack.UserID - err = row.Scan(&id) - return err -} -func (mod *UserCacheModule) UpdateEntries(userobjects []*slack.User) error { for _, obj := range userobjects { if obj != nil { - err := mod.UpdateEntry(*obj) - if err != nil { - return err + entrydata, err := json.Marshal(obj) + if err == nil { + _, err := stmt.Exec(obj.ID, entrydata) + if err != nil { + return err + } } } } diff --git a/modules/usercache/usercache.go b/modules/usercache/usercache.go index 70dad2b..163754e 100644 --- a/modules/usercache/usercache.go +++ b/modules/usercache/usercache.go @@ -1,19 +1,19 @@ package usercache import ( - "sync" - "fmt" "github.com/riking/marvin" "github.com/riking/marvin/slack" ) +// interface duplicated in rtm package type API interface { marvin.Module GetEntry(userid slack.UserID) (slack.User, error) - UpdateEntry(userobject slack.User) error + LoadEntries() error + UpdateEntry(userobject *slack.User) error UpdateEntries(userobjects []*slack.User) error } @@ -28,15 +28,11 @@ const Identifier = "usercache" type UserCacheModule struct { team marvin.Team - - cacheLock sync.Mutex - cacheMap map[slack.UserID]slack.User } func NewUserCacheModule(t marvin.Team) marvin.Module { mod := &UserCacheModule{ - team: t, - cacheMap: make(map[slack.UserID]slack.User), + team: t, } return mod } @@ -47,16 +43,18 @@ func (mod *UserCacheModule) Identifier() marvin.ModuleID { func (mod *UserCacheModule) Load(t marvin.Team) { t.DB().MustMigrate(Identifier, 1505192548, sqlMigrate1) - t.DB().SyntaxCheck(sqlGetAllEntries, sqlGetEntry, sqlAddEntry, sqlUpdateEntry) + t.DB().SyntaxCheck(sqlGetAllEntries, sqlGetEntry, sqlUpsertEntry) } func (mod *UserCacheModule) Enable(team marvin.Team) { go func() { + fmt.Printf("Loading cache entries....\n") err := mod.LoadEntries() if err != nil { - fmt.Errorf("Error whilst updating entries: %s", err.Error()) + fmt.Printf("Error whilst updating entries: %s\n", err.Error()) return } + fmt.Printf("Loaded all entries from the cache.\n") }() } diff --git a/slack/rtm/events.go b/slack/rtm/events.go index 58d1542..c1f9272 100644 --- a/slack/rtm/events.go +++ b/slack/rtm/events.go @@ -97,32 +97,31 @@ func (c *Client) ReplaceUserObject(obj *slack.User) { moduleCacheApi := c.team.GetModule("usercache") if moduleCacheApi != nil { cacheApi = moduleCacheApi.(userCacheAPI) + cacheApi.UpdateEntry(obj) } obj.CacheTS = time.Now() for i, v := range c.Users { if v.ID == obj.ID { c.Users[i] = obj - - if cacheApi != nil { - cacheApi.UpdateEntry(*v) - } return } } c.Users = append(c.Users, obj) } -func (c *Client) ReplaceManyUserObjects(objs []*slack.User) { +func (c *Client) ReplaceManyUserObjects(objs []*slack.User, updateCache bool) { c.MetadataLock.Lock() defer c.MetadataLock.Unlock() var cacheApi userCacheAPI moduleCacheApi := c.team.GetModule("usercache") - if moduleCacheApi != nil { + if moduleCacheApi != nil && updateCache { cacheApi = moduleCacheApi.(userCacheAPI) + cacheApi.UpdateEntries(objs) } + now := time.Now() for ci, cv := range c.Users { for ii, iv := range objs { @@ -130,10 +129,6 @@ func (c *Client) ReplaceManyUserObjects(objs []*slack.User) { iv.CacheTS = now c.Users[ci] = iv objs[ii] = nil - - if cacheApi != nil { - cacheApi.UpdateEntry(*iv) - } } } } @@ -141,10 +136,6 @@ func (c *Client) ReplaceManyUserObjects(objs []*slack.User) { if iv != nil { iv.CacheTS = now c.Users = append(c.Users, iv) - - if cacheApi != nil { - cacheApi.UpdateEntry(*iv) - } } } } diff --git a/slack/rtm/membership_info.go b/slack/rtm/membership_info.go index 2b05b62..5c51592 100644 --- a/slack/rtm/membership_info.go +++ b/slack/rtm/membership_info.go @@ -20,8 +20,7 @@ type membershipRequest struct { type userCacheAPI interface { marvin.Module - GetEntry(userid slack.UserID) (slack.User, error) - UpdateEntry(userobject slack.User) error + UpdateEntry(userobject *slack.User) error UpdateEntries(userobjects []*slack.User) error } @@ -174,16 +173,17 @@ func (c *Client) fillUsersList() { util.LogError(errors.Wrapf(err, "[%s] Could not retrieve users list", c.Team.Domain)) } + c.ReplaceManyUserObjects(response.Members, true) + for response.PageInfo.NextCursor != "" { - c.ReplaceManyUserObjects(response.Members) time.Sleep(2 * time.Second) - form.Set("cursor", response.PageInfo.NextCursor) err := c.team.SlackAPIPostJSON("users.list", form, &response) if err != nil { util.LogError(errors.Wrapf(err, "[%s] Could not retrieve users list", c.Team.Domain)) continue } + c.ReplaceManyUserObjects(response.Members, true) } } From bb2eecca960d5f4476cdceaf415fa326be185c7f Mon Sep 17 00:00:00 2001 From: Joe Krause Date: Fri, 15 Sep 2017 13:02:52 -0700 Subject: [PATCH 5/6] Made changes as requested --- modules/usercache/database.go | 17 +++++++---------- modules/usercache/usercache.go | 31 +++++++++++++++++++++++++++++-- slack/controller/channel_info.go | 9 ++++++++- slack/rtm/events.go | 11 +++++------ slack/rtm/membership_info.go | 3 +-- 5 files changed, 50 insertions(+), 21 deletions(-) diff --git a/modules/usercache/database.go b/modules/usercache/database.go index 87a968e..29ab92c 100644 --- a/modules/usercache/database.go +++ b/modules/usercache/database.go @@ -2,7 +2,6 @@ package usercache import ( "encoding/json" - "fmt" "github.com/riking/marvin/slack" "github.com/riking/marvin/slack/rtm" @@ -61,24 +60,24 @@ func (mod *UserCacheModule) LoadEntries() error { for stmt.Next() { var id string var data string - var user *slack.User = &slack.User{} + var user *slack.User err = stmt.Scan(&id, &data) if err != nil { return err } - err = json.Unmarshal([]byte(data), user) + err = json.Unmarshal([]byte(data), &user) if err != nil { - return err + continue } arr = append(arr, user) if len(arr) >= 199 { - go rtmClient.ReplaceManyUserObjects(arr, false) - arr = make([]*slack.User, 200) + rtmClient.ReplaceManyUserObjects(arr, false) + arr = arr[:0] } } if len(arr) >= 0 { - go rtmClient.ReplaceManyUserObjects(arr, false) + rtmClient.ReplaceManyUserObjects(arr, false) arr = nil } @@ -86,9 +85,7 @@ func (mod *UserCacheModule) LoadEntries() error { } func (mod *UserCacheModule) UpdateEntry(userobject *slack.User) error { - var objarray = make([]*slack.User, 1) - objarray[0] = userobject - return mod.UpdateEntries(objarray) + return mod.UpdateEntries([]*slack.User{userobject}) } func (mod *UserCacheModule) UpdateEntries(userobjects []*slack.User) error { diff --git a/modules/usercache/usercache.go b/modules/usercache/usercache.go index 163754e..4c795e3 100644 --- a/modules/usercache/usercache.go +++ b/modules/usercache/usercache.go @@ -2,9 +2,12 @@ package usercache import ( "fmt" + "strconv" + "time" "github.com/riking/marvin" "github.com/riking/marvin/slack" + "github.com/riking/marvin/slack/rtm" ) // interface duplicated in rtm package @@ -44,19 +47,43 @@ func (mod *UserCacheModule) Identifier() marvin.ModuleID { func (mod *UserCacheModule) Load(t marvin.Team) { t.DB().MustMigrate(Identifier, 1505192548, sqlMigrate1) t.DB().SyntaxCheck(sqlGetAllEntries, sqlGetEntry, sqlUpsertEntry) + t.ModuleConfig(Identifier).Add("last-timestamp", "0") + t.ModuleConfig(Identifier).Add("delay", (72 * time.Hour).String()) } func (mod *UserCacheModule) Enable(team marvin.Team) { go func() { - fmt.Printf("Loading cache entries....\n") + fmt.Printf("Loading user cache entries....\n") err := mod.LoadEntries() if err != nil { fmt.Printf("Error whilst updating entries: %s\n", err.Error()) return } - fmt.Printf("Loaded all entries from the cache.\n") + + fmt.Printf("Loaded all entries from the user cache.\n") + go mod.UpdateTask() }() } func (mod *UserCacheModule) Disable(t marvin.Team) { } + +func (mod *UserCacheModule) UpdateTask() { + rtmClient := mod.team.GetRTMClient().(*rtm.Client) + + for { + timestr, _, _ := mod.team.ModuleConfig(Identifier).GetIsDefault("last-timestamp") + delaystr, _, _ := mod.team.ModuleConfig(Identifier).GetIsDefault("delay") + timeint, _ := strconv.ParseInt(timestr, 10, 64) + var timeres = time.Unix(timeint, 0) + delayres, err := time.ParseDuration(delaystr) + fmt.Printf("pls") + + if err != nil || timeres.Before(time.Now().Add(-delayres)) { + fmt.Printf("Repolling user list....\n") + go rtmClient.FillUsersList() + err = mod.team.ModuleConfig(Identifier).Set("last-timestamp", strconv.FormatInt(time.Now().Unix(), 10)) + } + time.Sleep(1 * time.Hour) + } +} diff --git a/slack/controller/channel_info.go b/slack/controller/channel_info.go index 241b289..71a339a 100644 --- a/slack/controller/channel_info.go +++ b/slack/controller/channel_info.go @@ -189,8 +189,15 @@ func (t *Team) cachedUserInfo(user slack.UserID) *slack.User { uID := slack.ParseUserMention(string(user)) if uID != "" { - go t.updateUserInfo(user) + // XXX HACK to get around unlocking. + t.client.MetadataLock.RUnlock() + defer t.client.MetadataLock.RLock() + user, err := t.updateUserInfo(user) + if err == nil { + return user + } } + return nil } diff --git a/slack/rtm/events.go b/slack/rtm/events.go index c1f9272..bd0f047 100644 --- a/slack/rtm/events.go +++ b/slack/rtm/events.go @@ -90,9 +90,6 @@ func (c *Client) onChannelJoin(msg slack.RTMRawMessage) { } func (c *Client) ReplaceUserObject(obj *slack.User) { - c.MetadataLock.Lock() - defer c.MetadataLock.Unlock() - var cacheApi userCacheAPI moduleCacheApi := c.team.GetModule("usercache") if moduleCacheApi != nil { @@ -100,6 +97,9 @@ func (c *Client) ReplaceUserObject(obj *slack.User) { cacheApi.UpdateEntry(obj) } + c.MetadataLock.Lock() + defer c.MetadataLock.Unlock() + obj.CacheTS = time.Now() for i, v := range c.Users { if v.ID == obj.ID { @@ -111,9 +111,6 @@ func (c *Client) ReplaceUserObject(obj *slack.User) { } func (c *Client) ReplaceManyUserObjects(objs []*slack.User, updateCache bool) { - c.MetadataLock.Lock() - defer c.MetadataLock.Unlock() - var cacheApi userCacheAPI moduleCacheApi := c.team.GetModule("usercache") if moduleCacheApi != nil && updateCache { @@ -121,6 +118,8 @@ func (c *Client) ReplaceManyUserObjects(objs []*slack.User, updateCache bool) { cacheApi.UpdateEntries(objs) } + c.MetadataLock.Lock() + defer c.MetadataLock.Unlock() now := time.Now() for ci, cv := range c.Users { diff --git a/slack/rtm/membership_info.go b/slack/rtm/membership_info.go index 5c51592..08efd59 100644 --- a/slack/rtm/membership_info.go +++ b/slack/rtm/membership_info.go @@ -149,13 +149,12 @@ func (c *Client) ListIMs() []*slack.ChannelIM { func (c *Client) fetchTeamInfo() { go c.fillGroupList() - go c.fillUsersList() // TODO(kyork): list normal channels too // TODO(kyork): use the listChannels() from logger module } -func (c *Client) fillUsersList() { +func (c *Client) FillUsersList() { var response struct { slack.APIResponse Members []*slack.User From 6d02953484d6754441bdbbb36cb8f1d1104227ed Mon Sep 17 00:00:00 2001 From: Joe Krause Date: Fri, 15 Sep 2017 13:04:12 -0700 Subject: [PATCH 6/6] Oops. --- modules/usercache/usercache.go | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/usercache/usercache.go b/modules/usercache/usercache.go index 4c795e3..3d6d568 100644 --- a/modules/usercache/usercache.go +++ b/modules/usercache/usercache.go @@ -77,7 +77,6 @@ func (mod *UserCacheModule) UpdateTask() { timeint, _ := strconv.ParseInt(timestr, 10, 64) var timeres = time.Unix(timeint, 0) delayres, err := time.ParseDuration(delaystr) - fmt.Printf("pls") if err != nil || timeres.Before(time.Now().Add(-delayres)) { fmt.Printf("Repolling user list....\n")