- // SPDX-FileCopyrightText: Adam Evyčędo
- //
- // SPDX-License-Identifier: AGPL-3.0-or-later
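- // Package traffic downloads GTFS feeds and converts them into BARE-encoded
- // files (agencies, lines, stops, trips, calendar and their indices).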
- package traffic
- // TODO(BAF10) direction (0|1) to const (TO|BACK)
- // TODO Agency.language, FeedInfo.language -> IETF language tag
- // TODO Agency.phoneNumber -> E.123 format
- import (
- "text/template"
- "apiote.xyz/p/szczanieckiej/config"
- "apiote.xyz/p/szczanieckiej/file"
- "bufio"
- "embed"
- "encoding/csv"
- "errors"
- "fmt"
- "io"
- "io/fs"
- "log"
- "net/http"
- "os"
- "path/filepath"
- "sort"
- "strings"
- "syscall"
- "time"
- "gopkg.in/yaml.v3"
- gott2 "apiote.xyz/p/gott/v2"
- "git.sr.ht/~sircmpwn/go-bare"
- "notabug.org/apiote/gott"
- )
- //go:embed realtime_lua
- var luaScripts embed.FS
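- // _LineGraph is an intermediate, mutable representation of a line's stop
- // graph: stop codes are interned into indices and NextNodes is an adjacency
- // set between those indices. It is later flattened into LineGraph.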
- type _LineGraph struct {
- StopCodesArray []string
- StopCodes map[string]int
- NextNodes map[int]map[int]struct{}
- }
- type ErrEmpty struct{}
- func (ErrEmpty) Error() string {
- return ""
- }
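- // result carries the state threaded through the Prepare pipeline for a single
- // feed: paths, versions, downloaded GTFS filenames, ETag caches and the
- // last-updated bookkeeping file.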
- type result struct {
- config config.Config
- pid int
- tmpPath string
- feed Feed
- feedName string
- location *time.Location
- tmpFeedPath string
- homeFeedPath string
- downloadedVersions []Version
- allVersions []Version
- gtfsFilenames []string
- missingVersions []Version
- updatesFile *os.File
- updates map[string]string
- etags map[string]string
- newEtags map[string]string
- feedTranslations embed.FS
- }
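- // feedConverter carries the state threaded through the per-GTFS-file
- // conversion pipeline in convert: input indices, intermediate maps (stops,
- // line graphs, headsigns, schedules, trips) and the computed validity range.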
- type feedConverter struct {
- TmpFeedPath string
- GtfsFilename string
- Feed Feed
- HomeFeedPath string
- feedTranslations embed.FS
- config config.Config
- Timezone *time.Location
- TrafficCalendarFile *os.File
- tripsInputIndex map[string]int64
- routesInputIndex map[string]int64
- stopsInputIndex map[string]int64
- tripsOffsets map[string]uint
- StopsCodeIndex CodeIndex
- StopsNameIndex map[string][]uint
- Stops map[string]string
- LineGraphs map[string]map[uint]LineGraph
- lineHeadsigns map[string]map[uint][]string
- LineIndex map[string][]uint
- LineIdIndex CodeIndex
- ValidFrom time.Time
- ValidFromError []error
- ValidTill time.Time
- ValidTillError []error
- feedInfo FeedInfo
- defaultLanguage string
- translations map[string]map[string]string
- schedules map[string]Schedule
- trips map[string]Trip
- }
- // helper functions
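- // translateFieldDefault returns the default-language translation of key when
- // the feed language is "mul" (multilingual); otherwise, or when no translation
- // exists, it returns key unchanged.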
- func translateFieldDefault(key, feedLanguage, defaultLanguage string, translations map[string]map[string]string) string {
- if feedLanguage == "mul" {
- if value, ok := translations[key][defaultLanguage]; ok {
- return value
- }
- }
- return key
- }
- func translateField(key, feedLanguage, defaultLanguage string, translations map[string]map[string]string) []Translation {
- var result []Translation
- if feedLanguage == "mul" {
- if value, ok := translations[key][defaultLanguage]; !ok {
- result = []Translation{{Language: defaultLanguage, Value: key}}
- } else {
- result = []Translation{{Language: defaultLanguage, Value: value}}
- }
- }
- for language, value := range translations[key] {
- if language == defaultLanguage {
- continue
- }
- result = append(result, Translation{Language: language, Value: value})
- }
- return result
- }
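- // hex2colour parses a 3- or 6-digit hex colour (optionally prefixed with '#')
- // into a Colour with full opacity; invalid digits and unsupported lengths
- // fall back to zero components.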
- func hex2colour(hex string) Colour {
- if hex[0] == '#' {
- hex = hex[1:]
- }
- colour := Colour{
- A: 0xff,
- }
- hexToByte := func(b byte) byte {
- switch {
- case b >= '0' && b <= '9':
- return b - '0'
- case b >= 'a' && b <= 'f':
- return b - 'a' + 10
- case b >= 'A' && b <= 'F':
- return b - 'A' + 10
- default:
- return 0
- }
- }
- switch len(hex) {
- case 6:
- colour.R = hexToByte(hex[0])<<4 + hexToByte(hex[1])
- colour.G = hexToByte(hex[2])<<4 + hexToByte(hex[3])
- colour.B = hexToByte(hex[4])<<4 + hexToByte(hex[5])
- case 3:
- colour.R = hexToByte(hex[0])<<4 + hexToByte(hex[0])
- colour.G = hexToByte(hex[1])<<4 + hexToByte(hex[1])
- colour.B = hexToByte(hex[2])<<4 + hexToByte(hex[2])
- }
- return colour
- }
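- // readEtags loads the URL-to-ETag map persisted in etags.bare; a missing file
- // is treated as an empty map rather than an error.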
- func readEtags(cfg config.Config) (map[string]string, error) {
- etagsFilename := filepath.Join(cfg.FeedsPath, "etags.bare")
- etagsFile, err := os.Open(etagsFilename)
- if err != nil {
- var pathError *os.PathError
- if errors.As(err, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
- return map[string]string{}, nil
- }
- return nil, fmt.Errorf("while opening file: %w", err)
- }
- defer etagsFile.Close()
- var etags map[string]string
- err = bare.UnmarshalReader(etagsFile, &etags)
- if err != nil {
- return nil, fmt.Errorf("while unmarshalling: %w", err)
- }
- return etags, nil
- }
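- // saveEtags persists the URL-to-ETag map to etags.bare in the feeds directory.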
- func saveEtags(cfg config.Config, etags map[string]string) error {
- etagsFilename := filepath.Join(cfg.FeedsPath, "etags.bare")
- etagsFile, err := os.OpenFile(etagsFilename, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- return fmt.Errorf("while opening: %w", err)
- }
- defer etagsFile.Close()
- bytes, err := bare.Marshal(&etags)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = etagsFile.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- return nil
- }
- // converting functions
- func createTmpPath(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- p := filepath.Join(args.tmpPath, args.feedName)
- err := os.MkdirAll(p, 0755)
- args.tmpFeedPath = p
- return gott.Tuple{args}, err
- }
- func createFeedHome(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- p := filepath.Join(args.config.FeedsPath, args.feedName)
- err := os.MkdirAll(p, 0755)
- args.homeFeedPath = p
- return gott.Tuple{args}, err
- }
- func listDownloadedVersions(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- v, err := ListVersionsTimezone(args.config, args.feed, args.feed.getTimezone())
- args.downloadedVersions = v
- return gott.Tuple{args}, err
- }
- func getAllVersions(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- v, err := args.feed.GetVersions(time.Now().In(args.location), args.location)
- args.allVersions = v
- return gott.Tuple{args}, err
- }
- func findValidVersions(input ...interface{}) interface{} {
- args := input[0].(result)
- now := time.Now().In(args.location)
- validVersions := FindValidVersions(args.allVersions, now)
- downloadedVersions := map[string]struct{}{}
- for _, downloadedVersion := range args.downloadedVersions {
- downloadedVersions[downloadedVersion.String()] = struct{}{}
- }
- missingVersions := []Version{}
- for _, version := range validVersions {
- if _, ok := downloadedVersions[version.String()]; !ok {
- missingVersions = append(missingVersions, version)
- }
- }
- // log.Println("all", args.allVersions)
- // log.Println("valid", validVersions)
- // log.Println("downloaded", downloadedVersions)
- // log.Println("missing", missingVersions)
- args.missingVersions = missingVersions
- return gott.Tuple{args}
- }
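- // getGtfsFiles downloads the GTFS zip for every missing version, sending the
- // stored ETag as If-None-Match so unchanged feeds (304) are skipped, and
- // records the filenames of the zips that were actually fetched.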
- func getGtfsFiles(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- names := []string{}
- for i, version := range args.missingVersions {
- name := fmt.Sprintf("%s_%d.zip", version.String(), i)
- zipPath := filepath.Join(args.tmpFeedPath, name)
- url := version.Link
- request, err := http.NewRequest("GET", url, nil)
- if err != nil {
- return gott.Tuple{args}, fmt.Errorf("while creating request for %s: %w", url, err)
- }
- request.Header.Add("If-None-Match", args.etags[url])
- client := http.Client{} // todo timeout
- response, err := client.Do(request)
- if err != nil {
- return gott.Tuple{args}, fmt.Errorf("while downloading gtfs %s: %w", name, err)
- }
- defer response.Body.Close()
- if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusNotModified {
- return gott.Tuple{args}, fmt.Errorf("wrong response code %d for %s", response.StatusCode, url)
- }
- if response.StatusCode == http.StatusOK {
- args.newEtags[url] = response.Header.Get("etag")
- } else {
- args.newEtags[url] = args.etags[url]
- continue
- }
- file, err := os.Create(zipPath)
- if err != nil {
- return gott.Tuple{args}, fmt.Errorf("while creating zip for %s %w", name, err)
- }
- defer file.Close()
- _, err = io.Copy(file, response.Body)
- if err != nil {
- return gott.Tuple{args}, fmt.Errorf("while copying gtfs %s %w", name, err)
- }
- names = append(names, name)
- }
- args.gtfsFilenames = names
- return gott.Tuple{args}, nil
- }
- func unzipGtfs(c feedConverter) error {
- return file.UnzipGtfs(c.TmpFeedPath, c.GtfsFilename)
- }
- func convertVehicles(c feedConverter) error { // ( -- >> vehicles.bare)
- result, err := os.Create(filepath.Join(c.TmpFeedPath, "vehicles.bare"))
- if err != nil {
- return fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- vehicles, err := c.Feed.ConvertVehicles()
- if err != nil {
- return fmt.Errorf("while converting vehicles: %w", err)
- }
- for _, vehicle := range vehicles {
- bytes, err := bare.Marshal(&vehicle)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing to file: %w", err)
- }
- }
- return nil
- }
- func prepareFeedGtfs(c feedConverter) error {
- return c.Feed.FeedPrepareZip(c.TmpFeedPath)
- }
- func createTrafficCalendarFile(c feedConverter) (feedConverter, error) {
- path := c.TmpFeedPath
- var err error
- c.TrafficCalendarFile, err = os.Create(filepath.Join(path, "calendar.bare"))
- return c, err
- }
- func recoverCalendar(c feedConverter, e error) (feedConverter, error) {
- var pathError *os.PathError
- if errors.As(e, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
- return c, nil
- }
- return c, e
- }
- func convertCalendar(c feedConverter) (feedConverter, error) { // ( feedInfo -- schedules >> )
- c.schedules = map[string]Schedule{}
- path := c.TmpFeedPath
- calendarFile, err := os.Open(filepath.Join(path, "calendar.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer calendarFile.Close()
- r := csv.NewReader(bufio.NewReader(calendarFile))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- schedule := Schedule{}
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- schedule.Id = record[fields["service_id"]]
- startDate := record[fields["start_date"]]
- endDate := record[fields["end_date"]]
- schedule.DateRanges = []DateRange{
- DateRange{
- Start: startDate[:4] + "-" + startDate[4:6] + "-" + startDate[6:],
- End: endDate[:4] + "-" + endDate[4:6] + "-" + endDate[6:],
- },
- }
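- // Weekdays is a bitmask: bits 1 through 6 are Monday through Saturday;
- // Sunday sets both bit 0 and bit 7 so either Sunday convention matches.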
- if record[fields["monday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 1)
- }
- if record[fields["tuesday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 2)
- }
- if record[fields["wednesday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 3)
- }
- if record[fields["thursday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 4)
- }
- if record[fields["friday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 5)
- }
- if record[fields["saturday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 6)
- }
- if record[fields["sunday"]] == "1" {
- schedule.DateRanges[0].Weekdays |= (1 << 0)
- schedule.DateRanges[0].Weekdays |= (1 << 7)
- }
- c.schedules[schedule.Id] = schedule
- scheduleStart, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].Start, c.Timezone)
- if err != nil {
- c.ValidFromError = append(c.ValidFromError, err)
- }
- if err == nil && (c.ValidFrom.IsZero() || scheduleStart.Before(c.ValidFrom)) {
- c.ValidFrom = scheduleStart
- c.feedInfo.ValidSince = scheduleStart.Format(ValidityFormat)
- }
- scheduleEnd, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].End, c.Timezone)
- if err != nil {
- c.ValidTillError = append(c.ValidTillError, err)
- }
- if err == nil && (c.ValidTill.IsZero() || scheduleEnd.After(c.ValidTill)) {
- c.ValidTill = scheduleEnd
- c.feedInfo.ValidTill = scheduleEnd.Format(ValidityFormat)
- }
- }
- return c, nil
- }
- func convertCalendarDates(c feedConverter) (feedConverter, error) { // ( feedInfo -- schedules >> )
- path := c.TmpFeedPath
- datesFile, err := os.Open(filepath.Join(path, "calendar_dates.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer datesFile.Close()
- r := csv.NewReader(bufio.NewReader(datesFile))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
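- // exception_type 1: service added on this date; record it as a single-day
- // range covering all weekdays.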
- if record[fields["exception_type"]] == "1" {
- id := record[fields["service_id"]]
- schedule := c.schedules[id]
- date := record[fields["date"]]
- dateRange := DateRange{
- Start: date[:4] + "-" + date[4:6] + "-" + date[6:],
- End: date[:4] + "-" + date[4:6] + "-" + date[6:],
- Weekdays: 0xff,
- }
- if len(schedule.DateRanges) == 0 {
- schedule.Id = id
- schedule.DateRanges = []DateRange{dateRange}
- } else {
- schedule.DateRanges = append(schedule.DateRanges, dateRange)
- sort.Slice(schedule.DateRanges, func(i, j int) bool {
- return schedule.DateRanges[i].Start < schedule.DateRanges[j].Start
- })
- }
- c.schedules[schedule.Id] = schedule
- } else {
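- // exception_type 2: service removed on this date; trim or split any
- // existing range that contains it.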
- date := record[fields["date"]]
- formatedDate := date[:4] + "-" + date[4:6] + "-" + date[6:]
- scheduleToEdit := c.schedules[record[fields["service_id"]]]
- newDateRanges := []DateRange{}
- for i := 0; i < len(scheduleToEdit.DateRanges); i++ {
- dateRange := scheduleToEdit.DateRanges[i]
- if dateRange.Start == formatedDate {
- d, _ := time.ParseInLocation(DateFormat, dateRange.Start, c.Timezone)
- dateRange.Start = d.AddDate(0, 0, 1).Format(DateFormat)
- if dateRange.Start <= dateRange.End {
- newDateRanges = append(newDateRanges, dateRange)
- }
- continue
- }
- if dateRange.Start < formatedDate && formatedDate < dateRange.End {
- d, _ := time.ParseInLocation(DateFormat, formatedDate, c.Timezone)
- range1 := DateRange{dateRange.Start, d.AddDate(0, 0, -1).Format(DateFormat), dateRange.Weekdays}
- range2 := DateRange{d.AddDate(0, 0, 1).Format(DateFormat), dateRange.End, dateRange.Weekdays}
- newDateRanges = append(newDateRanges, range1)
- newDateRanges = append(newDateRanges, range2)
- continue
- }
- if formatedDate == dateRange.End {
- d, _ := time.ParseInLocation(DateFormat, dateRange.End, c.Timezone)
- dateRange.End = d.AddDate(0, 0, -1).Format(DateFormat)
- newDateRanges = append(newDateRanges, dateRange)
- continue
- }
- newDateRanges = append(newDateRanges, dateRange)
- }
- scheduleToEdit.DateRanges = newDateRanges
- c.schedules[record[fields["service_id"]]] = scheduleToEdit
- }
- }
- for _, schedule := range c.schedules {
- if len(schedule.DateRanges) == 0 {
- continue
- }
- lastDateRange := len(schedule.DateRanges) - 1
- scheduleStart, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].Start, c.Timezone)
- if err != nil {
- c.ValidFromError = append(c.ValidFromError, err)
- }
- if err == nil && (c.ValidFrom.IsZero() || scheduleStart.Before(c.ValidFrom)) {
- c.ValidFrom = scheduleStart
- c.feedInfo.ValidSince = scheduleStart.Format(ValidityFormat)
- }
- scheduleEnd, err := time.ParseInLocation(DateFormat, schedule.DateRanges[lastDateRange].End, c.Timezone)
- if err != nil {
- c.ValidTillError = append(c.ValidTillError, err)
- }
- if err == nil && (c.ValidTill.IsZero() || scheduleEnd.After(c.ValidTill)) {
- c.ValidTill = scheduleEnd
- c.feedInfo.ValidTill = scheduleEnd.Format(ValidityFormat)
- }
- }
- return c, nil
- }
- func checkAnyCalendarConverted(c feedConverter) error {
- if len(c.schedules) == 0 {
- return fmt.Errorf("no calendar converted")
- }
- return nil
- }
- func saveSchedules(c feedConverter) error {
- resultFile := c.TrafficCalendarFile
- schedulesArray := make([]Schedule, len(c.schedules))
- i := 0
- for _, schedule := range c.schedules {
- schedulesArray[i] = schedule
- i++
- }
- sort.Slice(schedulesArray, func(i, j int) bool {
- return schedulesArray[i].DateRanges[0].Start < schedulesArray[j].DateRanges[0].Start
- })
- for _, schedule := range schedulesArray {
- bytes, err := bare.Marshal(&schedule)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = resultFile.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- }
- c.schedules = map[string]Schedule{}
- return nil
- }
- func saveFeedInfo(c feedConverter) error {
- path := c.TmpFeedPath
- result, err := os.Create(filepath.Join(path, "feed_info.bare"))
- if err != nil {
- return fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- bytes, err := bare.Marshal(&c.feedInfo)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- log.Printf("timetable is valid: %s to %s\n", c.feedInfo.ValidSince, c.feedInfo.ValidTill)
- c.feedInfo = FeedInfo{}
- return nil
- }
- func closeTrafficCalendarFile(c feedConverter, e error) (feedConverter, error) {
- if c.TrafficCalendarFile != nil {
- c.TrafficCalendarFile.Close()
- }
- return c, e
- }
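- // forEachRow streams a CSV file and calls f for every data row with the row's
- // byte offset, a header-name-to-column-index map and the record itself.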
- func forEachRow(filename string, f func(int64, map[string]int, []string) error) error {
- return forEachRowWithHeader(filename, func(_ []string) error {
- return nil
- }, f)
- }
- func forEachRowWithHeader(filename string, h func([]string) error, f func(int64, map[string]int, []string) error) error {
- file, err := os.Open(filename)
- if err != nil {
- return fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- r := csv.NewReader(file)
- header, err := r.Read()
- if err != nil {
- return fmt.Errorf("while reading header: %w", err)
- }
- err = h(header)
- if err != nil {
- return fmt.Errorf("while performing function on header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- offset := r.InputOffset()
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return fmt.Errorf("while reading a record: %w", err)
- }
- err = f(offset, fields, record)
- if err != nil {
- return fmt.Errorf("while performing function: %w", err)
- }
- }
- return nil
- }
- func clearStops(c feedConverter) feedConverter {
- c.Stops = map[string]string{}
- return c
- }
- func clearTripOffsets(c feedConverter) feedConverter {
- c.tripsOffsets = map[string]uint{}
- return c
- }
- func clearLineGraphs(c feedConverter) feedConverter {
- c.LineGraphs = map[string]map[uint]LineGraph{}
- return c
- }
- func clearLineHeadsigns(c feedConverter) feedConverter {
- c.lineHeadsigns = map[string]map[uint][]string{}
- return c
- }
- func getTrips(c feedConverter) (feedConverter, error) {
- file, err := os.Open(filepath.Join(c.TmpFeedPath, "trips.bare"))
- if err != nil {
- return c, fmt.Errorf("while opening trips: %w", err)
- }
- trips := map[string]Trip{}
- for {
- var trip Trip
- err := bare.UnmarshalReader(file, &trip)
- if err != nil {
- if err == io.EOF {
- break
- }
- return c, fmt.Errorf("while unmarshalling: %w", err)
- }
- trip.Departures = []Departure{}
- trips[trip.Id] = trip
- }
- c.trips = trips
- return c, nil
- }
- func convertLineGraphs(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; (trips, stops -- lineGraphs:map[lineID]map[direction]graph, lineHeadsigns:map[lineID]map[direction][]headsigns >> )
- path := c.TmpFeedPath
- trips := c.trips
- stops := c.Stops
- // lineID dire headsi
- lineHeadsignsMap := map[string]map[uint]map[string]struct{}{}
- // lineID dire headsi
- lineHeadsigns := map[string]map[uint][]string{}
- // lineNa dire
- graphs := map[string]map[uint]_LineGraph{}
- file, err := os.Open(filepath.Join(path, "stop_times.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening stop_times: %w", err)
- }
- defer file.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
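- // -1 acts as a virtual start/end node: edges -1 -> first stop and
- // last stop -> -1 mark trip boundaries in the graph.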
- previousTripID := ""
- previous := -1
- previousTrip := Trip{}
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- tripID := record[fields["trip_id"]]
- stop := stops[record[fields["stop_id"]]]
- trip := trips[tripID]
- if _, ok := lineHeadsignsMap[trip.LineID]; !ok {
- lineHeadsignsMap[trip.LineID] = map[uint]map[string]struct{}{}
- lineHeadsigns[trip.LineID] = map[uint][]string{}
- }
- if _, ok := lineHeadsignsMap[trip.LineID][trip.Direction.Value()]; !ok {
- lineHeadsignsMap[trip.LineID][trip.Direction.Value()] = map[string]struct{}{}
- lineHeadsigns[trip.LineID][trip.Direction.Value()] = []string{}
- }
- lineHeadsignsMap[trip.LineID][trip.Direction.Value()][trip.Headsign] = struct{}{}
- if _, ok := graphs[trip.LineID]; !ok {
- graphs[trip.LineID] = map[uint]_LineGraph{}
- }
- if previousTripID != tripID && previousTripID != "" {
- // last of previous trip
- graph := graphs[previousTrip.LineID][previousTrip.Direction.Value()]
- if graph.NextNodes == nil {
- graph.NextNodes = map[int]map[int]struct{}{}
- }
- if graph.NextNodes[previous] == nil {
- graph.NextNodes[previous] = map[int]struct{}{}
- }
- graphs[previousTrip.LineID][previousTrip.Direction.Value()] = graph
- graphs[previousTrip.LineID][previousTrip.Direction.Value()].NextNodes[previous][-1] = struct{}{}
- }
- graph := graphs[trip.LineID][trip.Direction.Value()]
- if graph.StopCodes == nil {
- graph.StopCodes = map[string]int{}
- }
- if graph.NextNodes == nil {
- graph.NextNodes = map[int]map[int]struct{}{}
- }
- current, ok := graph.StopCodes[stop]
- if !ok {
- current = len(graph.StopCodesArray)
- graph.StopCodesArray = append(graph.StopCodesArray, stop)
- graph.StopCodes[stop] = current
- }
- if previousTripID != tripID {
- // first of current trip
- if graph.NextNodes[-1] == nil {
- graph.NextNodes[-1] = map[int]struct{}{}
- }
- if _, ok := graph.NextNodes[-1][current]; !ok {
- graph.NextNodes[-1][current] = struct{}{}
- }
- } else {
- // second <- first to last <- penultimate of current trip
- if graph.NextNodes[previous] == nil {
- graph.NextNodes[previous] = map[int]struct{}{}
- }
- if _, ok := graph.NextNodes[previous][current]; !ok {
- graph.NextNodes[previous][current] = struct{}{}
- }
- }
- previous = current
- previousTripID = tripID
- previousTrip = trip
- graphs[trip.LineID][trip.Direction.Value()] = graph
- }
- g := graphs[previousTrip.LineID][previousTrip.Direction.Value()]
- if g.NextNodes[previous] == nil {
- g.NextNodes[previous] = map[int]struct{}{}
- }
- if _, ok := g.NextNodes[previous][-1]; !ok {
- g.NextNodes[previous][-1] = struct{}{}
- }
- for lineID, directions := range lineHeadsignsMap {
- for direction, headsigns := range directions {
- for headsign := range headsigns {
- lineHeadsigns[lineID][direction] = append(lineHeadsigns[lineID][direction], headsign)
- }
- }
- }
- c.LineGraphs = map[string]map[uint]LineGraph{}
- for lineID, graphByDirection := range graphs {
- c.LineGraphs[lineID] = map[uint]LineGraph{}
- for direction, graph := range graphByDirection {
- c.LineGraphs[lineID][direction] = LineGraph{
- StopCodes: graph.StopCodesArray,
- NextNodes: map[int][]int{},
- }
- for from, tos := range graph.NextNodes {
- for to := range tos {
- c.LineGraphs[lineID][direction].NextNodes[from] = append(c.LineGraphs[lineID][direction].NextNodes[from], to)
- }
- }
- }
- }
- c.lineHeadsigns = lineHeadsigns
- return c, nil
- }
- func convertLines(c feedConverter) (feedConverter, error) { // O(n:routes) ; (lineGraphs, lineHeadsigns -- lineIndex:map[lineName][]offsets, lineIdIndex:CodeIndex >> lines)
- path := c.TmpFeedPath
- feed := c.Feed
- file, err := os.Open(filepath.Join(path, "routes.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- result, err := os.Create(filepath.Join(path, "lines.bare"))
- if err != nil {
- return c, fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- var offset uint = 0
- index := map[string][]uint{}
- idIndex := CodeIndex{}
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- routeID := record[fields["route_id"]]
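- // LineName is a template; {{route_short_name}} and {{route_long_name}}
- // placeholders are substituted from the current record.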
- lineName := c.Feed.Flags().LineName
- for _, placeholder := range []string{"route_short_name", "route_long_name"} {
- lineName = strings.Replace(lineName, "{{"+placeholder+"}}", record[fields[placeholder]], -1)
- }
- var kind uint
- fmt.Sscanf(record[fields["route_type"]], "%d", &kind)
- colour := "ffffff"
- if colourIx, ok := fields["route_color"]; ok && record[colourIx] != "" {
- colour = record[colourIx]
- }
- directions := []uint{}
- for direction := range c.lineHeadsigns[routeID] {
- directions = append(directions, direction)
- }
- sort.Slice(directions, func(i, j int) bool {
- return directions[i] < directions[j]
- })
- headsigns := [][]string{}
- translatedHeadsigns := [][][]Translation{}
- for _, direction := range directions {
- dirHeadsigns := c.lineHeadsigns[routeID][direction]
- headsigns = append(headsigns, dirHeadsigns)
- translatedHeadsign := [][]Translation{}
- for _, headsign := range dirHeadsigns {
- translatedHeadsign = append(translatedHeadsign, translateField(headsign, c.feedInfo.Language, c.defaultLanguage, c.translations))
- }
- translatedHeadsigns = append(translatedHeadsigns, translatedHeadsign)
- }
- graphs := []LineGraph{}
- for _, direction := range directions {
- graphs = append(graphs, c.LineGraphs[routeID][direction])
- }
- line := Line{
- Id: routeID,
- Name: lineName,
- Colour: hex2colour(colour),
- Kind: LineType(kind),
- Graphs: graphs,
- Headsigns: headsigns,
- }
- if field, present := fields["agency_id"]; present {
- line.AgencyID = record[field]
- }
- bytes, err := bare.Marshal(&line)
- if err != nil {
- return c, fmt.Errorf("while marshalling: %w", err)
- }
- b, err := result.Write(bytes)
- if err != nil {
- return c, fmt.Errorf("while writing: %w", err)
- }
- cleanQuery, err := CleanQuery(line.Name, feed)
- if err != nil {
- return c, fmt.Errorf("while cleaning line name: %w", err)
- }
- index[cleanQuery] = append(index[cleanQuery], offset)
- idIndex[routeID] = offset
- offset += uint(b)
- }
- c.LineIdIndex = idIndex
- c.LineIndex = index
- return c, nil
- }
- func convertFeedInfo(c feedConverter) (feedConverter, error) { // O(1:feed_info) ; ( -- feed_info >> )
- path := c.TmpFeedPath
- feedInfo := FeedInfo{}
- file, err := os.Open(filepath.Join(path, "feed_info.txt"))
- if err != nil {
- if errors.Is(err, fs.ErrNotExist) {
- log.Println("[WARN] no feed_info.txt")
- file = nil
- } else {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- }
- if file != nil {
- defer file.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- record, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- feedInfo.Website = record[fields["feed_publisher_url"]]
- feedInfo.Language = record[fields["feed_lang"]]
- if defaultLanguageIndex, ok := fields["default_lang"]; ok {
- c.defaultLanguage = record[defaultLanguageIndex]
- }
- if ix, ok := fields["feed_start_date"]; ok {
- c.ValidFrom, err = time.ParseInLocation("20060102", record[ix], c.Timezone)
- if err != nil {
- c.ValidFromError = append(c.ValidFromError, err)
- }
- feedInfo.ValidSince = record[ix]
- }
- if ix, ok := fields["feed_end_date"]; ok {
- c.ValidTill, err = time.ParseInLocation("20060102", record[ix], c.Timezone)
- if err != nil {
- c.ValidTillError = append(c.ValidTillError, err)
- }
- feedInfo.ValidTill = record[ix]
- }
- }
- feedInfo.Timezone = c.Timezone.String()
- feedInfo.RealtimeFeeds = c.Feed.RealtimeFeeds()
- feedInfo.QrHost, feedInfo.QrLocation, feedInfo.QrSelector = c.Feed.QRInfo()
- feedInfo.Attributions, feedInfo.Descriptions, err = getAttrDesc(c.Feed.String(), c.feedTranslations)
- feedInfo.Name = c.Feed.Name()
- c.feedInfo = feedInfo
- return c, err
- }
- func convertLuaScripts(c feedConverter) error { // O(1) ; ( -- >> updates.lua, alerts.lua, vehicles.lua )
- filenames := []string{"updates", "vehicles", "alerts"}
- for _, filename := range filenames {
- t, err := template.ParseFS(luaScripts, "realtime_lua/"+c.Feed.String()+"_"+filename+".lua")
- if err != nil {
- if strings.Contains(err.Error(), "pattern matches no files") {
- log.Printf("%s.lua for this feed does not exist, ignoring\n", filename)
- continue
- }
- return fmt.Errorf("while parsing template %s: %w", filename, err)
- }
- path := c.TmpFeedPath
- writeFile, err := os.Create(filepath.Join(path, filename+".lua"))
- if err != nil {
- return fmt.Errorf("while creating %s: %w", filename, err)
- }
- defer writeFile.Close()
- err = t.Execute(writeFile, c.config.Auth[c.Feed.String()])
- if err != nil {
- return fmt.Errorf("while executing template %s: %w", filename, err)
- }
- }
- return nil
- }
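- // getAttrDesc reads the embedded files under translations/, taking the
- // language from the second dot-separated segment of each filename, and
- // collects per-language attribution and description strings for the feed;
- // English also fills the "und" (undetermined) entry.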
- func getAttrDesc(feedID string, feedTranslations embed.FS) (map[string]string, map[string]string, error) {
- attributions := map[string]string{}
- descriptions := map[string]string{}
- dir, err := feedTranslations.ReadDir("translations")
- if err != nil {
- return attributions, descriptions, err
- }
- for _, f := range dir {
- translation := map[string]string{}
- name := f.Name()
- lang := strings.Split(name, ".")[1]
- fileContent, err := feedTranslations.ReadFile("translations/" + name)
- if err != nil {
- log.Printf("error reading translation %s\n", name)
- continue
- }
- err = yaml.Unmarshal(fileContent, &translation)
- if err != nil {
- return attributions, descriptions, fmt.Errorf("while unmarshalling %s: %w", name, err)
- }
- attributions[lang] = translation[feedID+"_attribution"]
- descriptions[lang] = translation[feedID+"_description"]
- if lang == "en" {
- attributions["und"] = translation[feedID+"_attribution"]
- descriptions["und"] = translation[feedID+"_description"]
- }
- }
- return attributions, descriptions, nil
- }
- func readTranslations(c feedConverter) (feedConverter, error) { // O(n:translations) ; ( -- translations >>)
- path := c.TmpFeedPath
- file, err := os.Open(filepath.Join(path, "translations.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- translations := map[string]map[string]string{}
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- key := record[fields["field_value"]]
- language := record[fields["language"]]
- translation := record[fields["translation"]]
- if _, ok := translations[key]; !ok {
- translations[key] = map[string]string{}
- }
- translations[key][language] = translation
- }
- c.translations = translations
- return c, nil
- }
- func recoverTranslations(c feedConverter, e error) (feedConverter, error) {
- var pathError *os.PathError
- if errors.As(e, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
- return c, nil
- }
- return c, e
- }
- func convertAgencies(c feedConverter) (feedConverter, error) { // O(n:agency) ; ( -- >> agencies)
- path := c.TmpFeedPath
- file, err := os.Open(filepath.Join(path, "agency.txt"))
- if err != nil {
- return c, fmt.Errorf("while opening file: %w", err)
- }
- defer file.Close()
- result, err := os.Create(filepath.Join(path, "agencies.bare"))
- if err != nil {
- return c, fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- r := csv.NewReader(bufio.NewReader(file))
- header, err := r.Read()
- if err != nil {
- return c, fmt.Errorf("while reading header: %w", err)
- }
- fields := map[string]int{}
- for i, headerField := range header {
- fields[headerField] = i
- }
- for {
- record, err := r.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return c, fmt.Errorf("while reading a record: %w", err)
- }
- agency := Agency{
- Id: record[fields["agency_id"]],
- Name: record[fields["agency_name"]],
- TranslatedNames: translateField(record[fields["agency_name"]], c.feedInfo.Language, c.defaultLanguage, c.translations),
- Website: record[fields["agency_url"]],
- TranslatedWebsites: translateField(record[fields["agency_url"]], c.feedInfo.Language, c.defaultLanguage, c.translations),
- Timezone: record[fields["agency_timezone"]],
- }
- c.Timezone, _ = time.LoadLocation(agency.Timezone)
- if field, present := fields["agency_lang"]; present {
- agency.Language = record[field]
- }
- if field, present := fields["agency_phone"]; present {
- agency.PhoneNumber = record[field]
- agency.TranslatedPhoneNumbers = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
- }
- if field, present := fields["agency_fare_url"]; present {
- agency.FareWebsite = record[field]
- agency.TranslatedFareWebsites = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
- }
- if field, present := fields["agency_email"]; present {
- agency.Email = record[field]
- agency.TranslatedEmails = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
- }
- bytes, err := bare.Marshal(&agency)
- if err != nil {
- return c, fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return c, fmt.Errorf("while writing: %w", err)
- }
- }
- return c, nil
- }
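- // writeNameIndex serialises a name-to-offsets index as BARE NameOffset
- // records; unless raw is set, names are normalised with CleanQuery first.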
- func writeNameIndex(c feedConverter, index map[string][]uint, filename string, raw bool) error {
- path := c.TmpFeedPath
- feed := c.Feed
- result, err := os.Create(filepath.Join(path, filename))
- if err != nil {
- return fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- for name, offsets := range index {
- cleanQuery := name
- if !raw {
- cleanQuery, err = CleanQuery(name, feed)
- if err != nil {
- return fmt.Errorf("while cleaning name %s: %w", name, err)
- }
- }
- stopOffset := NameOffset{
- Name: cleanQuery,
- Offsets: offsets,
- }
- bytes, err := bare.Marshal(&stopOffset)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- }
- return nil
- }
- func writeStopNameIndex(c feedConverter) error {
- err := writeNameIndex(c, c.StopsNameIndex, "ix_stop_names.bare", false)
- c.StopsNameIndex = map[string][]uint{}
- return err
- }
- func writeLineIndex(c feedConverter) error {
- err := writeNameIndex(c, c.LineIndex, "ix_lines.bare", false)
- c.LineIndex = map[string][]uint{}
- return err
- }
- func writeLineIdIndex(c feedConverter) error {
- err := writeCodeIndex(c, c.LineIdIndex, "ix_line_codes.bare")
- c.LineIdIndex = CodeIndex{}
- return err
- }
- func writeTripIndex(c feedConverter) error {
- tripIndex := map[string][]uint{}
- for trip, offset := range c.tripsOffsets {
- tripIndex[trip] = []uint{offset}
- }
- err := writeNameIndex(c, tripIndex, "ix_trips.bare", true)
- c.tripsOffsets = map[string]uint{}
- return err
- }
- func writeStopCodeIndex(c feedConverter) error {
- err := writeCodeIndex(c, c.StopsCodeIndex, "ix_stop_codes.bare")
- c.StopsCodeIndex = CodeIndex{}
- return err
- }
- func writeCodeIndex(c feedConverter, i CodeIndex, filename string) error {
- path := c.TmpFeedPath
- result, err := os.Create(filepath.Join(path, filename))
- if err != nil {
- return fmt.Errorf("while creating file: %w", err)
- }
- defer result.Close()
- bytes, err := bare.Marshal(&i)
- if err != nil {
- return fmt.Errorf("while marshalling: %w", err)
- }
- _, err = result.Write(bytes)
- if err != nil {
- return fmt.Errorf("while writing: %w", err)
- }
- return nil
- }
- func deleteTxtFiles(c feedConverter) error {
- return file.DeleteTxtFiles(c.TmpFeedPath, c.GtfsFilename)
- }
- func compressTraffic(c feedConverter) error {
- return file.CompressBare(c.TmpFeedPath, c.GtfsFilename)
- }
- func deleteBareFiles(c feedConverter) error {
- return file.DeleteBareFiles(c.TmpFeedPath)
- }
- func moveTraffic(c feedConverter) error {
- if errs := append(c.ValidFromError, c.ValidTillError...); len(errs) != 0 {
- return errors.Join(errs...)
- }
- return file.MoveTraffic(c.GtfsFilename, c.ValidFrom.Format("20060102")+"_"+c.ValidTill.Format("20060102")+".txz", c.TmpFeedPath, c.HomeFeedPath)
- }
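- // convert runs the full per-file conversion pipeline for every downloaded
- // GTFS zip: unzip, convert agencies, feed info, calendar, departures, stops,
- // line graphs and lines, write the indices, then compress and move the result.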
- func convert(input ...interface{}) (interface{}, error) {
- allErrors := []error{}
- args := input[0].(result)
- for _, gtfsFile := range args.gtfsFilenames {
- log.Printf("converting feed %s/%s\n", args.feed.Name(), gtfsFile)
- r := gott2.R[feedConverter]{
- S: feedConverter{
- TmpFeedPath: args.tmpFeedPath,
- GtfsFilename: gtfsFile,
- Feed: args.feed,
- HomeFeedPath: args.homeFeedPath,
- feedTranslations: args.feedTranslations,
- config: args.config,
- },
- LogLevel: gott2.Debug,
- }
- r = r.
- Tee(unzipGtfs).
- Tee(prepareFeedGtfs).
- Tee(convertVehicles).
- Bind(convertAgencies).
- Bind(convertFeedInfo).
- Tee(convertLuaScripts).
- Bind(readTranslations).
- Recover(recoverTranslations).
- Bind(createTrafficCalendarFile).
- Bind(convertCalendar).
- Recover(recoverCalendar).
- Bind(convertCalendarDates).
- Recover(recoverCalendar).
- Tee(checkAnyCalendarConverted).
- Tee(saveSchedules).
- Tee(saveFeedInfo).
- Recover(closeTrafficCalendarFile).
- // ---
- Bind(readInputTripsIndex).
- Bind(readInputStopsIndex).
- Bind(readInputRoutesIndex).
- Bind(convertDepartures).
- Map(dropInputRoutesIndex).
- Map(dropInputStopsIndex).
- Map(dropInputTripsIndex).
- // ---
- Tee(sortTripsThroughStop).
- Bind(readTripsThroughStopsIndex).
- Bind(convertStops).
- Map(dropInputRoutesIndex).
- Map(dropInputStopsIndex).
- Tee(writeTripIndex).
- Map(clearTripOffsets).
- Tee(writeStopNameIndex).
- Tee(writeStopCodeIndex).
- // ---
- Bind(getTrips).
- Bind(convertLineGraphs).
- Map(clearStops).
- Bind(convertLines).
- Tee(writeLineIndex).
- Tee(writeLineIdIndex).
- Map(clearLineGraphs).
- Map(clearLineHeadsigns).
- Tee(deleteTxtFiles).
- Tee(compressTraffic).
- Tee(deleteBareFiles).
- Tee(moveTraffic)
- if r.E != nil {
- log.Printf("Error converting %s: %v\n", args.feed.Name(), r.E)
- allErrors = append(allErrors, r.E)
- }
- if errs := append(r.S.ValidFromError, r.S.ValidTillError...); len(errs) != 0 {
- allErrors = append(allErrors, errs...)
- log.Printf("Error converting %s: %v\n", args.feed.Name(), errors.Join(errs...))
- }
- }
- if len(allErrors) > 0 {
- return gott.Tuple{args}, errors.Join(allErrors...)
- }
- return gott.Tuple{args}, nil
- }
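- // signal sends SIGUSR1 to the process identified by pid when at least one
- // new GTFS file was converted.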
- func signal(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- if len(args.gtfsFilenames) > 0 && args.pid > 0 {
- process, err := os.FindProcess(args.pid)
- if err != nil {
- return gott.Tuple{args}, err
- }
- err = process.Signal(syscall.SIGUSR1)
- if err != nil {
- return gott.Tuple{args}, err
- }
- }
- return gott.Tuple{args}, nil
- }
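- // The functions below maintain updated.bare, a BARE map of feed name to the
- // RFC 3339 timestamp of its last successful preparation run.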
- func openLastUpdated(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- updatesFilename := filepath.Join(args.config.FeedsPath, "updated.bare")
- var err error
- args.updatesFile, err = os.OpenFile(updatesFilename, os.O_RDWR|os.O_CREATE, 0644)
- return gott.Tuple{args}, err
- }
- func isEmpty(input ...interface{}) error {
- args := input[0].(result)
- stat, err := os.Stat(args.updatesFile.Name())
- if err != nil {
- return err
- }
- if stat.Size() == 0 {
- return ErrEmpty{}
- }
- return nil
- }
- func unmarshalLastUpdated(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- var lastUpdated map[string]string
- err := bare.UnmarshalReader(args.updatesFile, &lastUpdated)
- args.updates = lastUpdated
- return gott.Tuple{args}, err
- }
- func recoverEmpty(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- err := input[1].(error)
- var emptyError ErrEmpty
- if errors.As(err, &emptyError) {
- return gott.Tuple{args}, nil
- } else {
- return gott.Tuple{args}, err
- }
- }
- func lastUpdated(input ...interface{}) interface{} {
- args := input[0].(result)
- args.updates[args.feed.String()] = time.Now().Format(time.RFC3339)
- return gott.Tuple{args}
- }
- func seekLastUpdated(input ...interface{}) (interface{}, error) {
- args := input[0].(result)
- _, err := args.updatesFile.Seek(0, 0)
- return gott.Tuple{args}, err
- }
- func marshalLastUpdated(input ...interface{}) error {
- args := input[0].(result)
- err := bare.MarshalWriter(bare.NewWriter(args.updatesFile), &args.updates)
- args.updatesFile.Close()
- return err
- }
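- // Prepare downloads and converts any missing feed versions for every feed in
- // t, signals the process identified by bimbaPid, records last-update times in
- // updated.bare and finally persists the new ETag cache.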
- func Prepare(cfg config.Config, t Traffic, bimbaPid int, feedTranslations embed.FS) error { // todo(BAF18) remove pid
- etags, err := readEtags(cfg)
- if err != nil {
- return fmt.Errorf("while reading etags: %w", err)
- }
- newEtags := map[string]string{}
- for _, feed := range t.Feeds {
- log.Printf("converting %s\n", feed.Name())
- r := gott.Tuple{result{
- config: cfg,
- pid: bimbaPid,
- tmpPath: os.TempDir(),
- feed: feed,
- feedName: feed.String(),
- location: feed.getTimezone(),
- updates: map[string]string{},
- etags: etags,
- newEtags: newEtags,
- feedTranslations: feedTranslations,
- }}
- s, err := gott.NewResult(r).
- SetLevelLog(gott.Debug).
- Bind(createTmpPath).
- Bind(createFeedHome).
- Bind(listDownloadedVersions).
- Bind(getAllVersions).
- Map(findValidVersions).
- Bind(getGtfsFiles).
- Bind(convert).
- Bind(signal).
- Bind(openLastUpdated).
- Tee(isEmpty).
- Bind(unmarshalLastUpdated).
- Recover(recoverEmpty).
- Map(lastUpdated).
- Bind(seekLastUpdated).
- Tee(marshalLastUpdated).
- Finish()
- if err != nil {
- log.Printf("Error converting %s: %v\n", feed.String(), err)
- } else {
- etags = s.(gott.Tuple)[0].(result).etags
- newEtags = s.(gott.Tuple)[0].(result).newEtags
- }
- }
- return saveEtags(cfg, newEtags)
- }