123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- // SPDX-FileCopyrightText: Adam Evyčędo
- //
- // SPDX-License-Identifier: AGPL-3.0-or-later
- package traffic
- import (
- "encoding/json"
- "fmt"
- "log"
- "net/http"
- "os"
- "path/filepath"
- "time"
- "github.com/cjoudrey/gluahttp"
- "github.com/yuin/gopher-lua"
- luajson "layeh.com/gopher-json"
- )
- // ..................... feedID stopID
- var lastUpdatedLua = map[string]map[RealtimeFeedType]map[string]uint64{}
- func (e LuaError) Error() string {
- return e.Message
- }
- func isLuaUpdatesScript(context Context) bool {
- _, err := os.Stat(getLuaUpdatesPath(context))
- return err == nil
- }
- func isLuaVehiclesScript(context Context) bool {
- _, err := os.Stat(getLuaVehiclesPath(context))
- return err == nil
- }
- func isLuaAlertsScript(context Context) bool {
- _, err := os.Stat(getLuaAlertsPath(context))
- return err == nil
- }
- func getLuaUpdatesPath(context Context) string {
- return filepath.Join(context.DataHome, context.FeedID, string(context.Version), "updates.lua")
- }
- func getLuaVehiclesPath(context Context) string {
- return filepath.Join(context.DataHome, context.FeedID, string(context.Version), "vehicles.lua")
- }
- func getLuaAlertsPath(context Context) string {
- return filepath.Join(context.DataHome, context.FeedID, string(context.Version), "alerts.lua")
- }
- func getLuaRealtimeUpdates(_ string, _ int, stopID, stopCode string, ctx Context) (map[string][]Update, map[string][]Alert, bool, error) {
- luaUpdates := LuaUpdates{}
- luaError := LuaError{}
- filePath := getLuaUpdatesPath(ctx)
- now := uint64(time.Now().Unix())
- if lastUpdatedLua[ctx.FeedID] == nil {
- lastUpdatedLua[ctx.FeedID] = map[RealtimeFeedType]map[string]uint64{}
- }
- if lastUpdatedLua[ctx.FeedID][TRIP_UPDATES] == nil {
- lastUpdatedLua[ctx.FeedID][TRIP_UPDATES] = map[string]uint64{}
- }
- if passed := now - lastUpdatedLua[ctx.FeedID][TRIP_UPDATES][stopID]; passed < 30 {
- // TODO return from cache
- }
- l := lua.NewState()
- defer l.Close()
- l.PreloadModule("json", luajson.Loader)
- l.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader)
- if err := l.DoFile(filePath); err != nil {
- return map[string][]Update{}, map[string][]Alert{}, true, BlockingError{fmt.Errorf("while executing lua script: %w", err)}
- }
- if err := l.CallByParam(lua.P{
- Fn: l.GetGlobal("getUpdates"),
- NRet: 2,
- Protect: true,
- }, lua.LString(""), lua.LNumber(0), lua.LString(stopID), lua.LString(stopCode)); err != nil {
- return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while executing updates function: %w", err)
- }
- luaErr := l.Get(-1)
- l.Pop(1)
- result := l.Get(-1)
- l.Pop(1)
- if luaErr.(lua.LString) != "" {
- err := json.Unmarshal([]byte(luaErr.(lua.LString)), &luaError)
- if err != nil {
- return map[string][]Update{}, map[string][]Alert{}, true, BlockingError{fmt.Errorf("while unmarshalling error '%s': %w", luaErr.(lua.LString), err)}
- }
- if luaError.WillNextRequestFail || luaError.HttpResponseCode == 429 {
- err = BlockingError{luaError}
- } else {
- err = luaError
- }
- return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("in updates function: %w", err)
- }
- err := json.Unmarshal([]byte(result.(lua.LString)), &luaUpdates)
- if err != nil {
- return map[string][]Update{}, map[string][]Alert{}, true, BlockingError{fmt.Errorf("while unmarshalling updates : %w", err)}
- }
- delete(luaUpdates.Updates, "")
- delete(luaUpdates.Alerts, "")
- cacheMx.Lock()
- if updates == nil {
- updates = map[string]map[string][]Update{}
- }
- if updates[ctx.FeedID] == nil {
- updates[ctx.FeedID] = map[string][]Update{}
- }
- if !luaUpdates.AreTripsInTimetable {
- updates[ctx.FeedID][stopCode] = []Update{}
- }
- for tripID, update := range luaUpdates.Updates {
- cacheKey := tripID
- if !luaUpdates.AreTripsInTimetable {
- cacheKey = stopCode
- }
- update.VehicleStatus.TripID = tripID
- updates[ctx.FeedID][cacheKey] = append(updates[ctx.FeedID][cacheKey], update)
- }
- i := 0
- if alerts == nil {
- alerts = map[string]Alerts{}
- }
- currentAlerts := alerts[ctx.FeedID]
- // TODO single alert in array must be expired
- currentAlerts.Alerts = []Alert{}
- resultAlerts := map[string][]Alert{}
- for tripID, luaAlerts := range luaUpdates.Alerts {
- currentAlerts.ByTrip = map[string][]uint{}
- for _, luaAlert := range luaAlerts {
- a, err := LuaAlertToAlert(luaAlert)
- if err != nil {
- log.Printf("while converting alert: %v\n", err)
- continue
- }
- currentAlerts.Alerts = append(currentAlerts.Alerts, a)
- currentAlerts.ByTrip[tripID] = append(currentAlerts.ByTrip[tripID], uint(i))
- resultAlerts[tripID] = append(resultAlerts[tripID], a)
- i++
- }
- }
- alerts[ctx.FeedID] = currentAlerts
- resultUpdates := updates[ctx.FeedID]
- cacheMx.Unlock()
- return resultUpdates, resultAlerts, luaUpdates.AreTripsInTimetable, nil
- }
- func getLuaRealtimeVehiclesMap(ctx Context, lb, rt Position) (map[string]VehicleStatus, error) {
- luaError := LuaError{}
- statuses := map[string]VehicleStatus{}
- filePath := getLuaVehiclesPath(ctx)
- now := uint64(time.Now().Unix())
- if lastUpdatedLua[ctx.FeedID] == nil {
- lastUpdatedLua[ctx.FeedID] = map[RealtimeFeedType]map[string]uint64{}
- }
- if lastUpdatedLua[ctx.FeedID][VEHICLE_POSITIONS] == nil {
- lastUpdatedLua[ctx.FeedID][VEHICLE_POSITIONS] = map[string]uint64{}
- }
- if vehicleStatuses == nil {
- cacheMx.Lock()
- vehicleStatuses = map[string]map[string]VehicleStatus{}
- cacheMx.Unlock()
- }
- if passed := now - lastUpdatedLua[ctx.FeedID][VEHICLE_POSITIONS][""]; passed < 30 {
- cacheMx.Lock()
- resultVehicleStatuses := vehicleStatuses[ctx.FeedID]
- cacheMx.Unlock()
- return resultVehicleStatuses, nil
- }
- l := lua.NewState()
- defer l.Close()
- l.PreloadModule("json", luajson.Loader)
- l.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader)
- if err := l.DoFile(filePath); err != nil {
- return statuses, fmt.Errorf("while executing lua script: %w", err)
- }
- if err := l.CallByParam(lua.P{
- Fn: l.GetGlobal("getVehicles"),
- NRet: 2,
- Protect: true,
- }, lua.LNumber(rt.Lat), lua.LNumber(lb.Lon), lua.LNumber(lb.Lat), lua.LNumber(rt.Lon)); err != nil {
- return statuses, fmt.Errorf("while executing vehicles function: %w", err)
- }
- luaErr := l.Get(-1)
- l.Pop(1)
- result := l.Get(-1)
- l.Pop(1)
- if luaErr.(lua.LString) != "" {
- err := json.Unmarshal([]byte(luaErr.(lua.LString)), &luaError)
- if err != nil {
- return statuses, BlockingError{fmt.Errorf("while unmarshalling error '%s': %w", luaErr.(lua.LString), err)}
- }
- if luaError.WillNextRequestFail || luaError.HttpResponseCode == 429 {
- err = BlockingError{luaError}
- } else {
- err = luaError
- }
- return statuses, fmt.Errorf("in updates function: %w", err)
- }
- json.Unmarshal([]byte(result.(lua.LString)), &statuses)
- cacheMx.Lock()
- vehicleStatuses[ctx.FeedID] = statuses
- cacheMx.Unlock()
- return statuses, nil
- }
- func getLuaRealtimeVehicles(ctx Context, lb, rt Position) ([]VehicleStatus, error) {
- statusesMap, err := getLuaRealtimeVehiclesMap(ctx, lb, rt)
- if err != nil {
- return []VehicleStatus{}, err
- }
- statuses := make([]VehicleStatus, len(statusesMap))
- i := 0
- for _, status := range statusesMap {
- statuses[i] = status
- i++
- }
- return statuses, nil
- }
- func getLuaRealtimeAlerts(stopID, stopCode, tripID string, ctx Context, t *Traffic) ([]Alert, error) {
- luaError := LuaError{}
- resultAlerts := []Alert{}
- filePath := getLuaAlertsPath(ctx)
- now := uint64(time.Now().Unix())
- if lastUpdatedLua[ctx.FeedID] == nil {
- lastUpdatedLua[ctx.FeedID] = map[RealtimeFeedType]map[string]uint64{}
- }
- if lastUpdatedLua[ctx.FeedID][ALERTS] == nil {
- lastUpdatedLua[ctx.FeedID][ALERTS] = map[string]uint64{}
- }
- if passed := now - lastUpdatedLua[ctx.FeedID][ALERTS][""]; passed < 30 {
- // TODO return from cache
- }
- var line Line
- if tripID != "" {
- trip, err := GetTrip(tripID, ctx, t)
- if err != nil {
- return []Alert{}, fmt.Errorf("while getting trip: %w", err)
- }
- line, err = GetLine(trip.LineID, ctx, t)
- if err != nil {
- return []Alert{}, fmt.Errorf("while getting line: %w", err)
- }
- }
- l := lua.NewState()
- defer l.Close()
- l.PreloadModule("json", luajson.Loader)
- l.PreloadModule("http", gluahttp.NewHttpModule(&http.Client{}).Loader)
- if err := l.DoFile(filePath); err != nil {
- return resultAlerts, fmt.Errorf("while executing lua script: %w", err)
- }
- luaAlerts := []AlertLua{}
- if err := l.CallByParam(lua.P{
- Fn: l.GetGlobal("getAlerts"),
- NRet: 2,
- Protect: true,
- }, lua.LString(stopID), lua.LString(stopCode), lua.LString(tripID), lua.LString(line.Id)); err != nil {
- return resultAlerts, fmt.Errorf("while executing alerts function: %w", err)
- }
- luaErr := l.Get(-1)
- l.Pop(1)
- result := l.Get(-1)
- l.Pop(1)
- if luaErr.(lua.LString) != "" {
- err := json.Unmarshal([]byte(luaErr.(lua.LString)), &luaError)
- if err != nil {
- return resultAlerts, BlockingError{fmt.Errorf("while unmarshalling error '%s': %w", luaErr.(lua.LString), err)}
- }
- if luaError.WillNextRequestFail || luaError.HttpResponseCode == 429 {
- err = BlockingError{luaError}
- } else {
- err = luaError
- }
- return resultAlerts, fmt.Errorf("in updates function: %w", err)
- }
- json.Unmarshal([]byte(result.(lua.LString)), &luaAlerts)
- for _, luaAlert := range luaAlerts {
- a, err := LuaAlertToAlert(luaAlert)
- if err != nil {
- log.Printf("while converting lua alert to alert: %v\n", err)
- continue
- }
- resultAlerts = append(resultAlerts, a)
- }
- // TODO cache
- return resultAlerts, nil
- }
|