szczanieckiej.git

commit 70c064ce7bf719e36b022a3f39a5f7c6fc197213

Author: Adam Evyčędo <git@apiote.xyz>

get gtfs updates refactor (vehicles)

 gtfs_rt/main.go | 104 -------------------
 traffic/access.go | 115 ++++++--------------
 traffic/berlin_vbb.go | 3 
 traffic/brussels_stib_mivb.go | 3 
 traffic/errors/errors.go | 6 +
 traffic/feeds.go | 4 
 traffic/gzm_ztm.go | 3 
 traffic/krakow_ztp.go | 5 
 traffic/poznan_ztm.go | 3 
 traffic/realtime.go | 57 +++++++--
 traffic/realtime_gtfs.go | 198 +++++++++++++++++++++++++++++++++++++
 traffic/structs.go | 24 ++--
 traffic/structs_gen.go | 20 +++


diff --git a/gtfs_rt/main.go b/gtfs_rt/main.go
index 7cdedd16593fa6d81c1a671d2ef565d5690886f9..580561e7c941fbcbbf091b23b2c6244263291d89 100644
--- a/gtfs_rt/main.go
+++ b/gtfs_rt/main.go
@@ -4,13 +4,9 @@ import (
 	pb "apiote.xyz/p/szczanieckiej/gtfs_rt/transit_realtime"
 
 	"fmt"
-	"io/ioutil"
-	"net/http"
 	"time"
 
 	"golang.org/x/text/language"
-
-	"google.golang.org/protobuf/proto"
 )
 
 type Update struct { // TODO use traffic/Update
@@ -47,12 +43,8 @@ 	Cause        pb.Alert_Cause
 	Effect       pb.Alert_Effect
 }
 
-var cache map[string][]*pb.FeedMessage
-
 func gatherUpdates(entities []*pb.FeedEntity, updates *map[string]Update, alerts *Alerts) error {
 	for _, entity := range entities {
-		t := entity.TripUpdate
-		v := entity.Vehicle
 		a := entity.Alert
 		if a != nil && alerts != nil {
 			alert := Alert{
@@ -139,102 +131,6 @@ 			}
 
 			alerts.Alerts = append(alerts.Alerts, alert)
 		}
-		if t != nil && updates != nil {
-			up := *updates
-			u := up[*t.Trip.TripId]
-			u.TripUpdate = t
-			u.StopSeq = t.StopTimeUpdate[0].GetStopSequence()
-			u.StopID = t.StopTimeUpdate[0].GetStopId()
-			u.Delay = t.StopTimeUpdate[0].GetArrival().GetDelay()
-			if u.StopSeq == 0 && u.Delay < 0 {
-				u.Delay = 0
-			}
-			up[*t.Trip.TripId] = u
-			*updates = up
-		}
-		if v != nil && updates != nil {
-			up := *updates
-			u := up[*v.Trip.TripId]
-			u.Status = v.GetCurrentStatus()
-			u.VehicleID = *v.Vehicle.Id
-			u.Latitude = v.Position.GetLatitude()
-			u.Longitude = v.Position.GetLongitude()
-			u.Speed = v.Position.GetSpeed()
-			u.CongestionLevel = v.CongestionLevel
-			u.OccupancyStatus = v.OccupancyStatus
-			up[*v.Trip.TripId] = u
-			*updates = up
-		}
 	}
 	return nil
 }
