realtime_lua.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. // SPDX-FileCopyrightText: Adam Evyčędo
  2. //
  3. // SPDX-License-Identifier: AGPL-3.0-or-later
  4. package traffic
  5. import (
  6. "encoding/json"
  7. "fmt"
  8. "log"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "time"
  13. "github.com/cjoudrey/gluahttp"
  14. "github.com/yuin/gopher-lua"
  15. luajson "layeh.com/gopher-json"
  16. )
  // lastUpdatedLua is indexed by feed ID, then by realtime feed type, then by a
  // per-type key: the stop ID for TRIP_UPDATES and the empty string for
  // VEHICLE_POSITIONS and ALERTS. Leaf values are compared against
  // time.Now().Unix(), i.e. they are treated as Unix timestamps in seconds.
  // NOTE(review): nothing visible in this file stores a leaf value — confirm
  // where (or whether) the timestamps are written.
  var lastUpdatedLua = map[string]map[RealtimeFeedType]map[string]uint64{}
  19. func (e LuaError) Error() string {
  20. return e.Message
  21. }
  22. func isLuaUpdatesScript(context Context) bool {
  23. _, err := os.Stat(getLuaUpdatesPath(context))
  24. return err == nil
  25. }
  26. func isLuaVehiclesScript(context Context) bool {
  27. _, err := os.Stat(getLuaVehiclesPath(context))
  28. return err == nil
  29. }
  30. func isLuaAlertsScript(context Context) bool {
  31. _, err := os.Stat(getLuaAlertsPath(context))
  32. return err == nil
  33. }
  34. func getLuaUpdatesPath(context Context) string {
  35. return filepath.Join(context.DataHome, context.FeedID, string(context.Version), "updates.lua")
  36. }
  37. func getLuaVehiclesPath(context Context) string {
  38. return filepath.Join(context.DataHome, context.FeedID, string(context.Version), "vehicles.lua")
  39. }
  40. func getLuaAlertsPath(context Context) string {
  41. return filepath.Join(context.DataHome, context.FeedID, string(context.Version), "alerts.lua")
  42. }
  43. func getLuaRealtimeUpdates(_ string, _ int, stopID, stopCode string, ctx Context) (map[string][]Update, map[string][]Alert, bool, error) {
  44. luaUpdates := LuaUpdates{}
  45. luaError := LuaError{}
  46. filePath := getLuaUpdatesPath(ctx)
  47. now := uint64(time.Now().Unix())
  48. if lastUpdatedLua[ctx.FeedID] == nil {
  49. lastUpdatedLua[ctx.FeedID] = map[RealtimeFeedType]map[string]uint64{}
  50. }
  51. if lastUpdatedLua[ctx.FeedID][TRIP_UPDATES] == nil {
  52. lastUpdatedLua[ctx.FeedID][TRIP_UPDATES] = map[string]uint64{}
  53. }
  54. if passed := now - lastUpdatedLua[ctx.FeedID][TRIP_UPDATES][stopID]; passed < 30 {
  55. // TODO return from cache
  56. }
  57. l := lua.NewState()
  58. defer l.Close()
  59. l.PreloadModule("json", luajson.Loader)
  60. l.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader)
  61. if err := l.DoFile(filePath); err != nil {
  62. return map[string][]Update{}, map[string][]Alert{}, true, BlockingError{fmt.Errorf("while executing lua script: %w", err)}
  63. }
  64. if err := l.CallByParam(lua.P{
  65. Fn: l.GetGlobal("getUpdates"),
  66. NRet: 2,
  67. Protect: true,
  68. }, lua.LString(""), lua.LNumber(0), lua.LString(stopID), lua.LString(stopCode)); err != nil {
  69. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while executing updates function: %w", err)
  70. }
  71. luaErr := l.Get(-1)
  72. l.Pop(1)
  73. result := l.Get(-1)
  74. l.Pop(1)
  75. if luaErr.(lua.LString) != "" {
  76. err := json.Unmarshal([]byte(luaErr.(lua.LString)), &luaError)
  77. if err != nil {
  78. return map[string][]Update{}, map[string][]Alert{}, true, BlockingError{fmt.Errorf("while unmarshalling error '%s': %w", luaErr.(lua.LString), err)}
  79. }
  80. if luaError.WillNextRequestFail || luaError.HttpResponseCode == 429 {
  81. err = BlockingError{luaError}
  82. } else {
  83. err = luaError
  84. }
  85. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("in updates function: %w", err)
  86. }
  87. err := json.Unmarshal([]byte(result.(lua.LString)), &luaUpdates)
  88. if err != nil {
  89. return map[string][]Update{}, map[string][]Alert{}, true, BlockingError{fmt.Errorf("while unmarshalling updates : %w", err)}
  90. }
  91. delete(luaUpdates.Updates, "")
  92. delete(luaUpdates.Alerts, "")
  93. cacheMx.Lock()
  94. if updates == nil {
  95. updates = map[string]map[string][]Update{}
  96. }
  97. if updates[ctx.FeedID] == nil {
  98. updates[ctx.FeedID] = map[string][]Update{}
  99. }
  100. if !luaUpdates.AreTripsInTimetable {
  101. updates[ctx.FeedID][stopCode] = []Update{}
  102. }
  103. for tripID, update := range luaUpdates.Updates {
  104. cacheKey := tripID
  105. if !luaUpdates.AreTripsInTimetable {
  106. cacheKey = stopCode
  107. }
  108. update.VehicleStatus.TripID = tripID
  109. updates[ctx.FeedID][cacheKey] = append(updates[ctx.FeedID][cacheKey], update)
  110. }
  111. i := 0
  112. if alerts == nil {
  113. alerts = map[string]Alerts{}
  114. }
  115. currentAlerts := alerts[ctx.FeedID]
  116. // TODO single alert in array must be expired
  117. currentAlerts.Alerts = []Alert{}
  118. resultAlerts := map[string][]Alert{}
  119. for tripID, luaAlerts := range luaUpdates.Alerts {
  120. currentAlerts.ByTrip = map[string][]uint{}
  121. for _, luaAlert := range luaAlerts {
  122. a, err := LuaAlertToAlert(luaAlert)
  123. if err != nil {
  124. log.Printf("while converting alert: %v\n", err)
  125. continue
  126. }
  127. currentAlerts.Alerts = append(currentAlerts.Alerts, a)
  128. currentAlerts.ByTrip[tripID] = append(currentAlerts.ByTrip[tripID], uint(i))
  129. resultAlerts[tripID] = append(resultAlerts[tripID], a)
  130. i++
  131. }
  132. }
  133. alerts[ctx.FeedID] = currentAlerts
  134. resultUpdates := updates[ctx.FeedID]
  135. cacheMx.Unlock()
  136. return resultUpdates, resultAlerts, luaUpdates.AreTripsInTimetable, nil
  137. }
  138. func getLuaRealtimeVehiclesMap(ctx Context, lb, rt Position) (map[string]VehicleStatus, error) {
  139. luaError := LuaError{}
  140. statuses := map[string]VehicleStatus{}
  141. filePath := getLuaVehiclesPath(ctx)
  142. now := uint64(time.Now().Unix())
  143. if lastUpdatedLua[ctx.FeedID] == nil {
  144. lastUpdatedLua[ctx.FeedID] = map[RealtimeFeedType]map[string]uint64{}
  145. }
  146. if lastUpdatedLua[ctx.FeedID][VEHICLE_POSITIONS] == nil {
  147. lastUpdatedLua[ctx.FeedID][VEHICLE_POSITIONS] = map[string]uint64{}
  148. }
  149. if vehicleStatuses == nil {
  150. cacheMx.Lock()
  151. vehicleStatuses = map[string]map[string]VehicleStatus{}
  152. cacheMx.Unlock()
  153. }
  154. if passed := now - lastUpdatedLua[ctx.FeedID][VEHICLE_POSITIONS][""]; passed < 30 {
  155. cacheMx.Lock()
  156. resultVehicleStatuses := vehicleStatuses[ctx.FeedID]
  157. cacheMx.Unlock()
  158. return resultVehicleStatuses, nil
  159. }
  160. l := lua.NewState()
  161. defer l.Close()
  162. l.PreloadModule("json", luajson.Loader)
  163. l.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader)
  164. if err := l.DoFile(filePath); err != nil {
  165. return statuses, fmt.Errorf("while executing lua script: %w", err)
  166. }
  167. if err := l.CallByParam(lua.P{
  168. Fn: l.GetGlobal("getVehicles"),
  169. NRet: 2,
  170. Protect: true,
  171. }, lua.LNumber(rt.Lat), lua.LNumber(lb.Lon), lua.LNumber(lb.Lat), lua.LNumber(rt.Lon)); err != nil {
  172. return statuses, fmt.Errorf("while executing vehicles function: %w", err)
  173. }
  174. luaErr := l.Get(-1)
  175. l.Pop(1)
  176. result := l.Get(-1)
  177. l.Pop(1)
  178. if luaErr.(lua.LString) != "" {
  179. err := json.Unmarshal([]byte(luaErr.(lua.LString)), &luaError)
  180. if err != nil {
  181. return statuses, BlockingError{fmt.Errorf("while unmarshalling error '%s': %w", luaErr.(lua.LString), err)}
  182. }
  183. if luaError.WillNextRequestFail || luaError.HttpResponseCode == 429 {
  184. err = BlockingError{luaError}
  185. } else {
  186. err = luaError
  187. }
  188. return statuses, fmt.Errorf("in updates function: %w", err)
  189. }
  190. json.Unmarshal([]byte(result.(lua.LString)), &statuses)
  191. cacheMx.Lock()
  192. vehicleStatuses[ctx.FeedID] = statuses
  193. cacheMx.Unlock()
  194. return statuses, nil
  195. }
  196. func getLuaRealtimeVehicles(ctx Context, lb, rt Position) ([]VehicleStatus, error) {
  197. statusesMap, err := getLuaRealtimeVehiclesMap(ctx, lb, rt)
  198. if err != nil {
  199. return []VehicleStatus{}, err
  200. }
  201. statuses := make([]VehicleStatus, len(statusesMap))
  202. i := 0
  203. for _, status := range statusesMap {
  204. statuses[i] = status
  205. i++
  206. }
  207. return statuses, nil
  208. }
  209. func getLuaRealtimeAlerts(stopID, stopCode, tripID string, ctx Context, t *Traffic) ([]Alert, error) {
  210. luaError := LuaError{}
  211. resultAlerts := []Alert{}
  212. filePath := getLuaAlertsPath(ctx)
  213. now := uint64(time.Now().Unix())
  214. if lastUpdatedLua[ctx.FeedID] == nil {
  215. lastUpdatedLua[ctx.FeedID] = map[RealtimeFeedType]map[string]uint64{}
  216. }
  217. if lastUpdatedLua[ctx.FeedID][ALERTS] == nil {
  218. lastUpdatedLua[ctx.FeedID][ALERTS] = map[string]uint64{}
  219. }
  220. if passed := now - lastUpdatedLua[ctx.FeedID][ALERTS][""]; passed < 30 {
  221. // TODO return from cache
  222. }
  223. var line Line
  224. if tripID != "" {
  225. trip, err := GetTrip(tripID, ctx, t)
  226. if err != nil {
  227. return []Alert{}, fmt.Errorf("while getting trip: %w", err)
  228. }
  229. line, err = GetLine(trip.LineID, ctx, t)
  230. if err != nil {
  231. return []Alert{}, fmt.Errorf("while getting line: %w", err)
  232. }
  233. }
  234. l := lua.NewState()
  235. defer l.Close()
  236. l.PreloadModule("json", luajson.Loader)
  237. l.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader)
  238. if err := l.DoFile(filePath); err != nil {
  239. return resultAlerts, fmt.Errorf("while executing lua script: %w", err)
  240. }
  241. luaAlerts := []AlertLua{}
  242. if err := l.CallByParam(lua.P{
  243. Fn: l.GetGlobal("getAlerts"),
  244. NRet: 2,
  245. Protect: true,
  246. }, lua.LString(stopID), lua.LString(stopCode), lua.LString(tripID), lua.LString(line.Id)); err != nil {
  247. return resultAlerts, fmt.Errorf("while executing alerts function: %w", err)
  248. }
  249. luaErr := l.Get(-1)
  250. l.Pop(1)
  251. result := l.Get(-1)
  252. l.Pop(1)
  253. if luaErr.(lua.LString) != "" {
  254. err := json.Unmarshal([]byte(luaErr.(lua.LString)), &luaError)
  255. if err != nil {
  256. return resultAlerts, BlockingError{fmt.Errorf("while unmarshalling error '%s': %w", luaErr.(lua.LString), err)}
  257. }
  258. if luaError.WillNextRequestFail || luaError.HttpResponseCode == 429 {
  259. err = BlockingError{luaError}
  260. } else {
  261. err = luaError
  262. }
  263. return resultAlerts, fmt.Errorf("in updates function: %w", err)
  264. }
  265. json.Unmarshal([]byte(result.(lua.LString)), &luaAlerts)
  266. for _, luaAlert := range luaAlerts {
  267. a, err := LuaAlertToAlert(luaAlert)
  268. if err != nil {
  269. log.Printf("while converting lua alert to alert: %v\n", err)
  270. continue
  271. }
  272. resultAlerts = append(resultAlerts, a)
  273. }
  274. // TODO cache
  275. return resultAlerts, nil
  276. }