diff options
3 files changed, 166 insertions, 117 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java index f7fdab2..1d4a2ce 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java @@ -2,74 +2,129 @@ package pl.edu.mimuw.cloudatlas.fetcher; import pl.edu.mimuw.cloudatlas.api.Api; +import java.rmi.NotBoundException; +import java.rmi.RemoteException; import java.rmi.registry.LocateRegistry; import java.rmi.registry.Registry; import java.io.*; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import com.google.gson.Gson; +import pl.edu.mimuw.cloudatlas.model.*; public class Fetcher { + private static final List<String> fetcherAttributeNames = Arrays.asList( + "avg_load", + "free_disk", + "total_disk", + "free_ram", + "total_ram", + "free_swap", + "total_swap", + "num_processes", + "num_cores", + "kernel_ver", + "logged_users", + "dns_names" + ); + + private static final List<Type.PrimaryType> fetcherAttributeTypes = Arrays.asList( + Type.PrimaryType.DOUBLE, + Type.PrimaryType.INT, + Type.PrimaryType.INT, + Type.PrimaryType.INT, + Type.PrimaryType.INT, + Type.PrimaryType.INT, + Type.PrimaryType.INT, + Type.PrimaryType.INT, + Type.PrimaryType.INT, + Type.PrimaryType.STRING, + Type.PrimaryType.INT, + Type.PrimaryType.LIST + ); + + private static Api api; + private static Process pythonProcess; + + private static Value packAttributeValue(Object rawValue, Type.PrimaryType valueType) { + Value val = null; + ArrayList<Value> contacts = new ArrayList<Value>(); + + if (valueType.equals(Type.PrimaryType.STRING)) { + val = new ValueString((String) rawValue); + } else if (valueType.equals(Type.PrimaryType.INT)) { + val = new ValueInt(((Double) rawValue).longValue()); + } else if (valueType.equals(Type.PrimaryType.DOUBLE)) { + val = new ValueDouble((Double) rawValue); + } else if (valueType.equals(Type.PrimaryType.LIST)) { + for (Object c : (ArrayList) rawValue) { + contacts.add(new ValueString((String) c)); + } + val = new ValueList(contacts, TypePrimitive.STRING); + } else { + throw new UnsupportedOperationException(); + } -// private static String processAttribs(String jsonAttribs) { -// Serializer serializer = new Serializer(); -// return -// } + return val; + } + + private static void initializeApiStub() throws RemoteException, NotBoundException { + Registry registry = LocateRegistry.getRegistry("localhost"); + api = (Api) registry.lookup("Api"); + System.out.println("Fetcher runs with registry"); + } - // https://jj09.net/interprocess-communication-python-java/ - private static void fetchData() { - try { - Registry registry = LocateRegistry.getRegistry("localhost"); - Api stub = (Api) registry.lookup("Api"); - System.out.println("Fetcher runs with registry"); // TODO + private static void initializePythonProcess() throws IOException { + String pythonScript = Fetcher.class.getResource("data_fetcher.py").getFile(); + String pythonCmd = "/usr/bin/python3 " + pythonScript; + System.out.println("Run cmd: " + pythonCmd); + pythonProcess = Runtime.getRuntime().exec(pythonCmd); + } - String pythonScript = Fetcher.class.getResource("data_fetcher.py").getFile(); - String pythonCmd = "/usr/bin/python3 " + pythonScript; - System.out.println("cmd: " + pythonCmd); - Process p = Runtime.getRuntime().exec(pythonCmd); - BufferedReader bufferRead = new BufferedReader( new InputStreamReader(p.getInputStream())); - BufferedReader errorRead = new BufferedReader( new InputStreamReader(p.getErrorStream())); + private static ArrayList deserializeAttribs(String serializedAttribs) { + Gson g = new Gson(); + return g.fromJson(serializedAttribs, ArrayList.class); + } + // https://jj09.net/interprocess-communication-python-java/ + private static void fetchData() { + BufferedReader bufferRead; + ArrayList deserializedAttribs; + String jsonAttribs; - System.out.println("Gonna read some attribs"); - String jsonAttribs = bufferRead.readLine(); - String serializedAttribs; + System.out.println(System.getProperty("user.dir")); - System.out.println("Read some attribs"); - System.out.println(jsonAttribs); - System.out.println("Got some attribs"); + try { + initializeApiStub(); + initializePythonProcess(); - ArrayList aa = deserializeAttribs(jsonAttribs); - System.out.println(aa); + bufferRead = new BufferedReader( new InputStreamReader(pythonProcess.getInputStream())); - // TODO different condition - while(!jsonAttribs.equals("x")) { + while((jsonAttribs = bufferRead.readLine()) != null) { System.out.println(jsonAttribs); System.out.flush(); - serializedAttribs = "1"; - // stub.setAttributeValue(serializedAttribs); - jsonAttribs = bufferRead.readLine(); + deserializedAttribs = deserializeAttribs(jsonAttribs); + for (int i = 0; i < fetcherAttributeNames.size(); i++) { + api.setAttributeValue( + "/", + fetcherAttributeNames.get(i), + packAttributeValue( + deserializedAttribs.get(i), + fetcherAttributeTypes.get(i))); + } } bufferRead.close(); - } catch(IOException e) { - e.printStackTrace(); } catch (Exception e) { System.err.println("Fetcher exception:"); e.printStackTrace(); } } - public static void main(String[] args) { fetchData(); } - - public static ArrayList deserializeAttribs(String serializedAttribs) { - Gson g = new Gson(); - - return g.fromJson(serializedAttribs, ArrayList.class); - } } diff --git a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini index 14af0af..56b59c1 100644 --- a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini +++ b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini @@ -1,5 +1,6 @@ [AttributeParams] +InitialDelay=0 CollectionInterval=5 AveragingPeriod=5 AveragingMethod=arithmetic
\ No newline at end of file diff --git a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py index d69c498..67b2eb4 100644 --- a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py +++ b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py @@ -1,22 +1,21 @@ import json -import subprocess import sys import time - -import schedule import psutil import platform import socket import configparser -# import sqlite3 +import statistics +import urllib.request # Pipe pipe = None # Attribute params -collectionInterval = 1 -averagingPeriod = 1 -averagingMethod = "" +initial_delay = 0 +collection_interval = 1 +averaging_period = 1 +averaging_method = "" """ Attributes @@ -37,31 +36,23 @@ averagingMethod = "" """ -def setup_database(): - conn = sqlite3.connect('attributes.db') - c = conn.cursor() - c.execute('''CREATE TABLE attrib - (timestamp integer, - avg_cpu_load integer, - free_disk integer, - total_disk integer, - free_ram integer, - total_ram integer, - free_swap integer, - total_swap integer, - num_processes integer, - num_cores integer, - kernel_ver text, - logged_users integer, - dns_names text) - ''') - # TODO format dns_names - conn.commit() - conn.close() +def get_avg_load(): + load_percentages = psutil.cpu_percent(interval=averaging_period, percpu=True) + + if averaging_method == "arithmetic": + return statistics.mean(load_percentages) + elif averaging_method == "geometric": + return statistics.geometric_mean(load_percentages) + elif averaging_method == "harmonic": + return statistics.harmonic_mean(load_percentages) + elif averaging_method == "median": + return statistics.median(load_percentages) + else: + raise RuntimeError("Avg CPU load error") def get_data(): - avg_cpu_load = psutil.cpu_percent(interval=1) # TODO better? + avg_load = get_avg_load() free_disk = psutil.disk_usage("/").free total_disk = psutil.disk_usage("/").total free_ram = psutil.virtual_memory().available @@ -70,67 +61,69 @@ def get_data(): total_swap = psutil.swap_memory().total num_processes = len(psutil.pids()) num_cores = psutil.cpu_count(False) - kernel_ver = platform.release() # TODO ew. version() + kernel_ver = platform.release() logged_users = len(psutil.users()) - hostname = socket.gethostbyaddr("127.0.0.1") # TODO czy dziala + external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8') + hostname = socket.gethostbyaddr(external_ip) dns_names = ([hostname[0]] + hostname[1])[:3] - # https://stackoverflow.com/questions/2575760/python-lookup-hostname-from-ip-with-1-second-timeout sys.stdout.write("[{},{},{},{},{},{},{},{},{},{},{},{}]\n".format( - #print("[{},{},{},{},{},{},{},{},{},{},{},{}]\r\n".format( - avg_cpu_load, - free_disk, - total_disk, - free_ram, - total_ram, - free_swap, - total_swap, - num_processes, - num_cores, - kernel_ver, - logged_users, - json.dumps(dns_names))) #.encode()) # TODO ten string + avg_load, + free_disk, + total_disk, + free_ram, + total_ram, + free_swap, + total_swap, + num_processes, + num_cores, + kernel_ver, + logged_users, + json.dumps(dns_names)) + ) + sys.stdout.flush() - # TODO error control and pipe restart -""" - conn = sqlite3.connect('attributes.db') - c = conn.cursor() - c.execute("INSERT INTO attrib VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", - avg_cpu_load, - free_disk, - total_disk, - free_ram, - total_ram, - free_swap, - total_swap, - num_processes, - num_cores, - kernel_ver, - logged_users, - dns_names.__str__()) # TODO ten string - conn.commit() - conn.close() -""" -# TODO funkcja do usuwania zbendych -def remove_historical_data(): - pass +def check_averaging_method(averaging_method): + averaging_methods = ["arithmetic", "geometric", "harmonic", "mean"] + return averaging_method and averaging_method in averaging_methods -if __name__ == '__main__': - # config = configparser.ConfigParser() - # config.read("config.ini") - collectionInterval = int(5) # int(config["AttributeParams"]["collectionInterval"]) - averagingPeriod = int(5) # int(config["AttributeParams"]["averagingPeriod"]) - averagingMethod = "arithmetic" # config["AttributeParams"]["averagingMethod"] +def read_config(): + global initial_delay + global collection_interval + global averaging_period + global averaging_method + config = configparser.ConfigParser() - # setup_database() + try: + # check if running from source code dir + config.read("config.ini") + except KeyError: + pass + else: + # we assume that it's running as subprocess from Fetcher.java + # we assume working dir to be CloudAtlas root + # because gradle seems to put it this way + config.read("src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini") - # TODO some condition for this? + initial_delay = int(config["AttributeParams"]["initialDelay"]) + collection_interval = int(config["AttributeParams"]["collectionInterval"]) + averaging_period = int(config["AttributeParams"]["averagingPeriod"]) + averaging_method = config["AttributeParams"]["averagingMethod"] + + if not check_averaging_method(averaging_method): + raise ValueError("Incorrect averaging method") + elif collection_interval < averaging_period: + raise ValueError("Collection interval smaller than averaging period") + + +if __name__ == '__main__': + read_config() + + time.sleep(initial_delay) while True: get_data() - time.sleep(collectionInterval) - - # schedule.every(collectionInterval).seconds.do(get_data) + time.sleep(collection_interval - averaging_period) |