PySpark integration into PyCharm -


I'm a bit lost on how to configure PyCharm so that it can directly run scripts within PySpark. I'm using PySpark on top of an Elasticsearch cluster, and I'm using the following code to run the script. It's currently running with the default Python interpreter; I tried to configure the pyspark shell as the interpreter, but that didn't work, failing with the error that it's not a valid SDK:

"""Count field values of Elasticsearch documents with PySpark.

Reads every document from the ``titanic/passenger`` resource, and for each
field found in the first document counts how often each value occurs.
Values that occur more than once are written back to Elasticsearch under
``titanic/value_counts``.
"""

__author__ = 'lucas'

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":

    conf = SparkConf().setAppName("estest")
    sc = SparkContext(conf=conf)

    # Connection settings used when reading the source documents.
    es_read_conf = {
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "titanic/passenger",
    }
    # The class names are resolved on the JVM side and are case-sensitive;
    # the elasticsearch-hadoop/spark jar must be on Spark's classpath.
    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)

    # Connection settings used when writing the aggregated counts.
    es_write_conf = {
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "titanic/value_counts",
    }

    # Each RDD element is a (key, document) pair; the first document is
    # used to discover the field names to aggregate over.
    doc = es_rdd.first()[1]

    for field in doc:

        # (value, 1) pairs summed per value, keeping values seen more than once.
        value_counts = es_rdd.map(lambda item: item[1][field])
        value_counts = value_counts.map(lambda word: (word, 1))
        value_counts = value_counts.reduceByKey(lambda a, b: a + b)
        value_counts = value_counts.filter(lambda item: item[1] > 1)
        # NOTE(review): the 'key' element looks like a placeholder that the
        # ES output format does not use -- confirm against es-hadoop docs.
        value_counts = value_counts.map(lambda item: ('key', {
            'field': field,
            'val': item[0],
            'count': item[1]
        }))

        # The save runs inside the loop iteration, so the lambdas above see
        # the current value of ``field`` when they execute.
        # NOTE(review): path='-' is presumably ignored because the target is
        # given by "es.resource" -- confirm.
        value_counts.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_write_conf)

But it generates the following stack trace:

Traceback (most recent call last):
  File "/home/lucas/pycharmprojects/tweetspark/analytics/tweetanalyzer.py", line 20, in <module>
    conf=es_read_conf)
  File "/var/opt/spark/python/pyspark/context.py", line 601, in newAPIHadoopRDD
    jconf, batchSize)
  File "/var/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/var/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:179)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:519)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:503)
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

What you're missing is the elasticsearch-spark jar. Download the elasticsearch-hadoop integration, find the elasticsearch-spark jar under its dist subdirectory, and set the SPARK_CLASSPATH environment variable to point at it:

import os

# Point Spark at the elasticsearch-spark jar. Environment variable names are
# case-sensitive on POSIX systems, so the name must be upper-case.
# NOTE(review): presumably this has to run before the SparkContext is
# created so the launched JVM inherits it -- confirm. Newer Spark releases
# deprecate SPARK_CLASSPATH in favour of the extraClassPath settings.
os.environ['SPARK_CLASSPATH'] = "/path/to/elasticsearch-hadoop-2.3.0/dist/elasticsearch-spark_2.10-2.3.0.jar"

Comments

Popular posts from this blog

c - Bitwise operation with (signed) enum value -

xslt - Unnest parent nodes by child node -

python - Healpy: From Data to Healpix map -