diff --git a/surge/client.py b/surge/client.py
index bbc0391..c3e701d 100644
--- a/surge/client.py
+++ b/surge/client.py
@@ -1 +1,36 @@
-__author__ = 'kshimamu'
+# Copyright 2015 Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from surge.surge_deployer.surge import VagrantDeployer
+import surge.surge_deployer.utils as surge_utils
+
+
+class SurgeClient:
+
+    def launch_kafka(self, pipeline_name, brokers):
+        deployer = VagrantDeployer(pipeline_name)
+        surge_utils.generate_kafka_component(pipeline_name, brokers)
+        deployer.deploy("virtualbox")
+
+    def launch_storm(self, pipeline_name, supervisors):
+        deployer = VagrantDeployer(pipeline_name)
+        surge_utils.generate_storm_component(pipeline_name, supervisors)
+        deployer.deploy("virtualbox")
+
+    def list_pipeline(self):
+        return surge_utils.list_pipelines()
+
+    def destroy_pipeline(self, pipeline_name):
+        deployer = VagrantDeployer(pipeline_name)
+        deployer.destroy()
diff --git a/surge/surge_deployer/basevb/Vagrantfile b/surge/surge_deployer/basevb/Vagrantfile
index e08e48a..3327b47 100644
--- a/surge/surge_deployer/basevb/Vagrantfile
+++ b/surge/surge_deployer/basevb/Vagrantfile
@@ -1,5 +1,3 @@
-
-
 # Copyright 2015 Cisco Systems, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/surge/surge_deployer/basevb/__init__.py b/surge/surge_deployer/basevb/__init__.py
index 14e168b..5d4a5ed 100644
--- a/surge/surge_deployer/basevb/__init__.py
+++ b/surge/surge_deployer/basevb/__init__.py
@@ -1,5 +1,3 @@
-
-
 # Copyright 2015 Cisco Systems, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/surge/surge_deployer/basevb/ansible.cfg b/surge/surge_deployer/basevb/ansible.cfg
index 523d6a0..497168e 100644
--- a/surge/surge_deployer/basevb/ansible.cfg
+++ b/surge/surge_deployer/basevb/ansible.cfg
@@ -1,5 +1,3 @@
-
-
 # Copyright 2015 Cisco Systems, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
diff --git a/surge/surge_deployer/basevb/pipeline.yml b/surge/surge_deployer/basevb/pipeline.yml
new file mode 100644
index 0000000..0aac4d3
--- /dev/null
+++ b/surge/surge_deployer/basevb/pipeline.yml
@@ -0,0 +1,16 @@
+hosts:
+  kafka:
+    count: 1
+    provider:
+      virtualbox:
+        memory: 2048
+  zookeeper:
+    count: 1
+    provider:
+      virtualbox:
+        memory: 1024
+provider:
+  type:
+    virtualbox:
+      hostname_prefix: ''
+      ip_start: 10.20.30.10
diff --git a/surge/surge_deployer/surge-docker-playbooks/roles/kafka/defaults/main.yml b/surge/surge_deployer/surge-docker-playbooks/roles/kafka/defaults/main.yml
index d47591a..83d836e 100644
--- a/surge/surge_deployer/surge-docker-playbooks/roles/kafka/defaults/main.yml
+++ b/surge/surge_deployer/surge-docker-playbooks/roles/kafka/defaults/main.yml
@@ -1,5 +1,3 @@
-
-
 # Copyright 2015 Cisco Systems, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -23,4 +21,4 @@ kafka_gid: 51521
 kafka_logs_dir: /var/log/kafka
 kafka_port: 9092
 kafka_interface: eth1
-
+include: kafka.yml
diff --git a/surge/surge_deployer/surge-docker-playbooks/roles/kafka/tasks/main.yml b/surge/surge_deployer/surge-docker-playbooks/roles/kafka/tasks/main.yml
index 34e2165..16631c2 100644
--- a/surge/surge_deployer/surge-docker-playbooks/roles/kafka/tasks/main.yml
+++ b/surge/surge_deployer/surge-docker-playbooks/roles/kafka/tasks/main.yml
@@ -1,5 +1,3 @@
-
-
 # Copyright 2015 Cisco Systems, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -57,8 +55,17 @@
   shell: sed 's/[^0-9]//g' /etc/hostname
   register: id
 
+#- name: Updating the server.properties conf
+#  template: src=server.properties.j2 dest=/usr/local/etc/kafka/config/server.properties owner={{ kafka_user }} group={{ kafka_group }} mode=0644
+
 - name: Updating the server.properties conf
   template: src=server.properties.j2 dest=/usr/local/etc/kafka/config/server.properties owner={{ kafka_user }} group={{ kafka_group }} mode=0644
+  when: "'zookeeper' in groups"
+
+- name: Updating server.properties conf for the added kafka node
+  template: src=add.server.properties.j2 dest=/usr/local/etc/kafka/config/server.properties owner={{ kafka_user }} group={{ kafka_group }} mode=0644
+  when: "'zookeeper' not in groups"
+
 
 #- name: Restarting Kafka
 #  supervisorctl: name=kafka state=restarted
diff --git a/surge/surge_deployer/surge-docker-playbooks/roles/kafka/templates/add.server.properties.j2 b/surge/surge_deployer/surge-docker-playbooks/roles/kafka/templates/add.server.properties.j2
new file mode 100644
index 0000000..1fb3acc
--- /dev/null
+++ b/surge/surge_deployer/surge-docker-playbooks/roles/kafka/templates/add.server.properties.j2
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={{ id.stdout }}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port={{kafka_port}}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+
+host.name={{ ansible_ssh_host }}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=
+
+# The number of threads handling network requests
+
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs={{ kafka_logs_dir }}/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=2
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# These are comma-separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+
+zookeeper.connect={{ zookeeper }}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
\ No newline at end of file
diff --git a/surge/surge_deployer/surge-docker-playbooks/roles/storm-common/defaults/main.yml b/surge/surge_deployer/surge-docker-playbooks/roles/storm-common/defaults/main.yml
new file mode 100644
index 0000000..e69de29
diff --git a/surge/surge_deployer/surge-docker-playbooks/roles/storm-common/tasks/main.yml b/surge/surge_deployer/surge-docker-playbooks/roles/storm-common/tasks/main.yml
index c1e96d3..d5f514e 100644
--- a/surge/surge_deployer/surge-docker-playbooks/roles/storm-common/tasks/main.yml
+++ b/surge/surge_deployer/surge-docker-playbooks/roles/storm-common/tasks/main.yml
@@ -1,5 +1,3 @@
-
-
 # Copyright 2015 Cisco Systems, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -49,5 +47,13 @@
 - name: Creating the logs dir
   file: path=/var/log/storm owner=storm group=storm mode=0750 state=directory
 
+#- name: Updating the storm.yaml conf
+#  template: src=storm.yaml.j2 dest=/usr/local/etc/storm/conf/storm.yaml owner=storm group=storm mode=0644
+
 - name: Updating the storm.yaml conf
-  template: src=storm.yaml.j2 dest=/usr/local/etc/storm/conf/storm.yaml owner=storm group=storm mode=0644
\ No newline at end of file
+  template: src=storm.yaml.j2 dest=/usr/local/etc/storm/conf/storm.yaml owner=storm group=storm mode=0644
+  when: "'zookeeper' in groups"
+
+- name: Updating the storm.yaml conf for the added supervisor node
+  template: src=supervisor.yaml.j2 dest=/usr/local/etc/storm/conf/storm.yaml owner=storm group=storm mode=0644
+  when: "'zookeeper' not in groups"
diff --git a/surge/surge_deployer/surge-docker-playbooks/roles/storm-common/templates/supervisor.yaml.j2 b/surge/surge_deployer/surge-docker-playbooks/roles/storm-common/templates/supervisor.yaml.j2
new file mode 100644
index 0000000..11b7006
--- /dev/null
+++ b/surge/surge_deployer/surge-docker-playbooks/roles/storm-common/templates/supervisor.yaml.j2
@@ -0,0 +1,13 @@
+storm.local.dir: "/usr/local/etc/storm"
+
+storm.zookeeper.servers:
+    - "{{ zookeeper }}"
+
+nimbus.host: "{{ storm_nimbus1 }}"
+
+nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
+
+ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
+
+supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
+worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
\ No newline at end of file
diff --git a/surge/surge_deployer/surge-playbooks/roles/kafka/defaults/main.yml b/surge/surge_deployer/surge-playbooks/roles/kafka/defaults/main.yml
index d47591a..d808c97 100644
--- a/surge/surge_deployer/surge-playbooks/roles/kafka/defaults/main.yml
+++ b/surge/surge_deployer/surge-playbooks/roles/kafka/defaults/main.yml
@@ -1,5 +1,3 @@
-
-
 # Copyright 2015 Cisco Systems, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -23,4 +21,5 @@ kafka_gid: 51521
 kafka_logs_dir: /var/log/kafka
 kafka_port: 9092
 kafka_interface: eth1
 
+include: kafka.yml
diff --git a/surge/surge_deployer/surge-playbooks/roles/kafka/tasks/main.yml b/surge/surge_deployer/surge-playbooks/roles/kafka/tasks/main.yml
index 92d54fd..7ec76f4 100644
--- a/surge/surge_deployer/surge-playbooks/roles/kafka/tasks/main.yml
+++ b/surge/surge_deployer/surge-playbooks/roles/kafka/tasks/main.yml
@@ -64,8 +64,16 @@
   shell: sed 's/[^0-9]//g' /etc/hostname
   register: id
 
+#- name: Updating the server.properties conf
+#  template: src=server.properties.j2 dest=/usr/local/etc/kafka/config/server.properties owner={{ kafka_user }} group={{ kafka_group }} mode=0644
+
 - name: Updating the server.properties conf
   template: src=server.properties.j2 dest=/usr/local/etc/kafka/config/server.properties owner={{ kafka_user }} group={{ kafka_group }} mode=0644
+  when: "'zookeeper' in groups"
+
+- name: Updating server.properties conf for the added kafka node
+  template: src=add.server.properties.j2 dest=/usr/local/etc/kafka/config/server.properties owner={{ kafka_user }} group={{ kafka_group }} mode=0644
+  when: "'zookeeper' not in groups"
 
 - name: Restarting Kafka
   supervisorctl: name=kafka state=restarted
diff --git a/surge/surge_deployer/surge-playbooks/roles/kafka/templates/add.server.properties.j2 b/surge/surge_deployer/surge-playbooks/roles/kafka/templates/add.server.properties.j2
new file mode 100644
index 0000000..1fb3acc
--- /dev/null
+++ b/surge/surge_deployer/surge-playbooks/roles/kafka/templates/add.server.properties.j2
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={{ id.stdout }}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port={{kafka_port}}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+
+host.name={{ ansible_ssh_host }}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=
+
+# The number of threads handling network requests
+
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs={{ kafka_logs_dir }}/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=2
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# These are comma-separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+
+zookeeper.connect={{ zookeeper }}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
\ No newline at end of file
diff --git a/surge/surge_deployer/surge-playbooks/roles/storm-common/defaults/main.yml b/surge/surge_deployer/surge-playbooks/roles/storm-common/defaults/main.yml
new file mode 100644
index 0000000..e69de29
diff --git a/surge/surge_deployer/surge-playbooks/roles/storm-common/tasks/main.yml b/surge/surge_deployer/surge-playbooks/roles/storm-common/tasks/main.yml
index c1e96d3..d5f514e 100644
--- a/surge/surge_deployer/surge-playbooks/roles/storm-common/tasks/main.yml
+++ b/surge/surge_deployer/surge-playbooks/roles/storm-common/tasks/main.yml
@@ -1,5 +1,3 @@
-
-
 # Copyright 2015 Cisco Systems, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -49,5 +47,13 @@
 - name: Creating the logs dir
   file: path=/var/log/storm owner=storm group=storm mode=0750 state=directory
 
+#- name: Updating the storm.yaml conf
+#  template: src=storm.yaml.j2 dest=/usr/local/etc/storm/conf/storm.yaml owner=storm group=storm mode=0644
+
 - name: Updating the storm.yaml conf
-  template: src=storm.yaml.j2 dest=/usr/local/etc/storm/conf/storm.yaml owner=storm group=storm mode=0644
\ No newline at end of file
+  template: src=storm.yaml.j2 dest=/usr/local/etc/storm/conf/storm.yaml owner=storm group=storm mode=0644
+  when: "'zookeeper' in groups"
+
+- name: Updating the storm.yaml conf for the added supervisor node
+  template: src=supervisor.yaml.j2 dest=/usr/local/etc/storm/conf/storm.yaml owner=storm group=storm mode=0644
+  when: "'zookeeper' not in groups"
diff --git a/surge/surge_deployer/surge-playbooks/roles/storm-common/templates/supervisor.yaml.j2 b/surge/surge_deployer/surge-playbooks/roles/storm-common/templates/supervisor.yaml.j2
new file mode 100644
index 0000000..11b7006
--- /dev/null
+++ b/surge/surge_deployer/surge-playbooks/roles/storm-common/templates/supervisor.yaml.j2
@@ -0,0 +1,13 @@
+storm.local.dir: "/usr/local/etc/storm"
+
+storm.zookeeper.servers:
+    - "{{ zookeeper }}"
+
+nimbus.host: "{{ storm_nimbus1 }}"
+
+nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
+
+ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
+
+supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
+worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
\ No newline at end of file
diff --git a/surge/surge_deployer/surge.py b/surge/surge_deployer/surge.py
index 23b1b5c..255948f 100644
--- a/surge/surge_deployer/surge.py
+++ b/surge/surge_deployer/surge.py
@@ -21,7 +21,7 @@
 
 class VagrantDeployer:
 
-    def __init__(self, name, pipeline=None, provider=None):
+    def __init__(self, name, pipeline=None, provider="virtualbox"):
         self.name = name
 
         self.path = BASE_DIR + '/pipelines/' + name
diff --git a/surge/surge_deployer/utils.py b/surge/surge_deployer/utils.py
index b49e259..48962c9 100644
--- a/surge/surge_deployer/utils.py
+++ b/surge/surge_deployer/utils.py
@@ -3,10 +3,66 @@
 import yaml
+import shutil
 
 BASE_DIR = os.path.dirname(os.path.realpath(__file__))
+VAGRANT_DIR = os.path.join(BASE_DIR, 'basevb/.vagrant')
+STORM_VAR_DIR = os.path.join(
+    BASE_DIR, 'basevb/playbooks/roles/storm-common/defaults/main.yml')
+KAFKA_VAR_DIR = os.path.join(
+    BASE_DIR, 'basevb/playbooks/roles/kafka/defaults/kafka.yml')
 
 
-def generate_pipeline(number_nimbus=0, number_supervisor=0,
-                      number_kafka=0, number_zk=0):
+def get_zookeeper_ip(pipeline_name):
+    inventory_dir = os.path.join(
+        BASE_DIR, 'pipelines/' + pipeline_name +
+        '/.vagrant/provisioners/ansible/inventory/deployer/vagrant_ansible_inventory')
+    try:
+        with open(inventory_dir) as f:
+            data = f.read()
+        for line in data.splitlines():
+            if line.startswith("#") or not line.strip():
+                continue
+            elif line.startswith("["):
+                break
+            host_data = line.split(" ")
+            if host_data[0] == "zookeeper":
+                return host_data[1].split("=")[1]
+    except IOError:
+        pass
+    return ""
+
+
+def is_inventory_exist(pipeline_name):
+    inventory_dir = os.path.join(
+        BASE_DIR, 'pipelines/' + pipeline_name +
+        '/.vagrant/provisioners/ansible/inventory/deployer/vagrant_ansible_inventory')
+    return os.path.exists(inventory_dir)
+
+
+def remove_inventory():
+    try:
+        shutil.rmtree(VAGRANT_DIR)
+    except OSError:
+        pass
+
+
+def write_storm_vars(zookeeper_ip):
+    with open(STORM_VAR_DIR, 'w') as storm_vars_file:
+        storm_vars_file.write(yaml.dump(
+            {"zookeeper": zookeeper_ip}, default_flow_style=False))
+
+
+def write_kafka_vars(zookeeper_ip):
+    with open(KAFKA_VAR_DIR, 'w') as kafka_vars_file:
+        kafka_vars_file.write(
+            yaml.dump({"zookeeper": zookeeper_ip}, default_flow_style=False))
+
+
+def generate_kafka_component(pipeline_name, number_kafka=0):
+    if is_inventory_exist(pipeline_name):
+        number_zk = 0
+        write_kafka_vars(get_zookeeper_ip(pipeline_name))
+    else:
+        number_zk = 1
 
     pipeline = {
         'provider': {
@@ -34,9 +90,39 @@
                     }
                 }
             }
+        }
+    }
+
+    _write_pipeline_file(pipeline_name, pipeline)
+
+
+def generate_storm_component(pipeline_name, number_supervisor=0):
+    if is_inventory_exist(pipeline_name):
+        number_zk = 0
+        write_storm_vars(get_zookeeper_ip(pipeline_name))
+    else:
+        number_zk = 1
+
+    pipeline = {
+        'provider': {
+            'type': {
+                'virtualbox': {
+                    'hostname_prefix': "",
+                    'ip_start': '10.20.30.10'
+                }
+            },
+        },
+        'hosts': {
+            "zookeeper": {
+                "count": number_zk,
+                "provider": {
+                    'virtualbox': {
+                        'memory': 1024
+                    }
+                }
             },
             "storm-nimbus": {
-                "count": number_nimbus,
+                "count": 1,
                 "provider": {
                     'virtualbox': {
                         'memory': 1024,
@@ -57,7 +143,14 @@
             }
         }
     }
+    _write_pipeline_file(pipeline_name, pipeline)
+
 
-    with open(BASE_DIR + "/basevb/pipeline.yml", 'w') as pipeline_file:
-        pipeline_file.write(yaml.dump(pipeline, default_flow_style=False))
+def _write_pipeline_file(pipeline_name, pipeline_obj):
+    with open(BASE_DIR + "/pipelines/" + pipeline_name + "/pipeline.yml", 'w') as pipeline_file:
+        pipeline_file.write(yaml.dump(pipeline_obj, default_flow_style=False))
         pipeline_file.close()
+
+
+def list_pipelines():
+    return os.listdir(BASE_DIR + '/pipelines')
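
Reviewer note: a minimal usage sketch of the new client API, assuming a working Vagrant/VirtualBox setup (the pipeline name and the broker/supervisor counts below are made-up examples, not part of this change):

    from surge.client import SurgeClient

    client = SurgeClient()
    # Writes a pipeline.yml with one zookeeper plus three kafka brokers,
    # then brings the VMs up under the VirtualBox provider.
    client.launch_kafka('demo', 3)
    # Reusing the same pipeline name finds the existing inventory, so the
    # zookeeper count drops to 0 and the storm nodes point at the running
    # zookeeper via the generated vars file.
    client.launch_storm('demo', 2)
    print(client.list_pipeline())   # e.g. ['demo']
    client.destroy_pipeline('demo')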
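
The zookeeper-IP lookup in utils.py parses the inventory that the Vagrant Ansible provisioner writes. A quick way to sanity-check the parsing against a synthetic file (the fixture path and host line below are hypothetical; a real inventory carries more variables per host and is normally written by Vagrant, not by hand):

    import os
    from surge.surge_deployer import utils

    # Fabricate the inventory location get_zookeeper_ip() expects for a
    # pipeline named 'demo' (fixture only).
    inv_dir = os.path.join(
        utils.BASE_DIR,
        'pipelines/demo/.vagrant/provisioners/ansible/inventory/deployer')
    os.makedirs(inv_dir)
    with open(os.path.join(inv_dir, 'vagrant_ansible_inventory'), 'w') as f:
        f.write('# Generated by Vagrant\n'
                'zookeeper ansible_ssh_host=10.20.30.10 ansible_ssh_port=2222\n')

    assert utils.get_zookeeper_ip('demo') == '10.20.30.10'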