mirror of
https://github.com/wassname/openshift-celery-cartridge.git
synced 2026-06-27 16:10:05 +08:00
Scaling
This commit is contained in:
@@ -0,0 +1,2 @@
|
||||
env
|
||||
logs
|
||||
+12
@@ -1,5 +1,7 @@
|
||||
#!/bin/bash -e
|
||||
|
||||
source $OPENSHIFT_CARTRIDGE_SDK_BASH
|
||||
|
||||
PATH=$OPENSHIFT_DATA_DIR:$OPENSHIFT_DATA_DIR/config/:${OPENSHIFT_CELERY_DIR}bin/:${OPENSHIFT_CELERY_DIR}usr/bin:${OPENSHIFT_CELERY_DIR}conf.d:$PATH
|
||||
|
||||
#TODO add scaling, better status, variable worker names, stop workers properly
|
||||
@@ -39,10 +41,20 @@ function status() {
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
function catchall {
|
||||
echo "not yet implemented"
|
||||
}
|
||||
|
||||
# Ensure arguments.
|
||||
if ! [ $# -gt 0 ]; then
|
||||
echo "Usage: $0 [start|restart|stop|status]"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Source utility functions.
|
||||
source $OPENSHIFT_CARTRIDGE_SDK_BASH
|
||||
|
||||
case "$1" in
|
||||
start) start ;;
|
||||
stop) stop ;;
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
#!/bin/bash -e
|
||||
|
||||
source $OPENSHIFT_CARTRIDGE_SDK_BASH
|
||||
|
||||
for dir in logs pid tmp env; do
|
||||
mkdir -p $dir
|
||||
done
|
||||
|
||||
mkdir -p $OPENSHIFT_DATA_DIR/.celery
|
||||
Executable
+5
@@ -0,0 +1,5 @@
|
||||
#!/bin/bash
|
||||
|
||||
echo "S_CELERY_MASTER=$CELERY_MASTER"
|
||||
echo "S_CELERY_HOST=$OPENSHIFT_GEAR_DNS"
|
||||
echo "S_CELERY_PORT=$OPENSHIFT_GEAR_PORT"
|
||||
Executable
+4
@@ -0,0 +1,4 @@
|
||||
#!/bin/bash
|
||||
|
||||
echo "OPENSHIFT_CELERY_HOST=$OPENSHIFT_GEAR_DNS"
|
||||
echo "OPENSHIFT_CELERY_PORT=$OPENSHIFT_GEAR_PORT"
|
||||
Executable
+87
@@ -0,0 +1,87 @@
|
||||
#!/usr/bin/ruby
|
||||
|
||||
def gear_info(tokens, gear_id, &block)
|
||||
gears = {}
|
||||
if tokens.length == 1
|
||||
gears[gear_id] = tokens.first
|
||||
else
|
||||
while not tokens.empty?
|
||||
gear, delim, data = tokens.shift(3)
|
||||
data = yield data if block_given?
|
||||
raise "Invalid data" unless delim == '='
|
||||
gears[gear] = data
|
||||
tokens.shift if tokens.first == ' '
|
||||
end
|
||||
end
|
||||
gears
|
||||
end
|
||||
|
||||
def tokenize(s)
|
||||
tokens = []
|
||||
a = ""
|
||||
state = :start
|
||||
s.scan(/([ \t]+)|(\\')|(')|([^ \t']+)/) do |args|
|
||||
space, escaped_delim, delim, text = args
|
||||
case state
|
||||
when :start
|
||||
case
|
||||
when space then " "
|
||||
when escaped_delim then raise "Unexpected delimiter"
|
||||
when delim then state = :within_delim
|
||||
when text then tokens << text
|
||||
else raise "error"
|
||||
end
|
||||
when :within_delim
|
||||
case
|
||||
when space then a << space
|
||||
when escaped_delim then a << '"'
|
||||
when delim then tokens << a; a = ""; state = :start
|
||||
when text then a << text
|
||||
end
|
||||
end
|
||||
end
|
||||
tokens << a if a.length > 0
|
||||
tokens
|
||||
end
|
||||
|
||||
gear_id = ARGV.shift
|
||||
domain = ARGV.shift
|
||||
tokens = tokenize(ARGV.shift)
|
||||
|
||||
# better if this is in creation order (oldest first)
|
||||
gears = gear_info(tokens, gear_id) do |d|
|
||||
d.split(' ').map{ |s| s.scan(/\A(.+?)=(.*?);?\Z/).first }.inject({}){ |h, (k,v)| h[k] = v if v != ''; h }
|
||||
end
|
||||
gears.each_pair{ |k,v| puts "Found gear #{k}#{k == gear_id ? '* ' : ''} with data #{v.inspect}" }
|
||||
gear_ids = gears.keys.sort.uniq
|
||||
|
||||
puts "-------"
|
||||
|
||||
was_master = ENV['CELERY_MASTER'] == '1'
|
||||
masters = gears.map{ |k,v| v['S_CELERY_MASTER'] == '1' ? k : nil }.compact.uniq.sort
|
||||
|
||||
mode =
|
||||
case ENV['CELERY_MODE']
|
||||
when 'read_replica'
|
||||
if masters.length > 1
|
||||
masters = masters[0,1]
|
||||
elsif masters.length == 0
|
||||
masters = gears.keys.uniq.sort[0,1]
|
||||
end
|
||||
puts "Running in read replica mode with master #{masters}"
|
||||
:read_replica
|
||||
else
|
||||
masters = gears.keys.uniq.sort
|
||||
puts "Running sharded with masters #{masters.inspect}"
|
||||
:sharded
|
||||
end
|
||||
|
||||
# An array of all of the host:port pairs for the cluster
|
||||
hosts = gears.map{ |k,v| "#{v['S_CELERY_HOST']}:#{v['S_CELERY_PORT']}" }.compact.uniq.sort
|
||||
# A list of key value pairs in <gear id> => <host:port pair> for the cluster
|
||||
members = gears.map{ |k,v| "#{k}=#{v['S_CELERY_HOST']}:#{v['S_CELERY_PORT']}" }.compact.uniq.sort
|
||||
|
||||
File.open('env/CELERY_CLUSTER_MEMBERS', 'w'){ |f| f.puts members.join("\n") }
|
||||
File.open('env/CELERY_CLUSTER', 'w'){ |f| f.puts hosts.join(",") }
|
||||
File.open('env/CELERY_CLUSTER_MASTERS', 'w'){ |f| f.puts masters.join(",") }
|
||||
File.open('env/CELERY_MASTER', 'w'){ |f| f.puts masters.include?(gear_id) ? "1" : "0" }
|
||||
@@ -3,5 +3,3 @@ locked_files:
|
||||
- env/
|
||||
- env/*
|
||||
- conf.d/*
|
||||
processed_templates:
|
||||
- '**/*.erb'
|
||||
|
||||
+20
-2
@@ -2,7 +2,7 @@ Name: celeryd
|
||||
Cartridge-Short-Name: CELERY
|
||||
Display-Name: Celery Cartridge
|
||||
Description: A cartridge for running the celeryd daemon to allow asynchronous tasks in your application.
|
||||
Version: '0.15'
|
||||
Version: '0.16'
|
||||
License: ASL 2.0
|
||||
Vendor: Matti HEIKKILA
|
||||
Cartridge-Version:
|
||||
@@ -12,9 +12,18 @@ Website: https://github.com/wassname/openshift-celery-cartridge
|
||||
Source-Url: https://github.com/wassname/openshift-celery-cartridge.git
|
||||
Categories:
|
||||
- service
|
||||
Requires: python-2.7
|
||||
- embedded
|
||||
Requires: celery
|
||||
Scaling:
|
||||
Min: 1
|
||||
Max: -1
|
||||
Provides:
|
||||
- celery
|
||||
Endpoints:
|
||||
- Private-IP-Name: HOST
|
||||
Private-Port-Name: PORT
|
||||
Private-Port: 16388
|
||||
Public-Port-Name: PROXY_PORT
|
||||
Cart-Data:
|
||||
- Key: OPENSHIFT_CELERY_HOME
|
||||
Type: environment
|
||||
@@ -28,3 +37,12 @@ Cart-Data:
|
||||
Group-Overrides:
|
||||
- components:
|
||||
- celery
|
||||
Subscribes:
|
||||
set-redis-connection-info:
|
||||
Type: "ENV:NET_TCP:db:celery:connection-info-v1"
|
||||
Required: false
|
||||
Publishes:
|
||||
publish-db-connection-info:
|
||||
Type: "ENV:NET_TCP:db:connection-info"
|
||||
publish-redis-connection-info:
|
||||
Type: "ENV:NET_TCP:db:celery:connection-info-v1"
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
#!/bin/bash
|
||||
# This deploy hook gets executed after dependencies are resolved and the
|
||||
# build hook has been run but before the application has been started back
|
||||
# up again. This script gets executed directly, so it could be python, php,
|
||||
# ruby, etc.
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
source $OPENSHIFT_HOMEDIR/python/virtenv/bin/activate
|
||||
export PYTHONPATH=`echo $OPENSHIFT_REPO_DIR`:$PYTHONPATH
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
import celery
|
||||
|
||||
app = celery.Celery()
|
||||
app.config_from_object('celeryconfig')
|
||||
|
||||
@app.task
|
||||
def addtest(x, y):
|
||||
return x + y
|
||||
|
||||
@app.task
|
||||
def run_spiders(name, **kwargs):
|
||||
"""
|
||||
@param name: spider name
|
||||
"""
|
||||
pass
|
||||
Reference in New Issue
Block a user