From 431d215eee5edf69c038ae57d273dba6baf6b9eb Mon Sep 17 00:00:00 2001 From: sergystepanov Date: Mon, 9 Aug 2021 10:42:06 +0300 Subject: [PATCH] Update server HTTPS configuration (#337) * Merge HTTP and HTTPS routes builder in coordinator * Extract HTTP/S routes * Rename config in coordinator * Generalize child services * Extract games library helper function * Use string address instead of port for HTTP/S * Use a dedicated port extractor function * Add missing GA tag templating * Rename shared server address and port params * Introduce TLS config parameters * Simplify HTTP/S server constructors * Update server auto port roll * Extract init function in worker * Reorder return params of address type port fn * Refactor config handler names * Update TLS default config params * Extract HTTP to HTTPS redirect function * Use httpx in monitoring * Don't log echo requests * Remove error return from the abstract server * Add WSS option to the worker-coordinator connection * Change default worker config * Make worker send its internal connection params * Decouple gamelib from the coordinator * Expose HTTP/S listener address * Keep original HTTP/S addresses * Remove no config handler in worker * Use HTTP-HTTPS redirection * Wrap net.Listener into a struct * Clean http server creation fn * Redirect to https with a generated address * Use URL in the redirector * Use zone address param in worker * Make use of actual addr and port in the monitoring servers * Use auto-justified monitoring addresses info * Add the non-HTTPS worker to HTTPS coordinator connection warning * Embed TLS struct * Move connection API struct into cws package --- cmd/coordinator/main.go | 34 ++---- cmd/worker/main.go | 27 +---- configs/config.yaml | 45 +++++--- pkg/config/coordinator/config.go | 12 +- pkg/config/emulator/config.go | 6 +- pkg/config/monitoring/config.go | 4 +- pkg/config/shared/config.go | 28 +++-- pkg/config/worker/config.go | 57 +++++---- pkg/coordinator/coordinator.go | 165 +++----------------------- pkg/coordinator/handlers.go | 67 ++++------- pkg/coordinator/http.go | 47 ++++++++ pkg/coordinator/internalhandlers.go | 24 ++-- pkg/coordinator/routes.go | 1 - pkg/coordinator/worker.go | 2 +- pkg/cws/api/coordinator.go | 23 ++-- pkg/games/game_library.go | 16 ++- pkg/games/game_library_test.go | 2 +- pkg/monitoring/monitoring.go | 173 ++++++++++++++-------------- pkg/network/httpx/address.go | 40 +++++++ pkg/network/httpx/address_test.go | 47 ++++++++ pkg/network/httpx/listener.go | 79 +++++++++++++ pkg/network/httpx/listener_test.go | 85 ++++++++++++++ pkg/network/httpx/options.go | 56 +++++++++ pkg/network/httpx/server.go | 136 ++++++++++++++++++++++ pkg/network/httpx/tls.go | 21 ++++ pkg/network/websocket/websocket.go | 17 +++ pkg/os/os.go | 23 ++++ pkg/server/server.go | 9 -- pkg/service/service.go | 48 ++++++++ pkg/worker/handlers.go | 86 ++++++-------- pkg/worker/http.go | 31 +++++ pkg/worker/internalhandlers.go | 2 - pkg/worker/server.go | 123 -------------------- pkg/worker/worker.go | 74 ++---------- 34 files changed, 952 insertions(+), 658 deletions(-) create mode 100644 pkg/coordinator/http.go create mode 100644 pkg/network/httpx/address.go create mode 100644 pkg/network/httpx/address_test.go create mode 100644 pkg/network/httpx/listener.go create mode 100644 pkg/network/httpx/listener_test.go create mode 100644 pkg/network/httpx/options.go create mode 100644 pkg/network/httpx/server.go create mode 100644 pkg/network/httpx/tls.go create mode 100644 pkg/network/websocket/websocket.go create mode 100644 pkg/os/os.go delete mode 100644 pkg/server/server.go create mode 100644 pkg/service/service.go create mode 100644 pkg/worker/http.go delete mode 100644 pkg/worker/server.go diff --git a/cmd/coordinator/main.go b/cmd/coordinator/main.go index bbbaad7c..94223f31 100644 --- a/cmd/coordinator/main.go +++ b/cmd/coordinator/main.go @@ -4,13 +4,11 @@ import ( "context" goflag "flag" "math/rand" - "os" - "os/signal" - "syscall" "time" config "github.com/giongto35/cloud-game/v2/pkg/config/coordinator" "github.com/giongto35/cloud-game/v2/pkg/coordinator" + "github.com/giongto35/cloud-game/v2/pkg/os" "github.com/giongto35/cloud-game/v2/pkg/util/logging" "github.com/golang/glog" flag "github.com/spf13/pflag" @@ -18,9 +16,11 @@ import ( var Version = "" -func main() { +func init() { rand.Seed(time.Now().UTC().UnixNano()) +} +func main() { conf := config.NewConfig() flag.CommandLine.AddGoFlagSet(goflag.CommandLine) conf.ParseFlags() @@ -28,29 +28,13 @@ func main() { logging.Init() defer logging.Flush() - ctx, cancelCtx := context.WithCancel(context.Background()) - glog.Infof("[coordinator] version: %v", Version) - glog.Infof("Initializing coordinator server") glog.V(4).Infof("Coordinator configs %v", conf) - app := coordinator.New(ctx, conf) - if err := app.Run(); err != nil { - glog.Errorf("Failed to run coordinator server, reason %v", err) - os.Exit(1) - } + c := coordinator.New(conf) + c.Start() - signals := make(chan os.Signal, 1) - done := make(chan struct{}, 1) - - signal.Notify(signals, os.Interrupt, syscall.SIGTERM) - - go func() { - sig := <-signals - glog.V(4).Infof("[coordinator] Shutting down [os:%v]", sig) - done <- struct{}{} - }() - - <-done - app.Shutdown() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer c.Shutdown(ctx) + <-os.ExpectTermination() cancelCtx() } diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 8d4c4397..162c311e 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -4,12 +4,10 @@ import ( "context" goflag "flag" "math/rand" - "os" - "os/signal" - "syscall" "time" config "github.com/giongto35/cloud-game/v2/pkg/config/worker" + "github.com/giongto35/cloud-game/v2/pkg/os" "github.com/giongto35/cloud-game/v2/pkg/thread" "github.com/giongto35/cloud-game/v2/pkg/util/logging" "github.com/giongto35/cloud-game/v2/pkg/worker" @@ -31,27 +29,14 @@ func run() { logging.Init() defer logging.Flush() - ctx, cancelCtx := context.WithCancel(context.Background()) - glog.Infof("[worker] version: %v", Version) - glog.V(4).Info("[worker] Initialization") glog.V(4).Infof("[worker] Local configuration %+v", conf) - app := worker.New(ctx, conf) - app.Run() + wrk := worker.New(conf) + wrk.Start() - signals := make(chan os.Signal, 1) - done := make(chan struct{}, 1) - - signal.Notify(signals, os.Interrupt, syscall.SIGTERM) - - go func() { - sig := <-signals - glog.V(4).Infof("[worker] Shutting down [os:%v]", sig) - done <- struct{}{} - }() - - <-done - app.Shutdown() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer wrk.Shutdown(ctx) + <-os.ExpectTermination() cancelCtx() } diff --git a/configs/config.yaml b/configs/config.yaml index 8da6459f..3cc117b8 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -3,6 +3,7 @@ # # application environment (dev, staging, prod) +# deprecated environment: dev coordinator: @@ -31,16 +32,20 @@ coordinator: profilingEnabled: false metricEnabled: false urlPrefix: /coordinator - # the public domain of the coordinator - publicDomain: http://localhost:8000 - # specify the worker address that the client can ping (with protocol and port) - pingServer: # HTTP(S) server config server: - port: 8000 - httpsPort: 443 - httpsKey: - httpsChain: + address: :8000 + https: false + # Letsencrypt or self cert config + tls: + address: :443 + letsencryptUrl: + # allowed host name + domain: + # if both are set then will use certs + # and Letsencryt instead + httpsCert: + httpsKey: analytics: inject: false gtag: @@ -49,6 +54,12 @@ worker: network: # a coordinator address to connect to coordinatorAddress: localhost:8000 + # where to connect + endpoint: /wso + # ping endpoint + pingEndpoint: /echo + # make coordinator connection secure (wss) + secure: false # ISO Alpha-2 country code to group workers by zones zone: monitoring: @@ -56,13 +67,21 @@ worker: port: 6602 profilingEnabled: false # monitoring server URL prefix - metricEnabled: true + metricEnabled: false urlPrefix: /worker server: - port: 9000 - httpsPort: 443 - httpsKey: - httpsChain: + address: :9000 + https: false + tls: + address: :444 + # LetsEncrypt config + # allowed host name + domain: + # if empty will use URL from Go + letsencryptUrl: + # Own certs config + httpsCert: + httpsKey: emulator: # set output viewport scale factor diff --git a/pkg/config/coordinator/config.go b/pkg/config/coordinator/config.go index c11b98b8..80eb12c8 100644 --- a/pkg/config/coordinator/config.go +++ b/pkg/config/coordinator/config.go @@ -12,13 +12,11 @@ import ( type Config struct { Coordinator struct { - Analytics Analytics - PublicDomain string - PingServer string - DebugHost string - Library games.Config - Monitoring monitoring.ServerMonitoringConfig - Server shared.Server + DebugHost string + Library games.Config + Monitoring monitoring.Config + Server shared.Server + Analytics Analytics } Emulator emulator.Emulator Environment shared.Environment diff --git a/pkg/config/emulator/config.go b/pkg/config/emulator/config.go index 97b64ba6..ebb51135 100644 --- a/pkg/config/emulator/config.go +++ b/pkg/config/emulator/config.go @@ -53,7 +53,7 @@ type LibretroCoreConfig struct { } // GetLibretroCoreConfig returns a core config with expanded paths. -func (e *Emulator) GetLibretroCoreConfig(emulator string) LibretroCoreConfig { +func (e Emulator) GetLibretroCoreConfig(emulator string) LibretroCoreConfig { cores := e.Libretro.Cores conf := cores.List[emulator] conf.Lib = path.Join(cores.Paths.Libs, conf.Lib) @@ -65,7 +65,7 @@ func (e *Emulator) GetLibretroCoreConfig(emulator string) LibretroCoreConfig { // GetEmulatorByRom returns emulator name by its supported ROM name. // !to cache into an optimized data structure -func (e *Emulator) GetEmulatorByRom(rom string) string { +func (e Emulator) GetEmulatorByRom(rom string) string { for emu, core := range e.Libretro.Cores.List { for _, romName := range core.Roms { if rom == romName { @@ -76,7 +76,7 @@ func (e *Emulator) GetEmulatorByRom(rom string) string { return "" } -func (e *Emulator) GetSupportedExtensions() []string { +func (e Emulator) GetSupportedExtensions() []string { var extensions []string for _, core := range e.Libretro.Cores.List { extensions = append(extensions, core.Roms...) diff --git a/pkg/config/monitoring/config.go b/pkg/config/monitoring/config.go index 319304f6..da89ead0 100644 --- a/pkg/config/monitoring/config.go +++ b/pkg/config/monitoring/config.go @@ -1,8 +1,10 @@ package monitoring -type ServerMonitoringConfig struct { +type Config struct { Port int URLPrefix string MetricEnabled bool `json:"metric_enabled"` ProfilingEnabled bool `json:"profiling_enabled"` } + +func (c *Config) IsEnabled() bool { return c.MetricEnabled || c.ProfilingEnabled } diff --git a/pkg/config/shared/config.go b/pkg/config/shared/config.go index e962ca12..3eec5a69 100644 --- a/pkg/config/shared/config.go +++ b/pkg/config/shared/config.go @@ -8,17 +8,29 @@ import ( type Environment environment.Env type Server struct { - Port int - HttpsPort int - HttpsKey string - HttpsChain string + Address string + Https bool + Tls struct { + Address string + Domain string + LetsencryptUrl string + HttpsKey string + HttpsCert string + } } func (s *Server) WithFlags() { - flag.IntVar(&s.Port, "port", s.Port, "HTTP server port") - flag.IntVar(&s.HttpsPort, "httpsPort", s.HttpsPort, "HTTPS server port (just why?)") - flag.StringVar(&s.HttpsKey, "httpsKey", s.HttpsKey, "HTTPS key") - flag.StringVar(&s.HttpsChain, "httpsChain", s.HttpsChain, "HTTPS chain") + flag.StringVar(&s.Address, "address", s.Address, "HTTP server address (host:port)") + flag.StringVar(&s.Tls.Address, "httpsAddress", s.Tls.Address, "HTTPS server address (host:port)") + flag.StringVar(&s.Tls.HttpsKey, "httpsKey", s.Tls.HttpsKey, "HTTPS key") + flag.StringVar(&s.Tls.HttpsCert, "httpsCert", s.Tls.HttpsCert, "HTTPS chain") +} + +func (s *Server) GetAddr() string { + if s.Https { + return s.Tls.Address + } + return s.Address } func (env *Environment) Get() environment.Env { diff --git a/pkg/config/worker/config.go b/pkg/config/worker/config.go index f4897d8f..a5dfe8a2 100644 --- a/pkg/config/worker/config.go +++ b/pkg/config/worker/config.go @@ -1,8 +1,8 @@ package worker import ( - "encoding/json" "log" + "net/url" "strings" "github.com/giongto35/cloud-game/v2/pkg/config" @@ -19,34 +19,31 @@ type Config struct { Encoder encoder.Encoder Emulator emulator.Emulator Environment shared.Environment - Worker struct { - Monitoring monitoring.ServerMonitoringConfig - Network struct { - CoordinatorAddress string - Zone string - } - Server shared.Server + Worker Worker + Webrtc webrtcConfig.Webrtc +} + +type Worker struct { + Monitoring monitoring.Config + Network struct { + CoordinatorAddress string + Endpoint string + PingEndpoint string + Secure bool + Zone string } - Webrtc webrtcConfig.Webrtc - Loaded bool + Server shared.Server } // allows custom config path var configPath string func NewConfig() (conf Config) { - if err := config.LoadConfig(&conf, configPath); err == nil { - conf.Loaded = true - } + _ = config.LoadConfig(&conf, configPath) conf.expandSpecialTags() return } -func EmptyConfig() (conf Config) { - conf.Loaded = false - return -} - // ParseFlags updates config values from passed runtime flags. // Define own flags with default value set to the current config param. // Don't forget to call flag.Parse(). @@ -60,18 +57,6 @@ func (c *Config) ParseFlags() { flag.Parse() } -func (c *Config) Serialize() []byte { - res, _ := json.Marshal(c) - return res -} - -func (c *Config) Deserialize(data []byte) { - if err := json.Unmarshal(data, c); err == nil { - c.Loaded = true - } - c.expandSpecialTags() -} - // expandSpecialTags replaces all the special tags in the config. func (c *Config) expandSpecialTags() { // home dir @@ -87,3 +72,15 @@ func (c *Config) expandSpecialTags() { } } } + +// GetAddr returns defined in the config server address. +func (w *Worker) GetAddr() string { return w.Server.GetAddr() } + +// GetPingAddr returns exposed to clients server ping endpoint address. +func (w *Worker) GetPingAddr(address string) string { + pingURL := url.URL{Scheme: "http", Host: address, Path: w.Network.PingEndpoint} + if w.Server.Https { + pingURL.Scheme = "https" + } + return pingURL.String() +} diff --git a/pkg/coordinator/coordinator.go b/pkg/coordinator/coordinator.go index 92111c6d..05ccd265 100644 --- a/pkg/coordinator/coordinator.go +++ b/pkg/coordinator/coordinator.go @@ -1,166 +1,27 @@ package coordinator import ( - "context" - "crypto/tls" - "fmt" "log" "net/http" - "strconv" - "time" "github.com/giongto35/cloud-game/v2/pkg/config/coordinator" - "github.com/giongto35/cloud-game/v2/pkg/environment" "github.com/giongto35/cloud-game/v2/pkg/games" "github.com/giongto35/cloud-game/v2/pkg/monitoring" - "github.com/golang/glog" - "github.com/gorilla/mux" - "golang.org/x/crypto/acme" - "golang.org/x/crypto/acme/autocert" + "github.com/giongto35/cloud-game/v2/pkg/service" ) -const stagingLEURL = "https://acme-staging-v02.api.letsencrypt.org/directory" - -type Coordinator struct { - ctx context.Context - cfg coordinator.Config - - monitoringServer *monitoring.ServerMonitoring -} - -func New(ctx context.Context, cfg coordinator.Config) *Coordinator { - return &Coordinator{ - ctx: ctx, - cfg: cfg, - - monitoringServer: monitoring.NewServerMonitoring(cfg.Coordinator.Monitoring, "cord"), - } -} - -func (c *Coordinator) Run() error { - go c.initializeCoordinator() - go c.RunMonitoringServer() - return nil -} - -func (c *Coordinator) RunMonitoringServer() { - glog.Infoln("Starting monitoring server for coordinator") - err := c.monitoringServer.Run() +func New(conf coordinator.Config) (services service.Group) { + srv := NewServer(conf, games.NewLibWhitelisted(conf.Coordinator.Library, conf.Emulator)) + httpSrv, err := NewHTTPServer(conf, func(mux *http.ServeMux) { + mux.HandleFunc("/ws", srv.WS) + mux.HandleFunc("/wso", srv.WSO) + }) if err != nil { - glog.Errorf("Failed to start monitoring server, reason %s", err) - } -} - -func (c *Coordinator) Shutdown() { - if err := c.monitoringServer.Shutdown(c.ctx); err != nil { - glog.Errorln("Failed to shutdown monitoring server") - } -} - -func makeServerFromMux(mux *http.ServeMux) *http.Server { - // set timeouts so that a slow or malicious client doesn't - // hold resources forever - return &http.Server{ - ReadTimeout: 5 * time.Second, - WriteTimeout: 5 * time.Second, - IdleTimeout: 120 * time.Second, - Handler: mux, - } -} - -func makeHTTPServer(server *Server) *http.Server { - r := mux.NewRouter() - r.HandleFunc("/ws", server.WS) - r.HandleFunc("/wso", server.WSO) - r.PathPrefix("/static/").Handler(http.StripPrefix("/static/", http.FileServer(http.Dir("./web")))) - r.PathPrefix("/").Handler(server.GetWeb(server.cfg)) - - svmux := &http.ServeMux{} - svmux.Handle("/", r) - - return makeServerFromMux(svmux) -} - -func makeHTTPToHTTPSRedirectServer(server *Server) *http.Server { - handleRedirect := func(w http.ResponseWriter, r *http.Request) { - newURI := "https://" + r.Host + r.URL.String() - http.Redirect(w, r, newURI, http.StatusFound) - } - r := mux.NewRouter() - r.PathPrefix("/").HandlerFunc(handleRedirect) - - svmux := &http.ServeMux{} - svmux.Handle("/", r) - - return makeServerFromMux(svmux) -} - -// initializeCoordinator setup an coordinator server -func (c *Coordinator) initializeCoordinator() { - // init games library - libraryConf := c.cfg.Coordinator.Library - if len(libraryConf.Supported) == 0 { - libraryConf.Supported = c.cfg.Emulator.GetSupportedExtensions() - } - lib := games.NewLibrary(libraryConf) - lib.Scan() - - server := NewServer(c.cfg, lib) - - var certManager *autocert.Manager - var httpsSrv *http.Server - - log.Println("Initializing Coordinator Server") - mode := c.cfg.Environment.Get() - if mode.AnyOf(environment.Production, environment.Staging) { - serverConfig := c.cfg.Coordinator.Server - httpsSrv = makeHTTPServer(server) - httpsSrv.Addr = fmt.Sprintf(":%d", serverConfig.HttpsPort) - - if serverConfig.HttpsChain == "" || serverConfig.HttpsKey == "" { - serverConfig.HttpsChain = "" - serverConfig.HttpsKey = "" - - var leurl string - if mode == environment.Staging { - leurl = stagingLEURL - } else { - leurl = acme.LetsEncryptURL - } - - certManager = &autocert.Manager{ - Prompt: autocert.AcceptTOS, - HostPolicy: autocert.HostWhitelist(c.cfg.Coordinator.PublicDomain), - Cache: autocert.DirCache("assets/cache"), - Client: &acme.Client{DirectoryURL: leurl}, - } - - httpsSrv.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate} - } - - go func(chain string, key string) { - fmt.Printf("Starting HTTPS server on %s\n", httpsSrv.Addr) - err := httpsSrv.ListenAndServeTLS(chain, key) - if err != nil { - log.Fatalf("httpsSrv.ListendAndServeTLS() failed with %s", err) - } - }(serverConfig.HttpsChain, serverConfig.HttpsKey) - } - - var httpSrv *http.Server - if mode.AnyOf(environment.Production, environment.Staging) { - httpSrv = makeHTTPToHTTPSRedirectServer(server) - } else { - httpSrv = makeHTTPServer(server) - } - - if certManager != nil { - httpSrv.Handler = certManager.HTTPHandler(httpSrv.Handler) - } - - httpSrv.Addr = ":" + strconv.Itoa(c.cfg.Coordinator.Server.Port) - err := httpSrv.ListenAndServe() - if err != nil { - log.Fatalf("httpSrv.ListenAndServe() failed with %s", err) + log.Fatalf("http init fail: %v", err) } + services.Add(srv, httpSrv) + if conf.Coordinator.Monitoring.IsEnabled() { + services.Add(monitoring.New(conf.Coordinator.Monitoring, httpSrv.GetHost(), "cord")) + } + return } diff --git a/pkg/coordinator/handlers.go b/pkg/coordinator/handlers.go index 8e7fb1a3..ae7ca763 100644 --- a/pkg/coordinator/handlers.go +++ b/pkg/coordinator/handlers.go @@ -3,8 +3,6 @@ package coordinator import ( "encoding/json" "errors" - "fmt" - "html/template" "log" "math" "net/http" @@ -16,14 +14,15 @@ import ( "github.com/giongto35/cloud-game/v2/pkg/environment" "github.com/giongto35/cloud-game/v2/pkg/games" "github.com/giongto35/cloud-game/v2/pkg/ice" + "github.com/giongto35/cloud-game/v2/pkg/service" "github.com/giongto35/cloud-game/v2/pkg/util" "github.com/gofrs/uuid" "github.com/gorilla/websocket" ) -const index = "./web/index.html" - type Server struct { + service.Service + cfg coordinator.Config // games library library games.GameLibrary @@ -35,12 +34,12 @@ type Server struct { browserClients map[string]*BrowserClient } -const pingServerTemp = "https://%s.%s/echo" -const devPingServer = "http://localhost:9000/echo" - var upgrader = websocket.Upgrader{} func NewServer(cfg coordinator.Config, library games.GameLibrary) *Server { + // scan the lib right away + library.Scan() + return &Server{ cfg: cfg, library: library, @@ -53,39 +52,20 @@ func NewServer(cfg coordinator.Config, library games.GameLibrary) *Server { } } -// GetWeb returns web frontend -func (s *Server) GetWeb(conf coordinator.Config) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tpl, err := template.ParseFiles(index) - if err != nil { - log.Fatal(err) - } - - // render index page with some tpl values - if err = tpl.Execute(w, conf.Coordinator.Analytics); err != nil { - log.Fatal(err) - } - }) -} - -// getPingServer returns the server for latency check of a zone. -// In latency check to find best worker step, we use this server to find the closest worker. -func (s *Server) getPingServer(zone string) string { - if s.cfg.Coordinator.PingServer != "" { - return fmt.Sprintf("%s/echo", s.cfg.Coordinator.PingServer) - } - - mode := s.cfg.Environment.Get() - if mode.AnyOf(environment.Production, environment.Staging) { - return fmt.Sprintf(pingServerTemp, zone, s.cfg.Coordinator.PublicDomain) - } - return devPingServer -} - // WSO handles all connections from a new worker to coordinator func (s *Server) WSO(w http.ResponseWriter, r *http.Request) { log.Println("Coordinator: A worker is connecting...") + connRt, err := GetConnectionRequest(r.URL.Query().Get("data")) + if err != nil { + log.Printf("Coordinator: got a malformed request: %v", err.Error()) + return + } + + if s.cfg.Coordinator.Server.Https && !connRt.IsHTTPS { + log.Printf("Warning! Unsecure connection. The worker may not work properly without HTTPS on its side!") + } + // be aware of ReadBufferSize, WriteBufferSize (default 4096) // https://pkg.go.dev/github.com/gorilla/websocket?tab=doc#Upgrader c, err := upgrader.Upgrade(w, r, nil) @@ -107,20 +87,17 @@ func (s *Server) WSO(w http.ResponseWriter, r *http.Request) { // Create a workerClient instance wc := NewWorkerClient(c, workerID) wc.Println("Generated worker ID") + wc.Zone = connRt.Zone + wc.PingServer = connRt.PingAddr // Register to workersClients map the client connection address := util.GetRemoteAddress(c) - wc.Println("Address:", address) - // Zone of the worker - zone := r.URL.Query().Get("zone") - wc.Printf("Is public: %v zone: %v", util.IsPublicIP(address), zone) + public := util.IsPublicIP(address) - pingServer := s.getPingServer(zone) - - wc.Printf("Set ping server address: %s", pingServer) + wc.Printf("addr: %v | zone: %v | pub: %v | ping: %v", address, wc.Zone, public, wc.PingServer) // In case worker and coordinator in the same host - if !util.IsPublicIP(address) && s.cfg.Environment.Get() == environment.Production { + if !public && s.cfg.Environment.Get() == environment.Production { // Don't accept private IP for worker's address in prod mode // However, if the worker in the same host with coordinator, we can get public IP of worker wc.Printf("[!] Address %s is invalid", address) @@ -138,8 +115,6 @@ func (s *Server) WSO(w http.ResponseWriter, r *http.Request) { // Create a workerClient instance wc.Address = address wc.StunTurnServer = ice.ToJson(s.cfg.Webrtc.IceServers, ice.Replacement{From: "server-ip", To: address}) - wc.Zone = zone - wc.PingServer = pingServer // Attach to Server instance with workerID, add defer s.workerClients[workerID] = wc diff --git a/pkg/coordinator/http.go b/pkg/coordinator/http.go new file mode 100644 index 00000000..99fd5260 --- /dev/null +++ b/pkg/coordinator/http.go @@ -0,0 +1,47 @@ +package coordinator + +import ( + "html/template" + "log" + "net/http" + + "github.com/giongto35/cloud-game/v2/pkg/config/coordinator" + "github.com/giongto35/cloud-game/v2/pkg/network/httpx" +) + +func NewHTTPServer(conf coordinator.Config, fnMux func(mux *http.ServeMux)) (*httpx.Server, error) { + return httpx.NewServer( + conf.Coordinator.Server.GetAddr(), + func(*httpx.Server) http.Handler { + h := http.NewServeMux() + h.Handle("/", index(conf)) + h.Handle("/static/", static("./web")) + fnMux(h) + return h + }, + httpx.WithServerConfig(conf.Coordinator.Server), + ) +} + +func index(conf coordinator.Config) http.Handler { + tpl, err := template.ParseFiles("./web/index.html") + if err != nil { + log.Fatal(err) + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // return 404 on unknown + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + // render index page with some tpl values + if err = tpl.Execute(w, conf.Coordinator.Analytics); err != nil { + log.Fatal(err) + } + }) +} + +func static(dir string) http.Handler { + return http.StripPrefix("/static/", http.FileServer(http.Dir(dir))) +} diff --git a/pkg/coordinator/internalhandlers.go b/pkg/coordinator/internalhandlers.go index 92f8b189..9e502fad 100644 --- a/pkg/coordinator/internalhandlers.go +++ b/pkg/coordinator/internalhandlers.go @@ -1,27 +1,33 @@ package coordinator import ( + "encoding/base64" + "encoding/json" "log" - "github.com/giongto35/cloud-game/v2/pkg/config/worker" "github.com/giongto35/cloud-game/v2/pkg/cws" "github.com/giongto35/cloud-game/v2/pkg/cws/api" ) -func (wc *WorkerClient) handleConfigRequest() cws.PacketHandler { - return func(resp cws.WSPacket) cws.WSPacket { - // try to load worker config - conf := worker.NewConfig() - return api.ConfigRequestPacket(conf.Serialize()) - } -} - func (wc *WorkerClient) handleHeartbeat() cws.PacketHandler { return func(resp cws.WSPacket) cws.WSPacket { return resp } } +func GetConnectionRequest(data string) (api.ConnectionRequest, error) { + req := api.ConnectionRequest{} + if data == "" { + return req, nil + } + decodeString, err := base64.URLEncoding.DecodeString(data) + if err != nil { + return req, err + } + err = json.Unmarshal(decodeString, &req) + return req, err +} + // handleRegisterRoom event from a worker, when worker created a new room. // RoomID is global so it is managed by coordinator. func (wc *WorkerClient) handleRegisterRoom(s *Server) cws.PacketHandler { diff --git a/pkg/coordinator/routes.go b/pkg/coordinator/routes.go index aa054f85..5652fb9a 100644 --- a/pkg/coordinator/routes.go +++ b/pkg/coordinator/routes.go @@ -7,7 +7,6 @@ func (s *Server) workerRoutes(wc *WorkerClient) { if wc == nil { return } - wc.Receive(api.ConfigRequest, wc.handleConfigRequest()) wc.Receive(api.Heartbeat, wc.handleHeartbeat()) wc.Receive(api.RegisterRoom, wc.handleRegisterRoom(s)) wc.Receive(api.GetRoom, wc.handleGetRoom(s)) diff --git a/pkg/coordinator/worker.go b/pkg/coordinator/worker.go index ca5cef11..83b661f6 100644 --- a/pkg/coordinator/worker.go +++ b/pkg/coordinator/worker.go @@ -14,7 +14,7 @@ type WorkerClient struct { WorkerID string Address string // ip address of worker - // public server used for ping check (Cannot use worker address because they are not publicly exposed) + // public server used for ping check PingServer string StunTurnServer string userCount int // may be atomic diff --git a/pkg/cws/api/coordinator.go b/pkg/cws/api/coordinator.go index 58a7780b..696e5223 100644 --- a/pkg/cws/api/coordinator.go +++ b/pkg/cws/api/coordinator.go @@ -3,12 +3,11 @@ package api import "github.com/giongto35/cloud-game/v2/pkg/cws" const ( - ConfigRequest = "config_request" - GetRoom = "get_room" - CloseRoom = "close_room" - RegisterRoom = "register_room" - Heartbeat = "heartbeat" - IceCandidate = "ice_candidate" + GetRoom = "get_room" + CloseRoom = "close_room" + RegisterRoom = "register_room" + Heartbeat = "heartbeat" + IceCandidate = "ice_candidate" NoData = "" @@ -38,10 +37,14 @@ type GameStartCall struct { func (packet *GameStartCall) From(data string) error { return from(packet, data) } func (packet *GameStartCall) To() (string, error) { return to(packet) } -// -// *** packets *** -// -func ConfigPacket() cws.WSPacket { return cws.WSPacket{ID: ConfigRequest} } +type ConnectionRequest struct { + Zone string `json:"zone,omitempty"` + PingAddr string `json:"ping_addr,omitempty"` + IsHTTPS bool `json:"is_https,omitempty"` +} + +// packets + func RegisterRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: RegisterRoom, Data: data} } func GetRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: GetRoom, Data: data} } func CloseRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: CloseRoom, Data: data} } diff --git a/pkg/games/game_library.go b/pkg/games/game_library.go index c7db977e..76046af9 100644 --- a/pkg/games/game_library.go +++ b/pkg/games/game_library.go @@ -16,7 +16,7 @@ import ( // Config is an external configuration type Config struct { - // some directory which is gonna be + // some directory which is going to be // the root folder for the library BasePath string // a list of supported file extensions @@ -64,6 +64,10 @@ type GameLibrary interface { Scan() } +type FileExtensionWhitelist interface { + GetSupportedExtensions() []string +} + type GameMetadata struct { uid string // the display name of the game @@ -74,7 +78,11 @@ type GameMetadata struct { Path string } -func NewLibrary(conf Config) GameLibrary { +func (c Config) GetSupportedExtensions() []string { return c.Supported } + +func NewLib(conf Config) GameLibrary { return NewLibWhitelisted(conf, conf) } + +func NewLibWhitelisted(conf Config, filter FileExtensionWhitelist) GameLibrary { hasSource := true dir, err := filepath.Abs(conf.BasePath) if err != nil { @@ -82,6 +90,10 @@ func NewLibrary(conf Config) GameLibrary { log.Printf("[lib] invalid source: %v (%v)\n", conf.BasePath, err) } + if len(conf.Supported) == 0 { + conf.Supported = filter.GetSupportedExtensions() + } + library := &library{ config: libConf{ path: dir, diff --git a/pkg/games/game_library_test.go b/pkg/games/game_library_test.go index c0b5a9fa..d360abd2 100644 --- a/pkg/games/game_library_test.go +++ b/pkg/games/game_library_test.go @@ -19,7 +19,7 @@ func TestLibraryScan(t *testing.T) { } for _, test := range tests { - library := NewLibrary(Config{ + library := NewLib(Config{ BasePath: test.directory, Supported: []string{"gba", "zip", "nes"}, Ignored: []string{"neogeo", "pgm"}, diff --git a/pkg/monitoring/monitoring.go b/pkg/monitoring/monitoring.go index 35fd71c0..257a744b 100644 --- a/pkg/monitoring/monitoring.go +++ b/pkg/monitoring/monitoring.go @@ -3,104 +3,109 @@ package monitoring import ( "context" "fmt" + "log" + "math" + "net" "net/http" "net/http/pprof" + "strconv" "strings" "github.com/giongto35/cloud-game/v2/pkg/config/monitoring" - config "github.com/giongto35/cloud-game/v2/pkg/config/worker" - "github.com/golang/glog" + "github.com/giongto35/cloud-game/v2/pkg/network/httpx" + "github.com/giongto35/cloud-game/v2/pkg/service" "github.com/prometheus/client_golang/prometheus/promhttp" ) -type ServerMonitoring struct { - cfg monitoring.ServerMonitoringConfig +const debugEndpoint = "/debug/pprof" +const metricsEndpoint = "/metrics" + +type Monitoring struct { + service.RunnableService + + conf monitoring.Config tag string - server *http.Server + server *httpx.Server } -func NewServerMonitoring(cfg monitoring.ServerMonitoringConfig, tag string) *ServerMonitoring { - return &ServerMonitoring{cfg: validate(&cfg), tag: tag} -} - -func (sm *ServerMonitoring) Init(conf interface{}) error { - cfg := conf.(config.Config).Worker.Monitoring - sm.cfg = validate(&cfg) - return nil -} - -func (sm *ServerMonitoring) Run() error { - if sm.cfg.ProfilingEnabled || sm.cfg.MetricEnabled { - monitoringServerMux := http.NewServeMux() - - srv := http.Server{ - Addr: fmt.Sprintf(":%d", sm.cfg.Port), - Handler: monitoringServerMux, - } - sm.server = &srv - glog.Infof("[%v] Starting monitoring server at %v", sm.tag, srv.Addr) - - if sm.cfg.ProfilingEnabled { - pprofPath := fmt.Sprintf("%s/debug/pprof", sm.cfg.URLPrefix) - glog.Infof("[%v] Profiling is enabled at %v", sm.tag, srv.Addr+pprofPath) - monitoringServerMux.Handle(pprofPath+"/", http.HandlerFunc(pprof.Index)) - monitoringServerMux.Handle(pprofPath+"/cmdline", http.HandlerFunc(pprof.Cmdline)) - monitoringServerMux.Handle(pprofPath+"/profile", http.HandlerFunc(pprof.Profile)) - monitoringServerMux.Handle(pprofPath+"/symbol", http.HandlerFunc(pprof.Symbol)) - monitoringServerMux.Handle(pprofPath+"/trace", http.HandlerFunc(pprof.Trace)) - // pprof handler for custom pprof path needs to be explicitly specified, according to: https://github.com/gin-contrib/pprof/issues/8 . Don't know why this is not fired as ticket - // https://golang.org/src/net/http/pprof/pprof.go?s=7411:7461#L305 only render index page - monitoringServerMux.Handle(pprofPath+"/allocs", pprof.Handler("allocs")) - monitoringServerMux.Handle(pprofPath+"/block", pprof.Handler("block")) - monitoringServerMux.Handle(pprofPath+"/goroutine", pprof.Handler("goroutine")) - monitoringServerMux.Handle(pprofPath+"/heap", pprof.Handler("heap")) - monitoringServerMux.Handle(pprofPath+"/mutex", pprof.Handler("mutex")) - monitoringServerMux.Handle(pprofPath+"/threadcreate", pprof.Handler("threadcreate")) - } - - if sm.cfg.MetricEnabled { - metricPath := fmt.Sprintf("%s/metrics", sm.cfg.URLPrefix) - glog.Infof("[%v] Prometheus metric is enabled at %v", sm.tag, srv.Addr+metricPath) - monitoringServerMux.Handle(metricPath, promhttp.Handler()) - } - - err := srv.ListenAndServe() - if err == http.ErrServerClosed { - glog.Infof("[%v] The main HTTP server has been closed", sm.tag) - return nil - } - return err +// New creates new monitoring service. +// The tag param specifies owner label for logs. +func New(conf monitoring.Config, baseAddr string, tag string) *Monitoring { + serv, err := httpx.NewServer( + net.JoinHostPort(baseAddr, strconv.Itoa(conf.Port)), + func(*httpx.Server) http.Handler { + h := http.NewServeMux() + if conf.ProfilingEnabled { + prefix := conf.URLPrefix + debugEndpoint + h.HandleFunc(prefix+"/", pprof.Index) + h.HandleFunc(prefix+"/cmdline", pprof.Cmdline) + h.HandleFunc(prefix+"/profile", pprof.Profile) + h.HandleFunc(prefix+"/symbol", pprof.Symbol) + h.HandleFunc(prefix+"/trace", pprof.Trace) + h.Handle(prefix+"/allocs", pprof.Handler("allocs")) + h.Handle(prefix+"/block", pprof.Handler("block")) + h.Handle(prefix+"/goroutine", pprof.Handler("goroutine")) + h.Handle(prefix+"/heap", pprof.Handler("heap")) + h.Handle(prefix+"/mutex", pprof.Handler("mutex")) + h.Handle(prefix+"/threadcreate", pprof.Handler("threadcreate")) + } + if conf.MetricEnabled { + h.Handle(conf.URLPrefix+metricsEndpoint, promhttp.Handler()) + } + return h + }, + httpx.WithPortRoll(true)) + if err != nil { + log.Fatalf("couldn't start monitoring server: %v", err) } - return nil + return &Monitoring{conf: conf, tag: tag, server: serv} } -func (sm *ServerMonitoring) Shutdown(ctx context.Context) error { - if sm.server == nil { - return nil +func (m *Monitoring) Run() { + m.printInfo() + m.server.Run() +} + +func (m *Monitoring) Shutdown(ctx context.Context) error { + log.Printf("[%v] Shutting down monitoring server", m.tag) + return m.server.Shutdown(ctx) +} + +func (m *Monitoring) String() string { + return fmt.Sprintf("monitoring::%s:%d", m.conf.URLPrefix, m.conf.Port) +} + +func (m *Monitoring) GetMetricsPublicAddress() string { + return m.server.GetProtocol() + "://" + m.server.Addr + m.conf.URLPrefix + metricsEndpoint +} + +func (m *Monitoring) GetProfilingAddress() string { + return m.server.GetProtocol() + "://" + m.server.Addr + m.conf.URLPrefix + debugEndpoint +} + +func (m *Monitoring) printInfo() { + length, pad := 42, 20 + var table, records strings.Builder + table.Grow(length * 4) + records.Grow(length * 2) + + if m.conf.ProfilingEnabled { + addr := m.GetProfilingAddress() + length = int(math.Max(float64(length), float64(len(addr)+pad))) + records.WriteString(" Profiling " + addr + "\n") + } + if m.conf.MetricEnabled { + addr := m.GetMetricsPublicAddress() + length = int(math.Max(float64(length), float64(len(addr)+pad))) + records.WriteString(" Prometheus " + addr + "\n") } - glog.Infof("[%v] Shutting down monitoring server", sm.tag) - return sm.server.Shutdown(ctx) -} - -func (sm *ServerMonitoring) String() string { - return fmt.Sprintf("monitoring::%s:%d", sm.cfg.URLPrefix, sm.cfg.Port) -} - -func validate(conf *monitoring.ServerMonitoringConfig) monitoring.ServerMonitoringConfig { - if conf.Port == 0 { - conf.Port = 6365 - } - - if len(conf.URLPrefix) > 0 { - conf.URLPrefix = strings.TrimSpace(conf.URLPrefix) - if !strings.HasPrefix(conf.URLPrefix, "/") { - conf.URLPrefix = "/" + conf.URLPrefix - } - - if strings.HasSuffix(conf.URLPrefix, "/") { - conf.URLPrefix = strings.TrimSuffix(conf.URLPrefix, "/") - } - } - return *conf + title := "Monitoring" + edge := strings.Repeat("-", length) + c := (length-len(title)-3)/2 + 1 + len(title) - 3 + table.WriteString(fmt.Sprintf("[%s]\n", m.tag)) + table.WriteString(fmt.Sprintf("%s\n---%*s%*s\n%s\n", edge, c, title, length-(c+len(title))+6+1, "---", edge)) + table.WriteString(records.String()) + table.WriteString(edge) + log.Printf(table.String()) } diff --git a/pkg/network/httpx/address.go b/pkg/network/httpx/address.go new file mode 100644 index 00000000..58f0322e --- /dev/null +++ b/pkg/network/httpx/address.go @@ -0,0 +1,40 @@ +package httpx + +import ( + "net" + "strconv" +) + +// buildAddress joins network host from the first param, +// zone from the second, and +// the port value of a listener from the third param. +// +// As example, address host.com:8080 and listener 123.123.123.123:8888 will be +// transformed to host.com:8888. +func buildAddress(address string, zone string, l Listener) string { + addr, _, err := net.SplitHostPort(address) + if err != nil { + addr = address + } + if addr == "" { + addr = "localhost" + } + + port := l.GetPort() + if port > 0 && port != 80 && port != 443 { + addr += ":" + strconv.Itoa(port) + } + + if zone != "" { + addr = zone + "." + addr + } + return addr +} + +func extractHost(address string) string { + addr, _, err := net.SplitHostPort(address) + if err != nil { + addr = address + } + return addr +} diff --git a/pkg/network/httpx/address_test.go b/pkg/network/httpx/address_test.go new file mode 100644 index 00000000..517e4c47 --- /dev/null +++ b/pkg/network/httpx/address_test.go @@ -0,0 +1,47 @@ +package httpx + +import ( + "net" + "testing" +) + +type testListener struct { + addr net.TCPAddr +} + +func (tl testListener) Accept() (net.Conn, error) { return nil, nil } +func (tl testListener) Close() error { return nil } +func (tl testListener) Addr() net.Addr { return &tl.addr } + +func NewTCP(port int) Listener { + return Listener{testListener{addr: net.TCPAddr{Port: port}}} +} + +func TestMergeAddresses(t *testing.T) { + tests := []struct { + addr string + zone string + ls Listener + rez string + }{ + {addr: "", rez: "localhost"}, + {addr: ":", ls: NewTCP(0), rez: "localhost"}, + {addr: "", ls: NewTCP(393), rez: "localhost:393"}, + {addr: ":8080", ls: NewTCP(8080), rez: "localhost:8080"}, + {addr: ":8080", ls: NewTCP(8081), rez: "localhost:8081"}, + {addr: "host:8080", ls: NewTCP(8080), rez: "host:8080"}, + {addr: "host:8080", ls: NewTCP(8081), rez: "host:8081"}, + {addr: "host:8080", zone: "test", ls: NewTCP(8081), rez: "test.host:8081"}, + {addr: ":80", ls: NewTCP(80), rez: "localhost"}, + {addr: ":", ls: NewTCP(344), rez: "localhost:344"}, + {addr: "https://garbage.com:99a9a", rez: "https://garbage.com:99a9a"}, + {addr: "[::]", rez: "[::]"}, + } + + for _, test := range tests { + address := buildAddress(test.addr, test.zone, test.ls) + if address != test.rez { + t.Errorf("expected %v, got %v", test.rez, address) + } + } +} diff --git a/pkg/network/httpx/listener.go b/pkg/network/httpx/listener.go new file mode 100644 index 00000000..156aa739 --- /dev/null +++ b/pkg/network/httpx/listener.go @@ -0,0 +1,79 @@ +package httpx + +import ( + "errors" + "net" + "os" + "runtime" + "strconv" + "syscall" +) + +const listenAttempts = 42 + +type Listener struct { + net.Listener +} + +func NewListener(address string, withNextFreePort bool) (*Listener, error) { + listener, err := listener(address, withNextFreePort) + if err != nil { + return nil, err + } + return &Listener{listener}, err +} + +func listener(address string, withNextFreePort bool) (net.Listener, error) { + listener, err := net.Listen("tcp", address) + if err == nil || !withNextFreePort || !isPortBusyError(err) { + return listener, err + } + // we will roll next available port + host, prt, err := net.SplitHostPort(address) + if err != nil { + return listener, err + } + // it should be impossible to get 0 port here + // or that's going to break otherwise + port, err := strconv.Atoi(prt) + if err != nil { + return listener, err + } + for i := port + 1; i < port+listenAttempts; i++ { + listener, err := net.Listen("tcp", host+":"+strconv.Itoa(i)) + if err == nil { + return listener, nil + } + } + return nil, errors.New("no available ports") +} + +func (l Listener) GetPort() int { + if l.Listener == nil { + return 0 + } + tcp, ok := l.Addr().(*net.TCPAddr) + if ok && tcp != nil { + return tcp.Port + } + return 0 +} + +func isPortBusyError(err error) bool { + var eOsSyscall *os.SyscallError + if !errors.As(err, &eOsSyscall) { + return false + } + var errErrno syscall.Errno + if !errors.As(eOsSyscall, &errErrno) { + return false + } + if errErrno == syscall.EADDRINUSE { + return true + } + const WSAEADDRINUSE = 10048 + if runtime.GOOS == "windows" && errErrno == WSAEADDRINUSE { + return true + } + return false +} diff --git a/pkg/network/httpx/listener_test.go b/pkg/network/httpx/listener_test.go new file mode 100644 index 00000000..7a8900f4 --- /dev/null +++ b/pkg/network/httpx/listener_test.go @@ -0,0 +1,85 @@ +package httpx + +import ( + "net" + "strings" + "testing" +) + +func TestListenerCreation(t *testing.T) { + tests := []struct { + addr string + port string + random bool + error bool + }{ + {addr: ":80", port: "80"}, + {addr: ":", random: true}, + {addr: ":0", random: true}, + {addr: "", random: true}, + {addr: "https://garbage.com:99a9a", error: true}, + {addr: ":8082", port: "8082"}, + {addr: "localhost:8888", port: "8888"}, + {addr: "localhost:abc1", error: true}, + } + + for _, test := range tests { + ls, err := NewListener(test.addr, false) + + if test.error { + if err == nil { + t.Errorf("expected error, but got none") + } + continue + } + + if !test.error && err != nil { + t.Errorf("unexpected error %v", err) + continue + } + + defer ls.Close() + + addr := ls.Addr().(*net.TCPAddr) + port := ls.GetPort() + + hasPort := port > 0 + isPortSame := strings.HasSuffix(addr.String(), ":"+test.port) + + if test.random { + if !hasPort { + t.Errorf("expected a random port, got %v", port) + } + continue + } + + if !isPortSame { + t.Errorf("expected the same port %v != %v", test.port, port) + } + } +} + +func TestFailOnPortInUse(t *testing.T) { + a, err := NewListener(":3333", false) + if err != nil { + t.Errorf("expected no error, got %v", err) + } + defer a.Close() + _, err = NewListener(":3333", false) + if err == nil { + t.Errorf("expected busy port error, but got none") + } +} + +func TestListenerPortRoll(t *testing.T) { + a, err := NewListener("127.0.0.1:3333", false) + if err != nil { + t.Errorf("expected no error, got %v", err) + } + defer a.Close() + b, err := NewListener("127.0.0.1:3333", true) + if err != nil { + t.Errorf("expected no port error, but got %v", err) + } + b.Close() +} diff --git a/pkg/network/httpx/options.go b/pkg/network/httpx/options.go new file mode 100644 index 00000000..aa6fa666 --- /dev/null +++ b/pkg/network/httpx/options.go @@ -0,0 +1,56 @@ +package httpx + +import ( + "time" + + "github.com/giongto35/cloud-game/v2/pkg/config/shared" +) + +type ( + Options struct { + Https bool + HttpsRedirect bool + HttpsRedirectAddress string + HttpsCert string + HttpsKey string + HttpsDomain string + PortRoll bool + IdleTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + Zone string + } + Option func(*Options) +) + +func (o *Options) override(options ...Option) { + for _, opt := range options { + opt(o) + } +} + +func (o *Options) IsAutoHttpsCert() bool { return !(o.HttpsCert != "" && o.HttpsKey != "") } + +func HttpsRedirect(redirect bool) Option { + return func(opts *Options) { opts.HttpsRedirect = redirect } +} + +//func Https(is bool) Option { return func(opts *Options) { opts.Https = is } } +//func HttpsCert(cert string) Option { return func(opts *Options) { opts.HttpsCert = cert } } +//func HttpsKey(key string) Option { return func(opts *Options) { opts.HttpsKey = key } } +//func HttpsDomain(domain string) Option { return func(opts *Options) { opts.HttpsDomain = domain } } +//func IdleTimeout(t time.Duration) Option { return func(opts *Options) { opts.IdleTimeout = t } } +//func ReadTimeout(t time.Duration) Option { return func(opts *Options) { opts.ReadTimeout = t } } +//func WriteTimeout(t time.Duration) Option { return func(opts *Options) { opts.WriteTimeout = t } } + +func WithPortRoll(roll bool) Option { return func(opts *Options) { opts.PortRoll = roll } } +func WithZone(zone string) Option { return func(opts *Options) { opts.Zone = zone } } +func WithServerConfig(conf shared.Server) Option { + return func(opts *Options) { + opts.Https = conf.Https + opts.HttpsCert = conf.Tls.HttpsCert + opts.HttpsKey = conf.Tls.HttpsKey + opts.HttpsDomain = conf.Tls.Domain + opts.HttpsRedirectAddress = conf.Address + } +} diff --git a/pkg/network/httpx/server.go b/pkg/network/httpx/server.go new file mode 100644 index 00000000..45ec1b77 --- /dev/null +++ b/pkg/network/httpx/server.go @@ -0,0 +1,136 @@ +package httpx + +import ( + "context" + "log" + "net/http" + "net/url" + "time" + + "github.com/giongto35/cloud-game/v2/pkg/service" + "golang.org/x/crypto/acme/autocert" +) + +type Server struct { + http.Server + service.RunnableService + + autoCert *autocert.Manager + opts Options + + listener *Listener + redirect *Server +} + +func NewServer(address string, handler func(*Server) http.Handler, options ...Option) (*Server, error) { + opts := &Options{ + Https: false, + HttpsRedirect: true, + IdleTimeout: 120 * time.Second, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + } + opts.override(options...) + + server := &Server{ + Server: http.Server{ + Addr: address, + IdleTimeout: opts.IdleTimeout, + ReadTimeout: opts.ReadTimeout, + WriteTimeout: opts.WriteTimeout, + }, + opts: *opts, + } + // (╯°□°)╯︵ ┻━┻ + server.Handler = handler(server) + + if opts.Https && opts.IsAutoHttpsCert() { + server.autoCert = NewTLSConfig(opts.HttpsDomain).CertManager + server.TLSConfig = server.autoCert.TLSConfig() + } + + addr := server.Addr + if server.Addr == "" { + addr = ":http" + if opts.Https { + addr = ":https" + } + log.Printf("Warning! Empty server address has been changed to %v", server.Addr) + } + listener, err := NewListener(addr, server.opts.PortRoll) + if err != nil { + return nil, err + } + server.listener = listener + + addr = buildAddress(server.Addr, opts.Zone, *listener) + log.Printf("[server] address was set to %v (%v)", addr, server.Addr) + server.Addr = addr + + return server, nil +} + +func (s *Server) Run() { + protocol := s.GetProtocol() + log.Printf("Starting %s server on %s", protocol, s.Addr) + + if s.opts.Https && s.opts.HttpsRedirect { + rdr, err := s.redirection() + if err != nil { + log.Fatalf("couldn't init redirection server: %v", err) + } + s.redirect = rdr + go s.redirect.Run() + } + + var err error + if s.opts.Https { + err = s.ServeTLS(*s.listener, s.opts.HttpsCert, s.opts.HttpsKey) + } else { + err = s.Serve(*s.listener) + } + switch err { + case http.ErrServerClosed: + log.Printf("%s server was closed", protocol) + return + default: + log.Printf("error: %s", err) + } +} + +func (s *Server) Shutdown(ctx context.Context) (err error) { + if s.redirect != nil { + err = s.redirect.Shutdown(ctx) + } + err = s.Server.Shutdown(ctx) + return +} + +func (s *Server) GetHost() string { return extractHost(s.Addr) } + +func (s *Server) GetProtocol() string { + protocol := "http" + if s.opts.Https { + protocol = "https" + } + return protocol +} + +func (s *Server) redirection() (*Server, error) { + srv, err := NewServer(s.opts.HttpsRedirectAddress, func(serv *Server) http.Handler { + h := http.NewServeMux() + h.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + httpsURL := url.URL{Scheme: "https", Host: s.Addr, Path: r.URL.Path, RawQuery: r.URL.RawQuery} + rdr := httpsURL.String() + log.Printf("Redirect: http://%s%s -> %s", r.Host, r.URL.String(), rdr) + http.Redirect(w, r, rdr, http.StatusFound) + })) + // do we need this after all? + if serv.autoCert != nil { + return serv.autoCert.HTTPHandler(h) + } + return h + }) + log.Printf("Starting HTTP->HTTPS redirection server on %s", srv.Addr) + return srv, err +} diff --git a/pkg/network/httpx/tls.go b/pkg/network/httpx/tls.go new file mode 100644 index 00000000..b85c531d --- /dev/null +++ b/pkg/network/httpx/tls.go @@ -0,0 +1,21 @@ +package httpx + +import ( + "golang.org/x/crypto/acme" + "golang.org/x/crypto/acme/autocert" +) + +type TLS struct { + CertManager *autocert.Manager +} + +func NewTLSConfig(domain string) *TLS { + return &TLS{ + CertManager: &autocert.Manager{ + Prompt: autocert.AcceptTOS, + HostPolicy: autocert.HostWhitelist(domain), + Cache: autocert.DirCache("assets/cache"), + Client: &acme.Client{DirectoryURL: acme.LetsEncryptURL}, + }, + } +} diff --git a/pkg/network/websocket/websocket.go b/pkg/network/websocket/websocket.go new file mode 100644 index 00000000..18687162 --- /dev/null +++ b/pkg/network/websocket/websocket.go @@ -0,0 +1,17 @@ +package websocket + +import ( + "crypto/tls" + "net/url" + + "github.com/gorilla/websocket" +) + +func Connect(address url.URL) (*websocket.Conn, error) { + dialer := websocket.Dialer{} + if address.Scheme == "wss" { + dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + } + ws, _, err := dialer.Dial(address.String(), nil) + return ws, err +} diff --git a/pkg/os/os.go b/pkg/os/os.go new file mode 100644 index 00000000..ea4e0ecd --- /dev/null +++ b/pkg/os/os.go @@ -0,0 +1,23 @@ +package os + +import ( + "os" + "os/signal" + "syscall" +) + +type Signal struct { + event chan os.Signal + done chan struct{} +} + +func ExpectTermination() chan struct{} { + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt, syscall.SIGTERM) + done := make(chan struct{}, 1) + go func() { + <-signals + done <- struct{}{} + }() + return done +} diff --git a/pkg/server/server.go b/pkg/server/server.go deleted file mode 100644 index 288e4b05..00000000 --- a/pkg/server/server.go +++ /dev/null @@ -1,9 +0,0 @@ -package server - -import "context" - -type Server interface { - Init(conf interface{}) error - Run() error - Shutdown(ctx context.Context) error -} diff --git a/pkg/service/service.go b/pkg/service/service.go new file mode 100644 index 00000000..84437a1a --- /dev/null +++ b/pkg/service/service.go @@ -0,0 +1,48 @@ +package service + +import ( + "context" + "log" +) + +// Service defines a generic service. +type Service interface{} + +// RunnableService defines a service that can be run. +type RunnableService interface { + Service + + Run() + Shutdown(ctx context.Context) error +} + +// Group is a container for managing a bunch of services. +type Group struct { + list []Service +} + +func (g *Group) Add(services ...Service) { + for _, s := range services { + g.list = append(g.list, s) + } +} + +// Start starts each service in the group. +func (g *Group) Start() { + for _, s := range g.list { + if v, ok := s.(RunnableService); ok { + go v.Run() + } + } +} + +// Shutdown terminates a group of services. +func (g *Group) Shutdown(ctx context.Context) { + for _, s := range g.list { + if v, ok := s.(RunnableService); ok { + if err := v.Shutdown(ctx); err != nil && err != context.Canceled { + log.Printf("error: failed to stop [%s] because of %v", s, err) + } + } + } +} diff --git a/pkg/worker/handlers.go b/pkg/worker/handlers.go index e7b41747..9954bd8a 100644 --- a/pkg/worker/handlers.go +++ b/pkg/worker/handlers.go @@ -1,7 +1,9 @@ package worker import ( - "crypto/tls" + "context" + "encoding/base64" + "encoding/json" "log" "net/url" "os" @@ -10,20 +12,21 @@ import ( "github.com/giongto35/cloud-game/v2/pkg/config/worker" "github.com/giongto35/cloud-game/v2/pkg/cws/api" "github.com/giongto35/cloud-game/v2/pkg/emulator/libretro/manager/remotehttp" - "github.com/giongto35/cloud-game/v2/pkg/environment" "github.com/giongto35/cloud-game/v2/pkg/games" + "github.com/giongto35/cloud-game/v2/pkg/network/websocket" + "github.com/giongto35/cloud-game/v2/pkg/service" "github.com/giongto35/cloud-game/v2/pkg/webrtc" storage "github.com/giongto35/cloud-game/v2/pkg/worker/cloud-storage" "github.com/giongto35/cloud-game/v2/pkg/worker/room" - "github.com/gorilla/websocket" ) type Handler struct { + service.RunnableService + + address string // Client that connects to coordinator oClient *CoordinatorClient - // Raw address of coordinator - coordinatorHost string - cfg worker.Config + cfg worker.Config // Rooms map : RoomID -> Room rooms map[string]*room.Room // global ID of the current server @@ -32,38 +35,34 @@ type Handler struct { onlineStorage *storage.Client // sessions handles all sessions server is handler (key is sessionID) sessions map[string]*Session - - w *Worker } // NewHandler returns a new server -func NewHandler(cfg worker.Config, wrk *Worker) *Handler { +func NewHandler(conf worker.Config, address string) *Handler { // Create offline storage folder - createOfflineStorage(cfg.Emulator.Storage) - + createOfflineStorage(conf.Emulator.Storage) // Init online storage onlineStorage := storage.NewInitClient() return &Handler{ - rooms: map[string]*room.Room{}, - sessions: map[string]*Session{}, - coordinatorHost: cfg.Worker.Network.CoordinatorAddress, - cfg: cfg, - onlineStorage: onlineStorage, - w: wrk, + address: address, + cfg: conf, + onlineStorage: onlineStorage, + rooms: map[string]*room.Room{}, + sessions: map[string]*Session{}, } } // Run starts a Handler running logic func (h *Handler) Run() { - conf := h.cfg.Worker.Network + coordinatorAddress := h.cfg.Worker.Network.CoordinatorAddress for { - conn, err := setupCoordinatorConnection(conf.CoordinatorAddress, conf.Zone, h.cfg) + conn, err := newCoordinatorConnection(coordinatorAddress, h.cfg.Worker, h.address) if err != nil { log.Printf("Cannot connect to coordinator. %v Retrying...", err) time.Sleep(time.Second) continue } - log.Printf("[worker] connected to: %v", conf.CoordinatorAddress) + log.Printf("[worker] connected to: %v", coordinatorAddress) h.oClient = conn go h.oClient.Heartbeat() @@ -73,13 +72,7 @@ func (h *Handler) Run() { } } -func (h *Handler) RequestConfig() { - log.Printf("[worker] asking for a config...") - response := h.oClient.SyncSend(api.ConfigPacket()) - conf := worker.EmptyConfig() - conf.Deserialize([]byte(response.Data)) - log.Printf("[worker] pulled config: %+v", conf) -} +func (h *Handler) Shutdown(context.Context) error { return nil } func (h *Handler) Prepare() { if !h.cfg.Emulator.Libretro.Cores.Repo.Sync { @@ -99,39 +92,36 @@ func (h *Handler) Prepare() { } } -func setupCoordinatorConnection(host string, zone string, cfg worker.Config) (*CoordinatorClient, error) { - var scheme string - env := cfg.Environment.Get() - if env.AnyOf(environment.Production, environment.Staging) { +func newCoordinatorConnection(host string, conf worker.Worker, addr string) (*CoordinatorClient, error) { + scheme := "ws" + if conf.Network.Secure { scheme = "wss" - } else { - scheme = "ws" + } + address := url.URL{Scheme: scheme, Host: host, Path: conf.Network.Endpoint} + + req, err := MakeConnectionRequest(conf, addr) + if req != "" && err == nil { + address.RawQuery = "data=" + req } - coordinatorURL := url.URL{Scheme: scheme, Host: host, Path: "/wso", RawQuery: "zone=" + zone} - log.Println("Worker connecting to coordinator:", coordinatorURL.String()) - - conn, err := createCoordinatorConnection(&coordinatorURL) + conn, err := websocket.Connect(address) if err != nil { return nil, err } return NewCoordinatorClient(conn), nil } -func createCoordinatorConnection(url *url.URL) (*websocket.Conn, error) { - var d websocket.Dialer - if url.Scheme == "wss" { - d = websocket.Dialer{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}} - } else { - d = websocket.Dialer{} +func MakeConnectionRequest(conf worker.Worker, address string) (string, error) { + req := api.ConnectionRequest{ + Zone: conf.Network.Zone, + PingAddr: conf.GetPingAddr(address), + IsHTTPS: conf.Server.Https, } - - ws, _, err := d.Dial(url.String(), nil) + rez, err := json.Marshal(req) if err != nil { - return nil, err + return "", err } - - return ws, nil + return base64.URLEncoding.EncodeToString(rez), nil } func (h *Handler) GetCoordinatorClient() *CoordinatorClient { diff --git a/pkg/worker/http.go b/pkg/worker/http.go new file mode 100644 index 00000000..f7de3fa8 --- /dev/null +++ b/pkg/worker/http.go @@ -0,0 +1,31 @@ +package worker + +import ( + "net/http" + + "github.com/giongto35/cloud-game/v2/pkg/config/worker" + "github.com/giongto35/cloud-game/v2/pkg/network/httpx" +) + +func NewHTTPServer(conf worker.Config) (*httpx.Server, error) { + srv, err := httpx.NewServer( + conf.Worker.GetAddr(), + func(*httpx.Server) http.Handler { + h := http.NewServeMux() + h.HandleFunc(conf.Worker.Network.PingEndpoint, func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + _, _ = w.Write([]byte{0x65, 0x63, 0x68, 0x6f}) // echo + }) + return h + }, + httpx.WithServerConfig(conf.Worker.Server), + // no need just for one route + httpx.HttpsRedirect(false), + httpx.WithPortRoll(true), + httpx.WithZone(conf.Worker.Network.Zone), + ) + if err != nil { + return nil, err + } + return srv, nil +} diff --git a/pkg/worker/internalhandlers.go b/pkg/worker/internalhandlers.go index eb17c6a1..46cfe5bf 100644 --- a/pkg/worker/internalhandlers.go +++ b/pkg/worker/internalhandlers.go @@ -16,8 +16,6 @@ func (h *Handler) handleServerId() cws.PacketHandler { return func(resp cws.WSPacket) (req cws.WSPacket) { log.Printf("[worker] new id: %s", resp.Data) h.serverID = resp.Data - // unlock worker if it's locked - h.w.lock.Unlock() return } } diff --git a/pkg/worker/server.go b/pkg/worker/server.go deleted file mode 100644 index 5dacef6f..00000000 --- a/pkg/worker/server.go +++ /dev/null @@ -1,123 +0,0 @@ -package worker - -import ( - "crypto/tls" - "fmt" - "log" - "net/http" - "strconv" - "time" - - "github.com/giongto35/cloud-game/v2/pkg/environment" - "golang.org/x/crypto/acme" - "golang.org/x/crypto/acme/autocert" -) - -const stagingLEURL = "https://acme-staging-v02.api.letsencrypt.org/directory" - -var echo = []byte{0x65, 0x63, 0x68, 0x6f} - -func makeServerFromMux(mux *http.ServeMux) *http.Server { - // set timeouts so that a slow or malicious client doesn't - // hold resources forever - return &http.Server{ - ReadTimeout: 5 * time.Second, - WriteTimeout: 5 * time.Second, - IdleTimeout: 120 * time.Second, - Handler: mux, - } -} - -func makeHTTPServer() *http.Server { - mux := &http.ServeMux{} - mux.HandleFunc("/echo", func(w http.ResponseWriter, _ *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - _, _ = w.Write(echo) - }) - - return makeServerFromMux(mux) -} - -func makeHTTPToHTTPSRedirectServer() *http.Server { - handleRedirect := func(w http.ResponseWriter, r *http.Request) { - newURI := "https://" + r.Host + r.URL.String() - http.Redirect(w, r, newURI, http.StatusFound) - } - mux := &http.ServeMux{} - mux.HandleFunc("/", handleRedirect) - - return makeServerFromMux(mux) -} - -func (wrk *Worker) spawnServer(port int) { - var certManager *autocert.Manager - var httpsSrv *http.Server - - mode := wrk.conf.Environment.Get() - if mode.AnyOf(environment.Production, environment.Staging) { - serverConfig := wrk.conf.Worker.Server - httpsSrv = makeHTTPServer() - httpsSrv.Addr = fmt.Sprintf(":%d", serverConfig.HttpsPort) - - if serverConfig.HttpsChain == "" || serverConfig.HttpsKey == "" { - serverConfig.HttpsChain = "" - serverConfig.HttpsKey = "" - - var leurl string - if mode == environment.Staging { - leurl = stagingLEURL - } else { - leurl = acme.LetsEncryptURL - } - - certManager = &autocert.Manager{ - Prompt: autocert.AcceptTOS, - Cache: autocert.DirCache("assets/cache"), - Client: &acme.Client{DirectoryURL: leurl}, - } - - httpsSrv.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate} - } - - go func(chain string, key string) { - log.Printf("Starting HTTPS server on %s\n", httpsSrv.Addr) - err := httpsSrv.ListenAndServeTLS(chain, key) - if err != nil { - log.Printf("httpsSrv.ListendAndServeTLS() failed with %s", err) - } - }(serverConfig.HttpsChain, serverConfig.HttpsKey) - } - - var httpSrv *http.Server - if mode.AnyOf(environment.Production, environment.Staging) { - httpSrv = makeHTTPToHTTPSRedirectServer() - } else { - httpSrv = makeHTTPServer() - } - - if certManager != nil { - httpSrv.Handler = certManager.HTTPHandler(httpSrv.Handler) - } - - startServer(httpSrv, port) -} - -func startServer(serv *http.Server, startPort int) { - // It's recommend to run one worker on one instance. - // This logic is to make sure more than 1 workers still work - for port, n := startPort, startPort+100; port < n; port++ { - serv.Addr = ":" + strconv.Itoa(port) - err := serv.ListenAndServe() - switch err { - case http.ErrServerClosed: - log.Printf("HTTP(S) server was closed") - return - default: - } - port++ - - if port == n { - log.Printf("error: couldn't find an open port in range %v-%v\n", startPort, port) - } - } -} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 4ebb2b9c..a3f276c7 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -1,74 +1,24 @@ package worker import ( - "context" "log" - "time" "github.com/giongto35/cloud-game/v2/pkg/config/worker" - "github.com/giongto35/cloud-game/v2/pkg/lock" "github.com/giongto35/cloud-game/v2/pkg/monitoring" - "github.com/giongto35/cloud-game/v2/pkg/server" - "github.com/golang/glog" + "github.com/giongto35/cloud-game/v2/pkg/service" ) -type Worker struct { - ctx context.Context - conf worker.Config - servers []server.Server - // to pause initialization - lock *lock.TimeLock -} - -func New(ctx context.Context, conf worker.Config) *Worker { - return &Worker{ctx: ctx, conf: conf, lock: lock.NewLock()} -} - -func (wrk *Worker) Run() { - go wrk.init() - wrk.servers = []server.Server{ - monitoring.NewServerMonitoring(wrk.conf.Worker.Monitoring, "worker"), +func New(conf worker.Config) (services service.Group) { + httpSrv, err := NewHTTPServer(conf) + if err != nil { + log.Fatalf("http init fail: %v", err) } - wrk.startModules() -} - -func (wrk *Worker) init() { - h := NewHandler(wrk.conf, wrk) - defer func() { - log.Printf("[worker] Closing handler") - h.Close() - }() - - go h.Run() - if !wrk.conf.Loaded { - wrk.lock.LockFor(time.Second * 10) - h.RequestConfig() - } - h.Prepare() - wrk.spawnServer(wrk.conf.Worker.Server.Port) -} - -func (wrk *Worker) startModules() { - glog.Info("[worker] active modules: ", wrk.servers) - for _, s := range wrk.servers { - s := s - go func() { - if err := s.Init(wrk.conf); err != nil { - glog.Errorf("failed server init") - return - } - if err := s.Run(); err != nil { - glog.Errorf("failed start server: [%v]", err) - } - }() - } -} - -// !to add a proper HTTP(S) server shutdown (cws/handler bad loop) -func (wrk *Worker) Shutdown() { - for _, s := range wrk.servers { - if err := s.Shutdown(wrk.ctx); err != nil { - glog.Errorln("failed server shutdown") - } + services.Add( + httpSrv, + NewHandler(conf, httpSrv.Addr), + ) + if conf.Worker.Monitoring.IsEnabled() { + services.Add(monitoring.New(conf.Worker.Monitoring, httpSrv.GetHost(), "worker")) } + return }