-
-func GetMessages(previouslyUpdated uint64, feedName string, feedURLs []string) (uint64, []*pb.FeedMessage, error) {
-	now := uint64(time.Now().Unix())
-	lastUpdated := previouslyUpdated
-	updates := cache[feedName]
-	if now-previouslyUpdated > 30 {
-		if cache == nil {
-			cache = map[string][]*pb.FeedMessage{}
-		}
-		cache[feedName] = []*pb.FeedMessage{}
-		for _, url := range feedURLs {
-			client := http.Client{Timeout: 5 * time.Second}
-			response, err := client.Get(url)
-			if err != nil {
-				return lastUpdated, updates, fmt.Errorf("cannot download from ‘%s’: %w", url, err)
-			}
-			message := &pb.FeedMessage{}
-			bytes, err := ioutil.ReadAll(response.Body)
-			if err != nil {
-				return lastUpdated, updates, fmt.Errorf("cannot read response for ‘%s’: %w", url, err)
-			}
-			if err := proto.Unmarshal(bytes, message); err != nil {
-				return lastUpdated, updates, fmt.Errorf("Failed to parse message: %w", err)
-			}
-			cache[feedName] = append(cache[feedName], message)
-		}
-		lastUpdated = now
-		updates = cache[feedName]
-	}
-	return lastUpdated, updates, nil
-}
-
-func GetRt(previouslyUpdated uint64, feedUrls []string, feedName string) (map[string]Update, uint64, error) {
-	updates := map[string]Update{}
-
-	lastUpdated, _, err := GetMessages(previouslyUpdated, feedName, feedUrls)
-	if err != nil {
-		return updates, lastUpdated, fmt.Errorf("while getting messages: %w", err)
-	}
-
-	for _, message := range cache[feedName] {
-		err = gatherUpdates(message.Entity, &updates, nil)
-		if err != nil {
-			return updates, lastUpdated, fmt.Errorf("while gathering updates: %w", err)
-		}
-		timestamp := message.Header.GetTimestamp()
-		if timestamp < lastUpdated {
-			lastUpdated = timestamp
-		}
-	}
-	return updates, lastUpdated, nil
-}
-
-func GetAlerts(previouslyUpdated uint64, feedUrls []string, feedName string) (Alerts, uint64, error) {
-	alerts := Alerts{}
-
-	lastUpdated, _, err := GetMessages(previouslyUpdated, feedName, feedUrls)
-	if err != nil {
-		return alerts, lastUpdated, fmt.Errorf("while getting messages: %w", err)
-	}
-
-	for _, message := range cache[feedName] {
-		err = gatherUpdates(message.Entity, nil, &alerts)
-		if err != nil {
-			return alerts, lastUpdated, fmt.Errorf("while gathering updates: %w", err)
-		}
-	}
-
-	return alerts, lastUpdated, nil
-}




diff --git a/traffic/access.go b/traffic/access.go
index ea71eb9c5a6362db01faaad0790396fb20b47e99..6c4202848618de5e909e77fa5f62d0835f291bdd 100644
--- a/traffic/access.go
+++ b/traffic/access.go
@@ -3,7 +3,6 @@
 import (
 	"apiote.xyz/p/szczanieckiej/config"
 	"apiote.xyz/p/szczanieckiej/file"
-	"apiote.xyz/p/szczanieckiej/gtfs_rt"
 	traffic_errors "apiote.xyz/p/szczanieckiej/traffic/errors"
 	"apiote.xyz/p/szczanieckiej/transformers"
 
@@ -66,9 +65,6 @@ 	Trip       Trip
 	FeedInfo   FeedInfo
 }
 
-var lastUpdatedGtfsRt = map[string]uint64{}
-var lastUpdatedGtfsRt2 = map[string]map[string]uint64{}
-
 func isTimeout(err error) bool {
 	var e net.Error
 	return errors.As(err, &e) && e.Timeout()
@@ -99,28 +95,6 @@ 	if len(schedules) == 0 {
 		err = traffic_errors.NoSchedule{Date: date}
 	}
 	return schedules, err
-}
-
-func getRealtimeOffset(tripID string, stopSequence int,
-	feed Feed) (gtfs_rt.Update, error) {
-	updates, lastUpdated, err := gtfs_rt.GetRt(lastUpdatedGtfsRt[feed.String()],
-		feed.RealtimeFeeds(), feed.String())
-	if err != nil {
-		return gtfs_rt.Update{}, err
-	}
-	lastUpdatedGtfsRt[feed.String()] = lastUpdated
-	update := updates[tripID]
-	return update, nil
-}
-
-func getRealtimeUpdates(feed Feed) (map[string]gtfs_rt.Update, error) {
-	updates, lastUpdated, err := gtfs_rt.GetRt(lastUpdatedGtfsRt[feed.String()],
-		feed.RealtimeFeeds(), feed.String())
-	if err != nil {
-		return map[string]gtfs_rt.Update{}, err
-	}
-	lastUpdatedGtfsRt[feed.String()] = lastUpdated
-	return updates, nil
 }
 
 func calculateGtfsTime(gtfsTime uint, delay int32, date time.Time,
@@ -294,11 +268,17 @@ 			}
 		}
 		departures = append(departures, departure)
 	}
-	result.Departures = enrichDepartures(departures, result.Datetime, result.DeparturesType, result.Ctx, result.TripsFile, result.Location)
-	result.TripsFile.Close()
+	result.Departures = departures
 	return result, nil
 }
 
