realtime_gtfs.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. // SPDX-FileCopyrightText: Adam Evyčędo
  2. //
  3. // SPDX-License-Identifier: AGPL-3.0-or-later
  4. package traffic
  5. import (
  6. "apiote.xyz/p/szczanieckiej/gtfs_rt"
  7. pb "apiote.xyz/p/szczanieckiej/gtfs_rt/transit_realtime"
  8. "golang.org/x/text/language"
  9. "fmt"
  10. "math"
  11. "time"
  12. )
  13. // ........................ feedID
  14. var lastUpdatedGtfsRt = map[string]map[RealtimeFeedType]uint64{}
  15. func makeTimetableRelationshipFromTripTimetable(r pb.TripDescriptor_ScheduleRelationship) TimetableRelationship {
  16. switch r {
  17. case pb.TripDescriptor_ADDED:
  18. return TRIP_ADDED
  19. case pb.TripDescriptor_CANCELED:
  20. return TRIP_CANCELED
  21. case pb.TripDescriptor_DELETED:
  22. return TRIP_DELETED
  23. default:
  24. return TRIP_SCHEDULED
  25. }
  26. }
  27. func makeTimetableRelationshipFromStopTrip(r pb.TripUpdate_StopTimeUpdate_ScheduleRelationship) TimetableRelationship {
  28. switch r {
  29. case pb.TripUpdate_StopTimeUpdate_NO_DATA:
  30. return NO_TRIP_DATA
  31. case pb.TripUpdate_StopTimeUpdate_SKIPPED:
  32. return STOP_SKIPPED
  33. default:
  34. return TRIP_SCHEDULED
  35. }
  36. }
  37. func makeDepartureStatus(s pb.VehiclePosition_VehicleStopStatus) DepartureStatus {
  38. switch s {
  39. case pb.VehiclePosition_STOPPED_AT:
  40. return AT_STOP
  41. case pb.VehiclePosition_INCOMING_AT:
  42. return INCOMING
  43. default:
  44. return IN_TRANSIT
  45. }
  46. }
  47. func makeCongestionLevel(l pb.VehiclePosition_CongestionLevel) CongestionLevel {
  48. switch l {
  49. case pb.VehiclePosition_RUNNING_SMOOTHLY:
  50. return CONGESTION_SMOOTH
  51. case pb.VehiclePosition_STOP_AND_GO:
  52. return CONGESTION_STOP_AND_GO
  53. case pb.VehiclePosition_CONGESTION:
  54. return CONGESTION_SIGNIFICANT
  55. case pb.VehiclePosition_SEVERE_CONGESTION:
  56. return CONGESTION_SEVERE
  57. default:
  58. return CONGESTION_UNKNOWN
  59. }
  60. }
  61. // XXX default occupancy status in GTFS-realtime is VehiclePosition_EMPTY while VehiclePosition_NO_DATA exists
  62. func makeOccupancyStatus(s *pb.VehiclePosition_OccupancyStatus) OccupancyStatus {
  63. if s == nil {
  64. return OCCUPANCY_UNKNOWN
  65. }
  66. switch *s {
  67. case pb.VehiclePosition_EMPTY:
  68. return OCCUPANCY_EMPTY
  69. case pb.VehiclePosition_MANY_SEATS_AVAILABLE:
  70. return OCCUPANCY_MANY_AVAILABLE
  71. case pb.VehiclePosition_FEW_SEATS_AVAILABLE:
  72. return OCCUPANCY_FEW_AVAILABLE
  73. case pb.VehiclePosition_STANDING_ROOM_ONLY:
  74. return OCCUPANCY_STANDING_ONLY
  75. case pb.VehiclePosition_CRUSHED_STANDING_ROOM_ONLY:
  76. return OCCUPANCY_CRUSHED
  77. case pb.VehiclePosition_FULL:
  78. return OCCUPANCY_FULL
  79. case pb.VehiclePosition_NOT_ACCEPTING_PASSENGERS:
  80. fallthrough
  81. case pb.VehiclePosition_NOT_BOARDABLE:
  82. return OCCUPANCY_NOT_ACCEPTING
  83. default:
  84. return OCCUPANCY_UNKNOWN
  85. }
  86. }
  87. func makeWheelchairAccessibility(a pb.VehicleDescriptor_WheelchairAccessible) WheelchairAccessibility {
  88. switch a {
  89. case pb.VehicleDescriptor_NO_VALUE:
  90. return WHEELCHAIR_NO_DATA
  91. case pb.VehicleDescriptor_WHEELCHAIR_ACCESSIBLE:
  92. return WHEELCHAIR_INACCESSIBLE
  93. case pb.VehicleDescriptor_WHEELCHAIR_INACCESSIBLE:
  94. return WHEELCHAIR_INACCESSIBLE
  95. default:
  96. return WHEELCHAIR_UNKNOWN
  97. }
  98. }
  99. func makeLineType(routeType int32) LineType {
  100. switch routeType {
  101. case 0:
  102. return TRAM
  103. case 1:
  104. return METRO
  105. case 2:
  106. return RAIL
  107. case 3:
  108. return BUS
  109. case 4:
  110. return FERRY
  111. case 5:
  112. return CABLE_TRAM
  113. case 6:
  114. return CABLE_CAR
  115. case 7:
  116. return FUNICULAR
  117. case 11:
  118. return TROLLEYBUS
  119. case 12:
  120. return MONORAIL
  121. default:
  122. panic(fmt.Sprintf("unknown GTFS route type: %d", routeType))
  123. }
  124. }
  125. func getGtfsRtData(entities []*pb.FeedEntity, feedID string) map[RealtimeFeedType]int {
  126. cacheMx.Lock()
  127. if vehicleStatuses == nil {
  128. vehicleStatuses = map[string]map[string]VehicleStatus{}
  129. }
  130. if vehicleStatuses[feedID] == nil {
  131. vehicleStatuses[feedID] = map[string]VehicleStatus{}
  132. }
  133. if updates == nil {
  134. updates = map[string]map[string][]Update{}
  135. }
  136. if updates[feedID] == nil {
  137. updates[feedID] = map[string][]Update{}
  138. }
  139. if alerts == nil {
  140. alerts = map[string]Alerts{}
  141. }
  142. var alertNumber uint = 0
  143. which := map[RealtimeFeedType]int{}
  144. for _, entity := range entities {
  145. a := entity.Alert
  146. v := entity.Vehicle
  147. t := entity.TripUpdate
  148. if a != nil {
  149. if alertNumber == 0 {
  150. alerts[feedID] = Alerts{
  151. ByLine: map[string][]uint{},
  152. ByTrip: map[string][]uint{},
  153. ByLineType: map[LineType][]uint{},
  154. ByStop: map[string][]uint{},
  155. ByAgency: map[string][]uint{},
  156. }
  157. }
  158. which[ALERTS] = 1
  159. alert := Alert{
  160. Headers: map[language.Tag]string{},
  161. Descriptions: map[language.Tag]string{},
  162. URLs: map[language.Tag]string{},
  163. }
  164. for _, period := range a.ActivePeriod {
  165. if period != nil {
  166. timeRange := [2]time.Time{
  167. time.Unix(int64(*period.Start), 0),
  168. time.Unix(int64(*period.End), 0),
  169. }
  170. alert.TimeRanges = append(alert.TimeRanges, timeRange)
  171. }
  172. }
  173. if translations := a.HeaderText.GetTranslation(); translations != nil {
  174. for _, t := range translations {
  175. if t == nil || t.GetText() == "" {
  176. continue
  177. }
  178. if t.Language == nil || t.GetLanguage() == "" {
  179. alert.Headers[language.Und] = *t.Text
  180. }
  181. tag, err := language.Parse(t.GetLanguage())
  182. if err == nil {
  183. alert.Headers[tag] = *t.Text
  184. }
  185. }
  186. }
  187. if translations := a.DescriptionText.GetTranslation(); translations != nil {
  188. for _, t := range translations {
  189. if t == nil || t.GetText() == "" {
  190. continue
  191. }
  192. if t.Language == nil || t.GetLanguage() == "" {
  193. alert.Descriptions[language.Und] = *t.Text
  194. }
  195. tag, err := language.Parse(t.GetLanguage())
  196. if err == nil {
  197. alert.Descriptions[tag] = *t.Text
  198. }
  199. }
  200. }
  201. if translations := a.Url.GetTranslation(); translations != nil {
  202. for _, t := range translations {
  203. if t == nil || t.GetText() == "" {
  204. continue
  205. }
  206. if t.Language == nil || t.GetLanguage() == "" {
  207. alert.URLs[language.Und] = *t.Text
  208. }
  209. tag, err := language.Parse(t.GetLanguage())
  210. if err == nil {
  211. alert.URLs[tag] = *t.Text
  212. }
  213. }
  214. }
  215. alert.Cause = alertCauseOfGtfs(a.Cause)
  216. alert.Effect = alertEffectOfGtfs(a.Effect)
  217. for _, e := range a.InformedEntity {
  218. if e == nil {
  219. continue
  220. }
  221. if e.AgencyId != nil {
  222. alerts[feedID].ByAgency[*e.AgencyId] = append(alerts[feedID].ByAgency[*e.AgencyId], alertNumber)
  223. }
  224. if e.RouteId != nil {
  225. alerts[feedID].ByLine[*e.RouteId] = append(alerts[feedID].ByLine[*e.RouteId], alertNumber)
  226. }
  227. if e.StopId != nil {
  228. alerts[feedID].ByStop[*e.StopId] = append(alerts[feedID].ByStop[*e.StopId], alertNumber)
  229. }
  230. if e.Trip != nil && e.Trip.TripId != nil {
  231. alerts[feedID].ByTrip[*e.Trip.TripId] = append(alerts[feedID].ByTrip[*e.Trip.TripId], alertNumber)
  232. }
  233. if e.RouteType != nil {
  234. alerts[feedID].ByLineType[makeLineType(*e.RouteType)] = append(alerts[feedID].ByLineType[makeLineType(*e.RouteType)], alertNumber)
  235. }
  236. }
  237. alertNumber++
  238. alertsStruct := alerts[feedID]
  239. alertsStruct.Alerts = append(alerts[feedID].Alerts, alert)
  240. alerts[feedID] = alertsStruct
  241. }
  242. if v != nil {
  243. which[VEHICLE_POSITIONS] = 1
  244. tripUpdates := updates[feedID][*v.Trip.TripId]
  245. vehicleUpdate := VehicleStatus{
  246. Status: makeDepartureStatus(v.GetCurrentStatus()),
  247. CongestionLevel: makeCongestionLevel(v.GetCongestionLevel()),
  248. OccupancyStatus: makeOccupancyStatus(v.OccupancyStatus),
  249. VehicleID: v.GetVehicle().GetId(),
  250. Latitude: float64(v.GetPosition().GetLatitude()),
  251. Longitude: float64(v.GetPosition().GetLongitude()),
  252. Speed: v.GetPosition().GetSpeed(),
  253. Bearing: float64(v.GetPosition().GetBearing() * math.Pi / 180),
  254. TripID: *v.Trip.TripId,
  255. WheelchairAccessibility: makeWheelchairAccessibility(v.GetVehicle().GetWheelchairAccessible()),
  256. }
  257. vehicleStatuses[feedID][*v.Trip.TripId] = vehicleUpdate
  258. for _, tripUpdate := range tripUpdates {
  259. tripUpdate.VehicleStatus = vehicleUpdate
  260. }
  261. updates[feedID][*v.Trip.TripId] = tripUpdates
  262. }
  263. if t != nil {
  264. tripUpdates := make([]Update, len(t.StopTimeUpdate))
  265. for i, stopTimeUpdate := range t.StopTimeUpdate {
  266. which[TRIP_UPDATES] = 1
  267. update := Update{
  268. StopSequence: stopTimeUpdate.GetStopSequence(),
  269. StopID: stopTimeUpdate.GetStopId(),
  270. VehicleStatus: vehicleStatuses[feedID][*t.Trip.TripId],
  271. }
  272. update.VehicleStatus.TripID = *t.Trip.TripId
  273. update.Delay = stopTimeUpdate.GetArrival().GetDelay()
  274. if update.StopSequence == 0 && update.Delay < 0 {
  275. update.Delay = 0
  276. }
  277. updateTime := stopTimeUpdate.GetArrival().GetTime()
  278. if updateTime != 0 {
  279. update.TimeUTC = time.Unix(updateTime, 0).In(time.UTC).Format("150405")
  280. }
  281. stopTripRelationship := stopTimeUpdate.GetScheduleRelationship()
  282. if stopTripRelationship == pb.TripUpdate_StopTimeUpdate_SCHEDULED {
  283. tripTimetableRelationship := t.Trip.GetScheduleRelationship()
  284. update.TimetableRelationship = makeTimetableRelationshipFromTripTimetable(tripTimetableRelationship)
  285. } else {
  286. update.TimetableRelationship = makeTimetableRelationshipFromStopTrip(stopTripRelationship)
  287. }
  288. tripUpdates[i] = update
  289. }
  290. updates[feedID][*t.Trip.TripId] = tripUpdates
  291. }
  292. }
  293. cacheMx.Unlock()
  294. return which
  295. }
  296. func getGtfsRealtimeMessages(feedType RealtimeFeedType, feedID string, feeds map[RealtimeFeedType]string) error {
  297. now := uint64(time.Now().Unix())
  298. if lastUpdatedGtfsRt[feedID] == nil {
  299. lastUpdatedGtfsRt[feedID] = map[RealtimeFeedType]uint64{}
  300. }
  301. if passed := now - lastUpdatedGtfsRt[feedID][feedType]; passed > 30 {
  302. message, err := gtfs_rt.GetMessages(feedID, feeds[feedType])
  303. if err != nil {
  304. return BlockingError{fmt.Errorf("while getting messages: %w", err)}
  305. }
  306. /*
  307. cacheMx.Lock()
  308. switch feedType {
  309. case TRIP_UPDATES:
  310. updates = map[string]map[string][]Update{}
  311. case VEHICLE_POSITIONS:
  312. vehicleStatuses = map[string]map[string]VehicleStatus{}
  313. case ALERTS:
  314. alerts = map[string]Alerts{}
  315. }
  316. cacheMx.Unlock()
  317. */
  318. whichUpdated := getGtfsRtData(message.Entity, feedID)
  319. for key, value := range whichUpdated {
  320. if value == 1 {
  321. lastUpdatedGtfsRt[feedID][key] = now
  322. }
  323. }
  324. }
  325. return nil
  326. }
  327. func getGtfsRealtimeUpdates(_ string, _ int, _, _ string, ctx Context) (map[string][]Update, map[string][]Alert, bool, error) {
  328. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  329. if err != nil {
  330. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting feedInfo: %w", err)
  331. }
  332. err = getGtfsRealtimeMessages(TRIP_UPDATES, ctx.FeedID, feedInfo.RealtimeFeeds)
  333. if err != nil {
  334. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting updates: %w", err)
  335. }
  336. if _, ok := feedInfo.RealtimeFeeds[VEHICLE_POSITIONS]; ok {
  337. // TODO should be moved to enrichDepartures and conditional (this, or custom API)
  338. err = getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
  339. if err != nil {
  340. return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting vehicles: %w", err)
  341. }
  342. }
  343. cacheMx.Lock()
  344. resultUpdates := updates[ctx.FeedID]
  345. cacheMx.Unlock()
  346. return resultUpdates, map[string][]Alert{}, true, nil
  347. }
  348. func getGtfsRealtimeVehicles(ctx Context, _, _ Position) ([]VehicleStatus, error) {
  349. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  350. if err != nil {
  351. return []VehicleStatus{}, fmt.Errorf("while getting feedInfo: %w", err)
  352. }
  353. getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
  354. cacheMx.Lock()
  355. vehicles := make([]VehicleStatus, len(vehicleStatuses[ctx.FeedID]))
  356. i := 0
  357. for _, status := range vehicleStatuses[ctx.FeedID] {
  358. vehicles[i] = status
  359. i++
  360. }
  361. cacheMx.Unlock()
  362. return vehicles, nil
  363. }
  364. func getGtfsRealtimeAlerts(stopID, _, tripID string, ctx Context, t *Traffic) ([]Alert, error) {
  365. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
  366. if err != nil {
  367. return []Alert{}, fmt.Errorf("while getting feedInfo: %w", err)
  368. }
  369. getGtfsRealtimeMessages(ALERTS, ctx.FeedID, feedInfo.RealtimeFeeds)
  370. cacheMx.Lock()
  371. resultAlertsMap := map[uint]Alert{}
  372. if tripID != "" {
  373. trip, err := GetTrip(tripID, ctx, t)
  374. if err != nil {
  375. return []Alert{}, fmt.Errorf("while getting trip: %w", err)
  376. }
  377. line, err := GetLine(trip.LineID, ctx, t)
  378. if err != nil {
  379. return []Alert{}, fmt.Errorf("while getting line: %w", err)
  380. }
  381. for _, i := range alerts[ctx.FeedID].ByAgency[line.AgencyID] {
  382. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  383. }
  384. for _, i := range alerts[ctx.FeedID].ByLine[line.Id] {
  385. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  386. }
  387. for _, i := range alerts[ctx.FeedID].ByLine[line.Id] {
  388. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  389. }
  390. for _, i := range alerts[ctx.FeedID].ByLineType[line.Kind] {
  391. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  392. }
  393. }
  394. if stopID != "" {
  395. for _, i := range alerts[ctx.FeedID].ByStop[stopID] {
  396. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  397. }
  398. for _, i := range alerts[ctx.FeedID].ByTrip[tripID] {
  399. resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
  400. }
  401. }
  402. cacheMx.Unlock()
  403. resultAlerts := make([]Alert, len(resultAlertsMap))
  404. i := 0
  405. for _, alert := range resultAlertsMap {
  406. resultAlerts[i] = alert
  407. i++
  408. }
  409. return resultAlerts, nil
  410. }