|
- // SPDX-FileCopyrightText: Adam Evyčędo
- //
- // SPDX-License-Identifier: AGPL-3.0-or-later
- package traffic
- import (
- "apiote.xyz/p/szczanieckiej/gtfs_rt"
- pb "apiote.xyz/p/szczanieckiej/gtfs_rt/transit_realtime"
- "fmt"
- "math"
- "time"
- "golang.org/x/text/language"
- )
- // ........................ feedID
- var lastUpdatedGtfsRt = map[string]map[RealtimeFeedType]uint64{}
- func makeTimetableRelationshipFromTripTimetable(r pb.TripDescriptor_ScheduleRelationship) TimetableRelationship {
- switch r {
- case pb.TripDescriptor_ADDED:
- return TRIP_ADDED
- case pb.TripDescriptor_CANCELED:
- return TRIP_CANCELED
- case pb.TripDescriptor_DELETED:
- return TRIP_DELETED
- default:
- return TRIP_SCHEDULED
- }
- }
- func makeTimetableRelationshipFromStopTrip(r pb.TripUpdate_StopTimeUpdate_ScheduleRelationship) TimetableRelationship {
- switch r {
- case pb.TripUpdate_StopTimeUpdate_NO_DATA:
- return NO_TRIP_DATA
- case pb.TripUpdate_StopTimeUpdate_SKIPPED:
- return STOP_SKIPPED
- default:
- return TRIP_SCHEDULED
- }
- }
- func makeDepartureStatus(s pb.VehiclePosition_VehicleStopStatus) DepartureStatus {
- switch s {
- case pb.VehiclePosition_STOPPED_AT:
- return AT_STOP
- case pb.VehiclePosition_INCOMING_AT:
- return INCOMING
- default:
- return IN_TRANSIT
- }
- }
- func makeCongestionLevel(l pb.VehiclePosition_CongestionLevel) CongestionLevel {
- switch l {
- case pb.VehiclePosition_RUNNING_SMOOTHLY:
- return CONGESTION_SMOOTH
- case pb.VehiclePosition_STOP_AND_GO:
- return CONGESTION_STOP_AND_GO
- case pb.VehiclePosition_CONGESTION:
- return CONGESTION_SIGNIFICANT
- case pb.VehiclePosition_SEVERE_CONGESTION:
- return CONGESTION_SEVERE
- default:
- return CONGESTION_UNKNOWN
- }
- }
- // XXX default occupancy status in GTFS-realtime is VehiclePosition_EMPTY while VehiclePosition_NO_DATA exists
- func makeOccupancyStatus(s *pb.VehiclePosition_OccupancyStatus) OccupancyStatus {
- if s == nil {
- return OCCUPANCY_UNKNOWN
- }
- switch *s {
- case pb.VehiclePosition_EMPTY:
- return OCCUPANCY_EMPTY
- case pb.VehiclePosition_MANY_SEATS_AVAILABLE:
- return OCCUPANCY_MANY_AVAILABLE
- case pb.VehiclePosition_FEW_SEATS_AVAILABLE:
- return OCCUPANCY_FEW_AVAILABLE
- case pb.VehiclePosition_STANDING_ROOM_ONLY:
- return OCCUPANCY_STANDING_ONLY
- case pb.VehiclePosition_CRUSHED_STANDING_ROOM_ONLY:
- return OCCUPANCY_CRUSHED
- case pb.VehiclePosition_FULL:
- return OCCUPANCY_FULL
- case pb.VehiclePosition_NOT_ACCEPTING_PASSENGERS:
- fallthrough
- case pb.VehiclePosition_NOT_BOARDABLE:
- return OCCUPANCY_NOT_ACCEPTING
- default:
- return OCCUPANCY_UNKNOWN
- }
- }
- func makeWheelchairAccessibility(a pb.VehicleDescriptor_WheelchairAccessible) WheelchairAccessibility {
- switch a {
- case pb.VehicleDescriptor_NO_VALUE:
- return WHEELCHAIR_NO_DATA
- case pb.VehicleDescriptor_WHEELCHAIR_ACCESSIBLE:
- return WHEELCHAIR_INACCESSIBLE
- case pb.VehicleDescriptor_WHEELCHAIR_INACCESSIBLE:
- return WHEELCHAIR_INACCESSIBLE
- default:
- return WHEELCHAIR_UNKNOWN
- }
- }
- func makeLineType(routeType int32) LineType {
- switch routeType {
- case 0:
- return TRAM
- case 1:
- return METRO
- case 2:
- return RAIL
- case 3:
- return BUS
- case 4:
- return FERRY
- case 5:
- return CABLE_TRAM
- case 6:
- return CABLE_CAR
- case 7:
- return FUNICULAR
- case 11:
- return TROLLEYBUS
- case 12:
- return MONORAIL
- default:
- panic(fmt.Sprintf("unknown GTFS route type: %d", routeType))
- }
- }
- func getGtfsRtData(entities []*pb.FeedEntity, feedID string) map[RealtimeFeedType]int {
- cacheMx.Lock()
- if vehicleStatuses == nil {
- vehicleStatuses = map[string]map[string]VehicleStatus{}
- }
- if vehicleStatuses[feedID] == nil {
- vehicleStatuses[feedID] = map[string]VehicleStatus{}
- }
- if updates == nil {
- updates = map[string]map[string][]Update{}
- }
- if updates[feedID] == nil {
- updates[feedID] = map[string][]Update{}
- }
- if alerts == nil {
- alerts = map[string]Alerts{}
- }
- var alertNumber uint = 0
- which := map[RealtimeFeedType]int{}
- for _, entity := range entities {
- a := entity.Alert
- v := entity.Vehicle
- t := entity.TripUpdate
- if a != nil {
- if alertNumber == 0 {
- alerts[feedID] = Alerts{
- ByLine: map[string][]uint{},
- ByTrip: map[string][]uint{},
- ByLineType: map[LineType][]uint{},
- ByStop: map[string][]uint{},
- ByAgency: map[string][]uint{},
- }
- }
- which[ALERTS] = 1
- alert := Alert{
- Headers: map[language.Tag]string{},
- Descriptions: map[language.Tag]string{},
- URLs: map[language.Tag]string{},
- }
- for _, period := range a.ActivePeriod {
- if period != nil {
- timeRange := [2]time.Time{
- time.Unix(int64(*period.Start), 0),
- time.Unix(int64(*period.End), 0),
- }
- alert.TimeRanges = append(alert.TimeRanges, timeRange)
- }
- }
- if translations := a.HeaderText.GetTranslation(); translations != nil {
- for _, t := range translations {
- if t == nil || t.GetText() == "" {
- continue
- }
- if t.Language == nil || t.GetLanguage() == "" {
- alert.Headers[language.Und] = *t.Text
- }
- tag, err := language.Parse(t.GetLanguage())
- if err == nil {
- alert.Headers[tag] = *t.Text
- }
- }
- }
- if translations := a.DescriptionText.GetTranslation(); translations != nil {
- for _, t := range translations {
- if t == nil || t.GetText() == "" {
- continue
- }
- if t.Language == nil || t.GetLanguage() == "" {
- alert.Descriptions[language.Und] = *t.Text
- }
- tag, err := language.Parse(t.GetLanguage())
- if err == nil {
- alert.Descriptions[tag] = *t.Text
- }
- }
- }
- if translations := a.Url.GetTranslation(); translations != nil {
- for _, t := range translations {
- if t == nil || t.GetText() == "" {
- continue
- }
- if t.Language == nil || t.GetLanguage() == "" {
- alert.URLs[language.Und] = *t.Text
- }
- tag, err := language.Parse(t.GetLanguage())
- if err == nil {
- alert.URLs[tag] = *t.Text
- }
- }
- }
- alert.Cause = alertCauseOfGtfs(a.Cause)
- alert.Effect = alertEffectOfGtfs(a.Effect)
- for _, e := range a.InformedEntity {
- if e == nil {
- continue
- }
- if e.AgencyId != nil {
- alerts[feedID].ByAgency[*e.AgencyId] = append(alerts[feedID].ByAgency[*e.AgencyId], alertNumber)
- }
- if e.RouteId != nil {
- alerts[feedID].ByLine[*e.RouteId] = append(alerts[feedID].ByLine[*e.RouteId], alertNumber)
- }
- if e.StopId != nil {
- alerts[feedID].ByStop[*e.StopId] = append(alerts[feedID].ByStop[*e.StopId], alertNumber)
- }
- if e.Trip != nil && e.Trip.TripId != nil {
- alerts[feedID].ByTrip[*e.Trip.TripId] = append(alerts[feedID].ByTrip[*e.Trip.TripId], alertNumber)
- }
- if e.RouteType != nil {
- alerts[feedID].ByLineType[makeLineType(*e.RouteType)] = append(alerts[feedID].ByLineType[makeLineType(*e.RouteType)], alertNumber)
- }
- }
- alertNumber++
- alertsStruct := alerts[feedID]
- alertsStruct.Alerts = append(alerts[feedID].Alerts, alert)
- alerts[feedID] = alertsStruct
- }
- if v != nil {
- which[VEHICLE_POSITIONS] = 1
- tripUpdates := updates[feedID][*v.Trip.TripId]
- vehicleUpdate := VehicleStatus{
- Status: makeDepartureStatus(v.GetCurrentStatus()),
- CongestionLevel: makeCongestionLevel(v.GetCongestionLevel()),
- OccupancyStatus: makeOccupancyStatus(v.OccupancyStatus),
- VehicleID: v.GetVehicle().GetId(),
- Latitude: float64(v.GetPosition().GetLatitude()),
- Longitude: float64(v.GetPosition().GetLongitude()),
- Speed: v.GetPosition().GetSpeed(),
- Bearing: float64(v.GetPosition().GetBearing() * math.Pi / 180),
- TripID: *v.Trip.TripId,
- WheelchairAccessibility: makeWheelchairAccessibility(v.GetVehicle().GetWheelchairAccessible()),
- }
- vehicleStatuses[feedID][*v.Trip.TripId] = vehicleUpdate
- if len(tripUpdates) == 0 {
- tripUpdate := Update{
- StopSequence: v.GetCurrentStopSequence(),
- StopID: v.GetStopId(),
- VehicleStatus: vehicleUpdate,
- TimetableRelationship: NO_TRIP_DATA,
- }
- updates[feedID][*v.Trip.TripId] = []Update{tripUpdate}
- } else {
- for _, tripUpdate := range tripUpdates {
- tripUpdate.VehicleStatus = vehicleUpdate
- }
- updates[feedID][*v.Trip.TripId] = tripUpdates
- }
- }
- if t != nil {
- tripUpdates := make([]Update, len(t.StopTimeUpdate))
- startTimeString := t.GetTripProperties().GetStartTime()
- var startTime *uint
- if startTimeString != "" {
- sT, err := parseDepartureTime(startTimeString)
- if err != nil {
- continue
- }
- startTime = &sT
- }
- for i, stopTimeUpdate := range t.StopTimeUpdate {
- which[TRIP_UPDATES] = 1
- update := Update{
- StopSequence: stopTimeUpdate.GetStopSequence(),
- StopID: stopTimeUpdate.GetStopId(),
- VehicleStatus: vehicleStatuses[feedID][*t.Trip.TripId],
- StartTime: startTime,
- }
- update.VehicleStatus.TripID = *t.Trip.TripId
- update.Delay = stopTimeUpdate.GetArrival().GetDelay()
- if update.StopSequence == 0 && update.Delay < 0 {
- update.Delay = 0
- }
- updateTime := stopTimeUpdate.GetArrival().GetTime()
- if updateTime != 0 {
- update.TimeUTC = time.Unix(updateTime, 0).In(time.UTC).Format("150405")
- }
- stopTripRelationship := stopTimeUpdate.GetScheduleRelationship()
- if stopTripRelationship == pb.TripUpdate_StopTimeUpdate_SCHEDULED {
- tripTimetableRelationship := t.Trip.GetScheduleRelationship()
- update.TimetableRelationship = makeTimetableRelationshipFromTripTimetable(tripTimetableRelationship)
- } else {
- update.TimetableRelationship = makeTimetableRelationshipFromStopTrip(stopTripRelationship)
- }
- tripUpdates[i] = update
- }
- updates[feedID][*t.Trip.TripId] = tripUpdates
- }
- }
- cacheMx.Unlock()
- return which
- }
- func getGtfsRealtimeMessages(feedType RealtimeFeedType, feedID string, feeds map[RealtimeFeedType]string) error {
- now := uint64(time.Now().Unix())
- if lastUpdatedGtfsRt[feedID] == nil {
- lastUpdatedGtfsRt[feedID] = map[RealtimeFeedType]uint64{}
- }
- if passed := now - lastUpdatedGtfsRt[feedID][feedType]; passed > 30 {
- message, err := gtfs_rt.GetMessages(feedID, feeds[feedType])
- if err != nil {
- return BlockingError{fmt.Errorf("while getting messages: %w", err)}
- }
- /*
- cacheMx.Lock()
- switch feedType {
- case TRIP_UPDATES:
- updates = map[string]map[string][]Update{}
- case VEHICLE_POSITIONS:
- vehicleStatuses = map[string]map[string]VehicleStatus{}
- case ALERTS:
- alerts = map[string]Alerts{}
- }
- cacheMx.Unlock()
- */
- lastUpdatedGtfsRt[feedID][feedType] = now
- whichUpdated := getGtfsRtData(message.Entity, feedID)
- for key, value := range whichUpdated {
- if value == 1 {
- lastUpdatedGtfsRt[feedID][key] = now
- }
- }
- }
- return nil
- }
- func getGtfsRealtimeUpdates(_ string, _ int, _, _ string, ctx Context) (map[string][]Update, map[string][]Alert, bool, error) {
- feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
- if err != nil {
- return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting feedInfo: %w", err)
- }
- if _, ok := feedInfo.RealtimeFeeds[TRIP_UPDATES]; ok {
- err = getGtfsRealtimeMessages(TRIP_UPDATES, ctx.FeedID, feedInfo.RealtimeFeeds)
- if err != nil {
- return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting updates: %w", err)
- }
- }
- if _, ok := feedInfo.RealtimeFeeds[VEHICLE_POSITIONS]; ok {
- // TODO should be moved to enrichDepartures and conditional (this, or custom API)
- err = getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
- if err != nil {
- return map[string][]Update{}, map[string][]Alert{}, true, fmt.Errorf("while getting vehicles: %w", err)
- }
- }
- cacheMx.Lock()
- resultUpdates := updates[ctx.FeedID]
- cacheMx.Unlock()
- return resultUpdates, map[string][]Alert{}, true, nil
- }
- func getGtfsRealtimeVehicles(ctx Context, _, _ Position) ([]VehicleStatus, error) {
- feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
- if err != nil {
- return []VehicleStatus{}, fmt.Errorf("while getting feedInfo: %w", err)
- }
- getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
- cacheMx.Lock()
- vehicles := make([]VehicleStatus, len(vehicleStatuses[ctx.FeedID]))
- i := 0
- for _, status := range vehicleStatuses[ctx.FeedID] {
- vehicles[i] = status
- i++
- }
- cacheMx.Unlock()
- return vehicles, nil
- }
- func getGtfsRealtimeAlerts(stopID, _, tripID string, ctx Context, t *Traffic) ([]Alert, error) {
- feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
- if err != nil {
- return []Alert{}, fmt.Errorf("while getting feedInfo: %w", err)
- }
- getGtfsRealtimeMessages(ALERTS, ctx.FeedID, feedInfo.RealtimeFeeds)
- cacheMx.Lock()
- resultAlertsMap := map[uint]Alert{}
- if tripID != "" {
- file, err := openTrips(ctx)
- if err != nil {
- return []Alert{}, fmt.Errorf("while opening trips: %w", err)
- }
- defer file.Close()
- trip, err := GetTrip(file, tripID, ctx, t)
- if err != nil {
- return []Alert{}, fmt.Errorf("while getting trip: %w", err)
- }
- line, err := GetLine(trip.LineID, ctx, t)
- if err != nil {
- return []Alert{}, fmt.Errorf("while getting line: %w", err)
- }
- for _, i := range alerts[ctx.FeedID].ByAgency[line.AgencyID] {
- resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
- }
- for _, i := range alerts[ctx.FeedID].ByLine[line.Id] {
- resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
- }
- for _, i := range alerts[ctx.FeedID].ByLine[line.Id] {
- resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
- }
- for _, i := range alerts[ctx.FeedID].ByLineType[line.Kind] {
- resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
- }
- }
- if stopID != "" {
- for _, i := range alerts[ctx.FeedID].ByStop[stopID] {
- resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
- }
- for _, i := range alerts[ctx.FeedID].ByTrip[tripID] {
- resultAlertsMap[i] = alerts[ctx.FeedID].Alerts[i]
- }
- }
- cacheMx.Unlock()
- resultAlerts := make([]Alert, len(resultAlertsMap))
- i := 0
- for _, alert := range resultAlertsMap {
- resultAlerts[i] = alert
- i++
- }
- return resultAlerts, nil
- }
|