diff --git a/cmd/serve.go b/cmd/serve.go
index 8590f64..4ce79a1 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -30,8 +30,8 @@ func serve(ctx context.Context) error {
so := serveropts.NewServerOptions(serverOpts, k.String("config"))
// pass the logger options to the job queue
- so.Config.Settings.JobQueue.Logger.Debug = k.Bool(debugFlag)
- so.Config.Settings.JobQueue.Logger.Pretty = k.Bool(prettyFlag)
+ so.Config.Settings.River.Logger.Debug = k.Bool(debugFlag)
+ so.Config.Settings.River.Logger.Pretty = k.Bool(prettyFlag)
- return river.Start(ctx, so.Config.Settings.JobQueue)
+ return river.Start(ctx, so.Config.Settings.River)
}
diff --git a/config/.env.example b/config/.env.example
index e7b1b5d..2dd0e57 100644
--- a/config/.env.example
+++ b/config/.env.example
@@ -1,11 +1,11 @@
RIVERBOAT_REFRESHINTERVAL="10m"
-RIVERBOAT_JOBQUEUE_DATABASEHOST="postgres://postgres:password@0.0.0.0:5432/jobs?sslmode=disable"
-RIVERBOAT_JOBQUEUE_QUEUES=""
-RIVERBOAT_JOBQUEUE_WORKERS_EMAILWORKER_DEVMODE="true"
-RIVERBOAT_JOBQUEUE_WORKERS_EMAILWORKER_TESTDIR="fixtures/email"
-RIVERBOAT_JOBQUEUE_WORKERS_EMAILWORKER_TOKEN=""
-RIVERBOAT_JOBQUEUE_WORKERS_EMAILWORKER_FROMEMAIL="no-reply@example.com"
-RIVERBOAT_JOBQUEUE_WORKERS_DATABASEWORKER_CONFIG_ENABLED="true"
-RIVERBOAT_JOBQUEUE_WORKERS_DATABASEWORKER_CONFIG_BASEURL="http://localhost:1337"
-RIVERBOAT_JOBQUEUE_WORKERS_DATABASEWORKER_CONFIG_ENDPOINT="query"
-RIVERBOAT_JOBQUEUE_WORKERS_DATABASEWORKER_CONFIG_DEBUG="false"
+RIVERBOAT_RIVER_DATABASEHOST="postgres://postgres:password@0.0.0.0:5432/jobs?sslmode=disable"
+RIVERBOAT_RIVER_QUEUES=""
+RIVERBOAT_RIVER_WORKERS_EMAILWORKER_CONFIG_DEVMODE="true"
+RIVERBOAT_RIVER_WORKERS_EMAILWORKER_CONFIG_TESTDIR="fixtures/email"
+RIVERBOAT_RIVER_WORKERS_EMAILWORKER_CONFIG_TOKEN=""
+RIVERBOAT_RIVER_WORKERS_EMAILWORKER_CONFIG_FROMEMAIL="no-reply@example.com"
+RIVERBOAT_RIVER_WORKERS_DATABASEWORKER_CONFIG_ENABLED="true"
+RIVERBOAT_RIVER_WORKERS_DATABASEWORKER_CONFIG_BASEURL="http://localhost:1337"
+RIVERBOAT_RIVER_WORKERS_DATABASEWORKER_CONFIG_ENDPOINT="query"
+RIVERBOAT_RIVER_WORKERS_DATABASEWORKER_CONFIG_DEBUG="false"
diff --git a/config/config.example.yaml b/config/config.example.yaml
index bcb8170..82b5fae 100644
--- a/config/config.example.yaml
+++ b/config/config.example.yaml
@@ -1,4 +1,5 @@
-jobQueue:
+refreshInterval: 600000000000
+river:
databaseHost: postgres://postgres:password@0.0.0.0:5432/jobs?sslmode=disable
queues: null
workers:
@@ -9,9 +10,8 @@ jobQueue:
enabled: true
endpoint: query
emailWorker:
- email:
+ config:
devMode: true
fromEmail: no-reply@example.com
testDir: fixtures/email
token: ""
-refreshInterval: 600000000000
diff --git a/config/config.go b/config/config.go
index cf900cc..eb40ca1 100644
--- a/config/config.go
+++ b/config/config.go
@@ -1,7 +1,6 @@
package config
import (
- "fmt"
"strings"
"time"
@@ -23,8 +22,8 @@ var (
type Config struct {
// RefreshInterval determines how often to reload the config
RefreshInterval time.Duration `json:"refreshInterval" koanf:"refreshInterval" default:"10m"`
- // JobQueue is the configuration for the job queue
- JobQueue river.Config `koanf:"jobQueue" json:"jobQueue"`
+ // River is the configuration for the job queue
+ River river.Config `koanf:"river" json:"river"`
}
// Load is responsible for loading the configuration from a YAML file and environment variables.
@@ -44,18 +43,23 @@ func Load(cfgFile *string) (*Config, error) {
// parse yaml config
if err := k.Load(file.Provider(*cfgFile), yaml.Parser()); err != nil {
- fmt.Println("failed to load config file", err)
- } else {
- // unmarshal the config
- if err := k.Unmarshal("", &conf); err != nil {
- panic(err)
- }
+ panic(err)
+ }
+
+ // unmarshal the config
+ if err := k.Unmarshal("", &conf); err != nil {
+ panic(err)
}
// load env vars
- if err := k.Load(env.Provider(envPrefix, ".", func(s string) string {
- return strings.ReplaceAll(strings.ToLower(
- strings.TrimPrefix(s, envPrefix)), "_", ".")
+ if err := k.Load(env.ProviderWithValue(envPrefix, ".", func(s string, v string) (string, interface{}) {
+ key := strings.ReplaceAll(strings.ToLower(strings.TrimPrefix(s, envPrefix)), "_", ".")
+
+ if strings.Contains(v, ",") {
+ return key, strings.Split(v, ",")
+ }
+
+ return key, v
}), nil); err != nil {
panic(err)
}
diff --git a/config/configmap.yaml b/config/configmap.yaml
index 37d33eb..841a3e7 100644
--- a/config/configmap.yaml
+++ b/config/configmap.yaml
@@ -11,13 +11,13 @@ metadata:
{{- end }}
data:
RIVERBOAT_REFRESHINTERVAL: {{ .Values.riverboat.refreshInterval | default "10m" }}
- RIVERBOAT_JOBQUEUE_DATABASEHOST: {{ .Values.riverboat.jobQueue.databaseHost | default "postgres://postgres:password@0.0.0.0:5432/jobs?sslmode=disable" }}
- RIVERBOAT_JOBQUEUE_QUEUES: {{ .Values.riverboat.jobQueue.queues }}
- RIVERBOAT_JOBQUEUE_WORKERS_EMAILWORKER_DEVMODE: {{ .Values.riverboat.jobQueue.workers.emailWorker.devMode | default true }}
- RIVERBOAT_JOBQUEUE_WORKERS_EMAILWORKER_TESTDIR: {{ .Values.riverboat.jobQueue.workers.emailWorker.testDir | default "fixtures/email" }}
- RIVERBOAT_JOBQUEUE_WORKERS_EMAILWORKER_TOKEN: {{ .Values.riverboat.jobQueue.workers.emailWorker.token }}
- RIVERBOAT_JOBQUEUE_WORKERS_EMAILWORKER_FROMEMAIL: {{ .Values.riverboat.jobQueue.workers.emailWorker.fromEmail | default "no-reply@example.com" }}
- RIVERBOAT_JOBQUEUE_WORKERS_DATABASEWORKER_CONFIG_ENABLED: {{ .Values.riverboat.jobQueue.workers.databaseWorker.config.enabled | default true }}
- RIVERBOAT_JOBQUEUE_WORKERS_DATABASEWORKER_CONFIG_BASEURL: {{ .Values.riverboat.jobQueue.workers.databaseWorker.config.baseUrl | default "http://localhost:1337" }}
- RIVERBOAT_JOBQUEUE_WORKERS_DATABASEWORKER_CONFIG_ENDPOINT: {{ .Values.riverboat.jobQueue.workers.databaseWorker.config.endpoint | default "query" }}
- RIVERBOAT_JOBQUEUE_WORKERS_DATABASEWORKER_CONFIG_DEBUG: {{ .Values.riverboat.jobQueue.workers.databaseWorker.config.debug | default false }}
+ RIVERBOAT_RIVER_DATABASEHOST: {{ .Values.riverboat.river.databaseHost | default "postgres://postgres:password@0.0.0.0:5432/jobs?sslmode=disable" }}
+ RIVERBOAT_RIVER_QUEUES: {{ .Values.riverboat.river.queues }}
+ RIVERBOAT_RIVER_WORKERS_EMAILWORKER_CONFIG_DEVMODE: {{ .Values.riverboat.river.workers.emailWorker.config.devMode | default true }}
+ RIVERBOAT_RIVER_WORKERS_EMAILWORKER_CONFIG_TESTDIR: {{ .Values.riverboat.river.workers.emailWorker.config.testDir | default "fixtures/email" }}
+ RIVERBOAT_RIVER_WORKERS_EMAILWORKER_CONFIG_TOKEN: {{ .Values.riverboat.river.workers.emailWorker.config.token }}
+ RIVERBOAT_RIVER_WORKERS_EMAILWORKER_CONFIG_FROMEMAIL: {{ .Values.riverboat.river.workers.emailWorker.config.fromEmail | default "no-reply@example.com" }}
+ RIVERBOAT_RIVER_WORKERS_DATABASEWORKER_CONFIG_ENABLED: {{ .Values.riverboat.river.workers.databaseWorker.config.enabled | default true }}
+ RIVERBOAT_RIVER_WORKERS_DATABASEWORKER_CONFIG_BASEURL: {{ .Values.riverboat.river.workers.databaseWorker.config.baseUrl | default "http://localhost:1337" }}
+ RIVERBOAT_RIVER_WORKERS_DATABASEWORKER_CONFIG_ENDPOINT: {{ .Values.riverboat.river.workers.databaseWorker.config.endpoint | default "query" }}
+ RIVERBOAT_RIVER_WORKERS_DATABASEWORKER_CONFIG_DEBUG: {{ .Values.riverboat.river.workers.databaseWorker.config.debug | default false }}
diff --git a/configgen/api-docs.md b/configgen/api-docs.md
index aaa9a85..d24f9c2 100644
--- a/configgen/api-docs.md
+++ b/configgen/api-docs.md
@@ -5,11 +5,11 @@
|Name|Type|Description|Required|
|----|----|-----------|--------|
|**refreshInterval**|`integer`|||
-|[**jobQueue**](#jobqueue)|`object`|Config is the configuration for the river server
||
+|[**river**](#river)|`object`|Config is the configuration for the river server
||
**Additional Properties:** not allowed
-
-## jobQueue: object
+
+## river: object
Config is the configuration for the river server
@@ -19,17 +19,17 @@ Config is the configuration for the river server
|Name|Type|Description|Required|
|----|----|-----------|--------|
|**databaseHost**|`string`|DatabaseHost for connecting to the postgres database
||
-|[**queues**](#jobqueuequeues)|`array`|||
-|[**workers**](#jobqueueworkers)|`object`|Workers that will be enabled on the server
||
+|[**queues**](#riverqueues)|`array`|||
+|[**workers**](#riverworkers)|`object`|Workers that will be enabled on the server
||
**Additional Properties:** not allowed
-
-### jobQueue\.queues: array
+
+### river\.queues: array
**Items**
-
-### jobQueue\.workers: object
+
+### river\.workers: object
Workers that will be enabled on the server
@@ -38,22 +38,22 @@ Workers that will be enabled on the server
|Name|Type|Description|Required|
|----|----|-----------|--------|
-|[**emailWorker**](#jobqueueworkersemailworker)|`object`|||
-|[**databaseWorker**](#jobqueueworkersdatabaseworker)|`object`|||
+|[**emailWorker**](#riverworkersemailworker)|`object`|||
+|[**databaseWorker**](#riverworkersdatabaseworker)|`object`|||
**Additional Properties:** not allowed
-
-#### jobQueue\.workers\.emailWorker: object
+
+#### river\.workers\.emailWorker: object
**Properties**
|Name|Type|Description|Required|
|----|----|-----------|--------|
-|[**email**](#jobqueueworkersemailworkeremail)|`object`|||
+|[**config**](#riverworkersemailworkerconfig)|`object`|||
**Additional Properties:** not allowed
-
-##### jobQueue\.workers\.emailWorker\.email: object
+
+##### river\.workers\.emailWorker\.config: object
**Properties**
@@ -65,18 +65,18 @@ Workers that will be enabled on the server
|**fromEmail**|`string`|||
**Additional Properties:** not allowed
-
-#### jobQueue\.workers\.databaseWorker: object
+
+#### river\.workers\.databaseWorker: object
**Properties**
|Name|Type|Description|Required|
|----|----|-----------|--------|
-|[**config**](#jobqueueworkersdatabaseworkerconfig)|`object`|||
+|[**config**](#riverworkersdatabaseworkerconfig)|`object`|||
**Additional Properties:** not allowed
-
-##### jobQueue\.workers\.databaseWorker\.config: object
+
+##### river\.workers\.databaseWorker\.config: object
**Properties**
diff --git a/configgen/riverboat.config.json b/configgen/riverboat.config.json
index e140cbe..f035f35 100644
--- a/configgen/riverboat.config.json
+++ b/configgen/riverboat.config.json
@@ -63,7 +63,7 @@
},
"jobs.EmailWorker": {
"properties": {
- "email": {
+ "config": {
"$ref": "#/$defs/jobs.EmailConfig",
"description": "the email configuration"
}
@@ -125,7 +125,7 @@
"refreshInterval": {
"type": "integer"
},
- "jobQueue": {
+ "river": {
"$ref": "#/$defs/river.Config"
}
},
diff --git a/internal/river/workers.go b/internal/river/workers.go
index 93ea50c..67faa56 100644
--- a/internal/river/workers.go
+++ b/internal/river/workers.go
@@ -12,7 +12,7 @@ func createWorkers(c Workers) (*river.Workers, error) {
workers := river.NewWorkers()
if err := river.AddWorkerSafely(workers, &jobs.EmailWorker{
- EmailConfig: c.EmailWorker.EmailConfig,
+ Config: c.EmailWorker.Config,
},
); err != nil {
return nil, err
diff --git a/pkg/jobs/email.go b/pkg/jobs/email.go
index ff643c7..99a40a9 100644
--- a/pkg/jobs/email.go
+++ b/pkg/jobs/email.go
@@ -24,7 +24,7 @@ func (EmailArgs) Kind() string { return "email" }
type EmailWorker struct {
river.WorkerDefaults[EmailArgs]
- EmailConfig `koanf:"email" json:"email" jsonschema:"description=the email configuration"`
+ Config EmailConfig `koanf:"config" json:"config" jsonschema:"description=the email configuration"`
}
// EmailConfig contains the configuration for the email worker
@@ -41,11 +41,11 @@ type EmailConfig struct {
// validateEmailConfig validates the email configuration settings
func (w *EmailWorker) validateEmailConfig() error {
- if w.DevMode && w.TestDir == "" {
+ if w.Config.DevMode && w.Config.TestDir == "" {
return newMissingRequiredArg("test directory", EmailArgs{}.Kind())
}
- if !w.DevMode && w.Token == "" {
+ if !w.Config.DevMode && w.Config.Token == "" {
return newMissingRequiredArg("token", EmailArgs{}.Kind())
}
@@ -67,19 +67,19 @@ func (w *EmailWorker) Work(ctx context.Context, job *river.Job[EmailArgs]) error
// set the options for the resend client
opts := []resend.Option{}
- if w.DevMode {
- log.Debug().Str("directory", w.TestDir).Msg("running in dev mode")
+ if w.Config.DevMode {
+ log.Debug().Str("directory", w.Config.TestDir).Msg("running in dev mode")
- opts = append(opts, resend.WithDevMode(w.TestDir))
+ opts = append(opts, resend.WithDevMode(w.Config.TestDir))
}
// if the from email is not set on the message, use the default from the worker config
if job.Args.Message.From == "" {
- job.Args.Message.From = w.FromEmail
+ job.Args.Message.From = w.Config.FromEmail
}
// create the resend client
- client, err := resend.New(w.Token, opts...)
+ client, err := resend.New(w.Config.Token, opts...)
if err != nil {
return err
}
diff --git a/pkg/jobs/email_test.go b/pkg/jobs/email_test.go
index f7ee014..36ffdcf 100644
--- a/pkg/jobs/email_test.go
+++ b/pkg/jobs/email_test.go
@@ -32,7 +32,7 @@ func (suite *TestSuite) TestEmailWorker() {
{
name: "happy path, dev mode",
worker: &jobs.EmailWorker{
- EmailConfig: jobs.EmailConfig{
+ Config: jobs.EmailConfig{
DevMode: true,
TestDir: "test",
FromEmail: "robin@scherbatsky.net",
@@ -43,7 +43,7 @@ func (suite *TestSuite) TestEmailWorker() {
{
name: "missing test directory",
worker: &jobs.EmailWorker{
- EmailConfig: jobs.EmailConfig{
+ Config: jobs.EmailConfig{
DevMode: true,
FromEmail: "robin@scherbatsky.net",
},
@@ -54,7 +54,7 @@ func (suite *TestSuite) TestEmailWorker() {
{
name: "missing from email",
worker: &jobs.EmailWorker{
- EmailConfig: jobs.EmailConfig{
+ Config: jobs.EmailConfig{
DevMode: true,
TestDir: "test",
},
@@ -65,7 +65,7 @@ func (suite *TestSuite) TestEmailWorker() {
{
name: "happy path, missing from email but in message",
worker: &jobs.EmailWorker{
- EmailConfig: jobs.EmailConfig{
+ Config: jobs.EmailConfig{
DevMode: true,
TestDir: "test",
},
@@ -75,7 +75,7 @@ func (suite *TestSuite) TestEmailWorker() {
{
name: "missing token",
worker: &jobs.EmailWorker{
- EmailConfig: jobs.EmailConfig{
+ Config: jobs.EmailConfig{
DevMode: false,
FromEmail: "robin@scherbatsky.net",
},
diff --git a/test/email/main.go b/test/email/main.go
index 69654a4..e51377e 100644
--- a/test/email/main.go
+++ b/test/email/main.go
@@ -21,6 +21,7 @@ func main() {
newman.WithSubject("test subject"),
newman.WithText("body"),
newman.WithTo([]string{"meowfunk@example.com"}),
+ newman.WithFrom("no-reply@mail.theopenlane.io"),
)
_, err := client.Insert(context.Background(), jobs.EmailArgs{