diff options
| author | Magdalena GrodziĆska <mag.grodzinska@gmail.com> | 2019-11-24 14:15:45 +0100 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-11-24 14:15:45 +0100 | 
| commit | d8c3798d068aa2e128ca7a43a9451ce6d87cc230 (patch) | |
| tree | 3dfb19b1c834da6e3fd2d0c8fbb18d9fdbd28782 /src | |
| parent | a1d9f108cf87cc1164343d79bd982ac89cdb59fc (diff) | |
| parent | 3b53e5496e9fe9917db7825ceea1455cca75c155 (diff) | |
Merge pull request #32 from m-chrzan/fix_fetcher_2
Fix fetcher 2
Diffstat (limited to 'src')
3 files changed, 166 insertions, 117 deletions
| diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java index f7fdab2..1d4a2ce 100644 --- a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java +++ b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java @@ -2,74 +2,129 @@ package pl.edu.mimuw.cloudatlas.fetcher;  import pl.edu.mimuw.cloudatlas.api.Api; +import java.rmi.NotBoundException; +import java.rmi.RemoteException;  import java.rmi.registry.LocateRegistry;  import java.rmi.registry.Registry;  import java.io.*;  import java.util.ArrayList; +import java.util.Arrays;  import java.util.List;  import com.google.gson.Gson; +import pl.edu.mimuw.cloudatlas.model.*;  public class Fetcher { +    private static final List<String> fetcherAttributeNames = Arrays.asList( +            "avg_load", +            "free_disk", +            "total_disk", +            "free_ram", +            "total_ram", +            "free_swap", +            "total_swap", +            "num_processes", +            "num_cores", +            "kernel_ver", +            "logged_users", +            "dns_names" +    ); + +    private static final List<Type.PrimaryType> fetcherAttributeTypes = Arrays.asList( +            Type.PrimaryType.DOUBLE, +            Type.PrimaryType.INT, +            Type.PrimaryType.INT, +            Type.PrimaryType.INT, +            Type.PrimaryType.INT, +            Type.PrimaryType.INT, +            Type.PrimaryType.INT, +            Type.PrimaryType.INT, +            Type.PrimaryType.INT, +            Type.PrimaryType.STRING, +            Type.PrimaryType.INT, +            Type.PrimaryType.LIST +    ); + +    private static Api api; +    private static Process pythonProcess; + +    private static Value packAttributeValue(Object rawValue, Type.PrimaryType valueType) { +        Value val = null; +        ArrayList<Value> contacts = new ArrayList<Value>(); + +        if (valueType.equals(Type.PrimaryType.STRING)) { +            val = new ValueString((String) rawValue); +        } else if (valueType.equals(Type.PrimaryType.INT)) { +            val = new ValueInt(((Double) rawValue).longValue()); +        } else if (valueType.equals(Type.PrimaryType.DOUBLE)) { +            val = new ValueDouble((Double) rawValue); +        } else if (valueType.equals(Type.PrimaryType.LIST)) { +            for (Object c : (ArrayList) rawValue) { +                contacts.add(new ValueString((String) c)); +            } +            val = new ValueList(contacts, TypePrimitive.STRING); +        } else { +            throw new UnsupportedOperationException(); +        } -//    private static String processAttribs(String jsonAttribs) { -//        Serializer serializer = new Serializer(); -//        return -//    } +        return val; +    } +     +    private static void initializeApiStub() throws RemoteException, NotBoundException { +        Registry registry = LocateRegistry.getRegistry("localhost"); +        api = (Api) registry.lookup("Api"); +        System.out.println("Fetcher runs with registry"); +    } -    // https://jj09.net/interprocess-communication-python-java/ -    private static void fetchData() { -        try { -            Registry registry = LocateRegistry.getRegistry("localhost"); -            Api stub = (Api) registry.lookup("Api"); -            System.out.println("Fetcher runs with registry"); // TODO +    private static void initializePythonProcess() throws IOException { +        String pythonScript = Fetcher.class.getResource("data_fetcher.py").getFile(); +        String pythonCmd = "/usr/bin/python3 " + pythonScript; +        System.out.println("Run cmd: " + pythonCmd); +        pythonProcess = Runtime.getRuntime().exec(pythonCmd); +    } -            String pythonScript = Fetcher.class.getResource("data_fetcher.py").getFile(); -            String pythonCmd = "/usr/bin/python3 " + pythonScript; -            System.out.println("cmd: " + pythonCmd); -            Process p = Runtime.getRuntime().exec(pythonCmd); -            BufferedReader bufferRead = new BufferedReader( new InputStreamReader(p.getInputStream())); -            BufferedReader errorRead = new BufferedReader( new InputStreamReader(p.getErrorStream())); +    private static ArrayList deserializeAttribs(String serializedAttribs) { +        Gson g = new Gson(); +        return g.fromJson(serializedAttribs, ArrayList.class); +    } +    // https://jj09.net/interprocess-communication-python-java/ +    private static void fetchData() { +        BufferedReader bufferRead; +        ArrayList deserializedAttribs; +        String jsonAttribs; -            System.out.println("Gonna read some attribs"); -            String jsonAttribs = bufferRead.readLine(); -            String serializedAttribs; +        System.out.println(System.getProperty("user.dir")); -            System.out.println("Read some attribs"); -            System.out.println(jsonAttribs); -            System.out.println("Got some attribs"); +        try { +            initializeApiStub(); +            initializePythonProcess(); -            ArrayList aa = deserializeAttribs(jsonAttribs); -            System.out.println(aa); +            bufferRead = new BufferedReader( new InputStreamReader(pythonProcess.getInputStream())); -            // TODO different condition -            while(!jsonAttribs.equals("x")) { +            while((jsonAttribs = bufferRead.readLine()) != null) {                  System.out.println(jsonAttribs);                  System.out.flush(); -                serializedAttribs = "1"; -                // stub.setAttributeValue(serializedAttribs); -                jsonAttribs = bufferRead.readLine(); +                deserializedAttribs = deserializeAttribs(jsonAttribs); +                for (int i = 0; i < fetcherAttributeNames.size(); i++) { +                    api.setAttributeValue( +                            "/", +                            fetcherAttributeNames.get(i), +                            packAttributeValue( +                                    deserializedAttribs.get(i), +                                    fetcherAttributeTypes.get(i))); +                }              }              bufferRead.close(); -        } catch(IOException e) { -            e.printStackTrace();          } catch (Exception e) {              System.err.println("Fetcher exception:");              e.printStackTrace();          }      } -      public static void main(String[] args) {          fetchData();      } - -    public static ArrayList deserializeAttribs(String serializedAttribs) { -        Gson g = new Gson(); - -        return g.fromJson(serializedAttribs, ArrayList.class); -    }  } diff --git a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini index 14af0af..56b59c1 100644 --- a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini +++ b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini @@ -1,5 +1,6 @@  [AttributeParams] +InitialDelay=0  CollectionInterval=5  AveragingPeriod=5  AveragingMethod=arithmetic
\ No newline at end of file diff --git a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py index d69c498..67b2eb4 100644 --- a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py +++ b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py @@ -1,22 +1,21 @@  import json -import subprocess  import sys  import time - -import schedule  import psutil  import platform  import socket  import configparser -# import sqlite3 +import statistics +import urllib.request  # Pipe  pipe = None  # Attribute params -collectionInterval = 1 -averagingPeriod = 1 -averagingMethod = "" +initial_delay = 0 +collection_interval = 1 +averaging_period = 1 +averaging_method = ""  """      Attributes @@ -37,31 +36,23 @@ averagingMethod = ""  """ -def setup_database(): -    conn = sqlite3.connect('attributes.db') -    c = conn.cursor() -    c.execute('''CREATE TABLE attrib -             (timestamp integer, -              avg_cpu_load integer, -              free_disk integer, -              total_disk integer, -              free_ram integer, -              total_ram integer, -              free_swap integer, -              total_swap integer, -              num_processes integer, -              num_cores integer, -              kernel_ver text, -              logged_users integer, -              dns_names text)  -              ''') -    # TODO format dns_names -    conn.commit() -    conn.close() +def get_avg_load(): +    load_percentages = psutil.cpu_percent(interval=averaging_period, percpu=True) + +    if averaging_method == "arithmetic": +        return statistics.mean(load_percentages) +    elif averaging_method == "geometric": +        return statistics.geometric_mean(load_percentages) +    elif averaging_method == "harmonic": +        return statistics.harmonic_mean(load_percentages) +    elif averaging_method == "median": +        return statistics.median(load_percentages) +    else: +        raise RuntimeError("Avg CPU load error")  def get_data(): -    avg_cpu_load = psutil.cpu_percent(interval=1) # TODO better? +    avg_load = get_avg_load()      free_disk = psutil.disk_usage("/").free      total_disk = psutil.disk_usage("/").total      free_ram = psutil.virtual_memory().available @@ -70,67 +61,69 @@ def get_data():      total_swap = psutil.swap_memory().total      num_processes = len(psutil.pids())      num_cores = psutil.cpu_count(False) -    kernel_ver = platform.release() # TODO ew. version() +    kernel_ver = platform.release()      logged_users = len(psutil.users()) -    hostname = socket.gethostbyaddr("127.0.0.1") # TODO czy dziala +    external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8') +    hostname = socket.gethostbyaddr(external_ip)      dns_names = ([hostname[0]] + hostname[1])[:3] -    # https://stackoverflow.com/questions/2575760/python-lookup-hostname-from-ip-with-1-second-timeout      sys.stdout.write("[{},{},{},{},{},{},{},{},{},{},{},{}]\n".format( -    #print("[{},{},{},{},{},{},{},{},{},{},{},{}]\r\n".format( -    avg_cpu_load, -    free_disk, -    total_disk, -    free_ram, -    total_ram, -    free_swap, -    total_swap, -    num_processes, -    num_cores, -    kernel_ver, -    logged_users, -    json.dumps(dns_names))) #.encode())  # TODO ten string +        avg_load, +        free_disk, +        total_disk, +        free_ram, +        total_ram, +        free_swap, +        total_swap, +        num_processes, +        num_cores, +        kernel_ver, +        logged_users, +        json.dumps(dns_names)) +    ) +      sys.stdout.flush() -    # TODO error control and pipe restart -""" -    conn = sqlite3.connect('attributes.db') -    c = conn.cursor() -    c.execute("INSERT INTO attrib VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )", -              avg_cpu_load, -              free_disk, -              total_disk, -              free_ram, -              total_ram, -              free_swap, -              total_swap, -              num_processes, -              num_cores, -              kernel_ver, -              logged_users, -              dns_names.__str__()) # TODO ten string -    conn.commit() -    conn.close() -""" -# TODO funkcja do usuwania zbendych -def remove_historical_data(): -    pass +def check_averaging_method(averaging_method): +    averaging_methods = ["arithmetic", "geometric", "harmonic", "mean"] +    return averaging_method and averaging_method in averaging_methods -if __name__ == '__main__': -    # config = configparser.ConfigParser() -    # config.read("config.ini") -    collectionInterval = int(5) # int(config["AttributeParams"]["collectionInterval"]) -    averagingPeriod = int(5) # int(config["AttributeParams"]["averagingPeriod"]) -    averagingMethod = "arithmetic" # config["AttributeParams"]["averagingMethod"] +def read_config(): +    global initial_delay +    global collection_interval +    global averaging_period +    global averaging_method +    config = configparser.ConfigParser() -    # setup_database() +    try: +        # check if running from source code dir +        config.read("config.ini") +    except KeyError: +        pass +    else: +        # we assume that it's running as subprocess from Fetcher.java +        # we assume working dir to be CloudAtlas root +        # because gradle seems to put it this way +        config.read("src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini") -    # TODO some condition for this? +    initial_delay = int(config["AttributeParams"]["initialDelay"]) +    collection_interval = int(config["AttributeParams"]["collectionInterval"]) +    averaging_period = int(config["AttributeParams"]["averagingPeriod"]) +    averaging_method = config["AttributeParams"]["averagingMethod"] + +    if not check_averaging_method(averaging_method): +        raise ValueError("Incorrect averaging method") +    elif collection_interval < averaging_period: +        raise ValueError("Collection interval smaller than averaging period") + + +if __name__ == '__main__': +    read_config() + +    time.sleep(initial_delay)      while True:          get_data() -        time.sleep(collectionInterval) - -    # schedule.every(collectionInterval).seconds.do(get_data) +        time.sleep(collection_interval - averaging_period) |