convert.go 52 KB


  1. // SPDX-FileCopyrightText: Adam Evyčędo
  2. //
  3. // SPDX-License-Identifier: AGPL-3.0-or-later
  4. package traffic
  5. // TODO(BAF10) direction (0|1) to const (TO|BACK)
  6. // TODO Agency.language, FeedInfo.language -> IETF language tag
  7. // TODO Agency.phoneNumber -> E.123 format
  8. import (
  9. "text/template"
  10. "apiote.xyz/p/szczanieckiej/config"
  11. "apiote.xyz/p/szczanieckiej/file"
  12. "bufio"
  13. "embed"
  14. "encoding/csv"
  15. "errors"
  16. "fmt"
  17. "io"
  18. "io/fs"
  19. "log"
  20. "net/http"
  21. "os"
  22. "path/filepath"
  23. "sort"
  24. "strings"
  25. "syscall"
  26. "time"
  27. "gopkg.in/yaml.v3"
  28. gott2 "apiote.xyz/p/gott/v2"
  29. "git.sr.ht/~sircmpwn/go-bare"
  30. "notabug.org/apiote/gott"
  31. )
  32. //go:embed realtime_lua
  33. var luaScripts embed.FS
  34. type _LineGraph struct {
  35. StopCodesArray []string
  36. StopCodes map[string]int
  37. NextNodes map[int]map[int]struct{}
  38. }
  39. type ErrEmpty struct{}
  40. func (ErrEmpty) Error() string {
  41. return ""
  42. }
  43. type result struct {
  44. config config.Config
  45. pid int
  46. tmpPath string
  47. feed Feed
  48. feedName string
  49. location *time.Location
  50. tmpFeedPath string
  51. homeFeedPath string
  52. downloadedVersions []Version
  53. allVersions []Version
  54. gtfsFilenames []string
  55. missingVersions []Version
  56. updatesFile *os.File
  57. updates map[string]string
  58. etags map[string]string
  59. newEtags map[string]string
  60. feedTranslations embed.FS
  61. }
  62. type feedConverter struct {
  63. TmpFeedPath string
  64. GtfsFilename string
  65. Feed Feed
  66. HomeFeedPath string
  67. feedTranslations embed.FS
  68. config config.Config
  69. Timezone *time.Location
  70. TrafficCalendarFile *os.File
  71. Departures map[string][]Departure
  72. TripsThroughStop map[string]map[string]StopOrder
  73. LineNames map[string]string
  74. TripsOffsets map[string]uint
  75. TripChangeOpts map[string]ChangeOption
  76. StopsCodeIndex CodeIndex
  77. StopsNameIndex map[string][]uint
  78. Stops map[string]string
  79. LineGraphs map[string]map[uint]LineGraph
  80. lineHeadsigns map[string]map[uint][]string
  81. LineIndex map[string][]uint
  82. LineIdIndex CodeIndex
  83. ValidFrom time.Time
  84. ValidFromError []error
  85. ValidTill time.Time
  86. ValidTillError []error
  87. tripHeadsigns map[string]string
  88. stopNames map[string]string
  89. feedInfo FeedInfo
  90. defaultLanguage string
  91. translations map[string]map[string]string
  92. schedules map[string]Schedule
  93. trips map[string]Trip
  94. }
  95. // helper functions
  96. func translateFieldDefault(key, feedLanguage, defaultLanguage string, translations map[string]map[string]string) string {
  97. if feedLanguage == "mul" {
  98. if value, ok := translations[key][defaultLanguage]; !ok {
  99. return key
  100. } else {
  101. return value
  102. }
  103. }
  104. return key
  105. }
  106. func translateField(key, feedLanguage, defaultLanguage string, translations map[string]map[string]string) []Translation {
  107. var result []Translation
  108. if feedLanguage == "mul" {
  109. if value, ok := translations[key][defaultLanguage]; !ok {
  110. result = []Translation{{Language: defaultLanguage, Value: key}}
  111. } else {
  112. result = []Translation{{Language: defaultLanguage, Value: value}}
  113. }
  114. }
  115. for language, value := range translations[key] {
  116. if language == defaultLanguage {
  117. continue
  118. }
  119. result = append(result, Translation{Language: language, Value: value})
  120. }
  121. return result
  122. }
  123. func hex2colour(hex string) Colour {
  124. if hex[0] == '#' {
  125. hex = hex[1:]
  126. }
  127. colour := Colour{
  128. A: 0xff,
  129. }
  130. hexToByte := func(b byte) byte {
  131. switch {
  132. case b >= '0' && b <= '9':
  133. return b - '0'
  134. case b >= 'a' && b <= 'f':
  135. return b - 'a' + 10
  136. case b >= 'A' && b <= 'F':
  137. return b - 'A' + 10
  138. default:
  139. return 0
  140. }
  141. }
  142. switch len(hex) {
  143. case 6:
  144. colour.R = hexToByte(hex[0])<<4 + hexToByte(hex[1])
  145. colour.G = hexToByte(hex[2])<<4 + hexToByte(hex[3])
  146. colour.B = hexToByte(hex[4])<<4 + hexToByte(hex[5])
  147. case 3:
  148. colour.R = hexToByte(hex[0])<<4 + hexToByte(hex[0])
  149. colour.G = hexToByte(hex[1])<<4 + hexToByte(hex[1])
  150. colour.B = hexToByte(hex[2])<<4 + hexToByte(hex[2])
  151. }
  152. return colour
  153. }
  154. func readEtags(cfg config.Config) (map[string]string, error) {
  155. etagsFilename := filepath.Join(cfg.FeedsPath, "etags.bare")
  156. etagsFile, err := os.Open(etagsFilename)
  157. if err != nil {
  158. var pathError *os.PathError
  159. if errors.As(err, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
  160. return map[string]string{}, nil
  161. }
  162. return nil, fmt.Errorf("while opening file: %w", err)
  163. }
  164. defer etagsFile.Close()
  165. var etags map[string]string
  166. err = bare.UnmarshalReader(etagsFile, &etags)
  167. if err != nil {
  168. return nil, fmt.Errorf("while unmarshalling: %w", err)
  169. }
  170. return etags, nil
  171. }
  172. func saveEtags(cfg config.Config, etags map[string]string) error {
  173. etagsFilename := filepath.Join(cfg.FeedsPath, "etags.bare")
  174. etagsFile, err := os.OpenFile(etagsFilename, os.O_RDWR|os.O_CREATE, 0644)
  175. if err != nil {
  176. return fmt.Errorf("while opening: %w", err)
  177. }
  178. defer etagsFile.Close()
  179. bytes, err := bare.Marshal(&etags)
  180. if err != nil {
  181. return fmt.Errorf("while marshalling: %w", err)
  182. }
  183. _, err = etagsFile.Write(bytes)
  184. if err != nil {
  185. return fmt.Errorf("while writing: %w", err)
  186. }
  187. return nil
  188. }
  189. // converting functions
  190. func createTmpPath(input ...interface{}) (interface{}, error) {
  191. args := input[0].(result)
  192. p := filepath.Join(args.tmpPath, args.feedName)
  193. err := os.MkdirAll(p, 0755)
  194. args.tmpFeedPath = p
  195. return gott.Tuple{args}, err
  196. }
  197. func createFeedHome(input ...interface{}) (interface{}, error) {
  198. args := input[0].(result)
  199. p := filepath.Join(args.config.FeedsPath, args.feedName)
  200. err := os.MkdirAll(p, 0755)
  201. args.homeFeedPath = p
  202. return gott.Tuple{args}, err
  203. }
  204. func listDownloadedVersions(input ...interface{}) (interface{}, error) {
  205. args := input[0].(result)
  206. v, err := ListVersionsTimezone(args.config, args.feed, args.feed.getTimezone())
  207. args.downloadedVersions = v
  208. return gott.Tuple{args}, err
  209. }
  210. func getAllVersions(input ...interface{}) (interface{}, error) {
  211. args := input[0].(result)
  212. v, err := args.feed.GetVersions(time.Now().In(args.location), args.location)
  213. args.allVersions = v
  214. return gott.Tuple{args}, err
  215. }
  216. func findValidVersions(input ...interface{}) interface{} {
  217. args := input[0].(result)
  218. now := time.Now().In(args.location)
  219. validVersions := FindValidVersions(args.allVersions, now)
  220. downloadedVersions := map[string]struct{}{}
  221. for _, downloadedVersion := range args.downloadedVersions {
  222. downloadedVersions[downloadedVersion.String()] = struct{}{}
  223. }
  224. missingVersions := []Version{}
  225. for _, version := range validVersions {
  226. if _, ok := downloadedVersions[version.String()]; !ok {
  227. missingVersions = append(missingVersions, version)
  228. }
  229. }
  230. // log.Println("all", args.allVersions)
  231. // log.Println("valid", validVersions)
  232. // log.Println("downloaded", downloadedVersions)
  233. // log.Println("missing", missingVersions)
  234. args.missingVersions = missingVersions
  235. return gott.Tuple{args}
  236. }
  237. func getGtfsFiles(input ...interface{}) (interface{}, error) {
  238. args := input[0].(result)
  239. names := []string{}
  240. for i, version := range args.missingVersions {
  241. name := fmt.Sprintf("%s_%d.zip", version.String(), i)
  242. zipPath := filepath.Join(args.tmpFeedPath, name)
  243. url := version.Link
  244. request, err := http.NewRequest("GET", url, nil)
  245. if err != nil {
  246. return gott.Tuple{args}, fmt.Errorf("while creating request for %s: %w", url, err)
  247. }
  248. request.Header.Add("If-None-Match", args.etags[url])
  249. client := http.Client{} // todo timeout
  250. response, err := client.Do(request)
  251. if err != nil {
  252. return gott.Tuple{args}, fmt.Errorf("while downloading gtfs %s %w", name, err)
  253. }
  254. if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusNotModified {
  255. return gott.Tuple{args}, fmt.Errorf("wrong response code %d for %s: %w", response.StatusCode, url, err)
  256. }
  257. if response.StatusCode == 200 {
  258. args.newEtags[url] = response.Header.Get("etag")
  259. } else {
  260. args.newEtags[url] = args.etags[url]
  261. continue
  262. }
  263. file, err := os.Create(zipPath)
  264. if err != nil {
  265. return gott.Tuple{args}, fmt.Errorf("while creating zip for %s %w", name, err)
  266. }
  267. defer file.Close()
  268. _, err = io.Copy(file, response.Body)
  269. if err != nil {
  270. return gott.Tuple{args}, fmt.Errorf("while copying gtfs %s %w", name, err)
  271. }
  272. names = append(names, name)
  273. }
  274. args.gtfsFilenames = names
  275. return gott.Tuple{args}, nil
  276. }
  277. func unzipGtfs(c feedConverter) error {
  278. return file.UnzipGtfs(c.TmpFeedPath, c.GtfsFilename)
  279. }
  280. func convertVehicles(c feedConverter) error { // ( -- >> vehicles.bare)
  281. result, err := os.Create(filepath.Join(c.TmpFeedPath, "vehicles.bare"))
  282. if err != nil {
  283. return fmt.Errorf("while creating file: %w", err)
  284. }
  285. defer result.Close()
  286. vehicles, err := c.Feed.ConvertVehicles()
  287. for _, vehicle := range vehicles {
  288. bytes, err := bare.Marshal(&vehicle)
  289. if err != nil {
  290. return fmt.Errorf("while marshalling: %w", err)
  291. }
  292. _, err = result.Write(bytes)
  293. if err != nil {
  294. return fmt.Errorf("while writing to file: %w", err)
  295. }
  296. }
  297. return nil
  298. }
  299. func prepareFeedGtfs(c feedConverter) error {
  300. return c.Feed.FeedPrepareZip(c.TmpFeedPath)
  301. }
  302. func createTrafficCalendarFile(c feedConverter) (feedConverter, error) {
  303. path := c.TmpFeedPath
  304. var err error
  305. c.TrafficCalendarFile, err = os.Create(filepath.Join(path, "calendar.bare"))
  306. return c, err
  307. }
  308. func recoverCalendar(c feedConverter, e error) (feedConverter, error) {
  309. var pathError *os.PathError
  310. if errors.As(e, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
  311. return c, nil
  312. }
  313. return c, e
  314. }
  315. func convertCalendar(c feedConverter) (feedConverter, error) { // ( feedInfo -- schedules >> )
  316. c.schedules = map[string]Schedule{}
  317. path := c.TmpFeedPath
  318. calendarFile, err := os.Open(filepath.Join(path, "calendar.txt"))
  319. if err != nil {
  320. return c, fmt.Errorf("while opening file: %w", err)
  321. }
  322. defer calendarFile.Close()
  323. r := csv.NewReader(bufio.NewReader(calendarFile))
  324. header, err := r.Read()
  325. if err != nil {
  326. return c, fmt.Errorf("while reading header: %w", err)
  327. }
  328. fields := map[string]int{}
  329. for i, headerField := range header {
  330. fields[headerField] = i
  331. }
  332. for {
  333. schedule := Schedule{}
  334. record, err := r.Read()
  335. if err == io.EOF {
  336. break
  337. }
  338. if err != nil {
  339. return c, fmt.Errorf("while reading a record: %w", err)
  340. }
  341. schedule.Id = record[fields["service_id"]]
  342. startDate := record[fields["start_date"]]
  343. endDate := record[fields["end_date"]]
  344. schedule.DateRanges = []DateRange{
  345. DateRange{
  346. Start: startDate[:4] + "-" + startDate[4:6] + "-" + startDate[6:],
  347. End: endDate[:4] + "-" + endDate[4:6] + "-" + endDate[6:],
  348. },
  349. }
  350. if record[fields["monday"]] == "1" {
  351. schedule.DateRanges[0].Weekdays |= (1 << 1)
  352. }
  353. if record[fields["tuesday"]] == "1" {
  354. schedule.DateRanges[0].Weekdays |= (1 << 2)
  355. }
  356. if record[fields["wednesday"]] == "1" {
  357. schedule.DateRanges[0].Weekdays |= (1 << 3)
  358. }
  359. if record[fields["thursday"]] == "1" {
  360. schedule.DateRanges[0].Weekdays |= (1 << 4)
  361. }
  362. if record[fields["friday"]] == "1" {
  363. schedule.DateRanges[0].Weekdays |= (1 << 5)
  364. }
  365. if record[fields["saturday"]] == "1" {
  366. schedule.DateRanges[0].Weekdays |= (1 << 6)
  367. }
  368. if record[fields["sunday"]] == "1" {
  369. schedule.DateRanges[0].Weekdays |= (1 << 0)
  370. schedule.DateRanges[0].Weekdays |= (1 << 7)
  371. }
  372. c.schedules[schedule.Id] = schedule
  373. scheduleStart, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].Start, c.Timezone)
  374. if err != nil {
  375. c.ValidFromError = append(c.ValidFromError, err)
  376. }
  377. if err == nil && (c.ValidFrom.IsZero() || scheduleStart.Before(c.ValidFrom)) {
  378. c.ValidFrom = scheduleStart
  379. c.feedInfo.ValidSince = scheduleStart.Format(ValidityFormat)
  380. }
  381. scheduleEnd, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].End, c.Timezone)
  382. if err != nil {
  383. c.ValidTillError = append(c.ValidTillError, err)
  384. }
  385. if err == nil && (c.ValidTill.IsZero() || scheduleEnd.After(c.ValidTill)) {
  386. c.ValidTill = scheduleEnd
  387. c.feedInfo.ValidTill = scheduleEnd.Format(ValidityFormat)
  388. }
  389. }
  390. return c, nil
  391. }
  392. func convertCalendarDates(c feedConverter) (feedConverter, error) { // ( feedInfo -- schedules >> )
  393. path := c.TmpFeedPath
  394. datesFile, err := os.Open(filepath.Join(path, "calendar_dates.txt"))
  395. if err != nil {
  396. return c, fmt.Errorf("while opening file: %w", err)
  397. }
  398. defer datesFile.Close()
  399. r := csv.NewReader(bufio.NewReader(datesFile))
  400. header, err := r.Read()
  401. if err != nil {
  402. return c, fmt.Errorf("while reading header: %w", err)
  403. }
  404. fields := map[string]int{}
  405. for i, headerField := range header {
  406. fields[headerField] = i
  407. }
  408. for {
  409. record, err := r.Read()
  410. if err == io.EOF {
  411. break
  412. }
  413. if err != nil {
  414. return c, fmt.Errorf("while reading a record: %w", err)
  415. }
  416. if record[fields["exception_type"]] == "1" {
  417. id := record[fields["service_id"]]
  418. schedule := c.schedules[id]
  419. date := record[fields["date"]]
  420. dateRange := DateRange{
  421. Start: date[:4] + "-" + date[4:6] + "-" + date[6:],
  422. End: date[:4] + "-" + date[4:6] + "-" + date[6:],
  423. Weekdays: 0xff,
  424. }
  425. if len(schedule.DateRanges) == 0 {
  426. schedule.Id = id
  427. schedule.DateRanges = []DateRange{dateRange}
  428. } else {
  429. schedule.DateRanges = append(schedule.DateRanges, dateRange)
  430. sort.Slice(schedule.DateRanges, func(i, j int) bool {
  431. return schedule.DateRanges[i].Start < schedule.DateRanges[j].Start
  432. })
  433. }
  434. c.schedules[schedule.Id] = schedule
  435. } else {
  436. date := record[fields["date"]]
  437. formatedDate := date[:4] + "-" + date[4:6] + "-" + date[6:]
  438. scheduleToEdit := c.schedules[record[fields["service_id"]]]
  439. newDateRanges := []DateRange{}
  440. for i := 0; i < len(scheduleToEdit.DateRanges); i++ {
  441. dateRange := scheduleToEdit.DateRanges[i]
  442. if dateRange.Start == formatedDate {
  443. d, _ := time.ParseInLocation(DateFormat, dateRange.Start, c.Timezone)
  444. dateRange.Start = d.AddDate(0, 0, 1).Format(DateFormat)
  445. if dateRange.Start <= dateRange.End {
  446. newDateRanges = append(newDateRanges, dateRange)
  447. }
  448. continue
  449. }
  450. if dateRange.Start < formatedDate && formatedDate < dateRange.End {
  451. d, _ := time.ParseInLocation(DateFormat, formatedDate, c.Timezone)
  452. range1 := DateRange{dateRange.Start, d.AddDate(0, 0, -1).Format(DateFormat), dateRange.Weekdays}
  453. range2 := DateRange{d.AddDate(0, 0, 1).Format(DateFormat), dateRange.End, dateRange.Weekdays}
  454. newDateRanges = append(newDateRanges, range1)
  455. newDateRanges = append(newDateRanges, range2)
  456. continue
  457. }
  458. if formatedDate == dateRange.End {
  459. d, _ := time.ParseInLocation(DateFormat, dateRange.End, c.Timezone)
  460. dateRange.End = d.AddDate(0, 0, -1).Format(DateFormat)
  461. newDateRanges = append(newDateRanges, dateRange)
  462. continue
  463. }
  464. newDateRanges = append(newDateRanges, dateRange)
  465. }
  466. scheduleToEdit.DateRanges = newDateRanges
  467. c.schedules[record[fields["service_id"]]] = scheduleToEdit
  468. }
  469. }
  470. for _, schedule := range c.schedules {
  471. lastDateRange := len(schedule.DateRanges) - 1
  472. scheduleStart, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].Start, c.Timezone)
  473. if err != nil {
  474. c.ValidFromError = append(c.ValidFromError, err)
  475. }
  476. if err == nil && (c.ValidFrom.IsZero() || scheduleStart.Before(c.ValidFrom)) {
  477. c.ValidFrom = scheduleStart
  478. c.feedInfo.ValidSince = scheduleStart.Format(ValidityFormat)
  479. }
  480. scheduleEnd, err := time.ParseInLocation(DateFormat, schedule.DateRanges[lastDateRange].End, c.Timezone)
  481. if err != nil {
  482. c.ValidTillError = append(c.ValidTillError, err)
  483. }
  484. if err == nil && (c.ValidTill.IsZero() || scheduleEnd.After(c.ValidTill)) {
  485. c.ValidTill = scheduleEnd
  486. c.feedInfo.ValidTill = scheduleEnd.Format(ValidityFormat)
  487. }
  488. }
  489. return c, nil
  490. }
  491. func checkAnyCalendarConverted(c feedConverter) error {
  492. if len(c.schedules) == 0 {
  493. return fmt.Errorf("no calendar converted")
  494. }
  495. return nil
  496. }
  497. func saveSchedules(c feedConverter) error {
  498. resultFile := c.TrafficCalendarFile
  499. schedulesArray := make([]Schedule, len(c.schedules))
  500. i := 0
  501. for _, schedule := range c.schedules {
  502. schedulesArray[i] = schedule
  503. i++
  504. }
  505. sort.Slice(schedulesArray, func(i, j int) bool {
  506. return schedulesArray[i].DateRanges[0].Start < schedulesArray[j].DateRanges[0].Start
  507. })
  508. for _, schedule := range schedulesArray {
  509. bytes, err := bare.Marshal(&schedule)
  510. if err != nil {
  511. return fmt.Errorf("while marshalling: %w", err)
  512. }
  513. _, err = resultFile.Write(bytes)
  514. if err != nil {
  515. return fmt.Errorf("while writing: %w", err)
  516. }
  517. }
  518. c.schedules = map[string]Schedule{}
  519. return nil
  520. }
  521. func saveFeedInfo(c feedConverter) error {
  522. path := c.TmpFeedPath
  523. result, err := os.Create(filepath.Join(path, "feed_info.bare"))
  524. if err != nil {
  525. return fmt.Errorf("while creating file: %w", err)
  526. }
  527. defer result.Close()
  528. bytes, err := bare.Marshal(&c.feedInfo)
  529. if err != nil {
  530. return fmt.Errorf("while marshalling: %w", err)
  531. }
  532. _, err = result.Write(bytes)
  533. if err != nil {
  534. return fmt.Errorf("while writing: %w", err)
  535. }
  536. log.Printf("timetable is valid: %s to %s\n", c.feedInfo.ValidSince, c.feedInfo.ValidTill)
  537. c.feedInfo = FeedInfo{}
  538. return nil
  539. }
  540. func closeTrafficCalendarFile(c feedConverter, e error) (feedConverter, error) {
  541. if c.TrafficCalendarFile != nil {
  542. c.TrafficCalendarFile.Close()
  543. }
  544. return c, e
  545. }
  546. func clearDepartures(c feedConverter) feedConverter {
  547. c.Departures = map[string][]Departure{}
  548. return c
  549. }
  550. func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; ( -- departures:map[tripID][]departure, tripsThroughStop:map[stopID][]{tripID,order}, tripHeadsigns:map[tripID]stopID >> )
  551. path := c.TmpFeedPath
  552. file, err := os.Open(filepath.Join(path, "stop_times.txt"))
  553. if err != nil {
  554. return c, fmt.Errorf("while opening file: %w", err)
  555. }
  556. defer file.Close()
  557. departures := map[string][]Departure{}
  558. r := csv.NewReader(bufio.NewReader(file))
  559. header, err := r.Read()
  560. if err != nil {
  561. return c, fmt.Errorf("while reading header: %w", err)
  562. }
  563. fields := map[string]int{}
  564. for i, headerField := range header {
  565. fields[headerField] = i
  566. }
  567. tripsThroughStop := map[string]map[string]StopOrder{}
  568. tripHeadsigns := map[string]string{}
  569. for {
  570. departure := Departure{}
  571. record, err := r.Read()
  572. if err == io.EOF {
  573. break
  574. }
  575. if err != nil {
  576. return c, fmt.Errorf("while reading a record: %w", err)
  577. }
  578. stopID := record[fields["stop_id"]]
  579. tripID := record[fields["trip_id"]]
  580. fmt.Sscanf(record[fields["stop_sequence"]], "%d", &departure.StopSequence)
  581. fmt.Sscanf(record[fields["pickup_type"]], "%d", &departure.Pickup)
  582. fmt.Sscanf(record[fields["drop_off_type"]], "%d", &departure.Dropoff)
  583. if _, ok := tripsThroughStop[stopID]; !ok {
  584. tripsThroughStop[stopID] = map[string]StopOrder{}
  585. }
  586. tripsThroughStop[stopID][tripID] = StopOrder{
  587. Sequence: departure.StopSequence,
  588. }
  589. if c.Feed.Flags().Headsign == HeadsignTripLastStop {
  590. tripHeadsigns[tripID] = stopID
  591. }
  592. var hours, minutes uint
  593. fmt.Sscanf(record[fields["arrival_time"]], "%d:%d", &hours, &minutes)
  594. departure.Time = hours*60 + minutes
  595. departures[tripID] = append(departures[tripID], departure)
  596. }
  597. c.tripHeadsigns = tripHeadsigns
  598. c.Departures = departures
  599. c.TripsThroughStop = tripsThroughStop
  600. return c, nil
  601. }
  602. func clearLineNames(c feedConverter) feedConverter {
  603. c.LineNames = map[string]string{}
  604. return c
  605. }
  606. func getLineNames(c feedConverter) (feedConverter, error) { // O(n:routes) ; ( -- lineNames:map[routeID]lineName >> )
  607. path := c.TmpFeedPath
  608. file, err := os.Open(filepath.Join(path, "routes.txt"))
  609. if err != nil {
  610. return c, fmt.Errorf("while opening file: %w", err)
  611. }
  612. defer file.Close()
  613. r := csv.NewReader(bufio.NewReader(file))
  614. header, err := r.Read()
  615. if err != nil {
  616. return c, fmt.Errorf("while reading header: %w", err)
  617. }
  618. fields := map[string]int{}
  619. for i, headerField := range header {
  620. fields[headerField] = i
  621. }
  622. names := map[string]string{}
  623. for {
  624. record, err := r.Read()
  625. if err == io.EOF {
  626. break
  627. }
  628. if err != nil {
  629. return c, fmt.Errorf("while reading a record: %w", err)
  630. }
  631. routeID := record[fields["route_id"]]
  632. lineName := c.Feed.Flags().LineName
  633. for _, template := range []string{"route_short_name", "route_long_name"} {
  634. lineName = strings.Replace(lineName, "{{"+template+"}}", record[fields[template]], -1)
  635. }
  636. names[routeID] = lineName
  637. }
  638. c.LineNames = names
  639. return c, nil
  640. }
  641. func clearStopNames(c feedConverter) feedConverter {
  642. c.stopNames = map[string]string{}
  643. return c
  644. }
  645. func getStopNames(c feedConverter) (feedConverter, error) { // O(n:stops) ; ( -- stopNames[stopID]stopName >> )
  646. if c.Feed.Flags().Headsign != HeadsignTripLastStop {
  647. return c, nil
  648. }
  649. stopNames := map[string]string{}
  650. path := c.TmpFeedPath
  651. file, err := os.Open(filepath.Join(path, "stops.txt"))
  652. if err != nil {
  653. return c, fmt.Errorf("while opening file: %w", err)
  654. }
  655. defer file.Close()
  656. r := csv.NewReader(bufio.NewReader(file))
  657. header, err := r.Read()
  658. if err != nil {
  659. return c, fmt.Errorf("while reading header: %w", err)
  660. }
  661. fields := map[string]int{}
  662. for i, headerField := range header {
  663. fields[headerField] = i
  664. }
  665. for {
  666. record, err := r.Read()
  667. if err == io.EOF {
  668. break
  669. }
  670. if err != nil {
  671. return c, fmt.Errorf("while reading a record: %w", err)
  672. }
  673. stopID := record[fields["stop_id"]]
  674. stopName := record[fields["stop_name"]]
  675. stopNames[stopID] = stopName
  676. }
  677. c.stopNames = stopNames
  678. return c, nil
  679. }
  680. func clearTripsChangeOptions(c feedConverter) feedConverter {
  681. c.TripChangeOpts = map[string]ChangeOption{}
  682. return c
  683. }
  684. func clearTripsThroughStops(c feedConverter) feedConverter {
  685. c.TripsThroughStop = map[string]map[string]StopOrder{}
  686. return c
  687. }
  688. func convertTrips(c feedConverter) (feedConverter, error) { // O(n:trips) ; (departures, lineNames, stopNames -- tripsOffsets:map[tripID]offset, tripsChangeOpts:map[tripID]{lineID,headsign} >> trips)
  689. path := c.TmpFeedPath
  690. departures := c.Departures
  691. lineNames := c.LineNames
  692. file, err := os.Open(filepath.Join(path, "trips.txt"))
  693. if err != nil {
  694. return c, fmt.Errorf("while opening file: %w", err)
  695. }
  696. defer file.Close()
  697. result, err := os.Create(filepath.Join(path, "trips.bare"))
  698. if err != nil {
  699. return c, fmt.Errorf("while creating file: %w", err)
  700. }
  701. defer result.Close()
  702. r := csv.NewReader(bufio.NewReader(file))
  703. header, err := r.Read()
  704. if err != nil {
  705. return c, fmt.Errorf("while reading header: %w", err)
  706. }
  707. fields := map[string]int{}
  708. for i, headerField := range header {
  709. fields[headerField] = i
  710. }
  711. var offset uint = 0
  712. tripsOffsets := map[string]uint{}
  713. tripChangeOpts := map[string]ChangeOption{}
  714. for {
  715. trip := Trip{}
  716. record, err := r.Read()
  717. if err == io.EOF {
  718. break
  719. }
  720. if err != nil {
  721. return c, fmt.Errorf("while reading a record: %w", err)
  722. }
  723. trip.Id = record[fields["trip_id"]]
  724. switch c.Feed.Flags().Headsign {
  725. case HeadsignTripHeadsing:
  726. trip.Headsign = record[fields["trip_headsign"]]
  727. case HeadsignTripLastStop:
  728. trip.Headsign = c.stopNames[c.tripHeadsigns[trip.Id]]
  729. }
  730. trip.Departures = departures[trip.Id]
  731. trip.ScheduleID = record[fields["service_id"]]
  732. trip.LineID = record[fields["route_id"]]
  733. fmt.Sscanf(record[fields["direction_id"]], "%d", &trip.Direction)
  734. tripChangeOpts[trip.Id] = ChangeOption{
  735. LineName: lineNames[record[fields["route_id"]]],
  736. Headsign: translateFieldDefault(trip.Headsign, c.feedInfo.Language, c.defaultLanguage, c.translations),
  737. TranslatedHeadsigns: translateField(trip.Headsign, c.feedInfo.Language, c.defaultLanguage, c.translations),
  738. }
  739. bytes, err := bare.Marshal(&trip)
  740. if err != nil {
  741. return c, fmt.Errorf("while marshalling: %w", err)
  742. }
  743. b, err := result.Write(bytes)
  744. if err != nil {
  745. return c, fmt.Errorf("while writing: %w", err)
  746. }
  747. tripsOffsets[trip.Id] = offset
  748. offset += uint(b)
  749. }
  750. c.TripsOffsets = tripsOffsets
  751. c.TripChangeOpts = tripChangeOpts
  752. return c, nil
  753. }
  754. func clearStops(c feedConverter) feedConverter {
  755. c.Stops = map[string]string{}
  756. return c
  757. }
  758. func convertStops(c feedConverter) (feedConverter, error) { // O(n:stops) ; (translations, tripsThroughStop, tripChangeOpts, tripOffsets -- stopsOffsetsByCode:CodeIndex, stopsOffsetsByName:map[name][]offsets >> stops)
  759. path := c.TmpFeedPath
  760. tripsThroughStop := c.TripsThroughStop
  761. tripChangeOpts := c.TripChangeOpts
  762. tripsOffsets := c.TripsOffsets
  763. file, err := os.Open(filepath.Join(path, "stops.txt"))
  764. if err != nil {
  765. return c, fmt.Errorf("while opening file: %w", err)
  766. }
  767. defer file.Close()
  768. result, err := os.Create(filepath.Join(path, "stops.bare"))
  769. if err != nil {
  770. return c, fmt.Errorf("while creating file: %w", err)
  771. }
  772. defer result.Close()
  773. r := csv.NewReader(bufio.NewReader(file))
  774. header, err := r.Read()
  775. if err != nil {
  776. return c, fmt.Errorf("while reading header: %w", err)
  777. }
  778. fields := map[string]int{}
  779. for i, headerField := range header {
  780. fields[headerField] = i
  781. }
  782. var offset uint = 0
  783. stopsOffsetsByName := map[string][]uint{}
  784. stopsOffsetsByCode := CodeIndex{}
  785. stops := map[string]string{}
  786. maxStopTripsLength := 0
  787. for {
  788. stop := Stop{}
  789. record, err := r.Read()
  790. if err == io.EOF {
  791. break
  792. }
  793. if err != nil {
  794. return c, fmt.Errorf("while reading a record: %w", err)
  795. }
  796. if f, ok := fields["location_type"]; ok && record[f] != "" && record[f] != "0" {
  797. // NOTE for now ignore everything that’s not a stop/platform
  798. // TODO use Portals (location_type == 2) to show on map if platform has a parent (location_type == 1) that has a Portal
  799. // TODO use location_type in {3,4} for routing inside stations (with pathways, transfers, and levels)
  800. continue
  801. }
  802. stopID := record[fields["stop_id"]]
  803. stopTrips := tripsThroughStop[stopID]
  804. stopTripsLength := len(stopTrips)
  805. if maxStopTripsLength < stopTripsLength {
  806. maxStopTripsLength = stopTripsLength
  807. }
  808. stop.Id = stopID
  809. templates := []string{"stop_code", "stop_id", "stop_name", "platform_code"}
  810. stop.Code = c.Feed.Flags().StopIdFormat
  811. for _, template := range templates {
  812. stop.Code = strings.Replace(stop.Code, "{{"+template+"}}", record[fields[template]], -1)
  813. }
  814. stop.Name = c.Feed.Flags().StopName
  815. for _, template := range templates {
  816. // TODO if '{{template}}' is empty
  817. stop.Name = strings.Replace(stop.Name, "{{"+template+"}}", record[fields[template]], -1)
  818. }
  819. if field, ok := fields["zone_id"]; ok {
  820. stop.Zone = record[field]
  821. }
  822. stop.NodeName = record[fields["stop_name"]]
  823. stops[record[fields["stop_id"]]] = stop.Code
  824. if field, ok := fields["stop_timezone"]; ok {
  825. stop.Timezone = record[field]
  826. }
  827. if c.feedInfo.Language == "mul" {
  828. key := record[fields["stop_name"]]
  829. if _, ok := c.translations[stop.NodeName][c.defaultLanguage]; !ok {
  830. stop.TranslatedNames = []Translation{{Language: c.defaultLanguage, Value: stop.Name}}
  831. stop.TranslatedNodeNames = []Translation{{Language: c.defaultLanguage, Value: stop.NodeName}}
  832. } else {
  833. stop.TranslatedNames = []Translation{{Language: c.defaultLanguage, Value: strings.ReplaceAll(stop.Name, key, c.translations[key][c.defaultLanguage])}}
  834. stop.TranslatedNodeNames = []Translation{{Language: c.defaultLanguage, Value: c.translations[key][c.defaultLanguage]}}
  835. }
  836. for language, value := range c.translations[key] {
  837. if language == c.defaultLanguage {
  838. continue
  839. }
  840. stop.TranslatedNames = append(stop.TranslatedNames, Translation{Language: c.defaultLanguage, Value: strings.ReplaceAll(stop.Name, key, value)})
  841. stop.TranslatedNodeNames = append(stop.TranslatedNodeNames, Translation{Language: c.defaultLanguage, Value: c.translations[key][value]})
  842. }
  843. }
  844. var lat, lon float64
  845. fmt.Sscanf(record[fields["stop_lat"]], "%f", &lat)
  846. fmt.Sscanf(record[fields["stop_lon"]], "%f", &lon)
  847. stop.Position = Position{lat, lon}
  848. changeOptionMap := map[string]ChangeOption{}
  849. stop.ChangeOptions = []ChangeOption{}
  850. stop.Order = map[string]StopOrder{}
  851. for tripID, stopTrip := range stopTrips {
  852. changeOption := tripChangeOpts[tripID]
  853. stopOrder := StopOrder{
  854. TripOffset: tripsOffsets[tripID],
  855. Sequence: stopTrip.Sequence,
  856. }
  857. stop.Order[tripID] = stopOrder
  858. changeOptionMap[changeOption.LineName+"->"+changeOption.Headsign] = changeOption
  859. }
  860. for _, option := range changeOptionMap {
  861. stop.ChangeOptions = append(stop.ChangeOptions, option)
  862. }
  863. sort.Slice(stop.ChangeOptions, func(i, j int) bool {
  864. var num1, num2 int
  865. _, err1 := fmt.Sscanf(stop.ChangeOptions[i].LineName, "%d", &num1)
  866. _, err2 := fmt.Sscanf(stop.ChangeOptions[j].LineName, "%d", &num2)
  867. if err1 != nil && err2 != nil {
  868. return stop.ChangeOptions[i].LineName < stop.ChangeOptions[j].LineName
  869. } else if err1 != nil {
  870. return false
  871. } else if err2 != nil {
  872. return true
  873. } else {
  874. return num1 < num2
  875. }
  876. })
  877. bytes, err := bare.Marshal(&stop)
  878. if err != nil {
  879. return c, fmt.Errorf("while marshalling: %w", err)
  880. }
  881. b, err := result.Write(bytes)
  882. if err != nil {
  883. return c, fmt.Errorf("while writing: %w", err)
  884. }
  885. if len(stop.TranslatedNames) == 0 {
  886. stopsOffsetsByName[stop.Name] = append(stopsOffsetsByName[stop.Name], offset)
  887. }
  888. for _, v := range stop.TranslatedNames {
  889. stopsOffsetsByName[v.Value] = append(stopsOffsetsByName[v.Value], offset)
  890. }
  891. stopsOffsetsByCode[stop.Code] = offset
  892. offset += uint(b)
  893. }
  894. if maxStopTripsLength > 8192 {
  895. log.Printf("maximum length of StopOrder is %d, more than 8192, which may need to be tweaked", maxStopTripsLength)
  896. }
  897. c.StopsCodeIndex = stopsOffsetsByCode
  898. c.StopsNameIndex = stopsOffsetsByName
  899. c.Stops = stops
  900. return c, nil
  901. }
  902. func clearTripOffsets(c feedConverter) feedConverter {
  903. c.TripsOffsets = map[string]uint{}
  904. return c
  905. }
  906. func clearLineGraphs(c feedConverter) feedConverter {
  907. c.LineGraphs = map[string]map[uint]LineGraph{}
  908. return c
  909. }
  910. func clearLineHeadsigns(c feedConverter) feedConverter {
  911. c.lineHeadsigns = map[string]map[uint][]string{}
  912. return c
  913. }
  914. func getTrips(c feedConverter) (feedConverter, error) {
  915. file, err := os.Open(filepath.Join(c.TmpFeedPath, "trips.bare"))
  916. if err != nil {
  917. return c, fmt.Errorf("while opening trips: %w", err)
  918. }
  919. trips := map[string]Trip{}
  920. for {
  921. var trip Trip
  922. err := bare.UnmarshalReader(file, &trip)
  923. trip.Departures = []Departure{}
  924. trips[trip.Id] = trip
  925. if err != nil {
  926. if err == io.EOF {
  927. break
  928. } else {
  929. return c, fmt.Errorf("while unmarshaling: %w", err)
  930. }
  931. }
  932. }
  933. c.trips = trips
  934. return c, nil
  935. }
  936. func convertLineGraphs(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; (trips, stops -- lineGrapsh:map[lineID]map[direction]graph, lineHeadsigns:map[lineID]map[direction][]headsigns >> )
  937. path := c.TmpFeedPath
  938. trips := c.trips
  939. stops := c.Stops
  940. // lineID dire headsi
  941. lineHeadsignsMap := map[string]map[uint]map[string]struct{}{}
  942. // lineID dire headsi
  943. lineHeadsigns := map[string]map[uint][]string{}
  944. // lineNa dire
  945. graphs := map[string]map[uint]_LineGraph{}
  946. file, err := os.Open(filepath.Join(path, "stop_times.txt"))
  947. if err != nil {
  948. return c, fmt.Errorf("while opening stop_times: %w", err)
  949. }
  950. defer file.Close()
  951. r := csv.NewReader(bufio.NewReader(file))
  952. header, err := r.Read()
  953. if err != nil {
  954. return c, fmt.Errorf("while reading header: %w", err)
  955. }
  956. fields := map[string]int{}
  957. for i, headerField := range header {
  958. fields[headerField] = i
  959. }
  960. previousTripID := ""
  961. previous := -1
  962. previousTrip := Trip{}
  963. for {
  964. record, err := r.Read()
  965. if err == io.EOF {
  966. break
  967. }
  968. if err != nil {
  969. return c, fmt.Errorf("while reading a record: %w", err)
  970. }
  971. tripID := record[fields["trip_id"]]
  972. stop := stops[record[fields["stop_id"]]]
  973. trip := trips[tripID]
  974. if _, ok := lineHeadsignsMap[trip.LineID]; !ok {
  975. lineHeadsignsMap[trip.LineID] = map[uint]map[string]struct{}{}
  976. lineHeadsigns[trip.LineID] = map[uint][]string{}
  977. }
  978. if _, ok := lineHeadsignsMap[trip.LineID][trip.Direction.Value()]; !ok {
  979. lineHeadsignsMap[trip.LineID][trip.Direction.Value()] = map[string]struct{}{}
  980. lineHeadsigns[trip.LineID][trip.Direction.Value()] = []string{}
  981. }
  982. lineHeadsignsMap[trip.LineID][trip.Direction.Value()][trip.Headsign] = struct{}{}
  983. if _, ok := graphs[trip.LineID]; !ok {
  984. graphs[trip.LineID] = map[uint]_LineGraph{}
  985. }
  986. if previousTripID != tripID && previousTripID != "" {
  987. // last of previous trip
  988. graph := graphs[previousTrip.LineID][previousTrip.Direction.Value()]
  989. if graph.NextNodes == nil {
  990. graph.NextNodes = map[int]map[int]struct{}{}
  991. }
  992. if graph.NextNodes[previous] == nil {
  993. graph.NextNodes[previous] = map[int]struct{}{}
  994. }
  995. graphs[previousTrip.LineID][previousTrip.Direction.Value()] = graph
  996. graphs[previousTrip.LineID][previousTrip.Direction.Value()].NextNodes[previous][-1] = struct{}{}
  997. }
  998. graph := graphs[trip.LineID][trip.Direction.Value()]
  999. if graph.StopCodes == nil {
  1000. graph.StopCodes = map[string]int{}
  1001. }
  1002. if graph.NextNodes == nil {
  1003. graph.NextNodes = map[int]map[int]struct{}{}
  1004. }
  1005. current := -1
  1006. current, ok := graph.StopCodes[stop]
  1007. if !ok {
  1008. current = len(graph.StopCodesArray)
  1009. graph.StopCodesArray = append(graph.StopCodesArray, stop)
  1010. graph.StopCodes[stop] = current
  1011. }
  1012. if previousTripID != tripID {
  1013. // first of current trip
  1014. if graph.NextNodes[-1] == nil {
  1015. graph.NextNodes[-1] = map[int]struct{}{}
  1016. }
  1017. if _, ok := graph.NextNodes[-1][current]; !ok {
  1018. graph.NextNodes[-1][current] = struct{}{}
  1019. }
  1020. } else {
  1021. // second <- first to last <- penultimate of current trip
  1022. if graph.NextNodes[previous] == nil {
  1023. graph.NextNodes[previous] = map[int]struct{}{}
  1024. }
  1025. if _, ok := graph.NextNodes[previous][current]; !ok {
  1026. graph.NextNodes[previous][current] = struct{}{}
  1027. }
  1028. }
  1029. previous = current
  1030. previousTripID = tripID
  1031. previousTrip = trip
  1032. graphs[trip.LineID][trip.Direction.Value()] = graph
  1033. }
  1034. g := graphs[previousTrip.LineID][previousTrip.Direction.Value()]
  1035. if g.NextNodes[previous] == nil {
  1036. g.NextNodes[previous] = map[int]struct{}{}
  1037. }
  1038. if _, ok := g.NextNodes[previous][-1]; !ok {
  1039. g.NextNodes[previous][-1] = struct{}{}
  1040. }
  1041. for lineID, directions := range lineHeadsignsMap {
  1042. for direction, headsigns := range directions {
  1043. for headsign := range headsigns {
  1044. lineHeadsigns[lineID][direction] = append(lineHeadsigns[lineID][direction], headsign)
  1045. }
  1046. }
  1047. }
  1048. c.LineGraphs = map[string]map[uint]LineGraph{}
  1049. for lineID, graphByDirection := range graphs {
  1050. c.LineGraphs[lineID] = map[uint]LineGraph{}
  1051. for direction, graph := range graphByDirection {
  1052. c.LineGraphs[lineID][direction] = LineGraph{
  1053. StopCodes: graph.StopCodesArray,
  1054. NextNodes: map[int][]int{},
  1055. }
  1056. for from, tos := range graph.NextNodes {
  1057. for to := range tos {
  1058. c.LineGraphs[lineID][direction].NextNodes[from] = append(c.LineGraphs[lineID][direction].NextNodes[from], to)
  1059. }
  1060. }
  1061. }
  1062. }
  1063. c.lineHeadsigns = lineHeadsigns
  1064. return c, nil
  1065. }
  1066. func convertLines(c feedConverter) (feedConverter, error) { // O(n:routes) ; (lineGraphs, lineHeadsigns -- lineIndex:map[lineName][]offsets, lineIdIndex:CodeIndex >> lines)
  1067. path := c.TmpFeedPath
  1068. feed := c.Feed
  1069. file, err := os.Open(filepath.Join(path, "routes.txt"))
  1070. if err != nil {
  1071. return c, fmt.Errorf("while opening file: %w", err)
  1072. }
  1073. defer file.Close()
  1074. result, err := os.Create(filepath.Join(path, "lines.bare"))
  1075. if err != nil {
  1076. return c, fmt.Errorf("while creating file: %w", err)
  1077. }
  1078. defer result.Close()
  1079. r := csv.NewReader(bufio.NewReader(file))
  1080. header, err := r.Read()
  1081. if err != nil {
  1082. return c, fmt.Errorf("while reading header: %w", err)
  1083. }
  1084. fields := map[string]int{}
  1085. for i, headerField := range header {
  1086. fields[headerField] = i
  1087. }
  1088. var offset uint = 0
  1089. index := map[string][]uint{}
  1090. idIndex := CodeIndex{}
  1091. for {
  1092. record, err := r.Read()
  1093. if err == io.EOF {
  1094. break
  1095. }
  1096. if err != nil {
  1097. return c, fmt.Errorf("while reading a record: %w", err)
  1098. }
  1099. routeID := record[fields["route_id"]]
  1100. lineName := c.Feed.Flags().LineName
  1101. for _, template := range []string{"route_short_name", "route_long_name"} {
  1102. lineName = strings.Replace(lineName, "{{"+template+"}}", record[fields[template]], -1)
  1103. }
  1104. var kind uint
  1105. fmt.Sscanf(record[fields["route_type"]], "%d", &kind)
  1106. colour := "ffffff"
  1107. if colourIx, ok := fields["route_color"]; ok && record[colourIx] != "" {
  1108. colour = record[colourIx]
  1109. }
  1110. directions := []uint{}
  1111. for direction := range c.lineHeadsigns[routeID] {
  1112. directions = append(directions, direction)
  1113. }
  1114. sort.Slice(directions, func(i, j int) bool {
  1115. return directions[i] < directions[j]
  1116. })
  1117. headsigns := [][]string{}
  1118. translatedHeadsigns := [][][]Translation{}
  1119. for _, direction := range directions {
  1120. dirHeadsigns := c.lineHeadsigns[routeID][direction]
  1121. headsigns = append(headsigns, dirHeadsigns)
  1122. translatedHeadsign := [][]Translation{}
  1123. for _, headsign := range dirHeadsigns {
  1124. translatedHeadsign = append(translatedHeadsign, translateField(headsign, c.feedInfo.Language, c.defaultLanguage, c.translations))
  1125. }
  1126. translatedHeadsigns = append(translatedHeadsigns, translatedHeadsign)
  1127. }
  1128. graphs := []LineGraph{}
  1129. for _, direction := range directions {
  1130. graphs = append(graphs, c.LineGraphs[routeID][direction])
  1131. }
  1132. line := Line{
  1133. Id: routeID,
  1134. Name: lineName,
  1135. Colour: hex2colour(colour),
  1136. Kind: LineType(kind),
  1137. Graphs: graphs,
  1138. Headsigns: headsigns,
  1139. }
  1140. if field, present := fields["agency_id"]; present {
  1141. line.AgencyID = record[field]
  1142. }
  1143. bytes, err := bare.Marshal(&line)
  1144. if err != nil {
  1145. return c, fmt.Errorf("while marshalling: %w", err)
  1146. }
  1147. b, err := result.Write(bytes)
  1148. if err != nil {
  1149. return c, fmt.Errorf("while writing: %w", err)
  1150. }
  1151. cleanQuery, err := CleanQuery(line.Name, feed)
  1152. if err != nil {
  1153. return c, fmt.Errorf("while cleaning line name: %w", err)
  1154. }
  1155. index[cleanQuery] = append(index[cleanQuery], offset)
  1156. idIndex[routeID] = offset
  1157. offset += uint(b)
  1158. }
  1159. c.LineIdIndex = idIndex
  1160. c.LineIndex = index
  1161. return c, nil
  1162. }
  1163. func convertFeedInfo(c feedConverter) (feedConverter, error) { // O(1:feed_info) ; ( -- feed_info >> )
  1164. path := c.TmpFeedPath
  1165. feedInfo := FeedInfo{}
  1166. file, err := os.Open(filepath.Join(path, "feed_info.txt"))
  1167. if err != nil {
  1168. if errors.Is(err, fs.ErrNotExist) {
  1169. log.Println("[WARN] no feed_info.txt")
  1170. file = nil
  1171. } else {
  1172. return c, fmt.Errorf("while opening file: %w", err)
  1173. }
  1174. }
  1175. if file != nil {
  1176. defer file.Close()
  1177. r := csv.NewReader(bufio.NewReader(file))
  1178. header, err := r.Read()
  1179. if err != nil {
  1180. return c, fmt.Errorf("while reading header: %w", err)
  1181. }
  1182. fields := map[string]int{}
  1183. for i, headerField := range header {
  1184. fields[headerField] = i
  1185. }
  1186. record, err := r.Read()
  1187. if err != nil {
  1188. return c, fmt.Errorf("while reading a record: %w", err)
  1189. }
  1190. feedInfo.Website = record[fields["feed_publisher_url"]]
  1191. feedInfo.Language = record[fields["feed_lang"]]
  1192. if defaultLanguageIndex, ok := fields["default_lang"]; ok {
  1193. c.defaultLanguage = record[defaultLanguageIndex]
  1194. }
  1195. if ix, ok := fields["feed_start_date"]; ok {
  1196. c.ValidFrom, err = time.ParseInLocation("20060102", record[ix], c.Timezone)
  1197. if err != nil {
  1198. c.ValidFromError = append(c.ValidFromError, err)
  1199. }
  1200. feedInfo.ValidSince = record[ix]
  1201. }
  1202. if ix, ok := fields["feed_end_date"]; ok {
  1203. c.ValidTill, err = time.ParseInLocation("20060102", record[ix], c.Timezone)
  1204. if err != nil {
  1205. c.ValidTillError = append(c.ValidTillError, err)
  1206. }
  1207. feedInfo.ValidTill = record[ix]
  1208. }
  1209. }
  1210. feedInfo.Timezone = c.Timezone.String()
  1211. feedInfo.RealtimeFeeds = c.Feed.RealtimeFeeds()
  1212. feedInfo.QrHost, feedInfo.QrLocation, feedInfo.QrSelector = c.Feed.QRInfo()
  1213. feedInfo.Attributions, feedInfo.Descriptions, err = getAttrDesc(c.Feed.String(), c.feedTranslations)
  1214. feedInfo.Name = c.Feed.Name()
  1215. c.feedInfo = feedInfo
  1216. return c, err
  1217. }
  1218. func convertLuaScripts(c feedConverter) error { // O(1) ; ( -- >> updates.lua, alerts.lua, vehicles.lua )
  1219. filenames := []string{"updates", "vehicles", "alerts"}
  1220. for _, filename := range filenames {
  1221. t, err := template.ParseFS(luaScripts, "realtime_lua/"+c.Feed.String()+"_"+filename+".lua")
  1222. if err != nil {
  1223. if strings.Contains(err.Error(), "pattern matches no files") {
  1224. log.Printf("%s.lua for this feed does not exist, ignoring\n", filename)
  1225. continue
  1226. }
  1227. return fmt.Errorf("while parsing template %s: %w", filename, err)
  1228. }
  1229. path := c.TmpFeedPath
  1230. writeFile, err := os.Create(filepath.Join(path, filename+".lua"))
  1231. if err != nil {
  1232. return fmt.Errorf("while creating %s: %w", filename, err)
  1233. }
  1234. defer writeFile.Close()
  1235. err = t.Execute(writeFile, c.config.Auth[c.Feed.String()])
  1236. if err != nil {
  1237. return fmt.Errorf("while executing template %s: %w", filename, err)
  1238. }
  1239. }
  1240. return nil
  1241. }
  1242. func getAttrDesc(feedID string, feedTranslations embed.FS) (map[string]string, map[string]string, error) {
  1243. attributions := map[string]string{}
  1244. descriptions := map[string]string{}
  1245. dir, err := feedTranslations.ReadDir("translations")
  1246. if err != nil {
  1247. return attributions, descriptions, err
  1248. }
  1249. for _, f := range dir {
  1250. translation := map[string]string{}
  1251. name := f.Name()
  1252. lang := strings.Split(name, ".")[1]
  1253. fileContent, err := feedTranslations.ReadFile("translations/" + name)
  1254. if err != nil {
  1255. log.Printf("error reading translation %s\n", name)
  1256. continue
  1257. }
  1258. yaml.Unmarshal(fileContent, &translation)
  1259. attributions[lang] = translation[feedID+"_attribution"]
  1260. descriptions[lang] = translation[feedID+"_description"]
  1261. if lang == "en" {
  1262. attributions["und"] = translation[feedID+"_attribution"]
  1263. descriptions["und"] = translation[feedID+"_description"]
  1264. }
  1265. }
  1266. return attributions, descriptions, nil
  1267. }
  1268. func readTranslations(c feedConverter) (feedConverter, error) { // O(n:translations) ; ( -- translations >>)
  1269. path := c.TmpFeedPath
  1270. file, err := os.Open(filepath.Join(path, "translations.txt"))
  1271. if err != nil {
  1272. return c, fmt.Errorf("while opening file: %w", err)
  1273. }
  1274. defer file.Close()
  1275. translations := map[string]map[string]string{}
  1276. r := csv.NewReader(bufio.NewReader(file))
  1277. header, err := r.Read()
  1278. if err != nil {
  1279. return c, fmt.Errorf("while reading header: %w", err)
  1280. }
  1281. fields := map[string]int{}
  1282. for i, headerField := range header {
  1283. fields[headerField] = i
  1284. }
  1285. for {
  1286. record, err := r.Read()
  1287. if err == io.EOF {
  1288. break
  1289. }
  1290. if err != nil {
  1291. return c, fmt.Errorf("while reading a record: %w", err)
  1292. }
  1293. key := record[fields["field_value"]]
  1294. language := record[fields["language"]]
  1295. translation := record[fields["translation"]]
  1296. if _, ok := translations[key]; !ok {
  1297. translations[key] = map[string]string{}
  1298. }
  1299. translations[key][language] = translation
  1300. }
  1301. c.translations = translations
  1302. return c, nil
  1303. }
  1304. func recoverTranslations(c feedConverter, e error) (feedConverter, error) {
  1305. var pathError *os.PathError
  1306. if errors.As(e, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
  1307. return c, nil
  1308. }
  1309. return c, e
  1310. }
  1311. func convertAgencies(c feedConverter) (feedConverter, error) { // O(n:agency) ; ( -- >> agencies)
  1312. path := c.TmpFeedPath
  1313. file, err := os.Open(filepath.Join(path, "agency.txt"))
  1314. if err != nil {
  1315. return c, fmt.Errorf("while opening file: %w", err)
  1316. }
  1317. defer file.Close()
  1318. result, err := os.Create(filepath.Join(path, "agencies.bare"))
  1319. if err != nil {
  1320. return c, fmt.Errorf("while creating file: %w", err)
  1321. }
  1322. defer file.Close()
  1323. r := csv.NewReader(bufio.NewReader(file))
  1324. header, err := r.Read()
  1325. if err != nil {
  1326. return c, fmt.Errorf("while reading header: %w", err)
  1327. }
  1328. fields := map[string]int{}
  1329. for i, headerField := range header {
  1330. fields[headerField] = i
  1331. }
  1332. for {
  1333. record, err := r.Read()
  1334. if err == io.EOF {
  1335. break
  1336. }
  1337. if err != nil {
  1338. return c, fmt.Errorf("while reading a record: %w", err)
  1339. }
  1340. agency := Agency{
  1341. Id: record[fields["agency_id"]],
  1342. Name: record[fields["agency_name"]],
  1343. TranslatedNames: translateField(record[fields["agency_name"]], c.feedInfo.Language, c.defaultLanguage, c.translations),
  1344. Website: record[fields["agency_url"]],
  1345. TranslatedWebsites: translateField(record[fields["agency_url"]], c.feedInfo.Language, c.defaultLanguage, c.translations),
  1346. Timezone: record[fields["agency_timezone"]],
  1347. }
  1348. c.Timezone, _ = time.LoadLocation(agency.Timezone)
  1349. if field, present := fields["agency_lang"]; present {
  1350. agency.Language = record[field]
  1351. }
  1352. if field, present := fields["agency_phone"]; present {
  1353. agency.PhoneNumber = record[field]
  1354. agency.TranslatedPhoneNumbers = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
  1355. }
  1356. if field, present := fields["agency_fare_url"]; present {
  1357. agency.FareWebsite = record[field]
  1358. agency.TranslatedFareWebsites = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
  1359. }
  1360. if field, present := fields["agency_email"]; present {
  1361. agency.Email = record[field]
  1362. agency.TranslatedEmails = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
  1363. }
  1364. bytes, err := bare.Marshal(&agency)
  1365. if err != nil {
  1366. return c, fmt.Errorf("while marshalling: %w", err)
  1367. }
  1368. _, err = result.Write(bytes)
  1369. if err != nil {
  1370. return c, fmt.Errorf("while writing: %w", err)
  1371. }
  1372. }
  1373. return c, nil
  1374. }
  1375. func writeNameIndex(c feedConverter, index map[string][]uint, filename string, raw bool) error {
  1376. path := c.TmpFeedPath
  1377. feed := c.Feed
  1378. result, err := os.Create(filepath.Join(path, filename))
  1379. if err != nil {
  1380. return fmt.Errorf("while creating file: %w", err)
  1381. }
  1382. defer result.Close()
  1383. for name, offsets := range index {
  1384. cleanQuery := name
  1385. if !raw {
  1386. cleanQuery, err = CleanQuery(name, feed)
  1387. if err != nil {
  1388. return fmt.Errorf("while cleaning name %s: %w", name, err)
  1389. }
  1390. }
  1391. stopOffset := NameOffset{
  1392. Name: cleanQuery,
  1393. Offsets: offsets,
  1394. }
  1395. bytes, err := bare.Marshal(&stopOffset)
  1396. if err != nil {
  1397. return fmt.Errorf("while marshalling: %w", err)
  1398. }
  1399. _, err = result.Write(bytes)
  1400. if err != nil {
  1401. return fmt.Errorf("while writing: %w", err)
  1402. }
  1403. }
  1404. return nil
  1405. }
  1406. func writeStopNameIndex(c feedConverter) error {
  1407. err := writeNameIndex(c, c.StopsNameIndex, "ix_stop_names.bare", false)
  1408. c.StopsNameIndex = map[string][]uint{}
  1409. return err
  1410. }
  1411. func writeLineIndex(c feedConverter) error {
  1412. err := writeNameIndex(c, c.LineIndex, "ix_lines.bare", false)
  1413. c.LineIndex = map[string][]uint{}
  1414. return err
  1415. }
  1416. func writeLineIdIndex(c feedConverter) error {
  1417. err := writeCodeIndex(c, c.LineIdIndex, "ix_line_codes.bare")
  1418. c.LineIndex = map[string][]uint{}
  1419. return err
  1420. }
  1421. func writeTripIndex(c feedConverter) error {
  1422. tripIndex := map[string][]uint{}
  1423. for trip, offset := range c.TripsOffsets {
  1424. tripIndex[trip] = []uint{offset}
  1425. }
  1426. err := writeNameIndex(c, tripIndex, "ix_trips.bare", true)
  1427. c.TripsOffsets = map[string]uint{}
  1428. return err
  1429. }
  1430. func writeStopCodeIndex(c feedConverter) error {
  1431. err := writeCodeIndex(c, c.StopsCodeIndex, "ix_stop_codes.bare")
  1432. c.StopsCodeIndex = CodeIndex{}
  1433. return err
  1434. }
  1435. func writeCodeIndex(c feedConverter, i CodeIndex, filename string) error {
  1436. path := c.TmpFeedPath
  1437. result, err := os.Create(filepath.Join(path, filename))
  1438. if err != nil {
  1439. return fmt.Errorf("while creating file: %w", err)
  1440. }
  1441. defer result.Close()
  1442. bytes, err := bare.Marshal(&i)
  1443. if err != nil {
  1444. return fmt.Errorf("while marshalling: %w", err)
  1445. }
  1446. _, err = result.Write(bytes)
  1447. if err != nil {
  1448. return fmt.Errorf("while writing: %w", err)
  1449. }
  1450. return nil
  1451. }
  1452. func deleteTxtFiles(c feedConverter) error {
  1453. return nil
  1454. return file.DeleteTxtFiles(c.TmpFeedPath, c.GtfsFilename)
  1455. }
  1456. func compressTraffic(c feedConverter) error {
  1457. return file.CompressBare(c.TmpFeedPath, c.GtfsFilename)
  1458. }
  1459. func deleteBareFiles(c feedConverter) error {
  1460. return file.DeleteBareFiles(c.TmpFeedPath)
  1461. }
  1462. func moveTraffic(c feedConverter) error {
  1463. if err := append(c.ValidFromError, c.ValidTillError...); len(err) != 0 {
  1464. return errors.Join(err...)
  1465. }
  1466. return file.MoveTraffic(c.GtfsFilename, c.ValidFrom.Format("20060102")+"_"+c.ValidTill.Format("20060102")+".txz", c.TmpFeedPath, c.HomeFeedPath)
  1467. }
  1468. func convert(input ...interface{}) (interface{}, error) {
  1469. allErrors := []error{}
  1470. args := input[0].(result)
  1471. for _, gtfsFile := range args.gtfsFilenames {
  1472. log.Printf("converting feed %s/%s\n", args.feed.Name(), gtfsFile)
  1473. r := gott2.R[feedConverter]{
  1474. S: feedConverter{
  1475. TmpFeedPath: args.tmpFeedPath,
  1476. GtfsFilename: gtfsFile,
  1477. Feed: args.feed,
  1478. HomeFeedPath: args.homeFeedPath,
  1479. feedTranslations: args.feedTranslations,
  1480. config: args.config,
  1481. },
  1482. LogLevel: gott2.Debug,
  1483. }
  1484. r = r.
  1485. Tee(unzipGtfs).
  1486. Tee(prepareFeedGtfs).
  1487. Tee(convertVehicles).
  1488. Bind(convertAgencies).
  1489. Bind(convertFeedInfo).
  1490. Tee(convertLuaScripts).
  1491. Bind(readTranslations).
  1492. Recover(recoverTranslations).
  1493. Bind(createTrafficCalendarFile).
  1494. Bind(convertCalendar).
  1495. Recover(recoverCalendar).
  1496. Bind(convertCalendarDates).
  1497. Recover(recoverCalendar).
  1498. Tee(checkAnyCalendarConverted).
  1499. Tee(saveSchedules).
  1500. Tee(saveFeedInfo).
  1501. Recover(closeTrafficCalendarFile).
  1502. Bind(convertDepartures).
  1503. Bind(getLineNames).
  1504. Bind(getStopNames).
  1505. Bind(convertTrips).
  1506. Map(clearDepartures).
  1507. Map(clearStopNames).
  1508. Map(clearLineNames).
  1509. Bind(convertStops).
  1510. Tee(writeTripIndex).
  1511. Map(clearTripOffsets).
  1512. Tee(writeStopNameIndex).
  1513. Tee(writeStopCodeIndex).
  1514. Map(clearTripsChangeOptions).
  1515. Map(clearTripsThroughStops).
  1516. Map(clearLineNames).
  1517. Bind(getTrips).
  1518. Bind(convertLineGraphs).
  1519. Map(clearStops).
  1520. Bind(convertLines).
  1521. Tee(writeLineIndex).
  1522. Tee(writeLineIdIndex).
  1523. Map(clearLineGraphs).
  1524. Map(clearLineHeadsigns).
  1525. Tee(deleteTxtFiles).
  1526. Tee(compressTraffic).
  1527. Tee(deleteBareFiles).
  1528. Tee(moveTraffic)
  1529. if r.E != nil {
  1530. log.Printf("Error converting %s: %v\n", args.feed.Name(), r.E)
  1531. allErrors = append(allErrors, r.E)
  1532. }
  1533. if err := append(r.S.ValidFromError, r.S.ValidTillError...); len(err) != 0 {
  1534. allErrors = append(allErrors, err...)
  1535. log.Printf("Error converting %s: %v\n", args.feed.Name(), errors.Join(err...))
  1536. }
  1537. }
  1538. if len(allErrors) > 0 {
  1539. return gott.Tuple{args}, errors.Join(allErrors...)
  1540. }
  1541. return gott.Tuple{args}, nil
  1542. }
  1543. func signal(input ...interface{}) (interface{}, error) {
  1544. args := input[0].(result)
  1545. if len(args.gtfsFilenames) > 0 && args.pid > 0 {
  1546. process, err := os.FindProcess(args.pid)
  1547. if err != nil {
  1548. return gott.Tuple{args}, err
  1549. }
  1550. err = process.Signal(syscall.SIGUSR1)
  1551. if err != nil {
  1552. return gott.Tuple{args}, err
  1553. }
  1554. }
  1555. return gott.Tuple{args}, nil
  1556. }
  1557. func openLastUpdated(input ...interface{}) (interface{}, error) {
  1558. args := input[0].(result)
  1559. updatesFilename := filepath.Join(args.config.FeedsPath, "updated.bare")
  1560. var err error
  1561. args.updatesFile, err = os.OpenFile(updatesFilename, os.O_RDWR|os.O_CREATE, 0644)
  1562. return gott.Tuple{args}, err
  1563. }
  1564. func isEmpty(input ...interface{}) error {
  1565. args := input[0].(result)
  1566. stat, err := os.Stat(args.updatesFile.Name())
  1567. if err != nil {
  1568. return err
  1569. }
  1570. if stat.Size() == 0 {
  1571. return ErrEmpty{}
  1572. }
  1573. return nil
  1574. }
  1575. func unmarshalLastUpdated(input ...interface{}) (interface{}, error) {
  1576. args := input[0].(result)
  1577. var lastUpdated map[string]string
  1578. err := bare.UnmarshalReader(args.updatesFile, &lastUpdated)
  1579. args.updates = lastUpdated
  1580. return gott.Tuple{args}, err
  1581. }
  1582. func recoverEmpty(input ...interface{}) (interface{}, error) {
  1583. args := input[0].(result)
  1584. err := input[1].(error)
  1585. var emptyError ErrEmpty
  1586. if errors.As(err, &emptyError) {
  1587. return gott.Tuple{args}, nil
  1588. } else {
  1589. return gott.Tuple{args}, err
  1590. }
  1591. }
  1592. func lastUpdated(input ...interface{}) interface{} {
  1593. args := input[0].(result)
  1594. args.updates[args.feed.String()] = time.Now().Format(time.RFC3339)
  1595. return gott.Tuple{args}
  1596. }
  1597. func seekLastUpdated(input ...interface{}) (interface{}, error) {
  1598. args := input[0].(result)
  1599. _, err := args.updatesFile.Seek(0, 0)
  1600. return gott.Tuple{args}, err
  1601. }
  1602. func marshalLastUpdated(input ...interface{}) error {
  1603. args := input[0].(result)
  1604. err := bare.MarshalWriter(bare.NewWriter(args.updatesFile), &args.updates)
  1605. args.updatesFile.Close()
  1606. return err
  1607. }
  1608. func Prepare(cfg config.Config, t Traffic, bimbaPid int, feedTranslations embed.FS) error { // todo(BAF18) remove pid
  1609. etags, err := readEtags(cfg)
  1610. if err != nil {
  1611. return fmt.Errorf("while reading etags: %w", err)
  1612. }
  1613. newEtags := map[string]string{}
  1614. for _, feed := range t.Feeds {
  1615. log.Printf("converting %s\n", feed.Name())
  1616. r := gott.Tuple{result{
  1617. config: cfg,
  1618. pid: bimbaPid,
  1619. tmpPath: os.TempDir(),
  1620. feed: feed,
  1621. feedName: feed.String(),
  1622. location: feed.getTimezone(),
  1623. updates: map[string]string{},
  1624. etags: etags,
  1625. newEtags: newEtags,
  1626. feedTranslations: feedTranslations,
  1627. }}
  1628. s, err := gott.NewResult(r).
  1629. SetLevelLog(gott.Debug).
  1630. Bind(createTmpPath).
  1631. Bind(createFeedHome).
  1632. Bind(listDownloadedVersions).
  1633. Bind(getAllVersions).
  1634. Map(findValidVersions).
  1635. Bind(getGtfsFiles).
  1636. Bind(convert).
  1637. Bind(signal).
  1638. Bind(openLastUpdated).
  1639. Tee(isEmpty).
  1640. Bind(unmarshalLastUpdated).
  1641. Recover(recoverEmpty).
  1642. Map(lastUpdated).
  1643. Bind(seekLastUpdated).
  1644. Tee(marshalLastUpdated).
  1645. Finish()
  1646. if err != nil {
  1647. log.Printf("Error converting %s: %v\n", feed.String(), err)
  1648. } else {
  1649. etags = s.(gott.Tuple)[0].(result).etags
  1650. newEtags = s.(gott.Tuple)[0].(result).newEtags
  1651. }
  1652. }
  1653. return saveEtags(cfg, newEtags)
  1654. }