realtime.go 12 KB


  1. // SPDX-FileCopyrightText: Adam Evyčędo
  2. //
  3. // SPDX-License-Identifier: AGPL-3.0-or-later
  4. package traffic
  5. import (
  6. pb "apiote.xyz/p/szczanieckiej/gtfs_rt/transit_realtime"
  7. "errors"
  8. "fmt"
  9. "log"
  10. "os"
  11. "strings"
  12. "sync"
  13. "time"
  14. "git.sr.ht/~sircmpwn/go-bare"
  15. "golang.org/x/text/language"
  16. )
  17. type BlockingError struct {
  18. cause error
  19. }
  20. func (e BlockingError) Error() string {
  21. return e.cause.Error()
  22. }
  23. type Alerts struct {
  24. ByLine map[string][]uint
  25. ByTrip map[string][]uint
  26. ByLineType map[LineType][]uint
  27. ByStop map[string][]uint
  28. ByAgency map[string][]uint
  29. Alerts []Alert
  30. }
  31. type SpecificAlert struct {
  32. Header string
  33. Description string
  34. URL string
  35. Cause AlertCause
  36. Effect AlertEffect
  37. }
  38. type Alert struct {
  39. TimeRanges [][2]time.Time
  40. Headers map[language.Tag]string
  41. Descriptions map[language.Tag]string
  42. URLs map[language.Tag]string
  43. Cause AlertCause
  44. Effect AlertEffect
  45. }
  46. type AlertCause uint
  47. const (
  48. CAUSE_UNKNOWN AlertCause = 0
  49. CAUSE_OTHER AlertCause = 1
  50. CAUSE_TECHNICAL_PROBLEM AlertCause = 2
  51. CAUSE_STRIKE AlertCause = 3
  52. CAUSE_DEMONSTRATION AlertCause = 4
  53. CAUSE_ACCIDENT AlertCause = 5
  54. CAUSE_HOLIDAY AlertCause = 6
  55. CAUSE_WEATHER AlertCause = 7
  56. CAUSE_MAINTENANCE AlertCause = 8
  57. CAUSE_CONSTRUCTION AlertCause = 9
  58. CAUSE_POLICE_ACTIVITY AlertCause = 10
  59. CAUSE_MEDICAL_EMERGENCY AlertCause = 11
  60. )
  61. func alertCauseOfGtfs(v *pb.Alert_Cause) AlertCause {
  62. switch v {
  63. case pb.Alert_UNKNOWN_CAUSE.Enum():
  64. return CAUSE_UNKNOWN
  65. case pb.Alert_OTHER_CAUSE.Enum():
  66. return CAUSE_OTHER
  67. case pb.Alert_TECHNICAL_PROBLEM.Enum():
  68. return CAUSE_TECHNICAL_PROBLEM
  69. case pb.Alert_STRIKE.Enum():
  70. return CAUSE_STRIKE
  71. case pb.Alert_DEMONSTRATION.Enum():
  72. return CAUSE_DEMONSTRATION
  73. case pb.Alert_ACCIDENT.Enum():
  74. return CAUSE_ACCIDENT
  75. case pb.Alert_HOLIDAY.Enum():
  76. return CAUSE_HOLIDAY
  77. case pb.Alert_WEATHER.Enum():
  78. return CAUSE_WEATHER
  79. case pb.Alert_MAINTENANCE.Enum():
  80. return CAUSE_MAINTENANCE
  81. case pb.Alert_CONSTRUCTION.Enum():
  82. return CAUSE_CONSTRUCTION
  83. case pb.Alert_POLICE_ACTIVITY.Enum():
  84. return CAUSE_POLICE_ACTIVITY
  85. case pb.Alert_MEDICAL_EMERGENCY.Enum():
  86. return CAUSE_MEDICAL_EMERGENCY
  87. default:
  88. return CAUSE_UNKNOWN
  89. }
  90. }
  91. type AlertEffect uint
  92. const (
  93. EFFECT_UNKNOWN AlertEffect = 0
  94. EFFECT_OTHER AlertEffect = 1
  95. EFFECT_NO_SERVICE AlertEffect = 2
  96. EFFECT_REDUCED_SERVICE AlertEffect = 3
  97. EFFECT_SIGNIFICANT_DELAYS AlertEffect = 4
  98. EFFECT_DETOUR AlertEffect = 5
  99. EFFECT_ADDITIONAL_SERVICE AlertEffect = 6
  100. EFFECT_MODIFIED_SERVICE AlertEffect = 7
  101. EFFECT_STOP_MOVED AlertEffect = 8
  102. EFFECT_NONE AlertEffect = 9
  103. EFFECT_ACCESSIBILITY_ISSUE AlertEffect = 10
  104. )
  105. func alertEffectOfGtfs(v *pb.Alert_Effect) AlertEffect {
  106. switch v {
  107. case pb.Alert_UNKNOWN_EFFECT.Enum():
  108. return EFFECT_UNKNOWN
  109. case pb.Alert_OTHER_EFFECT.Enum():
  110. return EFFECT_OTHER
  111. case pb.Alert_NO_SERVICE.Enum():
  112. return EFFECT_NO_SERVICE
  113. case pb.Alert_REDUCED_SERVICE.Enum():
  114. return EFFECT_REDUCED_SERVICE
  115. case pb.Alert_SIGNIFICANT_DELAYS.Enum():
  116. return EFFECT_SIGNIFICANT_DELAYS
  117. case pb.Alert_DETOUR.Enum():
  118. return EFFECT_DETOUR
  119. case pb.Alert_ADDITIONAL_SERVICE.Enum():
  120. return EFFECT_ADDITIONAL_SERVICE
  121. case pb.Alert_MODIFIED_SERVICE.Enum():
  122. return EFFECT_MODIFIED_SERVICE
  123. case pb.Alert_STOP_MOVED.Enum():
  124. return EFFECT_STOP_MOVED
  125. case pb.Alert_NO_EFFECT.Enum():
  126. return EFFECT_NONE
  127. case pb.Alert_ACCESSIBILITY_ISSUE.Enum():
  128. return EFFECT_ACCESSIBILITY_ISSUE
  129. default:
  130. return EFFECT_UNKNOWN
  131. }
  132. }
  133. // ............ feedID trip/stop
  134. var updates map[string]map[string][]Update
  135. var alerts map[string]Alerts
  136. var vehicleStatuses map[string]map[string]VehicleStatus
  137. var cacheMx sync.Mutex
  138. func getTripID(tripsFile *os.File, offset int64) (string, error) {
  139. _, err := tripsFile.Seek(offset, 0)
  140. if err != nil {
  141. return "", fmt.Errorf("while seeking: %w", err)
  142. }
  143. trip := Trip{}
  144. err = bare.UnmarshalReader(tripsFile, &trip)
  145. if err != nil {
  146. return "", fmt.Errorf("while unmarshalling: %w", err)
  147. }
  148. return trip.Id, nil
  149. }
  150. func departuresFromNoTripUpdates(updates []Update, alerts map[string][]Alert, pickups, dropoffs map[string]Boarding, timezone *time.Location, languages []language.Tag) ([]DepartureRealtime, error) {
  151. departures := []DepartureRealtime{}
  152. now := time.Now().In(timezone)
  153. for _, update := range updates {
  154. if update.Time == "" {
  155. log.Printf("update time is empty, update is %+v\n", update)
  156. continue
  157. }
  158. departureTime, err := time.Parse("150405", update.Time)
  159. if err != nil {
  160. return departures, fmt.Errorf("while parsing time: %w", err)
  161. }
  162. departureTime = time.Date(now.Year(), now.Month(), now.Day(), departureTime.Hour(), departureTime.Minute(), departureTime.Second(), 0, timezone)
  163. departures = append(departures, DepartureRealtime{
  164. Time: departureTime,
  165. Departure: Departure{
  166. Pickup: pickups[update.VehicleStatus.LineID],
  167. Dropoff: dropoffs[update.VehicleStatus.LineID],
  168. },
  169. Headsign: update.VehicleStatus.Headsign,
  170. LineID: update.VehicleStatus.LineID,
  171. Order: StopOrder{
  172. uint(departureTime.Unix()),
  173. 0,
  174. },
  175. Update: update, // NOTE delay must be 0
  176. Alerts: selectSpecificAlerts(alerts[update.VehicleStatus.TripID], languages),
  177. })
  178. }
  179. return departures, nil
  180. }
  181. func enrichDepartures(stopID, stopCode string, departures []DepartureRealtime, datetime time.Time, departuresType DeparturesType, ctx Context, tripsFile *os.File, timezone *time.Location, languages []language.Tag) ([]DepartureRealtime, error) { // TODO tripsFile -> map[tripOffset]tripID
  182. enrichedDepartures := make([]DepartureRealtime, len(departures))
  183. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  184. if err != nil {
  185. log.Printf("while getting feedInfo: %v\n", err)
  186. feedInfo = FeedInfo{}
  187. }
  188. var enrichMethod func(string, int, string, string, Context) (map[string][]Update, map[string][]Alert, bool, error)
  189. if feedInfo.Name != "" {
  190. if _, ok := feedInfo.RealtimeFeeds[TRIP_UPDATES]; ok {
  191. enrichMethod = getGtfsRealtimeUpdates
  192. // log.Println("GTFS")
  193. } else if isLuaUpdatesScript(ctx) {
  194. enrichMethod = getLuaRealtimeUpdates
  195. // log.Println("Lua")
  196. } else {
  197. // log.Println("none")
  198. }
  199. }
  200. offsets := make([]uint, len(departures))
  201. pickups := map[string]Boarding{}
  202. dropoffs := map[string]Boarding{}
  203. for i, departure := range departures {
  204. offsets[i] = departure.Order.TripOffset
  205. pickups[departure.LineID] = departure.Departure.Pickup
  206. dropoffs[departure.LineID] = departure.Departure.Dropoff
  207. }
  208. trips, err := GetTripsByOffset(offsets, ctx, func(Trip) bool { return true })
  209. if err != nil {
  210. return departures, fmt.Errorf("while getting trips: %w", err)
  211. }
  212. midnight := time.Date(datetime.Year(), datetime.Month(),
  213. datetime.Day(), 0, 0, 0, 0, timezone)
  214. if departuresType == DEPARTURES_HYBRID {
  215. for i, departure := range departures {
  216. if departure.Time.After(midnight) {
  217. var (
  218. updates map[string][]Update
  219. alerts map[string][]Alert
  220. areTripsInTimetable bool
  221. )
  222. if enrichMethod != nil {
  223. updates, alerts, areTripsInTimetable, err = enrichMethod(trips[departure.Order.TripOffset].Id, departure.Order.Sequence, stopID, stopCode, ctx)
  224. if err != nil {
  225. var ber BlockingError
  226. if isTimeout(err) || errors.As(err, &ber) || strings.Contains(err.Error(), "connection refused") { // TODO or any other connection problem
  227. log.Printf("blocking error while enriching departure %s -> %s (%v): %v", departure.LineID, departure.Headsign, departure.Time, err)
  228. update := Update{}
  229. update.VehicleStatus.LineID = trips[departure.Order.TripOffset].LineID
  230. update.VehicleStatus.Headsign = trips[departure.Order.TripOffset].Headsign
  231. enrichedDepartures[i] = departure.WithUpdate(update)
  232. enrichMethod = nil
  233. continue
  234. } else {
  235. log.Printf("while enriching departure %s -> %s (%v): %v\n", departure.LineID, departure.Headsign, departure.Time, err)
  236. enrichedDepartures[i] = departure
  237. continue
  238. }
  239. }
  240. if areTripsInTimetable {
  241. tripUpdates := updates[trips[departure.Order.TripOffset].Id]
  242. var validTripUpdate Update
  243. for _, tripUpdate := range tripUpdates {
  244. if tripUpdate.StopSequence > uint32(departure.Order.Sequence) {
  245. break
  246. }
  247. validTripUpdate.Time = tripUpdate.Time
  248. validTripUpdate.Delay = tripUpdate.Delay
  249. validTripUpdate.StopID = tripUpdate.StopID
  250. validTripUpdate.StopSequence = tripUpdate.StopSequence
  251. validTripUpdate.Time = tripUpdate.Time
  252. validTripUpdate.TimetableRelationship = tripUpdate.TimetableRelationship
  253. validTripUpdate.VehicleStatus = tripUpdate.VehicleStatus
  254. }
  255. validTripUpdate.VehicleStatus.LineID = trips[departure.Order.TripOffset].LineID
  256. validTripUpdate.VehicleStatus.Headsign = trips[departure.Order.TripOffset].Headsign
  257. enrichedDepartures[i] = departure.WithUpdate(validTripUpdate)
  258. enrichedDepartures[i] = enrichedDepartures[i].WithAlerts(alerts[trips[departure.Order.TripOffset].Id], languages)
  259. } else {
  260. var err error
  261. enrichedDepartures, err = departuresFromNoTripUpdates(updates[stopCode], alerts, pickups, dropoffs, timezone, languages)
  262. if err != nil {
  263. return departures, fmt.Errorf("while creating departures without trip: %w", err)
  264. }
  265. break
  266. }
  267. } else {
  268. update := Update{}
  269. update.VehicleStatus.LineID = trips[departure.Order.TripOffset].LineID
  270. update.VehicleStatus.Headsign = trips[departure.Order.TripOffset].Headsign
  271. enrichedDepartures[i] = departure.WithUpdate(update)
  272. }
  273. }
  274. }
  275. } else {
  276. for i, departure := range departures {
  277. enrichedDepartures[i] = departure.WithUpdate(Update{
  278. VehicleStatus: VehicleStatus{
  279. LineID: trips[departure.Order.TripOffset].LineID,
  280. Headsign: trips[departure.Order.TripOffset].Headsign,
  281. },
  282. })
  283. }
  284. }
  285. return enrichedDepartures, nil
  286. }
  287. func GetAlerts(stopID, stopCode string, tripOffset int, ctx Context, t *Traffic, languages []language.Tag) []SpecificAlert {
  288. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  289. if err != nil {
  290. log.Printf("while getting feedInfo: %v\n", err)
  291. feedInfo = FeedInfo{}
  292. }
  293. var function func(string, string, string, Context, *Traffic) ([]Alert, error)
  294. if feedInfo.Name != "" {
  295. if _, ok := feedInfo.RealtimeFeeds[ALERTS]; ok {
  296. function = getGtfsRealtimeAlerts
  297. } else if isLuaAlertsScript(ctx) {
  298. function = getLuaRealtimeAlerts
  299. } else {
  300. return []SpecificAlert{}
  301. }
  302. }
  303. tripID := ""
  304. if tripOffset > 0 {
  305. trip, err := GetTripByOffset(uint(tripOffset), ctx, t)
  306. if err != nil {
  307. log.Printf("while getting trip: %v\n", err)
  308. return []SpecificAlert{}
  309. }
  310. tripID = trip.Id
  311. }
  312. if function != nil {
  313. alerts, err := function(stopID, stopCode, tripID, ctx, t)
  314. if err != nil {
  315. log.Printf("while getting alerts: %v\n", err)
  316. return []SpecificAlert{}
  317. }
  318. return selectSpecificAlerts(alerts, languages)
  319. }
  320. return []SpecificAlert{}
  321. }
  322. func getVehiclePositions(ctx Context, t *Traffic, lb, rt Position) []VehicleStatus {
  323. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  324. if err != nil {
  325. log.Printf("while getting feedInfo: %v\n", err)
  326. feedInfo = FeedInfo{}
  327. }
  328. var function func(Context, Position, Position) ([]VehicleStatus, error)
  329. if feedInfo.Name != "" {
  330. if _, ok := feedInfo.RealtimeFeeds[VEHICLE_POSITIONS]; ok {
  331. function = getGtfsRealtimeVehicles
  332. } else if isLuaVehiclesScript(ctx) {
  333. function = getLuaRealtimeVehicles
  334. }
  335. }
  336. if function != nil {
  337. statuses, err := function(ctx, lb, rt)
  338. if err != nil {
  339. log.Printf("while getting vehicle positions: %v\n", err)
  340. return []VehicleStatus{}
  341. }
  342. ids := make([]string, len(statuses))
  343. for i, status := range statuses {
  344. ids[i] = status.TripID
  345. }
  346. trips, err := GetTrips(ids, ctx, t)
  347. if err != nil {
  348. log.Printf("while getting trips: %v", err)
  349. }
  350. statusesWithLine := make([]VehicleStatus, len(statuses))
  351. for i, status := range statuses {
  352. if status.LineID != "" && status.Headsign != "" {
  353. statusesWithLine[i] = status
  354. } else {
  355. if _, ok := trips[status.TripID]; !ok {
  356. continue
  357. }
  358. status.LineID = trips[status.TripID].LineID
  359. status.Headsign = trips[status.TripID].Headsign
  360. statusesWithLine[i] = status
  361. }
  362. }
  363. return statusesWithLine
  364. }
  365. return []VehicleStatus{}
  366. }