dataframe - Spark SQL Column Manipulation

I have a Dataset with the columns below. df.show();

+--------+---------+---------+---------+---------+
| Col1   | Col2    | Expend1 | Expend2 | Expend3 |
+--------+---------+---------+---------+---------+
| Value1 | Cvalue1 | 123     | 2254    | 22      |
| Value1 | Cvalue2 | 124     | 2255    | 23      |
+--------+---------+---------+---------+---------+

I want that changed to the format below using some join, cube, or any other operation.

1.
+--------+---------+------+
| Value1 | Cvalue1 | 123  |
| Value1 | Cvalue1 | 2254 |
| Valu...Read more
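A minimal PySpark sketch of one way to get that long format, using explode over an array of the Expend columns rather than a join or cube; the column names are taken from the excerpt above, the rest is assumed.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("unpivot-sketch").getOrCreate()

df = spark.createDataFrame(
    [("Value1", "Cvalue1", 123, 2254, 22),
     ("Value1", "Cvalue2", 124, 2255, 23)],
    ["Col1", "Col2", "Expend1", "Expend2", "Expend3"])

# explode() turns the array of the three Expend values into one row per value,
# keeping Col1 and Col2 alongside each exploded value.
long_df = df.select(
    "Col1", "Col2",
    F.explode(F.array("Expend1", "Expend2", "Expend3")).alias("Expend"))
long_df.show()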

Read .tar.gz file in spark socket dataset

Reading a .tar.gz file as a file stream in Spark Streaming works well enough, but my requirement is: can we read a .tar.gz file that comes from a socket server in Spark Streaming? When I post the tar file to the socket like

nc -l 9990 < input.tar.gz

I can read it in Spark like

Dataset<org.apache.spark.sql.Row> dataset = ss.readStream()
    .option("host", "localhost")
    .option("port", "9990")
    .format("socket")
    .load();

If I try to store the result in CSV format, dataset ...Read more
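For reference, a PySpark sketch of the same socket read with a CSV sink; note that the socket source decodes newline-delimited UTF-8 text into a single value column, so raw .tar.gz bytes arriving on the port would be mangled rather than preserved. The output paths are placeholders.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("socket-csv-sketch").getOrCreate()

# The socket source produces one string column named "value", one row per text line.
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9990)
         .load())

# The CSV sink needs an output path and a checkpoint location (placeholders here).
query = (lines.writeStream
         .format("csv")
         .option("path", "/tmp/socket-out")
         .option("checkpointLocation", "/tmp/socket-ckpt")
         .start())
query.awaitTermination()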

apache spark - PySpark: nested-splitting one column into multiple new ones

I have a network.log on Hadoop:

{"Source":"Network","Detail":"Event=01|Device=Mobile|ClientIP=10.0.0.0|URL=example.com"}

I want to load it as a data frame, splitting Detail on |. Then I want to further split each new column on =, with the left part as the column name and the right part as the value. The expected result would be:

Source  | Event | Device | ClientIP | URL
Network | 01    | Mobile | 10.0.0.0 | example.com

I've done the first split as follows:

from pyspark import SparkContext
from pyspark.sql import functions, SQLContext

INPUT_PATH = 'network....Read more
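A sketch of the second split in PySpark, assuming the JSON layout shown above: str_to_map turns the Detail string into a map, and getItem pulls each key out as its own column (the input path is hypothetical).

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("split-detail-sketch").getOrCreate()

df = spark.read.json("network.log")  # hypothetical path

# str_to_map splits Detail on "|" into key=value pairs; getItem extracts each key.
kv = F.expr(r"str_to_map(Detail, '\\|', '=')")
result = df.select(
    "Source",
    kv.getItem("Event").alias("Event"),
    kv.getItem("Device").alias("Device"),
    kv.getItem("ClientIP").alias("ClientIP"),
    kv.getItem("URL").alias("URL"))
result.show(truncate=False)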

apache spark - How to use UDF to return multiple columns?

Is it possible to create a UDF that returns a set of columns? I.e., having a data frame as follows:

| Feature1 | Feature2 | Feature 3 |
| 1.3      | 3.4      | 4.5       |

Now I would like to extract a new feature, which can be described as a vector of, let's say, two elements (e.g. as seen in a linear regression: slope and offset). The desired dataset shall look as follows:

| Feature1 | Feature2 | Feature 3 | Slope | Offset |
| 1.3      | 3.4      | 4.5       | 0.5   | 3      |

Is it possible to create multiple columns with a single UDF or do I need ...Read more
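A minimal sketch of the struct-returning approach in PySpark: one UDF returns a struct, and selecting "col.*" unpacks it into separate columns. The slope/offset values here are placeholders, not a real regression.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, DoubleType

spark = SparkSession.builder.appName("udf-struct-sketch").getOrCreate()

df = spark.createDataFrame([(1.3, 3.4, 4.5)], ["Feature1", "Feature2", "Feature3"])

schema = StructType([
    StructField("Slope", DoubleType(), False),
    StructField("Offset", DoubleType(), False)])

@F.udf(returnType=schema)
def fit_line(f1, f2, f3):
    # placeholder arithmetic standing in for a real slope/offset computation
    return (0.5, 3.0)

# The struct column "fit" is expanded into Slope and Offset with "fit.*".
(df.withColumn("fit", fit_line("Feature1", "Feature2", "Feature3"))
   .select("Feature1", "Feature2", "Feature3", "fit.*")
   .show())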

groovy - how to make groovysh work with apache spark

I am using Apache Spark with Groovy successfully; however, I have no luck using groovysh as an interactive Spark shell.

Groovy Shell (2.5.0-beta-3, JVM: 1.8.0_161)
Type ':help' or ':h' for help.
-------------------------------------------------------------------------------
groovy:000> :grab org.apache.spark:spark-sql_2.11:2.2.1
groovy:000> import org.apache.spark.sql.*
===> org.apache.spark.sql.*
groovy:000> s...Read more

Dynamic Resource allocation in Spark-Yarn Cluster Mode

When I use the setting below to start the Spark application (the default is yarn-client mode), it works fine:

spark_memory_setting="--master yarn --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.yarn.queue=ciqhigh --conf spark.dynamicAllocation.initialExecutors=50 --conf spark.dynamicAllocation.maxExecutors=50 --executor-memory 2G --driver-memory 4G"

ISSUE: whereas when I change the deploy mode to cluster, the application does not start up. It does not even throw an error to move on.

spark_memory_setting="--master yarn...Read more
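For comparison, a hedged sketch of the equivalent cluster-mode submission, mirroring the flags in the excerpt above; the application jar is a placeholder. In cluster mode the driver runs on YARN, so startup failures usually surface in yarn logs -applicationId <appId> rather than on the submitting console.

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.yarn.queue=ciqhigh \
  --conf spark.dynamicAllocation.initialExecutors=50 \
  --conf spark.dynamicAllocation.maxExecutors=50 \
  --executor-memory 2G \
  --driver-memory 4G \
  your-application.jar    # placeholder for the actual application jar and arguments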

amazon s3 - Spark read s3 using sc.textFile("s3a://bucket/filePath"). java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager

I have added the jars below to the spark/jars path:

hadoop-aws-2.7.3.jar
aws-java-sdk-s3-1.11.126.jar
aws-java-sdk-core-1.11.126.jar

spark-2.1.0

In spark-shell:

scala> sc.hadoopConfiguration.set("fs.s3a.access.key", "***")
scala> sc.hadoopConfiguration.set("fs.s3a.secret.key", "***")
scala> val f = sc.textFile("s3a://bucket/README.md")
scala> f.count

java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
    at org.apache.hadoop.fs.s3a.S3AFileSystem...Read more

apache spark - How does Pyspark Calculate Doc2Vec from word2vec word embeddings?

I have a PySpark dataframe with a corpus of ~300k unique rows, each with a "doc" that contains a few sentences of text. After processing, I have a 200-dimension vectorized representation of each row/doc. My NLP process:

- Remove punctuation with a regex UDF
- Word stemming with an nltk Snowball UDF
- PySpark Tokenizer
- Word2Vec (ml.feature.Word2Vec, vectorSize=200, windowSize=5)

I understand how this implementation uses the skip-gram model to create embeddings for each word based on the full corpus used. My question is: how does this implementation go f...Read more
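A small sketch with toy data of the step in question: in ml.feature.Word2Vec, transform() produces the per-document vector by averaging the learned vectors of the words in that document (minCount is lowered here only so the toy corpus fits).

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, Word2Vec

spark = SparkSession.builder.appName("doc-vector-sketch").getOrCreate()

docs = spark.createDataFrame(
    [("the quick brown fox jumps",),
     ("the lazy dog sleeps",)],
    ["doc"])

tokens = Tokenizer(inputCol="doc", outputCol="words").transform(docs)

w2v = Word2Vec(vectorSize=200, windowSize=5, minCount=1,
               inputCol="words", outputCol="doc_vec")
model = w2v.fit(tokens)

# transform() averages the word vectors of each row's tokens,
# yielding one 200-dimensional vector per document.
model.transform(tokens).select("doc", "doc_vec").show(truncate=False)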

how to "normalize" vectors values when using Spark CountVectorizer?

CountVectorizer and CountVectorizerModel often create a sparse feature vector that looks like this:

(10,[0,1,4,6,8],[2.0,1.0,1.0,1.0,1.0])

This basically says the total size of the vocabulary is 10, the current document has 5 unique elements, and in the feature vector these 5 unique elements take positions 0, 1, 4, 6 and 8. Also, one of the elements shows up twice, hence the 2.0 value.

Now, I would like to "normalize" the above feature vector and make it look like this:

(10,[0,1,4,6,8],[0.3333,0.1667,0.1667,0.1667,0.1667])

i.e., each value is...Read more
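A sketch of one way to get that scaling with built-in pieces: ml.feature.Normalizer with p=1.0 divides each entry by the row's sum, which for non-negative counts gives exactly the fractions shown above (the toy input is made up).

from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, Normalizer, Tokenizer

spark = SparkSession.builder.appName("l1-normalize-sketch").getOrCreate()

docs = spark.createDataFrame([("a b c a d e",), ("b c f",)], ["text"])
tokens = Tokenizer(inputCol="text", outputCol="words").transform(docs)

counts = CountVectorizer(inputCol="words", outputCol="counts").fit(tokens).transform(tokens)

# p=1.0 normalization divides every entry by the sum of the row's entries,
# turning raw term counts into per-document term frequencies.
tf = Normalizer(inputCol="counts", outputCol="tf", p=1.0).transform(counts)
tf.select("counts", "tf").show(truncate=False)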

apache spark - sparklyr and standard evaluation (SE) based functions

I'm trying to write a function that performs an sdf_pivot() and creates a Spark DataFrame with column names that include the name of the original variable or column.

set.seed(80)
df <- data.frame(id = c(1:5),
                 var1 = sample(LETTERS[1:12], 5, replace = TRUE),
                 var2 = sample(LETTERS[13:16], 5, replace = TRUE))
ref <- copy_to(sc, df, "mytbl")
glimpse(ref)

Observations: 5
Variables: 3
$ id   <int> 1, 2, 3, 4, 5
$ var1 <chr> "F", "G", "J", "A", "H"
$ var2 <chr> "M", "O", "O", "O", "O"

This is the expected res...Read more

apache spark - Features for sentiment analysis of twitter data related to music

I need some guidance on sentiment analysis of tweets related to music, on Spark. I was trying to perform sentiment analysis on Twitter data for tweets related to music. After a lot of searching around the net, I have understood how to fetch the tweets using the 'tweepy' Python API, and also realized that I can use a Naive Bayes classifier to finally classify the tweets. Now I am confused about how to define features for this classification; I am supposed to define at least 500 features. So here are my questions. I do not want to use any alre...Read more
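As one illustration of "features" in this setting (not a prescription), a bag-of-words pipeline in PySpark where the vocabulary size itself caps the feature count at 500; the two labelled tweets are placeholders for the tweepy-fetched data.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.classification import NaiveBayes

spark = SparkSession.builder.appName("tweet-sentiment-sketch").getOrCreate()

train = spark.createDataFrame(
    [("love this new album", 1.0),
     ("worst concert ever", 0.0)],
    ["text", "label"])

# Each of the (at most) 500 vocabulary terms becomes one feature: its count in the tweet.
pipeline = Pipeline(stages=[
    Tokenizer(inputCol="text", outputCol="words"),
    CountVectorizer(inputCol="words", outputCol="features", vocabSize=500),
    NaiveBayes(featuresCol="features", labelCol="label")])

model = pipeline.fit(train)
model.transform(train).select("text", "prediction").show(truncate=False)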

cassandra - Spark for eCommerce

I am building an online store or eCommerce platform, with no functionality beyond the millions of sites out there already. I started to look at Spring, but the learning overhead and the level of abstraction were too much for me. I found Spark and the online tutorials, and it is much more straightforward; coupled with Velocity it has to date done as much as I have needed it to do. For the backend DB, as part of the POC, we are using Cassandra. Before going any further on the project I have some questions you might be able to hel...Read more

apache spark - Google Cloud DataFlow for NRT data application

I'm evaluating Kafka/Spark/HDFS for developing an NRT (sub-second) Java application that receives data from an external gateway and publishes it to desktop/mobile clients (consumers) for various topics. At the same time, the data will be fed through streaming and batching (persistent) pipelines for analytics and ML. For example, the flow would be:

- A standalone TCP client reads streaming data from an external TCP server
- The client publishes data for different topics based on the packets (Kafka) and passes it to the streaming pipeline for analytics (Spark)
- A d...Read more
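A minimal Structured Streaming sketch of the Kafka-to-persistent-store leg of that flow; the broker address, topic name, and HDFS paths are placeholders, and the spark-sql-kafka package is assumed to be on the classpath.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nrt-pipeline-sketch").getOrCreate()

# Consume the topics that the TCP client published to Kafka.
events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")  # placeholder broker
          .option("subscribe", "gateway-events")              # placeholder topic
          .load()
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

# Persist the stream to HDFS for the batch/ML side of the pipeline.
query = (events.writeStream
         .format("parquet")
         .option("path", "hdfs:///data/gateway-events")
         .option("checkpointLocation", "hdfs:///chk/gateway-events")
         .start())
query.awaitTermination()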