blocks-concurrent-batch-agent is an agent for the concurrent batch board
of magellan-blocks.
It builds your managed instance groups and
watches the metrics of your Pub/Sub subscriptions. When the number of unsubscribed messages
increases or decreases, blocks-concurrent-batch-agent
detects the change and scales your managed instance group.
- Install Go
  - Or use goenv
    - You can install goenv with anyenv
- Install the App Engine SDK for Go
git clone
- Install Ruby
make test
make test-coverage
open test/coverage.yyyy-mm-ddThh:mm:ssZ/index.html
$ make run
- Open http://localhost:8080/_ah/login and log in
- Open http://localhost:8080/admin/orgs
- Click [New Organization]
- Enter your organization Name and click [Create]
- Reload the page if your organization does not appear
- Click [Show] of your organization
- Click [Auth List]
- Click [Create new token]
- Copy the organization ID and the token
Create pipeline.json like this:
"boot_disk": {
  "disk_type": "pd-ssd",
  "disk_size_gb": 30
},
"preemptible": true,
"stackdriver_agent": true,
"token_consumption": 1
$ export AEHOST=localhost:8080
$ export ORG_ID="[the organization ID you got before]"
$ export TOKEN="[the token you got before]"
$ curl -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' -X POST http://$AEHOST/orgs/$ORG_ID/pipelines --data @pipeline.json
$ curl -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' http://$AEHOST/orgs/$ORG_ID/pipelines
$ curl -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' -X DELETE http://$AEHOST/orgs/$ORG_ID/pipelines/$ID
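For reference, a complete pipeline.json might look like the sketch below. It sets every field that the pipeline field table marks as required and writes the file with a shell here-document. All values are assumptions for illustration: the name pipeline01, the zone, the machine type, and the sizes are hypothetical, and the bracketed placeholders must be replaced with your own values.

```shell
# Write a pipeline.json that sets every field marked as required.
# Every value here is a placeholder for illustration, not a recommended setting.
cat > pipeline.json <<'EOF'
{
  "name": "pipeline01",
  "project_id": "[your GCP project ID]",
  "zone": "us-central1-f",
  "boot_disk": {
    "source_image": "[boot disk source image URL]",
    "disk_type": "pd-ssd",
    "disk_size_gb": 30
  },
  "machine_type": "n1-standard-1",
  "preemptible": true,
  "stackdriver_agent": true,
  "target_size": 2,
  "container_size": 1,
  "container_name": "[container name to pull]",
  "token_consumption": 1
}
EOF
```

The quoted 'EOF' delimiter keeps the shell from expanding anything inside the JSON, so the file is written exactly as shown.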
Generate app.yaml
$ erb -T - app/concurrent-batch-agent/app.yaml.erb > app/concurrent-batch-agent/app.yaml
See app.yaml.erb for more details.
$ make deploy
To make the deployed version active, run the following command:
$ make update-traffic
$AEHOST means the host name you deployed to.
- Open http://$AEHOST/admin/orgs
- Click [New Organization]
- Enter your organization Name and click [Create]
- Reload the page if your organization does not appear
- Click [Show] of your organization
- Click [Auth List]
- Click [Create new token]
- Copy the token shown
$ export AEHOST="[the host name you deployed]"
$ export ORG_ID="[the organization ID you got before]"
$ export TOKEN="[the token you got before]"
$ curl -v -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' -X POST http://$AEHOST/orgs/$ORG_ID/pipelines --data @pipeline.json
$ curl -v -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' http://$AEHOST/orgs/$ORG_ID/pipelines
Create job.json like this:
"id_by_client": "ID on your app",
Then publish the job to the pipeline (or reserve it for publishing).
$ export ID="[id of the pipeline]"
$ curl -v -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' -X POST "http://$AEHOST/pipelines/$ID/jobs?ready=true" --data @job.json
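A job.json for the request above might look like the following sketch. It uses only the id_by_client, message, and message.attributes fields listed in the job field table; the attribute key and value are bracketed placeholders, not names the agent expects.

```shell
# Write a minimal job.json with one message attribute.
# The attribute key and value are placeholders; replace them with your own.
cat > job.json <<'EOF'
{
  "id_by_client": "ID on your app",
  "message": {
    "attributes": {
      "[attribute key]": "[attribute value]"
    }
  }
}
EOF
```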
$ export ID="[id of the result]"
$ curl -v -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' -X PUT http://$AEHOST/pipelines/$ID/close --data ""
$ curl -v -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' -X DELETE http://$AEHOST/pipelines/$ID
Name | Type | Required | Description |
boot_disk | object | true | Boot disk for VM |
boot_disk.disk_size_gb | int | false | Boot disk size in GB |
boot_disk.disk_type | string | false | Boot disk type: "pd-standard", "pd-ssd" |
boot_disk.source_image | string | true | Boot disk source image URL |
close_policy | int | false | Close policy at the end of jobs: 0: CloseAnyway, 1: CloseOnAllSuccess, 2: CloseNever |
command | string | false | Command given to container |
container_size | int | true | The number of containers on each VM |
container_name | string | true | Container name to pull |
dependency | object | false | Dependency to jobs |
dependency.condition | int | false | Job condition that starts the pipeline: 0: OnSuccess, 1: OnFailure, 2: OnFinish |
dependency.job_ids | []string | true | Job IDs which the pipeline waits to finish |
docker_run_options | string | false | Options for docker run in startup script |
gpu_accelerators | object | false | GPU accelerator settings |
gpu_accelerators.Count | int | true | The number of GPU accelerators to use |
gpu_accelerators.Type | string | true | GPU accelerator type name (not URL). Run gcloud compute accelerator-types list to see the available types |
hibernation_delay | int | false | The number of seconds to wait before hibernating after all of the jobs have finished |
job_scaler | object | false | Setting to scale out |
job_scaler.enabled | bool | true | If true, scaling out is enabled |
job_scaler.max_instance_size | int | true | Maximum instance size that job_scaler can scale out to |
machine_type | string | true | VM Machine type: Run gcloud compute machine-types list |
name | string | true | Name of the pipeline |
pulling | object | false | Pulling settings |
pulling.message_per_pull | int | false | The number of messages to pull at once. Default is 100. |
pulling.interval_seconds | int | false | The interval between pulls, in seconds. Default is 30. |
pulling.jobs_per_task | int | false | The number of jobs to pull in a task. Default is 50. |
preemptible | bool | false | If true, use preemptible VMs |
project_id | string | true | GCP Project ID to run |
stackdriver_agent | bool | false | If true, use stackdriver agent |
target_size | int | true | The number of VMs |
token_consumption | int | false | The number of Organization tokens to consume |
zone | string | true | GCP zone to run |
Name | Type | Required | Description |
id_by_client | string | true | The ID that the client app generates for the job |
message | map | false | The message to publish to pipeline-job-topic |
message.attributes | map[string]string | false | The attributes of the message |
 | string | false | The data of the message |
The maximum size of a key in message.attributes
is 256 bytes.
The maximum size of a value in message.attributes
is 1,024 bytes.
If a key or value in message.attributes
exceeds its maximum size,
you can instead pass the data as the message data
in JSON format by setting an attribute use-data-as-attributes
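The size limits above can be checked before a job is published. The following is a minimal sketch, assuming a POSIX shell; check_attr is a hypothetical helper and not part of the agent, and the example key and value are made up.

```shell
# check_attr KEY VALUE
# Succeeds (exit status 0) only when KEY is at most 256 bytes and VALUE is at
# most 1,024 bytes, the limits stated for message.attributes.
# check_attr is a hypothetical helper, not part of the agent.
check_attr() {
  # wc -c counts bytes; $(( )) strips any padding that wc adds to its output.
  key_bytes=$(( $(printf '%s' "$1" | wc -c) ))
  value_bytes=$(( $(printf '%s' "$2" | wc -c) ))
  [ "$key_bytes" -le 256 ] && [ "$value_bytes" -le 1024 ]
}

# Example with hypothetical attribute data.
if check_attr "target" "value-1"; then
  echo "attribute fits"
else
  echo "attribute too large: pass it as JSON data with use-data-as-attributes"
fi
```

Counting with wc -c rather than the shell's string length keeps the check in bytes, which matters when keys or values contain multi-byte characters.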