realtime_gtfs.go 14 KB


  1. // SPDX-FileCopyrightText: Adam Evyčędo
  2. //
  3. // SPDX-License-Identifier: AGPL-3.0-or-later
  4. package traffic
  5. import (
  6. "apiote.xyz/p/szczanieckiej/gtfs_rt"
  7. pb "apiote.xyz/p/szczanieckiej/gtfs_rt/transit_realtime"
  8. "fmt"
  9. "math"
  10. "time"
  11. "golang.org/x/text/language"
  12. )
  13. // ........................ feedID
  14. var lastUpdatedGtfsRt = map[string]map[RealtimeFeedType]uint64{}
  15. func makeTimetableRelationshipFromTripTimetable(r pb.TripDescriptor_ScheduleRelationship) TimetableRelationship {
  16. switch r {
  17. case pb.TripDescriptor_ADDED:
  18. return TRIP_ADDED
  19. case pb.TripDescriptor_CANCELED:
  20. return TRIP_CANCELED
  21. case pb.TripDescriptor_DELETED:
  22. return TRIP_DELETED
  23. default:
  24. return TRIP_SCHEDULED
  25. }
  26. }
  27. func makeTimetableRelationshipFromStopTrip(r pb.TripUpdate_StopTimeUpdate_ScheduleRelationship) TimetableRelationship {
  28. switch r {
  29. case pb.TripUpdate_StopTimeUpdate_NO_DATA:
  30. return NO_TRIP_DATA
  31. case pb.TripUpdate_StopTimeUpdate_SKIPPED:
  32. return STOP_SKIPPED
  33. default:
  34. return TRIP_SCHEDULED
  35. }
  36. }
  37. func makeDepartureStatus(s pb.VehiclePosition_VehicleStopStatus) DepartureStatus {
  38. switch s {
  39. case pb.VehiclePosition_STOPPED_AT:
  40. return AT_STOP
  41. case pb.VehiclePosition_INCOMING_AT:
  42. return INCOMING
  43. default:
  44. return IN_TRANSIT
  45. }
  46. }
  47. func makeCongestionLevel(l pb.VehiclePosition_CongestionLevel) CongestionLevel {
  48. switch l {
  49. case pb.VehiclePosition_RUNNING_SMOOTHLY:
  50. return CONGESTION_SMOOTH
  51. case pb.VehiclePosition_STOP_AND_GO:
  52. return CONGESTION_STOP_AND_GO
  53. case pb.VehiclePosition_CONGESTION:
  54. return CONGESTION_SIGNIFICANT
  55. case pb.VehiclePosition_SEVERE_CONGESTION:
  56. return CONGESTION_SEVERE
  57. default:
  58. return CONGESTION_UNKNOWN
  59. }
  60. }
  61. // XXX default occupancy status in GTFS-realtime is VehiclePosition_EMPTY while VehiclePosition_NO_DATA exists
  62. func makeOccupancyStatus(s *pb.VehiclePosition_OccupancyStatus) OccupancyStatus {
  63. if s == nil {
  64. return OCCUPANCY_UNKNOWN
  65. }
  66. switch *s {
  67. case pb.VehiclePosition_EMPTY:
  68. return OCCUPANCY_EMPTY
  69. case pb.VehiclePosition_MANY_SEATS_AVAILABLE:
  70. return OCCUPANCY_MANY_AVAILABLE
  71. case pb.VehiclePosition_FEW_SEATS_AVAILABLE:
  72. return OCCUPANCY_FEW_AVAILABLE
  73. case pb.VehiclePosition_STANDING_ROOM_ONLY:
  74. return OCCUPANCY_STANDING_ONLY
  75. case pb.VehiclePosition_CRUSHED_STANDING_ROOM_ONLY:
  76. return OCCUPANCY_CRUSHED
  77. case pb.VehiclePosition_FULL:
  78. return OCCUPANCY_FULL
  79. case pb.VehiclePosition_NOT_ACCEPTING_PASSENGERS:
  80. fallthrough
  81. case pb.VehiclePosition_NOT_BOARDABLE:
  82. return OCCUPANCY_NOT_ACCEPTING
  83. default:
  84. return OCCUPANCY_UNKNOWN
  85. }
  86. }
  87. func makeWheelchairAccessibility(a pb.VehicleDescriptor_WheelchairAccessible) WheelchairAccessibility {
  88. switch a {
  89. case pb.VehicleDescriptor_NO_VALUE:
  90. return WHEELCHAIR_NO_DATA
  91. case pb.VehicleDescriptor_WHEELCHAIR_ACCESSIBLE:
  92. return WHEELCHAIR_INACCESSIBLE
  93. case pb.VehicleDescriptor_WHEELCHAIR_INACCESSIBLE:
  94. return WHEELCHAIR_INACCESSIBLE
  95. default:
  96. return WHEELCHAIR_UNKNOWN
  97. }
  98. }
  99. func makeLineType(routeType int32) LineType {
  100. switch routeType {
  101. case 0:
  102. return TRAM
  103. case 1:
  104. return METRO
  105. case 2:
  106. return RAIL
  107. case 3:
  108. return BUS
  109. case 4:
  110. return FERRY
  111. case 5:
  112. return CABLE_TRAM
  113. case 6:
  114. return CABLE_CAR
  115. case 7:
  116. return FUNICULAR
  117. case 11:
  118. return TROLLEYBUS
  119. case 12:
  120. return MONORAIL
  121. default:
  122. panic(fmt.Sprintf("unknown GTFS route type: %d", routeType))
  123. }
  124. }
  125. func getGtfsRtData(entities []*pb.FeedEntity, feedID string) map[RealtimeFeedType]int {
  126. cacheMx.Lock()
  127. if vehicleStatuses == nil {
  128. vehicleStatuses = map[string]map[string]VehicleStatus{}
  129. }
  130. if vehicleStatuses[feedID] == nil {
  131. vehicleStatuses[feedID] = map[string]VehicleStatus{}
  132. }
  133. if updates == nil {
  134. updates = map[string]map[string][]Update{}
  135. }
  136. if updates[feedID] == nil {
  137. updates[feedID] = map[string][]Update{}
  138. }
  139. if alerts == nil {
  140. alerts = map[string]Alerts{}
  141. }
  142. var alertNumber uint = 0
  143. which := map[RealtimeFeedType]int{}
  144. for _, entity := range entities {
  145. a := entity.Alert
  146. v := entity.Vehicle
  147. t := entity.TripUpdate
  148. if a != nil {
  149. if alertNumber == 0 {
  150. alerts[feedID] = Alerts{
  151. ByLine: map[string][]uint{},
  152. ByTrip: map[string][]uint{},
  153. ByLineType: map[LineType][]uint{},
  154. ByStop: map[string][]uint{},
  155. ByAgency: map[string][]uint{},
  156. }
  157. }
  158. which[ALERTS] = 1
  159. alert := Alert{
  160. Headers: map[language.Tag]string{},
  161. Descriptions: map[language.Tag]string{},
  162. URLs: map[language.Tag]string{},
  163. }
  164. for _, period := range a.ActivePeriod {
  165. if period != nil {
  166. timeRange := [2]time.Time{
  167. time.Unix(int64(*period.Start), 0),
  168. time.Unix(int64(*period.End), 0),
  169. }
  170. alert.TimeRanges = append(alert.TimeRanges, timeRange)
  171. }
  172. }
  173. if translations := a.HeaderText.GetTranslation(); translations != nil {
  174. for _, t := range translations {
  175. if t == nil || t.GetText() == "" {
  176. continue
  177. }
  178. if t.Language == nil || t.GetLanguage() == "" {
  179. alert.Headers[language.Und] = *t.Text
  180. }
  181. tag, err := language.Parse(t.GetLanguage())
  182. if err == nil {
  183. alert.Headers[tag] = *t.Text
  184. }
  185. }
  186. }
  187. if translations := a.DescriptionText.GetTranslation(); translations != nil {
  188. for _, t := range translations {
  189. if t == nil || t.GetText() == "" {
  190. continue
  191. }
  192. if t.Language == nil || t.GetLanguage() == "" {
  193. alert.Descriptions[language.Und] = *t.Text
  194. }
  195. tag, err := language.Parse(t.GetLanguage())
  196. if err == nil {
  197. alert.Descriptions[tag] = *t.Text
  198. }
  199. }
  200. }
  201. if translations := a.Url.GetTranslation(); translations != nil {
  202. for _, t := range translations {
  203. if t == nil || t.GetText() == "" {
  204. continue
  205. }
  206. if t.Language == nil || t.GetLanguage() == "" {
  207. alert.URLs[language.Und] = *t.Text
  208. }
  209. tag, err := language.Parse(t.GetLanguage())
  210. if err == nil {
  211. alert.URLs[tag] = *t.Text
  212. }
  213. }
  214. }
  215. alert.Cause = alertCauseOfGtfs(a.Cause)
  216. alert.Effect = alertEffectOfGtfs(a.Effect)
  217. for _, e := range a.InformedEntity {
  218. if e == nil {
  219. continue
  220. }
  221. if e.AgencyId != nil {
  222. alerts[feedID].ByAgency[*e.AgencyId] = append(alerts[feedID].ByAgency[*e.AgencyId], alertNumber)
  223. }
  224. if e.RouteId != nil {
  225. alerts[feedID].ByLine[*e.RouteId] = append(alerts[feedID].ByLine[*e.RouteId], alertNumber)
  226. }
  227. if e.StopId != nil {
  228. alerts[feedID].ByStop[*e.StopId] = append(alerts[feedID].ByStop[*e.StopId], alertNumber)
  229. }
  230. if e.Trip != nil && e.Trip.TripId != nil {
  231. alerts[feedID].ByTrip[*e.Trip.TripId] = append(alerts[feedID].ByTrip[*e.Trip.TripId], alertNumber)
  232. }
  233. if e.RouteType != nil {
  234. alerts[feedID].ByLineType[makeLineType(*e.RouteType)] = append(alerts[feedID].ByLineType[makeLineType(*e.RouteType)], alertNumber)
  235. }
  236. }
  237. alertNumber++
  238. alertsStruct := alerts[feedID]
  239. alertsStruct.Alerts = append(alerts[feedID].Alerts, alert)
  240. alerts[feedID] = alertsStruct
  241. }
  242. if v != nil {
  243. which[VEHICLE_POSITIONS] = 1
  244. tripUpdates := updates[feedID][*v.Trip.TripId]
  245. vehicleUpdate := VehicleStatus{
  246. Status: makeDepartureStatus(v.GetCurrentStatus()),
  247. CongestionLevel: makeCongestionLevel(v.GetCongestionLevel()),
  248. OccupancyStatus: makeOccupancyStatus(v.OccupancyStatus),
  249. VehicleID: v.GetVehicle().GetId(),
  250. Latitude: float64(v.GetPosition().GetLatitude()),
  251. Longitude: float64(v.GetPosition().GetLongitude()),
  252. Speed: v.GetPosition().GetSpeed(),
  253. Bearing: float64(v.GetPosition().GetBearing() * math.Pi / 180),
  254. TripID: *v.Trip.TripId,
  255. WheelchairAccessibility: makeWheelchairAccessibility(v.GetVehicle().GetWheelchairAccessible()),
  256. }
  257. vehicleStatuses[feedID][*v.Trip.TripId] = vehicleUpdate
  258. if len(tripUpdates) == 0 {
  259. tripUpdate := Update{
  260. StopSequence: v.GetCurrentStopSequence(),
  261. StopID: v.GetStopId(),
  262. VehicleStatus: vehicleUpdate,
  263. TimetableRelationship: NO_TRIP_DATA,
  264. }
  265. updates[feedID][*v.Trip.TripId] = []Update{tripUpdate}
  266. } else {
  267. for _, tripUpdate := range tripUpdates {
  268. tripUpdate.VehicleStatus = vehicleUpdate
  269. }
  270. updates[feedID][*v.Trip.TripId] = tripUpdates
  271. }
  272. }
  273. if t != nil {
  274. tripUpdates := make([]Update, len(t.StopTimeUpdate))
  275. startTimeString := t.GetTripProperties().GetStartTime()
  276. var startTime *uint
  277. if startTimeString != "" {
  278. sT, err := parseDepartureTime(startTimeString)
  279. if err != nil {
  280. continue
  281. }
  282. startTime = &sT
  283. }
  284. for i, stopTimeUpdate := range t.StopTimeUpdate {
  285. which[TRIP_UPDATES] = 1
  286. update := Update{
  287. StopSequence: stopTimeUpdate.GetStopSequence(),
  288. StopID: stopTimeUpdate.GetStopId(),
  289. VehicleStatus: vehicleStatuses[feedID][*t.Trip.TripId],
  290. StartTime: startTime,
  291. }
  292. update.VehicleStatus.TripID = *t.Trip.TripId
  293. update.Delay = stopTimeUpdate.GetArrival().GetDelay()
  294. if update.StopSequence == 0 && update.Delay < 0 {
  295. update.Delay = 0
  296. }
  297. updateTime := stopTimeUpdate.GetArrival().GetTime()
  298. if updateTime != 0 {
  299. update.TimeUTC = time.Unix(updateTime, 0).In(time.UTC).Format("150405")
  300. }
  301. stopTripRelationship := stopTimeUpdate.GetScheduleRelationship()
  302. if stopTripRelationship == pb.TripUpdate_StopTimeUpdate_SCHEDULED {
  303. tripTimetableRelationship := t.Trip.GetScheduleRelationship()
  304. update.TimetableRelationship = makeTimetableRelationshipFromTripTimetable(tripTimetableRelationship)
  305. } else {
  306. update.TimetableRelationship = makeTimetableRelationshipFromStopTrip(stopTripRelationship)
  307. }
  308. tripUpdates[i] = update
  309. }
  310. updates[feedID][*t.Trip.TripId] = tripUpdates
  311. }
  312. }
  313. cacheMx.Unlock()
  314. return which
  315. }
  316. func getGtfsRealtimeMessages(feedType RealtimeFeedType, feedID string, feeds map[RealtimeFeedType]string) error {
  317. now := uint64(time.Now().Unix())
  318. if lastUpdatedGtfsRt[feedID] == nil {
  319. lastUpdatedGtfsRt[feedID] = map[RealtimeFeedType]uint64{}
  320. }
  321. if passed := now - lastUpdatedGtfsRt[feedID][feedType]; passed > 30 {
  322. message, err := gtfs_rt.GetMessages(feedID, feeds[feedType])
  323. if err != nil {
  324. return BlockingError{fmt.Errorf("while getting messages: %w", err)}
  325. }
  326. /*
  327. cacheMx.Lock()
  328. switch feedType {
  329. case TRIP_UPDATES:
  330. updates = map[string]map[string][]Update{}
  331. case VEHICLE_POSITIONS:
  332. vehicleStatuses = map[string]map[string]VehicleStatus{}
  333. case ALERTS:
  334. alerts = map[string]Alerts{}
  335. }
  336. cacheMx.Unlock()
  337. */
  338. lastUpdatedGtfsRt[feedID][feedType] = now
  339. whichUpdated := getGtfsRtData(message.Entity, feedID)
  340. for key, value := range whichUpdated {
  341. if value == 1 {
  342. lastUpdatedGtfsRt[feedID][key] = now
  343. }
  344. }
  345. }
  346. return nil
  347. }
  348. func getGtfsRealtimeUpdates(_ string, _ int, _, _ string, ctx Context) (map[string][]Update, map[string][]Alert, bool, error) {
  349. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  350. if err != nil {
  351. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting feedInfo: %w", err)
  352. }
  353. if _, ok := feedInfo.RealtimeFeeds[TRIP_UPDATES]; ok {
  354. err = getGtfsRealtimeMessages(TRIP_UPDATES, ctx.FeedID, feedInfo.RealtimeFeeds)
  355. if err != nil {
  356. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting updates: %w", err)
  357. }
  358. }
  359. if _, ok := feedInfo.RealtimeFeeds[VEHICLE_POSITIONS]; ok {
  360. // TODO should be moved to enrichDepartures and conditional (this, or custom API)
  361. err = getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
  362. if err != nil {
  363. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting vehicles: %w", err)
  364. }
  365. }
  366. cacheMx.Lock()
  367. resultUpdates := updates[ctx.FeedID]
  368. cacheMx.Unlock()
  369. return resultUpdates, map[string][]Alert{}, true, nil
  370. }
  371. func getGtfsRealtimeVehicles(ctx Context, _, _ Position) ([]VehicleStatus, error) {
  372. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  373. if err != nil {
  374. return []VehicleStatus{}, fmt.Errorf("while getting feedInfo: %w", err)
  375. }
  376. getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
  377. cacheMx.Lock()
  378. vehicles := make([]VehicleStatus, len(vehicleStatuses[ctx.FeedID]))
  379. i := 0
  380. for _, status := range vehicleStatuses[ctx.FeedID] {
  381. vehicles[i] = status
  382. i++
  383. }
  384. cacheMx.Unlock()
  385. return vehicles, nil
  386. }
  387. func getGtfsRealtimeAlerts(stopID, _, tripID string, ctx Context, t *Traffic) ([]Alert, error) {
  388. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  389. if err != nil {
  390. return []Alert{}, fmt.Errorf("while getting feedInfo: %w", err)
  391. }
  392. getGtfsRealtimeMessages(ALERTS, ctx.FeedID, feedInfo.RealtimeFeeds)
  393. cacheMx.Lock()
  394. resultAlertsMap := map[uint]Alert{}
  395. if tripID != "" {
  396. file, err := openTrips(ctx)
  397. if err != nil {
  398. return []Alert{}, fmt.Errorf("while opening trips: %w", err)
  399. }
  400. defer file.Close()
  401. trip, err := GetTrip(file, tripID, ctx, t)
  402. if err != nil {
  403. return []Alert{}, fmt.Errorf("while getting trip: %w", err)
  404. }
  405. line, err := GetLine(trip.LineID, ctx, t)
  406. if err != nil {
  407. return []Alert{}, fmt.Errorf("while getting line: %w", err)
  408. }
  409. for _, i := range alerts[ctx.FeedID].ByAgency[line.AgencyID] {
  410. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  411. }
  412. for _, i := range alerts[ctx.FeedID].ByLine[line.Id] {
  413. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  414. }
  415. for _, i := range alerts[ctx.FeedID].ByLine[line.Id] {
  416. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  417. }
  418. for _, i := range alerts[ctx.FeedID].ByLineType[line.Kind] {
  419. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  420. }
  421. }
  422. if stopID != "" {
  423. for _, i := range alerts[ctx.FeedID].ByStop[stopID] {
  424. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  425. }
  426. for _, i := range alerts[ctx.FeedID].ByTrip[tripID] {
  427. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  428. }
  429. }
  430. cacheMx.Unlock()
  431. resultAlerts := make([]Alert, len(resultAlertsMap))
  432. i := 0
  433. for _, alert := range resultAlertsMap {
  434. resultAlerts[i] = alert
  435. i++
  436. }
  437. return resultAlerts, nil
  438. }