|
- // SPDX-FileCopyrightText: Adam Evyčędo
- //
- // SPDX-License-Identifier: AGPL-3.0-or-later
- package traffic
- // TODO(BAF10) direction (0|1) to const (TO|BACK)
- // TODO Agency.language, FeedInfo.language -> IETF language tag
- // TODO Agency.phoneNumber -> E.123 format
- import (
- "text/template"
- "apiote.xyz/p/szczanieckiej/config"
- "apiote.xyz/p/szczanieckiej/file"
- "bufio"
- "embed"
- "encoding/csv"
- "errors"
- "fmt"
- "io"
- "io/fs"
- "log"
- "net/http"
- "os"
- "path/filepath"
- "sort"
- "strings"
- "syscall"
- "time"
- "gopkg.in/yaml.v3"
- gott2 "apiote.xyz/p/gott/v2"
- "git.sr.ht/~sircmpwn/go-bare"
- "notabug.org/apiote/gott"
- )
- //go:embed realtime_lua
- var luaScripts embed.FS
- type _LineGraph struct {
- StopCodesArray []string
- StopCodes map[string]int
- NextNodes map[int]map[int]struct{}
- }
- type ErrEmpty struct{}
- func (ErrEmpty) Error() string {
- return ""
- }
- type result struct {
- config config.Config
- pid int
- tmpPath string
- feed Feed
- feedName string
- location *time.Location
- tmpFeedPath string
- homeFeedPath string
- downloadedVersions []Version
- allVersions []Version
- gtfsFilenames []string
- missingVersions []Version
- updatesFile *os.File
- updates map[string]string
- etags map[string]string
- newEtags map[string]string
- feedTranslations embed.FS
- }
- type feedConverter struct {
- TmpFeedPath string
- GtfsFilename string
- Feed Feed
- HomeFeedPath string
- feedTranslations embed.FS
- config config.Config
- Timezone *time.Location
- TrafficCalendarFile *os.File
- Departures map[string][]Departure
- TripsThroughStop map[string]map[string]StopOrder
- LineNames map[string]string
- TripsOffsets map[string]uint
- TripChangeOpts map[string]ChangeOption
- StopsCodeIndex CodeIndex
- StopsNameIndex map[string][]uint
- Stops map[string]string
- LineGraphs map[string]map[uint]LineGraph
- lineHeadsigns map[string]map[uint][]string
- LineIndex map[string][]uint
- LineIdIndex CodeIndex
- ValidFrom time.Time
- ValidFromError []error
- ValidTill time.Time
- ValidTillError []error
- tripHeadsigns map[string]string
- stopNames map[string]string
- feedInfo FeedInfo
- defaultLanguage string
- translations map[string]map[string]string
- schedules map[string]Schedule
- trips map[string]Trip
- }
- // helper functions
- func translateFieldDefault(key, feedLanguage, defaultLanguage string, translations map[string]map[string]string) string {
- if feedLanguage == "mul" {
- if value, ok := translations[key][defaultLanguage]; !ok {
- return key
- } else {
- return value
- }
- }
- return key
- }
- func translateField(key, feedLanguage, defaultLanguage string, translations map[string]map[string]string) []Translation {
- var result []Translation
- if feedLanguage == "mul" {
- if value, ok := translations[key][defaultLanguage]; !ok {
- result = []Translation{{Language: defaultLanguage, Value: key}}
- } else {
- result = []Translation{{Language: defaultLanguage, Value: value}}
- }
- }
- for language, value := range translations[key] {
- if language == defaultLanguage {
- continue
- }
- result = append(result, Translation{Language: language, Value: value})
- }
- return result
- }
- func hex2colour(hex string) Colour {
- if hex[0] == '#' {
- hex = hex[1:]
- }
- colour := Colour{
- A: 0xff,
- }
- hexToByte := func(b byte) byte {
- switch {
- case b >= '0' && b <= '9':
- return b - '0'
- case b >= 'a' && b <= 'f':
- return b - 'a' + 10
- case b >= 'A' && b <= 'F':
- return b - 'A' + 10
- default:
- return 0
- }
- }
- switch len(hex) {
- case 6:
- colour.R = hexToByte(hex[0])<<4 + hexToByte(hex[1])
- colour.G = hexToByte(hex[2])<<4 + hexToByte(hex[3])
- colour.B = hexToByte(hex[4])<<4 + hexToByte(hex[5])
- case 3:
- colour.R = hexToByte(hex[0])<<4 + hexToByte(hex[0])
- colour.G = hexToByte(hex[1])<<4 + hexToByte(hex[1])
- colour.B = hexToByte(hex[2])<<4 + hexToByte(hex[2])
- }
- return colour
- }
- func readEtags(cfg config.Config) (map[string]string, error) {
- etagsFilename := filepath.Join(cfg.FeedsPath, "etags.bare")
- etagsFile, err := os.Open(etagsFilename)
- if err != nil {
- var pathError *os.PathError
- if errors.As(err, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
- return map[string]string{}, nil
- }
- return nil, fmt.Errorf("while opening file: %w", err)
- }
- defer etagsFile.Close()
- var etags map[string]string
- err = bare.UnmarshalReader(etagsFile, &etags)
- if err != nil {
- return nil, fmt.Errorf("while unmarshalling: %w", err)
- }
- return etags, nil
- }
- func saveEtags(cfg config.Config, etags map[string]string) error {
- etagsFilename := filepath.Join(cfg.FeedsPath, "etags.bare")
- etagsFile, err := os.OpenFile(etagsFilename, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- return fmt.Errorf("while opening: %w", err)
- }
- defer etagsFile.Close()
- bytes, err := bare.Marshal(&etags)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = etagsFile.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- return nil
- }
- // converting functions
- func createTmpPath(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- p := filepath.Join(args.tmpPath, args.feedName)
- err := os.MkdirAll(p, 0755)
- args.tmpFeedPath = p
- return gott.Tuple{args}, err
- }
- func createFeedHome(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- p := filepath.Join(args.config.FeedsPath, args.feedName)
- err := os.MkdirAll(p, 0755)
- args.homeFeedPath = p
- return gott.Tuple{args}, err
- }
- func listDownloadedVersions(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- v, err := ListVersionsTimezone(args.config, args.feed, args.feed.getTimezone())
- args.downloadedVersions = v
- return gott.Tuple{args}, err
- }
- func getAllVersions(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- v, err := args.feed.GetVersions(time.Now().In(args.location), args.location)
- args.allVersions = v
- return gott.Tuple{args}, err
- }
- func findValidVersions(input ...interface{}) interface{} {
- args := input[0].(result)
- now := time.Now().In(args.location)
- validVersions := FindValidVersions(args.allVersions, now)
- downloadedVersions := map[string]struct{}{}
- for _, downloadedVersion := range args.downloadedVersions {
- downloadedVersions[downloadedVersion.String()] = struct{}{}
- }
- missingVersions := []Version{}
- for _, version := range validVersions {
- if _, ok := downloadedVersions[version.String()]; !ok {
- missingVersions = append(missingVersions, version)
- }
- }
- // log.Println("all", args.allVersions)
- // log.Println("valid", validVersions)
- // log.Println("downloaded", downloadedVersions)
- // log.Println("missing", missingVersions)
- args.missingVersions = missingVersions
- return gott.Tuple{args}
- }
- func getGtfsFiles(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- names := []string{}
- for i, version := range args.missingVersions {
- name := fmt.Sprintf("%s_%d.zip", version.String(), i)
- zipPath := filepath.Join(args.tmpFeedPath, name)
- url := version.Link
- request, err := http.NewRequest("GET", url, nil)
- if err != nil {
- return gott.Tuple{args}, fmt.Errorf("while creating request for %s: %w", url, err)
- }
- request.Header.Add("If-None-Match", args.etags[url])
- client := http.Client{} // todo timeout
- response, err := client.Do(request)
- if err != nil {
- return gott.Tuple{args}, fmt.Errorf("while downloading gtfs %s %w", name, err)
- }
- if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusNotModified {
- return gott.Tuple{args}, fmt.Errorf("wrong response code %d for %s: %w", response.StatusCode, url, err)
- }
- if response.StatusCode == 200 {
- args.newEtags[url] = response.Header.Get("etag")
- } else {
- args.newEtags[url] = args.etags[url]
- continue
- }
- file, err := os.Create(zipPath)
- if err != nil {
- return gott.Tuple{args}, fmt.Errorf("while creating zip for %s %w", name, err)
- }
- defer file.Close()
- _, err = io.Copy(file, response.Body)
- if err != nil {
- return gott.Tuple{args}, fmt.Errorf("while copying gtfs %s %w", name, err)
- }
- names = append(names, name)
- }
- args.gtfsFilenames = names
- return gott.Tuple{args}, nil
- }
- func unzipGtfs(c feedConverter) error {
- return file.UnzipGtfs(c.TmpFeedPath, c.GtfsFilename)
- }
- func convertVehicles(c feedConverter) error { // ( -- >> vehicles.bare)
- result, err := os.Create(filepath.Join(c.TmpFeedPath, "vehicles.bare"))
- if err != nil {
- return fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- vehicles, err := c.Feed.ConvertVehicles()
- for _, vehicle := range vehicles {
- bytes, err := bare.Marshal(&vehicle)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing to file: %w", err)
- }
- }
- return nil
- }
- func prepareFeedGtfs(c feedConverter) error {
- return c.Feed.FeedPrepareZip(c.TmpFeedPath)
- }
- func createTrafficCalendarFile(c feedConverter) (feedConverter, error) {
- path := c.TmpFeedPath
- var err error
- c.TrafficCalendarFile, err = os.Create(filepath.Join(path, "calendar.bare"))
- return c, err
- }
- func recoverCalendar(c feedConverter, e error) (feedConverter, error) {
- var pathError *os.PathError
- if errors.As(e, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
- return c, nil
- }
- return c, e
- }
- func convertCalendar(c feedConverter) (feedConverter, error) { // ( feedInfo -- schedules >> )
- c.schedules = map[string]Schedule{}
- path := c.TmpFeedPath
- calendarFile, err := os.Open(filepath.Join(path, "calendar.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer calendarFile.Close()
- r := csv.NewReader(bufio.NewReader(calendarFile))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- schedule := Schedule{}
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- schedule.Id = record[fields["service_id"]]
- startDate := record[fields["start_date"]]
- endDate := record[fields["end_date"]]
- schedule.DateRanges = []DateRange{
- DateRange{
- Start: startDate[:4] + "-" + startDate[4:6] + "-" + startDate[6:],
- End: endDate[:4] + "-" + endDate[4:6] + "-" + endDate[6:],
- },
- }
- if record[fields["monday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 1)
- }
- if record[fields["tuesday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 2)
- }
- if record[fields["wednesday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 3)
- }
- if record[fields["thursday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 4)
- }
- if record[fields["friday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 5)
- }
- if record[fields["saturday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 6)
- }
- if record[fields["sunday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 0)
- schedule.DateRanges[0].Weekdays |= (1 << 7)
- }
- c.schedules[schedule.Id] = schedule
- scheduleStart, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].Start, c.Timezone)
- if err != nil {
- c.ValidFromError = append(c.ValidFromError, err)
- }
- if err == nil && (c.ValidFrom.IsZero() || scheduleStart.Before(c.ValidFrom)) {
- c.ValidFrom = scheduleStart
- c.feedInfo.ValidSince = scheduleStart.Format(ValidityFormat)
- }
- scheduleEnd, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].End, c.Timezone)
- if err != nil {
- c.ValidTillError = append(c.ValidTillError, err)
- }
- if err == nil && (c.ValidTill.IsZero() || scheduleEnd.After(c.ValidTill)) {
- c.ValidTill = scheduleEnd
- c.feedInfo.ValidTill = scheduleEnd.Format(ValidityFormat)
- }
- }
- return c, nil
- }
- func convertCalendarDates(c feedConverter) (feedConverter, error) { // ( feedInfo -- schedules >> )
- path := c.TmpFeedPath
- datesFile, err := os.Open(filepath.Join(path, "calendar_dates.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer datesFile.Close()
- r := csv.NewReader(bufio.NewReader(datesFile))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- if record[fields["exception_type"]] == "1" {
- id := record[fields["service_id"]]
- schedule := c.schedules[id]
- date := record[fields["date"]]
- dateRange := DateRange{
- Start: date[:4] + "-" + date[4:6] + "-" + date[6:],
- End: date[:4] + "-" + date[4:6] + "-" + date[6:],
- Weekdays: 0xff,
- }
- if len(schedule.DateRanges) == 0 {
- schedule.Id = id
- schedule.DateRanges = []DateRange{dateRange}
- } else {
- schedule.DateRanges = append(schedule.DateRanges, dateRange)
- sort.Slice(schedule.DateRanges, func(i, j int) bool {
- return schedule.DateRanges[i].Start < schedule.DateRanges[j].Start
- })
- }
- c.schedules[schedule.Id] = schedule
- } else {
- date := record[fields["date"]]
- formatedDate := date[:4] + "-" + date[4:6] + "-" + date[6:]
- scheduleToEdit := c.schedules[record[fields["service_id"]]]
- newDateRanges := []DateRange{}
- for i := 0; i < len(scheduleToEdit.DateRanges); i++ {
- dateRange := scheduleToEdit.DateRanges[i]
- if dateRange.Start == formatedDate {
- d, _ := time.ParseInLocation(DateFormat, dateRange.Start, c.Timezone)
- dateRange.Start = d.AddDate(0, 0, 1).Format(DateFormat)
- if dateRange.Start <= dateRange.End {
- newDateRanges = append(newDateRanges, dateRange)
- }
- continue
- }
- if dateRange.Start < formatedDate && formatedDate < dateRange.End {
- d, _ := time.ParseInLocation(DateFormat, formatedDate, c.Timezone)
- range1 := DateRange{dateRange.Start, d.AddDate(0, 0, -1).Format(DateFormat), dateRange.Weekdays}
- range2 := DateRange{d.AddDate(0, 0, 1).Format(DateFormat), dateRange.End, dateRange.Weekdays}
- newDateRanges = append(newDateRanges, range1)
- newDateRanges = append(newDateRanges, range2)
- continue
- }
- if formatedDate == dateRange.End {
- d, _ := time.ParseInLocation(DateFormat, dateRange.End, c.Timezone)
- dateRange.End = d.AddDate(0, 0, -1).Format(DateFormat)
- newDateRanges = append(newDateRanges, dateRange)
- continue
- }
- newDateRanges = append(newDateRanges, dateRange)
- }
- scheduleToEdit.DateRanges = newDateRanges
- c.schedules[record[fields["service_id"]]] = scheduleToEdit
- }
- }
- for _, schedule := range c.schedules {
- lastDateRange := len(schedule.DateRanges) - 1
- scheduleStart, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].Start, c.Timezone)
- if err != nil {
- c.ValidFromError = append(c.ValidFromError, err)
- }
- if err == nil && (c.ValidFrom.IsZero() || scheduleStart.Before(c.ValidFrom)) {
- c.ValidFrom = scheduleStart
- c.feedInfo.ValidSince = scheduleStart.Format(ValidityFormat)
- }
- scheduleEnd, err := time.ParseInLocation(DateFormat, schedule.DateRanges[lastDateRange].End, c.Timezone)
- if err != nil {
- c.ValidTillError = append(c.ValidTillError, err)
- }
- if err == nil && (c.ValidTill.IsZero() || scheduleEnd.After(c.ValidTill)) {
- c.ValidTill = scheduleEnd
- c.feedInfo.ValidTill = scheduleEnd.Format(ValidityFormat)
- }
- }
- return c, nil
- }
- func checkAnyCalendarConverted(c feedConverter) error {
- if len(c.schedules) == 0 {
- return fmt.Errorf("no calendar converted")
- }
- return nil
- }
- func saveSchedules(c feedConverter) error {
- resultFile := c.TrafficCalendarFile
- schedulesArray := make([]Schedule, len(c.schedules))
- i := 0
- for _, schedule := range c.schedules {
- schedulesArray[i] = schedule
- i++
- }
- sort.Slice(schedulesArray, func(i, j int) bool {
- return schedulesArray[i].DateRanges[0].Start < schedulesArray[j].DateRanges[0].Start
- })
- for _, schedule := range schedulesArray {
- bytes, err := bare.Marshal(&schedule)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = resultFile.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- }
- c.schedules = map[string]Schedule{}
- return nil
- }
- func saveFeedInfo(c feedConverter) error {
- path := c.TmpFeedPath
- result, err := os.Create(filepath.Join(path, "feed_info.bare"))
- if err != nil {
- return fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- bytes, err := bare.Marshal(&c.feedInfo)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- log.Printf("timetable is valid: %s to %s\n", c.feedInfo.ValidSince, c.feedInfo.ValidTill)
- c.feedInfo = FeedInfo{}
- return nil
- }
- func closeTrafficCalendarFile(c feedConverter, e error) (feedConverter, error) {
- if c.TrafficCalendarFile != nil {
- c.TrafficCalendarFile.Close()
- }
- return c, e
- }
- func clearDepartures(c feedConverter) feedConverter {
- c.Departures = map[string][]Departure{}
- return c
- }
- func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; ( -- departures:map[tripID][]departure, tripsThroughStop:map[stopID][]{tripID,order}, tripHeadsigns:map[tripID]stopID >> )
- path := c.TmpFeedPath
- file, err := os.Open(filepath.Join(path, "stop_times.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- departures := map[string][]Departure{}
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- tripsThroughStop := map[string]map[string]StopOrder{}
- tripHeadsigns := map[string]string{}
- for {
- departure := Departure{}
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- stopID := record[fields["stop_id"]]
- tripID := record[fields["trip_id"]]
- fmt.Sscanf(record[fields["stop_sequence"]], "%d", &departure.StopSequence)
- fmt.Sscanf(record[fields["pickup_type"]], "%d", &departure.Pickup)
- fmt.Sscanf(record[fields["drop_off_type"]], "%d", &departure.Dropoff)
- if _, ok := tripsThroughStop[stopID]; !ok {
- tripsThroughStop[stopID] = map[string]StopOrder{}
- }
- tripsThroughStop[stopID][tripID] = StopOrder{
- Sequence: departure.StopSequence,
- }
- if c.Feed.Flags().Headsign == HeadsignTripLastStop {
- tripHeadsigns[tripID] = stopID
- }
- var hours, minutes uint
- fmt.Sscanf(record[fields["arrival_time"]], "%d:%d", &hours, &minutes)
- departure.Time = hours*60 + minutes
- departures[tripID] = append(departures[tripID], departure)
- }
- c.tripHeadsigns = tripHeadsigns
- c.Departures = departures
- c.TripsThroughStop = tripsThroughStop
- return c, nil
- }
- func clearLineNames(c feedConverter) feedConverter {
- c.LineNames = map[string]string{}
- return c
- }
- func getLineNames(c feedConverter) (feedConverter, error) { // O(n:routes) ; ( -- lineNames:map[routeID]lineName >> )
- path := c.TmpFeedPath
- file, err := os.Open(filepath.Join(path, "routes.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- names := map[string]string{}
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- routeID := record[fields["route_id"]]
- lineName := c.Feed.Flags().LineName
- for _, template := range []string{"route_short_name", "route_long_name"} {
- lineName = strings.Replace(lineName, "{{"+template+"}}", record[fields[template]], -1)
- }
- names[routeID] = lineName
- }
- c.LineNames = names
- return c, nil
- }
- func clearStopNames(c feedConverter) feedConverter {
- c.stopNames = map[string]string{}
- return c
- }
- func getStopNames(c feedConverter) (feedConverter, error) { // O(n:stops) ; ( -- stopNames[stopID]stopName >> )
- if c.Feed.Flags().Headsign != HeadsignTripLastStop {
- return c, nil
- }
- stopNames := map[string]string{}
- path := c.TmpFeedPath
- file, err := os.Open(filepath.Join(path, "stops.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- stopID := record[fields["stop_id"]]
- stopName := record[fields["stop_name"]]
- stopNames[stopID] = stopName
- }
- c.stopNames = stopNames
- return c, nil
- }
- func clearTripsChangeOptions(c feedConverter) feedConverter {
- c.TripChangeOpts = map[string]ChangeOption{}
- return c
- }
- func clearTripsThroughStops(c feedConverter) feedConverter {
- c.TripsThroughStop = map[string]map[string]StopOrder{}
- return c
- }
- func convertTrips(c feedConverter) (feedConverter, error) { // O(n:trips) ; (departures, lineNames, stopNames -- tripsOffsets:map[tripID]offset, tripsChangeOpts:map[tripID]{lineID,headsign} >> trips)
- path := c.TmpFeedPath
- departures := c.Departures
- lineNames := c.LineNames
- file, err := os.Open(filepath.Join(path, "trips.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- result, err := os.Create(filepath.Join(path, "trips.bare"))
- if err != nil {
- return c, fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- var offset uint = 0
- tripsOffsets := map[string]uint{}
- tripChangeOpts := map[string]ChangeOption{}
- for {
- trip := Trip{}
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- trip.Id = record[fields["trip_id"]]
- switch c.Feed.Flags().Headsign {
- case HeadsignTripHeadsing:
- trip.Headsign = record[fields["trip_headsign"]]
- case HeadsignTripLastStop:
- trip.Headsign = c.stopNames[c.tripHeadsigns[trip.Id]]
- }
- trip.Departures = departures[trip.Id]
- trip.ScheduleID = record[fields["service_id"]]
- trip.LineID = record[fields["route_id"]]
- fmt.Sscanf(record[fields["direction_id"]], "%d", &trip.Direction)
- tripChangeOpts[trip.Id] = ChangeOption{
- LineName: lineNames[record[fields["route_id"]]],
- Headsign: translateFieldDefault(trip.Headsign, c.feedInfo.Language, c.defaultLanguage, c.translations),
- TranslatedHeadsigns: translateField(trip.Headsign, c.feedInfo.Language, c.defaultLanguage, c.translations),
- }
- bytes, err := bare.Marshal(&trip)
- if err != nil {
- return c, fmt.Errorf("while marshalling: %w", err)
- }
- b, err := result.Write(bytes)
- if err != nil {
- return c, fmt.Errorf("while writing: %w", err)
- }
- tripsOffsets[trip.Id] = offset
- offset += uint(b)
- }
- c.TripsOffsets = tripsOffsets
- c.TripChangeOpts = tripChangeOpts
- return c, nil
- }
- func clearStops(c feedConverter) feedConverter {
- c.Stops = map[string]string{}
- return c
- }
- func convertStops(c feedConverter) (feedConverter, error) { // O(n:stops) ; (translations, tripsThroughStop, tripChangeOpts, tripOffsets -- stopsOffsetsByCode:CodeIndex, stopsOffsetsByName:map[name][]offsets >> stops)
- path := c.TmpFeedPath
- tripsThroughStop := c.TripsThroughStop
- tripChangeOpts := c.TripChangeOpts
- tripsOffsets := c.TripsOffsets
- file, err := os.Open(filepath.Join(path, "stops.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- result, err := os.Create(filepath.Join(path, "stops.bare"))
- if err != nil {
- return c, fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- var offset uint = 0
- stopsOffsetsByName := map[string][]uint{}
- stopsOffsetsByCode := CodeIndex{}
- stops := map[string]string{}
- maxStopTripsLength := 0
- for {
- stop := Stop{}
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- if f, ok := fields["location_type"]; ok && record[f] != "" && record[f] != "0" {
- // NOTE for now ignore everything that’s not a stop/platform
- // TODO use Portals (location_type == 2) to show on map if platform has a parent (location_type == 1) that has a Portal
- // TODO use location_type in {3,4} for routing inside stations (with pathways, transfers, and levels)
- continue
- }
- stopID := record[fields["stop_id"]]
- stopTrips := tripsThroughStop[stopID]
- stopTripsLength := len(stopTrips)
- if maxStopTripsLength < stopTripsLength {
- maxStopTripsLength = stopTripsLength
- }
- stop.Id = stopID
- templates := []string{"stop_code", "stop_id", "stop_name", "platform_code"}
- stop.Code = c.Feed.Flags().StopIdFormat
- for _, template := range templates {
- stop.Code = strings.Replace(stop.Code, "{{"+template+"}}", record[fields[template]], -1)
- }
- stop.Name = c.Feed.Flags().StopName
- for _, template := range templates {
- // TODO if '{{template}}' is empty
- stop.Name = strings.Replace(stop.Name, "{{"+template+"}}", record[fields[template]], -1)
- }
- if field, ok := fields["zone_id"]; ok {
- stop.Zone = record[field]
- }
- stop.NodeName = record[fields["stop_name"]]
- stops[record[fields["stop_id"]]] = stop.Code
- if field, ok := fields["stop_timezone"]; ok {
- stop.Timezone = record[field]
- }
- if c.feedInfo.Language == "mul" {
- key := record[fields["stop_name"]]
- if _, ok := c.translations[stop.NodeName][c.defaultLanguage]; !ok {
- stop.TranslatedNames = []Translation{{Language: c.defaultLanguage, Value: stop.Name}}
- stop.TranslatedNodeNames = []Translation{{Language: c.defaultLanguage, Value: stop.NodeName}}
- } else {
- stop.TranslatedNames = []Translation{{Language: c.defaultLanguage, Value: strings.ReplaceAll(stop.Name, key, c.translations[key][c.defaultLanguage])}}
- stop.TranslatedNodeNames = []Translation{{Language: c.defaultLanguage, Value: c.translations[key][c.defaultLanguage]}}
- }
- for language, value := range c.translations[key] {
- if language == c.defaultLanguage {
- continue
- }
- stop.TranslatedNames = append(stop.TranslatedNames, Translation{Language: c.defaultLanguage, Value: strings.ReplaceAll(stop.Name, key, value)})
- stop.TranslatedNodeNames = append(stop.TranslatedNodeNames, Translation{Language: c.defaultLanguage, Value: c.translations[key][value]})
- }
- }
- var lat, lon float64
- fmt.Sscanf(record[fields["stop_lat"]], "%f", &lat)
- fmt.Sscanf(record[fields["stop_lon"]], "%f", &lon)
- stop.Position = Position{lat, lon}
- changeOptionMap := map[string]ChangeOption{}
- stop.ChangeOptions = []ChangeOption{}
- stop.Order = map[string]StopOrder{}
- for tripID, stopTrip := range stopTrips {
- changeOption := tripChangeOpts[tripID]
- stopOrder := StopOrder{
- TripOffset: tripsOffsets[tripID],
- Sequence: stopTrip.Sequence,
- }
- stop.Order[tripID] = stopOrder
- changeOptionMap[changeOption.LineName+"->"+changeOption.Headsign] = changeOption
- }
- for _, option := range changeOptionMap {
- stop.ChangeOptions = append(stop.ChangeOptions, option)
- }
- sort.Slice(stop.ChangeOptions, func(i, j int) bool {
- var num1, num2 int
- _, err1 := fmt.Sscanf(stop.ChangeOptions[i].LineName, "%d", &num1)
- _, err2 := fmt.Sscanf(stop.ChangeOptions[j].LineName, "%d", &num2)
- if err1 != nil && err2 != nil {
- return stop.ChangeOptions[i].LineName < stop.ChangeOptions[j].LineName
- } else if err1 != nil {
- return false
- } else if err2 != nil {
- return true
- } else {
- return num1 < num2
- }
- })
- bytes, err := bare.Marshal(&stop)
- if err != nil {
- return c, fmt.Errorf("while marshalling: %w", err)
- }
- b, err := result.Write(bytes)
- if err != nil {
- return c, fmt.Errorf("while writing: %w", err)
- }
- if len(stop.TranslatedNames) == 0 {
- stopsOffsetsByName[stop.Name] = append(stopsOffsetsByName[stop.Name], offset)
- }
- for _, v := range stop.TranslatedNames {
- stopsOffsetsByName[v.Value] = append(stopsOffsetsByName[v.Value], offset)
- }
- stopsOffsetsByCode[stop.Code] = offset
- offset += uint(b)
- }
- if maxStopTripsLength > 8192 {
- log.Printf("maximum length of StopOrder is %d, more than 8192, which may need to be tweaked", maxStopTripsLength)
- }
- c.StopsCodeIndex = stopsOffsetsByCode
- c.StopsNameIndex = stopsOffsetsByName
- c.Stops = stops
- return c, nil
- }
- func clearTripOffsets(c feedConverter) feedConverter {
- c.TripsOffsets = map[string]uint{}
- return c
- }
- func clearLineGraphs(c feedConverter) feedConverter {
- c.LineGraphs = map[string]map[uint]LineGraph{}
- return c
- }
- func clearLineHeadsigns(c feedConverter) feedConverter {
- c.lineHeadsigns = map[string]map[uint][]string{}
- return c
- }
- func getTrips(c feedConverter) (feedConverter, error) {
- file, err := os.Open(filepath.Join(c.TmpFeedPath, "trips.bare"))
- if err != nil {
- return c, fmt.Errorf("while opening trips: %w", err)
- }
- trips := map[string]Trip{}
- for {
- var trip Trip
- err := bare.UnmarshalReader(file, &trip)
- trip.Departures = []Departure{}
- trips[trip.Id] = trip
- if err != nil {
- if err == io.EOF {
- break
- } else {
- return c, fmt.Errorf("while unmarshaling: %w", err)
- }
- }
- }
- c.trips = trips
- return c, nil
- }
- func convertLineGraphs(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; (trips, stops -- lineGrapsh:map[lineID]map[direction]graph, lineHeadsigns:map[lineID]map[direction][]headsigns >> )
- path := c.TmpFeedPath
- trips := c.trips
- stops := c.Stops
- // lineID dire headsi
- lineHeadsignsMap := map[string]map[uint]map[string]struct{}{}
- // lineID dire headsi
- lineHeadsigns := map[string]map[uint][]string{}
- // lineNa dire
- graphs := map[string]map[uint]_LineGraph{}
- file, err := os.Open(filepath.Join(path, "stop_times.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening stop_times: %w", err)
- }
- defer file.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- previousTripID := ""
- previous := -1
- previousTrip := Trip{}
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- tripID := record[fields["trip_id"]]
- stop := stops[record[fields["stop_id"]]]
- trip := trips[tripID]
- if _, ok := lineHeadsignsMap[trip.LineID]; !ok {
- lineHeadsignsMap[trip.LineID] = map[uint]map[string]struct{}{}
- lineHeadsigns[trip.LineID] = map[uint][]string{}
- }
- if _, ok := lineHeadsignsMap[trip.LineID][trip.Direction.Value()]; !ok {
- lineHeadsignsMap[trip.LineID][trip.Direction.Value()] = map[string]struct{}{}
- lineHeadsigns[trip.LineID][trip.Direction.Value()] = []string{}
- }
- lineHeadsignsMap[trip.LineID][trip.Direction.Value()][trip.Headsign] = struct{}{}
- if _, ok := graphs[trip.LineID]; !ok {
- graphs[trip.LineID] = map[uint]_LineGraph{}
- }
- if previousTripID != tripID && previousTripID != "" {
- // last of previous trip
- graph := graphs[previousTrip.LineID][previousTrip.Direction.Value()]
- if graph.NextNodes == nil {
- graph.NextNodes = map[int]map[int]struct{}{}
- }
- if graph.NextNodes[previous] == nil {
- graph.NextNodes[previous] = map[int]struct{}{}
- }
- graphs[previousTrip.LineID][previousTrip.Direction.Value()] = graph
- graphs[previousTrip.LineID][previousTrip.Direction.Value()].NextNodes[previous][-1] = struct{}{}
- }
- graph := graphs[trip.LineID][trip.Direction.Value()]
- if graph.StopCodes == nil {
- graph.StopCodes = map[string]int{}
- }
- if graph.NextNodes == nil {
- graph.NextNodes = map[int]map[int]struct{}{}
- }
- current := -1
- current, ok := graph.StopCodes[stop]
- if !ok {
- current = len(graph.StopCodesArray)
- graph.StopCodesArray = append(graph.StopCodesArray, stop)
- graph.StopCodes[stop] = current
- }
- if previousTripID != tripID {
- // first of current trip
- if graph.NextNodes[-1] == nil {
- graph.NextNodes[-1] = map[int]struct{}{}
- }
- if _, ok := graph.NextNodes[-1][current]; !ok {
- graph.NextNodes[-1][current] = struct{}{}
- }
- } else {
- // second <- first to last <- penultimate of current trip
- if graph.NextNodes[previous] == nil {
- graph.NextNodes[previous] = map[int]struct{}{}
- }
- if _, ok := graph.NextNodes[previous][current]; !ok {
- graph.NextNodes[previous][current] = struct{}{}
- }
- }
- previous = current
- previousTripID = tripID
- previousTrip = trip
- graphs[trip.LineID][trip.Direction.Value()] = graph
- }
- g := graphs[previousTrip.LineID][previousTrip.Direction.Value()]
- if g.NextNodes[previous] == nil {
- g.NextNodes[previous] = map[int]struct{}{}
- }
- if _, ok := g.NextNodes[previous][-1]; !ok {
- g.NextNodes[previous][-1] = struct{}{}
- }
- for lineID, directions := range lineHeadsignsMap {
- for direction, headsigns := range directions {
- for headsign := range headsigns {
- lineHeadsigns[lineID][direction] = append(lineHeadsigns[lineID][direction], headsign)
- }
- }
- }
- c.LineGraphs = map[string]map[uint]LineGraph{}
- for lineID, graphByDirection := range graphs {
- c.LineGraphs[lineID] = map[uint]LineGraph{}
- for direction, graph := range graphByDirection {
- c.LineGraphs[lineID][direction] = LineGraph{
- StopCodes: graph.StopCodesArray,
- NextNodes: map[int][]int{},
- }
- for from, tos := range graph.NextNodes {
- for to := range tos {
- c.LineGraphs[lineID][direction].NextNodes[from] = append(c.LineGraphs[lineID][direction].NextNodes[from], to)
- }
- }
- }
- }
- c.lineHeadsigns = lineHeadsigns
- return c, nil
- }
- func convertLines(c feedConverter) (feedConverter, error) { // O(n:routes) ; (lineGraphs, lineHeadsigns -- lineIndex:map[lineName][]offsets, lineIdIndex:CodeIndex >> lines)
- path := c.TmpFeedPath
- feed := c.Feed
- file, err := os.Open(filepath.Join(path, "routes.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- result, err := os.Create(filepath.Join(path, "lines.bare"))
- if err != nil {
- return c, fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- var offset uint = 0
- index := map[string][]uint{}
- idIndex := CodeIndex{}
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- routeID := record[fields["route_id"]]
- lineName := c.Feed.Flags().LineName
- for _, template := range []string{"route_short_name", "route_long_name"} {
- lineName = strings.Replace(lineName, "{{"+template+"}}", record[fields[template]], -1)
- }
- var kind uint
- fmt.Sscanf(record[fields["route_type"]], "%d", &kind)
- colour := "ffffff"
- if colourIx, ok := fields["route_color"]; ok && record[colourIx] != "" {
- colour = record[colourIx]
- }
- directions := []uint{}
- for direction := range c.lineHeadsigns[routeID] {
- directions = append(directions, direction)
- }
- sort.Slice(directions, func(i, j int) bool {
- return directions[i] < directions[j]
- })
- headsigns := [][]string{}
- translatedHeadsigns := [][][]Translation{}
- for _, direction := range directions {
- dirHeadsigns := c.lineHeadsigns[routeID][direction]
- headsigns = append(headsigns, dirHeadsigns)
- translatedHeadsign := [][]Translation{}
- for _, headsign := range dirHeadsigns {
- translatedHeadsign = append(translatedHeadsign, translateField(headsign, c.feedInfo.Language, c.defaultLanguage, c.translations))
- }
- translatedHeadsigns = append(translatedHeadsigns, translatedHeadsign)
- }
- graphs := []LineGraph{}
- for _, direction := range directions {
- graphs = append(graphs, c.LineGraphs[routeID][direction])
- }
- line := Line{
- Id: routeID,
- Name: lineName,
- Colour: hex2colour(colour),
- Kind: LineType(kind),
- Graphs: graphs,
- Headsigns: headsigns,
- }
- if field, present := fields["agency_id"]; present {
- line.AgencyID = record[field]
- }
- bytes, err := bare.Marshal(&line)
- if err != nil {
- return c, fmt.Errorf("while marshalling: %w", err)
- }
- b, err := result.Write(bytes)
- if err != nil {
- return c, fmt.Errorf("while writing: %w", err)
- }
- cleanQuery, err := CleanQuery(line.Name, feed)
- if err != nil {
- return c, fmt.Errorf("while cleaning line name: %w", err)
- }
- index[cleanQuery] = append(index[cleanQuery], offset)
- idIndex[routeID] = offset
- offset += uint(b)
- }
- c.LineIdIndex = idIndex
- c.LineIndex = index
- return c, nil
- }
- func convertFeedInfo(c feedConverter) (feedConverter, error) { // O(1:feed_info) ; ( -- feed_info >> )
- path := c.TmpFeedPath
- feedInfo := FeedInfo{}
- file, err := os.Open(filepath.Join(path, "feed_info.txt"))
- if err != nil {
- if errors.Is(err, fs.ErrNotExist) {
- log.Println("[WARN] no feed_info.txt")
- file = nil
- } else {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- }
- if file != nil {
- defer file.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- record, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- feedInfo.Website = record[fields["feed_publisher_url"]]
- feedInfo.Language = record[fields["feed_lang"]]
- if defaultLanguageIndex, ok := fields["default_lang"]; ok {
- c.defaultLanguage = record[defaultLanguageIndex]
- }
- if ix, ok := fields["feed_start_date"]; ok {
- c.ValidFrom, err = time.ParseInLocation("20060102", record[ix], c.Timezone)
- if err != nil {
- c.ValidFromError = append(c.ValidFromError, err)
- }
- feedInfo.ValidSince = record[ix]
- }
- if ix, ok := fields["feed_end_date"]; ok {
- c.ValidTill, err = time.ParseInLocation("20060102", record[ix], c.Timezone)
- if err != nil {
- c.ValidTillError = append(c.ValidTillError, err)
- }
- feedInfo.ValidTill = record[ix]
- }
- }
- feedInfo.Timezone = c.Timezone.String()
- feedInfo.RealtimeFeeds = c.Feed.RealtimeFeeds()
- feedInfo.QrHost, feedInfo.QrLocation, feedInfo.QrSelector = c.Feed.QRInfo()
- feedInfo.Attributions, feedInfo.Descriptions, err = getAttrDesc(c.Feed.String(), c.feedTranslations)
- feedInfo.Name = c.Feed.Name()
- c.feedInfo = feedInfo
- return c, err
- }
- func convertLuaScripts(c feedConverter) error { // O(1) ; ( -- >> updates.lua, alerts.lua, vehicles.lua )
- filenames := []string{"updates", "vehicles", "alerts"}
- for _, filename := range filenames {
- t, err := template.ParseFS(luaScripts, "realtime_lua/"+c.Feed.String()+"_"+filename+".lua")
- if err != nil {
- if strings.Contains(err.Error(), "pattern matches no files") {
- log.Printf("%s.lua for this feed does not exist, ignoring\n", filename)
- continue
- }
- return fmt.Errorf("while parsing template %s: %w", filename, err)
- }
- path := c.TmpFeedPath
- writeFile, err := os.Create(filepath.Join(path, filename+".lua"))
- if err != nil {
- return fmt.Errorf("while creating %s: %w", filename, err)
- }
- defer writeFile.Close()
- err = t.Execute(writeFile, c.config.Auth[c.Feed.String()])
- if err != nil {
- return fmt.Errorf("while executing template %s: %w", filename, err)
- }
- }
- return nil
- }
- func getAttrDesc(feedID string, feedTranslations embed.FS) (map[string]string, map[string]string, error) {
- attributions := map[string]string{}
- descriptions := map[string]string{}
- dir, err := feedTranslations.ReadDir("translations")
- if err != nil {
- return attributions, descriptions, err
- }
- for _, f := range dir {
- translation := map[string]string{}
- name := f.Name()
- lang := strings.Split(name, ".")[1]
- fileContent, err := feedTranslations.ReadFile("translations/" + name)
- if err != nil {
- log.Printf("error reading translation %s\n", name)
- continue
- }
- yaml.Unmarshal(fileContent, &translation)
- attributions[lang] = translation[feedID+"_attribution"]
- descriptions[lang] = translation[feedID+"_description"]
- if lang == "en" {
- attributions["und"] = translation[feedID+"_attribution"]
- descriptions["und"] = translation[feedID+"_description"]
- }
- }
- return attributions, descriptions, nil
- }
- func readTranslations(c feedConverter) (feedConverter, error) { // O(n:translations) ; ( -- translations >>)
- path := c.TmpFeedPath
- file, err := os.Open(filepath.Join(path, "translations.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- translations := map[string]map[string]string{}
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- key := record[fields["field_value"]]
- language := record[fields["language"]]
- translation := record[fields["translation"]]
- if _, ok := translations[key]; !ok {
- translations[key] = map[string]string{}
- }
- translations[key][language] = translation
- }
- c.translations = translations
- return c, nil
- }
- func recoverTranslations(c feedConverter, e error) (feedConverter, error) {
- var pathError *os.PathError
- if errors.As(e, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
- return c, nil
- }
- return c, e
- }
- func convertAgencies(c feedConverter) (feedConverter, error) { // O(n:agency) ; ( -- >> agencies)
- path := c.TmpFeedPath
- file, err := os.Open(filepath.Join(path, "agency.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- result, err := os.Create(filepath.Join(path, "agencies.bare"))
- if err != nil {
- return c, fmt.Errorf("while creating file: %w", err)
- }
- defer file.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- agency := Agency{
- Id: record[fields["agency_id"]],
- Name: record[fields["agency_name"]],
- TranslatedNames: translateField(record[fields["agency_name"]], c.feedInfo.Language, c.defaultLanguage, c.translations),
- Website: record[fields["agency_url"]],
- TranslatedWebsites: translateField(record[fields["agency_url"]], c.feedInfo.Language, c.defaultLanguage, c.translations),
- Timezone: record[fields["agency_timezone"]],
- }
- c.Timezone, _ = time.LoadLocation(agency.Timezone)
- if field, present := fields["agency_lang"]; present {
- agency.Language = record[field]
- }
- if field, present := fields["agency_phone"]; present {
- agency.PhoneNumber = record[field]
- agency.TranslatedPhoneNumbers = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
- }
- if field, present := fields["agency_fare_url"]; present {
- agency.FareWebsite = record[field]
- agency.TranslatedFareWebsites = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
- }
- if field, present := fields["agency_email"]; present {
- agency.Email = record[field]
- agency.TranslatedEmails = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
- }
- bytes, err := bare.Marshal(&agency)
- if err != nil {
- return c, fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return c, fmt.Errorf("while writing: %w", err)
- }
- }
- return c, nil
- }
- func writeNameIndex(c feedConverter, index map[string][]uint, filename string, raw bool) error {
- path := c.TmpFeedPath
- feed := c.Feed
- result, err := os.Create(filepath.Join(path, filename))
- if err != nil {
- return fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- for name, offsets := range index {
- cleanQuery := name
- if !raw {
- cleanQuery, err = CleanQuery(name, feed)
- if err != nil {
- return fmt.Errorf("while cleaning name %s: %w", name, err)
- }
- }
- stopOffset := NameOffset{
- Name: cleanQuery,
- Offsets: offsets,
- }
- bytes, err := bare.Marshal(&stopOffset)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- }
- return nil
- }
- func writeStopNameIndex(c feedConverter) error {
- err := writeNameIndex(c, c.StopsNameIndex, "ix_stop_names.bare", false)
- c.StopsNameIndex = map[string][]uint{}
- return err
- }
- func writeLineIndex(c feedConverter) error {
- err := writeNameIndex(c, c.LineIndex, "ix_lines.bare", false)
- c.LineIndex = map[string][]uint{}
- return err
- }
- func writeLineIdIndex(c feedConverter) error {
- err := writeCodeIndex(c, c.LineIdIndex, "ix_line_codes.bare")
- c.LineIndex = map[string][]uint{}
- return err
- }
- func writeTripIndex(c feedConverter) error {
- tripIndex := map[string][]uint{}
- for trip, offset := range c.TripsOffsets {
- tripIndex[trip] = []uint{offset}
- }
- err := writeNameIndex(c, tripIndex, "ix_trips.bare", true)
- c.TripsOffsets = map[string]uint{}
- return err
- }
- func writeStopCodeIndex(c feedConverter) error {
- err := writeCodeIndex(c, c.StopsCodeIndex, "ix_stop_codes.bare")
- c.StopsCodeIndex = CodeIndex{}
- return err
- }
- func writeCodeIndex(c feedConverter, i CodeIndex, filename string) error {
- path := c.TmpFeedPath
- result, err := os.Create(filepath.Join(path, filename))
- if err != nil {
- return fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- bytes, err := bare.Marshal(&i)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- return nil
- }
- func deleteTxtFiles(c feedConverter) error {
- return nil
- return file.DeleteTxtFiles(c.TmpFeedPath, c.GtfsFilename)
- }
- func compressTraffic(c feedConverter) error {
- return file.CompressBare(c.TmpFeedPath, c.GtfsFilename)
- }
- func deleteBareFiles(c feedConverter) error {
- return file.DeleteBareFiles(c.TmpFeedPath)
- }
- func moveTraffic(c feedConverter) error {
- if err := append(c.ValidFromError, c.ValidTillError...); len(err) != 0 {
- return errors.Join(err...)
- }
- return file.MoveTraffic(c.GtfsFilename, c.ValidFrom.Format("20060102")+"_"+c.ValidTill.Format("20060102")+".txz", c.TmpFeedPath, c.HomeFeedPath)
- }
- func convert(input ...interface{}) (interface{}, error) {
- allErrors := []error{}
- args := input[0].(result)
- for _, gtfsFile := range args.gtfsFilenames {
- log.Printf("converting feed %s/%s\n", args.feed.Name(), gtfsFile)
- r := gott2.R[feedConverter]{
- S: feedConverter{
- TmpFeedPath: args.tmpFeedPath,
- GtfsFilename: gtfsFile,
- Feed: args.feed,
- HomeFeedPath: args.homeFeedPath,
- feedTranslations: args.feedTranslations,
- config: args.config,
- },
- LogLevel: gott2.Debug,
- }
- r = r.
- Tee(unzipGtfs).
- Tee(prepareFeedGtfs).
- Tee(convertVehicles).
- Bind(convertAgencies).
- Bind(convertFeedInfo).
- Tee(convertLuaScripts).
- Bind(readTranslations).
- Recover(recoverTranslations).
- Bind(createTrafficCalendarFile).
- Bind(convertCalendar).
- Recover(recoverCalendar).
- Bind(convertCalendarDates).
- Recover(recoverCalendar).
- Tee(checkAnyCalendarConverted).
- Tee(saveSchedules).
- Tee(saveFeedInfo).
- Recover(closeTrafficCalendarFile).
- Bind(convertDepartures).
- Bind(getLineNames).
- Bind(getStopNames).
- Bind(convertTrips).
- Map(clearDepartures).
- Map(clearStopNames).
- Map(clearLineNames).
- Bind(convertStops).
- Tee(writeTripIndex).
- Map(clearTripOffsets).
- Tee(writeStopNameIndex).
- Tee(writeStopCodeIndex).
- Map(clearTripsChangeOptions).
- Map(clearTripsThroughStops).
- Map(clearLineNames).
- Bind(getTrips).
- Bind(convertLineGraphs).
- Map(clearStops).
- Bind(convertLines).
- Tee(writeLineIndex).
- Tee(writeLineIdIndex).
- Map(clearLineGraphs).
- Map(clearLineHeadsigns).
- Tee(deleteTxtFiles).
- Tee(compressTraffic).
- Tee(deleteBareFiles).
- Tee(moveTraffic)
- if r.E != nil {
- log.Printf("Error converting %s: %v\n", args.feed.Name(), r.E)
- allErrors = append(allErrors, r.E)
- }
- if err := append(r.S.ValidFromError, r.S.ValidTillError...); len(err) != 0 {
- allErrors = append(allErrors, err...)
- log.Printf("Error converting %s: %v\n", args.feed.Name(), errors.Join(err...))
- }
- }
- if len(allErrors) > 0 {
- return gott.Tuple{args}, errors.Join(allErrors...)
- }
- return gott.Tuple{args}, nil
- }
- func signal(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- if len(args.gtfsFilenames) > 0 && args.pid > 0 {
- process, err := os.FindProcess(args.pid)
- if err != nil {
- return gott.Tuple{args}, err
- }
- err = process.Signal(syscall.SIGUSR1)
- if err != nil {
- return gott.Tuple{args}, err
- }
- }
- return gott.Tuple{args}, nil
- }
- func openLastUpdated(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- updatesFilename := filepath.Join(args.config.FeedsPath, "updated.bare")
- var err error
- args.updatesFile, err = os.OpenFile(updatesFilename, os.O_RDWR|os.O_CREATE, 0644)
- return gott.Tuple{args}, err
- }
- func isEmpty(input ...interface{}) error {
- args := input[0].(result)
- stat, err := os.Stat(args.updatesFile.Name())
- if err != nil {
- return err
- }
- if stat.Size() == 0 {
- return ErrEmpty{}
- }
- return nil
- }
- func unmarshalLastUpdated(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- var lastUpdated map[string]string
- err := bare.UnmarshalReader(args.updatesFile, &lastUpdated)
- args.updates = lastUpdated
- return gott.Tuple{args}, err
- }
- func recoverEmpty(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- err := input[1].(error)
- var emptyError ErrEmpty
- if errors.As(err, &emptyError) {
- return gott.Tuple{args}, nil
- } else {
- return gott.Tuple{args}, err
- }
- }
- func lastUpdated(input ...interface{}) interface{} {
- args := input[0].(result)
- args.updates[args.feed.String()] = time.Now().Format(time.RFC3339)
- return gott.Tuple{args}
- }
- func seekLastUpdated(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- _, err := args.updatesFile.Seek(0, 0)
- return gott.Tuple{args}, err
- }
- func marshalLastUpdated(input ...interface{}) error {
- args := input[0].(result)
- err := bare.MarshalWriter(bare.NewWriter(args.updatesFile), &args.updates)
- args.updatesFile.Close()
- return err
- }
- func Prepare(cfg config.Config, t Traffic, bimbaPid int, feedTranslations embed.FS) error { // todo(BAF18) remove pid
- etags, err := readEtags(cfg)
- if err != nil {
- return fmt.Errorf("while reading etags: %w", err)
- }
- newEtags := map[string]string{}
- for _, feed := range t.Feeds {
- log.Printf("converting %s\n", feed.Name())
- r := gott.Tuple{result{
- config: cfg,
- pid: bimbaPid,
- tmpPath: os.TempDir(),
- feed: feed,
- feedName: feed.String(),
- location: feed.getTimezone(),
- updates: map[string]string{},
- etags: etags,
- newEtags: newEtags,
- feedTranslations: feedTranslations,
- }}
- s, err := gott.NewResult(r).
- SetLevelLog(gott.Debug).
- Bind(createTmpPath).
- Bind(createFeedHome).
- Bind(listDownloadedVersions).
- Bind(getAllVersions).
- Map(findValidVersions).
- Bind(getGtfsFiles).
- Bind(convert).
- Bind(signal).
- Bind(openLastUpdated).
- Tee(isEmpty).
- Bind(unmarshalLastUpdated).
- Recover(recoverEmpty).
- Map(lastUpdated).
- Bind(seekLastUpdated).
- Tee(marshalLastUpdated).
- Finish()
- if err != nil {
- log.Printf("Error converting %s: %v\n", feed.String(), err)
- } else {
- etags = s.(gott.Tuple)[0].(result).etags
- newEtags = s.(gott.Tuple)[0].(result).newEtags
- }
- }
- return saveEtags(cfg, newEtags)
- }
|