+func makeDeparturesRealtime(input ...interface{}) interface{} {
+	result := input[0].(_Result)
+	result.Departures = enrichDepartures(result.Departures, result.Datetime, result.DeparturesType, result.Ctx, result.TripsFile, result.Location)
+	result.TripsFile.Close()
+	return result
+}
+
 func getDeparture(date time.Time, result _Result, order StopOrder,
 	trip Trip, feed Feed, timedOut bool) (DepartureRealtime, error) {
 	found := false
@@ -329,10 +309,7 @@ 	return departureRt, finalErr
 }
 
 func GetTimeWithDelay(departure DepartureRealtime) time.Time {
-	var delay int
-	if departure.Update.Delay != nil {
-		delay = int(*departure.Update.Delay)
-	}
+	delay := int(departure.Update.Delay)
 	return departure.Time.Add(time.Duration(delay) * time.Nanosecond)
 }
 
@@ -729,6 +706,7 @@ 		Bind(unmarshalStop).
 		Bind(openTripsFile).
 		Bind(readTrips).
 		Bind(getDepartures).
+		Map(makeDeparturesRealtime).
 		Map(filterDepartures).
 		Map(filterDeparturesByLine).
 		Map(sortDepartures).
@@ -1160,38 +1138,11 @@ 	err = file.CleanOldVersions(FeedPath(cfg, feed), validVersionsMap)
 	return err
 }
 
