convert.go 42 KB


  1. // SPDX-FileCopyrightText: Adam Evyčędo
  2. //
  3. // SPDX-License-Identifier: AGPL-3.0-or-later
  4. package traffic
  5. // TODO(BAF10) direction (0|1) to const (TO|BACK)
  6. // TODO Agency.language, FeedInfo.language -> IETF language tag
  7. // TODO Agency.phoneNumber -> E.123 format
  8. import (
  9. "text/template"
  10. "apiote.xyz/p/szczanieckiej/config"
  11. "apiote.xyz/p/szczanieckiej/file"
  12. "bufio"
  13. "embed"
  14. "encoding/csv"
  15. "errors"
  16. "fmt"
  17. "io"
  18. "io/fs"
  19. "log"
  20. "net/http"
  21. "os"
  22. "path/filepath"
  23. "sort"
  24. "strings"
  25. "syscall"
  26. "time"
  27. "gopkg.in/yaml.v3"
  28. gott2 "apiote.xyz/p/gott/v2"
  29. "git.sr.ht/~sircmpwn/go-bare"
  30. "notabug.org/apiote/gott"
  31. )
  32. //go:embed realtime_lua
  33. var luaScripts embed.FS
  34. type _LineGraph struct {
  35. StopCodesArray []string
  36. StopCodes map[string]int
  37. NextNodes map[int]map[int]struct{}
  38. }
  39. type ErrEmpty struct{}
  40. func (ErrEmpty) Error() string {
  41. return ""
  42. }
  43. type result struct {
  44. config config.Config
  45. pid int
  46. tmpPath string
  47. feed Feed
  48. feedName string
  49. location *time.Location
  50. tmpFeedPath string
  51. homeFeedPath string
  52. downloadedVersions []Version
  53. allVersions []Version
  54. gtfsFilenames []string
  55. missingVersions []Version
  56. updatesFile *os.File
  57. updates map[string]string
  58. etags map[string]string
  59. newEtags map[string]string
  60. feedTranslations embed.FS
  61. }
  62. type feedConverter struct {
  63. TmpFeedPath string
  64. GtfsFilename string
  65. Feed Feed
  66. HomeFeedPath string
  67. feedTranslations embed.FS
  68. config config.Config
  69. Timezone *time.Location
  70. TrafficCalendarFile *os.File
  71. tripsInputIndex map[string]int64
  72. routesInputIndex map[string]int64
  73. stopsInputIndex map[string]int64
  74. tripsOffsets map[string]uint
  75. StopsCodeIndex CodeIndex
  76. StopsNameIndex map[string][]uint
  77. Stops map[string]string
  78. LineGraphs map[string]map[uint]LineGraph
  79. lineHeadsigns map[string]map[uint][]string
  80. LineIndex map[string][]uint
  81. LineIdIndex CodeIndex
  82. ValidFrom time.Time
  83. ValidFromError []error
  84. ValidTill time.Time
  85. ValidTillError []error
  86. feedInfo FeedInfo
  87. defaultLanguage string
  88. translations map[string]map[string]string
  89. schedules map[string]Schedule
  90. trips map[string]Trip
  91. }
  92. // helper functions
  93. func translateFieldDefault(key, feedLanguage, defaultLanguage string, translations map[string]map[string]string) string {
  94. if feedLanguage == "mul" {
  95. if value, ok := translations[key][defaultLanguage]; !ok {
  96. return key
  97. } else {
  98. return value
  99. }
  100. }
  101. return key
  102. }
  103. func translateField(key, feedLanguage, defaultLanguage string, translations map[string]map[string]string) []Translation {
  104. var result []Translation
  105. if feedLanguage == "mul" {
  106. if value, ok := translations[key][defaultLanguage]; !ok {
  107. result = []Translation{{Language: defaultLanguage, Value: key}}
  108. } else {
  109. result = []Translation{{Language: defaultLanguage, Value: value}}
  110. }
  111. }
  112. for language, value := range translations[key] {
  113. if language == defaultLanguage {
  114. continue
  115. }
  116. result = append(result, Translation{Language: language, Value: value})
  117. }
  118. return result
  119. }
  120. func hex2colour(hex string) Colour {
  121. if hex[0] == '#' {
  122. hex = hex[1:]
  123. }
  124. colour := Colour{
  125. A: 0xff,
  126. }
  127. hexToByte := func(b byte) byte {
  128. switch {
  129. case b >= '0' && b <= '9':
  130. return b - '0'
  131. case b >= 'a' && b <= 'f':
  132. return b - 'a' + 10
  133. case b >= 'A' && b <= 'F':
  134. return b - 'A' + 10
  135. default:
  136. return 0
  137. }
  138. }
  139. switch len(hex) {
  140. case 6:
  141. colour.R = hexToByte(hex[0])<<4 + hexToByte(hex[1])
  142. colour.G = hexToByte(hex[2])<<4 + hexToByte(hex[3])
  143. colour.B = hexToByte(hex[4])<<4 + hexToByte(hex[5])
  144. case 3:
  145. colour.R = hexToByte(hex[0])<<4 + hexToByte(hex[0])
  146. colour.G = hexToByte(hex[1])<<4 + hexToByte(hex[1])
  147. colour.B = hexToByte(hex[2])<<4 + hexToByte(hex[2])
  148. }
  149. return colour
  150. }
  151. func readEtags(cfg config.Config) (map[string]string, error) {
  152. etagsFilename := filepath.Join(cfg.FeedsPath, "etags.bare")
  153. etagsFile, err := os.Open(etagsFilename)
  154. if err != nil {
  155. var pathError *os.PathError
  156. if errors.As(err, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
  157. return map[string]string{}, nil
  158. }
  159. return nil, fmt.Errorf("while opening file: %w", err)
  160. }
  161. defer etagsFile.Close()
  162. var etags map[string]string
  163. err = bare.UnmarshalReader(etagsFile, &etags)
  164. if err != nil {
  165. return nil, fmt.Errorf("while unmarshalling: %w", err)
  166. }
  167. return etags, nil
  168. }
  169. func saveEtags(cfg config.Config, etags map[string]string) error {
  170. etagsFilename := filepath.Join(cfg.FeedsPath, "etags.bare")
  171. etagsFile, err := os.OpenFile(etagsFilename, os.O_RDWR|os.O_CREATE, 0644)
  172. if err != nil {
  173. return fmt.Errorf("while opening: %w", err)
  174. }
  175. defer etagsFile.Close()
  176. bytes, err := bare.Marshal(&etags)
  177. if err != nil {
  178. return fmt.Errorf("while marshalling: %w", err)
  179. }
  180. _, err = etagsFile.Write(bytes)
  181. if err != nil {
  182. return fmt.Errorf("while writing: %w", err)
  183. }
  184. return nil
  185. }
  186. // converting functions
  187. func createTmpPath(input ...interface{}) (interface{}, error) {
  188. args := input[0].(result)
  189. p := filepath.Join(args.tmpPath, args.feedName)
  190. err := os.MkdirAll(p, 0755)
  191. args.tmpFeedPath = p
  192. return gott.Tuple{args}, err
  193. }
  194. func createFeedHome(input ...interface{}) (interface{}, error) {
  195. args := input[0].(result)
  196. p := filepath.Join(args.config.FeedsPath, args.feedName)
  197. err := os.MkdirAll(p, 0755)
  198. args.homeFeedPath = p
  199. return gott.Tuple{args}, err
  200. }
  201. func listDownloadedVersions(input ...interface{}) (interface{}, error) {
  202. args := input[0].(result)
  203. v, err := ListVersionsTimezone(args.config, args.feed, args.feed.getTimezone())
  204. args.downloadedVersions = v
  205. return gott.Tuple{args}, err
  206. }
  207. func getAllVersions(input ...interface{}) (interface{}, error) {
  208. args := input[0].(result)
  209. v, err := args.feed.GetVersions(time.Now().In(args.location), args.location)
  210. args.allVersions = v
  211. return gott.Tuple{args}, err
  212. }
  213. func findValidVersions(input ...interface{}) interface{} {
  214. args := input[0].(result)
  215. now := time.Now().In(args.location)
  216. validVersions := FindValidVersions(args.allVersions, now)
  217. downloadedVersions := map[string]struct{}{}
  218. for _, downloadedVersion := range args.downloadedVersions {
  219. downloadedVersions[downloadedVersion.String()] = struct{}{}
  220. }
  221. missingVersions := []Version{}
  222. for _, version := range validVersions {
  223. if _, ok := downloadedVersions[version.String()]; !ok {
  224. missingVersions = append(missingVersions, version)
  225. }
  226. }
  227. // log.Println("all", args.allVersions)
  228. // log.Println("valid", validVersions)
  229. // log.Println("downloaded", downloadedVersions)
  230. // log.Println("missing", missingVersions)
  231. args.missingVersions = missingVersions
  232. return gott.Tuple{args}
  233. }
  234. func getGtfsFiles(input ...interface{}) (interface{}, error) {
  235. args := input[0].(result)
  236. names := []string{}
  237. for i, version := range args.missingVersions {
  238. name := fmt.Sprintf("%s_%d.zip", version.String(), i)
  239. zipPath := filepath.Join(args.tmpFeedPath, name)
  240. url := version.Link
  241. request, err := http.NewRequest("GET", url, nil)
  242. if err != nil {
  243. return gott.Tuple{args}, fmt.Errorf("while creating request for %s: %w", url, err)
  244. }
  245. request.Header.Add("If-None-Match", args.etags[url])
  246. client := http.Client{} // todo timeout
  247. response, err := client.Do(request)
  248. if err != nil {
  249. return gott.Tuple{args}, fmt.Errorf("while downloading gtfs %s %w", name, err)
  250. }
  251. if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusNotModified {
  252. return gott.Tuple{args}, fmt.Errorf("wrong response code %d for %s: %w", response.StatusCode, url, err)
  253. }
  254. if response.StatusCode == 200 {
  255. args.newEtags[url] = response.Header.Get("etag")
  256. } else {
  257. args.newEtags[url] = args.etags[url]
  258. continue
  259. }
  260. file, err := os.Create(zipPath)
  261. if err != nil {
  262. return gott.Tuple{args}, fmt.Errorf("while creating zip for %s %w", name, err)
  263. }
  264. defer file.Close()
  265. _, err = io.Copy(file, response.Body)
  266. if err != nil {
  267. return gott.Tuple{args}, fmt.Errorf("while copying gtfs %s %w", name, err)
  268. }
  269. names = append(names, name)
  270. }
  271. args.gtfsFilenames = names
  272. return gott.Tuple{args}, nil
  273. }
  274. func unzipGtfs(c feedConverter) error {
  275. return file.UnzipGtfs(c.TmpFeedPath, c.GtfsFilename)
  276. }
  277. func convertVehicles(c feedConverter) error { // ( -- >> vehicles.bare)
  278. result, err := os.Create(filepath.Join(c.TmpFeedPath, "vehicles.bare"))
  279. if err != nil {
  280. return fmt.Errorf("while creating file: %w", err)
  281. }
  282. defer result.Close()
  283. vehicles, err := c.Feed.ConvertVehicles()
  284. for _, vehicle := range vehicles {
  285. bytes, err := bare.Marshal(&vehicle)
  286. if err != nil {
  287. return fmt.Errorf("while marshalling: %w", err)
  288. }
  289. _, err = result.Write(bytes)
  290. if err != nil {
  291. return fmt.Errorf("while writing to file: %w", err)
  292. }
  293. }
  294. return nil
  295. }
  296. func prepareFeedGtfs(c feedConverter) error {
  297. return c.Feed.FeedPrepareZip(c.TmpFeedPath)
  298. }
  299. func createTrafficCalendarFile(c feedConverter) (feedConverter, error) {
  300. path := c.TmpFeedPath
  301. var err error
  302. c.TrafficCalendarFile, err = os.Create(filepath.Join(path, "calendar.bare"))
  303. return c, err
  304. }
  305. func recoverCalendar(c feedConverter, e error) (feedConverter, error) {
  306. var pathError *os.PathError
  307. if errors.As(e, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
  308. return c, nil
  309. }
  310. return c, e
  311. }
  312. func convertCalendar(c feedConverter) (feedConverter, error) { // ( feedInfo -- schedules >> )
  313. c.schedules = map[string]Schedule{}
  314. path := c.TmpFeedPath
  315. calendarFile, err := os.Open(filepath.Join(path, "calendar.txt"))
  316. if err != nil {
  317. return c, fmt.Errorf("while opening file: %w", err)
  318. }
  319. defer calendarFile.Close()
  320. r := csv.NewReader(bufio.NewReader(calendarFile))
  321. header, err := r.Read()
  322. if err != nil {
  323. return c, fmt.Errorf("while reading header: %w", err)
  324. }
  325. fields := map[string]int{}
  326. for i, headerField := range header {
  327. fields[headerField] = i
  328. }
  329. for {
  330. schedule := Schedule{}
  331. record, err := r.Read()
  332. if err == io.EOF {
  333. break
  334. }
  335. if err != nil {
  336. return c, fmt.Errorf("while reading a record: %w", err)
  337. }
  338. schedule.Id = record[fields["service_id"]]
  339. startDate := record[fields["start_date"]]
  340. endDate := record[fields["end_date"]]
  341. schedule.DateRanges = []DateRange{
  342. DateRange{
  343. Start: startDate[:4] + "-" + startDate[4:6] + "-" + startDate[6:],
  344. End: endDate[:4] + "-" + endDate[4:6] + "-" + endDate[6:],
  345. },
  346. }
  347. if record[fields["monday"]] == "1" {
  348. schedule.DateRanges[0].Weekdays |= (1 << 1)
  349. }
  350. if record[fields["tuesday"]] == "1" {
  351. schedule.DateRanges[0].Weekdays |= (1 << 2)
  352. }
  353. if record[fields["wednesday"]] == "1" {
  354. schedule.DateRanges[0].Weekdays |= (1 << 3)
  355. }
  356. if record[fields["thursday"]] == "1" {
  357. schedule.DateRanges[0].Weekdays |= (1 << 4)
  358. }
  359. if record[fields["friday"]] == "1" {
  360. schedule.DateRanges[0].Weekdays |= (1 << 5)
  361. }
  362. if record[fields["saturday"]] == "1" {
  363. schedule.DateRanges[0].Weekdays |= (1 << 6)
  364. }
  365. if record[fields["sunday"]] == "1" {
  366. schedule.DateRanges[0].Weekdays |= (1 << 0)
  367. schedule.DateRanges[0].Weekdays |= (1 << 7)
  368. }
  369. c.schedules[schedule.Id] = schedule
  370. scheduleStart, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].Start, c.Timezone)
  371. if err != nil {
  372. c.ValidFromError = append(c.ValidFromError, err)
  373. }
  374. if err == nil && (c.ValidFrom.IsZero() || scheduleStart.Before(c.ValidFrom)) {
  375. c.ValidFrom = scheduleStart
  376. c.feedInfo.ValidSince = scheduleStart.Format(ValidityFormat)
  377. }
  378. scheduleEnd, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].End, c.Timezone)
  379. if err != nil {
  380. c.ValidTillError = append(c.ValidTillError, err)
  381. }
  382. if err == nil && (c.ValidTill.IsZero() || scheduleEnd.After(c.ValidTill)) {
  383. c.ValidTill = scheduleEnd
  384. c.feedInfo.ValidTill = scheduleEnd.Format(ValidityFormat)
  385. }
  386. }
  387. return c, nil
  388. }
  389. func convertCalendarDates(c feedConverter) (feedConverter, error) { // ( feedInfo -- schedules >> )
  390. path := c.TmpFeedPath
  391. datesFile, err := os.Open(filepath.Join(path, "calendar_dates.txt"))
  392. if err != nil {
  393. return c, fmt.Errorf("while opening file: %w", err)
  394. }
  395. defer datesFile.Close()
  396. r := csv.NewReader(bufio.NewReader(datesFile))
  397. header, err := r.Read()
  398. if err != nil {
  399. return c, fmt.Errorf("while reading header: %w", err)
  400. }
  401. fields := map[string]int{}
  402. for i, headerField := range header {
  403. fields[headerField] = i
  404. }
  405. for {
  406. record, err := r.Read()
  407. if err == io.EOF {
  408. break
  409. }
  410. if err != nil {
  411. return c, fmt.Errorf("while reading a record: %w", err)
  412. }
  413. if record[fields["exception_type"]] == "1" {
  414. id := record[fields["service_id"]]
  415. schedule := c.schedules[id]
  416. date := record[fields["date"]]
  417. dateRange := DateRange{
  418. Start: date[:4] + "-" + date[4:6] + "-" + date[6:],
  419. End: date[:4] + "-" + date[4:6] + "-" + date[6:],
  420. Weekdays: 0xff,
  421. }
  422. if len(schedule.DateRanges) == 0 {
  423. schedule.Id = id
  424. schedule.DateRanges = []DateRange{dateRange}
  425. } else {
  426. schedule.DateRanges = append(schedule.DateRanges, dateRange)
  427. sort.Slice(schedule.DateRanges, func(i, j int) bool {
  428. return schedule.DateRanges[i].Start < schedule.DateRanges[j].Start
  429. })
  430. }
  431. c.schedules[schedule.Id] = schedule
  432. } else {
  433. date := record[fields["date"]]
  434. formatedDate := date[:4] + "-" + date[4:6] + "-" + date[6:]
  435. scheduleToEdit := c.schedules[record[fields["service_id"]]]
  436. newDateRanges := []DateRange{}
  437. for i := 0; i < len(scheduleToEdit.DateRanges); i++ {
  438. dateRange := scheduleToEdit.DateRanges[i]
  439. if dateRange.Start == formatedDate {
  440. d, _ := time.ParseInLocation(DateFormat, dateRange.Start, c.Timezone)
  441. dateRange.Start = d.AddDate(0, 0, 1).Format(DateFormat)
  442. if dateRange.Start <= dateRange.End {
  443. newDateRanges = append(newDateRanges, dateRange)
  444. }
  445. continue
  446. }
  447. if dateRange.Start < formatedDate && formatedDate < dateRange.End {
  448. d, _ := time.ParseInLocation(DateFormat, formatedDate, c.Timezone)
  449. range1 := DateRange{dateRange.Start, d.AddDate(0, 0, -1).Format(DateFormat), dateRange.Weekdays}
  450. range2 := DateRange{d.AddDate(0, 0, 1).Format(DateFormat), dateRange.End, dateRange.Weekdays}
  451. newDateRanges = append(newDateRanges, range1)
  452. newDateRanges = append(newDateRanges, range2)
  453. continue
  454. }
  455. if formatedDate == dateRange.End {
  456. d, _ := time.ParseInLocation(DateFormat, dateRange.End, c.Timezone)
  457. dateRange.End = d.AddDate(0, 0, -1).Format(DateFormat)
  458. newDateRanges = append(newDateRanges, dateRange)
  459. continue
  460. }
  461. newDateRanges = append(newDateRanges, dateRange)
  462. }
  463. scheduleToEdit.DateRanges = newDateRanges
  464. c.schedules[record[fields["service_id"]]] = scheduleToEdit
  465. }
  466. }
  467. for _, schedule := range c.schedules {
  468. lastDateRange := len(schedule.DateRanges) - 1
  469. scheduleStart, err := time.ParseInLocation(DateFormat, schedule.DateRanges[0].Start, c.Timezone)
  470. if err != nil {
  471. c.ValidFromError = append(c.ValidFromError, err)
  472. }
  473. if err == nil && (c.ValidFrom.IsZero() || scheduleStart.Before(c.ValidFrom)) {
  474. c.ValidFrom = scheduleStart
  475. c.feedInfo.ValidSince = scheduleStart.Format(ValidityFormat)
  476. }
  477. scheduleEnd, err := time.ParseInLocation(DateFormat, schedule.DateRanges[lastDateRange].End, c.Timezone)
  478. if err != nil {
  479. c.ValidTillError = append(c.ValidTillError, err)
  480. }
  481. if err == nil && (c.ValidTill.IsZero() || scheduleEnd.After(c.ValidTill)) {
  482. c.ValidTill = scheduleEnd
  483. c.feedInfo.ValidTill = scheduleEnd.Format(ValidityFormat)
  484. }
  485. }
  486. return c, nil
  487. }
  488. func checkAnyCalendarConverted(c feedConverter) error {
  489. if len(c.schedules) == 0 {
  490. return fmt.Errorf("no calendar converted")
  491. }
  492. return nil
  493. }
  494. func saveSchedules(c feedConverter) error {
  495. resultFile := c.TrafficCalendarFile
  496. schedulesArray := make([]Schedule, len(c.schedules))
  497. i := 0
  498. for _, schedule := range c.schedules {
  499. schedulesArray[i] = schedule
  500. i++
  501. }
  502. sort.Slice(schedulesArray, func(i, j int) bool {
  503. return schedulesArray[i].DateRanges[0].Start < schedulesArray[j].DateRanges[0].Start
  504. })
  505. for _, schedule := range schedulesArray {
  506. bytes, err := bare.Marshal(&schedule)
  507. if err != nil {
  508. return fmt.Errorf("while marshalling: %w", err)
  509. }
  510. _, err = resultFile.Write(bytes)
  511. if err != nil {
  512. return fmt.Errorf("while writing: %w", err)
  513. }
  514. }
  515. c.schedules = map[string]Schedule{}
  516. return nil
  517. }
  518. func saveFeedInfo(c feedConverter) error {
  519. path := c.TmpFeedPath
  520. result, err := os.Create(filepath.Join(path, "feed_info.bare"))
  521. if err != nil {
  522. return fmt.Errorf("while creating file: %w", err)
  523. }
  524. defer result.Close()
  525. bytes, err := bare.Marshal(&c.feedInfo)
  526. if err != nil {
  527. return fmt.Errorf("while marshalling: %w", err)
  528. }
  529. _, err = result.Write(bytes)
  530. if err != nil {
  531. return fmt.Errorf("while writing: %w", err)
  532. }
  533. log.Printf("timetable is valid: %s to %s\n", c.feedInfo.ValidSince, c.feedInfo.ValidTill)
  534. c.feedInfo = FeedInfo{}
  535. return nil
  536. }
  537. func closeTrafficCalendarFile(c feedConverter, e error) (feedConverter, error) {
  538. if c.TrafficCalendarFile != nil {
  539. c.TrafficCalendarFile.Close()
  540. }
  541. return c, e
  542. }
  543. func forEachRow(filename string, f func(int64, map[string]int, []string) error) error {
  544. return forEachRowWithHeader(filename, func(_ []string) error {
  545. return nil
  546. }, f)
  547. }
  548. func forEachRowWithHeader(filename string, h func([]string) error, f func(int64, map[string]int, []string) error) error {
  549. file, err := os.Open(filename)
  550. if err != nil {
  551. return fmt.Errorf("while opening file: %w", err)
  552. }
  553. defer file.Close()
  554. r := csv.NewReader(file)
  555. header, err := r.Read()
  556. if err != nil {
  557. return fmt.Errorf("while reading header: %w", err)
  558. }
  559. err = h(header)
  560. if err != nil {
  561. return fmt.Errorf("while performing function on header: %w", err)
  562. }
  563. fields := map[string]int{}
  564. for i, headerField := range header {
  565. fields[headerField] = i
  566. }
  567. for {
  568. offset := r.InputOffset()
  569. record, err := r.Read()
  570. if err == io.EOF {
  571. break
  572. }
  573. if err != nil {
  574. return fmt.Errorf("while reading a record: %w", err)
  575. }
  576. err = f(offset, fields, record)
  577. if err != nil {
  578. return fmt.Errorf("while performing function: %w", err)
  579. }
  580. }
  581. return nil
  582. }
  583. func clearStops(c feedConverter) feedConverter {
  584. c.Stops = map[string]string{}
  585. return c
  586. }
  587. func clearTripOffsets(c feedConverter) feedConverter {
  588. c.tripsOffsets = map[string]uint{}
  589. return c
  590. }
  591. func clearLineGraphs(c feedConverter) feedConverter {
  592. c.LineGraphs = map[string]map[uint]LineGraph{}
  593. return c
  594. }
  595. func clearLineHeadsigns(c feedConverter) feedConverter {
  596. c.lineHeadsigns = map[string]map[uint][]string{}
  597. return c
  598. }
  599. func getTrips(c feedConverter) (feedConverter, error) {
  600. file, err := os.Open(filepath.Join(c.TmpFeedPath, "trips.bare"))
  601. if err != nil {
  602. return c, fmt.Errorf("while opening trips: %w", err)
  603. }
  604. trips := map[string]Trip{}
  605. for {
  606. var trip Trip
  607. err := bare.UnmarshalReader(file, &trip)
  608. trip.Departures = []Departure{}
  609. trips[trip.Id] = trip
  610. if err != nil {
  611. if err == io.EOF {
  612. break
  613. } else {
  614. return c, fmt.Errorf("while unmarshaling: %w", err)
  615. }
  616. }
  617. }
  618. c.trips = trips
  619. return c, nil
  620. }
  621. func convertLineGraphs(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; (trips, stops -- lineGrapsh:map[lineID]map[direction]graph, lineHeadsigns:map[lineID]map[direction][]headsigns >> )
  622. path := c.TmpFeedPath
  623. trips := c.trips
  624. stops := c.Stops
  625. // lineID dire headsi
  626. lineHeadsignsMap := map[string]map[uint]map[string]struct{}{}
  627. // lineID dire headsi
  628. lineHeadsigns := map[string]map[uint][]string{}
  629. // lineNa dire
  630. graphs := map[string]map[uint]_LineGraph{}
  631. file, err := os.Open(filepath.Join(path, "stop_times.txt"))
  632. if err != nil {
  633. return c, fmt.Errorf("while opening stop_times: %w", err)
  634. }
  635. defer file.Close()
  636. r := csv.NewReader(bufio.NewReader(file))
  637. header, err := r.Read()
  638. if err != nil {
  639. return c, fmt.Errorf("while reading header: %w", err)
  640. }
  641. fields := map[string]int{}
  642. for i, headerField := range header {
  643. fields[headerField] = i
  644. }
  645. previousTripID := ""
  646. previous := -1
  647. previousTrip := Trip{}
  648. for {
  649. record, err := r.Read()
  650. if err == io.EOF {
  651. break
  652. }
  653. if err != nil {
  654. return c, fmt.Errorf("while reading a record: %w", err)
  655. }
  656. tripID := record[fields["trip_id"]]
  657. stop := stops[record[fields["stop_id"]]]
  658. trip := trips[tripID]
  659. if _, ok := lineHeadsignsMap[trip.LineID]; !ok {
  660. lineHeadsignsMap[trip.LineID] = map[uint]map[string]struct{}{}
  661. lineHeadsigns[trip.LineID] = map[uint][]string{}
  662. }
  663. if _, ok := lineHeadsignsMap[trip.LineID][trip.Direction.Value()]; !ok {
  664. lineHeadsignsMap[trip.LineID][trip.Direction.Value()] = map[string]struct{}{}
  665. lineHeadsigns[trip.LineID][trip.Direction.Value()] = []string{}
  666. }
  667. lineHeadsignsMap[trip.LineID][trip.Direction.Value()][trip.Headsign] = struct{}{}
  668. if _, ok := graphs[trip.LineID]; !ok {
  669. graphs[trip.LineID] = map[uint]_LineGraph{}
  670. }
  671. if previousTripID != tripID && previousTripID != "" {
  672. // last of previous trip
  673. graph := graphs[previousTrip.LineID][previousTrip.Direction.Value()]
  674. if graph.NextNodes == nil {
  675. graph.NextNodes = map[int]map[int]struct{}{}
  676. }
  677. if graph.NextNodes[previous] == nil {
  678. graph.NextNodes[previous] = map[int]struct{}{}
  679. }
  680. graphs[previousTrip.LineID][previousTrip.Direction.Value()] = graph
  681. graphs[previousTrip.LineID][previousTrip.Direction.Value()].NextNodes[previous][-1] = struct{}{}
  682. }
  683. graph := graphs[trip.LineID][trip.Direction.Value()]
  684. if graph.StopCodes == nil {
  685. graph.StopCodes = map[string]int{}
  686. }
  687. if graph.NextNodes == nil {
  688. graph.NextNodes = map[int]map[int]struct{}{}
  689. }
  690. current := -1
  691. current, ok := graph.StopCodes[stop]
  692. if !ok {
  693. current = len(graph.StopCodesArray)
  694. graph.StopCodesArray = append(graph.StopCodesArray, stop)
  695. graph.StopCodes[stop] = current
  696. }
  697. if previousTripID != tripID {
  698. // first of current trip
  699. if graph.NextNodes[-1] == nil {
  700. graph.NextNodes[-1] = map[int]struct{}{}
  701. }
  702. if _, ok := graph.NextNodes[-1][current]; !ok {
  703. graph.NextNodes[-1][current] = struct{}{}
  704. }
  705. } else {
  706. // second <- first to last <- penultimate of current trip
  707. if graph.NextNodes[previous] == nil {
  708. graph.NextNodes[previous] = map[int]struct{}{}
  709. }
  710. if _, ok := graph.NextNodes[previous][current]; !ok {
  711. graph.NextNodes[previous][current] = struct{}{}
  712. }
  713. }
  714. previous = current
  715. previousTripID = tripID
  716. previousTrip = trip
  717. graphs[trip.LineID][trip.Direction.Value()] = graph
  718. }
  719. g := graphs[previousTrip.LineID][previousTrip.Direction.Value()]
  720. if g.NextNodes[previous] == nil {
  721. g.NextNodes[previous] = map[int]struct{}{}
  722. }
  723. if _, ok := g.NextNodes[previous][-1]; !ok {
  724. g.NextNodes[previous][-1] = struct{}{}
  725. }
  726. for lineID, directions := range lineHeadsignsMap {
  727. for direction, headsigns := range directions {
  728. for headsign := range headsigns {
  729. lineHeadsigns[lineID][direction] = append(lineHeadsigns[lineID][direction], headsign)
  730. }
  731. }
  732. }
  733. c.LineGraphs = map[string]map[uint]LineGraph{}
  734. for lineID, graphByDirection := range graphs {
  735. c.LineGraphs[lineID] = map[uint]LineGraph{}
  736. for direction, graph := range graphByDirection {
  737. c.LineGraphs[lineID][direction] = LineGraph{
  738. StopCodes: graph.StopCodesArray,
  739. NextNodes: map[int][]int{},
  740. }
  741. for from, tos := range graph.NextNodes {
  742. for to := range tos {
  743. c.LineGraphs[lineID][direction].NextNodes[from] = append(c.LineGraphs[lineID][direction].NextNodes[from], to)
  744. }
  745. }
  746. }
  747. }
  748. c.lineHeadsigns = lineHeadsigns
  749. return c, nil
  750. }
  751. func convertLines(c feedConverter) (feedConverter, error) { // O(n:routes) ; (lineGraphs, lineHeadsigns -- lineIndex:map[lineName][]offsets, lineIdIndex:CodeIndex >> lines)
  752. path := c.TmpFeedPath
  753. feed := c.Feed
  754. file, err := os.Open(filepath.Join(path, "routes.txt"))
  755. if err != nil {
  756. return c, fmt.Errorf("while opening file: %w", err)
  757. }
  758. defer file.Close()
  759. result, err := os.Create(filepath.Join(path, "lines.bare"))
  760. if err != nil {
  761. return c, fmt.Errorf("while creating file: %w", err)
  762. }
  763. defer result.Close()
  764. r := csv.NewReader(bufio.NewReader(file))
  765. header, err := r.Read()
  766. if err != nil {
  767. return c, fmt.Errorf("while reading header: %w", err)
  768. }
  769. fields := map[string]int{}
  770. for i, headerField := range header {
  771. fields[headerField] = i
  772. }
  773. var offset uint = 0
  774. index := map[string][]uint{}
  775. idIndex := CodeIndex{}
  776. for {
  777. record, err := r.Read()
  778. if err == io.EOF {
  779. break
  780. }
  781. if err != nil {
  782. return c, fmt.Errorf("while reading a record: %w", err)
  783. }
  784. routeID := record[fields["route_id"]]
  785. lineName := c.Feed.Flags().LineName
  786. for _, template := range []string{"route_short_name", "route_long_name"} {
  787. lineName = strings.Replace(lineName, "{{"+template+"}}", record[fields[template]], -1)
  788. }
  789. var kind uint
  790. fmt.Sscanf(record[fields["route_type"]], "%d", &kind)
  791. colour := "ffffff"
  792. if colourIx, ok := fields["route_color"]; ok && record[colourIx] != "" {
  793. colour = record[colourIx]
  794. }
  795. directions := []uint{}
  796. for direction := range c.lineHeadsigns[routeID] {
  797. directions = append(directions, direction)
  798. }
  799. sort.Slice(directions, func(i, j int) bool {
  800. return directions[i] < directions[j]
  801. })
  802. headsigns := [][]string{}
  803. translatedHeadsigns := [][][]Translation{}
  804. for _, direction := range directions {
  805. dirHeadsigns := c.lineHeadsigns[routeID][direction]
  806. headsigns = append(headsigns, dirHeadsigns)
  807. translatedHeadsign := [][]Translation{}
  808. for _, headsign := range dirHeadsigns {
  809. translatedHeadsign = append(translatedHeadsign, translateField(headsign, c.feedInfo.Language, c.defaultLanguage, c.translations))
  810. }
  811. translatedHeadsigns = append(translatedHeadsigns, translatedHeadsign)
  812. }
  813. graphs := []LineGraph{}
  814. for _, direction := range directions {
  815. graphs = append(graphs, c.LineGraphs[routeID][direction])
  816. }
  817. line := Line{
  818. Id: routeID,
  819. Name: lineName,
  820. Colour: hex2colour(colour),
  821. Kind: LineType(kind),
  822. Graphs: graphs,
  823. Headsigns: headsigns,
  824. }
  825. if field, present := fields["agency_id"]; present {
  826. line.AgencyID = record[field]
  827. }
  828. bytes, err := bare.Marshal(&line)
  829. if err != nil {
  830. return c, fmt.Errorf("while marshalling: %w", err)
  831. }
  832. b, err := result.Write(bytes)
  833. if err != nil {
  834. return c, fmt.Errorf("while writing: %w", err)
  835. }
  836. cleanQuery, err := CleanQuery(line.Name, feed)
  837. if err != nil {
  838. return c, fmt.Errorf("while cleaning line name: %w", err)
  839. }
  840. index[cleanQuery] = append(index[cleanQuery], offset)
  841. idIndex[routeID] = offset
  842. offset += uint(b)
  843. }
  844. c.LineIdIndex = idIndex
  845. c.LineIndex = index
  846. return c, nil
  847. }
  848. func convertFeedInfo(c feedConverter) (feedConverter, error) { // O(1:feed_info) ; ( -- feed_info >> )
  849. path := c.TmpFeedPath
  850. feedInfo := FeedInfo{}
  851. file, err := os.Open(filepath.Join(path, "feed_info.txt"))
  852. if err != nil {
  853. if errors.Is(err, fs.ErrNotExist) {
  854. log.Println("[WARN] no feed_info.txt")
  855. file = nil
  856. } else {
  857. return c, fmt.Errorf("while opening file: %w", err)
  858. }
  859. }
  860. if file != nil {
  861. defer file.Close()
  862. r := csv.NewReader(bufio.NewReader(file))
  863. header, err := r.Read()
  864. if err != nil {
  865. return c, fmt.Errorf("while reading header: %w", err)
  866. }
  867. fields := map[string]int{}
  868. for i, headerField := range header {
  869. fields[headerField] = i
  870. }
  871. record, err := r.Read()
  872. if err != nil {
  873. return c, fmt.Errorf("while reading a record: %w", err)
  874. }
  875. feedInfo.Website = record[fields["feed_publisher_url"]]
  876. feedInfo.Language = record[fields["feed_lang"]]
  877. if defaultLanguageIndex, ok := fields["default_lang"]; ok {
  878. c.defaultLanguage = record[defaultLanguageIndex]
  879. }
  880. if ix, ok := fields["feed_start_date"]; ok {
  881. c.ValidFrom, err = time.ParseInLocation("20060102", record[ix], c.Timezone)
  882. if err != nil {
  883. c.ValidFromError = append(c.ValidFromError, err)
  884. }
  885. feedInfo.ValidSince = record[ix]
  886. }
  887. if ix, ok := fields["feed_end_date"]; ok {
  888. c.ValidTill, err = time.ParseInLocation("20060102", record[ix], c.Timezone)
  889. if err != nil {
  890. c.ValidTillError = append(c.ValidTillError, err)
  891. }
  892. feedInfo.ValidTill = record[ix]
  893. }
  894. }
  895. feedInfo.Timezone = c.Timezone.String()
  896. feedInfo.RealtimeFeeds = c.Feed.RealtimeFeeds()
  897. feedInfo.QrHost, feedInfo.QrLocation, feedInfo.QrSelector = c.Feed.QRInfo()
  898. feedInfo.Attributions, feedInfo.Descriptions, err = getAttrDesc(c.Feed.String(), c.feedTranslations)
  899. feedInfo.Name = c.Feed.Name()
  900. c.feedInfo = feedInfo
  901. return c, err
  902. }
  903. func convertLuaScripts(c feedConverter) error { // O(1) ; ( -- >> updates.lua, alerts.lua, vehicles.lua )
  904. filenames := []string{"updates", "vehicles", "alerts"}
  905. for _, filename := range filenames {
  906. t, err := template.ParseFS(luaScripts, "realtime_lua/"+c.Feed.String()+"_"+filename+".lua")
  907. if err != nil {
  908. if strings.Contains(err.Error(), "pattern matches no files") {
  909. log.Printf("%s.lua for this feed does not exist, ignoring\n", filename)
  910. continue
  911. }
  912. return fmt.Errorf("while parsing template %s: %w", filename, err)
  913. }
  914. path := c.TmpFeedPath
  915. writeFile, err := os.Create(filepath.Join(path, filename+".lua"))
  916. if err != nil {
  917. return fmt.Errorf("while creating %s: %w", filename, err)
  918. }
  919. defer writeFile.Close()
  920. err = t.Execute(writeFile, c.config.Auth[c.Feed.String()])
  921. if err != nil {
  922. return fmt.Errorf("while executing template %s: %w", filename, err)
  923. }
  924. }
  925. return nil
  926. }
  927. func getAttrDesc(feedID string, feedTranslations embed.FS) (map[string]string, map[string]string, error) {
  928. attributions := map[string]string{}
  929. descriptions := map[string]string{}
  930. dir, err := feedTranslations.ReadDir("translations")
  931. if err != nil {
  932. return attributions, descriptions, err
  933. }
  934. for _, f := range dir {
  935. translation := map[string]string{}
  936. name := f.Name()
  937. lang := strings.Split(name, ".")[1]
  938. fileContent, err := feedTranslations.ReadFile("translations/" + name)
  939. if err != nil {
  940. log.Printf("error reading translation %s\n", name)
  941. continue
  942. }
  943. err = yaml.Unmarshal(fileContent, &translation)
  944. if err != nil {
  945. return attributions, descriptions, fmt.Errorf("while unmarshalling %s: %w", name, err)
  946. }
  947. attributions[lang] = translation[feedID+"_attribution"]
  948. descriptions[lang] = translation[feedID+"_description"]
  949. if lang == "en" {
  950. attributions["und"] = translation[feedID+"_attribution"]
  951. descriptions["und"] = translation[feedID+"_description"]
  952. }
  953. }
  954. return attributions, descriptions, nil
  955. }
  956. func readTranslations(c feedConverter) (feedConverter, error) { // O(n:translations) ; ( -- translations >>)
  957. path := c.TmpFeedPath
  958. file, err := os.Open(filepath.Join(path, "translations.txt"))
  959. if err != nil {
  960. return c, fmt.Errorf("while opening file: %w", err)
  961. }
  962. defer file.Close()
  963. translations := map[string]map[string]string{}
  964. r := csv.NewReader(bufio.NewReader(file))
  965. header, err := r.Read()
  966. if err != nil {
  967. return c, fmt.Errorf("while reading header: %w", err)
  968. }
  969. fields := map[string]int{}
  970. for i, headerField := range header {
  971. fields[headerField] = i
  972. }
  973. for {
  974. record, err := r.Read()
  975. if err == io.EOF {
  976. break
  977. }
  978. if err != nil {
  979. return c, fmt.Errorf("while reading a record: %w", err)
  980. }
  981. key := record[fields["field_value"]]
  982. language := record[fields["language"]]
  983. translation := record[fields["translation"]]
  984. if _, ok := translations[key]; !ok {
  985. translations[key] = map[string]string{}
  986. }
  987. translations[key][language] = translation
  988. }
  989. c.translations = translations
  990. return c, nil
  991. }
  992. func recoverTranslations(c feedConverter, e error) (feedConverter, error) {
  993. var pathError *os.PathError
  994. if errors.As(e, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
  995. return c, nil
  996. }
  997. return c, e
  998. }
  999. func convertAgencies(c feedConverter) (feedConverter, error) { // O(n:agency) ; ( -- >> agencies)
  1000. path := c.TmpFeedPath
  1001. file, err := os.Open(filepath.Join(path, "agency.txt"))
  1002. if err != nil {
  1003. return c, fmt.Errorf("while opening file: %w", err)
  1004. }
  1005. defer file.Close()
  1006. result, err := os.Create(filepath.Join(path, "agencies.bare"))
  1007. if err != nil {
  1008. return c, fmt.Errorf("while creating file: %w", err)
  1009. }
  1010. defer file.Close()
  1011. r := csv.NewReader(bufio.NewReader(file))
  1012. header, err := r.Read()
  1013. if err != nil {
  1014. return c, fmt.Errorf("while reading header: %w", err)
  1015. }
  1016. fields := map[string]int{}
  1017. for i, headerField := range header {
  1018. fields[headerField] = i
  1019. }
  1020. for {
  1021. record, err := r.Read()
  1022. if err == io.EOF {
  1023. break
  1024. }
  1025. if err != nil {
  1026. return c, fmt.Errorf("while reading a record: %w", err)
  1027. }
  1028. agency := Agency{
  1029. Id: record[fields["agency_id"]],
  1030. Name: record[fields["agency_name"]],
  1031. TranslatedNames: translateField(record[fields["agency_name"]], c.feedInfo.Language, c.defaultLanguage, c.translations),
  1032. Website: record[fields["agency_url"]],
  1033. TranslatedWebsites: translateField(record[fields["agency_url"]], c.feedInfo.Language, c.defaultLanguage, c.translations),
  1034. Timezone: record[fields["agency_timezone"]],
  1035. }
  1036. c.Timezone, _ = time.LoadLocation(agency.Timezone)
  1037. if field, present := fields["agency_lang"]; present {
  1038. agency.Language = record[field]
  1039. }
  1040. if field, present := fields["agency_phone"]; present {
  1041. agency.PhoneNumber = record[field]
  1042. agency.TranslatedPhoneNumbers = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
  1043. }
  1044. if field, present := fields["agency_fare_url"]; present {
  1045. agency.FareWebsite = record[field]
  1046. agency.TranslatedFareWebsites = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
  1047. }
  1048. if field, present := fields["agency_email"]; present {
  1049. agency.Email = record[field]
  1050. agency.TranslatedEmails = translateField(record[field], c.feedInfo.Language, c.defaultLanguage, c.translations)
  1051. }
  1052. bytes, err := bare.Marshal(&agency)
  1053. if err != nil {
  1054. return c, fmt.Errorf("while marshalling: %w", err)
  1055. }
  1056. _, err = result.Write(bytes)
  1057. if err != nil {
  1058. return c, fmt.Errorf("while writing: %w", err)
  1059. }
  1060. }
  1061. return c, nil
  1062. }
  1063. func writeNameIndex(c feedConverter, index map[string][]uint, filename string, raw bool) error {
  1064. path := c.TmpFeedPath
  1065. feed := c.Feed
  1066. result, err := os.Create(filepath.Join(path, filename))
  1067. if err != nil {
  1068. return fmt.Errorf("while creating file: %w", err)
  1069. }
  1070. defer result.Close()
  1071. for name, offsets := range index {
  1072. cleanQuery := name
  1073. if !raw {
  1074. cleanQuery, err = CleanQuery(name, feed)
  1075. if err != nil {
  1076. return fmt.Errorf("while cleaning name %s: %w", name, err)
  1077. }
  1078. }
  1079. stopOffset := NameOffset{
  1080. Name: cleanQuery,
  1081. Offsets: offsets,
  1082. }
  1083. bytes, err := bare.Marshal(&stopOffset)
  1084. if err != nil {
  1085. return fmt.Errorf("while marshalling: %w", err)
  1086. }
  1087. _, err = result.Write(bytes)
  1088. if err != nil {
  1089. return fmt.Errorf("while writing: %w", err)
  1090. }
  1091. }
  1092. return nil
  1093. }
  1094. func writeStopNameIndex(c feedConverter) error {
  1095. err := writeNameIndex(c, c.StopsNameIndex, "ix_stop_names.bare", false)
  1096. c.StopsNameIndex = map[string][]uint{}
  1097. return err
  1098. }
  1099. func writeLineIndex(c feedConverter) error {
  1100. err := writeNameIndex(c, c.LineIndex, "ix_lines.bare", false)
  1101. c.LineIndex = map[string][]uint{}
  1102. return err
  1103. }
  1104. func writeLineIdIndex(c feedConverter) error {
  1105. err := writeCodeIndex(c, c.LineIdIndex, "ix_line_codes.bare")
  1106. c.LineIndex = map[string][]uint{}
  1107. return err
  1108. }
  1109. func writeTripIndex(c feedConverter) error {
  1110. tripIndex := map[string][]uint{}
  1111. for trip, offset := range c.tripsOffsets {
  1112. tripIndex[trip] = []uint{offset}
  1113. }
  1114. err := writeNameIndex(c, tripIndex, "ix_trips.bare", true)
  1115. c.tripsOffsets = map[string]uint{}
  1116. return err
  1117. }
  1118. func writeStopCodeIndex(c feedConverter) error {
  1119. err := writeCodeIndex(c, c.StopsCodeIndex, "ix_stop_codes.bare")
  1120. c.StopsCodeIndex = CodeIndex{}
  1121. return err
  1122. }
  1123. func writeCodeIndex(c feedConverter, i CodeIndex, filename string) error {
  1124. path := c.TmpFeedPath
  1125. result, err := os.Create(filepath.Join(path, filename))
  1126. if err != nil {
  1127. return fmt.Errorf("while creating file: %w", err)
  1128. }
  1129. defer result.Close()
  1130. bytes, err := bare.Marshal(&i)
  1131. if err != nil {
  1132. return fmt.Errorf("while marshalling: %w", err)
  1133. }
  1134. _, err = result.Write(bytes)
  1135. if err != nil {
  1136. return fmt.Errorf("while writing: %w", err)
  1137. }
  1138. return nil
  1139. }
  1140. func deleteTxtFiles(c feedConverter) error {
  1141. return file.DeleteTxtFiles(c.TmpFeedPath, c.GtfsFilename)
  1142. }
  1143. func compressTraffic(c feedConverter) error {
  1144. return file.CompressBare(c.TmpFeedPath, c.GtfsFilename)
  1145. }
  1146. func deleteBareFiles(c feedConverter) error {
  1147. return file.DeleteBareFiles(c.TmpFeedPath)
  1148. }
  1149. func moveTraffic(c feedConverter) error {
  1150. if err := append(c.ValidFromError, c.ValidTillError...); len(err) != 0 {
  1151. return errors.Join(err...)
  1152. }
  1153. return file.MoveTraffic(c.GtfsFilename, c.ValidFrom.Format("20060102")+"_"+c.ValidTill.Format("20060102")+".txz", c.TmpFeedPath, c.HomeFeedPath)
  1154. }
  1155. func convert(input ...interface{}) (interface{}, error) {
  1156. allErrors := []error{}
  1157. args := input[0].(result)
  1158. for _, gtfsFile := range args.gtfsFilenames {
  1159. log.Printf("converting feed %s/%s\n", args.feed.Name(), gtfsFile)
  1160. r := gott2.R[feedConverter]{
  1161. S: feedConverter{
  1162. TmpFeedPath: args.tmpFeedPath,
  1163. GtfsFilename: gtfsFile,
  1164. Feed: args.feed,
  1165. HomeFeedPath: args.homeFeedPath,
  1166. feedTranslations: args.feedTranslations,
  1167. config: args.config,
  1168. },
  1169. LogLevel: gott2.Debug,
  1170. }
  1171. r = r.
  1172. Tee(unzipGtfs).
  1173. Tee(prepareFeedGtfs).
  1174. Tee(convertVehicles).
  1175. Bind(convertAgencies).
  1176. Bind(convertFeedInfo).
  1177. Tee(convertLuaScripts).
  1178. Bind(readTranslations).
  1179. Recover(recoverTranslations).
  1180. Bind(createTrafficCalendarFile).
  1181. Bind(convertCalendar).
  1182. Recover(recoverCalendar).
  1183. Bind(convertCalendarDates).
  1184. Recover(recoverCalendar).
  1185. Tee(checkAnyCalendarConverted).
  1186. Tee(saveSchedules).
  1187. Tee(saveFeedInfo).
  1188. Recover(closeTrafficCalendarFile).
  1189. // ---
  1190. Bind(readInputTripsIndex).
  1191. Bind(readInputStopsIndex).
  1192. Bind(readInputRoutesIndex).
  1193. Bind(convertDepartures).
  1194. Map(dropInputRoutesIndex).
  1195. Map(dropInputStopsIndex).
  1196. Map(dropInputTripsIndex).
  1197. // ---
  1198. Tee(sortTripsThroughStop).
  1199. Bind(readTripsThroughStopsIndex).
  1200. Bind(convertStops).
  1201. Map(dropInputRoutesIndex).
  1202. Map(dropInputStopsIndex).
  1203. Tee(writeTripIndex).
  1204. Map(clearTripOffsets).
  1205. Tee(writeStopNameIndex).
  1206. Tee(writeStopCodeIndex).
  1207. // ---
  1208. Bind(getTrips).
  1209. Bind(convertLineGraphs).
  1210. Map(clearStops).
  1211. Bind(convertLines).
  1212. Tee(writeLineIndex).
  1213. Tee(writeLineIdIndex).
  1214. Map(clearLineGraphs).
  1215. Map(clearLineHeadsigns).
  1216. Tee(deleteTxtFiles).
  1217. Tee(compressTraffic).
  1218. Tee(deleteBareFiles).
  1219. Tee(moveTraffic)
  1220. if r.E != nil {
  1221. log.Printf("Error converting %s: %v\n", args.feed.Name(), r.E)
  1222. allErrors = append(allErrors, r.E)
  1223. }
  1224. if err := append(r.S.ValidFromError, r.S.ValidTillError...); len(err) != 0 {
  1225. allErrors = append(allErrors, err...)
  1226. log.Printf("Error converting %s: %v\n", args.feed.Name(), errors.Join(err...))
  1227. }
  1228. }
  1229. if len(allErrors) > 0 {
  1230. return gott.Tuple{args}, errors.Join(allErrors...)
  1231. }
  1232. return gott.Tuple{args}, nil
  1233. }
  1234. func signal(input ...interface{}) (interface{}, error) {
  1235. args := input[0].(result)
  1236. if len(args.gtfsFilenames) > 0 && args.pid > 0 {
  1237. process, err := os.FindProcess(args.pid)
  1238. if err != nil {
  1239. return gott.Tuple{args}, err
  1240. }
  1241. err = process.Signal(syscall.SIGUSR1)
  1242. if err != nil {
  1243. return gott.Tuple{args}, err
  1244. }
  1245. }
  1246. return gott.Tuple{args}, nil
  1247. }
  1248. func openLastUpdated(input ...interface{}) (interface{}, error) {
  1249. args := input[0].(result)
  1250. updatesFilename := filepath.Join(args.config.FeedsPath, "updated.bare")
  1251. var err error
  1252. args.updatesFile, err = os.OpenFile(updatesFilename, os.O_RDWR|os.O_CREATE, 0644)
  1253. return gott.Tuple{args}, err
  1254. }
  1255. func isEmpty(input ...interface{}) error {
  1256. args := input[0].(result)
  1257. stat, err := os.Stat(args.updatesFile.Name())
  1258. if err != nil {
  1259. return err
  1260. }
  1261. if stat.Size() == 0 {
  1262. return ErrEmpty{}
  1263. }
  1264. return nil
  1265. }
  1266. func unmarshalLastUpdated(input ...interface{}) (interface{}, error) {
  1267. args := input[0].(result)
  1268. var lastUpdated map[string]string
  1269. err := bare.UnmarshalReader(args.updatesFile, &lastUpdated)
  1270. args.updates = lastUpdated
  1271. return gott.Tuple{args}, err
  1272. }
  1273. func recoverEmpty(input ...interface{}) (interface{}, error) {
  1274. args := input[0].(result)
  1275. err := input[1].(error)
  1276. var emptyError ErrEmpty
  1277. if errors.As(err, &emptyError) {
  1278. return gott.Tuple{args}, nil
  1279. } else {
  1280. return gott.Tuple{args}, err
  1281. }
  1282. }
  1283. func lastUpdated(input ...interface{}) interface{} {
  1284. args := input[0].(result)
  1285. args.updates[args.feed.String()] = time.Now().Format(time.RFC3339)
  1286. return gott.Tuple{args}
  1287. }
  1288. func seekLastUpdated(input ...interface{}) (interface{}, error) {
  1289. args := input[0].(result)
  1290. _, err := args.updatesFile.Seek(0, 0)
  1291. return gott.Tuple{args}, err
  1292. }
  1293. func marshalLastUpdated(input ...interface{}) error {
  1294. args := input[0].(result)
  1295. err := bare.MarshalWriter(bare.NewWriter(args.updatesFile), &args.updates)
  1296. args.updatesFile.Close()
  1297. return err
  1298. }
  1299. func Prepare(cfg config.Config, t Traffic, bimbaPid int, feedTranslations embed.FS) error { // todo(BAF18) remove pid
  1300. etags, err := readEtags(cfg)
  1301. if err != nil {
  1302. return fmt.Errorf("while reading etags: %w", err)
  1303. }
  1304. newEtags := map[string]string{}
  1305. for _, feed := range t.Feeds {
  1306. log.Printf("converting %s\n", feed.Name())
  1307. r := gott.Tuple{result{
  1308. config: cfg,
  1309. pid: bimbaPid,
  1310. tmpPath: os.TempDir(),
  1311. feed: feed,
  1312. feedName: feed.String(),
  1313. location: feed.getTimezone(),
  1314. updates: map[string]string{},
  1315. etags: etags,
  1316. newEtags: newEtags,
  1317. feedTranslations: feedTranslations,
  1318. }}
  1319. s, err := gott.NewResult(r).
  1320. SetLevelLog(gott.Debug).
  1321. Bind(createTmpPath).
  1322. Bind(createFeedHome).
  1323. Bind(listDownloadedVersions).
  1324. Bind(getAllVersions).
  1325. Map(findValidVersions).
  1326. Bind(getGtfsFiles).
  1327. Bind(convert).
  1328. Bind(signal).
  1329. Bind(openLastUpdated).
  1330. Tee(isEmpty).
  1331. Bind(unmarshalLastUpdated).
  1332. Recover(recoverEmpty).
  1333. Map(lastUpdated).
  1334. Bind(seekLastUpdated).
  1335. Tee(marshalLastUpdated).
  1336. Finish()
  1337. if err != nil {
  1338. log.Printf("Error converting %s: %v\n", feed.String(), err)
  1339. } else {
  1340. etags = s.(gott.Tuple)[0].(result).etags
  1341. newEtags = s.(gott.Tuple)[0].(result).newEtags
  1342. }
  1343. }
  1344. return saveEtags(cfg, newEtags)
  1345. }