Commit 826151cb authored by Gonçalo Leal's avatar Gonçalo Leal
Browse files

device controller in go

parent 64d93772
Showing with 647 additions and 100 deletions
+647 -100
......@@ -2,91 +2,137 @@ package main
import (
"encoding/json"
"flag"
"fmt"
"time"
"code.nap.av.it.pt/go-device/device"
"code.nap.av.it.pt/go-device/listener"
"code.nap.av.it.pt/go-device/sender"
"code.nap.av.it.pt/go-device/utils"
)
var regSuccess = make(chan bool, 1)
var deviceID string
// AudioController represents an audio device that can be started and stopped.
type AudioController struct {
device device.Device
}
func handleAudioMessage(msg map[string]interface{}) error {
// handleAudioMessage handles audio messages.
func handleAudioMessage(msg map[string]interface{}, args []interface{}) error {
// Print the received message.
fmt.Println("Received message:", msg)
// Handle the audio message.
// command, ok := args["command"].(string)
// if !ok {
// return fmt.Errorf("command not found or not a string")
// }
// switch command {
// case "play":
// fileName, ok := args["file_name"].(string)
// if !ok {
// return fmt.Errorf("file_name not found or not a string")
// }
// // Add logic to play the file
// fmt.Printf("Playing file: %s\n", fileName)
// case "pause":
// // Add logic to pause playback
// fmt.Println("Pausing playback")
// case "resume":
// // Add logic to resume playback
// fmt.Println("Resuming playback")
// case "stop":
// // Add logic to stop playback
// fmt.Println("Stopping playback")
// default:
// return fmt.Errorf("unknown command: %s", command)
// }
switch msg["address"] {
case fmt.Sprintf("/register/%s", deviceID):
// Cast the message argument to string.
result, ok := args[0].(string)
if !ok {
return fmt.Errorf("result not found or not a string")
}
// Check if the registration was successful.
if result == "success" {
regSuccess <- true
}
fmt.Println("Received registration result:", result)
case "/command/audio/":
// Unmarshal the message arguments.
var info map[string]interface{}
err := json.Unmarshal(args[0].([]byte), &info)
if err != nil {
fmt.Println("Error unmarshalling message arguments:", err)
return err
}
// Handle the audio message.
command, ok := info["action"].(string)
if !ok {
return fmt.Errorf("command not found or not a string")
}
switch command {
case "play":
fileName, ok := info["file"].(string)
if !ok {
return fmt.Errorf("file_name not found or not a string")
}
// Add logic to play the file
fmt.Printf("Playing file: %s\n", fileName)
case "pause":
// Add logic to pause playback
fmt.Println("Pausing playback")
case "resume":
// Add logic to resume playback
fmt.Println("Resuming playback")
case "stop":
// Add logic to stop playback
fmt.Println("Stopping playback")
default:
return fmt.Errorf("unknown command: %s", command)
}
}
return nil
}
func main() {
// Get config file from the command line parameters.
configFile := flag.String("config", "", "Path to the config file")
flag.Parse()
// Check if the config file is provided.
if *configFile == "" {
fmt.Println("Please provide a config file")
return
}
// Read config file to get device information.
config, err := utils.ReadConfigFile(*configFile)
if err != nil {
fmt.Println("Error reading config file:", err)
return
}
// Create a new audio device.
audioController := &AudioController{
// TODO: pass config to the NewDevice function
device: device.NewDevice(
device.Actuator,
listener.NewListener(listener.OSC, handleAudioMessage, "127.0.0.1:8000"),
sender.NewSender(sender.OSC, "localhost", 8002),
config,
listener.NewListener(listener.OSC, handleAudioMessage, fmt.Sprintf("%s:%d", config.DeviceIP, config.DevicePort)),
sender.NewSender(sender.OSC, config.ServerIP, config.ServerPort),
),
}
// Start the audio controller.
err := audioController.device.Start()
err = audioController.device.Start()
if err != nil {
fmt.Printf("Failed to start audio controller: %v\n", err)
return
}
fmt.Println("Audio controller started")
// Device info map
// TODO: Replace with actual device information (from config file).
infoMap := map[string]string{
"deviceID": "1",
"deviceType": "speaker",
"deviceName": "Speaker 1",
"deviceIP": "localhost",
"devicePort": "8002",
}
// Set the device ID.
deviceID = config.DeviceID
// Advertisement message.
infoJson, err := json.Marshal(infoMap)
// Sleep for a while to allow the audio controller to start.
time.Sleep(2 * time.Second)
// TODO: Add logic to handle registration and heartbeat messages.
// Register the device.
err = audioController.device.Register(regSuccess)
if err != nil {
fmt.Println("Error marshalling map to JSON:", err)
fmt.Printf("Failed to register device: %v\n", err)
return
}
audioController.device.Send(
"/test",
[]byte(string(infoJson)),
)
// TODO: Add logic to guarantee that the device is registered before sending heartbeat messages.
// Add a number of retries and a timeout for the registration process in the config.
for {
// Add logic to handle other operations.
......@@ -95,6 +141,9 @@ func main() {
time.Sleep(1 * time.Second)
// Send an OSC message to the tour controller device.
audioController.device.Send("/heartbeat", infoMap["deviceID"])
audioController.device.Send("/heartbeat", config.DeviceID)
}
// TODO: Add logic to handle stop signal and stop the audio controller.
// https://gobyexample.com/signals
}
package main
import (
"flag"
"fmt"
"time"
"code.nap.av.it.pt/go-device/device"
"code.nap.av.it.pt/go-device/listener"
"code.nap.av.it.pt/go-device/sender"
"code.nap.av.it.pt/go-device/utils"
)
func messageCallback(msg map[string]interface{}, args []interface{}) {
fmt.Println("Received message:", msg)
fmt.Println("Topic:", msg["address"])
}
func main() {
// Get config file from the command line parameters.
configFile := flag.String("config", "", "Path to the config file")
flag.Parse()
// Check if the config file is provided.
if *configFile == "" {
fmt.Println("Please provide a config file")
return
}
// Read config file to get device information.
config, err := utils.ReadConfigFile(*configFile)
if err != nil {
fmt.Println("Error reading config file:", err)
return
}
// Create device
audioDevice := device.NewDevice(
config,
listener.OSC,
sender.OSC,
messageCallback,
)
// Start the device
err = audioDevice.Start()
if err != nil {
fmt.Println("Error starting device:", err)
return
}
// Sleep for 2 seconds to allow the device to start.
time.Sleep(2 * time.Second)
// Register the device
err = audioDevice.Register()
if err != nil {
fmt.Println("Error registering device:", err)
return
}
// Wait until the device is registered
for !audioDevice.IsRegistered() {
time.Sleep(1 * time.Second)
}
// Loop lifecycle
for {
// Send heartbeat
err = audioDevice.SendHeartbeat()
if err != nil {
fmt.Println("Error sending heartbeat:", err)
return
}
// DO SOMETHING
// Sleep for 5 seconds
time.Sleep(5 * time.Second)
}
}
module audio_device
go 1.22.2
replace code.nap.av.it.pt/go-device => ../go-device
require code.nap.av.it.pt/go-device v0.0.0-00010101000000-000000000000 // indirect
{
"server_ip": "127.0.0.1",
"server_port": 8000,
"device_id": "audio_device_1",
"device_type": "actuator",
"device_family": "audio",
"device_name": "Audio Device 1",
"device_ip": "127.0.0.1",
"device_port": 9090,
"addresses": [
"/command/audio/*"
]
}
\ No newline at end of file
{
"server_ip": "127.0.0.1",
"server_port": 8000,
"device_id": "audio_device_2",
"device_type": "actuator",
"device_family": "audio",
"device_name": "Audio Device 2",
"device_ip": "127.0.0.1",
"device_port": 9091,
"addresses": [
"/command/audio/*"
]
}
\ No newline at end of file
{
"server_ip": "10.42.0.187",
"server_port": 9000,
"device_id": "controller_1",
"device_type": "actuator",
"device_family": "controller",
"device_name": "Tour Controller 1",
"device_ip": "10.42.0.1",
"device_port": 9000,
"addresses": [
]
}
\ No newline at end of file
package controller
type Controller interface {
Start() error
Stop() error
Send(topic string, message interface{}) error
SendTo(topic string, message interface{}, deviceID string) error
SendHeartbeat() error
}
func NewController() Controller {
return nil
}
......@@ -3,8 +3,14 @@
package device
import (
"encoding/json"
"errors"
"fmt"
"time"
"code.nap.av.it.pt/go-device/listener"
"code.nap.av.it.pt/go-device/sender"
"code.nap.av.it.pt/go-device/utils"
)
// TODO: Docs
......@@ -26,6 +32,19 @@ func (t DeviceType) String() string {
return deviceTypeNames[t]
}
func getDeviceType(deviceType string) (DeviceType, error) {
switch deviceType {
case "actuator":
return Actuator, nil
case "sensor":
return Sensor, nil
case "unity":
return Unity, nil
default:
return -1, errors.New("unknown device type")
}
}
// Device represents a device that can be started and stopped.
type Device interface {
// Start starts the device.
......@@ -34,24 +53,97 @@ type Device interface {
Stop() error
// Send sends a message to a topic.
Send(topic string, message interface{}) error
// Register registers the device.
Register() error
// IsRegistered returns true if the device is registered.
IsRegistered() bool
// SendHeartbeat sends a heartbeat message.
SendHeartbeat() error
}
type device struct {
Type DeviceType
Listener listener.Listener
Sender sender.Sender
Config utils.DeviceConfig
Type DeviceType
Listener listener.Listener
Sender sender.Sender
Callback utils.DeviceCallback
Heartbeat Heartbeat
}
type Heartbeat struct {
Registered bool
LastSent time.Time
LastRecv time.Time
}
func NewDevice(c utils.DeviceConfig,
listenerType listener.ListenerType, senderType sender.SenderType, f utils.DeviceCallback) Device {
devType, err := getDeviceType(c.DeviceType)
if err != nil {
// TODO: Log error
fmt.Println("Error getting device type:", err)
return nil
}
dev := &device{
Config: c,
Type: devType,
Callback: f,
Heartbeat: Heartbeat{
Registered: false,
},
}
l := listener.NewListener(listener.OSC, dev.msgHandler, fmt.Sprintf("%s:%d", c.DeviceIP, c.DevicePort))
s := sender.NewSender(sender.OSC, c.ServerIP, c.ServerPort)
dev.Listener = l
dev.Sender = s
return dev
}
func NewDevice(t DeviceType, l listener.Listener, s sender.Sender) Device {
return &device{Type: t, Listener: l, Sender: s}
func (d *device) msgHandler(msg map[string]interface{}, args []interface{}) {
fmt.Println("received message")
switch msg["address"] {
case fmt.Sprintf("/register/%s", d.Config.DeviceID):
// Cast the message argument to string.
result, ok := args[0].(string)
if !ok {
fmt.Println("result not found or not a string")
}
// Check if the registration was successful.
if result == "success" {
d.Heartbeat.Registered = true
}
fmt.Println("Received registration result:", result)
case "/heartbeat":
fmt.Println("Received heartbeat message")
d.Heartbeat.LastRecv = time.Now()
default:
d.Callback(msg, args)
}
}
func (d *device) Start() error {
err := d.Listener.Listen()
addresses := []string{"/register", "/heartbeat", "/command/audio/"}
err := d.Listener.Listen(addresses)
if err != nil {
return err
}
if d.Sender == nil {
return nil
}
return d.Sender.Connect()
}
......@@ -60,5 +152,43 @@ func (d *device) Stop() error {
}
func (d *device) Send(topic string, message interface{}) error {
if d.Sender == nil {
return errors.New("no sender available")
}
return d.Sender.Publish(topic, message)
}
func (d *device) Register() error {
// Device info map
infoMap := map[string]string{
"device_id": d.Config.DeviceID,
"device_type": d.Config.DeviceType,
"device_family": d.Config.DeviceFamily,
"device_name": d.Config.DeviceName,
"device_ip": d.Config.DeviceIP,
"device_port": fmt.Sprintf("%d", d.Config.DevicePort),
}
// Advertisement message.
infoJson, err := json.Marshal(infoMap)
if err != nil {
return fmt.Errorf("error marshalling map to JSON: %s", err)
}
d.Send(
"/register",
[]byte(string(infoJson)),
)
return nil
}
func (d *device) IsRegistered() bool {
return d.Heartbeat.Registered
}
func (d *device) SendHeartbeat() error {
// send heartbeat
return d.Send("/heartbeat", []byte("ola"))
}
package device
......@@ -22,7 +22,7 @@ func (t ListenerType) String() string {
type Listener interface {
// Listen starts the listener.
Listen() error
Listen(topics []string) error
// Close stops the listener.
Close() error
// Subscribe subscribes to a topic.
......
package osc
import (
"fmt"
"log"
"strings"
"code.nap.av.it.pt/go-device/utils"
"github.com/hypebeast/go-osc/osc"
......@@ -16,7 +18,7 @@ type OSCListener struct {
}
// Listen starts the OSC listener.
func (l *OSCListener) Listen() error {
func (l *OSCListener) Listen(addresses []string) error {
// Create a new OSC server.
// 1. Check if config is valid
......@@ -24,7 +26,6 @@ func (l *OSCListener) Listen() error {
// 2. Get addresses from config
// TODO: Get addresses from config
addresses := []string{"/test", "/heartbeat"}
// 3. Create a new dispatcher
dispatcher := osc.NewStandardDispatcher()
......@@ -34,12 +35,27 @@ func (l *OSCListener) Listen() error {
// create a message handler for the address
err := dispatcher.AddMsgHandler(
address, func(msg *osc.Message) {
args, err := utils.Mapify(msg)
// Create a map to store the message info
msg_info := make(map[string]interface{})
// Add the address to the msg_info
msg_info["address"] = msg.Address
// Add the number of arguments to the msg_info
msg_info["count"] = len(msg.Arguments)
// Add the argument types to the msg_info
types_str, err := msg.TypeTags()
if err != nil {
log.Fatalf("Error mapping OSC message: %v", msg)
fmt.Println("Error getting type tags", err)
return
}
l.HandlerFunc(args)
types := strings.Split(types_str, "")
msg_info["types"] = types[1:]
l.HandlerFunc(msg_info, msg.Arguments)
},
)
if err != nil {
......
package utils
import "fmt"
import (
"encoding/json"
"fmt"
"os"
)
type DeviceCallback func(args map[string]interface{}) error
type DeviceCallback func(msg map[string]interface{}, args []interface{})
type DeviceConfig struct {
ServerIP string `json:"server_ip"`
ServerPort int `json:"server_port"`
DeviceID string `json:"device_id"`
DeviceType string `json:"device_type"`
DeviceFamily string `json:"device_family"`
DeviceName string `json:"device_name"`
DeviceIP string `json:"device_ip"`
DevicePort int `json:"device_port"`
}
// Mapify converts an interface to a map with "field_name":value.
func Mapify(args interface{}) (map[string]interface{}, error) {
......@@ -14,3 +29,28 @@ func Mapify(args interface{}) (map[string]interface{}, error) {
return argsMap, nil
}
func ReadConfigFile(configFile string) (DeviceConfig, error) {
// Check if file exists
if _, err := os.Stat(configFile); os.IsNotExist(err) {
fmt.Println("Config file does not exist")
return DeviceConfig{}, err
}
// Read the config file
configData, err := os.ReadFile(configFile)
if err != nil {
fmt.Println("Error reading config file")
return DeviceConfig{}, err
}
// Unmarshal the config data
var config DeviceConfig
err = json.Unmarshal(configData, &config)
if err != nil {
fmt.Println("Error unmarshalling config data")
return DeviceConfig{}, err
}
return config, nil
}
......@@ -2,71 +2,244 @@ package main
import (
"encoding/json"
"flag"
"fmt"
"strconv"
"time"
"code.nap.av.it.pt/go-device/device"
"code.nap.av.it.pt/go-device/listener"
"code.nap.av.it.pt/go-device/sender"
"code.nap.av.it.pt/go-device/utils"
)
// type TourDevice struct {
// deviceID string
// deviceType string
// deviceName string
// deviceIP string
// devicePort int
// sender sender.Sender
// }
type TourDevice struct {
deviceID string
deviceType string
deviceName string
deviceIP string
devicePort int
sender sender.Sender
}
type TourController struct {
device device.Device
// devices []TourDevice
}
func handleMessage(msg map[string]interface{}) error {
fmt.Println("Received message:", msg)
return nil
var devices = make([]TourDevice, 0)
func handleMessage(msg map[string]interface{}, args []interface{}) {
switch msg["address"] {
case "/heartbeat":
// TODO: Implement heartbeat handling
fmt.Println("Received heartbeat from device", args[0])
case "/register":
// fmt.Println("Received test message with device info", string(args[0].([]byte)))
// Unmarshal the device info.
var deviceInfo map[string]interface{}
err := json.Unmarshal(args[0].([]byte), &deviceInfo)
if err != nil {
fmt.Println("Error unmarshalling device info:", err)
}
// Get and cast the device info.
deviceID, ok := deviceInfo["device_id"].(string)
if !ok {
fmt.Println("Device ID not found or not a string")
}
deviceType, ok := deviceInfo["device_type"].(string)
if !ok {
fmt.Println("Device type not found or not a string")
}
deviceName, ok := deviceInfo["device_name"].(string)
if !ok {
fmt.Println("Device name not found or not a string")
}
deviceIP, ok := deviceInfo["device_ip"].(string)
if !ok {
fmt.Println("Device IP not found or not a string")
}
devicePortStr, ok := deviceInfo["device_port"].(string)
if !ok {
fmt.Println("Device port not found or not a string")
}
devicePort, err := strconv.Atoi(devicePortStr)
if err != nil {
fmt.Println("Error converting device port to int:", err)
}
// Create a new sender
sender := sender.NewSender(sender.OSC, deviceIP, devicePort)
// Add the device to the list of devices.
devices = append(devices, TourDevice{
deviceID: deviceID,
deviceType: deviceType,
deviceName: deviceName,
deviceIP: deviceIP,
devicePort: devicePort,
sender: sender,
})
// Send a response to the device.
err = sender.Publish("/register/"+deviceID, "success")
if err != nil {
fmt.Println("Error sending response to device:", err)
}
fmt.Println("Device registered:", deviceID)
default:
fmt.Println("Unknown message:", msg)
}
}
func main() {
tourController := &TourController{
device: device.NewDevice(
device.Actuator,
listener.NewListener(listener.OSC, handleMessage, "127.0.0.1:8002"),
sender.NewSender(sender.OSC, "localhost", 8000),
),
// Get config file from the command line parameters.
configFile := flag.String("config", "", "Path to the config file")
flag.Parse()
// Check if the config file is provided.
if *configFile == "" {
fmt.Println("Please provide a config file")
return
}
// Start the tour controller.
err := tourController.device.Start()
// Read config file to get device information.
config, err := utils.ReadConfigFile(*configFile)
if err != nil {
fmt.Println("Error starting the tour controller:", err)
fmt.Println("Error reading config file:", err)
return
}
fmt.Println("Tour controller started")
// Send an OSC message.
commandMap := map[string]string{
"command": "play",
"file_name": "test.mp3",
}
// Create device
tourController := device.NewDevice(
config,
listener.OSC,
sender.OSC,
handleMessage,
)
commandJson, err := json.Marshal(commandMap)
// Start the device
err = tourController.Start()
if err != nil {
fmt.Println("Error marshalling command to JSON:", err)
fmt.Println("Error starting device:", err)
return
}
// for _, d := range tourController.devices {
// d.sender.Publish(
tourController.device.Send(
"/test",
[]byte(string(commandJson)),
)
// }
// Sleep for 2 seconds to allow the device to start.
time.Sleep(2 * time.Second)
// Keep the tour controller running.
for {
time.Sleep(1 * time.Millisecond)
fmt.Println("Main Loop")
// for len(devices) == 0 {
// fmt.Println("No devices registered yet")
// time.Sleep(1 * time.Second)
// }
// Show the menu.
// input := showMenu()
// for input != "0" {
// // Get message
// jsonData := getMessage(input)
// if jsonData == nil {
// continue
// }
// // Send command
// jsonDataBytes, err := json.Marshal(jsonData)
// if err != nil {
// fmt.Println("Error marshalling JSON data:", err)
// return
// }
// err = broadcast("/command/audio/", []byte(jsonDataBytes))
// if err != nil {
// fmt.Println("Error sending command:", err)
// }
// // Send heartbeat
// // Get the user input.
// input = showMenu()
// }
// Send heartbeat
err = tourController.SendHeartbeat()
if err != nil {
fmt.Println("Error sending heartbeat:", err)
return
}
time.Sleep(1 * time.Second)
}
}
func showMenu() string {
// Print a menu with the available commands.
fmt.Println("Available commands:")
fmt.Println("1. Play audio")
fmt.Println("2. Stop audio")
fmt.Println("3. Resume audio")
fmt.Println("4. Pause audio")
fmt.Println("0. Exit")
// Get the user input.
var input string
fmt.Print("Enter command: ")
fmt.Scanln(&input)
return input
}
func getMessage(input string) map[string]string {
jsonData := map[string]string{}
switch input {
case "1":
// Send the play command.
jsonData["action"] = "play"
jsonData["file"] = "audio.mp3"
case "2":
// Send the stop command.
jsonData["action"] = "stop"
case "3":
// Send the resume command.
jsonData["action"] = "resume"
case "4":
// Send the pause command.
jsonData["action"] = "pause"
default:
fmt.Println("Unknown command:", input)
return nil
}
return jsonData
}
func broadcast(address string, args_json []byte) error {
for _, d := range devices {
err := d.sender.Publish(address, args_json)
if err != nil {
return fmt.Errorf("error sending message to device %s: %v", d.deviceID, err)
}
}
return nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment