realtime_gtfs.go 14 KB


  1. // SPDX-FileCopyrightText: Adam Evyčędo
  2. //
  3. // SPDX-License-Identifier: AGPL-3.0-or-later
  4. package traffic
  5. import (
  6. "strings"
  7. "apiote.xyz/p/szczanieckiej/gtfs_rt"
  8. pb "apiote.xyz/p/szczanieckiej/gtfs_rt/transit_realtime"
  9. "golang.org/x/text/language"
  10. "fmt"
  11. "math"
  12. "time"
  13. )
  14. // ........................ feedID
  15. var lastUpdatedGtfsRt = map[string]map[RealtimeFeedType]uint64{}
  16. func makeTimetableRelationshipFromTripTimetable(r pb.TripDescriptor_ScheduleRelationship) TimetableRelationship {
  17. switch r {
  18. case pb.TripDescriptor_ADDED:
  19. return TRIP_ADDED
  20. case pb.TripDescriptor_CANCELED:
  21. return TRIP_CANCELED
  22. case pb.TripDescriptor_DELETED:
  23. return TRIP_DELETED
  24. default:
  25. return TRIP_SCHEDULED
  26. }
  27. }
  28. func makeTimetableRelationshipFromStopTrip(r pb.TripUpdate_StopTimeUpdate_ScheduleRelationship) TimetableRelationship {
  29. switch r {
  30. case pb.TripUpdate_StopTimeUpdate_NO_DATA:
  31. return NO_TRIP_DATA
  32. case pb.TripUpdate_StopTimeUpdate_SKIPPED:
  33. return STOP_SKIPPED
  34. default:
  35. return TRIP_SCHEDULED
  36. }
  37. }
  38. func makeDepartureStatus(s pb.VehiclePosition_VehicleStopStatus) DepartureStatus {
  39. switch s {
  40. case pb.VehiclePosition_STOPPED_AT:
  41. return AT_STOP
  42. case pb.VehiclePosition_INCOMING_AT:
  43. return INCOMING
  44. default:
  45. return IN_TRANSIT
  46. }
  47. }
  48. func makeCongestionLevel(l pb.VehiclePosition_CongestionLevel) CongestionLevel {
  49. switch l {
  50. case pb.VehiclePosition_RUNNING_SMOOTHLY:
  51. return CONGESTION_SMOOTH
  52. case pb.VehiclePosition_STOP_AND_GO:
  53. return CONGESTION_STOP_AND_GO
  54. case pb.VehiclePosition_CONGESTION:
  55. return CONGESTION_SIGNIFICANT
  56. case pb.VehiclePosition_SEVERE_CONGESTION:
  57. return CONGESTION_SEVERE
  58. default:
  59. return CONGESTION_UNKNOWN
  60. }
  61. }
  62. // XXX default occupancy status in GTFS-realtime is VehiclePosition_EMPTY while VehiclePosition_NO_DATA exists
  63. func makeOccupancyStatus(s *pb.VehiclePosition_OccupancyStatus) OccupancyStatus {
  64. if s == nil {
  65. return OCCUPANCY_UNKNOWN
  66. }
  67. switch *s {
  68. case pb.VehiclePosition_EMPTY:
  69. return OCCUPANCY_EMPTY
  70. case pb.VehiclePosition_MANY_SEATS_AVAILABLE:
  71. return OCCUPANCY_MANY_AVAILABLE
  72. case pb.VehiclePosition_FEW_SEATS_AVAILABLE:
  73. return OCCUPANCY_FEW_AVAILABLE
  74. case pb.VehiclePosition_STANDING_ROOM_ONLY:
  75. return OCCUPANCY_STANDING_ONLY
  76. case pb.VehiclePosition_CRUSHED_STANDING_ROOM_ONLY:
  77. return OCCUPANCY_CRUSHED
  78. case pb.VehiclePosition_FULL:
  79. return OCCUPANCY_FULL
  80. case pb.VehiclePosition_NOT_ACCEPTING_PASSENGERS:
  81. fallthrough
  82. case pb.VehiclePosition_NOT_BOARDABLE:
  83. return OCCUPANCY_NOT_ACCEPTING
  84. default:
  85. return OCCUPANCY_UNKNOWN
  86. }
  87. }
  88. func makeWheelchairAccessibility(a pb.VehicleDescriptor_WheelchairAccessible) WheelchairAccessibility {
  89. switch a {
  90. case pb.VehicleDescriptor_NO_VALUE:
  91. return WHEELCHAIR_NO_DATA
  92. case pb.VehicleDescriptor_WHEELCHAIR_ACCESSIBLE:
  93. return WHEELCHAIR_INACCESSIBLE
  94. case pb.VehicleDescriptor_WHEELCHAIR_INACCESSIBLE:
  95. return WHEELCHAIR_INACCESSIBLE
  96. default:
  97. return WHEELCHAIR_UNKNOWN
  98. }
  99. }
  100. func makeLineType(routeType int32) LineType {
  101. switch routeType {
  102. case 0:
  103. return TRAM
  104. case 1:
  105. return METRO
  106. case 2:
  107. return RAIL
  108. case 3:
  109. return BUS
  110. case 4:
  111. return FERRY
  112. case 5:
  113. return CABLE_TRAM
  114. case 6:
  115. return CABLE_CAR
  116. case 7:
  117. return FUNICULAR
  118. case 11:
  119. return TROLLEYBUS
  120. case 12:
  121. return MONORAIL
  122. default:
  123. panic(fmt.Sprintf("unknown GTFS route type: %d", routeType))
  124. }
  125. }
  126. func getGtfsRtData(entities []*pb.FeedEntity, feedID string) map[RealtimeFeedType]int {
  127. cacheMx.Lock()
  128. if vehicleStatuses == nil {
  129. vehicleStatuses = map[string]map[string]VehicleStatus{}
  130. }
  131. if vehicleStatuses[feedID] == nil {
  132. vehicleStatuses[feedID] = map[string]VehicleStatus{}
  133. }
  134. if updates == nil {
  135. updates = map[string]map[string][]Update{}
  136. }
  137. if updates[feedID] == nil {
  138. updates[feedID] = map[string][]Update{}
  139. }
  140. if alerts == nil {
  141. alerts = map[string]Alerts{}
  142. }
  143. var alertNumber uint = 0
  144. which := map[RealtimeFeedType]int{}
  145. for _, entity := range entities {
  146. a := entity.Alert
  147. v := entity.Vehicle
  148. t := entity.TripUpdate
  149. if a != nil {
  150. if alertNumber == 0 {
  151. alerts[feedID] = Alerts{
  152. ByLine: map[string][]uint{},
  153. ByTrip: map[string][]uint{},
  154. ByLineType: map[LineType][]uint{},
  155. ByStop: map[string][]uint{},
  156. ByAgency: map[string][]uint{},
  157. }
  158. }
  159. which[ALERTS] = 1
  160. alert := Alert{
  161. Headers: map[language.Tag]string{},
  162. Descriptions: map[language.Tag]string{},
  163. URLs: map[language.Tag]string{},
  164. }
  165. for _, period := range a.ActivePeriod {
  166. if period != nil {
  167. timeRange := [2]time.Time{
  168. time.Unix(int64(*period.Start), 0),
  169. time.Unix(int64(*period.End), 0),
  170. }
  171. alert.TimeRanges = append(alert.TimeRanges, timeRange)
  172. }
  173. }
  174. if translations := a.HeaderText.GetTranslation(); translations != nil {
  175. for _, t := range translations {
  176. if t == nil || t.GetText() == "" {
  177. continue
  178. }
  179. if t.Language == nil || t.GetLanguage() == "" {
  180. alert.Headers[language.Und] = *t.Text
  181. }
  182. tag, err := language.Parse(t.GetLanguage())
  183. if err == nil {
  184. alert.Headers[tag] = *t.Text
  185. }
  186. }
  187. }
  188. if translations := a.DescriptionText.GetTranslation(); translations != nil {
  189. for _, t := range translations {
  190. if t == nil || t.GetText() == "" {
  191. continue
  192. }
  193. if t.Language == nil || t.GetLanguage() == "" {
  194. alert.Descriptions[language.Und] = *t.Text
  195. }
  196. tag, err := language.Parse(t.GetLanguage())
  197. if err == nil {
  198. alert.Descriptions[tag] = *t.Text
  199. }
  200. }
  201. }
  202. if translations := a.Url.GetTranslation(); translations != nil {
  203. for _, t := range translations {
  204. if t == nil || t.GetText() == "" {
  205. continue
  206. }
  207. if t.Language == nil || t.GetLanguage() == "" {
  208. alert.URLs[language.Und] = *t.Text
  209. }
  210. tag, err := language.Parse(t.GetLanguage())
  211. if err == nil {
  212. alert.URLs[tag] = *t.Text
  213. }
  214. }
  215. }
  216. alert.Cause = alertCauseOfGtfs(a.Cause)
  217. alert.Effect = alertEffectOfGtfs(a.Effect)
  218. for _, e := range a.InformedEntity {
  219. if e == nil {
  220. continue
  221. }
  222. if e.AgencyId != nil {
  223. alerts[feedID].ByAgency[*e.AgencyId] = append(alerts[feedID].ByAgency[*e.AgencyId], alertNumber)
  224. }
  225. if e.RouteId != nil {
  226. alerts[feedID].ByLine[*e.RouteId] = append(alerts[feedID].ByLine[*e.RouteId], alertNumber)
  227. }
  228. if e.StopId != nil {
  229. alerts[feedID].ByStop[*e.StopId] = append(alerts[feedID].ByStop[*e.StopId], alertNumber)
  230. }
  231. if e.Trip != nil && e.Trip.TripId != nil {
  232. alerts[feedID].ByTrip[*e.Trip.TripId] = append(alerts[feedID].ByTrip[*e.Trip.TripId], alertNumber)
  233. }
  234. if e.RouteType != nil {
  235. alerts[feedID].ByLineType[makeLineType(*e.RouteType)] = append(alerts[feedID].ByLineType[makeLineType(*e.RouteType)], alertNumber)
  236. }
  237. }
  238. alertNumber++
  239. alertsStruct := alerts[feedID]
  240. alertsStruct.Alerts = append(alerts[feedID].Alerts, alert)
  241. alerts[feedID] = alertsStruct
  242. }
  243. if v != nil {
  244. which[VEHICLE_POSITIONS] = 1
  245. tripUpdates := updates[feedID][*v.Trip.TripId]
  246. vehicleUpdate := VehicleStatus{
  247. Status: makeDepartureStatus(v.GetCurrentStatus()),
  248. CongestionLevel: makeCongestionLevel(v.GetCongestionLevel()),
  249. OccupancyStatus: makeOccupancyStatus(v.OccupancyStatus),
  250. VehicleID: v.GetVehicle().GetId(),
  251. Latitude: float64(v.GetPosition().GetLatitude()),
  252. Longitude: float64(v.GetPosition().GetLongitude()),
  253. Speed: v.GetPosition().GetSpeed(),
  254. Bearing: float64(v.GetPosition().GetBearing() * math.Pi / 180),
  255. TripID: *v.Trip.TripId,
  256. WheelchairAccessibility: makeWheelchairAccessibility(v.GetVehicle().GetWheelchairAccessible()),
  257. }
  258. vehicleStatuses[feedID][*v.Trip.TripId] = vehicleUpdate
  259. for i, tripUpdate := range tripUpdates {
  260. tripUpdate.VehicleStatus = vehicleUpdate
  261. tripUpdates[i] = tripUpdate
  262. }
  263. updates[feedID][*v.Trip.TripId] = tripUpdates
  264. }
  265. if t != nil {
  266. tripUpdates := make([]Update, len(t.StopTimeUpdate))
  267. for i, stopTimeUpdate := range t.StopTimeUpdate {
  268. which[TRIP_UPDATES] = 1
  269. update := Update{
  270. StopSequence: stopTimeUpdate.GetStopSequence(),
  271. StopID: stopTimeUpdate.GetStopId(),
  272. VehicleStatus: vehicleStatuses[feedID][*t.Trip.TripId],
  273. }
  274. update.VehicleStatus.TripID = *t.Trip.TripId
  275. update.Delay = stopTimeUpdate.GetArrival().GetDelay()
  276. if update.StopSequence == 0 && update.Delay < 0 {
  277. update.Delay = 0
  278. }
  279. updateTime := stopTimeUpdate.GetArrival().GetTime()
  280. if updateTime != 0 {
  281. update.TimeUTC = time.Unix(updateTime, 0).In(time.UTC).Format("150405")
  282. }
  283. stopTripRelationship := stopTimeUpdate.GetScheduleRelationship()
  284. if stopTripRelationship == pb.TripUpdate_StopTimeUpdate_SCHEDULED {
  285. tripTimetableRelationship := t.Trip.GetScheduleRelationship()
  286. update.TimetableRelationship = makeTimetableRelationshipFromTripTimetable(tripTimetableRelationship)
  287. } else {
  288. update.TimetableRelationship = makeTimetableRelationshipFromStopTrip(stopTripRelationship)
  289. }
  290. tripUpdates[i] = update
  291. }
  292. updates[feedID][*t.Trip.TripId] = tripUpdates
  293. }
  294. }
  295. cacheMx.Unlock()
  296. return which
  297. }
  298. func getGtfsRealtimeMessages(feedType RealtimeFeedType, feedID string, feeds map[RealtimeFeedType]string) error {
  299. now := uint64(time.Now().Unix())
  300. if lastUpdatedGtfsRt[feedID] == nil {
  301. lastUpdatedGtfsRt[feedID] = map[RealtimeFeedType]uint64{}
  302. }
  303. if passed := now - lastUpdatedGtfsRt[feedID][feedType]; passed > 30 {
  304. for _, feedURL := range strings.Split(feeds[feedType], " ") {
  305. message, err := gtfs_rt.GetMessages(feedID, feedURL)
  306. if err != nil {
  307. return BlockingError{fmt.Errorf("while getting messages: %w", err)}
  308. }
  309. /*
  310. cacheMx.Lock()
  311. switch feedType {
  312. case TRIP_UPDATES:
  313. updates = map[string]map[string][]Update{}
  314. case VEHICLE_POSITIONS:
  315. vehicleStatuses = map[string]map[string]VehicleStatus{}
  316. case ALERTS:
  317. alerts = map[string]Alerts{}
  318. }
  319. cacheMx.Unlock()
  320. */
  321. whichUpdated := getGtfsRtData(message.Entity, feedID)
  322. for key, value := range whichUpdated {
  323. if value == 1 {
  324. lastUpdatedGtfsRt[feedID][key] = now
  325. }
  326. }
  327. }
  328. }
  329. return nil
  330. }
  331. func getGtfsRealtimeUpdates(_ string, _ int, _, _ string, ctx Context) (map[string][]Update, map[string][]Alert, bool, error) {
  332. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  333. if err != nil {
  334. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting feedInfo: %w", err)
  335. }
  336. err = getGtfsRealtimeMessages(TRIP_UPDATES, ctx.FeedID, feedInfo.RealtimeFeeds)
  337. if err != nil {
  338. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting updates: %w", err)
  339. }
  340. if _, ok := feedInfo.RealtimeFeeds[VEHICLE_POSITIONS]; ok {
  341. // TODO should be moved to enrichDepartures and conditional (this, or custom API)
  342. err = getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
  343. if err != nil {
  344. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting vehicles: %w", err)
  345. }
  346. }
  347. cacheMx.Lock()
  348. resultUpdates := updates[ctx.FeedID]
  349. cacheMx.Unlock()
  350. return resultUpdates, map[string][]Alert{}, true, nil
  351. }
  352. func getGtfsRealtimeVehicles(ctx Context, lb, rt Position) ([]VehicleStatus, error) {
  353. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  354. if err != nil {
  355. return []VehicleStatus{}, fmt.Errorf("while getting feedInfo: %w", err)
  356. }
  357. getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
  358. cacheMx.Lock()
  359. vehicles := make([]VehicleStatus, len(vehicleStatuses[ctx.FeedID]))
  360. i := 0
  361. for _, status := range vehicleStatuses[ctx.FeedID] {
  362. vehicles[i] = status
  363. i++
  364. }
  365. cacheMx.Unlock()
  366. vehiclesFiltered := []VehicleStatus{}
  367. for _, vehicleRt := range vehicles {
  368. if rt.Lat < float64(vehicleRt.Latitude) || lb.Lat > float64(vehicleRt.Latitude) {
  369. continue
  370. }
  371. lon := float64(vehicleRt.Longitude)
  372. if lb.Lon < rt.Lon {
  373. if lb.Lon < lon && lon < rt.Lon {
  374. vehiclesFiltered = append(vehiclesFiltered, vehicleRt)
  375. }
  376. } else {
  377. if lon > lb.Lon || lon < rt.Lon {
  378. vehiclesFiltered = append(vehiclesFiltered, vehicleRt)
  379. }
  380. }
  381. }
  382. return vehiclesFiltered, nil
  383. }
  384. func getGtfsRealtimeAlerts(stopID, _, tripID string, ctx Context, t *Traffic) ([]Alert, error) {
  385. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  386. if err != nil {
  387. return []Alert{}, fmt.Errorf("while getting feedInfo: %w", err)
  388. }
  389. getGtfsRealtimeMessages(ALERTS, ctx.FeedID, feedInfo.RealtimeFeeds)
  390. cacheMx.Lock()
  391. resultAlertsMap := map[uint]Alert{}
  392. if tripID != "" {
  393. trip, err := GetTrip(tripID, ctx, t)
  394. if err != nil {
  395. return []Alert{}, fmt.Errorf("while getting trip: %w", err)
  396. }
  397. line, err := GetLine(trip.LineID, ctx, t)
  398. if err != nil {
  399. return []Alert{}, fmt.Errorf("while getting line: %w", err)
  400. }
  401. for _, i := range alerts[ctx.FeedID].ByAgency[line.AgencyID] {
  402. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  403. }
  404. for _, i := range alerts[ctx.FeedID].ByLine[line.Id] {
  405. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  406. }
  407. for _, i := range alerts[ctx.FeedID].ByLine[line.Id] {
  408. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  409. }
  410. for _, i := range alerts[ctx.FeedID].ByLineType[line.Kind] {
  411. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  412. }
  413. }
  414. if stopID != "" {
  415. for _, i := range alerts[ctx.FeedID].ByStop[stopID] {
  416. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  417. }
  418. for _, i := range alerts[ctx.FeedID].ByTrip[tripID] {
  419. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  420. }
  421. }
  422. cacheMx.Unlock()
  423. resultAlerts := make([]Alert, len(resultAlertsMap))
  424. i := 0
  425. for _, alert := range resultAlertsMap {
  426. resultAlerts[i] = alert
  427. i++
  428. }
  429. return resultAlerts, nil
  430. }