-func convertVehicle(update gtfs_rt.Update, context Context, t *Traffic) (VehicleStatus, error) {
-	vehicles := t.Vehicles[context.FeedID][context.Version]
-	tripID := update.TripUpdate.GetTrip().GetTripId()
-	trip, err := GetTrip(tripID, context, t)
-	if err != nil {
-		return VehicleStatus{}, fmt.Errorf("while converting vehicle: %w", err)
-	}
-	return VehicleStatus{
-		Id:           update.VehicleID,
-		Position:     Position{float64(update.Latitude), float64(update.Longitude)},
-		Capabilities: vehicles[update.VehicleID].Capabilities,
-		Speed:        update.Speed,
-		Headsign:     trip.Headsign,
-		LineName:     trip.LineName,
-	}, nil
-}
+func GetStopsIn(lb, rt Position, context Context, traffic *Traffic) ([]Stop, error) {
+	// TODO limit rect size
+	limit := 100.0
 
-func GetVehicle(tripID string, context Context, t *Traffic) (VehicleStatus, error) {
-	vehicle := VehicleStatus{}
-	update, err := getRealtimeOffset(tripID, 0, t.Feeds[context.FeedID])
-	if err != nil {
-		return vehicle, fmt.Errorf("while getting realtime update: %w", err)
-	}
-	if update.TripUpdate == nil {
-		return vehicle, fmt.Errorf("empty realtime update")
-	}
-	return convertVehicle(update, context, t)
-}
-
-func GetStopsIn(lb, rt Position, context Context, traffic *Traffic) ([]Stop, error) {
-	// todo limit rect size
-	// todo does it take into account rect 179 -> -179 latitude?
+	// TODO does it take into account rect 179 -> -179 latitude?
 	stops := []Stop{}
 	positionIndex := traffic.PositionIndexes[context.FeedID][context.Version]
 	codeIndex := traffic.CodeIndexes[context.FeedID][context.Version]
@@ -1199,6 +1150,11 @@ 	rect, err := rtreego.NewRectFromPoints(rtreego.Point{lb.Lat, lb.Lon}, rtreego.Point{rt.Lat, rt.Lon})
 	if err != nil {
 		return stops, fmt.Errorf("while creating a rect: %w", err)
 	}
+
+	if rect.Size() > limit {
+		return stops, traffic_errors.RectTooBigError{}
+	}
+
 	spatials := positionIndex.SearchIntersect(rect)
 	for _, spatial := range spatials {
 		stop, err := getStopByOffset(codeIndex[spatial.(Stop).Code], context, traffic)
@@ -1211,32 +1167,33 @@ 	return stops, nil
 }
 
 func GetVehiclesIn(lb, rt Position, context Context, t *Traffic) ([]VehicleStatus, error) {
-	// todo limit rect size
+	// TODO limit rect size
+	limit := 100.0
+
+	vehiclesRt := getVehiclePositions(context)
 	vehicles := []VehicleStatus{}
-	updates, err := getRealtimeUpdates(t.Feeds[context.FeedID])
+
+	rect, err := rtreego.NewRectFromPoints(rtreego.Point{lb.Lat, lb.Lon}, rtreego.Point{rt.Lat, rt.Lon})
 	if err != nil {
-		return vehicles, err
+		return vehicles, fmt.Errorf("while creating a rect: %w", err)
 	}
-	for _, update := range updates {
-		if rt.Lon < float64(update.Longitude) || lb.Lon > float64(update.Longitude) {
+
+	if rect.Size() > limit {
+		return vehicles, traffic_errors.RectTooBigError{}
+	}
+
+	for _, vehicleRt := range vehiclesRt {
+		if rt.Lon < float64(vehicleRt.Longitude) || lb.Lon > float64(vehicleRt.Longitude) {
 			continue
 		}
-		lat := float64(update.Latitude)
+		lat := float64(vehicleRt.Latitude)
 		if lb.Lat < rt.Lat {
 			if lb.Lat < lat && lat < rt.Lat {
-				vehicle, err := convertVehicle(update, context, t)
-				if err != nil {
-					return vehicles, fmt.Errorf("while converting vehicle: %w", err)
-				}
-				vehicles = append(vehicles, vehicle)
+				vehicles = append(vehicles, vehicleRt)
 			}
 		} else {
 			if lat > lb.Lat || lat < rt.Lat {
-				vehicle, err := convertVehicle(update, context, t)
-				if err != nil {
-					return vehicles, fmt.Errorf("while converting vehicle: %w", err)
-				}
-				vehicles = append(vehicles, vehicle)
+				vehicles = append(vehicles, vehicleRt)
 			}
 		}
 	}




diff --git a/traffic/berlin_vbb.go b/traffic/berlin_vbb.go
index cf364a5bc7e1067c8557f5c13d12f0e9d366e76f..f9d1361ba14db0da641ceebe89b7ca3f087050b0 100644
--- a/traffic/berlin_vbb.go
+++ b/traffic/berlin_vbb.go
@@ -35,8 +35,9 @@ 	return nil
 }
 
 func (z VbbBerlin) GetVersions(date time.Time) ([]Version, error) {
+	location, _ := time.LoadLocation("Europe/Berlin")
 	versions := []Version{}
-	version, err := MakeVersion("00010101_99991231", z.getLocation())
+	version, err := MakeVersion("00010101_99991231", location)
 	if err != nil {
 		return nil, err
 	}




diff --git a/traffic/brussels_stib_mivb.go b/traffic/brussels_stib_mivb.go
index ddb6f697177d5e32851ce37ce458ee69323fcb47..172b1c07aefcff1fe28e51c4a9c918752d30a8f3 100644
--- a/traffic/brussels_stib_mivb.go
+++ b/traffic/brussels_stib_mivb.go
@@ -46,7 +46,8 @@ 	decoder.Decode(&versionsFeed)
 	updated, _ := time.Parse(string(versionsFeed.Entry[0].Updated), "2006-01-02T15:04:05-07:00")
 	validityString := updated.Format("20060102") + "_99991231"
 
-	version, err := MakeVersion(validityString, z.getLocation())
+	location, _ := time.LoadLocation("Europe/Brussels")
+	version, err := MakeVersion(validityString, location)
 	if err != nil {
 		return nil, err
 	}




diff --git a/traffic/errors/errors.go b/traffic/errors/errors.go
index 1cda2ca506e764794a69c0f6b6a37fafdf222581..a78644ce8765774330603582359feaab4c2195b7 100644
--- a/traffic/errors/errors.go
+++ b/traffic/errors/errors.go
@@ -36,3 +36,9 @@
 func (e NoVersionError) Error() string {
 	return "No version for " + e.Date
 }
+
+type RectTooBigError struct{}
+
+func (RectTooBigError) Error() string {
+	return "Rectangle too big"
+}




diff --git a/traffic/feeds.go b/traffic/feeds.go
index 21f3a2b47a2fff9e502eef471017f930eac4ba8b..42bb9b07399071b8fe8c0c4687face8c71be0f3a 100644
--- a/traffic/feeds.go
+++ b/traffic/feeds.go
@@ -15,9 +15,9 @@ )
 
 type Feed interface {
 	fmt.Stringer
-	ConvertVehicles(string) error
+	ConvertVehicles(string) error // TODO return []Vehicle -> save to file in convert()
 	GetVersions(time.Time) ([]Version, error)
-	RealtimeFeeds() []string
+	RealtimeFeeds() []string // TODO map[RealtimeFeedType]string
 	Transformer() transform.Transformer
 	Name() string
 	Flags() FeedFlags




diff --git a/traffic/gzm_ztm.go b/traffic/gzm_ztm.go
index ee4723ab7398a0e7fa712ea3c1401f9afa933bda..4783876530e2a1ca886a0df9f83bd45e04ed66ef 100644
--- a/traffic/gzm_ztm.go
+++ b/traffic/gzm_ztm.go
@@ -128,9 +128,10 @@
 	regex, err := regexp.Compile("https://otwartedane.metropoliagzm.pl/dataset/86b5ce0c-daea-4b40-bc60-af2c80477d21/resource/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/download/schedule_([0-9]{4}).([0-9]{2}).([0-9]{2})_[0-9]{13}_[0-9]{4}.ext_gtfs.zip")
 	urls := regex.FindAllStringSubmatch(string(doc), -1)
 
+	location, _ := time.LoadLocation("Europe/Warsaw")
 	versions := []Version{}
 	for _, u := range urls {
-		version, err := MakeVersion(u[1]+u[2]+u[3]+"_99991231", z.getLocation())
+		version, err := MakeVersion(u[1]+u[2]+u[3]+"_99991231", location)
 		if err != nil {
 			return nil, err
 		}




diff --git a/traffic/krakow_ztp.go b/traffic/krakow_ztp.go
index c374bc2882453689c84f63e2a83f4b8f6391a5ec..f8e12f9e793254f8985b289eb229570998df6b6e 100644
--- a/traffic/krakow_ztp.go
+++ b/traffic/krakow_ztp.go
@@ -30,8 +30,9 @@ 	return nil
 }
 
 func (f ZtpKrakow) GetVersions(date time.Time) ([]Version, error) {
-	startDate := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, f.getLocation())
-	endDate := time.Date(date.Year(), date.Month(), date.Day(), 23, 59, 59, 0, f.getLocation())
+	location, _ := time.LoadLocation("Europe/Warsaw")
+	startDate := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, location)
+	endDate := time.Date(date.Year(), date.Month(), date.Day(), 23, 59, 59, 0, location)
 	v := Version{
 		Link:      "https://gtfs.ztp.krakow.pl/GTFS_KRK_A.zip",
 		ValidFrom: startDate,




diff --git a/traffic/poznan_ztm.go b/traffic/poznan_ztm.go
index c6ddfe6a9b6264c29723716fab1e1409515f5dc7..f41f666294444f726c8c7cca135bbdc085d5260e 100644
--- a/traffic/poznan_ztm.go
+++ b/traffic/poznan_ztm.go
@@ -133,9 +133,10 @@ 		versionsSet[v] = struct{}{}
 	}
 
 	versions := []Version{}
+	location, _ := time.LoadLocation("Europe/Warsaw")
 	for v := range versionsSet {
 		validityString := strings.Replace(v, ".zip", "", 1)
-		version, err := MakeVersion(validityString, z.getLocation())
+		version, err := MakeVersion(validityString, location)
 		if err != nil {
 			return nil, err
 		}




diff --git a/traffic/realtime.go b/traffic/realtime.go
index 5cfe2dd50682df9e763c0b092618d320f669637b..ecfbbe902b7a44a3abec303fdbd77335c325226b 100644
--- a/traffic/realtime.go
+++ b/traffic/realtime.go
@@ -13,7 +13,8 @@
 type TimetableRelationship uint
 
 const (
-	TRIP_SCHEDULED TimetableRelationship = iota
+	NOT_REALTIME TimetableRelationship = iota
+	TRIP_SCHEDULED
 	TRIP_CANCELED
 	TRIP_DELETED
 	TRIP_ADDED
@@ -24,16 +25,19 @@
 type Update struct {
 	StopSequence          uint32
 	StopID                string
-	Delay                 *int32                // seconds, if nil -> unknown
+	Delay                 int32                 // seconds
 	TimetableRelationship TimetableRelationship // TODO better name
-	Status                DepartureStatus
-	CongestionLevel       CongestionLevel
-	OccupancyStatus       OccupancyStatus
-	VehicleID             string
-	Latitude              float64
-	Longitude             float64
-	Speed                 float32
-	// TODO …
+	VehicleStatus         VehicleStatus
+}
+type VehicleStatus struct {
+	Status          DepartureStatus
+	CongestionLevel CongestionLevel
+	OccupancyStatus OccupancyStatus
+	VehicleID       string
+	Latitude        float64
+	Longitude       float64
+	Speed           float32 // m/s
+	Bearing         float64 // radians clockwise from north // TODO maybe (-π, π)
 }
 
 type CongestionLevel uint
@@ -62,8 +66,8 @@
 type DepartureStatus uint
 
 const (
-	AT_STOP DepartureStatus = iota
-	DEPARTED
+	IN_TRANSIT DepartureStatus = iota
+	AT_STOP
 	INCOMING
 )
 
@@ -87,6 +91,7 @@ }
 
 var updates map[string]Update
 var alerts Alerts
+var vehicleStatuses map[string]VehicleStatus
 
 // TODO vehicles cache
 
@@ -103,7 +108,7 @@ 	}
 	return trip.Id, nil
 }
 
-func enrichDepartures(departures []DepartureRealtime, datetime time.Time, departuresType DeparturesType, ctx Context, tripsFile *os.File, timezone *time.Location) []DepartureRealtime {
+func enrichDepartures(departures []DepartureRealtime, datetime time.Time, departuresType DeparturesType, ctx Context, tripsFile *os.File, timezone *time.Location) []DepartureRealtime { // TODO tripsFile -> map[tripOffset]tripID
 	enrichedDepartures := make([]DepartureRealtime, len(departures))
 
 	feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
@@ -113,7 +118,7 @@ 		return departures
 	}
 
 	var enrichMethod func(string, int, Context) (Update, error)
-	if _, ok := feedInfo.RealtimeFeeds["updates"]; ok {
+	if _, ok := feedInfo.RealtimeFeeds[TRIP_UPDATES]; ok {
 		enrichMethod = getGtfsRealtimeUpdates
 	} else {
 		// TODO enrichMethod = getLuaRealtieUpdates
@@ -148,6 +153,26 @@ func GetAlerts(languages []language.Tag /*, feed*/) []Alert {
 	return []Alert{}
 }
 
-func GetVehiclePositions(position Position) []VehicleStatus {
-	return []VehicleStatus{}
+func getVehiclePositions(ctx Context) []VehicleStatus {
+	feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
+	if err != nil {
+		log.Printf("while getting feedInfo: %v\n", err)
+		return []VehicleStatus{}
+	}
+
+	var function func(Context) ([]VehicleStatus, error)
+	if _, ok := feedInfo.RealtimeFeeds[VEHICLE_POSITIONS]; ok {
+		function = getGtfsRealtimeVehicles
+	} else {
+		// TODO enrichMethod = getLuaVehicles
+	}
+	statuses, err := function(ctx)
+	if err != nil {
+		if isTimeout(err) {
+			return []VehicleStatus{}
+		} else {
+			log.Printf("while getting vehicle positions: %v\n", err)
+		}
+	}
+	return statuses
 }




diff --git a/traffic/realtime_gtfs.go b/traffic/realtime_gtfs.go
new file mode 100644
index 0000000000000000000000000000000000000000..94a8350131bb232277698e15ab97d7af66a8e230
--- /dev/null
+++ b/traffic/realtime_gtfs.go
@@ -0,0 +1,198 @@
+package traffic
+
+import (
+	"math"
+
+	"apiote.xyz/p/szczanieckiej/gtfs_rt"
+	pb "apiote.xyz/p/szczanieckiej/gtfs_rt/transit_realtime"
+
+	"fmt"
+	"time"
+)
+
+var lastUpdatedGtfsRt = map[string]map[RealtimeFeedType]uint64{}
+
+func makeTimetableRelationshipFromTripTimetable(r pb.TripDescriptor_ScheduleRelationship) TimetableRelationship {
+	switch r {
+	case pb.TripDescriptor_ADDED:
+		return TRIP_ADDED
+	case pb.TripDescriptor_CANCELED:
+		return TRIP_CANCELED
+	/* TODO update pb schema
+	case pb.TripDescriptor_DELETED:
+		return TRIP_DELETED*/
+	default:
+		return TRIP_SCHEDULED
+	}
+}
+
+func makeTimetableRelationshipFromStopTrip(r pb.TripUpdate_StopTimeUpdate_ScheduleRelationship) TimetableRelationship {
+	switch r {
+	case pb.TripUpdate_StopTimeUpdate_NO_DATA:
+		return NO_DATA
+	case pb.TripUpdate_StopTimeUpdate_SKIPPED:
+		return STOP_SKIPPED
+	default:
+		return TRIP_SCHEDULED
+	}
+}
+
+func makeDepartureStatus(s pb.VehiclePosition_VehicleStopStatus) DepartureStatus {
+	switch s {
+	case pb.VehiclePosition_STOPPED_AT:
+		return AT_STOP
+	case pb.VehiclePosition_INCOMING_AT:
+		return INCOMING
+	default:
+		return IN_TRANSIT
+	}
+}
+
+func makeCongestionLevel(l pb.VehiclePosition_CongestionLevel) CongestionLevel {
+	switch l {
+	case pb.VehiclePosition_RUNNING_SMOOTHLY:
+		return CONGESTION_SMOOTH
+	case pb.VehiclePosition_STOP_AND_GO:
+		return CONGESTION_STOP_AND_GO
+	case pb.VehiclePosition_CONGESTION:
+		return CONGESTION_SIGNIFICANT
+	case pb.VehiclePosition_SEVERE_CONGESTION:
+		return CONGESTION_SEVERE
+	default:
+		return CONGESTION_UNKNOWN
+	}
+}
+
+func makeOccupancyStatus(s pb.VehiclePosition_OccupancyStatus) OccupancyStatus {
+	switch s {
+	case pb.VehiclePosition_EMPTY:
+		return OCCUPANCY_EMPTY
+	case pb.VehiclePosition_MANY_SEATS_AVAILABLE:
+		return OCCUPANCY_MANY_AVAILABLE
+	case pb.VehiclePosition_FEW_SEATS_AVAILABLE:
+		return OCCUPANCY_FEW_AVAILABLE
+	case pb.VehiclePosition_STANDING_ROOM_ONLY:
+		return OCCUPANCY_STANDING_ONLY
+	case pb.VehiclePosition_CRUSHED_STANDING_ROOM_ONLY:
+		return OCCUPANCY_CRUSHED
+	case pb.VehiclePosition_FULL:
+		return OCCUPANCY_FULL
+	case pb.VehiclePosition_NOT_ACCEPTING_PASSENGERS:
+		return OCCUPANCY_NOT_ACCEPTING
+	default:
+		return OCCUPANCY_UNKNOWN
+	}
+}
+
+func getGtfsRtData(entities []*pb.FeedEntity) map[RealtimeFeedType]int {
+	if updates == nil {
+		updates = map[string]Update{}
+	}
+	if vehicleStatuses == nil {
+		vehicleStatuses = map[string]VehicleStatus{}
+	}
+	which := map[RealtimeFeedType]int{}
+	for _, entity := range entities {
+		// TODO a := entity.Alert
+		v := entity.Vehicle
+		t := entity.TripUpdate
+
+		if v != nil {
+			which[VEHICLE_POSITIONS] = 1
+			u := updates[*v.Trip.TripId]
+
+			vehicleUpdate := VehicleStatus{
+				Status:          makeDepartureStatus(v.GetCurrentStatus()),
+				CongestionLevel: makeCongestionLevel(v.GetCongestionLevel()),
+				OccupancyStatus: makeOccupancyStatus(v.GetOccupancyStatus()),
+				VehicleID:       v.GetVehicle().GetId(),
+				Latitude:        float64(v.GetPosition().GetLatitude()),
+				Longitude:       float64(v.GetPosition().GetLongitude()),
+				Speed:           v.GetPosition().GetSpeed(),
+				Bearing:         float64(v.GetPosition().GetBearing() * math.Pi / 180),
+			}
+
+			vehicleStatuses[*v.Trip.TripId] = vehicleUpdate
+			u.VehicleStatus = vehicleUpdate
+			updates[*v.Trip.TripId] = u
+		}
+
+		if t != nil {
+			which[TRIP_UPDATES] = 1
+
+			update := Update{
+				StopSequence: t.StopTimeUpdate[0].GetStopSequence(),
+				StopID:       t.StopTimeUpdate[0].GetStopId(),
+			}
+			if arrival := t.StopTimeUpdate[0].GetArrival(); arrival != nil {
+				update.Delay = *arrival.Delay
+			}
+			if update.StopSequence == 0 && update.Delay < 0 {
+				update.Delay = 0
+			}
+
+			stopTripRelationship := t.StopTimeUpdate[0].GetScheduleRelationship()
+			if stopTripRelationship == pb.TripUpdate_StopTimeUpdate_SCHEDULED {
+				tripTimetableRelationship := t.Trip.GetScheduleRelationship()
+				update.TimetableRelationship = makeTimetableRelationshipFromTripTimetable(tripTimetableRelationship)
+			} else {
+				update.TimetableRelationship = makeTimetableRelationshipFromStopTrip(stopTripRelationship)
+			}
+
+			updates[*t.Trip.TripId] = update
+		}
+	}
+
+	return which
+}
+
+func getGtfsRealtimeMessages(feedType RealtimeFeedType, feedID string, feeds map[RealtimeFeedType]string) error {
+	now := uint64(time.Now().Unix())
+	lastUpdated := lastUpdatedGtfsRt[feedID]
+	if passed := now - lastUpdated[feedType]; passed > 30 {
+		message, err := gtfs_rt.GetMessages2(feedID, feeds[feedType])
+		if err != nil {
+			return fmt.Errorf("while getting messages: %w\n", err)
+		}
+
+		whichUpdated := getGtfsRtData(message.Entity)
+		for key, value := range whichUpdated {
+			if value == 1 {
+				lastUpdated[key] = now
+			}
+		}
+	}
+	return nil
+}
+
+func getGtfsRealtimeUpdates(tripID string, sequence int, ctx Context) (Update, error) {
+	feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
+	if err != nil {
+		return Update{}, fmt.Errorf("while getting feedInfo: %w\n", err)
+	}
+
+	getGtfsRealtimeMessages(TRIP_UPDATES, ctx.FeedID, feedInfo.RealtimeFeeds)
+	if _, ok := feedInfo.RealtimeFeeds[VEHICLE_POSITIONS]; ok {
+		// TODO should be moved to enrichDepartures and conditional (this, or custom API)
+		getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
+	}
+
+	update := updates[tripID]
+	return update, nil
+}
+
+func getGtfsRealtimeVehicles(ctx Context) ([]VehicleStatus, error) {
+	feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedID, ctx.Version)
+	if err != nil {
+		return []VehicleStatus{}, fmt.Errorf("while getting feedInfo: %w\n", err)
+	}
+	getGtfsRealtimeMessages(VEHICLE_POSITIONS, ctx.FeedID, feedInfo.RealtimeFeeds)
+
+	vehicles := make([]VehicleStatus, len(vehicleStatuses))
+	i := 0
+	for _, status := range vehicleStatuses {
+		vehicles[i] = status
+		i++
+	}
+	return vehicles, nil
+}




diff --git a/traffic/structs.go b/traffic/structs.go
index 4ade2c55774e6f3b37b767d464221f1533909872..6cdd0a9e773a966bbad232858337ad216f86f471 100644
--- a/traffic/structs.go
+++ b/traffic/structs.go
@@ -39,18 +39,18 @@ 	Website  string
 	Language string // todo(BAF10) language.Tag
 }*/
 
-type VehicleStatus struct { // todo(BAF10) two types of vehicles — descriptions ID-Capabilities, and an actual vehicle that runs on a trip
-	Id           string
-	Capabilities uint16
-	Position     Position
-	Speed        float32
-	Delay        int32 // todo(BAF31)
-	LineName     string
-	Headsign     string
-	// Status
-	// Occupancy
-	// Congestion
-}
+// type VehicleStatus struct { // todo(BAF10) two types of vehicles — descriptions ID-Capabilities, and an actual vehicle that runs on a trip
+// 	Id           string
+// 	Capabilities uint16
+// 	Position     Position
+// 	Speed        float32
+// 	Delay        int32 // todo(BAF31)
+// 	LineName     string
+// 	Headsign     string
+// 	// Status
+// 	// Occupancy
+// 	// Congestion
+// }
 
 func (v VehicleStatus) Location() Position {
 	return v.Position




diff --git a/traffic/structs_gen.go b/traffic/structs_gen.go
index 9c820b5ba44a5dad4aa4528220a6146c956edfc9..8dc02eb74541f35ccb516be8aec3899c142bf651 100644
--- a/traffic/structs_gen.go
+++ b/traffic/structs_gen.go
@@ -393,6 +393,26 @@ 	}
 	panic(errors.New("Invalid LineType value"))
 }
 
+type RealtimeFeedType uint
+
+const (
+	TRIP_UPDATES      RealtimeFeedType = 0
+	VEHICLE_POSITIONS RealtimeFeedType = 1
+	ALERTS            RealtimeFeedType = 2
+)
+
+func (t RealtimeFeedType) String() string {
+	switch t {
+	case TRIP_UPDATES:
+		return "TRIP_UPDATES"
+	case VEHICLE_POSITIONS:
+		return "VEHICLE_POSITIONS"
+	case ALERTS:
+		return "ALERTS"
+	}
+	panic(errors.New("Invalid RealtimeFeedType value"))
+}
+
 type QRLocation uint
 
 const (