skip to main content


Title: TrioStat: Online Workload Estimation in Distributed Spatial Data Streaming Systems
The wide spread of GPS-enabled devices and the Internet of Things (IoT) has increased the amount of spatial data being generated every second. The current scale of spatial data cannot be handled using centralized systems. This has led to the development of distributed spatial data streaming systems that scale to process in real-time large amounts of streamed spatial data. The performance of distributed streaming systems relies on how even the workload is distributed among their machines. However, it is challenging to estimate the workload of each machine because spatial data and query streams are skewed and rapidly change with time and users' interests. Moreover, a distributed spatial streaming system often does not maintain a global system workload state because it requires high network and processing overheads to be collected from the machines in the system. This paper introduces TrioStat; an online workload estimation technique that relies on a probabilistic model for estimating the workload of partitions and machines in a distributed spatial data streaming system. It is infeasible to collect and exchange statistics with a centralized unit because it requires high network overhead. Instead, TrioStat uses a decentralised technique to collect and maintain the required statistics in real-time locally in each machine. TrioStat enables distributed spatial data streaming systems to compare the workloads of machines as well as the workloads of data partitions. TrioStat requires minimal network and storage overhead. Moreover, the required storage is distributed across the system's machines.  more » « less
Award ID(s):
1815796 1910216
NSF-PAR ID:
10301809
Author(s) / Creator(s):
; ;
Date Published:
Journal Name:
SIGSPATIAL '20: Proceedings of the 28th International Conference on Advances in Geographic Information Systems
Page Range / eLocation ID:
78 to 86
Format(s):
Medium: X
Sponsoring Org:
National Science Foundation
More Like this
  1. null (Ed.)
    The proliferation of GPS-enabled devices has led to the development of numerous location-based services. These services need to process massive amounts of streamed spatial data in real-time. The current scale of spatial data cannot be handled using centralized systems. This has led to the development of distributed spatial streaming systems. Existing systems are using static spatial partitioning to distribute the workload. In contrast, the real-time streamed spatial data follows non-uniform spatial distributions that are continuously changing over time. Distributed spatial streaming systems need to react to the changes in the distribution of spatial data and queries. This article introduces SWARM, a lightweight adaptivity protocol that continuously monitors the data and query workloads across the distributed processes of the spatial data streaming system and redistributes and rebalances the workloads as soon as performance bottlenecks get detected. SWARM is able to handle multiple query-execution and data-persistence models. A distributed streaming system can directly use SWARM to adaptively rebalance the system’s workload among its machines with minimal changes to the original code of the underlying spatial application. Extensive experimental evaluation using real and synthetic datasets illustrate that, on average, SWARM achieves 2 improvement in throughput over a static grid partitioning that is determined based on observing a limited history of the data and query workloads. Moreover, SWARM reduces execution latency on average 4 compared with the other technique. 
    more » « less
  2. Obeid, Iyad Selesnick (Ed.)
    Electroencephalography (EEG) is a popular clinical monitoring tool used for diagnosing brain-related disorders such as epilepsy [1]. As monitoring EEGs in a critical-care setting is an expensive and tedious task, there is a great interest in developing real-time EEG monitoring tools to improve patient care quality and efficiency [2]. However, clinicians require automatic seizure detection tools that provide decisions with at least 75% sensitivity and less than 1 false alarm (FA) per 24 hours [3]. Some commercial tools recently claim to reach such performance levels, including the Olympic Brainz Monitor [4] and Persyst 14 [5]. In this abstract, we describe our efforts to transform a high-performance offline seizure detection system [3] into a low latency real-time or online seizure detection system. An overview of the system is shown in Figure 1. The main difference between an online versus offline system is that an online system should always be causal and has minimum latency which is often defined by domain experts. The offline system, shown in Figure 2, uses two phases of deep learning models with postprocessing [3]. The channel-based long short term memory (LSTM) model (Phase 1 or P1) processes linear frequency cepstral coefficients (LFCC) [6] features from each EEG channel separately. We use the hypotheses generated by the P1 model and create additional features that carry information about the detected events and their confidence. The P2 model uses these additional features and the LFCC features to learn the temporal and spatial aspects of the EEG signals using a hybrid convolutional neural network (CNN) and LSTM model. Finally, Phase 3 aggregates the results from both P1 and P2 before applying a final postprocessing step. The online system implements Phase 1 by taking advantage of the Linux piping mechanism, multithreading techniques, and multi-core processors. To convert Phase 1 into an online system, we divide the system into five major modules: signal preprocessor, feature extractor, event decoder, postprocessor, and visualizer. The system reads 0.1-second frames from each EEG channel and sends them to the feature extractor and the visualizer. The feature extractor generates LFCC features in real time from the streaming EEG signal. Next, the system computes seizure and background probabilities using a channel-based LSTM model and applies a postprocessor to aggregate the detected events across channels. The system then displays the EEG signal and the decisions simultaneously using a visualization module. The online system uses C++, Python, TensorFlow, and PyQtGraph in its implementation. The online system accepts streamed EEG data sampled at 250 Hz as input. The system begins processing the EEG signal by applying a TCP montage [8]. Depending on the type of the montage, the EEG signal can have either 22 or 20 channels. To enable the online operation, we send 0.1-second (25 samples) length frames from each channel of the streamed EEG signal to the feature extractor and the visualizer. Feature extraction is performed sequentially on each channel. The signal preprocessor writes the sample frames into two streams to facilitate these modules. In the first stream, the feature extractor receives the signals using stdin. In parallel, as a second stream, the visualizer shares a user-defined file with the signal preprocessor. This user-defined file holds raw signal information as a buffer for the visualizer. The signal preprocessor writes into the file while the visualizer reads from it. Reading and writing into the same file poses a challenge. The visualizer can start reading while the signal preprocessor is writing into it. To resolve this issue, we utilize a file locking mechanism in the signal preprocessor and visualizer. Each of the processes temporarily locks the file, performs its operation, releases the lock, and tries to obtain the lock after a waiting period. The file locking mechanism ensures that only one process can access the file by prohibiting other processes from reading or writing while one process is modifying the file [9]. The feature extractor uses circular buffers to save 0.3 seconds or 75 samples from each channel for extracting 0.2-second or 50-sample long center-aligned windows. The module generates 8 absolute LFCC features where the zeroth cepstral coefficient is replaced by a temporal domain energy term. For extracting the rest of the features, three pipelines are used. The differential energy feature is calculated in a 0.9-second absolute feature window with a frame size of 0.1 seconds. The difference between the maximum and minimum temporal energy terms is calculated in this range. Then, the first derivative or the delta features are calculated using another 0.9-second window. Finally, the second derivative or delta-delta features are calculated using a 0.3-second window [6]. The differential energy for the delta-delta features is not included. In total, we extract 26 features from the raw sample windows which add 1.1 seconds of delay to the system. We used the Temple University Hospital Seizure Database (TUSZ) v1.2.1 for developing the online system [10]. The statistics for this dataset are shown in Table 1. A channel-based LSTM model was trained using the features derived from the train set using the online feature extractor module. A window-based normalization technique was applied to those features. In the offline model, we scale features by normalizing using the maximum absolute value of a channel [11] before applying a sliding window approach. Since the online system has access to a limited amount of data, we normalize based on the observed window. The model uses the feature vectors with a frame size of 1 second and a window size of 7 seconds. We evaluated the model using the offline P1 postprocessor to determine the efficacy of the delayed features and the window-based normalization technique. As shown by the results of experiments 1 and 4 in Table 2, these changes give us a comparable performance to the offline model. The online event decoder module utilizes this trained model for computing probabilities for the seizure and background classes. These posteriors are then postprocessed to remove spurious detections. The online postprocessor receives and saves 8 seconds of class posteriors in a buffer for further processing. It applies multiple heuristic filters (e.g., probability threshold) to make an overall decision by combining events across the channels. These filters evaluate the average confidence, the duration of a seizure, and the channels where the seizures were observed. The postprocessor delivers the label and confidence to the visualizer. The visualizer starts to display the signal as soon as it gets access to the signal file, as shown in Figure 1 using the “Signal File” and “Visualizer” blocks. Once the visualizer receives the label and confidence for the latest epoch from the postprocessor, it overlays the decision and color codes that epoch. The visualizer uses red for seizure with the label SEIZ and green for the background class with the label BCKG. Once the streaming finishes, the system saves three files: a signal file in which the sample frames are saved in the order they were streamed, a time segmented event (TSE) file with the overall decisions and confidences, and a hypotheses (HYP) file that saves the label and confidence for each epoch. The user can plot the signal and decisions using the signal and HYP files with only the visualizer by enabling appropriate options. For comparing the performance of different stages of development, we used the test set of TUSZ v1.2.1 database. It contains 1015 EEG records of varying duration. The any-overlap performance [12] of the overall system shown in Figure 2 is 40.29% sensitivity with 5.77 FAs per 24 hours. For comparison, the previous state-of-the-art model developed on this database performed at 30.71% sensitivity with 6.77 FAs per 24 hours [3]. The individual performances of the deep learning phases are as follows: Phase 1’s (P1) performance is 39.46% sensitivity and 11.62 FAs per 24 hours, and Phase 2 detects seizures with 41.16% sensitivity and 11.69 FAs per 24 hours. We trained an LSTM model with the delayed features and the window-based normalization technique for developing the online system. Using the offline decoder and postprocessor, the model performed at 36.23% sensitivity with 9.52 FAs per 24 hours. The trained model was then evaluated with the online modules. The current performance of the overall online system is 45.80% sensitivity with 28.14 FAs per 24 hours. Table 2 summarizes the performances of these systems. The performance of the online system deviates from the offline P1 model because the online postprocessor fails to combine the events as the seizure probability fluctuates during an event. The modules in the online system add a total of 11.1 seconds of delay for processing each second of the data, as shown in Figure 3. In practice, we also count the time for loading the model and starting the visualizer block. When we consider these facts, the system consumes 15 seconds to display the first hypothesis. The system detects seizure onsets with an average latency of 15 seconds. Implementing an automatic seizure detection model in real time is not trivial. We used a variety of techniques such as the file locking mechanism, multithreading, circular buffers, real-time event decoding, and signal-decision plotting to realize the system. A video demonstrating the system is available at: https://www.isip.piconepress.com/projects/nsf_pfi_tt/resources/videos/realtime_eeg_analysis/v2.5.1/video_2.5.1.mp4. The final conference submission will include a more detailed analysis of the online performance of each module. ACKNOWLEDGMENTS Research reported in this publication was most recently supported by the National Science Foundation Partnership for Innovation award number IIP-1827565 and the Pennsylvania Commonwealth Universal Research Enhancement Program (PA CURE). Any opinions, findings, and conclusions or recommendations expressed in this material are those of the author(s) and do not necessarily reflect the official views of any of these organizations. REFERENCES [1] A. Craik, Y. He, and J. L. Contreras-Vidal, “Deep learning for electroencephalogram (EEG) classification tasks: a review,” J. Neural Eng., vol. 16, no. 3, p. 031001, 2019. https://doi.org/10.1088/1741-2552/ab0ab5. [2] A. C. Bridi, T. Q. Louro, and R. C. L. Da Silva, “Clinical Alarms in intensive care: implications of alarm fatigue for the safety of patients,” Rev. Lat. Am. Enfermagem, vol. 22, no. 6, p. 1034, 2014. https://doi.org/10.1590/0104-1169.3488.2513. [3] M. Golmohammadi, V. Shah, I. Obeid, and J. Picone, “Deep Learning Approaches for Automatic Seizure Detection from Scalp Electroencephalograms,” in Signal Processing in Medicine and Biology: Emerging Trends in Research and Applications, 1st ed., I. Obeid, I. Selesnick, and J. Picone, Eds. New York, New York, USA: Springer, 2020, pp. 233–274. https://doi.org/10.1007/978-3-030-36844-9_8. [4] “CFM Olympic Brainz Monitor.” [Online]. Available: https://newborncare.natus.com/products-services/newborn-care-products/newborn-brain-injury/cfm-olympic-brainz-monitor. [Accessed: 17-Jul-2020]. [5] M. L. Scheuer, S. B. Wilson, A. Antony, G. Ghearing, A. Urban, and A. I. Bagic, “Seizure Detection: Interreader Agreement and Detection Algorithm Assessments Using a Large Dataset,” J. Clin. Neurophysiol., 2020. https://doi.org/10.1097/WNP.0000000000000709. [6] A. Harati, M. Golmohammadi, S. Lopez, I. Obeid, and J. Picone, “Improved EEG Event Classification Using Differential Energy,” in Proceedings of the IEEE Signal Processing in Medicine and Biology Symposium, 2015, pp. 1–4. https://doi.org/10.1109/SPMB.2015.7405421. [7] V. Shah, C. Campbell, I. Obeid, and J. Picone, “Improved Spatio-Temporal Modeling in Automated Seizure Detection using Channel-Dependent Posteriors,” Neurocomputing, 2021. [8] W. Tatum, A. Husain, S. Benbadis, and P. Kaplan, Handbook of EEG Interpretation. New York City, New York, USA: Demos Medical Publishing, 2007. [9] D. P. Bovet and C. Marco, Understanding the Linux Kernel, 3rd ed. O’Reilly Media, Inc., 2005. https://www.oreilly.com/library/view/understanding-the-linux/0596005652/. [10] V. Shah et al., “The Temple University Hospital Seizure Detection Corpus,” Front. Neuroinform., vol. 12, pp. 1–6, 2018. https://doi.org/10.3389/fninf.2018.00083. [11] F. Pedregosa et al., “Scikit-learn: Machine Learning in Python,” J. Mach. Learn. Res., vol. 12, pp. 2825–2830, 2011. https://dl.acm.org/doi/10.5555/1953048.2078195. [12] J. Gotman, D. Flanagan, J. Zhang, and B. Rosenblatt, “Automatic seizure detection in the newborn: Methods and initial evaluation,” Electroencephalogr. Clin. Neurophysiol., vol. 103, no. 3, pp. 356–362, 1997. https://doi.org/10.1016/S0013-4694(97)00003-9. 
    more » « less
  3. Investigating the nature of system intrusions in large distributed systems remains a notoriously difficult challenge. While monitoring tools (e.g., Firewalls, IDS) provide preliminary alerts through easy-to-use administrative interfaces, attack reconstruction still requires that administrators sift through gigabytes of system audit logs stored locally on hundreds of machines. At present, two fundamental obstacles prevent synergy between system-layer auditing and modern cluster monitoring tools: 1) the sheer volume of audit data generated in a data center is prohibitively costly to transmit to a central node, and 2) system- layer auditing poses a “needle-in-a-haystack” problem, such that hundreds of employee hours may be required to diagnose a single intrusion. This paper presents Winnower, a scalable system for audit-based cluster monitoring that addresses these challenges. Our key insight is that, for tasks that are replicated across nodes in a distributed application, a model can be defined over audit logs to succinctly summarize the behavior of many nodes, thus eliminating the need to transmit redundant audit records to a central monitoring node. Specifically, Winnower parses audit records into provenance graphs that describe the actions of individual nodes, then performs grammatical inference over individual graphs using a novel adaptation of Deterministic Finite Automata (DFA) Learning to produce a behavioral model of many nodes at once. This provenance model can be efficiently transmitted to a central node and used to identify anomalous events in the cluster. We have implemented Winnower for Docker Swarm container clusters and evaluate our system against real-world applications and attacks. We show that Winnower dramatically reduces storage and network overhead associated with aggregating system audit logs, by as much as 98%, without sacrificing the important information needed for attack investigation. Winnower thus represents a significant step forward for security monitoring in distributed systems. 
    more » « less
  4. Obeid, Iyad ; Selesnick, Ivan ; Picone, Joseph (Ed.)
    The goal of this work was to design a low-cost computing facility that can support the development of an open source digital pathology corpus containing 1M images [1]. A single image from a clinical-grade digital pathology scanner can range in size from hundreds of megabytes to five gigabytes. A 1M image database requires over a petabyte (PB) of disk space. To do meaningful work in this problem space requires a significant allocation of computing resources. The improvements and expansions to our HPC (highperformance computing) cluster, known as Neuronix [2], required to support working with digital pathology fall into two broad categories: computation and storage. To handle the increased computational burden and increase job throughput, we are using Slurm [3] as our scheduler and resource manager. For storage, we have designed and implemented a multi-layer filesystem architecture to distribute a filesystem across multiple machines. These enhancements, which are entirely based on open source software, have extended the capabilities of our cluster and increased its cost-effectiveness. Slurm has numerous features that allow it to generalize to a number of different scenarios. Among the most notable is its support for GPU (graphics processing unit) scheduling. GPUs can offer a tremendous performance increase in machine learning applications [4] and Slurm’s built-in mechanisms for handling them was a key factor in making this choice. Slurm has a general resource (GRES) mechanism that can be used to configure and enable support for resources beyond the ones provided by the traditional HPC scheduler (e.g. memory, wall-clock time), and GPUs are among the GRES types that can be supported by Slurm [5]. In addition to being able to track resources, Slurm does strict enforcement of resource allocation. This becomes very important as the computational demands of the jobs increase, so that they have all the resources they need, and that they don’t take resources from other jobs. It is a common practice among GPU-enabled frameworks to query the CUDA runtime library/drivers and iterate over the list of GPUs, attempting to establish a context on all of them. Slurm is able to affect the hardware discovery process of these jobs, which enables a number of these jobs to run alongside each other, even if the GPUs are in exclusive-process mode. To store large quantities of digital pathology slides, we developed a robust, extensible distributed storage solution. We utilized a number of open source tools to create a single filesystem, which can be mounted by any machine on the network. At the lowest layer of abstraction are the hard drives, which were split into 4 60-disk chassis, using 8TB drives. To support these disks, we have two server units, each equipped with Intel Xeon CPUs and 128GB of RAM. At the filesystem level, we have implemented a multi-layer solution that: (1) connects the disks together into a single filesystem/mountpoint using the ZFS (Zettabyte File System) [6], and (2) connects filesystems on multiple machines together to form a single mountpoint using Gluster [7]. ZFS, initially developed by Sun Microsystems, provides disk-level awareness and a filesystem which takes advantage of that awareness to provide fault tolerance. At the filesystem level, ZFS protects against data corruption and the infamous RAID write-hole bug by implementing a journaling scheme (the ZFS intent log, or ZIL) and copy-on-write functionality. Each machine (1 controller + 2 disk chassis) has its own separate ZFS filesystem. Gluster, essentially a meta-filesystem, takes each of these, and provides the means to connect them together over the network and using distributed (similar to RAID 0 but without striping individual files), and mirrored (similar to RAID 1) configurations [8]. By implementing these improvements, it has been possible to expand the storage and computational power of the Neuronix cluster arbitrarily to support the most computationally-intensive endeavors by scaling horizontally. We have greatly improved the scalability of the cluster while maintaining its excellent price/performance ratio [1]. 
    more » « less
  5. Due to the developments of topographic techniques, clear satellite imagery, and various means for collecting information, geospatial datasets are growing in volume, complexity, and heterogeneity. For efficient execution of spatial computations and analytics on large spatial data sets, parallel processing is required. To exploit fine-grained parallel processing in large scale compute clusters, partitioning in a load-balanced way is necessary for skewed datasets. In this work, we focus on spatial join operation where the inputs are two layers of geospatial data. Our partitioning method for spatial join uses Adaptive Partitioning (ADP) technique, which is based on Quadtree partitioning. Unlike existing partitioning techniques, ADP partitions the spatial join workload instead of partitioning the individual datasets separately to provide better load-balancing. Based on our experimental evaluation, ADP partitions spatial data in a more balanced way than Quadtree partitioning and Uniform grid partitioning. ADP uses an output-sensitive duplication avoidance technique which minimizes duplication of geometries that are not part of spatial join output. In a distributed memory environment, this technique can reduce data communication and storage requirements compared to traditional methods. To improve the performance of ADP, an MPI+Threads based parallelization is presented. With ParADP, a pair of real world datasets, one with 717 million polylines and another with 10 million polygons, is partitioned into 65,536 grid cells within 7 seconds. ParADP performs well with both good weak scaling up to 4,032 CPU cores and good strong scaling up to 4,032 CPU cores. 
    more » « less